#### Importamos dependencias

In [1]:
from requests.auth import HTTPBasicAuth
from impala.dbapi import connect
from datetime import datetime
from pyhive import hive
import pandas as pd
import requests
import sqlparse
import json
import sys
import os
import re

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [3]:
with open('data/data.json') as f:
    file = f.read()
    json_file = json.loads(file)

In [4]:
print(json.dumps(json_file, indent=2))

{
  "de_ber_4con.productos_parametria_dim_productos": {
    "database": "de_ber_4con",
    "table_name": "productos_parametria_dim_productos",
    "create_table": "",
    "associated_term": "productos_parametria_dim_productos_unisys",
    "loading_query": "SELECT id_concepto AS id_concepto, cod_producto AS cod_producto, id_moneda AS id_moneda, fecha_proceso AS fecha_proceso, id_producto AS id_producto, marca_sfb AS marca_sfb, marca_producto_cta_comitente AS marca_producto_cta_comitente, cast(fecha_vencimiento_producto as varchar(10)) AS fecha_vencimiento_producto, cast(fecha_vigencia_producto as varchar(10)) AS fecha_vigencia_producto, desc_producto AS desc_producto, nro_campo_c_467 AS nro_campo_c_467, nro_campo_garantia AS nro_campo_garantia, nro_campo_lisol AS nro_campo_lisol, factor_ponderacion AS factor_ponderacion, fecha_vencimiento_producto_rel AS fecha_vencimiento_producto_rel, fecha_vigencia_producto_rel AS fecha_vigencia_producto_rel, estado AS estado, marca_producto_privado A

In [5]:
'''Variables'''
## Atlas dev
headers = {
    "Content-Type": "application/json",
    "Accept": "application/json"
}
credentials = HTTPBasicAuth('atlasadmin', 'xNB8lZ!Dut')
atlas_urls = ["172.30.213.141","172.30.213.142"]
atlas_port = 31000

## Hive dev
hive_host = 'edh-master-01d.root.corp'
hive_port = 10000
hive_user = 'admin'


## Impala dev
impala_host = '172.30.213.211'
impala_port = 21050
impala_user = 'admin'

In [6]:
#HIVE
def describe_table( hive_database, table_name ):
    #Creo la conexión y obtengo el cursos
    hive_conn = hive.connect(host=hive_host, port=hive_port, username=hive_user, database=hive_database)
    hive_cursor = hive_conn.cursor()

    # Obtengo el create
    hive_cursor.execute(f'DESCRIBE {hive_database}.{table_name}')
    databases = hive_cursor.fetchall()
    
    return databases

In [11]:
#IMPALA
def describe_table_impala( impala_database, table_name ):
    #Creo la conexión y obtengo el cursos
    impala_conn = connect(host=impala_host, port=impala_port, database=impala_database)
    impala_cursor = impala_conn.cursor()

    # Obtengo el create
    impala_cursor.execute(f'DESCRIBE {impala_database}.{table_name}')
    databases = impala_cursor.fetchall()
    
    return databases

#### Creamos el dataframe de metadata para HIVE

In [9]:
lista_metadata = []

for table in json_file.keys():
    if '4con' not in table and 'datamart' not in table:
        db, table_name = table.split('.')
        describe = describe_table( db, table_name )
        db_split = db.split('_')
        entidad = db_split[1]
        if 'datamart' in db:
            zona = 'datamart'
        else:
            zona = db_split[2][1:]
        
        a = list(map(lambda x: list(x)+[db, table_name, entidad, zona], describe ))
        
        lista_metadata.append(a)

[['id_concepto', 'tinyint', 'Código de concepto', 'de_ber_3ref', 'productos_parametria_dim_productos', 'ber', 'ref'], ['cod_producto', 'smallint', 'Código de producto', 'de_ber_3ref', 'productos_parametria_dim_productos', 'ber', 'ref'], ['id_producto', 'varchar(10)', 'Identificación del producto', 'de_ber_3ref', 'productos_parametria_dim_productos', 'ber', 'ref'], ['marca_sfb', 'char(1)', 'Señal que indica si es un producto SFB', 'de_ber_3ref', 'productos_parametria_dim_productos', 'ber', 'ref'], ['marca_producto_cta_comitente', 'char(1)', 'Señal es producto cta comitente', 'de_ber_3ref', 'productos_parametria_dim_productos', 'ber', 'ref'], ['id_moneda', 'tinyint', 'Código de moneda', 'de_ber_3ref', 'productos_parametria_dim_productos', 'ber', 'ref'], ['fecha_vencimiento_producto', 'date', 'Fecha de vencimiento del producto', 'de_ber_3ref', 'productos_parametria_dim_productos', 'ber', 'ref'], ['fecha_vigencia_producto', 'date', 'Fecha de vigencia del producto', 'de_ber_3ref', 'producto

In [24]:
lista_aplanada = [item for sublist1 in lista_metadata for item in sublist1]

In [26]:
#HIVE
nombres_columnas = ['campo', 'tipo_dato', 'comentario', 'base','tabla','entidad','zona']

# Crear el DataFrame
df_metadata = pd.DataFrame(lista_aplanada, columns=nombres_columnas)

#Eliminamos la garbage
options = ['# Partition Information', '# col_name', ''] 
rslt_df = df_metadata.loc[~df_metadata['campo'].isin(options)]

#Guardamos
rslt_df.to_csv('data/metadata_hive.csv')

#### Creamos el dataframe de metadata para IMPALA <br>
#### Agregamos un campo 'primary_key' que funciona cómo flag

#nombre_campo|tipo_dato|comentario|primary_key|base|tabla|entidad|zona

In [21]:
lista_metadata_impala = []

for table in json_file.keys():
    if '1raw' not in table and '2cur' not in table and '3ref' not in table:
        db, table_name = table.split('.')
        describe = describe_table_impala( db, table_name )
        db_split = db.split('_')
        entidad = db_split[1]
        if 'datamart' in db:
            zona = 'datamart'
        else:
            zona = db_split[2][1:]
        a = list(map(lambda x: list(x[:4])+[db, table_name, entidad, zona], describe ))
        lista_metadata_impala.append(a)

In [23]:
lista_aplanada_impala = [item for sublist1 in lista_metadata_impala for item in sublist1]

In [25]:
#IMPALA
nombres_columnas_impala = ['campo', 'tipo_dato', 'comentario','primary_key', 'base','tabla','entidad','zona']

# Crear el DataFrame
df_metadata_impala = pd.DataFrame(lista_aplanada_impala, columns=nombres_columnas_impala)

In [26]:
df_metadata_impala

Unnamed: 0,campo,tipo_dato,comentario,primary_key,base,tabla,entidad,zona
0,id_concepto,tinyint,Código de concepto,true,de_ber_4con,productos_parametria_dim_productos,ber,con
1,cod_producto,smallint,Código de producto,true,de_ber_4con,productos_parametria_dim_productos,ber,con
2,id_moneda,tinyint,Código de moneda,true,de_ber_4con,productos_parametria_dim_productos,ber,con
3,fecha_proceso,varchar(8),Fecha de proceso de la tabla de landing,true,de_ber_4con,productos_parametria_dim_productos,ber,con
4,id_producto,varchar(10),Identificación del producto,false,de_ber_4con,productos_parametria_dim_productos,ber,con
...,...,...,...,...,...,...,...,...
4547,tasa_maxima,"decimal(8,4)",Tasa Maxima p/PF,false,de_bsj_4con,productos_parametria_dim_divisas,bsj,con
4548,tasa_minima,"decimal(8,4)",Tasa Minima p/PF,false,de_bsj_4con,productos_parametria_dim_divisas,bsj,con
4549,mto_maximo_tasa_regulada,"decimal(15,2)",Monto Maximo Tasa Regulada,false,de_bsj_4con,productos_parametria_dim_divisas,bsj,con
4550,tasa_referencia,"decimal(8,4)",Tasa de Referencia,false,de_bsj_4con,productos_parametria_dim_divisas,bsj,con


In [27]:
#Guardamos
df_metadata_impala.to_csv('data/metadata_impala.csv')