### Librerias a importar (ordenar)

In [1]:
import csv
from datetime import datetime, timedelta
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, Float, DDL, update
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import func
import psycopg2
import re
from sqlalchemy.sql import text

### Nos conectamos a la base de datos

Vamos a necesitar PostgreSQL, un usuario y una base de datos.
Dentro de la base de datos hay que ejecutar:

```sql
create extension cube;
create extension earthdistance;
```

In [2]:
# Connection URL of the local PostgreSQL database that stores the trips
DATABASE_URL = 'postgresql://postgres@localhost/trips'
engine = create_engine(DATABASE_URL)

### Definición y creación de las tablas

Creamos la tabla trip como tabla principal. Podríamos crear las tablas de region y datasource, pero en este caso no tiene mucho sentido dado que solo tienen un atributo. Es importante recalcar que lo ideal sería crearlas para que sea fácil de escalar a futuro. Además de esto, creamos los triggers para registrar la ingesta de datos. Por último, creamos una tabla de trip groups para unir todos los viajes similares.

In [3]:
# Declarative base class that the SQLAlchemy table models below inherit from
Base = declarative_base()
    
class TripGroup(Base):
    # A group of similar trips, mapped to the 'tripgroup' table.
    # Keep the column order stable: it determines the column order of the created table.
    __tablename__ = 'tripgroup'
    id = Column(Integer, primary_key=True)
    # Reference origin/destination coordinates of the group
    origin_lat = Column(Float)
    origin_lng = Column(Float)
    destination_lat = Column(Float)
    destination_lng = Column(Float)
    # Reference departure time of the group
    datetime = Column(DateTime)
    region = Column(String)

class Trip(Base):
    # A single trip, mapped to the 'trip' table (the main table of the notebook).
    # Keep the column order stable: it determines the column order of the created table.
    __tablename__ = 'trip'
    id = Column(Integer, primary_key=True)
    # Origin/destination coordinates of the trip
    origin_lat = Column(Float)
    origin_lng = Column(Float)
    destination_lat = Column(Float)
    destination_lng = Column(Float)
    # Departure time of the trip
    datetime = Column(DateTime)
    # Region and datasource are stored as plain strings rather than as separate tables
    region = Column(String)
    datasource = Column(String)
    # Group of similar trips this trip belongs to
    tripgroup_id = Column(Integer, ForeignKey('tripgroup.id'))
    
class LogData(Base):
    # Audit log, mapped to the 'log_data' table; rows are written by the
    # create_insert_log trigger whenever a row is inserted into 'trip'.
    __tablename__ = 'log_data'
    id = Column(Integer, primary_key=True)
    # When the logged action happened
    datetime = Column(DateTime)
    # Free-text description of the action (the trigger writes 'INSERT TO TRIP')
    action = Column(String)
    # presumably the id of the row the action refers to - TODO confirm
    object_id = Column(Integer)

# Create the tables declared above in the database
Base.metadata.create_all(engine)

# Trigger function: writes one audit row into log_data for every row inserted
# into trip. NEW.id is stored in object_id so that each log entry can be traced
# back to the trip that produced it.
create_insert_log = DDL("""
CREATE OR REPLACE FUNCTION create_insert_log()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO log_data (datetime, action, object_id)
    VALUES (now(), 'INSERT TO TRIP', NEW.id);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""")

# Row-level trigger that calls the function above after each insert on trip
log_trigger = DDL("""
CREATE OR REPLACE TRIGGER create_insert_log
AFTER INSERT ON trip
FOR EACH ROW
EXECUTE FUNCTION create_insert_log();
""")

# Install the trigger function and the trigger
engine.execute(create_insert_log)
engine.execute(log_trigger)

# SQLAlchemy session used by the rest of the notebook to talk to the database
Session = sessionmaker(bind=engine)
session = Session()


### Creación de las filas

Si utilizamos un archivo .csv se agregan las filas en bulk, si no es así se usa la función create_row para insertar solo una fila en particular.

Los grupos se crean mediante cercanía en millas y en hora. Para este caso en particular se agrupan los viajes que no tengan una distancia mayor de 5 millas en cualquiera de las dos coordenadas y no tengan una separación mayor a 3 horas.

**¿Por qué no elegir N grupos y separarlos en los más cercanos por coordenadas y hora para cada región?**

Es simple, si llevamos esto a la realidad, probablemente las restricciones del problema van a ser del tipo:

a) Que la parada inicial y final no esté a más de X millas de donde salen originalmente (para que utilicen el servicio)

b) Que la hora de salida no tenga más de X horas de diferencia con su hora original de salida (para que utilicen el servicio)

Entonces se dejan parametrizadas estas variables. No tenía sentido elegir N grupos, crear N grupos de viajes y después descubrir que las personas no utilizan los nuevos medios de transporte.

In [4]:
# Extract the coordinate pair from a string
def extract_lat_long(string):
    """Parse the two numbers inside the parentheses of a string like ``"POINT (7.67 45.05)"``.

    Args:
        string: Text containing ``(<number> <number>)``.

    Returns:
        A ``(first, second)`` tuple of floats, in the order the numbers appear
        in ``string``. Callers in this notebook store them as ``(lat, lng)``.

    Raises:
        ValueError: If ``string`` has no parenthesised group, the group does
            not hold exactly two whitespace-separated values, or a value is
            not a valid float.

    NOTE(review): the sample output in this notebook shows Hamburg trips with
    the first value around 9.8 and the second around 53.5, which suggests the
    input order is really (longitude, latitude). The order is left untouched
    here because the callers and the SQL queries rely on it - confirm against
    the data source before renaming anything.
    """
    # Raw string: "\(" inside a normal string literal is an invalid escape sequence.
    match = re.search(r"\((.*)\)", string)
    if match is None:
        raise ValueError(f"no '(<a> <b>)' coordinate pair found in {string!r}")
    # split() without arguments tolerates repeated or mixed whitespace.
    parts = match.group(1).split()
    if len(parts) != 2:
        raise ValueError(f"expected exactly two coordinates in {string!r}, got {len(parts)}")
    first, second = parts
    return (float(first), float(second))

def upsert_group(origin_lat, origin_lng, destination_lat, destination_lng, datetime, region, miles=5, hours=3):
    """Return the id of a trip group that matches the trip, creating one if none exists.

    A group matches when both its origin and its destination are less than
    ``miles`` away from the given ones (``<@>`` operator of the PostgreSQL
    ``earthdistance`` extension) and its datetime lies within ``hours`` hours
    of ``datetime``. ``region`` is not part of the lookup; it is only stored
    on a newly created group.

    Args:
        origin_lat, origin_lng: Origin coordinates of the trip.
        destination_lat, destination_lng: Destination coordinates of the trip.
        datetime: Departure time of the trip (a ``datetime.datetime``).
        region: Region name stored on a newly created group.
        miles: Maximum distance for origin and destination to count as close.
        hours: Maximum time difference, in hours, in either direction.

    Returns:
        The id of the matching group, or of the group created for this trip.
    """
    window = timedelta(hours=hours)
    # Bound parameters instead of string interpolation: the driver handles
    # quoting and type adaptation of every value.
    lookup = text(
        "SELECT id FROM tripgroup "
        "WHERE point(:origin_lat, :origin_lng) <@> point(tripgroup.origin_lat, tripgroup.origin_lng) < :miles "
        "AND point(:destination_lat, :destination_lng) <@> point(tripgroup.destination_lat, tripgroup.destination_lng) < :miles "
        "AND datetime BETWEEN :window_start AND :window_end"
    )
    params = {
        "origin_lat": origin_lat,
        "origin_lng": origin_lng,
        "destination_lat": destination_lat,
        "destination_lng": destination_lng,
        "miles": miles,
        "window_start": datetime - window,
        "window_end": datetime + window,
    }
    # Only the first matching id is needed, so do not fetch whole rows for every match.
    match = engine.execute(lookup, params).first()
    if match is not None:
        return match[0]

    # No group is close enough: this trip becomes the reference of a new group.
    new_group = TripGroup(origin_lat=origin_lat, origin_lng=origin_lng, destination_lat=destination_lat, destination_lng=destination_lng, datetime=datetime, region=region)
    session.add(new_group)
    # Commit right away so the new group is visible to the next lookup.
    session.commit()
    return new_group.id

# Use this function to insert a single trip
def create_row(region_name, datasource_name, origin_coord, destination_coord, timestamp):
    """Insert one trip, assigning it to a matching (or new) trip group.

    ``origin_coord`` and ``destination_coord`` are strings parsed by
    ``extract_lat_long``; ``timestamp`` is a string in ``%Y-%m-%d %H:%M:%S`` format.
    """
    origin_lat, origin_lng = extract_lat_long(origin_coord)
    destination_lat, destination_lng = extract_lat_long(destination_coord)
    trip_datetime = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')

    # Coordinates shared by the group lookup and the trip row
    coordinates = dict(
        origin_lat=origin_lat,
        origin_lng=origin_lng,
        destination_lat=destination_lat,
        destination_lng=destination_lng,
    )
    group_id = upsert_group(datetime=trip_datetime, region=region_name, **coordinates)

    # Build the Trip record and persist it
    trip = Trip(
        datetime=trip_datetime,
        region=region_name,
        datasource=datasource_name,
        tripgroup_id=group_id,
        **coordinates,
    )
    session.add(trip)
    session.commit()

# Use this function to load many trips from a CSV file
def read_trips(file_name, batch_size=1000000):
    """Load trips from a CSV file and bulk-insert them into the trip table.

    The file needs a header row with the columns ``region``, ``datasource``,
    ``origin_coord``, ``destination_coord`` and ``datetime`` (formatted as
    ``%Y-%m-%d %H:%M:%S``). Every row is assigned to a trip group through
    ``upsert_group`` before it is queued for insertion.

    Args:
        file_name: Path of the CSV file to read.
        batch_size: Number of rows buffered in memory before they are
            bulk-inserted and committed. Defaults to 1,000,000.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    # Rows waiting to be written in the next bulk insert
    pending = []

    # newline='' is the csv module's documented way to open files for a reader
    with open(file_name, 'r', newline='') as f:
        for row in csv.DictReader(f):
            region_name = row['region']
            origin_lat, origin_lng = extract_lat_long(row['origin_coord'])
            destination_lat, destination_lng = extract_lat_long(row['destination_coord'])
            timestamp = datetime.strptime(row['datetime'], '%Y-%m-%d %H:%M:%S')

            group_id = upsert_group(origin_lat=origin_lat, origin_lng=origin_lng, destination_lat=destination_lat, destination_lng=destination_lng, datetime=timestamp, region=region_name)

            # Plain dict with the Trip column values, as bulk_insert_mappings expects
            pending.append({
                'origin_lat': origin_lat,
                'origin_lng': origin_lng,
                'destination_lat': destination_lat,
                'destination_lng': destination_lng,
                'datetime': timestamp,
                'region': region_name,
                'datasource': row['datasource'],
                'tripgroup_id': group_id,
            })

            if len(pending) >= batch_size:
                session.bulk_insert_mappings(Trip, pending)
                session.commit()
                pending.clear()

    # Insert whatever is left after the last full batch
    if pending:
        session.bulk_insert_mappings(Trip, pending)
        session.commit()

### Utilizamos la función read_trips

In [5]:
# Load the sample trips file into the database
read_trips('trips.csv')

### Probamos que funcione correctamente

In [6]:
# (header label, Trip attribute) pairs, in the order they are printed
report_columns = (
    ('origin_lat', 'origin_lat'),
    ('origin_lng', 'origin_lng'),
    ('destination_lat', 'destination_lat'),
    ('destination_lng', 'destination_lng'),
    ('datetime', 'datetime'),
    ('datasource', 'datasource'),
    ('region', 'region'),
    ('group', 'tripgroup_id'),
)

# All trips, ordered by region and then by departure time
trips = session.query(Trip).order_by(Trip.region, Trip.datetime)

# Header line followed by one line per trip
print(*(label for label, attribute in report_columns))
for trip in trips:
    print(*(getattr(trip, attribute) for label, attribute in report_columns))

origin_lat origin_lng destination_lat destination_lng datetime datasource region group
9.840595540189936 53.50991873196815 10.12399795075635 53.62949412099623 2018-05-01 05:24:37 funny_car Hamburg 970
9.936476031944927 53.45543416459435 10.04091186185177 53.42024644236509 2018-05-01 10:22:59 baba_car Hamburg 930
10.05260098579818 53.53497739746809 10.05889649564977 53.49486429314853 2018-05-04 00:46:12 cheap_mobile Hamburg 892
10.20044161946335 53.54722485790852 9.857518857977054 53.52506880668962 2018-05-05 12:39:21 cheap_mobile Hamburg 905
9.961144659266848 53.42865933587807 10.08503581364391 53.43645004358303 2018-05-06 06:34:22 cheap_mobile Hamburg 893
10.17801484401512 53.55155909331071 9.958672518621771 53.5509263400788 2018-05-07 10:28:04 pt_search_app Hamburg 962
9.80304883152659 53.50847122244616 9.834883820942993 53.63694321201706 2018-05-09 09:28:24 cheap_mobile Hamburg 908
10.1457401708168 53.43686908113324 10.19421502623748 53.49444402955017 2018-05-10 03:37:56 bad_diesel_

### Podemos ver los puntos en un mapa

In [7]:
import folium
import pandas as pd
import re

# Load every trip into a DataFrame
query = 'SELECT * FROM trip'
df = pd.read_sql(query, engine)

# Create a map object
m = folium.Map(location=[45, 10], zoom_start=5)

# folium expects locations as [latitude, longitude]. In this notebook the
# *_lat columns actually hold longitudes and the *_lng columns hold latitudes
# (Hamburg rows show origin_lat ~ 9.8 and origin_lng ~ 53.5), so the pair is
# swapped here to put the markers on the right place.
for row_index, row in df.iterrows():
    popup = f"Datetime: {row['datetime']}\nDatasource: {row['datasource']}\nRegion: {row['region']}"
    # Green marker: origin of the trip
    folium.Marker(location=[row["origin_lng"], row["origin_lat"]],
                  popup=popup,
                  icon=folium.Icon(color='green')).add_to(m)
    # Red marker: destination of the trip
    folium.Marker(location=[row["destination_lng"], row["destination_lat"]],
                  popup=popup,
                  icon=folium.Icon(color='red')).add_to(m)

# Write the map to an HTML file
m.save("map.html")

### Podemos incluir una opción de sugerir nuevo punto de partida

**¿Por qué no automatizar esta selección?**

Por dos principales razones:
1) Las coordenadas pueden ser no accesibles (ej propiedad privada)

2) La idea es no cambiar los puntos de partida de un día a otro empeorando el servicio

In [8]:
def groups_with_bigger_one():
    """Return one row per tripgroup_id that is referenced by more than one trip."""
    result = engine.execute(
        "SELECT tripgroup_id FROM trip GROUP BY tripgroup_id HAVING COUNT(tripgroup_id) > 1"
    )
    return result.fetchall()

def new_trips_suggestions():
    """Print a coordinate suggestion for every trip group that has more than one trip.

    The suggestion is the average origin/destination of the group's trips; it
    is printed only when it differs from the coordinates stored on the group.
    """
    for candidate in groups_with_bigger_one():
        group_id = candidate[0]

        # Average coordinates of the trips in the group
        suggested = engine.execute(
            f"SELECT AVG(origin_lat), AVG(origin_lng), AVG(destination_lat), AVG(destination_lng) FROM trip WHERE tripgroup_id = {group_id}"
        ).fetchall()[0]

        # Coordinates currently stored on the group
        current = engine.execute(
            f"SELECT origin_lat,origin_lng,destination_lat,destination_lng FROM tripgroup WHERE id = {group_id}"
        ).fetchall()[0]

        # Nothing to suggest when all four values already match
        if not any(new != old for new, old in zip(suggested, current)):
            continue

        print(
            f"Change suggestion for trip group id {group_id}",
            "Replace:",
            current,
            "For:",
            suggested,
            "\n",
            sep="\n",
        )

def update_trip_group_coord(id, origin_lat, origin_lng, destination_lat, destination_lng, miles=5, hours=3):
    """Move a trip group to new coordinates and report trips that could now join it.

    After the update, every trip that belongs to another group but lies
    within ``miles`` of the group's new origin and destination and within
    ``hours`` hours of the group's datetime is reported with a warning.

    Args:
        id: Id of the trip group to update.
        origin_lat, origin_lng: New origin coordinates of the group.
        destination_lat, destination_lng: New destination coordinates of the group.
        miles: Maximum distance for a trip to count as close (same default as ``upsert_group``).
        hours: Maximum time difference in hours (same default as ``upsert_group``).
    """
    stmt = update(TripGroup).where(TripGroup.id == id).values(origin_lat=origin_lat, origin_lng=origin_lng, destination_lat=destination_lat, destination_lng=destination_lng)
    engine.execute(stmt)

    # Bound parameters instead of string interpolation; only the two columns
    # used in the message are selected, so no positional index into
    # `SELECT *` is needed.
    regroup_query = text(
        "SELECT trip.id, trip.tripgroup_id "
        "FROM trip JOIN tripgroup "
        "ON tripgroup.id = :group_id AND trip.tripgroup_id != :group_id "
        "AND point(tripgroup.origin_lat, tripgroup.origin_lng) <@> point(trip.origin_lat, trip.origin_lng) < :miles "
        "AND point(tripgroup.destination_lat, tripgroup.destination_lng) <@> point(trip.destination_lat, trip.destination_lng) < :miles "
        "WHERE trip.datetime BETWEEN tripgroup.datetime - (:hours * INTERVAL '1 hour') "
        "AND tripgroup.datetime + (:hours * INTERVAL '1 hour')"
    )
    trips_that_can_change = engine.execute(
        regroup_query, {"group_id": id, "miles": miles, "hours": hours}
    ).fetchall()

    for trip_id, current_group_id in trips_that_can_change:
        print(f"Warning, with the new change you can group trip id {trip_id} in group {id} and eliminate group id {current_group_id}")

# Print the coordinate suggestions for the groups currently in the database
new_trips_suggestions()

Change suggestion for trip group id 901
Replace:
(7.703220872633631, 45.12041666625482, 7.713229884381637, 45.08729268877508)
For:
(7.694560076855213, 45.098041164790736, 7.714413538145733, 45.07097225070298)


Change suggestion for trip group id 927
Replace:
(7.739660019780326, 45.10100884469237, 7.597114891145975, 45.1162369066238)
For:
(7.736244253378469, 45.09924891243395, 7.629811190981598, 45.090481860176865)




### Podemos seguir la sugerencia o cambiar un poco las coordenadas

La sugerencia anterior se calcula a partir del promedio de lat y lng del grupo. Con esto podemos aceptar la sugerencia y cambiar las coordenadas; además, la función avisa en caso de encontrar una mejor solución (menor cantidad de viajes) con el cambio.

In [9]:
# Solo ejecutar en base a las sugerencias de arriba
#update_trip_group_coord(829, 7.736244253378469, 45.09924891243395, 7.629811190981598, 45.090481860176865)

### Obtener promedio por región y bounding box

Creamos las queries donde consultamos los datos que pidieron.

In [10]:
def weekly_trips_mean_by_region(region_id):
    """Return the average number of trips per week for one region.

    Trips are counted per calendar week (``date_trunc('week', datetime)``) and
    the weekly counts are averaged. Only weeks that have at least one trip in
    the region take part in the average.

    Args:
        region_id: Value of the ``region`` column to filter on (e.g. ``'Hamburg'``).

    Returns:
        The average as returned by the database driver, or ``None`` when no
        trip matches.
    """
    # The region is passed as a bound parameter, never pasted into the SQL text.
    query = text(
        "SELECT AVG(count) as average_trips FROM ("
        "SELECT date_trunc('week', datetime) as week, count(*) as count "
        "FROM trip WHERE region = :region GROUP BY week"
        ") as subquery"
    )
    return engine.execute(query, {"region": region_id}).scalar()

def weekly_trips_mean_by_bounding_box(min_lat, max_lat, min_lng, max_lng):
    """Return the average number of trips per week inside a bounding box.

    A trip counts only when both its origin and its destination fall inside
    the box (bounds inclusive). Trips are counted per calendar week and the
    weekly counts are averaged; only weeks with at least one matching trip
    take part in the average.

    Args:
        min_lat, max_lat: Bounds compared against ``origin_lat`` and ``destination_lat``.
        min_lng, max_lng: Bounds compared against ``origin_lng`` and ``destination_lng``.

    Returns:
        The average as returned by the database driver, or ``None`` when no
        trip matches.
    """
    # Bounds are passed as bound parameters, never pasted into the SQL text.
    query = text(
        "SELECT AVG(count) as average_trips FROM ("
        "SELECT date_trunc('week', datetime) as week, count(*) as count FROM trip "
        "WHERE origin_lat BETWEEN :min_lat AND :max_lat "
        "AND origin_lng BETWEEN :min_lng AND :max_lng "
        "AND destination_lat BETWEEN :min_lat AND :max_lat "
        "AND destination_lng BETWEEN :min_lng AND :max_lng "
        "GROUP BY week"
        ") as subquery"
    )
    params = {"min_lat": min_lat, "max_lat": max_lat, "min_lng": min_lng, "max_lng": max_lng}
    return engine.execute(query, params).scalar()

def weekly_trips_mean_by_region_and_box(min_lat, max_lat, min_lng, max_lng, region_id):
    """Return the average number of trips per week for a region, limited to a bounding box.

    A trip counts only when it belongs to ``region_id`` and both its origin
    and its destination fall inside the box (bounds inclusive). Trips are
    counted per calendar week and the weekly counts are averaged; only weeks
    with at least one matching trip take part in the average.

    Args:
        min_lat, max_lat: Bounds compared against ``origin_lat`` and ``destination_lat``.
        min_lng, max_lng: Bounds compared against ``origin_lng`` and ``destination_lng``.
        region_id: Value of the ``region`` column to filter on (e.g. ``'Prague'``).

    Returns:
        The average as returned by the database driver, or ``None`` when no
        trip matches.
    """
    # All values are passed as bound parameters, never pasted into the SQL text.
    query = text(
        "SELECT AVG(count) as average_trips FROM ("
        "SELECT date_trunc('week', datetime) as week, count(*) as count FROM trip "
        "WHERE origin_lat BETWEEN :min_lat AND :max_lat "
        "AND origin_lng BETWEEN :min_lng AND :max_lng "
        "AND destination_lat BETWEEN :min_lat AND :max_lat "
        "AND destination_lng BETWEEN :min_lng AND :max_lng "
        "AND region = :region "
        "GROUP BY week"
        ") as subquery"
    )
    params = {
        "min_lat": min_lat,
        "max_lat": max_lat,
        "min_lng": min_lng,
        "max_lng": max_lng,
        "region": region_id,
    }
    return engine.execute(query, params).scalar()

# Weekly average for one region, for a bounding box, and for both filters combined
print(weekly_trips_mean_by_region('Hamburg'))
print(weekly_trips_mean_by_bounding_box(14.3, 14.6, 49.9, 50.1))
print(weekly_trips_mean_by_region_and_box(14.3, 14.6, 49.9, 50.1, 'Prague'))


5.6000000000000000
3.0000000000000000
3.0000000000000000


### Consultamos la ingesta de datos

Creamos una función que consulta la cantidad de filas creadas entre dos fechas.

In [11]:
def data_ingest_by_date(start_time, end_time):
    """Return how many log_data rows were recorded between two points in time.

    Args:
        start_time: Lower bound (inclusive) for ``log_data.datetime``, e.g. ``'2022-01-01'``.
        end_time: Upper bound (inclusive) for ``log_data.datetime``. A bare date
            such as ``'2024-01-31'`` is compared as that day at midnight.

    Returns:
        The number of matching rows.
    """
    # Both bounds are passed as bound parameters, never pasted into the SQL text.
    query = text("SELECT COUNT(*) FROM log_data WHERE datetime BETWEEN :start_time AND :end_time")
    return engine.execute(query, {"start_time": start_time, "end_time": end_time}).scalar()

# Number of rows logged by the insert trigger in the given period
data_ingest_by_date(start_time = '2022-01-01', end_time = '2024-01-31')

100

### Comprobamos que escale

Generalmente los datos van a ir consumiéndose en tiempo real y no con un archivo .csv grande, por lo que los inserts no deberían ser de mayor preocupación. Creamos un archivo con 1 millón de filas para testear el tiempo de todas las consultas juntas y, como se puede ver, es bastante escalable.

In [12]:
# Load the large test file used for the scalability check
read_trips('trips_scale.csv')

In [18]:
import time

# perf_counter() is a monotonic, high-resolution clock meant for measuring
# elapsed time; time.time() can jump when the system clock is adjusted.
start_time = time.perf_counter()
print(weekly_trips_mean_by_region('Hamburg'))
print(weekly_trips_mean_by_bounding_box(14.3, 14.6, 49.9, 50.1))
print(weekly_trips_mean_by_region_and_box(14.3, 14.6, 49.9, 50.1, 'Prague'))
print(data_ingest_by_date(start_time = '2022-01-01', end_time = '2024-01-31'))

# Elapsed seconds for the four queries above, run back to back
end_time = time.perf_counter()
total_time = end_time - start_time
print("Total time taken: ", total_time)

56453.600000000000
30243.000000000000
30243.000000000000
1006420
Total time taken:  0.23499011993408203
