# Proyecto ETL Databricks Spark

### Extracción de datos desde tabla.

In [0]:
# Importar librerías necesarias
from pyspark.sql.functions import col, round, unix_timestamp, count, when, isnull

# Leer y mostrar un subconjunto de los datos
raw_df = spark.read.table("samples.nyctaxi.trips")
display(raw_df.limit(10))

### Conociendo el dataset.

La tabla provista desde Amazon S3 por databricks ya viene con las columnas en el tipo de datos correcto, pero es una buena practica revisar.

In [0]:
# imprimir tipo de datos de columnas (esquema)
raw_df.printSchema()

In [0]:
# Mostrar cantidad de filas
raw_df.count()

In [0]:
# Mostrar estadísticas descriptivas para columna de tipo numerico
raw_df.select("fare_amount").describe().show()

In [0]:
# Muestra cantidad de valores únicos
raw_df.select("dropoff_zip").distinct().count()

In [0]:
# Verificamos si existen valores nulos en alguna columna
raw_df.select([
    count(when(isnull(c), c)).alias(c)
    for c in raw_df.columns
]).show()

# alias(c) renombra la columna resultante con el nombre original (c), para que el resultado sea legible.
# raw_df.columns devuelve una lista con los nombres de todas las columnas del DataFrame

### Transformaciones iniciales.

In [0]:
# Cambiar el nombre de las columnas en el DataFrame
renamed_df = raw_df.withColumnRenamed("tpep_pickup_datetime", "fecha_hora_recogida") \
    .withColumnRenamed("tpep_dropoff_datetime", "fecha_hora_destino") \
    .withColumnRenamed("trip_distance", "distancia_viaje_millas") \
    .withColumnRenamed("fare_amount", "tarifa_usd") \
    .withColumnRenamed("pickup_zip", "codigo_postal_recogida") \
    .withColumnRenamed("dropoff_zip", "codigo_postal_destino")

In [0]:
# Vimos anteriormente que la columna que ahora llamamos "tarifa_usd" contiene valores negativos, lo cual es incoherente con el contexto del negocio.
# filtramos un nuevo dataset sin valores negativos
renamed_df = renamed_df.filter(col("tarifa_usd") >= 0)
renamed_df.select("tarifa_usd").describe().show()

In [0]:
# Vamos a trabajar con el sistema métrico de unidades, por lo tanto vamos a convertir las distancias de millas a kilómetros y redondear a 2 decimales.
renamed_df = renamed_df.withColumn("distancia_viaje_millas", round(col("distancia_viaje_millas") * 1.60934, 2))
renamed_df = renamed_df.withColumnRenamed("distancia_viaje_millas", "distancia_viaje_km")

In [0]:
# Vamos a crear una columna adicional con el tiempo de duración del viaje en minutos.
renamed_df = renamed_df.withColumn(
    "duracion_viaje_min",
    round((unix_timestamp("fecha_hora_destino") - unix_timestamp("fecha_hora_recogida")) / 60, 2)
)

In [0]:
# Veamos las transformaciones realizadas hasta ahora:
display(renamed_df.limit(5))

# Anális Exploratorio (EDA)

De nuestro dataFrame de Spark llamado: **renamed_df** podemos observar que tenemos un listado de codigos postales de recogida y de destino, pero nos sería más facil conocer los nombres de los barrios a donde hacen referencia.

Para esto vamos a cargar un **archivo csv** con esta información en Databricks. Lo vamos a transformar en una tabla y vamos a trabajar con **SQL**. 

In [0]:
# Leemos la tabla con la información ampliada de códigos postales.
zip_df = spark.read.table("workspace.default.nyc_zip_codes")

### Consultas SQL

In [0]:
# Vamos a creear vista temporales con los dos dataframes que tenemos. 
renamed_df.createOrReplaceTempView("trip_table")
zip_df.createOrReplaceTempView("zipcode_table")

In [0]:
%sql
-- Analicemos si todos los códigos postales en trip_table tienen una correspondencia con los codigos postales en zipcode_table.
SELECT DISTINCT t.codigo_postal_destino
FROM trip_table AS t
WHERE t.codigo_postal_destino NOT IN (SELECT z.zip FROM zipcode_table AS z)
ORDER BY t.codigo_postal_destino;

Como vemos, existen 55 filas en **trip_table** que no tienen correspondencia con zip_code en **zipcode_table**.

Esto nos deja una tarea pendiente para limpieza de este nuevo conjunto de datos.

In [0]:
%sql
-- Vamos a crear una tabla llamada "total_trips_table"
-- Trabajamos con JOINs en SQL para cruzar datos.
CREATE TABLE IF NOT EXISTS total_trips_table AS
SELECT t.distancia_viaje_km, t.tarifa_usd, t.duracion_viaje_min, zr.post_office AS barrio_recogida, zd.post_office AS barrio_destino
FROM trip_table AS t
LEFT JOIN zipcode_table AS zr
  ON t.codigo_postal_recogida = zr.zip
LEFT JOIN zipcode_table AS zd
  ON t.codigo_postal_destino = zd.zip
WHERE zr.post_office IS NOT NULL AND zd.post_office IS NOT NULL;  -- Eliminamos posibilidades donde sea desconocido ya sea barrio_recogida o barrio_destino.

In [0]:
%sql
-- Veamos los 10 viajes más largos
SELECT * FROM total_trips_table
ORDER BY distancia_viaje_km DESC
LIMIT 10;

In [0]:
%sql
-- Veamos ahora el TOP 5 barrios desde dónde se han solicitado más taxis.
SELECT barrio_recogida,
  ROUND(AVG(tarifa_usd)) AS tarifa_promedio_usd,
  ROUND(AVG(distancia_viaje_km), 2) AS distancia_promedio_km,
  ROUND(AVG(duracion_viaje_min), 2) AS duracion_promedio_minutos,
  COUNT(*) AS total_recogidas
FROM total_trips_table
GROUP BY barrio_recogida
ORDER BY total_recogidas DESC
LIMIT 5;

### Correlación

In [0]:

p_corr = renamed_df.stat.corr("distancia_viaje_km", "tarifa_usd")
print(f"Correlación entre distancia y tarifa: {p_corr}")

In [0]:
import matplotlib.pyplot as plt
pd_df = renamed_df.toPandas()
pd_df.plot.scatter(x="distancia_viaje_km", y="tarifa_usd")
plt.show()