# Cuaderno para cargar metadatos 

Este cuaderno toma un directorio de MinIO con estructura de datos abiertos y crea la definición para Hive-Metastore para cada una de las tablas

## Librerias

In [None]:
from minio import Minio
import pandas as pd
from io import StringIO
from io import BytesIO
import json
from pyhive import hive


## Definicion de coneccion a MinIO

In [None]:
client = Minio(
        "minio:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )

Ruta al diccionario de correpondencia 

In [None]:
#corr_file = None
corr_file = 'correspondence.json'
#corr_file = 'correspondence.csv'

## Funciones de ayuda

In [None]:
# Obtine el diccionario de correspondencia de un archivo 
def get_corrr_dic(corr_file = None):
    corr_dic = {"tables":{},"columns":{}}
    if corr_file is None:
        # Si se pasa None a la funcion dara un diccionario en blanco 
        pass
    elif corr_file.endswith(".json"):
        with open(corr_file, 'r') as myfile:
            corr_dic = json.loads(myfile.read())
    elif corr_file.endswith(".csv"):
        df = pd.read_csv(corr_file)
        df.apply(fill_dic,1,new_dic=corr_dic) 
    else:
        raise Exception("Format not found, only .json and .csv")
    return corr_dic
    
# llena los datos en un diccionario de correpondencia con la informacion de un renglón 
def fill_dic(row,new_dic):
    if row["type"] == "tables":
        new_dic["tables"][row["original_name"]]=row["final_name"]
    if row["type"] == "columns":
        try:
            new_dic["columns"][row["table"]][row["original_name"]]=row["final_name"]
        except KeyError:
            new_dic["columns"][row["table"]] = {}
            new_dic["columns"][row["table"]][row["original_name"]]=row["final_name"]

# Crea la definicion de la tabla en el directorio dado utilizando el diccionario de datos
def create_hive_table(client, bucket, directory_object_name):
    table_name = get_table_name(directory_object_name)
    col_def = get_col_def(client, bucket, directory_object_name)
    data_location = directory_object_name+"conjunto_de_datos/"
    # TODO: revisar si la tabla ya existe 
    table_def = """ CREATE EXTERNAL TABLE {} ({})
        ROW FORMAT DELIMITED
        FIELDS TERMINATED BY ','
        LINES TERMINATED BY '\\n'
        LOCATION 's3a://{}/{}'
        TBLPROPERTIES ('skip.header.line.count'='1')
        """.format(table_name,col_def,bucket,data_location)
    return table_def


# Funcion para establecer el nombre de la tabla, en una version peeliminar es el nombre del directorio, pero se puede refinar para lograr algo mas conciso 
def get_table_name(directory_object_name):
    old_name = directory_object_name.split("/")[-2]
    try:
        new_name = corr_dic["tables"][old_name]
    except KeyError:
        new_name = old_name
    return new_name

def get_col_name(variable,table_name):
    try:
        new_name = corr_dic["columns"][table_name][variable]
    except KeyError:
        new_name = variable
    return new_name
    
# Fucion que crea la definicion de las variables, nombre de la variable y tipo 
def get_col_def (client, bucket, directory_object_name):
    data_dictionary = get_data_dictionary(client, bucket, directory_object_name)
    # TODO: revisar si este ese el orden real de las columans de la tabla 
    table_name = get_table_name(directory_object_name)
    names = [get_col_name(x,table_name) for x in data_dictionary["Variable"]]
    types = [get_type(x) for x in data_dictionary["Tipo"]]
    return ", ".join(["{} {}".format(x,y) for x,y in zip(names,types)])
    
# Obtiene el diccionario de datos del formato de datos abiertos para definir las varibles
def get_data_dictionary(client, bucket, directory_object_name):
    dic_location = [obj.object_name for obj in client.list_objects(bucket, directory_object_name+"diccionario_de_datos/")]    
    try:
        response = client.get_object(bucket, dic_location[0])
        s = str(response.read(),'latin-1')
    finally:
        response.close()
        response.release_conn()
    df = pd.read_csv(StringIO(s),names=["id","Variable","Descripcion","Tipo","valor","etiqueta_rango"], index_col=False)
    valid_entries = ~pd.to_numeric(df["id"],errors='coerce',downcast='integer').isna()
    return df[valid_entries]

# Convierte el tipo de columna del estilo datos abiertos al estilo SQL
def get_type(entrada):
    # TODO: Investigar los tipos de HIve y ver cual se justa mejor
    # TODO: agregar lognitudes 
    lookup = {"C":"STRING","N":"DECIMAL"}
    for key in lookup:
        if entrada.startswith(key):
            return lookup[key]
    raise Exception('Variable del tipo "{}" no encontrado'.format(entrada))

# atraviesa todos los sub-directorios para crear un a tabla para cada uno
def create_dataset_tables(client,bucket,data_set):
    definitions = [create_hive_table(client, bucket, obj.object_name) for obj in client.list_objects(bucket, data_set) if obj.is_dir]
    return definitions

# ToDo: ejecutar directamente en el Hive-Metastore y revisar si la definición ha sido exitosa 
    

## Ejemplo de ejecucion

En este ejemplo se toman los datos cargados en el cuaderno CargaObjetos.ipynb para crear las definiciones de tablas.

Es necesario copiar la salida de las siguientes celdas y pegarla en el archivo 'services\hivemetastore\create-table.hql'
y ejecutar el comando 
```
docker-compose exec hive-metastore beeline -u jdbc:hive2:// -f /tmp/create-table.hql
```
con esto se definiran los datos de las tablas. 


**Nota:** En los datos a descargar hay un problema con el archivo *conjunto_de_datos_enigh_2018_ns_csv\conjunto_de_datos_poblacion_enigh_2018_ns\diccionario_de_datos\diccionario_datos_poblacion_enigh_2018_ns.csv* en la linea *81* es necesario poner comillas para que el csv se detecte de manera correcta. 
Se puede hacer de manera automatica en el cuaderno DescargaDatos.ipynb

In [None]:
corr_dic = get_corrr_dic(corr_file)

In [None]:
sqls= create_dataset_tables(client,"hive","warehouse/conjunto_de_datos_enigh_2018_ns_csv/")

#print(create_dataset_tables(client,"hive","warehouse/conjunto_de_datos_enigh2016_nueva_serie_csv/"))

#print(create_dataset_tables(client,"hive","warehouse/enigh_ncv_2014_csv/"))

In [None]:
sqls[0]

In [None]:
def get_connection(host='hive-server',port=10000,auth='NOSASL',database="default"):
    conn = hive.Connection(host=host,port=port,auth=auth,database=database)
    return conn

def procc_SQL_list(list_sqls):
    conn = get_connection()
    cursor = conn.cursor()
    for sql in list_sqls:
        try:
            cursor.execute(sql)
        except:
            print(" Error al crear la tabla "+sql.split(" ")[11])
    conn.close()

def drop_list_tables(list_sqls):
    conn = get_connection()
    cursor = conn.cursor()
    for sql in list_sqls:
        try:
            cursor.execute("DROP TABLE "+sql.split(" ")[3])
        except:
            print("Error al eliminar tabla "+sql.split(" ")[3])
    conn.close()

    
def list_tables():
    conn = get_connection()
    try:
        df = pd.read_sql("SHOW TABLES", conn)
    except:
        print("Error al listar tablas")
    conn.close()
    return df

In [None]:
procc_SQL_list(sqls)

In [None]:
list_tables()