In [None]:
import pandas as pd
import numpy as np
import sqlite3
from sqlalchemy import create_engine
import ipywidgets as widgets
import os
import folium

In [None]:
# Variables definidas por usuario

## define la locacion por defecto del archivo csv, con una variable de ambiente para que sea posible modificarlo al desplegarse 
default_file_path = os.getenv('CSV_FILE_PATH', '/app/data/trips.csv')
## define la locacion por defecto de la base de datos sqlite, con una variable de ambiente para que sea posible modificarlo al desplegarse 
default_db_string = os.getenv('DATABASE_FILE_PATH', 'sqlite:////app/data/mydatabase.db')



In [None]:
# Funciones para procesos automatizados para ingerir y alamcenar los datos bajo demanda

def ingest_data(file_path):
    """
    Esta funcion lee el CSV y los guarda en un dataframe
    """
    df = pd.read_csv(file_path)
    # convierte la columna datetime de string a datetime
    df['datetime'] = pd.to_datetime(df['datetime'])
    return df

def store_data(df,db_string,table_name,if_exists='replace'):
    """
    Esta funcion guarda los datos a un tabla en la base de datos SQL
    """
    engine = create_engine(db_string)
    df.to_sql(table_name, con=engine, if_exists = if_exists)


In [None]:
# Ejecuta carga bajo demanda

## lectura de csv como dataframe
df_trips = ingest_data(default_file_path)

## guarda dataframe en la base de datos SQL
store_data(df_trips, default_db_string, 'trips_ingesta')

## para monitorear o reportar sobre el proceso de ingesta, se puede usar un servicio de 'message queue'
## como pub/sub en GCP, para que al iniciar y terminar la carga de datos se envie un mensaje desde la db
## y esto sea procesado por la notebook, debido a que no esta configurado en este repositorio estos print
## se usan como placeholder para mostrar la funcionalidad
print("Ingesta iniciada...")


In [None]:
# Procesamiento de datos 

## Ya que no se define un criterio de similitud en los requerimientos, decidi agrupar por bins/buckets
## tanto de coordenadas como de tiempo, redondeando las coordenadas a 3 decimales o aproximadamente un
## grid de ~110m, y el tiempo a viajes dentro de la misma hora de 0 a 59 minutos
## esto da por hecho que la ingesta no trae duplicados y que no se estan generando duplicados por la misma
## lo cual es un riesgo muy real con un proceso bajo demanada

def extract_coords(df, origin_col, destination_col):
    """
    Esta funcion extrae los datos numericos de latitud y longitud del string de coordenadas en el dataset
    """
    # Extraer coordenadas de origin
    df['origin_longitude'], df['origin_latitude'] = df[origin_col].str.extract(r'POINT \((.*?) (.*?)\)')
    df['origin_longitude'] = df['origin_longitude'].astype(float)
    df['origin_latitude'] = df['origin_latitude'].astype(float)

    # Extraer coordenadas de destino
    df['destination_longitude'], df['destination_latitude'] = df[destination_col].str.extract(r'POINT \((.*?) (.*?)\)')
    df['destination_longitude'] = df['destination_longitude'].astype(float)
    df['destination_latitude'] = df['destination_latitude'].astype(float)

    return df

def agrupar_trips(df, coord_precision):
    """
    Esta funcion reduce la precision de las coordenadas de los viajes, y redondea las la fecha por horas, para agrupar viajes similares
    """
    # Reduce la precision de las coordenadas
    df['origin_lon_rounded'] = df['origin_longitude'].round(decimals=coord_precision)
    df['origin_lat_rounded'] = df['origin_latitude'].round(decimals=coord_precision)
    df['dest_lon_rounded'] = df['destination_longitude'].round(decimals=coord_precision)
    df['dest_lat_rounded'] = df['destination_latitude'].round(decimals=coord_precision)

    # Creacion columna de hora redondeada a la hora en punto mas cercana, siempre hacia abajo
    df['datetime_hour'] = df['datetime'].dt.floor('H')

    # Crea la columna 'trip' que combina los nuevos valores para agrupar
    df['trip'] = df[['origin_lat_rounded', 'origin_lon_rounded', 'dest_lat_rounded', 'dest_lon_rounded', 'datetime_hour']].astype(str).agg(', '.join, axis=1)

    return df

df_trips = extract_coords(df_trips, 'origin_coord', 'destination_coord')
df_trips = agrupar_trips(df_trips, 3)

## aca se guardan los datos en la tabla final 'trips' esta tabla se usara para el procesamiento de los viajes semanales
## tambien seria el punto en que se procesan los mensajes de carga completada, una vez el servico este conectado 
try:
    store_data(df_trips,default_db_string,'trips','append')
    print("Ingesta completada con exito")
except Exception as e:
    print('Falla al cargar los datos')
    print('Error:', e)


In [None]:
# Promedio semanal de viajes - Calculo

## debido al tamaño esperado de 100 millones de registros, se calcula sobre la base de datos
## en vez de cargar los datos a la notebook
def avg_weekly_trips(bounding_box, region):
    """
    Esta funcion calcula el promedio semanal de viajes definidos para un area y region
    """
    query = """
        SELECT 
            AVG(num_trips) AS avg_trips_per_week
        FROM 
        (
            SELECT
                strftime('%Y', datetime) AS year,
                strftime('%W', datetime) AS week_of_year,
                COUNT(*) AS num_trips
            FROM 
                trips
            WHERE 
                origin_latitude BETWEEN :lat_min AND :lat_max
                AND origin_longitude BETWEEN :lon_min AND :lon_max
                AND region = :region
            GROUP BY
                year,
                week_of_year
        );
        """

    params = {
    "lat_min": bounding_box['lat_min'], 
    "lat_max": bounding_box['lat_max'], 
    "lon_min": bounding_box['lon_min'], 
    "lon_max": bounding_box['lon_max'],
    "region": region,
    }

    engine = create_engine(default_db_string)
    result = pd.read_sql_query(query, engine, params=params)


    return result


In [None]:
# Promedio semanal de viajes - Resultado

## decidi usar una interfaz grafica para mostrar los resultados, ya que el promedio
## de viajes semanales es sobre el dataset, sin tomar en cuenta o filtrar por la serie de tiempo.

## define los datos necesarios para el calculo del promedio semanal

## Esto asume que los datos de latitud y longitud en efecto pertenezcan a la region seleccionada
bounding_box = {'lat_min': 53.5386, 'lat_max': 53.5728, 'lon_min': 9.9843, 'lon_max': 10.0242} 
region = 'Hamburg'

weekly_avg = avg_weekly_trips(bounding_box, region)

## dictionario de ciudades y coordenadas
cities = {
    "Turin": [45.0703, 7.6869],
    "Hamburg": [53.5488, 9.9872],
    "Prague": [50.0755, 14.4378],
    # es posibel como mejora añadir ciudades/regiones dinamicante
}

# Creacion de mapa folium centrado en la region
map = folium.Map(location=cities[region], zoom_start=13,width='50%', height='50%')

# Añade rectangulo al mapa para el bounding box
folium.Rectangle(
    bounds=[[bounding_box['lat_min'], bounding_box['lon_min']], [bounding_box['lat_max'], bounding_box['lon_max']]],
    color="#ff7800",
    fill=True,
    fill_color="#ffff00",
    fill_opacity=0.2,
).add_to(map)

# Añade popup con el calculo de promedio de viajes semanales en el centro del bounding box
popup_text = f"viajes promedio semanales: {weekly_avg}"
popup_location = ((bounding_box['lat_min'] + bounding_box['lat_max']) / 2, 
                  (bounding_box['lon_min'] + bounding_box['lon_max']) / 2)

folium.Marker(
    location=popup_location, 
    popup=folium.Popup(popup_text, max_width=250)
).add_to(map)

# Display el mapa
map