# Laden der VBN GTFS-R Daten

In [None]:
import pandas as pd
import geopandas as gpd
import os
import duckdb
from google.transit import gtfs_realtime_pb2
import requests
import datetime as dt
import locale
from dotenv import load_dotenv
import time

In [None]:
# Load the environment variables used in this notebook (URL_GTFS_R, POSTGRES_USER,
# POSTGRES_PW, BASE_PATH_GTFS) from a .env file; override=True lets the .env values
# replace variables that are already set in the process environment.
load_dotenv(override=True)

In [None]:
##2
# Switch to the British English locale so that strftime('%A') yields English weekday
# names, which are used as column names of the GTFS calendar table.
# NOTE(review): raises locale.Error if en_GB.UTF-8 is not installed on the host.
locale.setlocale(locale.LC_ALL, 'en_GB.UTF-8')

In [None]:
# URL of the GTFS-Realtime feed, taken from the environment (.env).
url = os.getenv('URL_GTFS_R')
url  # display the value in the notebook

## Starten der DuckDB

In [None]:
# In-memory DuckDB database used for all processing in this notebook; its tables are
# lost when the connection is closed.
duck = duckdb.connect(database=':memory:')

In [None]:
# Install and load DuckDB's spatial extension (provides st_read, st_point and
# st_contains, which are used further down).
duck.sql("""install spatial;""")
duck.sql("""load spatial;""")

### Einlesen der Linien der Bedienungsebenen 1+ und 1

In [None]:
# Alternative version: attach the local PostgreSQL database "zvbn_postgis" read-only
# under the alias "dm" via DuckDB's postgres extension. User name and password are
# read from the environment variables POSTGRES_USER / POSTGRES_PW.
# NOTE(review): the values are inserted unquoted into the connection string, so a
# password containing spaces would break it, and a missing variable ends up as the
# literal text "None".
sql = f"""
INSTALL postgres;
LOAD postgres;
ATTACH 'dbname=zvbn_postgis user={os.environ.get('POSTGRES_USER')} password={os.environ.get('POSTGRES_PW')} host=127.0.0.1 port=5432' AS dm (TYPE POSTGRES, READ_ONLY);
SHOW ALL TABLES;
"""
duck.sql(sql) # establish the connection


In [None]:
# Active lines ("aktiv = True") of service levels '1' and '1+' from the attached
# database table dm.basis.linien, fetched as a pandas DataFrame.
df_linien = duck.sql("select nummer from dm.basis.linien where aktiv = True and ebene in ('1', '1+') order by nummer").df() # query
df_linien

### Einlesen der VBN-Grenzen zum Ermitteln der Linien / Halte im VBN aus GTFS

In [None]:
# Read the VBN boundary shapefile with the spatial extension's st_read into table vbn;
# it is used below to find the stops located inside the VBN area.
# NOTE(review): absolute, machine-specific path — consider reading it from .env.
duck.sql("""create or replace table vbn as 
         select * 
         from st_read('/home/zvbn/python/gtfs/grenzen/vbn_01082018.shp') as vbn""")

## Einlesen der GTFS-Daten und Ermitteln des Tagesfahrplans

In [None]:
# File paths of the static GTFS feed, located in the "gtfs_d" subdirectory of the
# directory named by the BASE_PATH_GTFS environment variable.
base_url = os.path.join(os.getenv('BASE_PATH_GTFS'), 'gtfs_d')
(gtfs_stops,
 gtfs_stop_times,
 gtfs_trips,
 gtfs_routes,
 gtfs_agency,
 gtfs_calendar,
 gtfs_calendar_dates) = (os.path.join(base_url, file_name) for file_name in (
    'stops.txt',
    'stop_times.txt',
    'trips.txt',
    'routes.txt',
    'agency.txt',
    'calendar.txt',
    'calendar_dates.txt',
))

In [None]:
# Load the static GTFS text files into DuckDB tables. With store_rejects/ignore_errors,
# rows DuckDB cannot parse are skipped and recorded in its reject tables instead of
# aborting the load. trips.txt is read with an explicit schema in which every column
# is VARCHAR; all other files rely on DuckDB's type auto-detection.
# NOTE(review): the explicit `columns` mapping must match the columns of trips.txt —
# confirm against the feed. Also confirm that stop_times times past 24:00:00
# (e.g. "25:10:00", valid in GTFS) are not dropped as rejected rows.
duck.sql(f"""
CREATE or replace TABLE stops AS select * from read_csv('{gtfs_stops}', store_rejects = true, ignore_errors = true);
create or replace table agency as select * from read_csv('{gtfs_agency}', store_rejects = true, ignore_errors = true); 

CREATE or replace TABLE stop_times AS select * from read_csv('{gtfs_stop_times}', store_rejects = true, ignore_errors = true);
CREATE or replace TABLE trips AS select * from read_csv('{gtfs_trips}', delim=',',columns = {{ 
        'route_id' : 'VARCHAR',       
        'service_id' : 'VARCHAR',       
            
        'trip_id' : 'VARCHAR',       
        'trip_headsign' : 'VARCHAR',       
        'trip_short_name': 'VARCHAR',        
        'direction_id': 'VARCHAR',        
        'block_id': 'VARCHAR',        
        'shape_id': 'VARCHAR'  ,      
        'wheelchair_accessible': 'VARCHAR' ,       
        'bikes_allowed': 'VARCHAR'        
    }}, store_rejects = true, ignore_errors = true);
CREATE or replace TABLE routes AS select * from read_csv('{gtfs_routes}', store_rejects = true, ignore_errors = true);
CREATE or replace TABLE calendar AS select * from read_csv('{gtfs_calendar}', store_rejects = true, ignore_errors = true);
CREATE or replace TABLE calendar_dates AS select * from read_csv('{gtfs_calendar_dates}', store_rejects = true, ignore_errors = true);
         """)

In [None]:
# Show the rows DuckDB rejected while reading the GTFS CSV files (store_rejects = true).
duck.sql("FROM reject_errors;")

In [None]:
# Collect the row count of every table in the current DuckDB schema into
# table_lengths, then print one line per table.
tables = duck.sql("SHOW TABLES").df()['name'].tolist()
table_lengths = {
    table: duck.sql(f"SELECT COUNT(*) AS nrows FROM {table}").df().iloc[0]['nrows']
    for table in tables
}
for table, count in table_lengths.items():
    print(f" {table} Rows {count} ")

In [None]:
##2
# Lower-case English name of today's weekday ("monday" ... "sunday"). It is used as
# a column name of the GTFS calendar table. It is derived from weekday() (Monday == 0)
# instead of strftime('%A'), so it no longer depends on the process locale or on
# en_GB.UTF-8 being installed.
GTFS_WEEKDAYS = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday')
weekday_today_name = GTFS_WEEKDAYS[dt.datetime.now().weekday()]
weekday_today_name

In [None]:
# Build table verkehrt_heute ("runs today"): every service_id referenced in trips that
# is active today according to the GTFS calendar rules:
#   * calendar: today's weekday flag is 1 AND today lies within the service's
#     [start_date, end_date] validity period, or
#   * calendar_dates: an exception_type = 1 entry (service added) for today,
#   * and the service is not removed by an exception_type = 2 entry for today.
# Columns: service_id, verkehrt_cd (1 if added via calendar_dates, else NULL),
#          verkehrt_c (1 if active via calendar, else NULL).
# Dates are compared as YYYYMMDD numbers, the same representation the date filter
# on calendar_dates already relied on.
# NOTE(review): assumes DuckDB auto-detected date/start_date/end_date as integers — confirm.
today_yyyymmdd = dt.datetime.now().strftime('%Y%m%d')
duck.sql(f"""create or replace table verkehrt_heute as
         select distinct t.service_id, cd.verkehrt_cd, c.verkehrt_c
         from trips t
         left join (select service_id, exception_type as verkehrt_cd
                    from calendar_dates
                    where date = {today_yyyymmdd} and exception_type = 1) as cd
           on t.service_id = cd.service_id
         left join (select service_id, {weekday_today_name} as verkehrt_c
                    from calendar
                    where {weekday_today_name} = 1
                      and {today_yyyymmdd} between start_date and end_date) as c
           on t.service_id = c.service_id
         where (c.verkehrt_c = 1 or cd.verkehrt_cd = 1)
           and not exists (select 1
                           from calendar_dates rm
                           where rm.service_id = t.service_id
                             and rm.date = {today_yyyymmdd}
                             and rm.exception_type = 2)""")

In [None]:
# Inspect the services determined to run today.
duck.sql("""from verkehrt_heute""") 

In [None]:
# Build table lin_vbn: every (agency, route) that serves at least one stop lying inside
# the VBN boundary geometry of table vbn. Stops become points from stop_lon/stop_lat
# and are tested with st_contains; the routes are reached via stop_times -> trips -> routes.
# NOTE(review): assumes the shapefile geometry uses lon/lat (WGS84) like GTFS; if it is
# in a projected CRS no stop would match — confirm the .shp CRS.
duck.sql(""" create or replace table lin_vbn as
         (select r.agency_id,a.agency_name ,r.route_id, r.route_short_name from
         (select s.stop_id
         from stops s, vbn
         
         where st_contains(vbn.geom, st_point(s.stop_lon, s.stop_lat)) 
         group by all) as s 
         join stop_times st on s.stop_id = st.stop_id
         join trips t on st.trip_id = t.trip_id
         join routes r on t.route_id = r.route_id
         join agency a on r.agency_id = a.agency_id
         group by all)""")

In [None]:
# Inspect the routes serving the VBN area.
duck.sql(""" select * from lin_vbn""")

### Ermitteln der Fahrten im VBN

In [None]:
# Exploratory: today's trips (services in verkehrt_heute) on routes that serve the VBN
# area (lin_vbn). The result is only displayed, not stored.
duck.sql("""select * 
         from trips t
         join (select service_id 
            from verkehrt_heute 
            where verkehrt_cd = 1 or verkehrt_c = 1) vh on t.service_id = vh.service_id
         join lin_vbn l on t.route_id = l.route_id
         """)

In [None]:
# Ad-hoc lookup: agencies and routes whose short name contains "680" (case-insensitive).
# Relies on DuckDB accepting the trailing comma at the end of the select list.
duck.sql("""select a.agency_name, a.agency_id, r.route_short_name,
         from routes r
         join agency a on r.agency_id = a.agency_id
         where lower(r.route_short_name) like lower('%680%') """).df()

In [None]:
# Ad-hoc lookup: agencies whose name contains "Weser" (case-insensitive).
duck.sql("from agency where lower(agency_name) like lower('%Weser%') ").df()

## Einlesen Protobuf

In [None]:
def get_gtfsr_data(url, timeout=30):
    """Fetch a GTFS-Realtime feed and flatten its trip updates into DataFrames.

    Alert entities and ``is_deleted`` flags found in the feed are only printed,
    not returned.

    Args:
        url (str): URL of the GTFS-Realtime protobuf feed.
        timeout (float): Seconds to wait for the HTTP response (default 30).
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        tuple: ``(trip_updates_df, stop_time_updates_df)``

            - trip_updates_df: one row per TripUpdate entity with columns
              trip_id, route_id, start_time, start_date, schedule_relationship.
            - stop_time_updates_df: one row per StopTimeUpdate with columns
              trip_id, stop_sequence, stop_id, departure, arrival, where
              departure/arrival hold the ``delay`` of the respective
              StopTimeEvent (the protobuf default 0 when it is not set).

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.Timeout: if no response arrives within ``timeout`` seconds.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly instead of trying to parse an HTML error page as protobuf.
    response.raise_for_status()

    feed = gtfs_realtime_pb2.FeedMessage()
    feed.ParseFromString(response.content)

    trip_updates = []
    stop_time_updates = []
    for entity in feed.entity:
        if entity.HasField('trip_update'):
            trip = entity.trip_update.trip
            trip_updates.append([trip.trip_id, trip.route_id,
                                 trip.start_time, trip.start_date,
                                 trip.schedule_relationship])

            # Iterating an empty repeated field is a no-op, so no emptiness check is needed.
            for stu in entity.trip_update.stop_time_update:
                stop_time_updates.append([trip.trip_id,
                                          stu.stop_sequence,
                                          stu.stop_id,
                                          stu.departure.delay,
                                          stu.arrival.delay])

        if entity.HasField('alert'):
            print(entity.alert)

        if entity.HasField('is_deleted'):
            print(entity.is_deleted)

    trip_updates_df = pd.DataFrame(trip_updates, columns=['trip_id', 'route_id', 'start_time', 'start_date', 'schedule_relationship'])
    stop_time_updates_df = pd.DataFrame(stop_time_updates, columns=['trip_id', 'stop_sequence', 'stop_id', 'departure', 'arrival'])

    return trip_updates_df, stop_time_updates_df
    


### Einlesen von GTFS-R feeds

In [None]:
# Poll the GTFS-R feed a few times and store every snapshot as a pair of Parquet files
# data/trip_updates_<YYYYmmddHHMM>.parquet and data/stop_time_updates_<YYYYmmddHHMM>.parquet.
# Snapshots are at least 60 s apart, so the minute-resolution timestamps do not collide.
# NOTE(review): files from earlier runs stay in data/ and are picked up by the glob below.
n_snapshots = 2
os.makedirs('data', exist_ok=True)  # to_parquet does not create missing directories
for i in range(n_snapshots):
    trip_updates_df, stop_time_updates_df = get_gtfsr_data(url)
    ts = dt.datetime.now().strftime('%Y%m%d%H%M')
    trip_updates_df.to_parquet(f'data/trip_updates_{ts}.parquet', index=False)  # save as Parquet file
    stop_time_updates_df.to_parquet(f'data/stop_time_updates_{ts}.parquet', index=False)  # save as Parquet file
    if i < n_snapshots - 1:
        time.sleep(60)  # wait one minute before the next request; not needed after the last one

## Zusammenführen von GTFS-Realtime und GTFS

In [None]:
# Load all stored GTFS-R snapshots (glob over data/) into DuckDB tables.
# NOTE(review): the rows carry no snapshot timestamp, and every matching file in data/
# is read — including snapshots left over from earlier runs.
duck.sql("""create or replace table trip_updates as select * from 'data/trip_updates*.parquet';""")
duck.sql("""create or replace table stop_times_updates as select * from 'data/stop_time_updates*.parquet';""")

In [None]:
# Build table trip_updates_vbn: real-time stop updates (delays in departure/arrival)
# enriched with the static trip, route, agency and stop names, restricted to routes in
# lin_vbn (routes serving the VBN area).
# NOTE(review): trip_updates and stop_times_updates hold every stored snapshot and are
# joined on trip_id only, so each stop-update row is repeated once per stored
# trip_update row of that trip. Consider a snapshot key or select distinct.
duck.sql("""create or replace table trip_updates_vbn as
         (select t.trip_id, a.agency_name, r.route_short_name, t.trip_short_name,t.trip_headsign,tu.start_time ,a.agency_id,  
         stu.stop_sequence,stu.arrival, stu.departure, s.stop_name
             from trips t
             join routes r on t.route_id = r.route_id
             join agency a on r.agency_id = a.agency_id             
             join trip_updates tu on t.trip_id = tu.trip_id
             join stop_times_updates stu on tu.trip_id = stu.trip_id
             join stops s on stu.stop_id = s.stop_id
             join lin_vbn lv on r.agency_id = lv.agency_id and r.route_id = lv.route_id
           order by r.route_short_name, t.trip_short_name, stu.stop_sequence)""")

In [None]:
# Inspect the real-time rows of routes whose short name contains "440".
duck.sql("""select * from trip_updates_vbn where route_short_name like '%440%'""").df()

In [None]:
# Overview: which agency/route combinations in the VBN area have real-time data.
duck.sql("""select agency_name, route_short_name 
         from trip_updates_vbn 
         group by all
         order by agency_name, route_short_name""").df()

## Verknüpfung der heutigen Fahrten mit dem Verlauf

In [None]:
# Scratch note: this SQL is NOT executed; the string is only displayed by the cell.
# It turns an 'HH:MM:SS' text (sollabfahrt) into a timestamp by adding the total number
# of seconds to the date column datum. Because it adds seconds instead of casting to
# TIME, hours >= 24 are handled. Table rt is not created in this notebook.
"""update rt set sollabfahrt_ts = datum::date + interval (split_part(sollabfahrt, ':', 1)::int * 3600 + split_part(sollabfahrt, ':', 2)::int * 60 + split_part(sollabfahrt, ':', 3)::int) seconds where trim(sollabfahrt) != '' """

In [None]:
# Calculation of departures
##1
# Exploratory: hour difference between each scheduled departure_time and the current time.
# NOTE(review): GTFS departure_time can exceed 24:00:00 for trips running past midnight;
# such values likely fail the ::time cast — confirm, or convert via total seconds instead.
duck.sql("""select departure_time::time, current_time, datediff('HOUR', departure_time::time, current_time::time), 
         current_time::time from stop_times
        """)

In [None]:
# Today's trips (services in verkehrt_heute) on VBN routes of the service levels '1'/'1+'
# (looked up in the attached table dm.basis.linien), joined with their real-time stop
# updates. For each trip, only the stop row(s) with the largest departure delay are
# kept (QUALIFY on the window maximum).
# The window is partitioned by trip_id, the unique trip key in GTFS. trip_short_name is
# optional and not unique, so partitioning by it merged different trips and kept only
# the largest delay among all of them.
duck.sql("""select r.route_short_name, t.trip_short_name, t.trip_headsign, st.stop_sequence, 
         s.stop_name, st.departure_time, tu.departure, tu.arrival, max(departure) over (partition by t.trip_id) as max_departure,
         from stop_times st
         join trips t on st.trip_id = t.trip_id
         join verkehrt_heute vh on t.service_id = vh.service_id
         join stops s on st.stop_id = s.stop_id
         join routes r on t.route_id = r.route_id
         join lin_vbn lv on r.agency_id = lv.agency_id and r.route_id = lv.route_id
         join trip_updates_vbn tu on t.trip_id = tu.trip_id and st.stop_sequence = tu.stop_sequence

         where r.route_short_name in (select nummer from dm.basis.linien where aktiv = True and ebene in ('1', '1+') order by nummer) 
         -- and abs(datediff('HOUR', departure_time::time, current_time::time)) < 2

         qualify max_departure = departure

         order by r.route_short_name
                 
         
         """).df()

In [None]:
# Close the DuckDB connection; the in-memory database and all its tables are discarded.
duck.close()