In [1]:
import os
import requests
from datetime import datetime
import pandas as pd
import numpy as np
import json
from google.transit import gtfs_realtime_pb2


FUENTES = {
    "ACES": {
        "url": "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-ace",
        "lineas": ["A", "C", "E", "Sr"]
    },
    "BDFMS": {
        "url": "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-bdfm",
        "lineas": ["B", "D", "F", "M", "Sf"]
    },
    "G": {
        "url": "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-g",
        "lineas": ["G"]
    },
    "JZ": {
        "url": "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-jz",
        "lineas": ["J", "Z"]
    },
    "NQRW": {
        "url": "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-nqrw",
        "lineas": ["N", "Q", "R", "W"]
    },
    "L": {
        "url": "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-l",
        "lineas": ["L"]
    },
    "1234567S": {
        "url": "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs",
        "lineas": ["1", "2", "3", "4", "5", "6", "7", "S"]
    },
    "SIR": {
        "url": "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-si",
        "lineas": ["SIR"]
    }
}

In [2]:
def extraccion_linea(url, linea):
    """
    Extrae los datos de una línea
    """
    response = requests.get(url)
    fuentes = gtfs_realtime_pb2.FeedMessage()
    fuentes.ParseFromString(response.content)

    datos_linea = []
    for entity in fuentes.entity:
        if entity.HasField('trip_update'):
            trayecto = entity.trip_update

            if trayecto.trip.route_id == linea:
                for stop in trayecto.stop_time_update:
                    campos = {
                        'viaje_id': trayecto.trip.trip_id,
                        'linea_id': trayecto.trip.route_id,
                        'parada_id': stop.stop_id,
                        'hora_llegada': datetime.fromtimestamp(stop.arrival.time) if stop.HasField('arrival') else None,
                        'hora_partida': datetime.fromtimestamp(stop.departure.time) if stop.HasField('departure') else None,
                        'timestamp': datetime.now(),                       
                    }

                    datos_linea.append(campos)
    return datos_linea


In [3]:
def extraccion_datos():
    """
    Extrae todas las líneas y las convierte a DF
    """

    todas_las_lineas = []
    for info in FUENTES.values():
        todas_las_lineas.extend(info['lineas'])
    
    todos_los_datos = []
    for linea in todas_las_lineas:
        for grupo, info in FUENTES.items():
            if linea in info['lineas']:
                fuentes_url = info['url']
            todos_los_datos.extend(extraccion_linea(fuentes_url, linea))  

    return pd.DataFrame(todos_los_datos)
        

In [4]:
df = extraccion_datos()

In [5]:
df

Unnamed: 0,viaje_id,linea_id,parada_id,hora_llegada,hora_partida,timestamp
0,043100_A..N,A,H04N,2026-02-25 16:15:30,2026-02-25 16:15:30,2026-02-25 15:43:37.827979
1,043100_A..N,A,H17N,2026-02-25 16:19:57,2026-02-25 16:19:57,2026-02-25 15:43:37.827993
2,043100_A..N,A,H03N,2026-02-25 16:22:15,2026-02-25 16:22:15,2026-02-25 15:43:37.828004
3,043100_A..N,A,H02N,2026-02-25 16:22:16,2026-02-25 16:22:16,2026-02-25 15:43:37.828014
4,043100_A..N,A,H01N,2026-02-25 16:22:17,2026-02-25 16:22:17,2026-02-25 15:43:37.828024
...,...,...,...,...,...,...
123625,061600_7..S,7,721S,2026-02-25 16:47:10,2026-02-25 16:47:30,2026-02-25 15:44:58.814369
123626,061600_7..S,7,723S,2026-02-25 16:52:10,2026-02-25 16:52:30,2026-02-25 15:44:58.814373
123627,061600_7..S,7,724S,2026-02-25 16:53:40,2026-02-25 16:54:00,2026-02-25 15:44:58.814376
123628,061600_7..S,7,725S,2026-02-25 16:55:10,2026-02-25 16:55:30,2026-02-25 15:44:58.814379


In [6]:
df['hora_llegada'] = df['hora_llegada'].dt.tz_localize('UTC').dt.tz_convert('America/New_York')  
df['hora_partida'] = df['hora_partida'].dt.tz_localize('UTC').dt.tz_convert('America/New_York')
df['timestamp'] = df['timestamp'].dt.tz_localize('UTC').dt.tz_convert('America/New_York')

In [7]:
df['dia'] = df['timestamp'].dt.strftime("%A")
df['dia'] = df['dia'].apply(
    lambda x: 'Weekday' if x not in ('Saturday', 'Sunday') else x
)
df['hora_llegada'] = df['hora_llegada'].dt.strftime('%H:%M:%S')
df['hora_partida'] = df['hora_partida'].dt.strftime('%H:%M:%S')
df['timestamp'] = df['timestamp'].dt.strftime('%H:%M:%S')

In [8]:
df

Unnamed: 0,viaje_id,linea_id,parada_id,hora_llegada,hora_partida,timestamp,dia
0,043100_A..N,A,H04N,11:15:30,11:15:30,10:43:37,Weekday
1,043100_A..N,A,H17N,11:19:57,11:19:57,10:43:37,Weekday
2,043100_A..N,A,H03N,11:22:15,11:22:15,10:43:37,Weekday
3,043100_A..N,A,H02N,11:22:16,11:22:16,10:43:37,Weekday
4,043100_A..N,A,H01N,11:22:17,11:22:17,10:43:37,Weekday
...,...,...,...,...,...,...,...
123625,061600_7..S,7,721S,11:47:10,11:47:30,10:44:58,Weekday
123626,061600_7..S,7,723S,11:52:10,11:52:30,10:44:58,Weekday
123627,061600_7..S,7,724S,11:53:40,11:54:00,10:44:58,Weekday
123628,061600_7..S,7,725S,11:55:10,11:55:30,10:44:58,Weekday


In [9]:
norte = (df['parada_id'].str[-1] == 'N')
sur = (df['parada_id'].str[-1] == 'S')

df.loc[norte, 'direccion'] = 1
df.loc[sur, 'direccion'] = 0

df['direccion'] = df['direccion'].astype('Int64')


In [10]:
df

Unnamed: 0,viaje_id,linea_id,parada_id,hora_llegada,hora_partida,timestamp,dia,direccion
0,043100_A..N,A,H04N,11:15:30,11:15:30,10:43:37,Weekday,1
1,043100_A..N,A,H17N,11:19:57,11:19:57,10:43:37,Weekday,1
2,043100_A..N,A,H03N,11:22:15,11:22:15,10:43:37,Weekday,1
3,043100_A..N,A,H02N,11:22:16,11:22:16,10:43:37,Weekday,1
4,043100_A..N,A,H01N,11:22:17,11:22:17,10:43:37,Weekday,1
...,...,...,...,...,...,...,...,...
123625,061600_7..S,7,721S,11:47:10,11:47:30,10:44:58,Weekday,0
123626,061600_7..S,7,723S,11:52:10,11:52:30,10:44:58,Weekday,0
123627,061600_7..S,7,724S,11:53:40,11:54:00,10:44:58,Weekday,0
123628,061600_7..S,7,725S,11:55:10,11:55:30,10:44:58,Weekday,0


In [11]:
tot_filas = len(df)

for columna in df.columns:
    nulos = df[columna].isnull().sum()
    proporcion = nulos/tot_filas
    print(f"Proporcion nulos en {columna}: {proporcion}")

Proporcion nulos en viaje_id: 0.0
Proporcion nulos en linea_id: 0.0
Proporcion nulos en parada_id: 0.0
Proporcion nulos en hora_llegada: 0.008840896222599693
Proporcion nulos en hora_partida: 0.018709051201164766
Proporcion nulos en timestamp: 0.0
Proporcion nulos en dia: 0.0
Proporcion nulos en direccion: 0.0


In [12]:
df = df.dropna()
df

Unnamed: 0,viaje_id,linea_id,parada_id,hora_llegada,hora_partida,timestamp,dia,direccion
0,043100_A..N,A,H04N,11:15:30,11:15:30,10:43:37,Weekday,1
1,043100_A..N,A,H17N,11:19:57,11:19:57,10:43:37,Weekday,1
2,043100_A..N,A,H03N,11:22:15,11:22:15,10:43:37,Weekday,1
3,043100_A..N,A,H02N,11:22:16,11:22:16,10:43:37,Weekday,1
4,043100_A..N,A,H01N,11:22:17,11:22:17,10:43:37,Weekday,1
...,...,...,...,...,...,...,...,...
123624,061600_7..S,7,720S,11:46:10,11:46:30,10:44:58,Weekday,0
123625,061600_7..S,7,721S,11:47:10,11:47:30,10:44:58,Weekday,0
123626,061600_7..S,7,723S,11:52:10,11:52:30,10:44:58,Weekday,0
123627,061600_7..S,7,724S,11:53:40,11:54:00,10:44:58,Weekday,0


In [13]:
df_previsto = pd.read_csv('stop_times.txt', sep = ',')
df_previsto

Unnamed: 0,trip_id,stop_id,arrival_time,departure_time,stop_sequence
0,AFA25GEN-1038-Sunday-00_000600_1..S03R,101S,00:06:00,00:06:00,1
1,AFA25GEN-1038-Sunday-00_000600_1..S03R,103S,00:07:30,00:07:30,2
2,AFA25GEN-1038-Sunday-00_000600_1..S03R,104S,00:09:00,00:09:00,3
3,AFA25GEN-1038-Sunday-00_000600_1..S03R,106S,00:10:30,00:10:30,4
4,AFA25GEN-1038-Sunday-00_000600_1..S03R,107S,00:12:00,00:12:00,5
...,...,...,...,...,...
2484304,SIR-FA2017-SI017-Weekday-08_147100_SI..N03R,S27N,25:03:00,25:03:00,17
2484305,SIR-FA2017-SI017-Weekday-08_147100_SI..N03R,S28N,25:06:00,25:06:00,18
2484306,SIR-FA2017-SI017-Weekday-08_147100_SI..N03R,S29N,25:08:00,25:08:00,19
2484307,SIR-FA2017-SI017-Weekday-08_147100_SI..N03R,S30N,25:10:00,25:10:00,20


In [14]:
df_previsto['day'] = df_previsto['trip_id'].str.split('-').str[-2]
df_previsto['trip_id'] = df_previsto['trip_id'].str.split('_', n=1).str[-1]
df_previsto

Unnamed: 0,trip_id,stop_id,arrival_time,departure_time,stop_sequence,day
0,000600_1..S03R,101S,00:06:00,00:06:00,1,Sunday
1,000600_1..S03R,103S,00:07:30,00:07:30,2,Sunday
2,000600_1..S03R,104S,00:09:00,00:09:00,3,Sunday
3,000600_1..S03R,106S,00:10:30,00:10:30,4,Sunday
4,000600_1..S03R,107S,00:12:00,00:12:00,5,Sunday
...,...,...,...,...,...,...
2484304,147100_SI..N03R,S27N,25:03:00,25:03:00,17,Weekday
2484305,147100_SI..N03R,S28N,25:06:00,25:06:00,18,Weekday
2484306,147100_SI..N03R,S29N,25:08:00,25:08:00,19,Weekday
2484307,147100_SI..N03R,S30N,25:10:00,25:10:00,20,Weekday


In [15]:
def normalizar_horas(columna):
    columna = columna.str.replace('24:', '00:', regex=False)
    columna = columna.str.replace('25:', '01:', regex=False)
    columna = columna.str.replace('26:', '02:', regex=False)
    columna = columna.str.replace('27:', '03:', regex=False)
    return columna

df_previsto['arrival_time'] = normalizar_horas(df_previsto['arrival_time'])
df_previsto['departure_time'] = normalizar_horas(df_previsto['departure_time'])

In [16]:
def hora_a_segundos(hora):
    if pd.isna(hora): 
        return np.nan
    
    partes = hora.split(':')

    return int(partes[0]) * 3600 + int(partes[1]) * 60 + int(partes[2])

df['segundos_reales'] = df['hora_llegada'].apply(hora_a_segundos)
df_previsto['segundos_previstos'] = df_previsto['arrival_time'].apply(hora_a_segundos)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['segundos_reales'] = df['hora_llegada'].apply(hora_a_segundos)


In [17]:
df_merge = pd.merge(df, df_previsto, left_on=['viaje_id', 'parada_id', 'dia'], right_on=['trip_id', 'stop_id', 'day'])

In [18]:
df_merge['delay'] = df_merge['segundos_reales']-df_merge['segundos_previstos']
df_merge.loc[df_merge['delay'] > 43200, 'delay'] -= 86400
df_merge.loc[df_merge['delay'] < -43200, 'delay'] += 86400

In [19]:
def hora_posterior(hora1, hora2):
    partes1 = hora1.split(':')
    partes2 = hora2.split(':')

    return (
        (int(partes1[0]) > int(partes2[0])) | 
        ((int(partes1[0]) == int(partes2[0])) & (int(partes1[1]) > int(partes2[1]))) |
        ((int(partes1[0]) == int(partes2[0])) & (int(partes1[1]) == int(partes2[1])) & ((int(partes1[2]) > int(partes2[2]))))
    )

df_merge['delay'] = np.where(
    df_merge.apply(lambda row: hora_posterior(row['timestamp'], row['hora_llegada']), axis=1),
    df_merge['delay'],  # valor si True
    None    # valor si False
)

df_merge

Unnamed: 0,viaje_id,linea_id,parada_id,hora_llegada,hora_partida,timestamp,dia,direccion,segundos_reales,trip_id,stop_id,arrival_time,departure_time,stop_sequence,day,segundos_previstos,delay
0,052350_A..S57R,A,A55S,10:47:13,10:47:13,10:43:37,Weekday,0,38833,052350_A..S57R,A55S,09:47:30,09:49:00,23,Weekday,35250,
1,052350_A..S57R,A,A57S,10:49:58,10:49:58,10:43:37,Weekday,0,38998,052350_A..S57R,A57S,09:51:00,09:51:00,24,Weekday,35460,
2,052350_A..S57R,A,A59S,10:51:28,10:51:28,10:43:37,Weekday,0,39088,052350_A..S57R,A59S,09:52:30,09:52:30,25,Weekday,35550,
3,052350_A..S57R,A,A60S,10:53:28,10:53:28,10:43:37,Weekday,0,39208,052350_A..S57R,A60S,09:54:30,09:54:30,26,Weekday,35670,
4,052350_A..S57R,A,A61S,10:54:58,10:54:58,10:43:37,Weekday,0,39298,052350_A..S57R,A61S,09:56:00,09:56:00,27,Weekday,35760,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
63262,061800_6..N03R,6,613N,12:00:00,12:03:00,10:44:54,Weekday,1,43200,061800_6..N03R,613N,11:00:00,11:03:00,27,Weekday,39600,
63263,061800_6..N03R,6,612N,12:04:30,12:04:30,10:44:54,Weekday,1,43470,061800_6..N03R,612N,11:04:30,11:04:30,28,Weekday,39870,
63264,061800_6..N03R,6,611N,12:06:30,12:06:30,10:44:54,Weekday,1,43590,061800_6..N03R,611N,11:06:30,11:06:30,29,Weekday,39990,
63265,061800_6..N03R,6,610N,12:07:30,12:07:30,10:44:54,Weekday,1,43650,061800_6..N03R,610N,11:07:30,11:07:30,30,Weekday,40050,


In [20]:
nulos = df_merge['delay'].isna().sum()
proporcion = nulos/(len(df_merge))
print(nulos, proporcion)

57877 0.9148055068203013


In [21]:
df_merge.sort_values('hora_llegada')

Unnamed: 0,viaje_id,linea_id,parada_id,hora_llegada,hora_partida,timestamp,dia,direccion,segundos_reales,trip_id,stop_id,arrival_time,departure_time,stop_sequence,day,segundos_previstos,delay
57595,047650_6..N03R,6,639N,08:58:30,08:58:30,10:44:53,Weekday,1,32310,047650_6..N03R,639N,07:58:30,07:58:30,2,Weekday,28710,3600
51923,047650_6..N03R,6,639N,08:58:30,08:58:30,10:44:51,Weekday,1,32310,047650_6..N03R,639N,07:58:30,07:58:30,2,Weekday,28710,3600
53341,047650_6..N03R,6,639N,08:58:30,08:58:30,10:44:52,Weekday,1,32310,047650_6..N03R,639N,07:58:30,07:58:30,2,Weekday,28710,3600
60431,047650_6..N03R,6,639N,08:58:30,08:58:30,10:44:54,Weekday,1,32310,047650_6..N03R,639N,07:58:30,07:58:30,2,Weekday,28710,3600
59013,047650_6..N03R,6,639N,08:58:30,08:58:30,10:44:53,Weekday,1,32310,047650_6..N03R,639N,07:58:30,07:58:30,2,Weekday,28710,3600
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1381,064050_A..N55R,A,A02N,13:22:00,13:22:00,10:43:39,Weekday,1,48120,064050_A..N55R,A02N,12:22:00,12:22:00,37,Weekday,44520,
2793,064050_A..N55R,A,A02N,13:22:00,13:22:00,10:43:41,Weekday,1,48120,064050_A..N55R,A02N,12:22:00,12:22:00,37,Weekday,44520,
2087,064050_A..N55R,A,A02N,13:22:00,13:22:00,10:43:40,Weekday,1,48120,064050_A..N55R,A02N,12:22:00,12:22:00,37,Weekday,44520,
1028,064050_A..N55R,A,A02N,13:22:00,13:22:00,10:43:38,Weekday,1,48120,064050_A..N55R,A02N,12:22:00,12:22:00,37,Weekday,44520,


In [22]:
df_merge['viaje_id'].nunique()

380

In [23]:
df_merge = df_merge.drop(['timestamp', 'segundos_reales', 'trip_id', 'stop_id', 'arrival_time', 'departure_time', 'day', 'segundos_previstos'], axis=1)
df_merge = df_merge.dropna()

In [24]:
df_merge

Unnamed: 0,viaje_id,linea_id,parada_id,hora_llegada,hora_partida,dia,direccion,stop_sequence,delay
8,053200_A..S58R,A,A42S,10:43:17,10:43:17,Weekday,0,19,3887
26,053800_A..S57R,A,A41S,10:43:17,10:43:17,Weekday,0,18,3587
86,057500_A..N55R,A,H06N,10:43:17,10:43:17,Weekday,1,6,3587
118,057550_A..N54R,A,A61N,10:43:17,10:43:17,Weekday,1,4,3797
361,053200_A..S58R,A,A42S,10:43:17,10:43:17,Weekday,0,19,3887
...,...,...,...,...,...,...,...,...,...
62724,057750_6..N01R,6,636N,10:43:30,10:43:30,Weekday,1,5,3600
62757,058050_6..N03R,6,639N,10:42:30,10:42:30,Weekday,1,2,3600
62758,058050_6..N03R,6,638N,10:44:00,10:44:00,Weekday,1,3,3600
62787,058200_6..S03R,6,609S,10:43:30,10:43:30,Weekday,0,2,3600


In [25]:
import io
from minio import Minio

# Leer credenciales de MinIO desde variables de entorno
ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY")
assert ACCESS_KEY is not None, "La variable de entorno MINIO_ACCESS_KEY no está definida."
SECRET_KEY = os.getenv("MINIO_SECRET_KEY")
assert SECRET_KEY is not None, "La variable de entorno MINIO_SECRET_KEY no está definida."

# Crear un cliente de MinIO
client = Minio(
    endpoint="minio.fdi.ucm.es",
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
)

# Serializar el DataFrame a Parquet en memoria
buffer = io.BytesIO()
df_merge.to_parquet(buffer, index=False)
buffer.seek(0)

bucket = "pd1"
destination_file = "grupo5/processed/gtfs_real_time/mta_real_time.parquet"

client.put_object(
    bucket_name=bucket,
    object_name=destination_file,
    data=buffer,
    length=buffer.getbuffer().nbytes,
    content_type="application/octet-stream",
)

print(f"DataFrame subido a MinIO como '{destination_file}' en el bucket '{bucket}'.")

DataFrame subido a MinIO como 'grupo5/processed/gtfs_real_time/mta_real_time.parquet' en el bucket 'pd1'.
