# ETL: Geografía y Correlativas de turismo

## Librerias

In [1]:
# Generales
import pandas as pd
import numpy as np
import time
import re
import warnings

warnings.filterwarnings('ignore')

# Funciones Snowflake
import funciones as snow_func

In [2]:
# Aumentar número de columnas que se pueden ver
pd.options.display.max_columns = None
# En los dataframes, mostrar los float con dos decimales
pd.options.display.float_format = '{:,.10f}'.format
# Cada columna será tan grande como sea necesario para mostrar todo su contenido
pd.set_option('display.max_colwidth', 0)

### Snowflake

In [3]:
# Librerias necesarias para subir a Snowflake
import os
import json
import snowflake.connector # [pip install snowflake-connector-python]
from snowflake.connector.pandas_tools import write_pandas # [pip install "snowflake-connector-python[pandas]"]
from snowflake.snowpark import Session

In [4]:
# Paso 1: Definir la ruta al archivo JSON en el escritorio
desktop_path = os.path.join(os.path.expanduser("~"), "Desktop\Conn")
json_file_path = os.path.join(desktop_path, "snowflake_credentials.json")

# Paso 2: Leer las credenciales desde el archivo JSON
with open(json_file_path, 'r') as file:
    credentials = json.load(file)

# Paso 3: Definir los parámetros de conexión usando las credenciales
connection_parameters = {
        "account": credentials["ACCOUNT_SNOWFLAKE"],
        "user": credentials["USER_SNOWFLAKE"],
        "password": credentials["PASSWORD_SNOWFLAKE"],
        "role": credentials["ROLE_SNOWFLAKE"],
        "warehouse": credentials["WAREHOUSE"]
    }

# Paso 5: Crear un objeto de conexión utilizando snowflake.connector
session = Session.builder.configs(connection_parameters).create()
print("Sesión actual:", {session})

Sesión actual: {<snowflake.snowpark.session.Session object at 0x0000019EE6A148D0>}


In [5]:
# Crear objeto de conexión
conn = session.connection

## 1. Divipola

In [6]:
# Solo se debe cambiar la ubicación del archivo
path_insumos = "C:/Users/nrivera/OneDrive - PROCOLOMBIA/Documentos/017B-Documentos-Colombia/Cargue/Insumos/GEOGRAFIA/"
divipola_file = 'DIVIPOLA.xlsx'

In [7]:
# Importar datos
df_departamentos_divipola = pd.read_excel(path_insumos + divipola_file, sheet_name="Departamento", dtype=str)
df_municipio_divipola = pd.read_excel(path_insumos + divipola_file, sheet_name="Municipio", dtype=str)
df_departamentos_municipio_divipola = pd.read_excel(path_insumos + divipola_file, sheet_name="Departamento - Municipio", dtype=str)

## 2. Departamentos DIAN - DANE

In [8]:
# Solo se debe cambiar la ubicación del archivo
departamentos_file = 'Departamentos - DIAN - DIVIPOLA.xlsx'

In [9]:
# Importar datos
df_dian_departamentos = pd.read_excel(path_insumos + departamentos_file, sheet_name="DEPARTAMENTO", dtype=str)

## 3. MODELO RELACIONAL DE PAÍSES

In [10]:
# Solo se debe cambiar la ubicación del archivo
correlativa_file = 'MODELO_RELACIONAL.xlsx'

In [11]:
# Importar datos
df_continentes = pd.read_excel(path_insumos + correlativa_file, sheet_name="CONTINENTES", dtype=str)
df_hubs = pd.read_excel(path_insumos + correlativa_file, sheet_name="HUB", dtype=str)
df_region = pd.read_excel(path_insumos + correlativa_file, sheet_name="SUBREGION", dtype=str)
df_paises = pd.read_excel(path_insumos + correlativa_file, sheet_name="PAISES", dtype=str)
df_paises_exportaciones = pd.read_excel(path_insumos + correlativa_file, sheet_name="EXPORTACIONES", dtype=str)
df_paises_inversion = pd.read_excel(path_insumos + correlativa_file, sheet_name="INVERSION", dtype=str)
df_paises_turismo = pd.read_excel(path_insumos + correlativa_file, sheet_name="TURISMO", dtype=str)
df_tlcs = pd.read_excel(path_insumos + correlativa_file, sheet_name="TLCS", dtype=str)
df_paises_tlcs = pd.read_excel(path_insumos + correlativa_file, sheet_name="TLCS_PAISES", dtype=str)
df_tlcs_tabla = pd.read_excel(path_insumos + correlativa_file, sheet_name="TLCS_TABLA", dtype=str)

In [12]:
# Eliminar la columna de validación manual
df_paises_exportaciones = df_paises_exportaciones.drop(columns=["¿ESTA_EN_PAISES?"])
df_paises_inversion = df_paises_inversion.drop(columns=["¿ESTA_EN_PAISES?"])
df_paises_turismo = df_paises_turismo.drop(columns=["¿ESTA_EN_PAISES?"])
df_tlcs_tabla = df_tlcs_tabla.drop(columns=["¿ESTA_EN_PAISES?"])
df_tlcs_tabla = df_tlcs_tabla.drop(columns=["¿ESTA_EN_EXPORTACIONES?"])

# 4. Subir a Snowflake - Geografía

##### Usar base de datos para análisis

In [13]:
# Usar base de datos:
sql_database_usar = """
USE DOCUMENTOS_COLOMBIA;
"""
# Ejecutar
snow_func.snowflake_sql(conn, sql_database_usar)

Unnamed: 0,status
0,Statement executed successfully.


In [14]:
# Crear esquema geografia:
sql_schema_geo = """
USE SCHEMA GEOGRAFIA;
"""
# Ejecutar
snow_func.snowflake_sql(conn, sql_schema_geo)

Unnamed: 0,status
0,Statement executed successfully.


In [15]:
# Asegurar que estamos en la ubicación que se desea para subir las bases de datos
ubicacion = "SELECT CURRENT_WAREHOUSE() AS WAREHOUSE, CURRENT_DATABASE() AS DATABASE, CURRENT_SCHEMA() AS SCHEMA;"
# Ejecutar
snow_func.snowflake_sql(conn, ubicacion)

Unnamed: 0,WAREHOUSE,DATABASE,SCHEMA
0,WH_PROCOLOMBIA_ANALITICA,DOCUMENTOS_COLOMBIA,GEOGRAFIA


##### SUBIR BASES DE DATOS

In [16]:
# Lista de pd a subir
bases_de_datos = [
    df_departamentos_divipola,
    df_municipio_divipola,
    df_departamentos_municipio_divipola,
    df_dian_departamentos,
    df_continentes,
    df_region,
    df_paises,
    df_paises_exportaciones,
    df_paises_inversion,
    df_paises_turismo,
    df_tlcs,
    df_paises_tlcs,
    df_tlcs_tabla,
    df_hubs
    ]

nombres_tablas = [
    'DIVIPOLA_DEPARTAMENTOS',
    'DIVIPOLA_MUNICIPIOS',
    'DIVIPOLA_DEPARTAMENTOS_MUNICIPIOS',
    'DIAN_DEPARTAMENTOS',
    'CONTINENTES',
    'REGIONES',
    'PAISES',
    'PAISES_EXPORTACIONES',
    'PAISES_INVERSION',
    'PAISES_TURISMO',
    'TLCS',
    'TLCS_PAISES',
    'TLCS_TABLA',
    'HUBS'
]

In [17]:
# pd de verificación
df_resultados_verificacion = pd.DataFrame()
# Subir y verificar bases a Snowflake
for base, tabla in zip(bases_de_datos, nombres_tablas):
    # Cargar el DataFrame en Snowflake y capturar el mensaje de carga
    mensaje_carga = snow_func.snowflake_cargar_df(conn, base, f'{tabla}')
    
    # Verificar y almacenar el resultado en el DataFrame
    resultado = snow_func.snowflake_sql(conn, f"SELECT COUNT(*) FROM {tabla};")
    total_registros = resultado  # Extraer el total de registros

    # Crear un DataFrame temporal para la nueva fila
    nueva_fila = pd.DataFrame({
        'Tabla': [tabla],
        'Total_Registros': [total_registros],
        'Mensaje_Carga': [mensaje_carga]
    })

    # Concatenar la nueva fila al DataFrame de resultados
    df_resultados_verificacion = pd.concat([df_resultados_verificacion, nueva_fila], ignore_index=True)

In [18]:
# Ver resultados
df_resultados_verificacion

Unnamed: 0,Tabla,Total_Registros,Mensaje_Carga
0,DIVIPOLA_DEPARTAMENTOS,COUNT(*) 0 34,DataFrame cargado exitosamente en la tabla: 34 filas en 1 chunks.\nTiempo de carga: 2.32 segundos.\nProceso terminado
1,DIVIPOLA_MUNICIPIOS,COUNT(*) 0 1123,DataFrame cargado exitosamente en la tabla: 1123 filas en 1 chunks.\nTiempo de carga: 1.80 segundos.\nProceso terminado
2,DIVIPOLA_DEPARTAMENTOS_MUNICIPIOS,COUNT(*) 0 1122,DataFrame cargado exitosamente en la tabla: 1122 filas en 1 chunks.\nTiempo de carga: 2.41 segundos.\nProceso terminado
3,DIAN_DEPARTAMENTOS,COUNT(*) 0 35,DataFrame cargado exitosamente en la tabla: 35 filas en 1 chunks.\nTiempo de carga: 1.86 segundos.\nProceso terminado
4,CONTINENTES,COUNT(*) 0 9,DataFrame cargado exitosamente en la tabla: 9 filas en 1 chunks.\nTiempo de carga: 1.84 segundos.\nProceso terminado
5,REGIONES,COUNT(*) 0 17,DataFrame cargado exitosamente en la tabla: 17 filas en 1 chunks.\nTiempo de carga: 2.64 segundos.\nProceso terminado
6,PAISES,COUNT(*) 0 264,DataFrame cargado exitosamente en la tabla: 264 filas en 1 chunks.\nTiempo de carga: 2.74 segundos.\nProceso terminado
7,PAISES_EXPORTACIONES,COUNT(*) 0 235,DataFrame cargado exitosamente en la tabla: 235 filas en 1 chunks.\nTiempo de carga: 2.31 segundos.\nProceso terminado
8,PAISES_INVERSION,COUNT(*) 0 87,DataFrame cargado exitosamente en la tabla: 87 filas en 1 chunks.\nTiempo de carga: 1.79 segundos.\nProceso terminado
9,PAISES_TURISMO,COUNT(*) 0 297,DataFrame cargado exitosamente en la tabla: 297 filas en 1 chunks.\nTiempo de carga: 1.96 segundos.\nProceso terminado


##### CREAR TABLA CONSOLIDADA

In [19]:
# Query para crear la tabla
sql_consolidada = """
CREATE OR REPLACE TABLE DOCUMENTOS_COLOMBIA.GEOGRAFIA.ST_PAISES AS
SELECT PAISES.M49_CODE,
    PAISES.ISO_ALPHA2_CODE,
    PAISES.ISO_ALPHA3_CODE,
    PAISES.COUNTRY_OR_AREA,
    PAISES.REGION_CODE,
    PAISES.SUB_REGION_CODE,
    PAISES.COD_NEO,
    PAISES_EXPORTACIONES.PAIS_LLAVE_EXPORTACIONES,
    PAISES_EXPORTACIONES.REGION_NAME_EXPORTACIONES,
    PAISES_EXPORTACIONES.HUB_NAME_EXPORTACIONES,
    PAISES_INVERSION.PAIS_INVERSION_BANREP,
    PAISES_TURISMO.CODIGO_PAIS_MIGRACION,
    PAISES_TURISMO.NOMBRE_PAIS_MIGRACION,
    PAISES_TURISMO.REGION_NAME_TURISMO,
    PAISES_TURISMO.REGION_NAME_TURISMO_AGREGADA,
    PAISES_TURISMO.HUB_NAME_TURISMO,
    TLCS_PAISES.ID_TLC,
    TLCS.NOMBRE_TLC,
    CONTINENTES.REGION_NAME
FROM DOCUMENTOS_COLOMBIA.GEOGRAFIA.PAISES AS PAISES
LEFT JOIN DOCUMENTOS_COLOMBIA.GEOGRAFIA.PAISES_EXPORTACIONES AS PAISES_EXPORTACIONES ON PAISES.M49_CODE = PAISES_EXPORTACIONES.M49_CODE
LEFT JOIN DOCUMENTOS_COLOMBIA.GEOGRAFIA.PAISES_INVERSION AS PAISES_INVERSION ON PAISES.M49_CODE = PAISES_INVERSION.M49_CODE
LEFT JOIN DOCUMENTOS_COLOMBIA.GEOGRAFIA.PAISES_TURISMO AS PAISES_TURISMO ON PAISES.M49_CODE = PAISES_TURISMO.M49_CODE
LEFT JOIN DOCUMENTOS_COLOMBIA.GEOGRAFIA.TLCS_PAISES AS TLCS_PAISES ON PAISES.M49_CODE = TLCS_PAISES.M49_CODE
LEFT JOIN DOCUMENTOS_COLOMBIA.GEOGRAFIA.TLCS AS TLCS ON TLCS_PAISES.ID_TLC = TLCS.ID_TLC
LEFT JOIN DOCUMENTOS_COLOMBIA.GEOGRAFIA.CONTINENTES AS CONTINENTES ON PAISES.REGION_CODE = CONTINENTES.REGION_CODE;
"""
# Ejecutar
snow_func.snowflake_sql(conn, sql_consolidada)

Unnamed: 0,status
0,Table ST_PAISES successfully created.


In [20]:
# Rellenar vacíos
# Ejecutar
snow_func.snowflake_sql(conn, """
UPDATE DOCUMENTOS_COLOMBIA.GEOGRAFIA.ST_PAISES
SET PAIS_LLAVE_EXPORTACIONES = 'PAÍS NO INCLUIDO'
WHERE PAIS_LLAVE_EXPORTACIONES IS NULL;
""")

Unnamed: 0,number of rows updated,number of multi-joined rows updated
0,40,0


In [21]:
# Rellenar vacíos
# Ejecutar
snow_func.snowflake_sql(conn, """
UPDATE DOCUMENTOS_COLOMBIA.GEOGRAFIA.ST_PAISES
SET PAIS_INVERSION_BANREP = 'PAÍS NO INCLUIDO'
WHERE PAIS_INVERSION_BANREP IS NULL;
""")

Unnamed: 0,number of rows updated,number of multi-joined rows updated
0,212,0


In [22]:
# Rellenar vacíos
# Ejecutar
snow_func.snowflake_sql(conn, """
UPDATE DOCUMENTOS_COLOMBIA.GEOGRAFIA.ST_PAISES
SET CODIGO_PAIS_MIGRACION = 'PAÍS NO INCLUIDO'
WHERE CODIGO_PAIS_MIGRACION IS NULL;
""")

Unnamed: 0,number of rows updated,number of multi-joined rows updated
0,6,0


In [23]:
# Rellenar vacíos
# Ejecutar
snow_func.snowflake_sql(conn, """
UPDATE DOCUMENTOS_COLOMBIA.GEOGRAFIA.ST_PAISES
SET NOMBRE_PAIS_MIGRACION = 'PAÍS NO INCLUIDO'
WHERE NOMBRE_PAIS_MIGRACION IS NULL;
""")

Unnamed: 0,number of rows updated,number of multi-joined rows updated
0,6,0


In [24]:
# Rellenar vacíos
# Ejecutar
snow_func.snowflake_sql(conn, """
UPDATE DOCUMENTOS_COLOMBIA.GEOGRAFIA.ST_PAISES
SET COUNTRY_OR_AREA = 'PAÍS NO INCLUIDO'
WHERE COUNTRY_OR_AREA IS NULL;
""")

Unnamed: 0,number of rows updated,number of multi-joined rows updated
0,0,0


In [25]:
# Verificar que se creó correctamente:
# Ejecutar
snow_func.snowflake_sql(conn, "SELECT COUNT (*) FROM ST_PAISES;")

Unnamed: 0,COUNT (*)
0,327


### Cerrar sesión, conexión y cursor

In [26]:
conn.close()
session.close()