# Einlesen des deutschlandweiten Datensatzes
- Nutzung der DuckDB-API/Engine
- Stand 03.11.2025

Hilfe zu DuckDB und den Erweiterungen:

- https://github.com/duckdblabs/duckdb_spatial 

In [None]:
import pandas as pd
import duckdb
import datetime as dt
import geopandas as gpd
import matplotlib.pyplot as plt
from dotenv import load_dotenv
import os
from urllib import request
import requests

In [None]:
# Load variables from a local .env file into the process environment
# (the download cell reads "zip_url" via os.getenv).
load_dotenv()

In [None]:
# Display the version of the installed duckdb package.
duckdb.__version__

In [None]:
# Current local date formatted as YYYYMMDD; the bare name displays the value.
heute = dt.datetime.now().strftime('%Y%m%d')
heute

## Download des GTFS-Datensatzes
- sollte über den personalisierten Link aus dem Portal funktionieren
- die URL wird aus der Umgebungsvariable `zip_url` gelesen (siehe `.env`)

In [None]:
# Local file name of the GTFS archive; the download cell writes to it and the
# import script interpolates it into its zip:// paths.
# NOTE(review): this name shadows the builtin `zip`; renaming it requires
# updating every later cell that uses {zip}.
zip = 'gtfs_d.zip'

In [None]:
# Download the GTFS archive from the URL stored in the "zip_url" environment
# variable and write it to the file named by `zip`.
zip_url = os.getenv("zip_url")
if not zip_url:
    # Fail early with a clear message instead of passing None to requests.
    raise RuntimeError("Environment variable 'zip_url' is not set (check the .env file).")

# stream=True avoids holding the whole archive in memory; the timeout keeps the
# cell from hanging forever on an unresponsive server.
with requests.get(zip_url, stream=True, timeout=60) as response:
    # Abort on HTTP errors so an error page is never saved as the zip file.
    response.raise_for_status()
    with open(zip, "wb") as f:
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            f.write(chunk)

## Festlegen und Initialisieren der DB

In [None]:
# Storage location of the DuckDB database file
db = 'gtfs_d.db'

In [None]:
# Open a connection to the database file named by `db`; all later cells use it.
duck = duckdb.connect(db)

In [None]:
# Install and load the DuckDB extensions spatial, httpfs and zipfs
# (zipfs is installed from the community repository). Later cells use
# st_* functions and zip:// paths.
duck.sql("""
    INSTALL spatial;
    LOAD spatial;
    INSTALL httpfs;
    LOAD httpfs;
    INSTALL zipfs FROM community;
    LOAD zipfs;
         """)


## Erstellen der Datenbank mithilfe des SQL-Skripts gtfs.sql

In [None]:
# SQL script that creates the ENUM types used as column types by the GTFS
# import (enum01, enum012, enum0123, enum01234, enum12) plus enum_route_type.
# As the SQL comments inside the script state, ENUM values must be strings and
# CREATE TYPE has no "replace" form, so running the script a second time
# against the same database raises an error.
sql= """
--Erzeugen eines ENUM Type (begrenzte Anzahl von Werten) nur mit String nicht integer
--bei Type gibt es kein replace, daher können diese nur einmalig ausgeführt, ansonsten Error
CREATE TYPE enum01 AS ENUM ('0', '1');
CREATE TYPE enum012 AS ENUM ('0', '1', '2');
CREATE TYPE enum0123 AS ENUM ('0', '1', '2', '3');
CREATE TYPE enum01234 AS ENUM ('0', '1', '2', '3', '4');
CREATE TYPE enum12 AS ENUM ('1', '2');
CREATE TYPE enum_route_type AS ENUM ('0','1', '2','3', '4', '5', '6', '7', '11', '12');"""

In [None]:
# Execute the statements currently held in `sql` (here: the ENUM type script).
duck.sql(sql)

In [None]:
# (Re)create table vbn from the GeoPackage file vbn.gpkg via st_read.
duck.sql("create or replace table vbn as select * from st_read('vbn.gpkg');")

In [None]:
# Display the contents of table vbn.
duck.sql("select * from vbn")

In [None]:
# Build the GTFS import script. It is an f-string: {zip} is replaced by the
# archive file name and the doubled braces {{ }} produce the literal braces of
# the `columns` structs. The script
#   - reads the GTFS .txt files from the archive through zip:// paths, with
#     explicit column types for agency, calendar, calendar_dates, routes,
#     stop_times, stops and trips and read_csv_auto for the remaining files,
#   - creates stops_geom and adds a geom point column (stop_lon, stop_lat) to stops,
#   - defines the views vw_trip_stop and vw_trip_long.
# stop_times, stops and trips are read with store_rejects=true.
sql = f"""

--Einlesen der GTFS Tabellen

CREATE or REPLACE table agency AS 
	select * from read_csv('zip://{zip}/agency.txt', delim=',', header=true, dateformat = '%Y%m%d',
	columns={{'agency_id': 'VARCHAR', 
	'agency_name': 'VARCHAR', 
	'agency_url': 'VARCHAR', 
	'agency_timezone': 'VARCHAR', 
	'agency_lang': 'VARCHAR', 
	'agency_phone':'VARCHAR'}});


CREATE OR REPLACE table calendar AS 
	select * from read_csv('zip://{zip}/calendar.txt', delim=',', header=true, dateformat = '%Y%m%d',
	columns={{'service_id': 'VARCHAR', 
	'monday': 'enum01', 
	'tuesday': 'enum01', 
	'wednesday': 'enum01', 
	'thursday':'enum01', 
	'friday':'enum01',
	'saturday': 'enum01',
	'sunday' : 'enum01',
	'start_date' : 'DATE',
	'end_date' : 'DATE'}});


CREATE or REPLACE table calendar_dates AS 
	select * from read_csv('zip://{zip}/calendar_dates.txt', delim=',', header=true, dateformat = '%Y%m%d',
	columns={{'service_id': 'VARCHAR', 
	'date': 'DATE', 
	'exception_type': 'enum12'}});

create or replace table frequencies as select * from read_csv_auto('zip://{zip}/frequencies.txt');
create or replace table levels as select * from read_csv_auto('zip://{zip}/levels.txt');
create or replace table pathways as select * from read_csv_auto('zip://{zip}/pathways.txt');

CREATE or REPLACE table routes AS 
	select * from read_csv('zip://{zip}/routes.txt', delim=',', header=true, dateformat = '%Y%m%d',
	columns={{'route_id': 'VARCHAR', 
	'agency_id': 'VARCHAR', 
	'route_short_name': 'VARCHAR', 
	'route_long_name': 'VARCHAR', 
	'route_type':'INTEGER',  -- eigentlich enum aber fehlerhafte Daten mit route_type 715
	'route_color':'VARCHAR',
	'route_text_color': 'VARCHAR',
	'route_desc' : 'VARCHAR'}});

create or replace table shapes as select * from read_csv_auto('zip://{zip}/shapes.txt');

CREATE or REPLACE table stop_times AS 
	select * from read_csv('zip://{zip}/stop_times.txt',
    header=true, 
    dateformat = '%Y%m%d',
    store_rejects=true,
	columns={{
	'trip_id': 'VARCHAR', 
	'stop_id': 'VARCHAR',
	'stop_sequence':'INT16',
	'pickup_type' : 'enum0123',
	'drop_off_type' : 'enum0123',
	'stop_headsign':'VARCHAR',
	'arrival_time': 'VARCHAR', 
	'departure_time': 'VARCHAR', 
}});

CREATE or REPLACE table stops AS 
	select * from read_csv('zip://{zip}/stops.txt',   
    header=true, 
    dateformat = '%Y%m%d',
     store_rejects=true,
	columns={{'stop_id': 'VARCHAR', 
	'stop_code': 'VARCHAR',     
	'stop_name': 'VARCHAR', 
	'stop_desc': 'VARCHAR', 
	'stop_lat':'DOUBLE', 
	'stop_lon':'DOUBLE',	
	'location_type' : 'enum01234',
	'parent_station' : 'VARCHAR',
	'wheelchair_boarding': 'enum012',
	'platform_code': 'VARCHAR',
	'level_id': 'VARCHAR'}});

create or replace table transfers as select * from read_csv_auto('zip://{zip}/transfers.txt');
--create or replace table trips as select * from read_csv_auto('/home/ts/python/duckdb/gtfs/trips.txt');

CREATE or REPLACE TABLE trips AS 
	select * from read_csv('zip://{zip}/trips.txt', 
    delim=',', 
    header=true, 
    dateformat = '%Y%m%d',
    store_rejects=true,
	columns={{'route_id' : 'VARCHAR', 
		'service_id': 'VARCHAR', 
	'trip_id': 'VARCHAR', 
	'trip_headsign': 'VARCHAR', 
	'trip_short_name': 'VARCHAR', 
	'direction_id':'enum01', 
	'block_id':'VARCHAR',
	'shape_id': 'VARCHAR',
	'wheelchair_accessible' : 'enum012',
	'bikes_allowed' : 'enum012'}});


--Erstellen einer Tabelle Stops mit Geom-Spalte
create or replace table stops_geom as select *, st_point(stop_lon, stop_lat) as geom from stops;
ALTER TABLE stops add column geom Geometry;
UPDATE stops set geom = st_point(stop_lon, stop_lat);

-- Verknüpfung des Verlaufs mit den Haltestellennamen
create or replace view vw_trip_stop as
	SELECT a.agency_id, r.route_id, r.route_short_name, st.trip_id, t.trip_short_name, t.service_id, st.stop_sequence, st.stop_id, s.stop_name, st.arrival_time, st.departure_time  
	FROM stop_times st 
	JOIN stops s on s.stop_id = st.stop_id 
	JOIN trips t on st.trip_id = t.trip_id 
	JOIN routes r on r.route_id = t.route_id
	JOIN agency a on a.agency_id = r.agency_id;

 --   trip_id mit weiteren Daten anreichern
create or replace view vw_trip_long as
	SELECT a.agency_id,a.agency_name, r.route_id, r.route_short_name, t.trip_id, t.trip_short_name, t.service_id

	FROM trips t 
	JOIN routes r on r.route_id = t.route_id
	JOIN agency a on a.agency_id = r.agency_id;

"""

In [None]:
# Execute the script currently held in `sql` (here: the GTFS import script).
duck.sql(sql)

### Fehler / verworfene Zeilen aus reject_errors

In [None]:
# Fetch the reject_errors table as a pandas DataFrame; the import reads
# stop_times, stops and trips with store_rejects=true.
duck.sql(" from reject_errors").df()

### Auswertung auffällige Koordinaten

In [None]:
# Show the five stops with the smallest and the five with the largest value
# for stop_lat and for stop_lon. UNION (without ALL) drops duplicate rows, so
# fewer than 20 rows may be returned. The result is sorted by stop_lat, stop_lon.
duck.sql("""(select * FROM stops order by stop_lat asc limit 5)
         union 
         (select * FROM stops order by stop_lat desc limit 5)
        union
         (select * FROM stops order by stop_lon asc limit 5)
         union 
         (select * FROM stops order by stop_lon desc limit 5)

         order by stop_lat, stop_lon
        ;
         
         """)

## Haltestellen im VBN

In [None]:
# (Re)create table stops_vbn with id, name and geometry of every stop whose
# point lies within a geometry of table vbn.
# NOTE(review): assumes vbn.geom uses the same lon/lat coordinates as
# stops.geom; confirm the CRS of vbn.gpkg.
duck.sql("""
         create or replace table stops_vbn as 
	        select s.stop_id, s.stop_name, s.geom from stops s join vbn on st_within(s.geom, vbn.geom);
         """)

In [None]:
# Display the contents of table stops_vbn.
duck.sql("from stops_vbn")

In [None]:
# Display the number of rows in stop_times.
duck.sql("""
         select count(*) from stop_times
            """)

### Erstellen des gesamten Linestring je trip_id

In [None]:
# (Re)create table line_trip: one linestring per trip_id built from the stop
# points of the trip, transformed to EPSG:25832, plus its length.
# - The points are collected with an explicit ORDER BY inside list(); an
#   ORDER BY in the subquery does not guarantee the order seen by the aggregate.
# - stops.geom is built as st_point(stop_lon, stop_lat), i.e. x = longitude.
#   EPSG:4326 is defined with latitude first, so always_xy := true is required
#   for st_transform to read the coordinates as (lon, lat); without it the
#   transformed geometries and their lengths are wrong.
# - length is st_length of the transformed geometry (EPSG:25832 units, metres).
duck.sql("""
    create or replace table line_trip as
    select
        trip_id,
        st_transform(
            st_makeline(list(geom order by stop_sequence)),
            'EPSG:4326', 'EPSG:25832', always_xy := true
        ) as geometry,
        st_length(geometry) as length
    from (
        select stop_times.trip_id, stop_times.stop_sequence, stops.geom
        from stop_times
        join stops on stops.stop_id = stop_times.stop_id
    )
    group by trip_id
""")

### Erstellen Linestring der einzelnen Fahrtabschnitte

In [None]:
# (Re)create table line_segment: one two-point line per pair of consecutive
# stops of a trip, transformed to EPSG:25832, plus its length.
# - stops.geom is built as st_point(stop_lon, stop_lat), i.e. x = longitude.
#   EPSG:4326 is defined with latitude first, so always_xy := true is required
#   for st_transform to read the coordinates as (lon, lat).
# - The joined subqueries carry no ORDER BY: row order has no effect on the
#   join result, so sorting stop_times twice was wasted work.
# NOTE(review): segments are paired via stop_sequence + 1, so a trip whose
# stop_sequence values have gaps yields no segment across the gap.
duck.sql("""
    create or replace table line_segment as
    select
        start.trip_id,
        start.stop_id as stop_id_s,
        ende.stop_id as stop_id_e,
        start.stop_sequence,
        st_transform(
            st_makeLine(start.geom, ende.geom),
            'EPSG:4326', 'EPSG:25832', always_xy := true
        ) as geometry,
        st_length(geometry) as length
    from (
        select trip_id, stop_times.stop_id, stop_sequence, geom
        from stop_times
        join stops on stops.stop_id = stop_times.stop_id
    ) as start
    join (
        select trip_id, stop_times.stop_id, stop_sequence, geom
        from stop_times
        join stops on stops.stop_id = stop_times.stop_id
    ) as ende
      on start.trip_id = ende.trip_id
     and start.stop_sequence + 1 = ende.stop_sequence
""")

In [None]:
# Show the 500 longest rows of line_segment with their length divided by 1000
# and cast to int64, joined with the trip details from vw_trip_long.
# NOTE(review): the output column alias is spelled "lenght_km", and trip_id
# appears twice in the result (l.trip_id and t.*).
duck.sql("""SELECT l.trip_id, (l.length/1000)::int64 as lenght_km, t.*
         FROM line_segment l
         join vw_trip_long t on l.trip_id = t.trip_id
         order by l.length desc LIMIT 500;""")

In [None]:
# Preview 10 stops as a pandas DataFrame, with geom rendered as WKT text.
duck.sql("SELECT * exclude(geom), st_astext(geom) as geo FROM stops LIMIT 10").df()

In [None]:
# Plot the stops whose stop_id starts with 'de:04011'.
# DuckDB returns the geometry as WKT text in the column "geometry".
sel_df = duck.sql("""SELECT * exclude(geom), st_astext(geom) as geometry 
                  FROM stops
                  where stop_id like ('de:04011%')""").df()

# Parse the WKT strings and attach them as the active geometry; the
# coordinates are lon/lat, hence EPSG:4326.
wkt_geometries = gpd.GeoSeries.from_wkt(sel_df['geometry'])
gdf = gpd.GeoDataFrame(sel_df, geometry=wkt_geometries, crs='EPSG:4326')

gdf.plot()

In [None]:
# Close the database connection opened with duckdb.connect(db).
duck.close()