In [0]:
!pip install meteostat
!pip install aiohttp
!pip install asyncio
!pip install nest_asyncio

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, MapType, TimestampType, ArrayType
from pyspark.sql import SparkSession
import requests
import pandas as pd
from pyspark.sql.functions import col, to_timestamp, month, to_date, min, max, hour, coalesce, explode, sequence, expr, date_format, when, lit, sum, udf, avg, corr
from meteostat import Point, Hourly
from datetime import datetime
from functools import reduce
from pyspark.sql.types import IntegerType
import seaborn as sns
import matplotlib.pyplot as plt



In [0]:

# Creo una sesión de Spark
spark = SparkSession.builder.appName("SubwayDemand").getOrCreate()


In [0]:

schema = StructType([
    StructField("day_of_week", StringType(), True),  # Ej.: "Monday"
    StructField("destination_latitude", FloatType(), True),  # Coordenada geográfica
    StructField("destination_longitude", FloatType(), True),  # Coordenada geográfica
    StructField("destination_station_complex_id", StringType(), True),  # Identificador
    StructField("destination_station_complex_name", StringType(), True),  # Nombre
    StructField("estimated_average_ridership", FloatType(), True),  # Número estimado
    StructField("hour_of_day", IntegerType(), True),  # Hora del día (0-23)
    StructField("month", IntegerType(), True),  # Mes del año (1-12)
    StructField("origin_latitude", FloatType(), True),  # Coordenada geográfica
    StructField("origin_longitude", FloatType(), True),  # Coordenada geográfica
    StructField("origin_point", MapType(StringType(), FloatType()), True),  # Map de coordenadas
    StructField("origin_station_complex_id", StringType(), True),  # Identificador
    StructField("origin_station_complex_name", StringType(), True),  # Nombre
    StructField("timestamp", TimestampType(), True),  # Fecha y hora
    StructField("year", IntegerType(), True)  # Año
])


In [0]:


#introduzco la url con los datos ya filtrados (dentro de manhattan y rango de fechas objeto de estudio)

# URL base (sin limitar la consulta a 1000 registros)
base_url = "https://data.ny.gov/resource/jsu2-fbtj.json?$query=SELECT%0A%20%20%60year%60%2C%0A%20%20%60month%60%2C%0A%20%20%60day_of_week%60%2C%0A%20%20%60hour_of_day%60%2C%0A%20%20%60timestamp%60%2C%0A%20%20%60origin_station_complex_id%60%2C%0A%20%20%60origin_station_complex_name%60%2C%0A%20%20%60origin_latitude%60%2C%0A%20%20%60origin_longitude%60%2C%0A%20%20%60destination_station_complex_id%60%2C%0A%20%20%60destination_station_complex_name%60%2C%0A%20%20%60destination_latitude%60%2C%0A%20%20%60destination_longitude%60%2C%0A%20%20%60estimated_average_ridership%60%2C%0A%20%20%60origin_point%60%2C%0A%20%20%60destination_point%60%0AWHERE%0A%20%20((%60origin_station_complex_id%60%20IN%20(%0A%20%20%20%20%20%20%208%2C%0A%20%20%20%20%20%20%209%2C%0A%20%20%20%20%20%20%2010%2C%0A%20%20%20%20%20%20%2013%2C%0A%20%20%20%20%20%20%2014%2C%0A%20%20%20%20%20%20%2016%2C%0A%20%20%20%20%20%20%2017%2C%0A%20%20%20%20%20%20%2020%2C%0A%20%20%20%20%20%20%2022%2C%0A%20%20%20%20%20%20%20103%2C%0A%20%20%20%20%20%20%20107%2C%0A%20%20%20%20%20%20%20118%2C%0A%20%20%20%20%20%20%20119%2C%0A%20%20%20%20%20%20%20143%2C%0A%20%20%20%20%20%20%20144%2C%0A%20%20%20%20%20%20%20145%2C%0A%20%20%20%20%20%20%20146%2C%0A%20%20%20%20%20%20%20147%2C%0A%20%20%20%20%20%20%20149%2C%0A%20%20%20%20%20%20%20150%2C%0A%20%20%20%20%20%20%20151%2C%0A%20%20%20%20%20%20%20152%2C%0A%20%20%20%20%20%20%20153%2C%0A%20%20%20%20%20%20%20154%2C%0A%20%20%20%20%20%20%20155%2C%0A%20%20%20%20%20%20%20156%2C%0A%20%20%20%20%20%20%20157%2C%0A%20%20%20%20%20%20%20158%2C%0A%20%20%20%20%20%20%20159%2C%0A%20%20%20%20%20%20%20160%2C%0A%20%20%20%20%20%20%20162%2C%0A%20%20%20%20%20%20%20164%2C%0A%20%20%20%20%20%20%20165%2C%0A%20%20%20%20%20%20%20167%2C%0A%20%20%20%20%20%20%20168%2C%0A%20%20%20%20%20%20%20169%2C%0A%20%20%20%20%20%20%20220%2C%0A%20%20%20%20%20%20%20222%2C%0A%20%20%20%20%20%20%20223%2C%0A%20%20%20%20%20%20%20224%2C%0A%20%20%20%20%20%20%20225%2C%0A%20%20%20%20%20%20%20228%2C%0A%20%20%20%20%20%20%20231%2C%0A%20%20%20%20%20%20%20232%2C%0A%20%20%20%20%20%20%20234%2C%0A%20%20%20%20%20%20%20276%2C%0A%20%20%20%20%20%20%20277%2C%0A%20%20%20%20%20%20%20296%2C%0A%20%20%20%20%20%20%20297%2C%0A%20%20%20%20%20%20%20298%2C%0A%20%20%20%20%20%20%20299%2C%0A%20%20%20%20%20%20%20300%2C%0A%20%20%20%20%20%20%20301%2C%0A%20%20%20%20%20%20%20303%2C%0A%20%20%20%20%20%20%20304%2C%0A%20%20%20%20%20%20%20305%2C%0A%20%20%20%20%20%20%20306%2C%0A%20%20%20%20%20%20%20307%2C%0A%20%20%20%20%20%20%20308%2C%0A%20%20%20%20%20%20%20309%2C%0A%20%20%20%20%20%20%20310%2C%0A%20%20%20%20%20%20%20311%2C%0A%20%20%20%20%20%20%20312%2C%0A%20%20%20%20%20%20%20313%2C%0A%20%20%20%20%20%20%20314%2C%0A%20%20%20%20%20%20%20316%2C%0A%20%20%20%20%20%20%20318%2C%0A%20%20%20%20%20%20%20319%2C%0A%20%20%20%20%20%20%20320%2C%0A%20%20%20%20%20%20%20321%2C%0A%20%20%20%20%20%20%20323%2C%0A%20%20%20%20%20%20%20324%2C%0A%20%20%20%20%20%20%20325%2C%0A%20%20%20%20%20%20%20326%2C%0A%20%20%20%20%20%20%20327%2C%0A%20%20%20%20%20%20%20328%2C%0A%20%20%20%20%20%20%20329%2C%0A%20%20%20%20%20%20%20333%2C%0A%20%20%20%20%20%20%20392%2C%0A%20%20%20%20%20%20%20393%2C%0A%20%20%20%20%20%20%20394%2C%0A%20%20%20%20%20%20%20395%2C%0A%20%20%20%20%20%20%20396%2C%0A%20%20%20%20%20%20%20397%2C%0A%20%20%20%20%20%20%20398%2C%0A%20%20%20%20%20%20%20399%2C%0A%20%20%20%20%20%20%20403%2C%0A%20%20%20%20%20%20%20404%2C%0A%20%20%20%20%20%20%20405%2C%0A%20%20%20%20%20%20%20407%2C%0A%20%20%20%20%20%20%20409%2C%0A%20%20%20%20%20%20%20413%2C%0A%20%20%20%20%20%20%20414%2C%0A%20%20%20%20%20%20%20436%2C%0A%20%20%20%20%20%20%20437%2C%0A%20%20%20%20%20%20%20438%2C%0A%20%20%20%20%20%20%20439%2C%0A%20%20%20%20%20%20%20440%2C%0A%20%20%20%20%20%20%20441%2C%0A%20%20%20%20%20%20%20471%2C%0A%20%20%20%20%20%20%20475%2C%0A%20%20%20%20%20%20%20476%2C%0A%20%20%20%20%20%20%20477%2C%0A%20%20%20%20%20%20%20601%2C%0A%20%20%20%20%20%20%20602%2C%0A%20%20%20%20%20%20%20605%2C%0A%20%20%20%20%20%20%20607%2C%0A%20%20%20%20%20%20%20609%2C%0A%20%20%20%20%20%20%20610%2C%0A%20%20%20%20%20%20%20611%2C%0A%20%20%20%20%20%20%20612%2C%0A%20%20%20%20%20%20%20613%2C%0A%20%20%20%20%20%20%20614%2C%0A%20%20%20%20%20%20%20618%2C%0A%20%20%20%20%20%20%20619%2C%0A%20%20%20%20%20%20%20622%2C%0A%20%20%20%20%20%20%20623%2C%0A%20%20%20%20%20%20%20624%2C%0A%20%20%20%20%20%20%20625%2C%0A%20%20%20%20%20%20%20628%2C%0A%20%20%20%20%20%20%20635%0A%20%20%20%20%20))%0A%20%20%20%20%20AND%20((%60destination_station_complex_id%60%20IN%20(%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%208%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%209%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%2010%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%2013%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%2014%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%2016%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%2017%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%2020%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%2022%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20103%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20107%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20118%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20119%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20143%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20144%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20145%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20146%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20147%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20149%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20150%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20151%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20152%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20153%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20154%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20155%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20156%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20157%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20158%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20159%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20160%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20162%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20164%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20165%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20167%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20168%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20169%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20220%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20222%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20223%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20224%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20225%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20228%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20231%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20232%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20234%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20276%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20277%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20296%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20297%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20298%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20299%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20300%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20301%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20303%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20304%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20305%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20306%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20307%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20308%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20309%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20310%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20311%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20312%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20313%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20314%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20316%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20318%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20319%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20320%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20321%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20323%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20324%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20325%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20326%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20327%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20328%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20329%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20333%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20392%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20393%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20394%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20395%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20396%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20397%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20398%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20399%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20403%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20404%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20405%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20407%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20409%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20413%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20414%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20436%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20437%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20438%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20439%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20440%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20441%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20471%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20475%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20476%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20477%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20601%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20602%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20605%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20607%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20609%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20610%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20611%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20612%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20613%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20614%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20618%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20619%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20622%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20623%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20624%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20625%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20628%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20635%0A%20%20%20%20%20%20%20%20%20%20%20%20))%0A%20%20%20%20%20%20%20%20%20%20%20%20AND%20(%60timestamp%60%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20BETWEEN%20%222024-03-01T00%3A00%3A00%22%20%3A%3A%20floating_timestamp%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20AND%20%222024-03-07T23%3A59%3A59%22%20%3A%3A%20floating_timestamp)))%0A%20%20%20%20OR%20(%60timestamp%60%0A%20%20%20%20%20%20%20%20%20%20BETWEEN%20%222024-06-01T00%3A00%3A00%22%0A%20%20%20%20%20%20%20%20%20%20AND%20%222024-06-08T23%3A59%3A59%22%20%3A%3A%20floating_timestamp)"

# Tamaño del lote (3 millones de registros)
limit = 3000000

# Se crea el df
all_data = pd.DataFrame()

paginated_url = f"{base_url} LIMIT {limit}"
print(f"Consultando: {paginated_url}")

# Se realiza la solicitud a la página
response = requests.get(paginated_url)

# Compruebo que la solicitud ha sido exitosa
if response.status_code == 200:
    # Carga los datos en un DataFrame
    data = response.json()
    temp_df = pd.DataFrame(data)
    
    # Si hay datos, se añade al df creado
    if not temp_df.empty:
        all_data = temp_df
        print(f"Se obtuvieron {len(all_data)} registros.")
    else:
        print("No se encontraron datos.")
else:
    print(f"Error al consultar la API. Código de estado: {response.status_code}")

# Imprimo en pantalla cuántos registros se han descargado
print(f"Total de registros descargados: {len(all_data)}")

# Convierto el DataFrame de Pandas a un DataFrame de Spark
subway_demand = spark.createDataFrame(all_data)

# Muestro en pantalla el esquema y las primeras filas del DataFrame en Spark
subway_demand.printSchema()
subway_demand.show()
 

In [0]:
# Calculo el número de filas en el DataFrame de Spark
df_length = subway_demand.count()

# Imprimo el número de filas
print(f"El número de filas en el DataFrame es: {df_length}")

In [0]:
subway_demand.columns

In [0]:

# Convierto `timestamp` a formato de fecha y hora
subway_demand = subway_demand.withColumn(
    "timestamp", 
    to_timestamp(col("timestamp"))  # PySpark reconoce ISO 8601 automáticamente
)

# Filtro los datos de los primeros 6 meses del año
subway_demand = subway_demand.filter(month(col("timestamp")).between(1, 6))

# Convierto latitudes y longitudes a formato numérico
subway_demand = subway_demand.withColumn("origin_latitude", col("origin_latitude").cast("double")) \
    .withColumn("origin_longitude", col("origin_longitude").cast("double")) \
    .withColumn("destination_latitude", col("destination_latitude").cast("double")) \
    .withColumn("destination_longitude", col("destination_longitude").cast("double"))

# Selecciono únicamente las columnas necesarias
subway_demand = subway_demand.select(
    "timestamp",
    col("estimated_average_ridership").alias("ridership"),
    'origin_station_complex_id',
    'destination_station_complex_id',
    "origin_latitude",
    "origin_longitude",
    "day_of_week",
    "hour_of_day",
    "destination_longitude",
    "destination_latitude"
)
# Creo una columna solo con la fecha
subway_demand = subway_demand.withColumn("date", to_date("timestamp"))

# Imprimo los datos resultantes
subway_demand.show()


In [0]:
subway_demand.describe()

In [0]:

# Defino los límites de latitud y longitud para Manhattan
lat_min = 40.4774
lat_max = 40.8820
lon_min = -74.0473
lon_max = -73.9067

# Filtro las filas donde las latitudes y longitudes de origen y destino estén dentro de Manhattan
subway_demand = subway_demand.filter(
    (col("origin_latitude").between(lat_min, lat_max)) &
    (col("origin_longitude").between(lon_min, lon_max)) &
    (col("destination_latitude").between(lat_min, lat_max)) &
    (col("destination_longitude").between(lon_min, lon_max))
)

# Imprimo los datos filtrados
subway_demand.show()


In [0]:
print(f"El número total de filas en subway_demand es: {subway_demand.count()}")

In [0]:

# Calculo los valores máximos y mínimos de la columna `timestamp` para chequear el temaño de la muestra en términos temporales
min_max_values = subway_demand.select(
    min(col("timestamp")).alias("min_timestamp"),
    max(col("timestamp")).alias("max_timestamp")
)

# Imprimo dichos valores
min_max_values.show()


In [0]:
# Coordenadas de Manhattan (Centro de Manhattan)
latitude = 40.7831
longitude = -73.9712

# Creo el objeto Point para Manhattan
location = Point(latitude, longitude)

# Establezco el rango de fechas (primeros 6 meses de 2024)
start_date = datetime(2024, 1, 1)
end_date = datetime(2024, 6, 30)

# Obtengo los datos climáticos por hora para la ubicación y el rango de fechas
data = Hourly(location, start_date, end_date)
weather_df = data.fetch()

# Imprimo las primeras filas del df
print(weather_df.head())

In [0]:
# Convierto el DataFrame de Pandas a DataFrame de Spark
spark_weather_df = spark.createDataFrame(weather_df.reset_index())

# Extraigo la hora como entero desde la columna `time`

spark_weather_df = spark_weather_df.withColumn("hour", hour(spark_weather_df["time"]))

spark_weather_df = spark_weather_df.withColumn("date", to_date(spark_weather_df["time"]))
# Imprimo las primeras filas del DataFrame de Spark
spark_weather_df.show()

In [0]:

# Inicio una sesión de Spark
spark = SparkSession.builder \
    .appName("NY Data API") \
    .getOrCreate()

# URL de la API
url = "https://data.cityofnewyork.us/resource/bkfu-528j.csv"

# Creo una función para obtener los datos por año
def fetch_data_by_year(year):
    params = {
        "$limit": 1500000,  # me traigo 1.5M de registros
        "$where": f"start_date_time >= '{year}-01-01T00:00:00.000' AND start_date_time <= '{year}-12-31T23:59:59.999'"
    }
    
    try:
        # Realizo la solicitud para obtener los datos
        response = requests.get(url, params=params)

        # Verifico si la solicitud fue exitosa
        if response.status_code == 200:
            # Esto guarda el archivo CSV temporalmente
            with open(f"data_{year}.csv", "wb") as file:
                file.write(response.content)

            # Esto lee el CSV descargado en un df de Pandas
            df = pd.read_csv(f"data_{year}.csv")

            # Esto convierte el DataFrame de Pandas a un dataframe de Spark
            events_df = spark.createDataFrame(df)

            # Esto imprime el esquema y las primeras filas del df
            events_df.printSchema()
            return events_df

        else:
            print(f"Error al descargar los datos para el año {year}. Código de estado: {response.status_code}")
            return None

    except requests.exceptions.RequestException as e:
        print(f"Error en la solicitud HTTP: {e}")
        return None

    except Exception as e:
        print(f"Se ha producido un error: {e}")
        return None


years = [2024, 2023, 2022, 2021]
events_df = None

for year in years:
    year_data = fetch_data_by_year(year)
    if year_data is not None:
        if events_df is None:
            events_df = year_data
        else:
            events_df = events_df.union(year_data)

# Imprimo el esquema y las primeras filas del df 
if events_df is not None:
    events_df.printSchema()
    events_df.show(10)


In [0]:
print(type(events_df))

In [0]:
events_df.columns

In [0]:


# Calculo los valores máximos y mínimos de la columna `timestamp`
min_max_values = events_df.select(
    min(col("start_date_time")).alias("min_start_date_time"),
    max(col("start_date_time")).alias("max_start_date_time")
)

# Imprimo los valores
min_max_values.show()

In [0]:

# configuro la política de parser de fechas
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Elimino duplicados
events_df = events_df.dropDuplicates()

# Uniformizo las columnas de fecha a tipo Timestamp
events_df = events_df.withColumn(
    "start_date_time",
    to_timestamp(col("start_date_time"), "yyyy-MM-dd'T'HH:mm:ss.SSS")
).withColumn(
    "end_date_time",
    to_timestamp(col("end_date_time"), "yyyy-MM-dd'T'HH:mm:ss.SSS")
)

# Se verifican los datos tras la conversión
events_df.show(10, truncate=False)

# Filtro las filas donde end_date_time es anterior a start_date_time
events_df = events_df.filter(col("end_date_time") >= col("start_date_time"))

# Creo la secuencia de horas entre start_date_time y end_date_time
events_with_hours_df = events_df.withColumn(
    "hour_sequence",
    explode(sequence(
        col("start_date_time"),
        col("end_date_time"),
        expr("INTERVAL 1 HOUR")
    ))
)

# Añado una columna para la fecha
events_with_hours_df = events_with_hours_df.withColumn("date", date_format(col("hour_sequence"), "yyyy-MM-dd"))

# Extraigo la hora como entero
events_with_hours_df = events_with_hours_df.withColumn("hour", hour(col("hour_sequence")))

# Filtro por el rango de fechas del 1 de enero al 30 de junio de 2024
filtered_events_with_hours_df = events_with_hours_df.filter(
    (col("date") >= "2024-01-01") & (col("date") <= "2024-06-30")
)

# Creo una columna que marque si un evento ocurre en esa hora
filtered_events_with_hours_df = filtered_events_with_hours_df.withColumn(
    "event_count",
    when(col("event_type").isNotNull(), lit(1)).otherwise(lit(0))
)

# Agrupo por fecha y hora, pivotar para obtener eventos por tipo
aggregated_events_df = filtered_events_with_hours_df.groupBy("date", "hour") \
    .pivot("event_type") \
    .agg({"event_count": "sum"})

# Relleno valores nulos con 0
aggregated_events_df = aggregated_events_df.fillna(0)

# Verifico columnas disponibles
print("Columns in aggregated_events_df:", aggregated_events_df.columns)

# Identifico las columnas a sumar (excuyo 'date' y 'hour')
columns_to_sum = [col(c).cast("int") for c in aggregated_events_df.columns if c not in ["date", "hour"]]

# Este bucle es capaz de manejar el caso en el que no hay columnas para sumar
if columns_to_sum:
    aggregated_events_df = aggregated_events_df.withColumn(
        "nº events",
        reduce(lambda x, y: x + y, columns_to_sum)
    )
else:
    aggregated_events_df = aggregated_events_df.withColumn("nº events", lit(0))

# Imprimo el resultado
aggregated_events_df.show(10, truncate=False)


In [0]:
# Renombro la columna "hour_of_day" a "hour" en subway_demand para el join
subway_demand = subway_demand.withColumnRenamed("hour_of_day", "hour")

# Realizo el join con aggregated_events_df
subway_demand = subway_demand.join(
    aggregated_events_df,
    on=["date", "hour"],
    how="left"
)

# Realizo el join con spark_weather_df
subway_demand = subway_demand.join(
    spark_weather_df,
    on=["date", "hour"],
    how="left"
)

# Imprimo los datos resultantes
subway_demand.show(truncate=False)


In [0]:
# Lista de columnas que quiero evaluar
columns = [
    'date',
    'hour',
    'timestamp',
    'ridership',
    'origin_station_complex_id',
    'destination_station_complex_id',
    'origin_latitude',
    'origin_longitude',
    'day_of_week',
    'destination_longitude',
    'destination_latitude'
]

# Creo una expresion que cuenta los valores nulos por columna
missing_values = subway_demand.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in columns
])

# Muestro el resultado
missing_values.show()


In [0]:
display(subway_demand)

In [0]:
subway_demand.columns

In [0]:
print(subway_demand.count())

In [0]:
# Calculo los valores máximos y mínimos de la columna `timestamp`
min_max_values = subway_demand.select(
    min(col("time")).alias("time"),
    max(col("time")).alias("time")
)

# Imprimo los valores
min_max_values.show()

In [0]:
subway_demand.select("day_of_week").distinct().show()


In [0]:
# Lista de columnas
event_columns = [
    "Athletic Race / Tour", "Bike the Block", "Block Party", "Clean-Up",
    "Farmers Market", "Grid Request", "Health Fair", "Open Culture",
    "Open Street Partner Event", "Parade", "Plaza Event", "Plaza Partner Event",
    "Press Conference", "Production Event", "Religious Event", "Sidewalk Sale",
    "Single Block Festival", "Special Event", "Sport - Adult", "Sport - Youth",
    "Stickball", "Street Event", "Street Festival", "Theater Load in and Load Outs", "nº events"
]

weather_columns = ["temp", "dwpt", "rhum", "prcp", "wspd",  "pres"]

In [0]:
# Correlación entre ridership y resto de variables

col_name = "ridership"
# Convierto la columna de string a número
subway_demand = subway_demand.withColumn(col_name, col(col_name).cast("double"))

# Calculo la correlación
corr_value = subway_demand.stat.corr("ridership", col_name)
print(f"Correlación entre ridership y {col_name}: {corr_value}")



In [0]:
col_name = "ridership"
# Convierto la columna de string a número
subway_demand = subway_demand.withColumn(col_name, col(col_name).cast("double"))

In [0]:
# Defino el mapeo de días de la semana a valores entre 0 y 6
day_of_week_mapping = {
    "Monday": 0,
    "Tuesday": 1,
    "Wednesday": 2,
    "Thursday": 3,
    "Friday": 4,
    "Saturday": 5,
    "Sunday": 6
}

# Creo una columna nueva en subway_demand con el mapeo
def map_day_of_week(day):
    return day_of_week_mapping.get(day, -1)  # Devuelve -1 si el día no está en el mapeo

map_day_of_week_udf = udf(map_day_of_week, IntegerType())

# Aplico el mapeo al df
subway_demand = subway_demand.withColumn("day_of_week_mapped", map_day_of_week_udf(subway_demand["day_of_week"]))


In [0]:

# Convierto a Pandas para visualizar
subway_demand_pandas = subway_demand.select(
    "day_of_week_mapped", "hour", "ridership"
).groupBy("day_of_week_mapped", "hour").mean("ridership").toPandas()

# Me aseguro de que las horas están en el formato adecuado
subway_demand_pandas['hour'] = subway_demand_pandas['hour'].astype(int)  # Convertir a entero si es necesario

# Pivoto el df para un mapa de calor
heatmap_data = subway_demand_pandas.pivot(index="day_of_week_mapped", columns="hour", values="avg(ridership)")

# Ordeno las columnas
heatmap_data = heatmap_data.reindex(sorted(heatmap_data.columns), axis=1)

# Creo mapa de calor
plt.figure(figsize=(12, 6))
sns.heatmap(heatmap_data, fmt='.0f', cmap="coolwarm", annot=False)
plt.title("Heatmap de Demanda por Día de la Semana y Hora")
plt.xlabel("Hora")
plt.ylabel("Día de la Semana")
plt.show()


In [0]:
# Agrego para tendencias generales
subway_demand_aggregated = subway_demand.groupBy("day_of_week_mapped", "hour").agg(
    sum("ridership").alias("total_ridership"),
    avg("prcp").alias("mean_prcp"),
    avg("nº events").alias("mean_num_events"),
    avg("wspd").alias("mean_wspd"),
    avg("temp").alias("mean_temp")
)

In [0]:
subway_demand.columns

In [0]:
# Agrego para tendencias generales
subway_demand_aggregated_v2 = subway_demand.groupBy("hour_of_day").agg(
    sum("estimated_average_ridership").alias("total_ridership")
).show()

In [0]:
# Utilizo los datos proporcionados para comparar demanda de metro y transporte privado (hide-railing) (etsos datso de metro caambiarían para cada ejecución pero se entiende como útiles para identificar tendencias)
demanda_hide = {
    "hour_of_day": list(range(24)),
    "demanda_hide_raling": [
        11166, 7975, 5302, 3594, 2495, 3016, 6615, 11788, 16052, 15469, 14389, 14661, 
        15051, 15023, 15355, 15901, 17385, 20886, 22842, 21811, 20292, 20270, 18501, 15087
    ]
}

demanda_metro = {
    "hour": [
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 14, 15, 16, 17, 18, 19, 20, 21
    ],
    "demanda_metro": [
        15743, 8967, 5908, 15798, 63548, 161880, 366988, 554463, 351540, 225463, 213274,
        372792, 370593, 350064, 427309, 575796, 410883, 252378, 182647, 150110
    ]
}

# Creo los dfs
df_hide = pd.DataFrame(demanda_hide)
df_metro = pd.DataFrame(demanda_metro)

# Normalizo los datos para comparación entre ambos
df_hide["demanda_hide_raling_normalized"] = df_hide["demanda_hide_raling"] / df_hide["demanda_hide_raling"].max()
df_metro["demanda_metro_normalized"] = df_metro["demanda_metro"] / df_metro["demanda_metro"].max()

# Grafico las demandas normalizadas
plt.figure(figsize=(10, 6))

# Demanda Hide Raling
plt.plot(df_hide["hour_of_day"], df_hide["demanda_hide_raling_normalized"], label="Hide Railing", marker='o')

# Demanda Metro
plt.plot(df_metro["hour"], df_metro["demanda_metro_normalized"], label="Metro", marker='o')

# Configuro el gráfico
plt.title("Comparación de la demanda normalizada", fontsize=14)
plt.xlabel("Hora del día", fontsize=12)
plt.ylabel("Demanda Normalizada", fontsize=12)
plt.xticks(range(0, 24))
plt.legend()
plt.grid(alpha=0.3)

# Muestro el gráfico
plt.tight_layout()
plt.show()

In [0]:
# Calculo las correlaciones entre ridership y las demás variables
correlations = subway_demand_aggregated.select(
    corr("total_ridership", "mean_prcp").alias("corr_ridership_prcp"),
    corr("total_ridership", "mean_num_events").alias("corr_ridership_num_events"),
    corr("total_ridership", "mean_temp").alias("corr_ridership_temp"),
    corr("total_ridership", "mean_wspd").alias("corr_ridership_wspd")
).collect()

# Extraigo los valores de correlación
correlation_values = {
    "prcp": correlations[0]["corr_ridership_prcp"],
    "num_events": correlations[0]["corr_ridership_num_events"],
    "temp": correlations[0]["corr_ridership_temp"],
    "wspd": correlations[0]["corr_ridership_wspd"]
}

# Convierto a un df de Pandas para visualizarlo
correlation_df = pd.DataFrame.from_dict(correlation_values, orient="index", columns=["Correlation"])

# Creo un mapa de calor
plt.figure(figsize=(6, 4))
sns.heatmap(correlation_df, annot=True, cmap="coolwarm", cbar=True, linewidths=0.5)
plt.title("Mapa de calor de correlaciones")
plt.ylabel("Variables")
plt.xlabel("Correlación")
plt.tight_layout()
plt.show()

# Visualización adicional: distribuciones y relaciones
# Convierto el df de Spark a Pandas para graficar
subway_demand_pandas = subway_demand_aggregated.toPandas()

# Pairplot para explorar relaciones entre variables
sns.pairplot(subway_demand_pandas, vars=["total_ridership", "mean_prcp", "mean_num_events", "mean_temp","mean_wspd"], diag_kind="kde")
plt.suptitle("Pairplot de variables", y=1.02)
plt.show()

# Distribución de la ridership en función de cada variable
plt.figure(figsize=(15, 5))

# Distribución de prcp-demanda
plt.subplot(1, 4, 1)
sns.scatterplot(data=subway_demand_pandas, x="mean_prcp", y="total_ridership", alpha=0.6)
plt.title("Demanda vs. Precipitación")
plt.xlabel("Precipitaciones promedio")
plt.ylabel("Demanda")

# Distribución de num_events-demanda
plt.subplot(1, 4, 2)
sns.scatterplot(data=subway_demand_pandas, x="mean_num_events", y="total_ridership", alpha=0.6, color="orange")
plt.title("Demanda vs. Número de eventos")
plt.xlabel("Número de eventos promedio")

# Distribución de temp-demanda
plt.subplot(1, 4, 3)
sns.scatterplot(data=subway_demand_pandas, x="mean_temp", y="total_ridership", alpha=0.6, color="green")
plt.title("Demanda vs. Temperatura")
plt.xlabel("Temperatura promedio")


# Distribución de wspd-demanda
plt.subplot(1, 4, 3)
sns.scatterplot(data=subway_demand_pandas, x="mean_wspd", y="total_ridership", alpha=0.6, color="green")
plt.title("Demanda vs. Velocidad del viento")
plt.xlabel("Velocidad del viento")


plt.tight_layout()
plt.show()

# Evaluo de la influencia del día de la semana y la hora
plt.figure(figsize=(15, 10))

# Boxplot para el día de la semana
plt.subplot(2, 1, 1)
sns.lineplot(data=subway_demand_pandas, x="day_of_week_mapped", y="total_ridership", palette="viridis")
plt.title("Demanda por día de la semana")
plt.xlabel("Día de la semana")
plt.ylabel("Demanda")

subway_demand_pandas["hour"] = pd.to_numeric(subway_demand_pandas["hour"], errors="coerce")

subway_demand_pandas = subway_demand_pandas.sort_values("hour")

# Boxplot para la hora
plt.subplot(2, 1, 2)
sns.lineplot(data=subway_demand_pandas, x="hour", y="total_ridership", marker="o", color="blue")
plt.title("Demanda por hora del día")
plt.xlabel("Hora")
plt.ylabel("Demanda")

plt.tight_layout()
plt.show()