**Installing missing requirements**

In [1]:
# Install lxml, the parser backend passed to pd.read_xml(parser="lxml") later in this notebook
pip install lxml

Note: you may need to restart the kernel to use updated packages.


**Importing Libraries**

In [2]:
#for databricks
import io
import os
import re
import xml.etree.ElementTree as ET
import zipfile
from os import listdir
from os.path import isfile, join

import pandas as pd
import pyspark
from pyspark.sql import SparkSession


**Spark Session**


In [3]:
from delta import *

# Builder for a session that uses the Delta Lake SQL extension and catalog
# implementations and requests the delta-core jar.
# NOTE(review): delta-core 0.6.1 looks older than what the reported Spark 3.3.1
# pairs with, and configure_spark_with_delta_pip may replace this
# "spark.jars.packages" value with the pip-installed version; confirm the pin
# is still needed.
builder = (
    pyspark.sql.SparkSession.builder.appName("data-gurus")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.6.1")
)

# configure_spark_with_delta_pip is provided by the star import from delta above.
spark = configure_spark_with_delta_pip(builder).getOrCreate()


#old code to create session
# #Create SparkSession
# spark = SparkSession.builder.master("local[1]").appName("data-gurus").getOrCreate()
# print(spark)

In [4]:
# Report the Spark version of the running session
print(f'PySpark Version :{spark.sparkContext.version}')

PySpark Version :3.3.1


**Functions:**

**Masking Function**

In [5]:
# give path of dataset and file name without the number
def files_mask(dataset_path, filename):
    '''
    List the XML members of a zip archive whose names contain ``filename``.

    :param dataset_path: path of the zip archive holding the dataset
    :param filename: file name without the running number or extension; it is
        interpolated into a regular expression unescaped, so any regex
        metacharacters in it are interpreted as such
    :return: list of matching member names (full path inside the archive),
        in archive order; empty if nothing matches
    '''
    # The context manager guarantees the archive handle is released even if
    # reading the member list fails.
    with zipfile.ZipFile(dataset_path, "r") as archive:
        xml_members = [name for name in archive.namelist() if name.endswith(".xml")]

    regex_pattern = ".*" + filename + ".*"

    match_list = []
    for member in xml_members:
        found = re.findall(regex_pattern, member)
        if found:
            match_list.append(found[0])

    return match_list

**XML Data to Pandas Dataframe Conversion Function**

In [6]:
def xml_to_dataframe_converter(dataset_path, xml_file):
    '''
    Read one XML member of a zip archive into a pandas DataFrame.

    :param dataset_path: path of the zip archive
    :param xml_file: full member name inside the archive, including extension
    :return: pandas DataFrame parsed from the member with the lxml parser,
        decoding the content as UTF-16
    :raises FileNotFoundError: if ``xml_file`` is not a member of the archive
    '''
    # Read the member inside a context manager so the archive is always closed.
    with zipfile.ZipFile(dataset_path, "r") as archive:
        try:
            xml_bytes = archive.read(xml_file)
        except KeyError as exc:
            # Fail with a clear message instead of reaching the return
            # statement with no DataFrame assigned.
            raise FileNotFoundError(
                f"{xml_file!r} was not found in archive {dataset_path!r}"
            ) from exc

    # Wrap the raw bytes in a buffer: pandas treats the input as a file-like
    # object rather than as literal XML passed directly.
    return pd.read_xml(io.BytesIO(xml_bytes), parser="lxml", encoding="utf-16")

**XML Parser Function, i.e. the Main Function**

In [7]:
# Suppress all warnings so they do not appear in the notebook output.
# NOTE(review): a blanket 'ignore' filter also hides warnings that may point to
# real problems; consider narrowing it to specific categories.
import warnings
warnings.filterwarnings('ignore') 

# Columns that are cast to str in pandas, per file type, before the frame is
# handed to Spark (the "problem fields").
_STRING_COLUMNS = {
    "EinheitenSolar": (
        "Hausnummer",
        "NameStromerzeugungseinheit",
        "Bundesland",
        "Postleitzahl",
        "Gemeindeschluessel",
        "Lage",
        "Einsatzverantwortlicher",
        "Einspeisungsart",
        "Adresszusatz",
    ),
    "EinheitenStromSpeicher": (
        "Technologie",
        "Postleitzahl",
    ),
}


#this is the main function that runs everything
def xml_parser(dataset_path="/home/jovyan/work/DATA.zip",
               file_names=("EinheitenStromSpeicher",)):
    '''
    Load every XML file of the requested file types from the dataset archive
    into Delta tables named ``marktstammdaten_<file type in lower case>``.

    For each file type the catalog table is dropped, then each matching XML
    member is parsed to pandas, converted to a Spark DataFrame and appended to
    the table. Conversion and write failures are printed and processing
    continues with the next file.

    :param dataset_path: path of the zip archive holding the XML files
    :param file_names: iterable of file type names without number/extension,
        e.g. 'AnlagenEegSolar', 'AnlagenEegSpeicher', 'AnlagenStromSpeicher',
        'EinheitenStromSpeicher', 'EinheitenSolar'
    :return: None
    '''
    for file_name in file_names:

        #database table name and storage location, derived once per file type
        table_name = "marktstammdaten_" + file_name.lower()
        table_path = f"file:/home/jovyan/work/spark-warehouse/{table_name}"

        #drops databricks table
        # NOTE(review): the table is written with an explicit path, so this
        # presumably removes only the catalog entry and leaves the Delta files
        # in place; a re-run would then append to the old data. Confirm.
        spark.sql(f"drop table if exists {table_name}")

        print("Current File Type : ", file_name)

        xml_files_list = files_mask(dataset_path, file_name)
        print("XML FILES LIST: ", xml_files_list)

        for xml_file in xml_files_list:

            print(f"Processing : {xml_file}")
            data = xml_to_dataframe_converter(dataset_path, xml_file)

            #problem fields
            #converting them to strings while they are in pandas dataframe
            #(columns missing from this particular file are skipped)
            for column in _STRING_COLUMNS.get(file_name, ()):
                if column in data.columns:
                    data[column] = data[column].astype(str)

            try:
                #creating spark data frame
                sdf = spark.createDataFrame(data)
            except Exception as e:
                print("Error while converting from pandas to spark :", e)
                # Skip this file: without a fresh DataFrame the write below
                # would re-append the previous file's data (or fail on an
                # undefined name for the first file).
                continue

            try:
                (
                    sdf.write.format("delta")
                    .option("mergeSchema", "true")
                    .option("path", table_path)
                    .mode("append")
                    .saveAsTable(table_name)
                )
            except Exception as e:
                print("Exception occurred while writing to databricks :")
                print(e)

    print("Finished processing!!!!")

**Function Calls**

In [8]:
# Run the main function: drops the target table(s), then parses each matching
# XML file and appends it to the corresponding table
xml_parser()

Current File Type :  EinheitenStromSpeicher
XML FILES LIST:  ['DATA/EinheitenStromSpeicher_1.xml', 'DATA/EinheitenStromSpeicher_2.xml', 'DATA/EinheitenStromSpeicher_3.xml', 'DATA/EinheitenStromSpeicher_4.xml', 'DATA/EinheitenStromSpeicher_5.xml']
Processing : DATA/EinheitenStromSpeicher_1.xml
Processing : DATA/EinheitenStromSpeicher_2.xml
Processing : DATA/EinheitenStromSpeicher_3.xml
Processing : DATA/EinheitenStromSpeicher_4.xml
Processing : DATA/EinheitenStromSpeicher_5.xml
Finished processing!!!!


**TESTS**

Change File Name:

In [13]:
# File type whose table the checks below query (marktstammdaten_<file_name in lower case>)
file_name = "EinheitenStromSpeicher"

**Check Databricks Table Format**

In [14]:
# Show the table details: head() returns the first Row of the DESCRIBE DETAIL
# result, and its `format` field holds the table format
table = spark.sql(f"DESCRIBE DETAIL marktstammdaten_{file_name.lower()}").head()
print(table)

Row(format='delta', id='0b0d165d-3905-49b0-8a7d-1aa4aba2bfb0', name='default.marktstammdaten_einheitenstromspeicher', description=None, location='file:/home/jovyan/work/spark-warehouse/marktstammdaten_einheitenstromspeicher', createdAt=datetime.datetime(2023, 1, 3, 15, 20, 38, 983000), lastModified=datetime.datetime(2023, 1, 3, 15, 26, 25, 746000), partitionColumns=[], numFiles=20, sizeInBytes=51507105, properties={}, minReaderVersion=1, minWriterVersion=2)


**Table Size and Schema**

In [15]:
# Query the whole table, then print its row count and its schema
aes = spark.sql(f"select * from marktstammdaten_{file_name.lower()}")
print("Count :", aes.count())
print("Schema :", aes.schema)
#aes.show()


Count : 496499
Schema : StructType([StructField('EinheitMastrNummer', StringType(), True), StructField('DatumLetzteAktualisierung', StringType(), True), StructField('LokationMaStRNummer', StringType(), True), StructField('NetzbetreiberpruefungStatus', LongType(), True), StructField('NetzbetreiberpruefungDatum', StringType(), True), StructField('AnlagenbetreiberMastrNummer', StringType(), True), StructField('Land', LongType(), True), StructField('Bundesland', DoubleType(), True), StructField('Landkreis', StringType(), True), StructField('Gemeinde', StringType(), True), StructField('Gemeindeschluessel', DoubleType(), True), StructField('Postleitzahl', StringType(), True), StructField('Ort', StringType(), True), StructField('Registrierungsdatum', StringType(), True), StructField('Inbetriebnahmedatum', StringType(), True), StructField('EinheitSystemstatus', LongType(), True), StructField('EinheitBetriebsstatus', LongType(), True), StructField('NichtVorhandenInMigriertenEinheiten', LongType