In [None]:
# docker run --rm -it -p 8888:8888 jupyter/all-spark-notebook

In [None]:
# !pip install spark-xml
!pip install delta-spark

from delta import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.types import _parse_datatype_json_string
import os

In [None]:
import urllib.request

url = "https://repo1.maven.org/maven2/com/databricks/spark-xml_2.12/0.16.0/spark-xml_2.12-0.16.0.jar"
destination = "/home/jovyan/.ivy2/jars/spark-xml_2.12-0.16.0.jar"

# The target directory may not exist yet in a fresh container; urlretrieve
# would otherwise fail with FileNotFoundError.
os.makedirs(os.path.dirname(destination), exist_ok=True)

# Only download when no non-empty copy is present, so re-running the notebook
# does not fetch the jar again.
if not os.path.isfile(destination) or os.path.getsize(destination) == 0:
    partial_path = destination + ".part"
    urllib.request.urlretrieve(url, partial_path)
    # Rename only after the download completed, so an interrupted run never
    # leaves a truncated jar at `destination` that would be reused later.
    os.replace(partial_path, destination)

print(f"spark-xml jar available at {destination} ({os.path.getsize(destination)} bytes)")

In [22]:
# Ticket fee per violation type.
ticket_fees = [{'type':'base', 'value':30},
               {'type':'base + school zone','value': 60},
               {'type':'base + construction work zone','value':60},
               {'type':'base + school zone + construction work zone', 'value':120}]

# Substrings identifying each dataset in the input file names; one combined
# DataFrame is built per keyword.
keywords = ['automobiles', 'people', 'speeding_tickets']

path = './work/ttpd_data'
# os.listdir returns entries in arbitrary order; sorting makes the file (and
# therefore union/row) order identical on every run.
all_files = sorted(
    os.path.join(path, name)
    for name in os.listdir(path)
    if os.path.isfile(os.path.join(path, name))
)

In [36]:
# spark-xml's from_xml / schema_of_xml are exposed only in its Scala API (there is no Python package), so the helpers below call into the JVM through py4j.
# https://github.com/databricks/spark-xml
# https://stackoverflow.com/questions/64934508/parsing-xml-columns-from-pyspark-dataframe-using-udf
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import StructType
from pyspark.sql.utils import *
import pyspark.sql.functions as F

def ext_from_xml(xml_column, schema, options=None, spark=None):
    """Parse a column of XML text into a struct column using spark-xml's JVM ``from_xml``.

    The Python column, schema and options are converted to their JVM
    counterparts and passed to ``com.databricks.spark.xml.functions.from_xml``.

    Args:
        xml_column: Column holding XML text; it is cast to string first.
        schema: PySpark DataType (e.g. a StructType) describing the parsed result.
        options: Optional dict of spark-xml parser options (default: none).
        spark: SparkSession to use. Defaults to the existing session
            (``SparkSession.builder.getOrCreate()``), so the helper does not rely
            on a global ``spark`` variable surviving from another cell.

    Returns:
        A Column wrapping the JVM ``from_xml`` expression.
    """
    if spark is None:
        spark = SparkSession.builder.getOrCreate()
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options or {})
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options=None, spark=None):
    """Infer the schema of the XML documents held in a single-column DataFrame.

    Calls spark-xml's Scala ``schema_of_xml_df`` and converts the returned JVM
    schema back into a PySpark DataType.

    Args:
        df: DataFrame with exactly one column containing XML text.
        options: Optional dict of spark-xml options used during inference.
        spark: SparkSession to use. Defaults to the existing session
            (``SparkSession.builder.getOrCreate()``), so the helper does not rely
            on a global ``spark`` variable surviving from another cell.

    Returns:
        The inferred schema as a PySpark DataType.

    Raises:
        AssertionError: if ``df`` does not have exactly one column.
    """
    assert len(df.columns) == 1, f"expected a single XML column, got {df.columns}"
    if spark is None:
        spark = SparkSession.builder.getOrCreate()

    scala_options = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options or {})
    # Functions defined in a Scala package object live on the synthetic
    # `package$` class and are reached through its MODULE$ singleton.
    java_xml_module = getattr(getattr(spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)

    return _parse_datatype_json_string(java_schema.json())

In [46]:
def _get_spark_session_(jars=None) -> SparkSession:
    """Build (or reuse) the Spark session used by this notebook.

    The session gets the Delta Lake SQL extension and catalog. It also gets the
    spark-xml jar on ``spark.jars``, so the JVM functions called by the XML
    helpers are available.

    Args:
        jars: Value for ``spark.jars`` (comma-separated jar paths). Defaults to
            the module-level ``destination`` set by the download cell.

    Returns:
        The SparkSession from ``getOrCreate()``. If a session already exists it
        is returned as-is, and only runtime SQL settings take effect on it.
    """
    # Parenthesised chain instead of trailing backslashes: the original ended in a
    # dangling `\` that only parsed because a whitespace-only line followed it.
    builder = (
        SparkSession.builder
        .appName('takehome')
        .config("spark.jars", destination if jars is None else jars)
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    )

    # configure_spark_with_delta_pip adds the Delta jars matching the pip-installed delta-spark.
    return configure_spark_with_delta_pip(builder).getOrCreate()


def _read_file_(spark, file_path):
    """Load one input file into a DataFrame, choosing the reader by file extension.

    * ``.xml``: the whole file is read as a single string (``content``). Its
      schema is inferred with spark-xml, and every ``automobile`` element becomes
      one row whose columns are that element's fields.
    * ``.json``: read with ``spark.read.json``. When a ``speeding_tickets``
      column is present, it is exploded into one row per ticket.
    * ``.csv``: pipe-delimited, with a header row and inferred column types.

    Args:
        spark: SparkSession used for reading.
        file_path: Path of the file to load.

    Returns:
        The loaded DataFrame, or ``None`` for unsupported extensions.
    """
    if file_path.endswith('.xml'):
        raw_df = (spark.read.format("binaryFile").load(file_path)
                  .selectExpr("CAST(content AS STRING)"))
        xml_extraction_options = {"mode": "DROPMALFORMED", "inferSchema": "true"}
        xml_schema = ext_schema_of_xml_df(raw_df.select("content"), xml_extraction_options)
        parsed_df = raw_df.withColumn("parsed", ext_from_xml(F.col("content"), xml_schema))

        # Read the field list from the inferred schema. Parsing simpleString()
        # breaks on nested types or types such as decimal(p,s).
        automobile_type = xml_schema['automobile'].dataType
        automobile_col = F.col('parsed').getItem('automobile')
        if isinstance(automobile_type, ArrayType):
            record_type = automobile_type.elementType
            record_col = F.explode(automobile_col)  # explode() names its output "col"
        else:
            # A document with a single <automobile> is inferred as a struct, not an array.
            record_type = automobile_type
            record_col = automobile_col.alias('col')

        # `_VALUE` is spark-xml's holder for element text; it is not a data field.
        field_names = [name for name in record_type.fieldNames() if name != '_VALUE']
        # NOTE(review): `content` repeats the whole raw document on every row; drop it if unused.
        return (parsed_df
                .select('content', record_col)
                .select('content', *[F.col('col').getItem(name).alias(name) for name in field_names]))

    if file_path.endswith('.json'):
        json_df = spark.read.json(file_path)
        if 'speeding_tickets' in json_df.columns:
            json_df = (json_df
                       .select(F.explode(F.col('speeding_tickets')).alias('ticket'))
                       .select('ticket.*'))
        return json_df

    if file_path.endswith('.csv'):
        return spark.read.csv(file_path, header=True, inferSchema=True, sep='|')

    return None


def _combine_files_(spark, keyword):
    filtered_files = [f for f in all_files if keyword in f]
    if not filtered_files:
        return None

    dataframes = [_read_file_(spark, f) for f in filtered_files if _read_file_(spark, f) is not None]
    if not dataframes:
        return None

    combined_df = dataframes[0]
    for df in dataframes[1:]:
        combined_df = combined_df.union(df)
    return combined_df


def main():
    """Start Spark, combine the input files for every keyword and preview them.

    All combined DataFrames are built first (one per entry in ``keywords``).
    Each is then shown with ``DataFrame.show()``, or a notice is printed when
    no files were found for that keyword.
    """
    session = _get_spark_session_()
    print(f'Spark Session Initialized...')

    results = {}
    for kw in keywords:
        results[kw] = _combine_files_(session, kw)

    for kw, combined in results.items():
        if combined is None:
            print(f"No files found for keyword: {kw}")
            continue
        print(f"Combined DataFrame for {kw}:")
        combined.show()
    # TODO: build the fee table, e.g. fee_df = spark.createDataFrame(ticket_fees); fee_df.show()

In [47]:
# Run the whole pipeline: build the Spark session, combine the files for each
# keyword and print one preview per keyword.
main()



# Debug helper: read a single file in isolation.
# spark = _get_spark_session_()
# _read_file_(spark, './work/ttpd_data/20240503111609_automobiles_41.xml')

Spark Session Initialized...
Combined DataFrame for automobiles:
+--------------------+-------------+-------------+--------------------+-----------------+----+
|             content|        color|license_plate|           person_id|              vin|year|
+--------------------+-------------+-------------+--------------------+-----------------+----+
|<?xml version="1....|   SandyBrown|      QFF 087|dfa9aed5-7172-4e9...|CL6TK95K73FE5SHXT|1982|
|<?xml version="1....|LavenderBlush|        3Q380|ba743372-658b-48c...|WPCAK8EK7EH8VW3YS|2023|
|<?xml version="1....|  ForestGreen|      CLT 179|ba743372-658b-48c...|EJ90XVM508JD3FUDV|2019|
|<?xml version="1....|         Snow|      38R E99|ba743372-658b-48c...|ECBGKDYM89G9HBGER|1980|
|<?xml version="1....|       Purple|     62-13425|829a6ec5-6bc2-4ee...|AD075YPR3LNRSYDPH|1977|
|<?xml version="1....|    LawnGreen|      467-IHF|56b147ea-8c68-494...|PDD6NEU28YM141FAX|2009|
|<?xml version="1....|       Yellow|        0DM43|56b147ea-8c68-494...|1Z9CHB950