In [0]:
import pandas as pd
from pyspark.sql import functions as func

def extract_json_data(spark, path):
    df = spark.read.option("multiline", "true").json(path)
    return df

#function to extract csv/excel data from files
def extract_csv_data(spark, path):
    # Check if the file is xlsx like Customer or is it csv like Product
    if('xlsx' in path):
        # Added external library com.crealytics.spark.excel as using pandas df to convert to csv can cause errors field which contain comma as data value
        df = spark.read \
            .format("com.crealytics.spark.excel") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load(path)
    else:
        df = spark.read.csv(path, header=True)
    return df

# Function to create or insert data to tables using the dataframe that's passed as arg
def create_or_insert_data(spark, df, table_name):
    if spark.catalog.tableExists(table_name):
        # Append data to the existing table
        df.write.option("overwriteSchema", "true").option("mergeSchema","true").mode("append").saveAsTable(table_name)
        print(f"Data appended to the existing table: {table_name}")
    else:
        # Create a new table with the provided schema
        spark.sql(f"CREATE TABLE {table_name}")
        df.write.option("overwriteSchema", "true").option("mergeSchema","true").mode("overwrite").saveAsTable(table_name)
        print(f"New table created: {table_name}")

def clean_name_col(df, column):
    # The name column in Customer file contains various data qualtiy issues like having special chars or unexpected spaces or NULL vals. To resolve this issue we replace all NULL values with NA and remove all special chars
    df = df.withColumn(column, func.when(func.col(column).isNull(), "NA")
                             .otherwise(func.initcap(func.trim(func.regexp_replace(func.col(column), r'[^a-zA-Z\s]', '')))))
    return df

def fill_null_values(df):
    # Generic function to update NULL vals to 'NA'
    for col in df.columns:
        df = df.withColumn(col, func.when(func.col(col).isNull(), "NA").otherwise(func.col(col)))
    return df

def process_data(spark, order_df, product_df, cust_df):
    # function to process all the data sourced from the 3 files.
    cust_df_temp = cust_df.select('Customer_ID', 'Customer_Name', 'Country').distinct()
    product_df_temp = product_df.select('Product_ID', 'Category', 'Sub-Category').distinct()

    # Joining all 3 dataframes to get enriched orders dataset which is finally inserted into table
    enriched_orders_df = order_df.alias('o').join(cust_df_temp.alias('c'), on=order_df['Customer_ID'] == cust_df_temp['Customer_ID'], how='inner') \
    .join(product_df_temp.alias('p'), on=order_df['Product_ID'] == product_df_temp['Product_ID'], how='inner') \
    .select('o.*','c.Customer_Name','c.Country','p.Category','p.Sub-Category')

    create_or_insert_data(spark, enriched_orders_df, 'enriched_orders')
    # Another approach to create table using spark SQL
    query = '''
    CREATE TABLE profit_aggregate AS
SELECT 
    substr(Order_Date,length(Order_Date)-3,length(Order_Date)) AS Year,
    Category,
    'Sub-Category',
    Customer_Name,
    SUM(ROUND(Profit, 2)) AS Total_Profit
FROM 
    enriched_orders
GROUP BY 
    substr(Order_Date,length(Order_Date)-3,length(Order_Date)),
    Category,
    'Sub-Category',
    Customer_Name;'''

    # refresh needs to done in case the index is not updated
    refresh_query = 'refresh table profit_aggregate'

    spark.sql(query)
    spark.sql(refresh_query)
    profit_agg = spark.sql('select * from profit_aggregate;')
    return enriched_orders_df, profit_agg


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
spark = SparkSession.builder.appName('PEI_test_module').getOrCreate()
path = '/FileStore/tables/Order.json'
path_prod = '/FileStore/tables/Product.csv'
path_cust = '/FileStore/tables/Customer.xlsx'
order_df = extract_json_data(spark, path)
cust_df = extract_csv_data(spark, path_cust)
product_df = extract_csv_data(spark, path_prod)
order_df = order_df.select([func.col(col).alias(col.replace(' ','_')) for col in order_df.columns])
cust_df = cust_df.select([func.col(col).alias(col.replace(' ','_')) for col in cust_df.columns])
product_df = product_df.select([func.col(col).alias(col.replace(' ','_')) for col in product_df.columns])
product_df = fill_null_values(product_df)
cust_df = clean_name_col(cust_df, 'Customer_Name')
product_df = product_df.withColumnRenamed('Sub_Category', 'Sub-Category')

create_or_insert_data(spark, order_df, 'orders')
create_or_insert_data(spark, product_df, 'products')
create_or_insert_data(spark, cust_df, 'customers')
enriched_orders_df, profit_agg_df = process_data(spark, order_df, fill_null_values(product_df), fill_null_values(cust_df))






New table created: enriched_orders


In [0]:
%sql
SELECT 
    Year,
    SUM(Total_Profit) AS Profit
FROM 
    profit_aggregate
GROUP BY 
    Year;



Year,Profit
2016,64669.1400000001
2017,110596.8
2014,38662.76000000007
2015,62490.14000000002


In [0]:
%sql


SELECT 
    Year,
    Category,
    SUM(Total_Profit) AS Profit
FROM 
    profit_aggregate
GROUP BY 
    Year, Category;



Year,Category,Profit
2016,Office Supplies,34555.74000000002
2016,Furniture,6889.560000000003
2015,Technology,34943.430000000015
2015,Furniture,3027.2000000000007
2014,Office Supplies,22500.42999999998
2016,Technology,23223.84
2015,Office Supplies,24519.509999999984
2017,Technology,63281.90999999993
2014,Furniture,-5331.050000000001
2017,Office Supplies,44273.349999999984


In [0]:
%sql

SELECT 
    Customer_Name,
    SUM(Total_Profit) AS Profit
FROM 
    profit_aggregate
GROUP BY 
    Customer_Name;


Customer_Name,Profit
Jim Mitchum,117.2
Jesus Ocampo,167.13
Michelle Ellison,107.34999999999998
Joseph Holt,-644.4799999999998
Patrick Obrill,37.640000000000015
Denny Joy,483.03
Ruben Ausman,1254.47
Parhena Norris,192.04
Ted Butterfield,384.5
Joy Bell,127.06


In [0]:
%sql

SELECT 
    Customer_Name,
    Year,
    SUM(Total_Profit) AS Profit
FROM 
    profit_aggregate
GROUP BY 
    Customer_Name, Year;

Customer_Name,Year,Profit
Marina Lichtenstein,2014,486.58
Dorothy Badders,2017,40.71000000000001
Emily Burns,2014,16.02
Christina Anderson,2014,16.97
Neil Knudson,2017,80.03
Anemone Ratner,2016,32.64
Mary Zewe,2017,17.38
Valerie Mitchum,2015,-3.71
Pete Takahito,2016,-17.52
Sara Luxemburg,2016,3.11


In [0]:

profit_by_yr = profit_agg_df.select('Year','Total_Profit').groupBy('Year').sum('Total_Profit').alias('Profit')
profit_by_yr.show()
profit_by_yr_cat = profit_agg_df.groupBy("Year", "Category").agg(func.sum(func.col("Total_Profit")).alias("Profit"))
profit_by_yr_cat.show()
profit_by_cust = profit_agg_df.groupBy("Customer_Name").agg(func.sum(func.col("Total_Profit")).alias("Profit"))
profit_by_cust.show()

profit_by_yr_cust = profit_agg_df.groupBy('Year', 'Customer_Name').agg(func.sum(func.col("Total_Profit")).alias("Profit"))
profit_by_yr_cust.show()

+----+-----------------+
|Year|sum(Total_Profit)|
+----+-----------------+
|2016| 64669.1400000001|
|2017|         110596.8|
|2014|38662.76000000007|
|2015|62490.14000000002|
+----+-----------------+

+----+---------------+------------------+
|Year|       Category|            Profit|
+----+---------------+------------------+
|2016|Office Supplies| 34555.74000000002|
|2016|      Furniture| 6889.560000000003|
|2015|     Technology|34943.430000000015|
|2015|      Furniture|3027.2000000000007|
|2014|Office Supplies| 22500.42999999998|
|2016|     Technology|          23223.84|
|2015|Office Supplies|24519.509999999984|
|2017|     Technology| 63281.90999999993|
|2014|      Furniture|-5331.050000000001|
|2017|Office Supplies|44273.349999999984|
|2014|     Technology| 21493.37999999999|
|2017|      Furniture|3041.5399999999986|
+----+---------------+------------------+

+-----------------+-------------------+
|    Customer_Name|             Profit|
+-----------------+-------------------+
|     