In [1]:
import pyspark
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql.functions import col, to_date, date_format, regexp_replace, when, concat_ws, coalesce, lit, udf, avg, first, last_day, round

# Single-threaded local Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("setup")
    .getOrCreate()
)

In [2]:
def read_csv(input_path):
    """Load a CSV file into a Spark DataFrame.

    The file is read with the module-level ``spark`` session, using ``,`` as
    the delimiter and ``"`` as the quote character. The first row is treated
    as the header and column types are inferred by Spark.

    Args:
        input_path: Path of the CSV file to read.

    Returns:
        The DataFrame produced by the Spark CSV reader.
    """
    reader = spark.read.options(delimiter=",", quote="\"")
    return reader.csv(input_path, header=True, inferSchema=True)

# Load the raw orders file and print a preview of it.
orders = read_csv(input_path='cleaned.csv')
orders.show()

+--------+----------------+--------+------+------------+-------------------+--------------------+--------+-----+----+
|order_id|delivery_company|quantity| price|ordered_date|            address|                 _c6|     _c7|  _c8| _c9|
+--------+----------------+--------+------+------------+-------------------+--------------------+--------+-----+----+
|       1| delivery_comp_1|       1|245,52|    9/2/2022| Cedar Lane Houston|            CA 90001|    null| null|null|
|       2| delivery_comp_2|       2|114,77|        null|        Main Street|   New York CA 60601|    null| null|null|
|       3| delivery_comp_3|    null|739,43|   14-3-2022|        Main Street|    Chicago TX 10001|    null| null|null|
|       4| delivery_comp_0|       1|878.93|   20/4/2022|         Oak Avenue|Los Angeles FL 90001|    null| null|null|
|       5| delivery_comp_1|       2|481,44|        null|Maple Drive Chicago|            FL 60601|    null| null|null|
|       6| delivery_comp_2|     #NA| 78,13|        null|

In [3]:
def process_df(df):
    """Clean a raw orders DataFrame.

    Steps, in order:
      1. Cast ``price`` to double, accepting a comma as the decimal separator.
      2. Join ``address`` with whichever of the unnamed overflow columns
         ``_c6``..``_c9`` are present into one space-separated string
         (null parts are skipped).
      3. Reformat that string so the first two words stay space-separated
         and every following word is comma-separated.
      4. Rewrite ``ordered_date`` as ``dd-MM-yyyy`` text.

    Side effect: sets ``spark.sql.legacy.timeParserPolicy`` to ``LEGACY`` on
    the module-level ``spark`` session.

    Args:
        df: DataFrame with the columns ``order_id``, ``delivery_company``,
            ``quantity``, ``price``, ``ordered_date`` and ``address``, and
            optionally ``_c6``..``_c9``.

    Returns:
        DataFrame with the columns ``order_id``, ``delivery_company``,
        ``quantity``, ``price``, ``ordered_date`` and ``address``.
    """

    def enforce_double_type(df, column_name):
        """Return ``df`` with ``column_name`` cast to double unless it already is."""
        column_type = df.schema[column_name].dataType
        if isinstance(column_type, DoubleType):
            return df

        # Values such as "245,52" use a decimal comma; swap it for a period
        # before casting, otherwise the cast would not see a valid number.
        normalised = regexp_replace(col(column_name), ",", ".")
        return df.withColumn(column_name, normalised.cast(DoubleType()))

    def format_address(merged):
        """Format ``"w1 w2 w3 w4"`` as ``"w1 w2,w3,w4"``.

        Inputs with two words or fewer are returned space-joined without a
        trailing comma; ``None`` is returned unchanged.
        """
        if merged is None:
            return None

        # split() without an argument discards the empty tokens produced by
        # repeated, leading or trailing whitespace.
        tokens = merged.split()
        head = " ".join(tokens[:2])
        tail = tokens[2:]
        if not tail:
            return head
        return head + "," + ",".join(tail)

    def standardize_date_formats(df, column_name):
        """Rewrite ``column_name`` as ``dd-MM-yyyy`` text.

        ``dd-MM-yyyy`` is tried first, then ``dd/MM/yyyy``.
        """
        raw = col(column_name)
        dashed = to_date(raw, "dd-MM-yyyy")
        slashed = to_date(raw, "dd/MM/yyyy")

        temp_df = df.withColumn(
            column_name,
            when(dashed.isNotNull(), dashed)
            .when(slashed.isNotNull(), slashed)
            # Values matching neither pattern are handed to date_format below
            # unchanged. NOTE(review): presumably Spark's implicit cast turns
            # anything it cannot read into null - confirm.
            .otherwise(raw),
        )
        return temp_df.withColumn(
            column_name, date_format(temp_df[column_name], "dd-MM-yyyy")
        )

    double_df = enforce_double_type(df, 'price')

    # Only reference overflow columns that actually exist, so a file with
    # fewer unnamed columns does not fail on a missing attribute.
    address_parts = ["address"] + [
        name for name in ("_c6", "_c7", "_c8", "_c9") if name in double_df.columns
    ]

    # concat_ws skips null inputs, so the parts are passed as-is; padding them
    # with empty strings would leave stray separators in the merged value.
    merged_df = double_df.withColumn(
        "address_merged",
        concat_ws(" ", *[col(name) for name in address_parts]),
    ).select('order_id', 'delivery_company', 'quantity', 'price', 'ordered_date', 'address_merged')

    address_udf = udf(format_address, StringType())
    address_df = merged_df.withColumn("address", address_udf(merged_df["address_merged"]))

    # NOTE(review): presumably required so that un-padded values such as
    # "9/2/2022" parse with the dd/MM/yyyy pattern - confirm.
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

    orders_new = standardize_date_formats(address_df, 'ordered_date')
    return orders_new.drop("address_merged")
    
# Rebind `orders` to the cleaned frame and print a preview of it.
orders = process_df(df=orders)
orders.show()


+--------+----------------+--------+------+------------+--------------------+
|order_id|delivery_company|quantity| price|ordered_date|             address|
+--------+----------------+--------+------+------------+--------------------+
|       1| delivery_comp_1|       1|245.52|  09-02-2022|Cedar Lane,Housto...|
|       2| delivery_comp_2|       2|114.77|        null|Main Street,New,Y...|
|       3| delivery_comp_3|    null|739.43|  14-03-2022|Main Street,Chica...|
|       4| delivery_comp_0|       1|878.93|  20-04-2022|Oak Avenue,Los,An...|
|       5| delivery_comp_1|       2|481.44|        null|Maple Drive,Chica...|
|       6| delivery_comp_2|     #NA| 78.13|        null|Main Street,Houst...|
|       7| delivery_comp_3|       1|832.17|  20-02-2022|Oak Avenue,New,Yo...|
|       8| delivery_comp_0|       2| 687.8|  01-04-2022|Maple Drive,Los,A...|
|       9| delivery_comp_1|     #NA|338.44|  13-04-2022|Cedar Lane,Miami,...|
|      10| delivery_comp_2|       1|461.33|        null|Oak Aven