In [40]:
import pandas as pd

# Read the header-less raw export and attach column names (in file order).
headers = [
    'order_datetime', 'location', 'customer_name', 'order_items',
    'amount', 'payment_type', 'card_number',
]
df = pd.read_csv('raw-data.csv', header=None, names=headers)

# Remove sensitive data (PII) before anything is written back to disk.
df = df.drop(columns=['customer_name', 'card_number'])

# Parse the order timestamps into real datetime values.
# The format is given explicitly (day first, matching the 'dd/MM/yyyy HH:mm'
# pattern the PySpark cell uses for the same file): without it pandas assumes
# month-first for ambiguous dates such as 03/04/2021 and silently swaps day
# and month. A value that does not match the format raises instead of being
# mis-parsed.
df['order_datetime'] = pd.to_datetime(
    df['order_datetime'], format='%d/%m/%Y %H:%M'
)

# Remove rows with any empty field.
df = df.dropna(axis=0)

# The row index becomes the unique order id; it is written as the first
# column of the CSV. Ids keep the original row numbers, so rows dropped above
# leave gaps.
df.index.name = 'order_id'
df.to_csv('pd-clean-data.csv')

# Top 10 sales: the ten rows with the largest amount, largest first.
# NOTE: `df` is rebound to the top-10 frame from here on (as before).
df = df.sort_values('amount', ascending=False).head(10)
df.to_csv('pd-top-10.csv')

In [42]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, monotonically_increasing_id

# Create (or reuse) the SparkSession for this cleaning job.
spark = SparkSession.builder.appName("PySpark data cleaning").getOrCreate()

# Everything that uses the session runs inside try/finally so the session is
# stopped even when reading, parsing or writing fails.
try:
    # Schema for the header-less raw file. It already assigns the column
    # names, so no separate rename step is needed after reading.
    schema = (
        'order_datetime STRING, location STRING, customer_name STRING, '
        'order_items STRING, amount DOUBLE, payment_type STRING, '
        'card_number STRING'
    )
    df = spark.read.csv('raw-data.csv', schema=schema, header=False)

    # Remove sensitive data (PII) before anything is written back to disk.
    df = df.drop('customer_name', 'card_number')

    # Parse the day-first order timestamps into a real timestamp column.
    df = df.withColumn(
        'order_datetime',
        to_timestamp('order_datetime', 'dd/MM/yyyy HH:mm'),
    )

    # Remove rows with any null field, matching the dropna step of the
    # pandas cell so both pipelines produce the same set of clean rows.
    df = df.dropna(how='any')

    # Unique (but not consecutive) id for every order.
    df = df.withColumn('order_id', monotonically_increasing_id())

    # Write the clean DataFrame. Spark writes a directory of part files;
    # mode='overwrite' lets the cell be re-run without failing because the
    # output path already exists (the default mode raises in that case).
    new_file = 'pyspark-clean-data'
    df.write.csv(new_file, header=True, mode='overwrite')

    # Top 10 sales: order by amount, largest first, and keep ten rows.
    df = df.orderBy(df['amount'].desc())
    top_10_file = 'pyspark-top-10'
    df.limit(10).write.csv(top_10_file, header=True, mode='overwrite')
finally:
    spark.stop()