In [4]:
import pandas as pd
import psycopg2
import os
from sqlalchemy import create_engine
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, regexp_extract, regexp_replace, broadcast
from pyspark.sql.types import StringType, DateType, IntegerType, StructType, StructField
import json

In [2]:
# Configuración de Spark
spark = SparkSession.builder.appName("Ecobici").getOrCreate()

Carga de los archivos de usuarios y creación de la tabla que se cargará a la BD

In [3]:
usuarios_columnas = ["id_usuario", "genero_usuario", "edad_usuario", "fecha_alta"] # Columnas que se van a tener en cuenta
carpeta = "usuarios" # Carpeta donde estan todos los archivos de usuarios por año
dataframes = [] #   Lista donde se van a cargar los dataframes generados para cada archivo

for archivo in os.listdir(carpeta): # Iterar sobre la carpeta de usuarios para abrir cada archivo csv que contenga "usuarios" en el nombre
  if archivo.endswith(".csv") and ("usuarios-ecobici" in archivo or "usuarios_ecobici" in archivo):
    ruta_archivo = os.path.join(carpeta, archivo)
    df = spark.read.csv(ruta_archivo, header=True, inferSchema=True)

    for columna in df.columns:
      df = df.withColumnRenamed(columna, columna.lower().replace('"', '')) # Eliminar las "" de las columnas que tienen algunos archivos para dejar todos con el mismo formato

    df = df.select(usuarios_columnas) # Quedarse con las columnas seleccionadas
    df = df.withColumn("fecha_alta", to_date(col("fecha_alta"), "dd-MM-yy")) # Dejar todos los valores de fecha_alta con un mismo formato

    dataframes.append(df) # Agregar el dataframe a la lista

usuarios_df = dataframes[0]
for df in dataframes[1:]:
  usuarios_df = usuarios_df.union(df) # Concatenar todos los dataframes

usuarios_df = usuarios_df.withColumnRenamed("genero_usuario", "genero").withColumnRenamed("edad_usuario", "edad") # Renombrar columnas
usuarios_df = usuarios_df.select([col(c).alias(c.lower()) for c in usuarios_df.columns]) # Convertir los nombres de las columnas a minúscula
usuarios_df = usuarios_df.dropna(subset=["edad"]) # Eliminar las filas donde edad sea Null
usuarios_df = usuarios_df.filter(~col("edad").contains(",")) # Eliminar las filas donde edad contiene "," para poder convertirlo a int
usuarios_df = usuarios_df.withColumn("genero", col("genero").substr(1,1)) # Quedarse solo con la primera letra de la columna genero

usuarios_df = usuarios_df.withColumn("edad", col("edad").cast(IntegerType())) # Convertir edad a int

usuarios_df = usuarios_df.filter((col("edad") >= 10) & (col("edad") <= 125)) # Eliminar las filas con edad inconsistente
usuarios_df = usuarios_df.orderBy("fecha_alta", ascending=False) # Ordenar por fecha_alta para eliminar los registros con id_usuario duplicado y mantener los que tengan mayor fecha
usuarios_df = usuarios_df.dropDuplicates(["id_usuario"]) # Eliminar los duplicados manteniendo el primero (mayor fecha_alta)

usuarios_df.printSchema() # Mostrar el esquema del DataFrame
print(f"Cantidad de filas: {usuarios_df.count()}")
nulos_id_usuario = usuarios_df.filter(col("id_usuario").isNull()) # Revisar que no haya id_usuario nulos
print("Usuarios nulos")
nulos_id_usuario.show()
usuarios_df.show()   

root
 |-- id_usuario: integer (nullable = true)
 |-- genero: string (nullable = true)
 |-- edad: integer (nullable = true)
 |-- fecha_alta: date (nullable = true)

Cantidad de filas: 680092
Usuarios nulos
+----------+------+----+----------+
|id_usuario|genero|edad|fecha_alta|
+----------+------+----+----------+
+----------+------+----+----------+

+----------+------+----+----------+
|id_usuario|genero|edad|fecha_alta|
+----------+------+----+----------+
|        78|     M|  31|2019-02-18|
|        85|     M|  30|2019-02-18|
|       108|     M|  51|2019-02-18|
|       192|     F|  32|2019-02-19|
|       211|     M|  39|2019-02-19|
|       253|     M|  52|2019-02-19|
|       255|     M|  31|2019-02-19|
|       300|     M|  50|2019-02-19|
|       321|     M|  46|2019-02-20|
|       332|     M|  32|2019-02-20|
|       463|     M|  22|2019-02-20|
|       472|     M|  52|2019-02-20|
|       481|     M|  34|2019-02-20|
|       497|     M|  28|2019-02-20|
|       593|     O|  52|2019-02-20|
| 

Carga del archivo de estaciones y creación de la tabla que se cargará a la BD

In [37]:
estaciones_columnas = ["ID Comet", "NÚMERO de Estación ", "NOMBRE", "BARRIO", "COMUNA"] # Columnas que se van a tener en cuenta

estaciones_df = spark.read.csv("nuevas-estaciones-bicicletas-publicas.csv", header=True, sep=";", inferSchema=True, encoding="ISO-8859-1", multiLine=True)
estaciones_df = estaciones_df.select(estaciones_columnas) # Quedarse con las columnas seleccionadas
estaciones_df = estaciones_df.withColumnRenamed("ID Comet", "id_estacion").withColumnRenamed("NÚMERO de Estación ", "numero_estacion") # Renombrar columnas

estaciones_df = estaciones_df.select([col(c).alias(c.lower()) for c in estaciones_df.columns]) # Convertir los nombres de las columnas a minúscula
estaciones_df = estaciones_df.withColumn("comuna", regexp_extract("comuna", r"(\d+)", 1).cast(IntegerType())) # Quedarse solo con el número de la columna comuna

estaciones_df = estaciones_df.withColumn("id_estacion", col("id_estacion").cast(IntegerType())) # Convertir id_estacion a int

estaciones_df = estaciones_df.dropDuplicates(["id_estacion"]) # Eliminar los duplicados
estaciones_df = estaciones_df.dropna(subset=["id_estacion"]) # Eliminar las filas donde id_estacion sea Null

estaciones_df.printSchema() # Mostrar el esquema del DataFrame
print(f"Cantidad de filas: {estaciones_df.count()}")
nulos_id_estacion = estaciones_df.filter(col("id_estacion").isNull()) # Revisar que no haya id_estacion nulos
print("Estaciones nulas")
nulos_id_estacion.show()
estaciones_df.show()

root
 |-- id_estacion: integer (nullable = true)
 |-- numero_estacion: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- barrio: string (nullable = true)
 |-- comuna: integer (nullable = true)

Cantidad de filas: 360
Estaciones nulas
+-----------+---------------+------+------+------+
|id_estacion|numero_estacion|nombre|barrio|comuna|
+-----------+---------------+------+------+------+
+-----------+---------------+------+------+------+

+-----------+---------------+-------------------+----------------+------+
|id_estacion|numero_estacion|             nombre|          barrio|comuna|
+-----------+---------------+-------------------+----------------+------+
|          2|              2|           RETIRO I|          RETIRO|     1|
|          3|              3|             ADUANA|       MONSERRAT|     1|
|          4|              4|         PLAZA ROMA|     SAN NICOLAS|     1|
|          5|              5|       PLAZA ITALIA|         PALERMO|    14|
|          6|           

Carga del archivo de viajes y creación de la tabla que se cargará a la BD

In [38]:
viajes_columnas = ["Id_recorrido","duracion_recorrido","id_estacion_origen","id_estacion_destino","id_usuario","modelo_bicicleta"] # Columnas que se van a tener en cuenta

viajes_df = spark.read.csv("trips_2023.csv", header=True, inferSchema=True)
viajes_df = viajes_df.select(viajes_columnas) # Quedarse con las columnas seleccionadas
viajes_df = viajes_df.withColumnRenamed("duracion_recorrido", "duracion_segundos") # Renombrar columnas
viajes_df = viajes_df.select([col(c).alias(c.lower()) for c in viajes_df.columns]) # Convertir los nombres de las columnas a minúscula

columnas_baecobici = ["id_recorrido", "id_estacion_origen", "id_estacion_destino", "id_usuario"]
for columna in columnas_baecobici:
      viajes_df = viajes_df.withColumn(columna, regexp_extract(columna, r'(\d+)', 1)) # Eliminar "BAEcobici" de las columnas que lo tienen

valores_modelo = viajes_df.select("modelo_bicicleta").distinct().collect() # Obtener valores únicos de la columna modelo_bicicleta para saber como almacenarlo en la BD
for valor in valores_modelo:
    print(f"Valor modelo bicicleta: {valor[0]}") # Mostrar los valores únicos

viajes_df = viajes_df.withColumn("duracion_segundos", regexp_replace(col("duracion_segundos"), ",", "")) # Quitar las comas de la columna duracion_segundos

columnas_int = ["id_recorrido","duracion_segundos","id_estacion_origen","id_estacion_destino","id_usuario"]
for columna in columnas_int:
  viajes_df = viajes_df.withColumn(columna, col(columna).cast(IntegerType())) # Convertir columnas a int

nulos_estacion_origen = viajes_df.filter(col("id_estacion_origen").isNull()) # Revisar que no haya id_estacion_origen nulos
nulos_estacion_destino = viajes_df.filter(col("id_estacion_destino").isNull()) # Revisar que no haya id_estacion_destinon nulos
nulos_id_recorrido = viajes_df.filter(col("id_recorrido").isNull()) # Revisar que no haya id_registro nulos
print("Estaciones nulas")
nulos_estacion_origen.show()
nulos_estacion_destino.show()
nulos_id_recorrido.show()
viajes_df = viajes_df.dropna() # Eliminar filas nulas
viajes_df.printSchema() # Mostrar el esquema del DataFrame
print(f"Cantidad de filas: {viajes_df.count()}")
viajes_df.show()

Valor modelo bicicleta: ICONIC
Valor modelo bicicleta: FIT
Estaciones nulas
+------------+-----------------+------------------+-------------------+----------+----------------+
|id_recorrido|duracion_segundos|id_estacion_origen|id_estacion_destino|id_usuario|modelo_bicicleta|
+------------+-----------------+------------------+-------------------+----------+----------------+
+------------+-----------------+------------------+-------------------+----------+----------------+

+------------+-----------------+------------------+-------------------+----------+----------------+
|id_recorrido|duracion_segundos|id_estacion_origen|id_estacion_destino|id_usuario|modelo_bicicleta|
+------------+-----------------+------------------+-------------------+----------+----------------+
|    19632972|             1372|                26|               null|   1020610|             FIT|
|    19632583|              466|                38|               null|    774845|             FIT|
+------------+---------

Filtrar el dataframe de viajes para solo conservar los registros donde id_usuario exista en el dataframe usuarios y id_estacion origen y id_estacion_destion existan en el dataframe estaciones

In [39]:
# Alias para los DataFrames
usuarios_alias = usuarios_df.alias("u")
estaciones_alias = estaciones_df.alias("e")
viajes_alias = viajes_df.alias("v")

# Unir usuarios_df y estaciones_df con viajes_df
viajes_filtrado = viajes_alias.join(usuarios_alias, on="id_usuario", how="inner") \
    .join(estaciones_alias, col("v.id_estacion_origen") == col("e.id_estacion"), how="inner") \
    .join(estaciones_alias.alias("e_destino"), col("v.id_estacion_destino") == col("e_destino.id_estacion"), how="inner") \
    .select("v.id_recorrido", "v.duracion_segundos", "v.id_estacion_origen", "v.id_estacion_destino", "v.id_usuario", "v.modelo_bicicleta")

viajes_filtrado.printSchema() # Mostrar el esquema del DataFrame
# Mostrar el DataFrame resultante
print(f"Cantidad de filas: {viajes_filtrado.count()}")
viajes_filtrado.show()

root
 |-- id_recorrido: integer (nullable = true)
 |-- duracion_segundos: integer (nullable = true)
 |-- id_estacion_origen: integer (nullable = true)
 |-- id_estacion_destino: integer (nullable = true)
 |-- id_usuario: integer (nullable = true)
 |-- modelo_bicicleta: string (nullable = true)

Cantidad de filas: 2456749
+------------+-----------------+------------------+-------------------+----------+----------------+
|id_recorrido|duracion_segundos|id_estacion_origen|id_estacion_destino|id_usuario|modelo_bicicleta|
+------------+-----------------+------------------+-------------------+----------+----------------+
|    17910696|             1848|               358|                278|    861866|          ICONIC|
|    17600256|              288|               444|                  3|    217525|          ICONIC|
|    17255670|             1103|               280|                280|    954201|          ICONIC|
|    17996972|             1165|               273|                367|    179

In [40]:
# Leer el archivo de configuración
with open("config.json") as f:
    config = json.load(f)

# Acceder a las configuraciones
user = config["DB_USER"]
password = config["DB_PASSWORD"]
host = config["DB_HOST"]
database = config["DB_DATABASE"]
port = config["DB_PORT"]

driver = "org.postgresql.Driver"

# Configurar las credenciales de PostgreSQL
postgresql_url = f"jdbc:postgresql://{host}:{port}/{database}"
properties = {"user": user, "password": password, "driver": "org.postgresql.Driver"}

# Esquema de las tablas
usuarios_schema = "id_usuario INT PRIMARY KEY, genero CHAR(1), edad SMALLINT, fecha_alta DATE"
estaciones_schema = "id_estacion INT PRIMARY KEY, numero_estacion INT, nombre VARCHAR(70), barrio VARCHAR(50), comuna SMALLINT"
viajes_schema = "id_recorrido INT PRIMARY KEY, duracion_segundos INT, id_estacion_origen INT REFERENCES estaciones(id_estacion), id_estacion_destino INT REFERENCES estaciones(id_estacion), id_usuario INT REFERENCES usuarios(id_usuario), modelo_bicicleta VARCHAR(10)"


# Obtener la conexión JDBC
connection = spark._jvm.java.sql.DriverManager.getConnection(postgresql_url, properties["user"], properties["password"])
# Crear un objeto Statement
statement = connection.createStatement()
# Sentencia SQL para crear la tabla si no existe
tablas = ["viajes", "usuarios", "estaciones"]
for tabla in tablas:
        statement.execute(f"DROP TABLE IF EXISTS {tabla}")

statement.execute(f"CREATE TABLE usuarios ({usuarios_schema})")
statement.execute(f"CREATE TABLE estaciones ({estaciones_schema})")
statement.execute(f"CREATE TABLE viajes ({viajes_schema})")

statement.execute("CREATE INDEX idx_id_usuario ON viajes (id_usuario)")
statement.execute("CREATE INDEX idx_id_estacion_origen ON viajes (id_estacion_origen)")
statement.execute("CREATE INDEX idx_id_estacion_destino ON viajes (id_estacion_destino)")

# Escribir el DataFrame en la tabla PostgreSQL
usuarios_df.write.jdbc(url=postgresql_url, table="usuarios", mode="append", properties=properties)
estaciones_df.write.jdbc(url=postgresql_url, table="estaciones", mode="append", properties=properties)
viajes_filtrado.write.jdbc(url=postgresql_url, table="viajes", mode="append", properties=properties)

# Cerrar SparkSession
spark.stop()
