In [1]:
import pyspark 
sc = pyspark.SparkContext('local[*]')
# do something to prove it works
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)


[346, 109, 56, 865, 457]

In [2]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Read CSV File") \
    .getOrCreate()

# Read the CSV file into a DataFrame
df = spark.read.csv("work/clean_me_out.csv", header=True, sep=",", quote = "\"")
print(f"#df: {df.count()}")


#df: 499999


In [3]:
# Read CSV file into table
df.createOrReplaceTempView("clean_me")


In [4]:
# prepare average data
avg_a =spark.sql("""
        SELECT 
            delivery_company
            ,avg(CAST(quantity AS int)) avg_quantity
        FROM clean_me
        GROUP BY delivery_company
    """).collect()


In [5]:
#prep dictionary with averages
avg_dict = {r["delivery_company"]:r["avg_quantity"] for r in avg_a}

In [6]:
avg_dict

{'delivery_comp_1': 1.5,
 'delivery_comp_3': 1.499993999976,
 'delivery_comp_2': 1.500006000024,
 'delivery_comp_0': 1.499993999976}

In [7]:
#function replace null quantity
def replace_quantity(company, quantity, av_d):
    if company in av_d:
        #null is considered as None, "null",  "#NA"
        if (quantity is None) or (quantity == "null") or (quantity == "#NA"):
            return av_d[company]
    return quantity

#print( replace_quantity('delivery_comp_1a', 5 , avg_dict))

In [8]:
from pyspark.sql.functions import col, udf

In [9]:
replace_quantityUDF = udf(lambda z, y : replace_quantity(z, y, avg_dict)) 

In [10]:
#test to check what is in dataset
df.select(col("delivery_company"), col("quantity"), replace_quantityUDF(col("delivery_company"), col("quantity")).alias("quantity_converted")).show(10)

+----------------+--------+------------------+
|delivery_company|quantity|quantity_converted|
+----------------+--------+------------------+
| delivery_comp_1|       1|                 1|
| delivery_comp_2|       2|                 2|
| delivery_comp_3|    null|    1.499993999976|
| delivery_comp_0|       1|                 1|
| delivery_comp_1|       2|                 2|
| delivery_comp_2|     #NA|    1.500006000024|
| delivery_comp_3|       1|                 1|
| delivery_comp_0|       2|                 2|
| delivery_comp_1|     #NA|               1.5|
| delivery_comp_2|       1|                 1|
+----------------+--------+------------------+
only showing top 10 rows



In [46]:
from pyspark.sql.functions import to_date, last_day, coalesce
#fix dates in dataset
def unify_date(date):
    if date == 'null':
        return None
    #replace slahs with dash
    words = date.replace("/","-").split("-")
    #add leading zeroes
    return f"{('0' + words[0])[-2:]}-{('0'+ words[1])[-2:]}-{words[2]}"
    
udf_unify_date = udf(unify_date)

#prepare dataset with corrected dates
df2 = df.select("order_id", "delivery_company", to_date(udf_unify_date("ordered_date"), "dd-MM-yyyy").alias("ordered_date") )
df2.createOrReplaceTempView("clean_me2")
df2.printSchema()



root
 |-- order_id: string (nullable = true)
 |-- delivery_company: string (nullable = true)
 |-- ordered_date: date (nullable = true)



In [56]:
#prep data for orders date
from datetime import datetime
df_cor = spark.sql("""
        SELECT 
            order_id
            ,delivery_company
            ,ordered_date
            ,CASE WHEN ordered_date is NULL 
                THEN  COALESCE(
                            LAG(ordered_date) IGNORE NULLS OVER (partition by delivery_company order by order_id) + 1
                            ,last_day(LAG(ordered_date) IGNORE NULLS OVER (partition by delivery_company order by order_id DESC))
                        )
                ELSE  ordered_date
                END as corrected_ordered_date
        FROM clean_me2
    """)
df_cor = df_cor.filter("ordered_date is NULL").select("order_id", "corrected_ordered_date")
cor_date = df_cor.collect()
#dictionary with proper dates values for proper order id 
cor_dict = {r["order_id"]:r["corrected_ordered_date"].strftime("%Y-%m-%d") for r in cor_date}



'2022-03-04'

In [63]:
#function with correct dates for null dates
def correct_dates(id, old_value):
    if id in cor_dict:
        return cor_dict[id] 
    return old_value
    

correct_datesUDF = udf(lambda z, y : correct_dates(z, y)) 


2022-03-04


In [65]:
from pyspark.sql.functions import *

In [74]:
#test
df2.select(col("order_id"), col("delivery_company"), col("ordered_date"), correct_datesUDF(col("order_id"), date_format(col("ordered_date"), "yyyy-MM-dd")).alias("corrected_ordered_date")).show(10, truncate=False)

+--------+----------------+------------+----------------------+
|order_id|delivery_company|ordered_date|corrected_ordered_date|
+--------+----------------+------------+----------------------+
|1       |delivery_comp_1 |2022-02-09  |2022-02-09            |
|2       |delivery_comp_2 |null        |2022-03-04            |
|3       |delivery_comp_3 |2022-03-14  |2022-03-14            |
|4       |delivery_comp_0 |2022-04-20  |2022-04-20            |
|5       |delivery_comp_1 |null        |2022-04-10            |
|6       |delivery_comp_2 |null        |2022-04-22            |
|7       |delivery_comp_3 |2022-02-20  |2022-02-20            |
|8       |delivery_comp_0 |2022-04-01  |2022-04-01            |
|9       |delivery_comp_1 |2022-04-13  |2022-04-13            |
|10      |delivery_comp_2 |null        |2022-02-28            |
+--------+----------------+------------+----------------------+
only showing top 10 rows

