# Ingestión y Migración de datos

### Importación de librerías

In [12]:
import sys
import os
import pandas as pd
from psycopg2 import sql
from sqlalchemy import create_engine
sys.path.append(os.path.abspath('../source'))
from connection_db.db_utils import get_connection

sys.path.append()  permite importar módulos desde directorios que no están en la ruta de búsqueda predeterminada de Python, lo que es necesario en este caso para reutilizar un módulo creado en el directorio "source/connection_db" del proyecto dedicado a la conexión a la base de datos.

In [13]:
data= pd.read_csv('../data/road_accident_dataset.csv')
data.head(n=5)

Unnamed: 0,Country,Year,Month,Day of Week,Time of Day,Urban/Rural,Road Type,Weather Conditions,Visibility Level,Number of Vehicles Involved,...,Number of Fatalities,Emergency Response Time,Traffic Volume,Road Condition,Accident Cause,Insurance Claims,Medical Cost,Economic Loss,Region,Population Density
0,USA,2002,October,Tuesday,Evening,Rural,Street,Windy,220.414651,1,...,2,58.62572,7412.75276,Wet,Weather,4,40499.856982,22072.878502,Europe,3866.273014
1,UK,2014,December,Saturday,Evening,Urban,Street,Windy,168.311358,3,...,1,58.04138,4458.62882,Snow-covered,Mechanical Failure,3,6486.600073,9534.399441,North America,2333.916224
2,USA,2012,July,Sunday,Afternoon,Urban,Highway,Snowy,341.286506,4,...,4,42.374452,9856.915064,Wet,Speeding,4,29164.412982,58009.145124,South America,4408.889129
3,UK,2017,May,Saturday,Evening,Urban,Main Road,Clear,489.384536,2,...,3,48.554014,4958.646267,Icy,Distracted Driving,3,25797.212566,20907.151302,Australia,2810.822423
4,Canada,2002,July,Tuesday,Afternoon,Rural,Highway,Rainy,348.34485,1,...,4,18.31825,3843.191463,Icy,Distracted Driving,8,15605.293921,13584.060759,South America,3883.645634


Para asegurar una correcta integración de los datos, primero se transforman los nombres de las columnas a un formato estandarizado, convirtiéndolos en minúsculas y reemplazando los espacios con guiones bajos

In [14]:
def format_column_name(column_name: str) -> str:
    return column_name.strip().lower().replace(" ", "_").replace("/", "_")


Este fragmento de código define una función llamada get_engine(), que crea y devuelve un motor de conexión a una base de datos PostgreSQL utilizando SQLAlchemy. La conexión se establece a través de variables de entorno (PG_USER, PG_PASSWORD, PG_HOST, PG_PORT y PG_DATABASE), lo que permite mantener las credenciales y la configuración del acceso de forma segura sin exponerlas directamente en el código.

In [15]:
def get_engine():
    return create_engine(
        f"postgresql://{os.getenv('PG_USER')}:{os.getenv('PG_PASSWORD')}@{os.getenv('PG_HOST')}:{os.getenv('PG_PORT')}/{os.getenv('PG_DATABASE')}"
    )

In [16]:
def insert_data(df: pd.DataFrame):
    """Inserta un DataFrame en la tabla 'accidents' de PostgreSQL."""
    try:
        # Transformar los nombres de las columnas automáticamente
        df.columns = [format_column_name(col) for col in df.columns]

        # Reemplazar valores NaN con None (compatibilidad con PostgreSQL)
        df = df.where(pd.notna(df), None)

        # Insertar los datos
        engine = get_engine()
        df.to_sql(name="accidents", con=engine, if_exists='replace', index=False)

        print(f"{len(df)} registros insertados.")
    except Exception as e:
        print(f"Error al insertar datos: {e}")

insert_data(data)

132000 registros insertados.


In [None]:
def insert_data(df: pd.DataFrame):
    """Inserta un DataFrame en la tabla 'accidents' de PostgreSQL."""
    try:
        # Transformar los nombres de las columnas automáticamente
        df.columns = [format_column_name(col) for col in df.columns]

        # Reemplazar valores NaN con None (compatibilidad con PostgreSQL)
        df = df.where(pd.notna(df), None)

        # Insertar los datos
        engine = get_engine()
        df.to_sql(name="accidents", con=engine, if_exists='replace', index=False)

        print(f"{len(df)} registros insertados.")
    except Exception as e:
        print(f"Error al insertar datos: {e}")

insert_data(data)

132000 registros insertados.


Antes de la inserción, se realiza una limpieza de los valores nulos, reemplazando los NaN por None, dado que PostgreSQL trabaja con None para representar valores ausentes, lo que garantiza una correcta compatibilidad con la base de datos. El proceso de inserción se lleva a cabo asegurando que los datos se añadan sin sobrescribir la tabla existente, permitiendo que la base de datos crezca de manera continua sin perder registros previos. Finalmente, se incorpora un mecanismo de manejo de errores para capturar cualquier inconveniente que pueda surgir durante la ejecución y proporcionar mensajes claros en caso de fallos.