## Abfrage der Schnittstelle zum Hafas Echtzeit-Archiv (Produktiv- / Demo-System) und Ablage in DuckDB

Stand: 18.11.2023

#### Aufgaben
- Schema XML https://fahrplaner.vbn.de/archive/services/archiveExportService/v14?wsdl 
- Dokumentation unter docs

#### Import Module

In [2]:
import requests
import xml.etree.ElementTree as ET
import xml.dom.minidom
import datetime as dt
import time
import calendar
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, Polygon, LineString

import tarfile

from datetime import timedelta

import duckdb

import os
import glob
import sys

import shutil
import string

from sqlalchemy import create_engine #als Alternative zu Mysql pyscopg2 Connector
from sqlalchemy import text

from importlib import reload

In [3]:
import para
import rt_archiv_func_07 as rt_func #Import der benutzerdefinierten Funktionen
reload(rt_func)

<module 'rt_archiv_func_07' from '/home/zvbn/python/rt2/rt_archiv_func_07.py'>

In [4]:
# Allow up to 500 columns when pandas renders a DataFrame.
pd.options.display.max_columns = 500

# Erstellen der Funktionen

## Aufrufen der Abfrage

In [5]:
def _soap_export_call(api_version, operation, export_id):
    """Build the SOAP envelope for an archive call that only takes an exportId."""
    ns = 'v' + str(api_version)
    return ('<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" '
            'xmlns:' + ns + '="http://' + ns + '.export.service.data.archive.itcs.hafas.hacon.de/">'
            '<soapenv:Header/>'
            '<soapenv:Body>'
            '<' + ns + ':' + operation + '>'
            '<exportId>' + export_id + '</exportId>'
            '</' + ns + ':' + operation + '>'
            '</soapenv:Body>'
            '</soapenv:Envelope>')


def request_xml(api_version, xml_request, xml_out, xml_path):
    """Run one export job against the Hafas RT archive and store the result as XML.

    Posts ``xml_request`` to the archive export service, polls the job status
    until it reads ``COMPLETED``, fetches the journey list and writes it
    pretty-printed to ``xml_path/xml_out``.

    Args:
        api_version: Version number used in the service URL and SOAP namespace.
        xml_request: Complete SOAP envelope that creates the export job.
        xml_out: Name of the XML file to write.
        xml_path: Directory the XML file is written to.

    Raises:
        RuntimeError: If the service answer contains no usable ``exportId``.
    """
    myUrl = 'https://fahrplaner.vbn.de/archive/services/archiveExportService/v'+str(api_version)+'?wsdl'

    # Create the job and read the export id from the answer.
    req_ini = requests.post(myUrl, data=xml_request)
    root = ET.fromstring(req_ini.text)
    exportId = None
    for child in root.iter('exportId'):
        exportId = child.text  # the last element wins, as before
    if exportId is None:
        # Previously this surfaced as an unrelated NameError/TypeError further down.
        raise RuntimeError('createArchiveJob response contains no exportId')

    xml_status = _soap_export_call(api_version, 'getArchiveExportStatus', exportId)

    # Poll until the job is reported as completed.
    # NOTE(review): there is no exit for a failed job because the possible
    # status values are not visible here - confirm against the API documentation.
    status = ''
    time.sleep(2)  # initial wait before the first status request
    while status != 'COMPLETED':
        r = requests.post(myUrl, data=xml_status)
        root = ET.fromstring(r.text)
        for child in root.iter('status'):
            status = child.text
            print(f'{dt.datetime.now()} Status: {status}')
        if status != 'COMPLETED':
            # Pause once per poll, also when the answer carried no <status>
            # element, so the loop can never hammer the service without delay.
            time.sleep(20)

    # Fetch the result of the finished job.
    xml_jl = _soap_export_call(api_version, 'getArchiveJourneyList', exportId)
    r = requests.post(myUrl, data=xml_jl)

    # Write the result as indented XML.
    dom = xml.dom.minidom.parseString(r.text)
    pretty_xml_as_string = dom.toprettyxml()

    target = os.path.join(xml_path, xml_out)
    # The prolog written by toprettyxml() names no encoding, so XML parsers
    # assume UTF-8; write UTF-8 explicitly instead of the locale default.
    with open(target, 'w', encoding='utf-8') as jl:
        print(pretty_xml_as_string, file=jl)
        print(target, 'gespeichert')

### Import des xml und Umwandeln nach Dataframe

In [6]:
def import_rt_xml_to_df(xml_file):
    """Parse an archive export XML file into a DataFrame with one row per journey.

    Args:
        xml_file: Path of the XML file written by the archive export.

    Returns:
        DataFrame with the columns ``datum``, ``fnr``, ``destination``,
        ``hasRealtime``, ``vu``, ``lineid``, ``lineid_short``, ``lineshort``,
        ``reported_cancelled``, ``journey_cancelled``, ``ts_reported_cancelled``,
        ``cancelled_kum``, ``deviceid``, ``clientid`` and ``journeyrttype``.
    """
    timestamp_format = '%Y-%m-%dT%H:%M:%S'
    column_names = [
        'datum', 'fnr', 'destination', 'hasRealtime', 'vu',
        'lineid', 'lineid_short', 'lineshort',
        'reported_cancelled', 'journey_cancelled', 'ts_reported_cancelled',
        'cancelled_kum', 'deviceid', 'clientid', 'journeyrttype',
    ]
    rows = []

    xml_root = ET.parse(xml_file).getroot()

    for child in xml_root.iter('archiveExportJourneyAndDetailsDto'):
        for journey in child.iter('journey'):
            # Operating day: drop the last six characters before parsing.
            raw_operday = rt_func.isnone(journey.find('operatingDay'))
            operday = dt.datetime.strptime(raw_operday[:-6], timestamp_format).strftime('%Y-%m-%d')
            fnr = rt_func.isnone(journey.find('journeyID'))

            device_element = journey.find('deviceId')
            deviceId = rt_func.isnone(device_element)
            clientId = rt_func.split_deviceid(journey.find('deviceId'))

            journeyOperator = rt_func.isnone(journey.find('journeyOperator'))
            ex_lineid = rt_func.isnone(journey.find('externalLineId'))
            # Short line id = first three colon-separated segments.
            ex_linid_short = ':'.join(ex_lineid.split(':')[:3])
            lineshortname = rt_func.isnone(journey.find('lineShortName'))
            destination = rt_func.isnone(journey.find('destination'))

            hasRealtime = rt_func.isnone_boolean(journey.find('hasRealtime'))
            journeyRtType = rt_func.isnone(journey.find('journeyRtType'))

            journeycancelled = rt_func.isnone(journey.find('journeyCancelled')).capitalize()
            ts_reported_cancelled = rt_func.isnone(journey.find('lastTimestampJourneyCancellationReported'))
            # A cancellation counts as reported as soon as a timestamp is present.
            reported_cancelled = len(ts_reported_cancelled) > 0
            # Combined flag: reported as cancelled or flagged as cancelled.
            cancelled_kum = reported_cancelled or str(journeycancelled) == 'True'

            rows.append([
                operday, fnr, destination, hasRealtime, journeyOperator,
                ex_lineid, ex_linid_short, lineshortname,
                reported_cancelled, journeycancelled, ts_reported_cancelled,
                cancelled_kum, deviceId, clientId, journeyRtType,
            ])

            # NOTE(review): the DTO is cleared while it is still being iterated;
            # presumably each DTO holds exactly one journey - confirm.
            child.clear()

    return pd.DataFrame(rows, columns=column_names)

## Ausgabe als formatiertes xml

In [7]:
# Check an XML document by printing it nicely indented.
def print_pretty_xml(xml_request):
    """Print ``xml_request`` as indented XML.

    Args:
        xml_request: XML document in a form accepted by ``minidom.parseString``.
    """
    print(xml.dom.minidom.parseString(xml_request).toprettyxml())

## Xml to tar.gz
- Packen und Löschen des Ausgangs xml Files

In [8]:
def xml_to_targz(xml_path, xml_file):
    """Pack ``xml_file`` into ``<xml_file>.tar.gz`` inside ``xml_path`` and delete the XML.

    An existing archive is only unpacked when the XML file itself is missing.
    Unpacking it unconditionally would overwrite a freshly written XML with the
    older archived copy, and that stale copy would then be re-archived.

    Args:
        xml_path: Directory that holds the XML file and receives the archive.
        xml_file: Name of the XML file; it is also the member name in the archive.

    Raises:
        FileNotFoundError: If the XML file exists neither on disk nor in an
            existing archive.
    """
    xml_full = os.path.join(xml_path, xml_file)
    archive_full = os.path.join(xml_path, xml_file + '.tar.gz')

    if os.path.exists(archive_full):
        if not os.path.exists(xml_full):
            # Restore the XML from the archive so it can be re-packed below.
            with tarfile.open(archive_full, 'r:gz') as tar:
                tar.extractall(xml_path)
    else:
        print('no tar.gz')

    if not os.path.exists(xml_full):
        # Fail before the archive is opened for writing; opening it in 'w:gz'
        # mode first would leave a truncated archive behind.
        raise FileNotFoundError(xml_full)

    with tarfile.open(archive_full, 'w:gz') as archive:
        archive.add(xml_full, arcname=xml_file)

    os.remove(xml_full)

## Ermitteln verschiedener Zeitpunkte 

In [9]:
# Determine the time stamps used throughout the notebook.
# The clock is read exactly once so that all derived values describe the same
# instant, even when the cell happens to run across midnight.
now = dt.datetime.now()
today = now.date()

jetzt = now.strftime('%Y%m%d%H%M')
heute = today.strftime('%Y%m%d')
heute_ll = now.strftime('%d.%m.%Y %H:%M')
gestern = (today - timedelta(1)).strftime('%Y-%m-%d')
vorgestern = (today - timedelta(2)).strftime('%Y-%m-%d')
vorvierwochen = (today - timedelta(28)).strftime('%Y-%m-%d')
vorsechswochen = (today - timedelta(42)).strftime('%Y-%m-%d')
letztesiebentage = (today - timedelta(7)).strftime('%Y-%m-%d')

print(f'Heute: {heute} \nJetzt: {jetzt} \nGestern: {gestern} \nVorgestern: {vorgestern} \nVor vier Wochen: {vorvierwochen} \nVor sechs Wochen: {vorsechswochen}')
print('vor einer Woche ' + letztesiebentage)

# Determine the previous calendar month.
first = today.replace(day=1)  # first day of the current month
lastmonth_last = first - dt.timedelta(days=1)  # one day earlier = last day of the previous month
lastmonth_first = lastmonth_last.replace(day=1)  # first day of the previous month
lastmonth_first_str = str(lastmonth_first)
lastmonth_last_str = str(lastmonth_last)
print(f'Erster Tag des letzen Monats: {lastmonth_first} und letzter Tag:  {lastmonth_last}')

Heute: 20231204 
Jetzt: 202312040821 
Gestern: 2023-12-03 
Vorgestern: 2023-12-02 
Vor vier Wochen: 2023-11-06 
Vor sechs Wochen: 2023-10-23
vor einer Woche 2023-11-27
Erster Tag des letzen Monats: 2023-11-01 und letzter Tag:  2023-11-30


#### Aktuelle Version der Schnittstellenbeschreibung unter 
- docs/SmartVMS Export API v12.pdf
- docs/SmartVMS Export API v14_datatypes-1.xlsx

In [10]:
# Version of the export API used for the service URL and the SOAP namespace.
api_version = 14 # 14 on the demo system and, since August 2023, on production

In [11]:
# Access to the Hafas RT archive production system and the access key.
# NOTE(review): the access key is stored in plain text in the notebook;
# consider loading it from `para` like the database password - confirm.
myUrl = 'https://fahrplaner.vbn.de/archive/services/archiveExportService/v'+str(api_version)+'?wsdl'
clientID = 'PMQmY5p9y8kmoTno'

# Einlesen der Linienliste / Zuordnung Bündel

Einlesen aus der lokalen DM Datenbank

In [12]:
# Read the line / bundle assignment from the local database on the Wortmann
# Debian server; if that fails, fall back to the last CSV snapshot.

try:
    engine = create_engine("postgresql+psycopg2://postgres:"+para.key_dm_db+"@127.0.0.1:5432/zvbn_postgis")
    sql_lin = 'SELECT nummer AS linie, buendel, \'\' AS rt_operator, ebene, dlid, id \
        FROM basis.linien \
        WHERE buendel IS NOT NULL AND aktiv IS TRUE \
        ORDER BY buendel, ebene, nummer'
    sql_buendel = 'SELECT * FROM basis.lin_buendel'
    # One connection for both reads, closed again when the block is left.
    with engine.connect() as conn:
        df_lin_dm = pd.read_sql(text(sql_lin), conn)
        df_buendel = pd.read_sql(text(sql_buendel), conn)
    # Refresh the local snapshot that serves as fallback.
    df_lin_dm.to_csv('input/linien_dm.csv', sep=';', index=False)
    print('Verbindung erfolgreich -lokale Datei aktualisiert')
except Exception as exc:
    # `Exception` instead of a bare except, so Ctrl-C is not swallowed.
    # NOTE: `df_buendel` is not available on this path, as before.
    df_lin_dm = pd.read_csv('input/linien_dm.csv', sep=';') # current line-to-bundle assignment from DM
    print(f'Verbindung nicht erfolgreich - Verwendung lokale Datei')
    print(f'Ursache: {exc!r}')

Verbindung erfolgreich -lokale Datei aktualisiert


In [13]:
#External Linids, die nicht mit de:VBN starten
#df_lin_dm.fillna('-').query('not (dlid.str.startswith("de:VBN") or dlid.str.startswith("-"))').dlid.sort_values()

#### Erstellen der xml Filterparameter (Test)

In [14]:
# Test request: all journeys of yesterday for the line with short name 760.
# (The production request filters by external line id via lineExternalNamePattern instead.)
xml_request_test = ''.join([
    '<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" ',
    f'xmlns:v{api_version}="http://v{api_version}.export.service.data.archive.itcs.hafas.hacon.de/">',
    f'<soapenv:Header/><soapenv:Body><v{api_version}:createArchiveJob>',
    '<filter>',
    f'<clientId>{clientID}</clientId>',
    f'<startDate>{gestern}</startDate>',
    f'<endDate>{gestern}</endDate>',
    '<lineShortNamePattern>760</lineShortNamePattern>',
    '<hasRealtime>ALL</hasRealtime>',
    '</filter>',
    f'</v{api_version}:createArchiveJob></soapenv:Body></soapenv:Envelope>',
])

# Abruf XML und Erstellen Dataframe

## Gesamt VBN

- Abfragen aller Daten für einen Tag über die Externallinid (de:VBN:* und Metronomlinien mit de:hvv:) de:VBN:*,de:hvv:RB33:,de:hvv:RB41:,de:hvv:RE4: und 910 aus Cloppenburg
- lineExternalNamePattern Abfrage über DLID

In [15]:
# Request for all journeys of yesterday whose external line id matches one of the listed patterns.
xml_request_dlid = ''.join([
    '<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" ',
    f'xmlns:v{api_version}="http://v{api_version}.export.service.data.archive.itcs.hafas.hacon.de/">',
    f'<soapenv:Header/><soapenv:Body><v{api_version}:createArchiveJob>',
    '<filter>',
    f'<clientId>{clientID}</clientId>',
    f'<startDate>{gestern}</startDate>',
    f'<endDate>{gestern}</endDate>',
    '<lineExternalNamePattern>de:VBN:*,de:hvv:RB33:,de:hvv:RB41:,de:hvv:RE4:,de:VBN-VGC:910:</lineExternalNamePattern>',
    '<hasRealtime>ALL</hasRealtime>',
    '</filter>',
    f'</v{api_version}:createArchiveJob></soapenv:Body></soapenv:Envelope>',
])

In [16]:
# Print the request as indented XML for visual inspection.
print_pretty_xml(xml_request_dlid)

<?xml version="1.0" ?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:v14="http://v14.export.service.data.archive.itcs.hafas.hacon.de/">
	<soapenv:Header/>
	<soapenv:Body>
		<v14:createArchiveJob>
			<filter>
				<clientId>PMQmY5p9y8kmoTno</clientId>
				<startDate>2023-12-03</startDate>
				<endDate>2023-12-03</endDate>
				<lineExternalNamePattern>de:VBN:*,de:hvv:RB33:,de:hvv:RB41:,de:hvv:RE4:</lineExternalNamePattern>
				<hasRealtime>ALL</hasRealtime>
			</filter>
		</v14:createArchiveJob>
	</soapenv:Body>
</soapenv:Envelope>



In [17]:
# Export all VBN journeys of yesterday, load them into a DataFrame and archive the XML.
xml_path = 'api_xml'
xml_out = 'rt_archiv_'+gestern+'_vbn.xml'

# Pass api_version instead of a literal 14: the request envelope was built with
# api_version, so URL and namespace must use the same value.
request_xml(api_version, xml_request_dlid, xml_path=xml_path, xml_out=xml_out)
df_rt_vbn = import_rt_xml_to_df(os.path.join(xml_path, xml_out))

xml_to_targz(xml_path=xml_path, xml_file=xml_out)

2023-12-04 08:21:51.085009 Status: IN_PROCESS
2023-12-04 08:22:11.255744 Status: IN_PROCESS
2023-12-04 08:22:31.450248 Status: IN_PROCESS
2023-12-04 08:22:51.633984 Status: IN_PROCESS
2023-12-04 08:23:11.804057 Status: IN_PROCESS
2023-12-04 08:23:31.995925 Status: IN_PROCESS
2023-12-04 08:23:52.202246 Status: IN_PROCESS
2023-12-04 08:24:12.381432 Status: IN_PROCESS
2023-12-04 08:24:32.571463 Status: IN_PROCESS
2023-12-04 08:24:52.737320 Status: IN_PROCESS
2023-12-04 08:25:12.931725 Status: IN_PROCESS
2023-12-04 08:25:33.111809 Status: IN_PROCESS
2023-12-04 08:25:53.297926 Status: IN_PROCESS
2023-12-04 08:26:14.482230 Status: IN_PROCESS
2023-12-04 08:26:34.673222 Status: IN_PROCESS
2023-12-04 08:26:54.832146 Status: IN_PROCESS
2023-12-04 08:27:14.997640 Status: IN_PROCESS
2023-12-04 08:27:35.170465 Status: COMPLETED
api_xml/rt_archiv_2023-12-03_vbn.xml gespeichert


## Für Zusatzfahrten

- Abfragen aller Daten über die RTTypes
    - REALTIME_EXTRA und weitere
    - DEVIATION_OF_SCHEDULED
    - etc.

In [18]:

# Request for all journeys of yesterday that have one of the listed journey RT types.
xml_request_zusatz_umleitung = ''.join([
    '<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" ',
    f'xmlns:v{api_version}="http://v{api_version}.export.service.data.archive.itcs.hafas.hacon.de/">',
    f'<soapenv:Header/><soapenv:Body><v{api_version}:createArchiveJob>',
    '<filter>',
    f'<clientId>{clientID}</clientId>',
    f'<startDate>{gestern}</startDate>',
    f'<endDate>{gestern}</endDate>',
    # One filter element per RT type, in the order given here.
    ''.join(
        f'<filterJourneyRtTypeList>{rt_type}</filterJourneyRtTypeList>'
        for rt_type in (
            'REALTIME_EXTRA',
            'REALTIME_EXTRA_REPLACEMENT',
            'REALTIME_EXTRA_REPORTED',
            'REALTIME_EXTRA_MAINTENANCE',
            'DEVIATION_OF_SCHEDULED',
            'DEVIATION_OF_REALTIME_EXTRA',
            'DEVIATION_OF_REPLACEMENT',
            'SUPPLEMENTARY',
            'UNKNOWN',
        )
    ),
    '<hasRealtime>ALL</hasRealtime>',
    '</filter>',
    f'</v{api_version}:createArchiveJob></soapenv:Body></soapenv:Envelope>',
])

In [19]:
# Print the request as indented XML for visual inspection.
print_pretty_xml(xml_request_zusatz_umleitung)

<?xml version="1.0" ?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:v14="http://v14.export.service.data.archive.itcs.hafas.hacon.de/">
	<soapenv:Header/>
	<soapenv:Body>
		<v14:createArchiveJob>
			<filter>
				<clientId>PMQmY5p9y8kmoTno</clientId>
				<startDate>2023-12-03</startDate>
				<endDate>2023-12-03</endDate>
				<filterJourneyRtTypeList>REALTIME_EXTRA</filterJourneyRtTypeList>
				<filterJourneyRtTypeList>REALTIME_EXTRA_REPLACEMENT</filterJourneyRtTypeList>
				<filterJourneyRtTypeList>REALTIME_EXTRA_REPORTED</filterJourneyRtTypeList>
				<filterJourneyRtTypeList>REALTIME_EXTRA_MAINTENANCE</filterJourneyRtTypeList>
				<filterJourneyRtTypeList>DEVIATION_OF_SCHEDULED</filterJourneyRtTypeList>
				<filterJourneyRtTypeList>DEVIATION_OF_REALTIME_EXTRA</filterJourneyRtTypeList>
				<filterJourneyRtTypeList>DEVIATION_OF_REPLACEMENT</filterJourneyRtTypeList>
				<filterJourneyRtTypeList>SUPPLEMENTARY</filterJourneyRtTypeList>
				<filterJourneyR

In [20]:
# Export the extra / deviation journeys of yesterday, load them into a DataFrame and archive the XML.
xml_path = 'api_xml'
xml_out = 'rt_archiv_'+gestern+'_zusatz.xml'

# Pass api_version instead of a literal 14: the request envelope was built with
# api_version, so URL and namespace must use the same value.
request_xml(api_version, xml_request_zusatz_umleitung, xml_path=xml_path, xml_out=xml_out)
df_rt_zusatz = import_rt_xml_to_df(os.path.join(xml_path, xml_out))

xml_to_targz(xml_path=xml_path, xml_file=xml_out)

2023-12-04 08:30:20.222377 Status: IN_PROCESS
2023-12-04 08:30:40.390224 Status: IN_PROCESS
2023-12-04 08:31:00.604697 Status: IN_PROCESS
2023-12-04 08:31:20.834629 Status: IN_PROCESS
2023-12-04 08:31:41.024925 Status: IN_PROCESS
2023-12-04 08:32:01.251235 Status: IN_PROCESS
2023-12-04 08:32:21.443627 Status: IN_PROCESS
2023-12-04 08:32:41.619743 Status: IN_PROCESS
2023-12-04 08:33:01.797297 Status: IN_PROCESS
2023-12-04 08:33:22.978963 Status: IN_PROCESS
2023-12-04 08:33:43.182406 Status: IN_PROCESS
2023-12-04 08:34:03.367093 Status: IN_PROCESS
2023-12-04 08:34:23.546239 Status: IN_PROCESS
2023-12-04 08:34:43.728836 Status: IN_PROCESS
2023-12-04 08:35:03.889991 Status: COMPLETED
api_xml/rt_archiv_2023-12-03_zusatz.xml gespeichert


In [24]:
# Distinct client ids in the extra / deviation export (first occurrence of each).
df_rt_zusatz.clientid.drop_duplicates()

0       HOCHBAHN-IDS
4                 ue
7                vwg
18               vgb
51               vos
59                DB
64              BSAG
78          regiobus
88             bruns
90            vrr_nx
101          vrr_nwb
116             DBRB
288             bsvg
290               me
539                -
608            !ADD!
762            goevb
889         eurobahn
1201          NWB-LS
1310       kvg-stade
1376           rebus
1560             RBB
1562       IVU-Regio
2182             vej
3305         vrahden
Name: clientid, dtype: object

In [22]:
# Number of journeys per journey RT type in the extra / deviation export.
df_rt_zusatz.journeyrttype.value_counts()

journeyrttype
DEVIATION_OF_SCHEDULED         4688
REALTIME_EXTRA                 1620
REALTIME_EXTRA_REPORTED         332
DEVIATION_OF_REALTIME_EXTRA       9
Name: count, dtype: int64

In [31]:
# Write the daily Excel report with four sheets.
#xlsx = 'rt_' + gestern + '.xlsx' # local path for testing
xlsx = '/var/www/rt2/rt_' + gestern + '.xlsx'
sn01 = '01 extra_deviation'
sn02 = '02 stat_extra_deviation'
sn03 = '03 alle_fahrten_vbn'
sn04 = '04 ausfall vbn'
with pd.ExcelWriter(xlsx, engine='xlsxwriter') as writer:
    # Extra journeys, without the excluded operators.
    liste_ausschluss_betreiber = ['ue', 'HOCHBAHN-IDS', 'bsvg', 'vos', 'regiobus']
    df_extra = df_rt_zusatz[~(df_rt_zusatz['clientid'].isin(liste_ausschluss_betreiber))].sort_values(['clientid', 'lineid'])
    df_extra.to_excel(excel_writer=writer, sheet_name=sn01, index=False)
    # The filter spans the header row (row 0) plus one row per written record.
    # The previous range ended one row short and was sized from the unfiltered frame.
    writer.sheets[sn01].autofilter(0, 0, df_extra.shape[0], df_extra.shape[1] - 1)
    writer.sheets[sn01].freeze_panes(1,0)
    writer.sheets[sn01].set_column('E:G', 20)
    writer.sheets[sn01].set_column('C:C', 20)
    writer.sheets[sn01].set_column('A:A', 20)
    writer.sheets[sn01].set_column('M:M', 25)
    writer.sheets[sn01].set_column('O:O', 25)
    # Statistics of the deviations per client id and journey RT type.
    df_stat_extra_dev = df_rt_zusatz[['clientid', 'journeyrttype']].value_counts().reset_index()
    df_stat_extra_dev.to_excel(excel_writer=writer, sheet_name=sn02, index=False)
    writer.sheets[sn02].freeze_panes(1,0)
    writer.sheets[sn02].set_column('A:C', 20)
    # Filter only the columns that exist on this sheet, including the last data row.
    writer.sheets[sn02].autofilter(0, 0, df_stat_extra_dev.shape[0], df_stat_extra_dev.shape[1] - 1)
    writer.sheets[sn02].set_column('A:B', 25)
    # All journeys in the VBN (de:VBN:*).
    df_rt_vbn.sort_values(['vu', 'lineid', 'fnr']).to_excel(excel_writer=writer, sheet_name=sn03, index=False)
    writer.sheets[sn03].autofilter(0, 0, df_rt_vbn.shape[0], df_rt_vbn.shape[1] - 1)
    writer.sheets[sn03].freeze_panes(1,0)
    writer.sheets[sn03].set_column('E:G', 20)
    writer.sheets[sn03].set_column('M:M', 25)
    writer.sheets[sn03].set_column('O:O', 20)
    # Cancellation statistics per operator: number of journeys and number of cancelled journeys.
    df_ausfall_vbn = df_rt_vbn[['vu', 'cancelled_kum']].groupby(by='vu', group_keys=True).agg( anzahl=pd.NamedAgg(column="cancelled_kum", aggfunc="count"), \
                                                                                          ausfall=pd.NamedAgg(column="cancelled_kum", aggfunc="sum")).reset_index()
    # Share of cancelled journeys in percent, truncated to one decimal place.
    df_ausfall_vbn['anteil_ausfall_proz'] = df_ausfall_vbn.apply(lambda x: int(x.ausfall / x.anzahl * 1000)/10, axis = 1)
    df_ausfall_vbn.to_excel(excel_writer=writer, sheet_name=sn04, index=False)
    writer.sheets[sn04].set_column('A:A', 30)
    writer.sheets[sn04].set_column('B:D', 22)
    writer.sheets[sn04].freeze_panes(1,0)

# Schreiben der Tabellen nach Duckdb

Initiales Anlegen der DB

In [24]:
# Store yesterday's exports and the line list as DuckDB tables.
# The per-day tables carry the date as suffix, with '-' replaced by '_'.
table_suffix = gestern.replace('-', '_')
with duckdb.connect('db/rt.db') as con:
    con.install_extension("spatial")
    con.load_extension("spatial")
    # The DataFrames are referenced by their variable name inside the SQL text.
    for statement in (
        f"CREATE or replace TABLE fahrten_{table_suffix} AS SELECT * FROM df_rt_vbn",
        f"CREATE or replace TABLE zusatz_{table_suffix} AS SELECT * FROM df_rt_zusatz",
        "CREATE OR REPLACE TABLE lin_dm AS SELECT * FROM df_lin_dm",
    ):
        con.sql(statement)


Importieren / Parsen aus xml und Anfügen von Datensätzen
https://duckdb.org/docs/api/python/data_ingestion

In [1]:
# Append one archived export to the `fahrten` table:
# unpack the archive, parse the XML, insert the rows, pack the XML again.
# The recorded NameError shows this cell ran before the import cell.
# NOTE(review): this cell inserts into `fahrten`, while the cell that creates
# tables writes `fahrten_<date>` - confirm which table is intended.
xml_import_path = 'api_xml'
xml_import = 'rt_archiv_2023-11-21.xml'
tar_gz = f'{xml_import}.tar.gz'

xml_full = os.path.join(xml_import_path, xml_import)
tar_full = os.path.join(xml_import_path, tar_gz)

if not os.path.exists(tar_full):
    print('no tar.gz')
else:
    with tarfile.open(tar_full, 'r:gz') as tar:
        # Extract all members into the import directory.
        tar.extractall(xml_import_path)

df_rt_app = import_rt_xml_to_df(xml_full)

with duckdb.connect('db/rt.db') as con:
    con.install_extension("spatial")
    con.load_extension("spatial")
    con.sql("INSERT INTO fahrten SELECT * FROM df_rt_app")
    per_day = con.sql('select datum, count(datum) fahrten_tag from fahrten group by datum order by datum;')
    print(per_day)

# Pack the XML again and remove the unpacked file.
with tarfile.open(tar_full, 'w:gz') as archive:
    archive.add(xml_full, arcname=xml_import)

os.remove(xml_full)

NameError: name 'os' is not defined

In [None]:
# Number of stored journeys per operating day.
# Column alias corrected from 'fartehn_tag' to 'fahrten_tag', matching the same query in the import cell.
with duckdb.connect('db/rt.db') as con:
    print(con.sql('select datum, count(datum) fahrten_tag from fahrten group by datum order by datum;'))
    

In [None]:
# with duckdb.connect('db/rt.db') as con:
#    con.sql("delete from fahrten where datum >= '2023-11-11'")

In [None]:
# List the tables of the database and the column layout of `fahrten`.
with duckdb.connect('db/rt.db') as con:
    print(con.sql('show tables;'))
    print(con.sql('describe fahrten;'))

In [None]:
# Load three views of the stored journeys into DataFrames:
#   df_false_rt        - journeys without realtime, ordered by line id
#   df_rt              - all journeys
#   df_fahrten_buendel - all journeys left-joined with the line list on the short line id
with duckdb.connect('db/rt.db') as con:
    con.install_extension("spatial")
    con.load_extension("spatial")
    df_false_rt = con.sql("SELECT * FROM fahrten WHERE hasRealtime = False ORDER BY lineid").df()
    df_rt = con.sql("SELECT * FROM fahrten").df()
    df_fahrten_buendel = con.sql("SELECT * FROM fahrten LEFT JOIN lin_dm ON fahrten.lineid_short = lin_dm.dlid ORDER BY lineid").df()

In [None]:
# Distinct short line ids of the stored journeys, sorted, for display.
lin = df_rt.lineid_short.drop_duplicates().sort_values()
lin

Ermitteln der ausgefallenen Fahrten

In [None]:
# Journeys of 2023-11-21 that were reported as cancelled although journey_cancelled is "False".
df_rt.query('reported_cancelled == True and journey_cancelled == "False" and datum == "2023-11-21"')

In [None]:
# Per day: number of journeys (count of fnr) and number of cancelled journeys (sum of cancelled_kum).
df_rt[['datum', 'cancelled_kum', 'fnr']].groupby('datum').agg({'fnr': 'count', 'cancelled_kum':'sum'})

In [None]:
# Same statistics per day and operator (vu), written to reports/ausfall.xlsx.
df_rt[['datum', 'vu','cancelled_kum', 'fnr']].groupby(['datum', 'vu']).agg({'fnr': 'count', 'cancelled_kum':'sum'}).reset_index().to_excel('reports/ausfall.xlsx', index=False)

In [None]:
# Number of cancelled journeys per day and operator (only rows with cancelled_kum == True are counted).
df_rt[df_rt.cancelled_kum == True][['datum','vu' ,'cancelled_kum', 'fnr']].groupby(['datum', 'vu']).count()

Ausgabe der Werte im Fahrtverlauf (noch aus der alten Version)

In [None]:
# Legacy cell: collect one record per stop (`details` element) of every journey
# in `log_verlauf_arr`.
# NOTE(review): `format` shadows the built-in of the same name and is reused by
# the journey-level cell further down.
# NOTE(review): `r` is only assigned inside request_xml in the visible notebook,
# so this cell presumably relies on a response object from the old version - confirm.
format = '%Y-%m-%dT%H:%M:%S'
log_verlauf_arr = []

root = ET.fromstring(r.text)
for child in root.iter('archiveExportJourneyAndDetailsDto'):
    # Journey-level fields; they are repeated on every stop record below.
    for journey in child.iter('journey'):
        rt = rt_func.isnone(journey.find('hasRealtime'))
        deviceid = rt_func.isnone(journey.find('deviceId'))
        fnr = rt_func.isnone(journey.find('journeyID'))
        lineshortname = str(rt_func.isnone(journey.find('lineShortName'))).strip()
        ex_lineid = rt_func.isnone(journey.find('externalLineId'))
        journeyOperator = rt_func.isnone(journey.find('journeyOperator'))
        operday = dt.datetime.strptime(rt_func.isnone(journey.find('operatingDay'))[:-6], format).strftime('%Y-%m-%d')
        ts_reported_cancelled = rt_func.isnone(journey.find('lastTimestampJourneyCancellationReported'))
        reported_cancelled = True if len(ts_reported_cancelled) > 0 else False

    # One record per `details` element.
    # Values that are only assigned inside the nested loops below (delays,
    # station data, schedule times) keep the value of the previous `details`
    # element when the corresponding tag is missing.
    for details in child.iter('details'):
        index = rt_func.isnone(details.find('index'))
        for ddelay in details.iter('departureDelay'):
            dep_del = rt_func.isnone_delay(ddelay.find('delay'))

        for adelay in details.iter('arrivalDelay'):
            arr_del = rt_func.isnone_delay(adelay.find('delay'))
        
        canc = rt_func.isnone(details.find('cancelled'))
        
        additional =  rt_func.isnone(details.find('additional'))

        for station in details.iter('station'):
            # Coordinates are stored as integers and divided by 1,000,000.
            lat = int(station.find('latitude').text)/1000000
            lon = int(station.find('longitude').text)/1000000
            snr = station.find('stationExternalNumber').text
            if station.find('stationName') is not None:
                sname = station.find('stationName').text
            else:
                sname = '-'
        
        for dschedule in details.iter('scheduleDepartureTime'):
            dschedtime= dschedule.find('scheduleTime')
            if dschedtime is not None:
                dschedtime = dt.datetime.strptime(dschedtime.text[:-6], format).strftime('%Y%m%d%H%M%S') # manual conversion of the time format because no ISO format is available in 3.6
            else:
                dschedtime =''
        for aschedule in details.iter('scheduleArrivalTime'):
            aschedtime = aschedule.find('scheduleTime')
            if aschedtime is not None:
                aschedtime = dt.datetime.strptime(aschedtime.text[:-6], format).strftime('%Y%m%d%H%M%S')
            else: 
                aschedtime =''
        
        # Formerly written to the file log_verlauf; now collected in a list.
        #print(operday, journeyOperator, deviceid, lineshortname, ex_lineid, fnr, index, rt, dschedtime, aschedtime, \
        #      dep_del, arr_del, snr, sname, lat, lon, canc, additional, reported_cancelled,ts_reported_cancelled,file=log_verlauf, sep=';')
        log_verlauf_arr.append([operday, journeyOperator, deviceid, lineshortname, ex_lineid, 
                                fnr, index, rt, dschedtime, aschedtime, dep_del, arr_del, snr, sname, lat, lon, canc, additional, ts_reported_cancelled, reported_cancelled])

In [None]:
# `np` is used below but numpy is not imported in the notebook's import cell.
import numpy as np

# Turn the collected stop records into a DataFrame.
log_sort = pd.DataFrame(log_verlauf_arr, columns= ['tag', 'journeyOperator', 'deviceid', 'linie', 'externallinid', 'fahrtnummer', 'index', 'realtime', 'ab_soll', 
                                  'an_soll', 'ab_delay', 'an_delay', 'stationnumber', 'stationname', 'lat', 'lon', 'cancelled', 'additional', 'ts_reported_cancelled', 'reported_cancelled'])

# NOTE(review): int8 only holds values up to 127 - confirm no journey has more stops.
log_sort['index'] = log_sort['index'].astype(np.int8)

Rausfiltern der Fahrten ohne Fahrtnummer (aufgetreten 9/2023)

In [None]:
# Keep only the records whose journey number is not the empty string.
log_sort = log_sort.loc[log_sort['fahrtnummer'] != '']

In [None]:
# `np` is used below but numpy is not imported in the notebook's import cell.
import numpy as np

# Convert the id columns to 32-bit integers and the name columns to str.
log_sort['fahrtnummer'] = log_sort['fahrtnummer'].astype(np.int32)
log_sort['stationnumber'] = log_sort['stationnumber'].astype(np.int32)
log_sort['linie'] = log_sort['linie'].astype(str)
log_sort['stationname'] = log_sort['stationname'].astype(str)

In [None]:
#Export als Parquet
#log_sort.astype({'linie':'string'}).to_parquet('log/verlauf.parquet')

In [None]:
# Reformat the scheduled departure (ab_soll) and arrival (an_soll) values row by row with rt_func.format_zeit.
log_sort['ab_soll'] = log_sort.apply(lambda x: rt_func.format_zeit(x['ab_soll']), axis=1)
log_sort['an_soll'] = log_sort.apply(lambda x: rt_func.format_zeit(x['an_soll']), axis=1)

### Ausgabe der Werte je Fahrt

In [None]:
fahrt_array = []
root = ET.fromstring(r.text)
for child in root.iter('archiveExportJourneyAndDetailsDto'):
    for journey in child.iter('journey'):
        rt = rt_func.isnone(journey.find('hasRealtime'))
        deviceid = rt_func.isnone(journey.find('deviceId'))
        fnr = rt_func.isnone(journey.find('journeyID'))
        lineshortname = rt_func.isnone(journey.find('lineShortName'))
        ex_lineid = rt_func.isnone(journey.find('externalLineId'))
        dlid_pre = ex_lineid.split(':')[0:3]
        dlid = ':'.join(dlid_pre)
        dest = rt_func.isnone(journey.find('destination'))
        operday = dt.datetime.strptime(rt_func.isnone(journey.find('operatingDay'))[:-6], format).strftime('%Y-%m-%d')
        op = rt_func.isnone(journey.find('journeyOperator'))
        cancelled = rt_func.isnone(journey.find('journeyCancelled'))
        lastupdate = rt_func.isnone(journey.find('lastJourneyDetailsUpdateTimestamp'))
        vehicletypeschedule = rt_func.isnone(journey.find('vehicleTypeSchedule'))
        vehicletyperealtime = rt_func.isnone(journey.find('vehicleTypeRealtime'))
        ts_reported_cancelled = rt_func.isnone(journey.find('lastTimestampJourneyCancellationReported'))
        reported_cancelled = True if len(ts_reported_cancelled) > 0 else False
        
        journeyRtType = rt_func.isnone(journey.find('journeyRtType'))
        try:
            lastupdate = dt.datetime.fromtimestamp(int(lastupdate)/1000).strftime('%Y-%m-%d %H:%M:%S')
        except:
            continue
        distance = rt_func.isnone(journey.find('distanceSchedule'))
        
        if journey.find('maximumDelay'):
            #print(fnr, day)
            for maxdelay in journey.iter('maximumDelay'):
                #print(maxdelay.find('delay'))
                maxd = rt_func.isnone(maxdelay.find('delay'))
        else:
            maxd=''
        
        if journey.find('departureDelay'):
            #print(fnr, day)
            for depdelay in journey.iter('departureDelay'):
                ddel = rt_func.isnone(depdelay.find('delay'))
        else:
            ddel=''
        
        if journey.find('arrivalDelay'):
            #print(fnr, day)
            for arrdelay in journey.iter('arrivalDelay'):
                adel = rt_func.isnone(arrdelay.find('delay'))
        else:
            adel=''
        
        if journey.find('minimumDelay'):
            #print(fnr, day)
            for mindelay in journey.iter('minimumDelay'):
                mind = rt_func.isnone(mindelay.find('delay'))
        else:
            mind=''
        
        if journey.find('realArrivalTime'):
            #print(fnr, day)
            for realarrival in journey.iter('realArrivalTime'):
                rat = dt.datetime.strptime(rt_func.isnone(realarrival.find('realTime'))[:-6], format).strftime('%Y-%m-%d %H:%M:%S')
        else:
            rat=''        
        
        if journey.find('realDepartureTime'):
            for realdeparture in journey.iter('realDepartureTime'):
                rdt = dt.datetime.strptime(rt_func.isnone(realdeparture.find('realTime'))[:-6], format).strftime('%Y-%m-%d %H:%M:%S')                
        else:
            rdt=''

        if journey.find('scheduleDepartureTime'):
            for realdeparture in journey.iter('scheduleDepartureTime'):
                sdt = dt.datetime.strptime(rt_func.isnone(realdeparture.find('scheduleTime'))[:-6], format).strftime('%Y-%m-%d %H:%M:%S')                
        else:
            sdt=''
            
        if journey.find('scheduleArrivalTime'):
            for realdeparture in journey.iter('scheduleArrivalTime'):
                sat = dt.datetime.strptime(rt_func.isnone(realdeparture.find('scheduleTime'))[:-6], format)\
                .strftime('%Y-%m-%d %H:%M:%S')                
        else:
            sat=''       
        
        if journey.find('scheduleDepartureStation'):
            for dep_station in journey.iter('scheduleDepartureStation'):
                sdst = rt_func.isnone(dep_station.find('stationName'))               
        else:
            sdst=''
        
        if journey.find('scheduleArrivalStation'):
            for arr_station in journey.iter('scheduleArrivalStation'):
                sast = rt_func.isnone(arr_station.find('stationName'))               
        else:
            sast=''
        fahrt_array.append([lineshortname, ex_lineid, operday, fnr, rt, op, dest, mind, maxd,  cancelled, lastupdate, \
              distance, sdt, sat, rdt, rat, dlid,sdst, sast,ddel, adel, deviceid, journeyRtType, vehicletypeschedule, vehicletyperealtime, \
                reported_cancelled, ts_reported_cancelled])