In [1]:
# load libraries
import datetime as dt
import os, re
import zipfile

import numpy as np
import pandas as pd
from dask import dataframe as dd
from sqlalchemy import create_engine, text

## Setup

In [2]:
# Which year?
# NOTE(review): in the notebook as shown, `jahr` is only referenced by the
# commented-out path variants below.
jahr = "2020"
# Which zip archive? (file name without the ".zip" extension)
zipname = "20211015_fahrplaene_gesamtdeutschland_gtfs"

# define paths
workingdir = "/home/maita/data/"#"/home/jupyter-maita.schade/VW_Data_Hub/Gap_Map/"
#storagedir = "smb://192.168.90.30/allmende%20verkehr/4%20Projekte/2%20Projekte%20Mobilitätswende/ÖV-Deutschlandkarte%20(Gap-Map)/Berechnungen/raw/gtfs/"

# constructed paths
# All three directories currently resolve to `workingdir`; the sub-directory
# suffixes are disabled in the trailing comments.
rawdir = workingdir #+ "raw/"
rawdatadir = rawdir #+ "gtfs/" + 'delfi/'# + jahr + "/"
outdir = workingdir #+ "out/" + 'delfi/'# + jahr + "/"
#inpath = "{0}{1}_{2}.db".format(rawdatadir,jahr,datum)
zippath = rawdatadir + zipname + ".zip"

# set up zip file as default for functions
# The archive handle is kept open so that later cells can read members via `zf.open(...)`.
zf = zipfile.ZipFile(zippath) # this is the raw stuff

In [3]:
# File-based SQLite database, named after the zip archive and placed in the output directory.
outpath = f"{outdir}{zipname}.db"
# SQLAlchemy engine used by the cells that write to / read from that database.
dbout = create_engine(f"sqlite:///{outpath}")

# Count service_ids

In [4]:
def interveningWeekdays(start, end, inclusive=True, weekdays=[0, 1, 2, 3, 4]):
    """Count the days between two dates that fall on the given weekdays.

    Parameters
    ----------
    start, end : datetime.date or datetime.datetime
        Range boundaries. Datetimes are reduced to their date part. If ``end``
        lies before ``start`` the two are swapped instead of raising.
    inclusive : bool
        When true (default) the ``end`` date itself is part of the range.
    weekdays : iterable of int, or a single int
        Weekday numbers (Monday = 0); values are taken modulo 7 and
        duplicates count once. Defaults to Monday through Friday.

    Returns
    -------
    int
        Number of days in the range whose weekday is in ``weekdays``.
    """
    # Reduce datetimes to plain dates so the arithmetic below is date-only.
    start = start.date() if isinstance(start, dt.datetime) else start
    end = end.date() if isinstance(end, dt.datetime) else end

    # A reversed range is put into chronological order rather than rejected.
    if end < start:
        start, end = end, start

    # Internally the end is exclusive, so push it one day out when inclusive.
    if inclusive:
        end = end + dt.timedelta(days=1)

    try:
        wanted = {day % 7 for day in weekdays}
    except TypeError:
        # `weekdays` was a single number rather than an iterable.
        wanted = [weekdays % 7]

    # Any Monday works as the anchor; only its position within the week matters.
    today = dt.date.today()
    anchor_monday = today - dt.timedelta(days=today.weekday())

    total = 0
    for day in wanted:
        anchor = anchor_monday + dt.timedelta(days=day)
        # Whole weeks from the anchor to each boundary; the difference is the
        # number of occurrences of this weekday inside [start, end).
        total += (anchor - start).days // 7 - (anchor - end).days // 7
    return total

def countDaysInIntervalHelper(calendarrow):
    """Return the number of operating days described by one calendar row.

    ``calendarrow`` is a row (pandas Series) holding the weekday flags in the
    label range "monday".."sunday" plus "start_date" and "end_date" in
    YYYYMMDD form. The result is the count of days between the two dates
    (both included) that fall on a weekday whose flag is non-zero.
    """
    # Positions of the non-zero flags double as weekday numbers (Monday = 0).
    weekday_flags = calendarrow.loc["monday":"sunday"].to_numpy()
    active_weekdays = weekday_flags.nonzero()[0].tolist()

    first_day, last_day = (
        dt.datetime.strptime(str(int(calendarrow.get(field))), "%Y%m%d")
        for field in ("start_date", "end_date")
    )
    return interveningWeekdays(first_day, last_day, weekdays=active_weekdays)

### Helper function to compare dates
def isInIntervalHelper(n, interval):
    """Flag which entries of ``n`` lie inside ``interval`` (bounds included).

    Parameters
    ----------
    n : array-like supporting element-wise comparison (e.g. numpy array, pandas Series)
        Values to test.
    interval : iterable
        Two (or more) comparable values; the smallest and largest are used as
        the lower and upper bound, so their order does not matter.

    Returns
    -------
    numpy.ndarray of bool
        True where ``lower <= n <= upper``.
    """
    # Evaluate the bounds once instead of inside the element-wise expression.
    lower, upper = min(interval), max(interval)
    return np.asarray((n <= upper) & (n >= lower), dtype=bool)

In [5]:
# function to add frequencies... let's hope this is right
def addCountToCalendar(calendar_df, calendar_dates_df):
    """Summarise, per service, on how many days of the feed period it operates.

    Parameters
    ----------
    calendar_df : pandas.DataFrame
        Table with the columns ``service_id``, ``monday`` .. ``sunday``,
        ``start_date`` and ``end_date``. The caller's frame is not modified.
    calendar_dates_df : pandas.DataFrame
        Table with the columns ``service_id``, ``date`` and ``exception_type``.

    Returns
    -------
    pandas.DataFrame
        One row per service in ``calendar_df`` with the columns
        ``service_id``, ``1``, ``2`` (number of calendar_dates rows with that
        exception type, NaN if none), the weekday flags, ``start_date``,
        ``end_date`` and ``days_count``, where ``days_count`` is the number of
        pattern days plus the count in column ``1`` minus the count in column ``2``.
    """
    print("Getting number of service days for each service")
    # Work on a copy so the caller's frame does not silently gain a column.
    calendar_df = calendar_df.copy()
    calendar_df["days_count"] = calendar_df.apply(countDaysInIntervalHelper, axis=1)

    print("\t...aggregating calendar")
    exception_counts = (
        calendar_dates_df
        .groupby(["service_id", "exception_type"], as_index=False)
        .count()
        .pivot(index="service_id", columns="exception_type", values="date")
        # Guarantee both exception-type columns exist even if the feed only
        # contains one of them; otherwise the column selection below fails.
        .reindex(columns=[1, 2])
        .reset_index()
    )
    # Right merge: the result has exactly the services listed in calendar_df.
    calendar_df = exception_counts.merge(calendar_df, on="service_id", how="right")[
        ['service_id', 1, 2,
         'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday',
         'start_date', 'end_date', 'days_count']
    ]

    print("\t...calculating total in calendar")
    # Services without exceptions have NaN counts, which must act as zero here.
    calendar_df["days_count"] = (
        calendar_df["days_count"]
        + calendar_df[1].fillna(0)
        - calendar_df[2].fillna(0)
    )

    return calendar_df

In [6]:
def feedDays(calendar_df, calendar_dates_df):
    """Return the length of the feed period in days, counting both end days.

    The period runs from the earliest to the latest date found in
    ``calendar_df.start_date``, ``calendar_df.end_date`` and
    ``calendar_dates_df.date`` (all in YYYYMMDD form).

    Both boundary days are counted so that the result is consistent with
    ``days_count``, which also counts the start and the end date of a service.

    Returns
    -------
    int
        Number of calendar days from the first to the last date, inclusive.
    """
    all_dates = pd.concat(
        [
            pd.to_datetime(calendar_df.start_date, format="%Y%m%d"),
            pd.to_datetime(calendar_df.end_date, format="%Y%m%d"),
            pd.to_datetime(calendar_dates_df.date, format="%Y%m%d"),
        ],
        ignore_index=True,
    )
    firstdate = all_dates.min()
    lastdate = all_dates.max()

    # +1 because the last day itself belongs to the period.
    ndays = (lastdate - firstdate).days + 1

    return ndays

In [7]:
# Load both service-calendar tables straight from the zip archive.
calendar_df, calendar_dates_df = (
    pd.read_csv(zf.open(member))
    for member in ("calendar.txt", "calendar_dates.txt")
)

In [8]:
# Replace the raw calendar table with the per-service summary returned by addCountToCalendar.
calendar_df = addCountToCalendar(calendar_df, calendar_dates_df)

Getting number of service days for each service
	...aggregating calendar
	...calculating total in calendar


In [9]:
# Length of the feed period in days; used in the wrap-up section to turn totals into per-day values.
ndays = feedDays(calendar_df, calendar_dates_df)

In [10]:
# Preview the first rows of the summarised calendar table.
calendar_df.head()

Unnamed: 0,service_id,1,2,monday,tuesday,wednesday,thursday,friday,saturday,sunday,start_date,end_date,days_count
0,1,,7.0,1,1,1,1,1,0,0,20211001,20211211,44.0
1,2,,9.0,1,1,1,1,1,1,1,20211001,20211211,63.0
2,3,6.0,3.0,0,0,0,0,0,1,1,20211001,20211211,24.0
3,4,1.0,1.0,0,0,0,0,0,0,1,20211001,20211211,10.0
4,5,6.0,1.0,0,0,0,0,0,0,1,20211001,20211211,15.0


# Pick out routes

In [20]:
# Read routes.txt from the separate "latest.zip" feed; its route names are used
# further down to select routes from the main feed.
# The context managers close the archive and the member handle once the table is loaded.
with zipfile.ZipFile("/home/maita/Downloads/latest.zip") as fv_zip:
    with fv_zip.open("routes.txt") as routes_file:
        fv_routes = pd.read_csv(routes_file)

In [21]:
# Display the routes table read from latest.zip.
fv_routes

Unnamed: 0,route_long_name,route_short_name,agency_id,route_type,route_id
0,EC,EC,1,2,6
1,EN,EN,1,2,10
2,EuroCity,EC,1,2,9
3,EuroCity-Express,ECE,1,2,8
4,EuroNight,EN,1,2,5
5,IC,IC,1,2,7
6,ICE,ICE,1,2,4
7,Intercity,IC,1,2,3
8,Intercity-Express,ICE,1,2,2
9,RJ,RJ,1,2,1


In [33]:
# All routes of the main feed ...
feed_routes = pd.read_csv(zf.open("routes.txt"))

# ... reduced to those whose long name also occurs in fv_routes, ordered by route_id.
selected_names = fv_routes[["route_long_name"]]
routes_df = selected_names.merge(feed_routes, how="inner", on="route_long_name")
routes_df = routes_df.sort_values("route_id")

# Get things into database

## calendar

In [11]:
# Store the summarised calendar as table "calendar", overwriting an existing table of that name.
calendar_df.to_sql(
    name="calendar",
    con='sqlite:///' + outpath,
    if_exists='replace',
)

## routes

In [34]:
# Store the selected routes as table "routes", overwriting an existing table of that name.
routes_df.to_sql(
    name="routes",
    con='sqlite:///' + outpath,
    if_exists='replace',
)

## trips, stops, et al.

In [12]:
%%time
chunksize = 200000

# dtype overrides passed to every read, whichever table is being loaded
column_dtypes = {
    'Unnamed: 0': 'float64',
    'drop_off_type': 'object',
    'pickup_type': 'object',
    'stop_sequence': 'object',
    'trip_id': 'object',
    'stop_headsign': 'object',
}

for table_name in ['stops','trips', 'stop_times']: #,'routes', 'calendar'
    print(table_name)

    # Stream the member out of the zip archive in chunks of `chunksize` rows.
    chunk_reader = pd.read_csv(
        zf.open(table_name + ".txt"),
        chunksize=chunksize,
        iterator=True,
        encoding='utf-8',
        dtype=column_dtypes,
    )
    for j, df in enumerate(chunk_reader, start=1):
        # The first chunk recreates the table, every later chunk extends it.
        write_mode = 'replace' if j == 1 else 'append'
        df.to_sql(table_name, dbout, if_exists=write_mode)

stops


  return caller(func, *(extras + args), **kw)


trips


  return caller(func, *(extras + args), **kw)


stop_times
CPU times: user 4min 47s, sys: 9.46 s, total: 4min 56s
Wall time: 5min 5s


Dask (`dd`) was supposed to parallelize this loading step, but it turned out to be just as slow as the chunked pandas loop, so it is not used here.

In [None]:
# for table_name in ['stops','trips', 'routes']: #'stop_times','calendar'
#     print(table_name)

#     df = dd.read_csv(rawdatadir + table_name + ".txt", dtype={'level_id': 'float64',
#                                                               'parent_station': 'object',
#                                                               'platform_code': 'object',
#                                                               'stop_headsign': 'object',
#                                                               'trip_short_name': 'object'
#                                                               }

#                                                               )
    
#     df.to_sql(table_name,'sqlite:///'+outpath,
#           if_exists = 'replace')

## Testing dd vs regular loop

In [22]:
%%time
# loop--zip
# Benchmark: stream trips.txt out of the zip archive in chunks and copy it to a CSV file.
start = dt.datetime.now()
chunksize = 200000
j = 0

for df in pd.read_csv(zf.open("trips.txt"), chunksize=chunksize, iterator=True, encoding='utf-8'):
    j += 1
    # Progress report every tenth chunk.
    if j % 10 == 0:
        print('\t{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j*chunksize))

    # The first chunk creates the file and writes the header; later chunks are
    # appended without a header so the column names are not repeated as data rows.
    first_chunk = (j == 1)
    df.to_csv(outdir + "trips_cp.txt",
              mode='w' if first_chunk else 'a',
              header=first_chunk)


	5 seconds: completed 2000000 rows
	10 seconds: completed 4000000 rows
	16 seconds: completed 6000000 rows
	22 seconds: completed 8000000 rows
	27 seconds: completed 10000000 rows
	33 seconds: completed 12000000 rows
	39 seconds: completed 14000000 rows
	44 seconds: completed 16000000 rows
	50 seconds: completed 18000000 rows
CPU times: user 54.4 s, sys: 959 ms, total: 55.4 s
Wall time: 55.5 s


In [27]:
%%time
# loop--no zip
# Benchmark: read the unzipped trips.txt in chunks and write it to the SQLite file.
start = dt.datetime.now()
chunksize = 200000
sqlite_url = 'sqlite:///'+outpath

trip_chunks = pd.read_csv(rawdir + "trips.txt", chunksize=chunksize, iterator=True, encoding='utf-8')
for j, df in enumerate(trip_chunks, start=1):
    # Progress report every tenth chunk.
    if j % 10 == 0:
        elapsed = (dt.datetime.now() - start).seconds
        print('\t{} seconds: completed {} rows'.format(elapsed, j*chunksize))

    # The first chunk recreates the table, every later chunk extends it.
    df.to_sql("trips", sqlite_url, if_exists='replace' if j == 1 else 'append')


	27 seconds: completed 2000000 rows
	42 seconds: completed 4000000 rows
	57 seconds: completed 6000000 rows
	72 seconds: completed 8000000 rows
	87 seconds: completed 10000000 rows
	102 seconds: completed 12000000 rows
	117 seconds: completed 14000000 rows
	133 seconds: completed 16000000 rows
	148 seconds: completed 18000000 rows
CPU times: user 2min 24s, sys: 9.81 s, total: 2min 33s
Wall time: 2min 42s


In [25]:
%%time
# Benchmark: same load as the pandas loop, but read with dask and written in one call.
table_name = 'trips'

# dtype overrides passed to the dask reader
trips_dtypes = {
    'Unnamed: 0': 'float64',
    'drop_off_type': 'object',
    'pickup_type': 'object',
    'stop_sequence': 'object',
    'trip_id': 'object',
    'stop_headsign': 'object',
    'wheelchair_accessible': 'float64',
}

df = dd.read_csv(rawdir + table_name + ".txt", dtype=trips_dtypes)
df.to_sql(table_name, 'sqlite:///'+outpath, if_exists='replace')

CPU times: user 2min 44s, sys: 20.4 s, total: 3min 4s
Wall time: 3min


In [None]:
%%time
# NOTE(review): `calendar_dd` is not defined in any cell of this notebook as shown — this cell
# presumably relies on a deleted cell or leftover kernel state; confirm or remove.
calendar_dd.size.compute()

In [None]:
# NOTE(review): `calendar_dd` is not defined in any cell of this notebook as shown; the "calendar"
# table is already written from `calendar_df` in the "Get things into database" section.
calendar_dd.to_sql("calendar", 'sqlite:///' + outpath)

# Database Merging

In [5]:
# Load the `sql` extension and point the %sql / %%sql magics at a SQLite file.
# NOTE(review): this connects to "..._gtfs_test.db", whereas `outpath` built in the setup
# cells ends in "..._gtfs.db" — confirm which database the SQL cells below should use.
%load_ext sql

%sql sqlite:////home/maita/data/20211015_fahrplaene_gesamtdeutschland_gtfs_test.db
# %sql sqlite:////home/jupyter-maita.schade/VW_Data_Hub/Gap_Map/out/delfi/20211015_fahrplaene_gesamtdeutschland_gtfs_test.db


In [None]:
%%sql 
-- List all user tables in the connected database (names starting with "sqlite_" are skipped).
SELECT
    name
FROM
    sqlite_schema
WHERE
    type = 'table' AND
    name NOT LIKE 'sqlite_%';

-- Per stop, sum the service-day counts of all trips that call at it.
-- The "...>" continuation prompts pasted from the sqlite shell were removed; they are not SQL.
-- NOTE(review): the "Count all the stops at each stop" cell further down creates the same
-- n_stops table; run only one of the two, or drop the table first.
CREATE TABLE n_stops AS
SELECT stop_id, SUM(days_count) AS n
FROM stop_times
LEFT JOIN trips ON trips.trip_id = stop_times.trip_id
LEFT JOIN calendar ON trips.service_id = calendar.service_id
GROUP BY stop_id;


Create a separate trips table that contains only the trips belonging to the selected routes.

In [42]:
%%sql
-- Pair every row of `routes` with its trips and store the result as trips_fv.
-- LEFT JOIN: a route without any matching trip is kept, with NULL in the trips columns.
CREATE TABLE trips_fv AS
SELECT routes.route_short_name, routes.route_id, trips.service_id, trips.trip_headsign, trips.direction_id, trips.trip_id
FROM routes
LEFT JOIN trips ON routes.route_id = trips.route_id

   sqlite:////home/maita/VW_Data_Hub/Gap_Map/out/delfi/20211015_fahrplaene_gesamtdeutschland_gtfs_test.db
 * sqlite:////home/maita/data/2021_reissue_2.db
Done.


[]

Count all stop events (scheduled calls of any trip) at each stop.

In [None]:
%%time
%%sql 
-- Per stop, sum the service-day counts of all trips that call at it.
-- The sum is named `n`, matching n_stops_fv and the pandas queries in the later cells.
CREATE TABLE n_stops AS
SELECT stop_id, SUM(days_count) AS n
FROM stop_times
LEFT JOIN trips ON trips.trip_id = stop_times.trip_id
LEFT JOIN calendar
ON trips.service_id = calendar.service_id
GROUP BY stop_id;

 * sqlite:////home/jupyter-maita.schade/VW_Data_Hub/Gap_Map/out/delfi/20211015_fahrplaene_gesamtdeutschland_gtfs_test.db


Count only the "fv" stop events at each stop, i.e. calls made by trips on the routes selected in the `routes` table.

In [None]:
%%time
%%sql 
-- Per stop, sum the service-day counts over the trips of the routes in `routes`.
-- The inner query is the same SELECT that defines the trips_fv table.
-- All joins are LEFT JOINs, so a route without trips ends up in a group with NULL stop_id.
CREATE TABLE n_stops_fv AS 
SELECT stop_id, SUM(days_count) AS n
FROM (
    SELECT routes.route_short_name, routes.route_id, trips.service_id, trips.trip_headsign, trips.direction_id, trips.trip_id
    FROM routes
    LEFT JOIN trips ON routes.route_id = trips.route_id) AS trips_fv
LEFT JOIN stop_times ON trips_fv.trip_id = stop_times.trip_id 
LEFT JOIN calendar ON trips_fv.service_id = calendar.service_id 
GROUP BY stop_id;

 * sqlite:////home/maita/data/2021_reissue_2.db


Without route filter:

In [13]:
%%time
print(dt.datetime.now())

# Inner query: per stop, the summed service-day counts of all trips.
all_trips_counts_sql = (
    'SELECT stop_id, SUM(days_count) AS n '
    'FROM trips '
    'LEFT JOIN stop_times ON trips.trip_id = stop_times.trip_id '
    'LEFT JOIN calendar ON trips.service_id = calendar.service_id '
    'GROUP BY stop_id '
)
# Outer query: attach the stop attributes to each count.
all_trips_query = (
    'SELECT n_stops.stop_id, n, stop_name, parent_station, stop_lat, stop_lon, location_type '
    'FROM (' + all_trips_counts_sql + ') AS n_stops '
    'JOIN stops ON n_stops.stop_id = stops.stop_id'
)

out_df = pd.read_sql_query(all_trips_query, dbout)

2021-10-28 13:39:26.129216
CPU times: user 51.8 s, sys: 3.58 s, total: 55.4 s
Wall time: 55.6 s


With route filter:

In [8]:
%%time
print(dt.datetime.now())

# Innermost query: the trips of the routes stored in the `routes` table.
selected_trips_sql = (
    'SELECT routes.route_short_name, routes.route_id, trips.service_id, trips.trip_headsign, trips.direction_id, trips.trip_id '
    'FROM routes '
    'LEFT JOIN trips ON routes.route_id = trips.route_id '
)
# Middle query: per stop, the summed service-day counts of those trips.
selected_counts_sql = (
    'SELECT stop_id, SUM(days_count) AS n '
    'FROM (' + selected_trips_sql + ') AS trips_fv '
    'LEFT JOIN stop_times ON trips_fv.trip_id = stop_times.trip_id '
    'LEFT JOIN calendar ON trips_fv.service_id = calendar.service_id '
    'GROUP BY stop_id '
)
# Outer query: attach the stop attributes to each count.
selected_query = (
    'SELECT n_stops.stop_id, n, stop_name, parent_station, stop_lat, stop_lon, location_type '
    'FROM (' + selected_counts_sql + ') AS n_stops '
    'JOIN stops ON n_stops.stop_id = stops.stop_id'
)

out_df = pd.read_sql_query(selected_query, dbout)

2021-10-28 11:18:44.895163
CPU times: user 9min 15s, sys: 4min 33s, total: 13min 48s
Wall time: 20min 10s


In [15]:
# Show the per-stop counts ordered by stop_id (display only; the result is not assigned back to out_df).
out_df.sort_values("stop_id")

Unnamed: 0,stop_id,n,stop_name,parent_station,stop_lat,stop_lon,location_type
9708,000000000001,13854.0,"Torgau, Str. der Jugend",,51.557454,12.999008,0
11706,000000000001_G,1050.0,"Zinna, Dübener Allee",,51.566799,12.954188,0
13734,000000000001_G_G,438.0,"Eilenburg, Ernst-Mey-Str.",,51.465307,12.652963,0
14292,000000000001_G_G_G,616.0,Bernburg (Saale) Rosenhag,,51.797766,11.731336,0
171883,000000000001_G_G_G_G,5.0,Benndorf (Mansfelder Land),,51.575446,11.492005,0
...,...,...,...,...,...,...,...
137905,nl:4:18038,4366.0,Arnhem Centraal,,51.984781,5.901003,0
137898,nl:9:17980,5262.0,Glanerbrug,,52.218572,6.973882,0
137899,nl:9:17982,5262.0,Enschede De Eschmarke,,52.221137,6.951102,0
137907,nl:9:18124,5262.0,Enschede,,52.222560,6.890227,0


# Wrap up and write out

In [16]:
# Average per day: total count per stop divided by the feed length in days.
out_df["n_day"] = out_df.n/ndays

In [17]:
# Write the per-stop table to the output directory.
# NOTE(review): the file name is zipname directly followed by "fv.nstops.csv" (no separator),
# and `out_df` holds whichever of the two query cells ran last — confirm both are intended.
out_df.to_csv(outdir + zipname + "fv.nstops.csv")