In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('infosys722-i4-dp').getOrCreate()

# ### Read Raw Data
raw_data_products = spark.read.options(compression='gzip').csv('./Datasets/2017PurchasePricesDec.csv.gz', inferSchema=True, header=True)
raw_data_inventory = spark.read.options(compression='gzip').csv('./Datasets/BegInvFINAL12312016.csv.gz', inferSchema=True, header=True)
raw_data_inventory_end = spark.read.options(compression='gzip').csv('./Datasets/EndInvFINAL12312016.csv.gz', inferSchema=True, header=True)
raw_data_purchase_order = spark.read.options(compression='gzip').csv('./Datasets/InvoicePurchases12312016.csv.gz', inferSchema=True, header=True)
raw_data_purchase_detail = spark.read.options(compression='gzip').csv('./Datasets/PurchasesFINAL12312016.csv.gz', inferSchema=True, header=True)
raw_data_sales = spark.read.options(compression='gzip').csv('./Datasets/SalesFINAL12312016.csv.gz', inferSchema=True, header=True)


def replace(dataframe, column, condition, new_value):
    return dataframe.withColumn(column, F.when(condition, new_value).otherwise(dataframe[column]))

#
# replace invalid value
#
raw_data_products = replace(raw_data_products, 'Volume', raw_data_products['Volume'].isNull(), 0)
raw_data_products = replace(raw_data_products, 'Volume', raw_data_products['Volume'] == 'Unknown', 0)
raw_data_products = replace(raw_data_products, 'Volume', raw_data_products['Volume'] == '162.5', 162)

#
# Correct Data Type
#
dtype_products = {
    'Brand': 'string',
    'Description': 'string',
    'Price': 'float',
    'Size': 'string',
    'Volume': 'int',
    'Classification': 'string',
    'PurchasePrice': 'float',
    'VendorNumber': 'string',
    'VendorName': 'string',
}
    
dtype_inventory_begin = {    
    'InventoryId': 'string',
    'Store': 'string',
    'City': 'string',
    'Brand': 'string',
    'Description': 'string',
    'Size': 'string',
    'onHand': 'int',
    'Price': 'float',
    'startDate': 'string',
}    

dtype_inventory_end = { 
    'InventoryId': 'string',
    'Store': 'string',
    'City': 'string',
    'Brand': 'string',
    'Description': 'string',
    'Size': 'string',
    'onHand': 'int',
    'Price': 'float',
    'endDate': 'string',
}    

dtype_purchase_order = {    
    'VendorNumber': 'string',
    'VendorName': 'string',
    'InvoiceDate': 'string',
    'PONumber': 'string',
    'PODate': 'string',
    'PayDate': 'string',
    'Quantity': 'int',
    'Dollars': 'float',
    'Freight': 'float',
    'Approval': 'string',
}

dtype_purchase_detail = {
    'InventoryId': 'string',
    'Store': 'string',
    'Brand': 'string',
    'Description': 'string',
    'Size': 'string',
    'VendorNumber': 'string',
    'VendorName': 'string',
    'PONumber': 'string',
    'PODate': 'string',
    'ReceivingDate': 'string',
    'InvoiceDate': 'string',
    'PayDate': 'string',
    'PurchasePrice': 'float',
    'Quantity': 'int',
    'Dollars': 'float',
    'Classification': 'string',
}   
    
dtype_sales = {    
    'InventoryId': 'string',
    'Store': 'string',
    'Brand': 'string',
    'Description': 'string',
    'Size': 'string',
    'SalesQuantity': 'int',
    'SalesDollars': 'float',
    'SalesPrice': 'float',
    'SalesDate': 'string',
    'Volume': 'int',
    'Classification': 'string',
    'ExciseTax': 'float',
    'VendorNo': 'string',
    'VendorName': 'string',
}

def correct_dtype(dataframe, dtype_dict):
    for column, dtype in dtype_dict.items():
        dataframe = dataframe.withColumn(column, F.col(column).cast(dtype))
    return dataframe

raw_data_products = correct_dtype(raw_data_products, dtype_products)
raw_data_inventory = correct_dtype(raw_data_inventory, dtype_inventory_begin)
raw_data_inventory_end = correct_dtype(raw_data_inventory_end, dtype_inventory_end)
raw_data_purchase_order = correct_dtype(raw_data_purchase_order, dtype_purchase_order)
raw_data_purchase_detail = correct_dtype(raw_data_purchase_detail, dtype_purchase_detail)
raw_data_sales = correct_dtype(raw_data_sales, dtype_sales)

#
# Convert date string to datetime
#

def format_date(dataframe, columns, date_format='yyyy-MM-dd'):
    for column in columns:
        dataframe = dataframe.withColumn(column, F.to_date(column, date_format))
    return dataframe
        
raw_data_inventory = format_date(raw_data_inventory, ['startDate'])
raw_data_inventory_end = format_date(raw_data_inventory_end, ['endDate'])
raw_data_purchase_order = format_date(raw_data_purchase_order, ['PODate', 'InvoiceDate', 'PayDate'])
raw_data_purchase_detail = format_date(raw_data_purchase_detail, ['PODate', 'ReceivingDate', 'InvoiceDate', 'PayDate'])
raw_data_sales = format_date(raw_data_sales, ['SalesDate'], date_format = 'M/d/yyyy')

#
# Align features' name across datasets
#
# VendorNo in Sales should be renamed to VendorNumber
raw_data_sales = raw_data_sales.withColumnRenamed('VendorNo', 'VendorNumber')


# #### Select Related Data
# The project only requires Store 15's data
selected_inventory = raw_data_inventory.where('Store = 15')
selected_purchase_detail = raw_data_purchase_detail.where('Store = 15')
selected_sales = raw_data_sales.where('Store = 15')


# ##### Remove unused fields
selected_products = raw_data_products.drop('Description', 'VendorName')
selected_purchase_detail = selected_purchase_detail.drop('InventoryId', 'Store', 'Description', 'VendorName', 'PONumber')
selected_sales = selected_sales.drop('InventoryId', 'Store', 'Description', 'VendorName')


# Construct DayOfWeek, DayOfMonth, Month fields
selected_sales = selected_sales.withColumn('DayOfWeek', F.dayofweek('SalesDate'))
selected_sales = selected_sales.withColumn('DayOfMonth', F.dayofmonth('SalesDate'))
selected_sales = selected_sales.withColumn('Month', F.month('SalesDate'))

selected_purchase_detail = selected_purchase_detail.withColumn('DeliverTime', F.datediff('ReceivingDate', 'PODate'))
selected_purchase_detail = selected_purchase_detail.withColumn('Month', F.month('ReceivingDate'))
selected_purchase_detail = selected_purchase_detail.withColumn('DayOfMonth', F.dayofmonth('ReceivingDate'))

# #### Reformat DayOfWeek and Month to string
day_of_week_mapping = {
    1: "Sunday",
    2: "Monday",
    3: "Tuesday",
    4: "Wednesday",
    5: "Thursday",
    6: "Friday",
    7: "Saturday"
}

month_mapping = {
    1: "January",
    2: "February",
    3: "March",
    4: "April",
    5: "May",
    6: "June",
    7: "July",
    8: "August",
    9: "September",
    10: "October",
    11: "November",
    12: "December"
}

def convert_data_type(dataframe, column, mapping, dtype=None):
    for key, value in mapping.items():
        dataframe = dataframe.withColumn(column, F.when(F.col(column) == key, value).otherwise(F.col(column)))
    if dtype is not None:
        dataframe = dataframe.withColumn(column, F.col(column).cast(dtype))
    return dataframe

selected_sales = convert_data_type(selected_sales, 'Month', month_mapping, 'string')
selected_sales = convert_data_type(selected_sales, 'DayOfWeek', day_of_week_mapping, 'string')
selected_purchase_detail = convert_data_type(selected_purchase_detail, 'Month', month_mapping, 'string')

In [2]:
def save_cleaned_data(dataframe, name, folder = './Cleaned Datasets/', version = '_v001'):
    dataframe.write.parquet(folder + name + version, compression='gzip', mode='overwrite')

In [3]:
save_cleaned_data(selected_inventory, 'inventory')
save_cleaned_data(selected_purchase_detail, 'purchase_detail')
save_cleaned_data(selected_sales, 'sales')

In [4]:
inv = spark.read.options(compression='gzip').format('parquet').load('./Cleaned Datasets/inventory_v001')
pdetail = spark.read.options(compression='gzip').format('parquet').load('./Cleaned Datasets/purchase_detail_v001')
sales = spark.read.options(compression='gzip').format('parquet').load('./Cleaned Datasets/sales_v001')

In [5]:
print(inv.head())
inv.printSchema()

Row(InventoryId='15_WANBORNE_58', Store='15', City='WANBORNE', Brand='58', Description='Gekkeikan Black & Gold Sake', Size='750mL', onHand=9, Price=12.989999771118164, startDate=datetime.date(2016, 1, 1))
root
 |-- InventoryId: string (nullable = true)
 |-- Store: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- onHand: integer (nullable = true)
 |-- Price: float (nullable = true)
 |-- startDate: date (nullable = true)



In [6]:
print(sales.head())
sales.printSchema()

Row(Brand='100', Size='750mL', SalesQuantity=1, SalesDollars=159.99000549316406, SalesPrice=159.99000549316406, SalesDate=datetime.date(2016, 1, 14), Volume=750, Classification='1', ExciseTax=0.7900000214576721, VendorNumber='17035', DayOfWeek='Thursday', DayOfMonth=14, Month='January')
root
 |-- Brand: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- SalesQuantity: integer (nullable = true)
 |-- SalesDollars: float (nullable = true)
 |-- SalesPrice: float (nullable = true)
 |-- SalesDate: date (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Classification: string (nullable = true)
 |-- ExciseTax: float (nullable = true)
 |-- VendorNumber: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DayOfMonth: integer (nullable = true)
 |-- Month: string (nullable = true)



In [7]:
print(pdetail.head())
pdetail.printSchema()

Row(Brand='2663', Size='1.75L', VendorNumber='480', PODate=datetime.date(2015, 12, 20), ReceivingDate=datetime.date(2016, 1, 1), InvoiceDate=datetime.date(2016, 1, 12), PayDate=datetime.date(2016, 2, 5), PurchasePrice=21.420000076293945, Quantity=12, Dollars=257.0400085449219, Classification='1', DeliverTime=12, Month='January', DayOfMonth=1)
root
 |-- Brand: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- VendorNumber: string (nullable = true)
 |-- PODate: date (nullable = true)
 |-- ReceivingDate: date (nullable = true)
 |-- InvoiceDate: date (nullable = true)
 |-- PayDate: date (nullable = true)
 |-- PurchasePrice: float (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Dollars: float (nullable = true)
 |-- Classification: string (nullable = true)
 |-- DeliverTime: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayOfMonth: integer (nullable = true)



In [8]:
save_cleaned_data(selected_products, 'products')
products = spark.read.options(compression='gzip').format('parquet').load('./Cleaned Datasets/products_v001')
print(products.head())
products.printSchema()

Row(Brand='58', Price=12.989999771118164, Size='750mL', Volume=750, Classification='1', PurchasePrice=9.279999732971191, VendorNumber='8320')
root
 |-- Brand: string (nullable = true)
 |-- Price: float (nullable = true)
 |-- Size: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Classification: string (nullable = true)
 |-- PurchasePrice: float (nullable = true)
 |-- VendorNumber: string (nullable = true)

