Antes de comenzar cualquier trabajo, pienso en las variables, columnas o data entregada por parte del cliente. Por lo cual se torna indispensable para mi, hacer un listado de variables en conjunto a una descripcion.


region: Indica la ciudad o región donde se originó el viaje.

origin_coord: Coordenadas de origen del viaje. Formato POINT.

destination_coord: Coordenadas de destino del viaje. Formato POINT.

datetime: Fecha y hora cuando el viaje se realizó.

datasource: Fuente del dato.


1. Procesos automatizados para ingerir y almacenar los datos bajo demanda
    a. Los viajes que son similares en términos de origen, destino y hora del día deben agruparse. Describa
    el enfoque que utilizó para agregar viajes similares.


In [None]:
'''
Ingesta y almacenamiento de datos: Es fundamental pensar en un proceso automatizado para leer los datos necesarios y almacenarlos en una base de datos SQL.
Por ende, sera necesario utilizar librerias que nos brinden apoyo en estas tareas, tales como Pandas para leer la data y sqlalchemy para interactuar con la BD.
IMPORTANTE verificar si los datos son nuevos o ya han sido almacenados previamente, para evitar duplicidad de datos e interferir con la estadistica o procesos automaticos.
'''


In [None]:
'''

Para hacer la ingesta de datos utilizaremos pandas y sqlalchemy.
'''
import pandas as pd
from sqlalchemy import create_engine

# Crea una conexión a la base de datos
engine = create_engine('MiBaseDeDatos')

# Lee los datos del CSV
df = pd.read_csv('trips.csv')

# Almacena los datos en la base de datos
df.to_sql('viajes', engine, if_exists='append')

In [None]:
'''
Para agrupar es posible hacerlo de diversas maneras. Una de ellas es utilizar la libreria geohash2 disponible para este trabajo
'''

import geohash2

def extract_coords(point_str):
    lon, lat = map(float, point_str.strip('POINT (').strip(')').split()) #Extraemos las coordenadas de la cadena POINT.

    return lon, lat


df['origin_coord'] = df['origin_coord'].apply(lambda x: geohash2.encode(*extract_coords(x)))# Se convierten las coordenadas a geohash
df['destination_coord'] = df['destination_coord'].apply(lambda x: geohash2.encode(*extract_coords(x)))


df['datetime'] = pd.to_datetime(df['datetime']).dt.round('H')#Se redondea la hora mas cercana.


df_grouped = df.groupby(['region', 'origin_coord', 'destination_coord', 'datetime']).size().reset_index(name='count')# Agrupa los viajes.

Segunda Opcion

In [None]:
'''
Tambien es posible usar la siguiente metodologia en conjunto con DBSCAN
'''

import pandas as pd
from sqlalchemy import create_engine

# Crea una conexión a la base de datos
engine = create_engine('MiBaseDeDatos')

# Lee los datos del CSV
df = pd.read_csv('trips.csv')

# Almacena los datos en la base de datos
df.to_sql('viajes', engine, if_exists='append')


In [None]:

df[['origin_longitude', 'origin_latitude']] = df['origin_coord'].str.replace('POINT \(', '').str.replace('\)', '').str.split(' ', expand=True)
df[['destination_longitude', 'destination_latitude']] = df['destination_coord'].str.replace('POINT \(', '').str.replace('\)', '').str.split(' ', expand=True)#Se Extraen las coordenadas de las columnas de texto
df[['origin_longitude', 'origin_latitude', 'destination_longitude', 'destination_latitude']] = df[['origin_longitude', 'origin_latitude', 'destination_longitude', 'destination_latitude']].apply(pd.to_numeric)# Convertir a formato numerico

#Esto lo que hace es procesar los datos de coordenadas.

In [None]:
from sklearn.cluster import DBSCAN
import numpy as np


coords = df[['origin_longitude', 'origin_latitude', 'destination_longitude', 'destination_latitude']].values #Combinamos coordenadas de origen y destino en una matriz.


dbscan = DBSCAN(eps=0.1, min_samples=10)# DBSCAN
clusters = dbscan.fit_predict(coords)


df['cluster'] = clusters#Se agregan las etiquetas de los clusters a los datos originales.


'''
Los viajes con orígenes y destinos similares serán agrupados en el mismo clúster. 
Esta solucion proporciona una agrupación más sofisticada y adaptable que la simple agrupación por códigos geohash y tiempo, especialmente si los patrones de viaje tienen una distribución densa y no uniforme.
'''


2.Un servicio que es capaz de proporcionar la siguiente funcionalidad:
a. Devuelve el promedio semanal de la cantidad de viajes para un área definida por un bounding box y la región.
b. Informar sobre el estado de la ingesta de datos sin utilizar una solución de polling.

In [None]:
#Lo primero es crear una una funcion que calcule el promedio. Asi mismo, podemos utilizar geopandas para detectar si un viaje esta dentro de un bounding box.

import geopandas as gpd
from shapely.geometry import Point, Polygon

def obtener_promedio_semanal(df, bounding_box, region):
    geometry = [Point(xy) for xy in zip(df['longitude'], df['latitude'])]#Crear un GeoDataFrame con el dataframe inicial.
    gdf = gpd.GeoDataFrame(df, geometry=geometry)
    polygon = Polygon(bounding_box)#Se crea un polígono para el bounding box.
    region_data = gdf[(gdf['region'] == region) & (gdf['geo'].within(polygon))]#Filtrar los datos por región y por ubicación

    
    region_data['week'] = region_data['datetime'].dt.week#Calcular el promedio semanal.
    weekly_avg = region_data.groupby('week').size().mean()
    return weekly_avg

Para generar el servicio es necesario tener un endpoint en la API que acepte un bounding box y una región como parámetros.

In [None]:
from flask import Flask, request, jsonify
app = Flask(__name__)

@app.route('/promedio_semanal', methods=['GET'])
def promedio_semanal():
    bounding_box = request.args.get('bounding_box', type=list)#Obtener parámetros de la request

    region = request.args.get('region', type=str)
    
    weekly_avg = obtener_promedio_semanal(df, bounding_box, region)#Calcular el promedio semanal

    
    return jsonify({'promedio_semanal': weekly_avg})#Devolver la respuesta como JSON



In [None]:
'''
Para la ingesta de datos sin polling podemos usar websockets, que permiten una comunicación bidireccional entre el cliente y el servidor. 
A su vez, Flask-SocketIO facilita el uso de websockets en una aplicación Flask.
'''

from flask_socketio import SocketIO, emit

app = Flask(__name__)
socketio = SocketIO(app)
@app.route('/ingesta_data', methods=['POST'])
def ingest_data():
    emit('ingesta_data', {'status': 'started'}, broadcast=True)#Ingesta de datos
    try:
        emit('ingesta_data', {'status': 'finished'}, broadcast=True)# Proceso de ingesta de datos...
        emit('ingesta_data', {'status': 'error', 'message': str(e)}, broadcast=True)
    return jsonify({'status': 'success'})

'''
Con esto los clientes/trabajadores podrían conectarse al websocket y recibir actualizaciones en tiempo real sobre el estado de la ingesta de datos.
'''

Hacer escalable la solución a 100 millones de entradas:

Optimización de consultas a la base de datos: Las consultas a la base de datos deben estar optimizadas para asi extraer los datos de forma eficiente. 
Los índices en las columnas utilizadas con frecuencia para las consultas pueden ayudar a acelerar la recuperación de los datos. 
Además, asi se evita la selección de todas las columnas si solo necesitas unas pocas para tu análisis (Efectividad).

Tecnologías adecuadas para manejar grandes cantidades de datos: Usando PostgreSQL, puedes beneficiarte de su capacidad para manejar grandes conjuntos de datos. 
A su vez, cuando trabajamos con cantidades enormes de datos, Spark y Hadoop pueden ser más adecuadas porque permiten el procesamiento distribuido de los datos

Simplificación del modelo de datos: Para mejorar la eficiencia,se debe simplificar el modelo de datos lo maximo posible. 
Se puede eliminar de datos redundantes, la normalización de los datos para minimizar la duplicación, etc.

Escalabilidad horizontal y vertical: La escalabilidad horizontal se refiere a la adición de más máquinas a tu pool de recursos, mientras que la escalabilidad vertical se refiere a la adición de más poder a una máquina existente. Ambas formas de escalabilidad pueden ayudar a manejar cargas de trabajo más grandes.

In [None]:
'''
Asi mismo, para probar si la solucion es optima en cuanto a escalabilidad .
'''

import time
from sqlalchemy import insert

data = [{'column1': 'value1', 'column2': 'value2', ...} for _ in range(100000000)]#Crear 100 millones de entradas



start = time.time()
engine.execute(insert(my_table), data)
print('Tiempo de inserción:', time.time() - start) #Medir el tiempo que lleva insertar 100 millones de entradas


start = time.time()
engine.execute(my_table.select().where(my_table.c.column1 == 'value1')) #Medir el tiempo que lleva hacer una consulta
print('Tiempo de consulta:', time.time() - start)

'''Prueba de carga: Aquí, podriamos insertar 100 millones de entradas en la base de datos y medir cuánto tiempo lleva hacerlo. 
   ----
   Prueba de escalabilidad: Podriamos aumentar gradualmente el volumen de datos en la base de datos y medir cómo cambia el rendimiento. 
   Por ejemplo, podríamos insertar 10 millones de entradas, medir el rendimiento, luego insertar otros 10 millones, medir el rendimiento, y así sucesivamente, hasta llegar a 100 millones de entradas.
   ----
   Prueba de estrés: Aquí, podriamos simular un alto nivel de demanda en la base de datos para ver cómo se comporta. 
   Esto podría implicar hacer muchas consultas a la base de datos en un corto período de tiempo.
'''



Para el diagrama:

1)Cloud Scheduler activa eventos en intervalos regulares.

2)Estos eventos disparan mensajes en "Cloud Pub/Sub".

3)Cuando un mensaje es publicado, activa tu "App Python/Docker" que está corriendo en "Cloud Run".

4)La "App Python/Docker" procesa los datos y los almacena temporalmente en "Cloud Storage" si es necesario.

5)Finalmente, los datos procesados se cargan en tu base de datos "Cloud SQL".

IMPORTANTE MENCIONAR QUE EL DIAGRAMA SE INCLUIRA DE MANERA SIMPLE POR TEMAS DE TIEMPO.