<a href="https://colab.research.google.com/github/wogomezma/DataEngineering61890/blob/main/Entrega_1_Data_Engineering_Walter_Gomez_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [21]:
pip install sodapy



In [35]:
import pandas as pd
from sodapy import Socrata
import json
import psycopg2
from datetime import datetime
import io

In [24]:
with open('keys.json') as f:
    keys = json.load(f)

uri = keys['uri']
token = keys['token']
redshift_username = keys['redshift_username']
redshift_password = keys['redshift_password']
redshift_host = keys['redshift_host']
redshift_port = keys['redshift_port']
redshift_database = keys['redshift_database']
redshift_schema = keys['redshift_schema']

In [25]:
client = Socrata(uri, token)

In [26]:
# Convert to pandas DataFrame
results = 0
offset = 0
df = pd.DataFrame()

while results != []:
    results = client.get("v8jr-kywh", limit=50000, offset=offset)

    data = pd.DataFrame.from_records(results)
    offset += 50000

    if results != []:
        df = pd.concat([df, data], axis=0)

In [28]:
# Cambiar los tipos de datos
df['fecha_venta'] = pd.to_datetime(df['fecha_venta'])
df['anio_venta'] = df['anio_venta'].astype('int64')
df['mes_venta'] = df['mes_venta'].astype('int64')
df['dia_venta'] = df['dia_venta'].astype('int64')
df['latitud'] = df['latitud'].astype('float')
df['longitud'] = df['longitud'].astype('float')
df['eds_activas'] = df['eds_activas'].astype('int64')
df['numero_de_ventas'] = df['numero_de_ventas'].astype('int64')
df['vehiculos_atendidos'] = df['vehiculos_atendidos'].astype('int64')
df['cantidad_volumen_suministrado'] = df['cantidad_volumen_suministrado'].astype('float')
df['date_update'] = pd.Timestamp(datetime.now())

In [36]:
# Verificación del tipo de datos
df.dtypes

fecha_venta                      datetime64[ns]
anio_venta                                int64
mes_venta                                 int64
dia_venta                                 int64
codigo_municipio_dane                    object
departamento                             object
municipio                                object
latitud                                 float64
longitud                                float64
tipo_agente                              object
tipo_de_combustible                      object
eds_activas                               int64
numero_de_ventas                          int64
vehiculos_atendidos                       int64
cantidad_volumen_suministrado           float64
date_update                      datetime64[us]
dtype: object

In [37]:
df

Unnamed: 0,fecha_venta,anio_venta,mes_venta,dia_venta,codigo_municipio_dane,departamento,municipio,latitud,longitud,tipo_agente,tipo_de_combustible,eds_activas,numero_de_ventas,vehiculos_atendidos,cantidad_volumen_suministrado,date_update
0,2022-06-17,2022,6,17,68682,SANTANDER,FLORIDABLANCA,7.079705,-73.067993,ESTACION DE SERVICIO DE GNCV,GNV,2,671,576,4909.30,2024-08-06 01:26:49.712780
1,2020-07-13,2020,7,13,23230,CORDOBA,MONTERIA,8.584698,-75.950554,ESTACION DE SERVICIO DE GNCV,GNV,1,261,168,1413.75,2024-08-06 01:26:49.712780
2,2022-04-16,2022,4,16,85850,CASANARE,YOPAL,5.242745,-72.258026,ESTACION DE SERVICIO DE GNCV,GNV,7,1162,560,8883.95,2024-08-06 01:26:49.712780
3,2022-06-06,2022,6,6,68680,SANTANDER,BUCARAMANGA,7.155834,-73.111572,ESTACION DE SERVICIO DE GNCV,GNV,9,1957,1331,13073.23,2024-08-06 01:26:49.712780
4,2023-02-06,2023,2,6,13138,BOLIVAR,TURBACO,10.353107,-75.379715,ESTACION DE SERVICIO DE GNCV,GNV,1,63,53,660.86,2024-08-06 01:26:49.712780
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
32158,2024-05-18,2024,5,18,66661,RISARALDA,DOSQUEBRADAS,4.842021,-75.669937,ESTACION DE SERVICIO DE GNCV,GNV,3,1100,911,8024.33,2024-08-06 01:26:49.712780
32159,2024-05-17,2024,5,17,25255,CUNDINAMARCA,PUERTO SALGAR,5.619753,-74.580193,ESTACION DE SERVICIO DE GNCV,GNV,1,57,44,4815.07,2024-08-06 01:26:49.712780
32160,2024-05-18,2024,5,18,76765,VALLE DEL CAUCA,PALMIRA,3.558393,-76.227798,ESTACION DE SERVICIO DE GNCV,GNV,6,840,603,6021.73,2024-08-06 01:26:49.712780
32161,2024-05-18,2024,5,18,47477,MAGDALENA,SITIONUEVO,10.890276,-74.629662,ESTACION DE SERVICIO DE GNCV,GNV,2,78,62,6697.00,2024-08-06 01:26:49.712780


In [None]:
# Inicializar variables para descargar los datos
offset = 0
total_records_inserted = 0  # Definir el contador fuera del bloque try

# Conectar a Redshift usando psycopg2
try:
    conn = psycopg2.connect(
        dbname=redshift_database,
        user=redshift_username,
        password=redshift_password,
        host=redshift_host,
        port=redshift_port
    )
    conn.autocommit = True

    with conn.cursor() as cursor:
        # Verificar si la tabla existe
        cursor.execute(f"""
            SELECT COUNT(*)
            FROM information_schema.tables
            WHERE table_schema = '{redshift_schema}'
            AND table_name = 'daily_sales_data';
        """)

        table_exists = cursor.fetchone()[0] == 1

        if not table_exists:
            print("La tabla 'daily_sales_data' no existe en Redshift. Creando la tabla...")

            # Crear la tabla si no existe
            cursor.execute(f"""
                CREATE TABLE {redshift_schema}.daily_sales_data (
                    fecha_venta DATE,
                    anio_venta INT,
                    mes_venta INT,
                    dia_venta INT,
                    latitud FLOAT,
                    longitud FLOAT,
                    eds_activas INT,
                    numero_de_ventas INT,
                    vehiculos_atendidos INT,
                    cantidad_volumen_suministrado FLOAT,
                    date_update TIMESTAMP
                );
            """)

        print("La tabla 'daily_sales_data' ahora existe.")

        # Insertar datos en Redshift
        insert_query = f"""
            INSERT INTO {redshift_schema}.daily_sales_data (
                fecha_venta, anio_venta, mes_venta, dia_venta, latitud, longitud,
                eds_activas, numero_de_ventas, vehiculos_atendidos,
                cantidad_volumen_suministrado, date_update
            ) VALUES %s
        """

        from psycopg2.extras import execute_values

        # Preparar los datos para la inserción
        values = [
            (
                row['fecha_venta'],
                row['anio_venta'],
                row['mes_venta'],
                row['dia_venta'],
                row['latitud'],
                row['longitud'],
                row['eds_activas'],
                row['numero_de_ventas'],
                row['vehiculos_atendidos'],
                row['cantidad_volumen_suministrado'],
                row['date_update']
            )
            for index, row in df.iterrows()
        ]

        # Ejecutar la inserción de datos
        execute_values(cursor, insert_query, values)

        # Actualizar el contador de registros insertados
        total_records_inserted += len(values)

        print("Datos cargados exitosamente a Redshift.")

except psycopg2.Error as e:
    print(f"Error al conectar o cargar datos en Redshift: {e}")
finally:
    if conn:
        conn.close()

# Mostrar el total de registros insertados
print(f"Total de registros insertados en Redshift: {total_records_inserted}")

La tabla 'daily_sales_data' ahora existe.
