In [198]:
# First define the following parameters in this cell
# Then run the entire notebook
import pm4py
import pandas as pd
from os import listdir
from os.path import isfile, join
import xmltodict
import pprint
import logging

## Folder that holds the XML export files of one parliament.
## All files in that folder are concatenated into one event log.

## input folder (activate exactly one)
#folderPath = "./OriginalData/Berlin"
#folderPath = "./OriginalData/Baden-Württemberg/"
folderPath = "./OriginalData/Brandenburg"

# os.listdir returns entries in arbitrary order. Case ids are assigned later by
# position, so sort the paths to get the same ids on every run and machine.
files = sorted(join(folderPath, f) for f in listdir(folderPath) if isfile(join(folderPath, f)))

#print(files)

## output filename (must match the input folder chosen above)
#outputFilename = './all-data-xes/berlin-all' + '.xes'
#outputFilename = "./all-data-xes/baden-württemberg-all.xes"
outputFilename = "./all-data-xes/brandenburg-all.xes"

# the cleaned log is written next to the raw one with a "-preprocessed" suffix
outputFilenamePreprocessed = outputFilename.replace(".xes", "-preprocessed.xes")

# document dates outside [FIRST_VALID_YEAR, LAST_VALID_YEAR] are treated as invalid
FIRST_VALID_YEAR = 1984
LAST_VALID_YEAR = 2024

# upper bound (in years) for the duration of a single case
MAX_YEARS_DURATION = 5
# placeholder activity name for documents without a DokTypL value
MISSING_ACTIVITY_REPLACEMENT = "no DokTypL"
# placeholder for missing/unparseable dates; lies outside the valid year range on purpose
MISSING_DATE_REPLACEMENT = pd.Timestamp('1970-01-01')

In [199]:
# Parse every XML file and collect all Vorgang records in one list.
pp = pprint.PrettyPrinter(depth=4)
# Set up logging: parsing problems go to a log file and are also printed below
logging.basicConfig(filename='xml_parsing_errors.log', level=logging.ERROR)


processes = []
for file in files:
    try:
        with open(file, encoding='utf-8', mode='r') as xml_file:
            data = xmltodict.parse(xml_file.read())
        dataVorgaenge = data["Export"]["Vorgang"] # this has the actual data
        # xmltodict yields a single dict (not a list) when a file contains exactly
        # one <Vorgang>; extend() would then add the dict's KEYS as strings.
        if isinstance(dataVorgaenge, dict):
            dataVorgaenge = [dataVorgaenge]
        processes.extend(dataVorgaenge) # append all processes to the list
    except xmltodict.expat.ExpatError as e:
        # malformed XML: report and continue with the remaining files
        logging.error(f"Error parsing file {file}: {e}")
        print(f"Error parsing file {file}: {e}")
    except Exception as e:
        # anything else (missing keys, I/O problems, ...): report and continue
        logging.error(f"Unexpected error with file {file}: {e}")
        print(f"Unexpected error with file {file}: {e}")

In [200]:
# One row per Vorgang; rows without any Dokument entry are discarded right away.
# todo: check out what these rows are!
processesDF = pd.DataFrame(processes).dropna(subset=['Dokument'])

# show the available columns, followed by a blank separator
print(processesDF.columns)
print("\n")

# number of Vorgaenge per process type
groupedByType = processesDF.groupby(['VTyp'])
vTypCounts = groupedByType['VTyp'].count()
print(vTypCounts)

Index(['VNr', 'VFunktion', 'ReihNr', 'VTyp', 'VTypL', 'VSys', 'VSysL', 'VIR',
       'Nebeneintrag', 'Dokument'],
      dtype='object')


VTyp
Anfrage                34831
Antrag                  4088
Bericht                 1685
Beschlussempfehlung       73
Debatte                19574
Gesetz                  1853
Sonstiges               6674
Vorschrift                33
Wahl                     781
Name: VTyp, dtype: int64


In [201]:
# Flatten the Vorgang table into one row per document (= one event per row).
docsOfVorgaenge = processesDF["Dokument"]
vSysLOfVorgange = processesDF["VSysL"]
vSysOfVorgange = processesDF["VSys"]
vorgangNebeneintraege = processesDF["Nebeneintrag"]
vTypes = processesDF["VTyp"]
vTypesL = processesDF["VTypL"]

# for a Vorgang there are several Deskriptoren stored in several Nebeneintraege
# -> get that information per Vorgang
vorgangDeskriptoren = []
for nebeneintraege in vorgangNebeneintraege:
    if isinstance(nebeneintraege, list):
        # several Nebeneintraege: one descriptor per entry (non-dict entries carry none)
        deskriptoren = [obj.get("Desk", None) for obj in nebeneintraege if isinstance(obj, dict)]
    elif isinstance(nebeneintraege, dict):
        # exactly one Nebeneintrag. The former check `nebeneintraege is not dict`
        # compared the value with the dict class itself and was always true,
        # so this descriptor was silently dropped.
        deskriptoren = [nebeneintraege.get("Desk", None)]
    else:
        # no Nebeneintrag at all (e.g. NaN)
        deskriptoren = []

    vorgangDeskriptoren.append([x for x in deskriptoren if x is not None])

allDocs = []

# add trace id to each document
# so that the documents can then be single events that belong to a specific trace in an event log

for idx, vorgang in enumerate(docsOfVorgaenge):
    # a Vorgang with a single document is parsed as a dict instead of a list
    docs = [vorgang] if isinstance(vorgang, dict) else vorgang

    # case-level attributes; idx is positional, hence .iloc / list indexing
    vSys = vSysOfVorgange.iloc[idx]
    vSysL = vSysLOfVorgange.iloc[idx]
    vTyp = vTypes.iloc[idx]
    vTypL = vTypesL.iloc[idx]
    deskriptorenOfVorgang = vorgangDeskriptoren[idx]

    dokTypLOfFirstDoc = None
    for i, doc in enumerate(docs):
        # add case id column to each document
        # call it case:concept:name as this is the name for pm4py transformation into XES format
        doc['case:concept:name'] = idx

        # if it is the first document in a Vorgang, then set the dokTypLOfFirstDoc - this could also be used to see what type of process a Vorgang is
        if i == 0:
            dokTypLOfFirstDoc = doc.get("DokTypL", "none")

        doc['case:DokTypLFirstDoc'] = dokTypLOfFirstDoc

        doc['case:VSys'] = vSys
        doc['case:VSysL'] = vSysL
        doc['case:VorgangsDeskriptoren'] = deskriptorenOfVorgang
        doc["case:VTyp"] = vTyp
        doc["case:VTypL"] = vTypL

        # write a placeholder into DokTypL if it is missing or empty, so that the
        # event still gets an activity name in the event log
        if doc.get('DokTypL') is None:
            doc['DokTypL'] = MISSING_ACTIVITY_REPLACEMENT

        # if there is no value for a key, replace it with "none"
        # or if a list value has none values
        # (only existing keys are reassigned, so mutating while iterating is safe)
        for key, value in doc.items():
            if value is None:
                doc[key] = "none"
            elif isinstance(value, list):
                doc[key] = [x if x is not None else "none" for x in value]

        # NOTE(review): a sorted copy of the list values used to be computed here
        # ("for grouping reasons") but was never used; the unsorted document was
        # appended. The dead computation is removed and the exported data is unchanged.
        # If sorted lists are wanted, sort them here before appending.

        # now add this doc as a row
        allDocs.append(doc)



# turn docs into data frame, so each row is a document now
df = pd.DataFrame(allDocs)

In [202]:
print("number of cases overall:", df["case:concept:name"].nunique())

# keep the raw date value as a string so it stays inspectable after parsing
df["DokDat_original"] = df["DokDat"].astype(str)

# check how many traces have a missing date
# (counts only documents without a DokDat key; empty values were replaced by "none" earlier)
missingBeforeParsing = df["DokDat"].isna()
print("number of cases with missing date:", df.loc[missingBeforeParsing, "case:concept:name"].nunique())

# turn date string into date time object; anything not matching dd.mm.yyyy becomes NaT
df["DokDat"] = pd.to_datetime(df['DokDat'], format='%d.%m.%Y', errors='coerce')

# now check how many traces have a missing date including the dates that are invalid due to out of range or wrong format
missingAfterParsing = df["DokDat"].isna()
print("number of cases with missing or invalid date:", df.loc[missingAfterParsing, "case:concept:name"].nunique())

# fill missing or invalid dates with a specific default date that is clearly out of scope
# (assign the result back: in-place fillna on a column selection is deprecated
# and has no effect once pandas Copy-on-Write is active)
df["DokDat"] = df["DokDat"].fillna(MISSING_DATE_REPLACEMENT)

isMissingActivity = df["DokTypL"] == MISSING_ACTIVITY_REPLACEMENT
print("number of cases with missing activity name:", df.loc[isMissingActivity, "case:concept:name"].nunique())
print("case ids:", df.loc[isMissingActivity, "case:concept:name"].unique())


number of cases overall: 69740
number of cases with missing date: 60
number of cases with missing or invalid date: 61
number of cases with missing activity name: 8128
case ids: [    0     1     2 ... 69737 69738 69739]


In [203]:
# Export the unfiltered event table as XES via pm4py.
# Column roles: case id, activity name and timestamp of each event.
xesColumnRoles = {
    'case_id': 'case:concept:name',
    'activity_key': 'DokTypL',
    'timestamp_key': 'DokDat',
}
event_log = pm4py.format_dataframe(df, **xesColumnRoles)
pm4py.write_xes(event_log, outputFilename)

exporting log, completed traces ::   0%|          | 0/69740 [00:00<?, ?it/s]

### Pre-Processing
Remove traces that contain events with missing or invalid dates, traces whose cycle time is implausibly long (which also points to invalid dates), and traces that contain events without an activity name.

In [204]:
# Collect the ids of all cases that contain at least one document dated outside
# the valid year range. The 1970 placeholder for missing/unparseable dates lies
# outside that range, so those cases are caught here as well.
dokYear = df["DokDat"].dt.year
isOutOfRange = (dokYear < FIRST_VALID_YEAR) | (dokYear > LAST_VALID_YEAR)
caseIdsInvalidDate = df.loc[isOutOfRange, "case:concept:name"].unique()

print("cases with an invalid date including out of scope for this data set specifically:", len(caseIdsInvalidDate))
#print("trace dates of invalid case:\n", df[df["case:concept:name"].isin(caseIdsInvalidDate)]["DokDat"])

cases with an invalid date including out of scope for this data set specifically: 61


In [205]:
# get all traces that contain at least one event with a missing activity name
isMissingActivity = df['DokTypL'] == MISSING_ACTIVITY_REPLACEMENT
caseIdsNoActivityName = df.loc[isMissingActivity, "case:concept:name"].unique()
print("no. of cases with a missing activityName before replacement with DokArtL:", len(caseIdsNoActivityName))

# the events (rows) that carry the placeholder activity name
df_no_activity_name = df[isMissingActivity]

# Get counts of the existing DokArtL values
dokArtL_counts = df_no_activity_name["DokArtL"].value_counts()
print(dokArtL_counts)
# value_counts() does not count NaN - the remaining rows have no DokArtL at all


# Use DokArtL as activity name where DokTypL is missing, unless DokArtL is absent,
# empty or the generic "Drucksache". The presence check is explicit instead of
# using the raw string column as a boolean mask (implicit truthiness coercion).
hasUsableDokArtL = df['DokArtL'].notna() & (df['DokArtL'] != "") & (df['DokArtL'] != "Drucksache")
df.loc[isMissingActivity & hasUsableDokArtL, 'DokTypL'] = df['DokArtL']


# cases that still contain a placeholder activity name after the fallback
caseIdsNoActivityName = df[df['DokTypL'] == MISSING_ACTIVITY_REPLACEMENT]["case:concept:name"].unique()
print("no. of cases with a missing activityName after replacement with DokArtL:", len(caseIdsNoActivityName))


allCaseIdsToRemove = list(set(list(caseIdsInvalidDate) + list(caseIdsNoActivityName)))
print("number of cases to remove due to missing/invalid date and missing activity name:", len(allCaseIdsToRemove))

# remove every trace that has at least one event with an invalid date
# or at least one event without an activity name
# (explicit copy so that later processing works on an independent frame)
df_preProcessed = df[~df["case:concept:name"].isin(allCaseIdsToRemove)].copy()


print("number of cases after preprocessing:", len(df_preProcessed["case:concept:name"].unique()))

no. of cases with a missing activityName before replacement with DokArtL: 8128
DokArtL
Unterrichtung                           3104
Information                             1799
Plenarprotokoll                         1250
Frühwarndokument                        1209
Zuschrift                                895
Gutachten                                245
Gesetz- und Verordnungsblatt             105
Ausschussprotokoll                        82
Übersicht                                 23
Drucksache                                20
Informationen zu Rechtsentwicklungen      17
Name: count, dtype: int64
no. of cases with a missing activityName after replacement with DokArtL: 78
number of cases to remove due to missing/invalid date and missing activity name: 83
number of cases after preprocessing: 69657


In [206]:
# Bring the cleaned event table into pm4py's event-log layout
event_log_preProcessed = pm4py.format_dataframe(
    df_preProcessed,
    case_id='case:concept:name',
    activity_key='DokTypL',
    timestamp_key='DokDat',
)

# Drop traces whose total duration is unrealistically long.
# The limit is expressed in seconds, counting a year as 365 days.
maxDurationSeconds = 365*24*60*60 * MAX_YEARS_DURATION
event_log_removed_high_duration = pm4py.filtering.filter_case_performance(
    event_log_preProcessed,
    min_performance=-1,
    max_performance=maxDurationSeconds,
)

# report how many events (rows) and how many distinct cases the filter removed
rowsRemoved = len(event_log_preProcessed) - len(event_log_removed_high_duration)
print(f"Rows removed due to exceeding cycle time limit: {rowsRemoved}")

casesBefore = len(event_log_preProcessed['case:concept:name'].unique())
casesAfter = len(event_log_removed_high_duration['case:concept:name'].unique())
unique_cases_removed = casesBefore - casesAfter
print(f"Unique cases removed due to exceeding cycle time limit: {unique_cases_removed}")

Rows removed due to exceeding cycle time limit: 0
Unique cases removed due to exceeding cycle time limit: 0


In [207]:
# Write the cleaned, duration-filtered event log to the "-preprocessed" XES file
pm4py.write_xes(
    event_log_removed_high_duration,
    outputFilenamePreprocessed,
)

exporting log, completed traces ::   0%|          | 0/69657 [00:00<?, ?it/s]