# PEI-Assessment

In [0]:
import pyspark
from pyspark.sql.types import StructType, StructField, LongType, IntegerType, StringType, DecimalType, DateType, TimestampType, DoubleType
from pyspark.sql.functions import udf, to_date, col, to_timestamp
from pyspark.sql import SparkSession
import re

# Create a SparkSession
spark = SparkSession.builder.appName("Assessment").getOrCreate()

# Install Maven library from configuration "com.crealytics:spark-excel_2.12:0.13.5"

In [0]:
# Function to remove spaces and hypen from column names
def col_name_formatter(df):
    for col_name in df.columns:
        if ' ' in col_name or '-' in col_name:
            df = df.withColumnRenamed(col_name, col_name.replace(' ', '_').replace('-', '_'))
    return df

#Order

In [0]:
# Function to format price - Extraction of floating point numbers
def format_price(price):
    # price = str(price)
    cleaned_price = re.findall(r'\b\d+(?:\.\d+)?\b', price)
    # if cleaned_price:
    #     return cleaned_price[0]
    return cleaned_price[0]

In [0]:
# Test col_name_formatter function
def test_col_name_formatter(spark, df_order):
    df_order = col_name_formatter(df_order)
    expected_columns = ['Customer_ID', 'Discount', 'Order_Date', 'Order_ID', 'Price', 'Product_ID', 'Profit', 'Quantity', 'Row_ID', 'Ship_Date', 'Ship_Mode']
    assert df_order.columns == expected_columns, "Columns are not formatted correctly"
    
    return df_order
  
# Test casting column to LongType
def test_cast_column(df_order):
    df_order = df_order.withColumn("Price", col("Price").cast(DoubleType()))
    expected_dtype = "double"
    assert df_order.schema["Price"].dataType.typeName() == expected_dtype, "Price column cast to DoubleType failed"

    df_order = df_order.withColumn("Discount", col("Discount").cast(DoubleType()))
    expected_dtype = "double"
    assert df_order.schema["Discount"].dataType.typeName() == expected_dtype, "Discount column cast to DoubleType failed"

    df_order = df_order.withColumn("Profit", col("Profit").cast(DoubleType()))
    expected_dtype = "double"
    assert df_order.schema["Profit"].dataType.typeName() == expected_dtype, "Profit column cast to DoubleType failed"

    df_order = df_order.withColumn("Quantity", col("Quantity").cast(LongType()))
    expected_dtype = "long"
    assert df_order.schema["Quantity"].dataType.typeName() == expected_dtype, "Quantity column cast to LongType failed"
    
    df_order = df_order.withColumn("Row_ID", col("Row_ID").cast(LongType()))
    expected_dtype = "long"
    assert df_order.schema["Row_ID"].dataType.typeName() == expected_dtype, "Row_ID column cast to LongType failed"

    df_order = df_order.withColumn("Order_Date", col("Order_Date").cast(DateType()))
    expected_dtype = "date"
    assert df_order.schema["Order_Date"].dataType.typeName() == expected_dtype, "Order_Date column cast to DateType failed"
    
    df_order = df_order.withColumn("Ship_Date", col("Ship_Date").cast(DateType()))
    expected_dtype = "date"
    assert df_order.schema["Ship_Date"].dataType.typeName() == expected_dtype, "Ship_Date column cast to DateType failed"

    return df_order

# Test 1: col_name_formatter
df_order = spark.read.option("multiline","true").json("dbfs:/FileStore/PEI-Assessment/Order.json")
df_order = test_col_name_formatter(spark, df_order)


# display(df_order)

format_price_udf = udf(format_price, StringType())
df_order = df_order.withColumn("Price", format_price_udf("Price"))
df_order = df_order.withColumn("Price", col("Price").cast(DoubleType()))

df_order = df_order.withColumn("Order_Date", to_date(to_timestamp(col("Order_Date"), "d/M/yyyy")))
df_order = df_order.withColumn("Ship_Date", to_date(to_timestamp(col("Ship_Date"), "d/M/yyyy")))
df_order = df_order.withColumn("Discount", col("Discount").cast(DoubleType()))
df_order = df_order.withColumn("Profit", col("Profit").cast(DoubleType()))
df_order = df_order.withColumn("Quantity", col("Quantity").cast(LongType()))
df_order = df_order.withColumn("Row_ID", col("Row_ID").cast(LongType()))

# Test 2: Cast column to LongType
df_order = test_cast_column(df_order)

# display(df_order)
df_order.show(truncate=False)

+-----------+--------+----------+--------------+-------+---------------+-------+--------+------+----------+--------------+
|Customer_ID|Discount|Order_Date|Order_ID      |Price  |Product_ID     |Profit |Quantity|Row_ID|Ship_Date |Ship_Mode     |
+-----------+--------+----------+--------------+-------+---------------+-------+--------+------+----------+--------------+
|JK-15370   |0.3     |2016-08-21|CA-2016-122581|573.174|FUR-CH-10002961|63.686 |7       |1     |2016-08-25|Standard Class|
|BD-11320   |0.0     |2017-09-23|CA-2017-117485|291.96 |TEC-AC-10004659|102.186|4       |2     |2017-09-29|Standard Class|
|LB-16795   |0.7     |2016-10-06|US-2016-157490|17.0   |OFF-BI-10002824|-14.92 |4       |3     |2016-10-07|First Class   |
|KB-16315   |0.2     |2015-07-02|CA-2015-111703|15.552 |OFF-PA-10003349|5.6376 |3       |4     |2015-07-09|Standard Class|
|DO-13435   |0.2     |2014-10-03|CA-2014-108903|142.488|TEC-AC-10003023|-3.0   |3       |5     |2014-10-03|Same Day      |
|CB-12025   |0.0

#Customer

In [0]:
def format_customer_name(customer_name):
    if not customer_name:
        return None
    
    # Remove all non-alphabetic characters between two lowercase alphabets
    cleaned_name = re.sub(r'([a-z])[^a-zA-Z]+([a-z])', r'\1\2', customer_name)
    
    # Remove all non-alphabetic characters between uppercase and lowercase alphabets
    cleaned_name = re.sub(r'([A-Z])[^a-zA-Z]+([a-z])', r'\1\2', cleaned_name)

    # Remove all non-alphabetic characters between a lowercase and an uppercase alphabet and add one white space in between
    cleaned_name = re.sub(r'([a-z])([^a-zA-Z]+)([A-Z])', r'\1 \3', cleaned_name)

    # Remove all leading non-alphabetic characters
    cleaned_name = re.sub(r'^[^a-zA-Z]+|[^a-zA-Z]+$', '', cleaned_name)

    if cleaned_name.strip() == '':
        return None
    return cleaned_name.strip()

def format_phone_number(phone_number):

    if not phone_number:
        return None
    phone_number = str(phone_number)
    # Remove everything after x
    phone_number = phone_number.split('x')[0]

    # Remove non-numeric characters from the phone number
    numeric_phone_number = re.sub(r'\D', '', phone_number)

    # Remove leading '001'
    numeric_phone_number = re.sub(r'^001', '', numeric_phone_number)
    
    numeric_phone_number= numeric_phone_number.lstrip('0')
    if len(numeric_phone_number) != 10:
        return None
    return numeric_phone_number

In [0]:
# Test col_name_formatter function
def test_col_name_formatter(spark, df_customer):
    df_customer = col_name_formatter(df_customer)
    expected_columns = ['Customer_ID', 'Customer_Name', 'email', 'phone', 'address', 'Segment', 'Country', 'City', 'State', 'Postal_Code', 'Region']
    assert df_customer.columns == expected_columns, "Columns are not formatted correctly"
    
    return df_customer
  
# Test casting column to LongType
def test_cast_column_to_long(df_customer):
    df_customer = df_customer.withColumn("Phone", col("Phone").cast(LongType()))
    expected_dtype = "long"
    assert df_customer.schema["Phone"].dataType.typeName() == expected_dtype, "Phone column cast to LongType failed"
    
    df_customer = df_customer.withColumn("Postal_Code", col("Postal_Code").cast(LongType()))
    expected_dtype = "long"
    assert df_customer.schema["Postal_Code"].dataType.typeName() == expected_dtype, "Postal_Code column cast to LongType failed"

    return df_customer

# Test 1: col_name_formatter
df_customer = spark.read.format("com.crealytics.spark.excel").option("header", "true").option("inferSchema", "true").option("dataAddress", "Worksheet").load("dbfs:/FileStore/PEI-Assessment/Customer.xlsx")
df_customer = test_col_name_formatter(spark, df_customer)

# display(df_customer)


# Call UDF to format Customer_Name
format_customer_name_udf = udf(format_customer_name, StringType())
df_customer = df_customer.withColumn("Customer_Name", format_customer_name_udf("Customer_Name"))

# Call UDF to format Phone number

format_phone_number_udf = udf(format_phone_number, StringType())
df_customer = df_customer.withColumn("Phone", format_phone_number_udf("Phone"))
df_customer = df_customer.withColumn("Phone", col('Phone').cast(LongType()))
df_customer = df_customer.withColumn("Postal_Code", col('Postal_Code').cast(LongType()))

# Test 2: Cast column to LongType
df_customer = test_cast_column_to_long(df_customer)

# display(df_customer)
df_customer.show(truncate=False)

+-----------+------------------+-----------------------------+----------+----------------------------------------------------+-----------+-------------+----------------+------------+-----------+-------+
|Customer_ID|Customer_Name     |email                        |Phone     |address                                             |Segment    |Country      |City            |State       |Postal_Code|Region |
+-----------+------------------+-----------------------------+----------+----------------------------------------------------+-----------+-------------+----------------+------------+-----------+-------+
|PW-19240   |Pierre Wener      |bettysullivan808@gmail.com   |4215800902|001 Jones Ridges Suite 338\nJohnsonfort, FL 95462   |Consumer   |United States|Louisville      |Colorado    |80027      |West   |
|GH-14410   |Gary Hansen       |austindyer948@gmail.com      |5424150246|00347 Murphy Unions\nAshleyton, IA 29814            |Home Office|United States|Chicago         |Illinois    |60653 

#Product

In [0]:
us_states = {
    'alabama', 'alaska', 'arizona', 'arkansas', 'california', 'colorado', 'connecticut', 'delaware', 'florida', 'georgia', 'hawaii', 'idaho', 'illinois', 'indiana', 'iowa', 'kansas', 'kentucky', 'louisiana', 'maine', 'maryland', 'massachusetts', 'michigan', 'minnesota', 'mississippi', 'missouri', 'montana', 'nebraska', 'nevada', 'new hampshire', 'new jersey', 'new mexico', 'new york', 'north carolina', 'north dakota', 'ohio', 'oklahoma', 'oregon', 'pennsylvania', 'rhode island', 'south carolina', 'south dakota', 'tennessee', 'texas', 'utah', 'vermont', 'virginia', 'washington', 'west virginia', 'wisconsin', 'wyoming'
    }


# Function to check if state exists and valid
def format_states(state):   
    if state.lower() in us_states:
        return state
    return None

In [0]:
# Test col_name_formatter function
def test_col_name_formatter(spark, df_product):
    df_product = col_name_formatter(df_product)
    expected_columns = ['Product_ID', 'Category', 'Sub_Category', 'Product_Name', 'State', 'Price_per_product']
    assert df_product.columns == expected_columns, "Columns are not formatted correctly"
    
    return df_product
  
# Test casting column to DoubleType
def test_cast_column_to_double(df_product):
    df_product = df_product.withColumn("Price_per_product", col("Price_per_product").cast(DoubleType()))
    expected_dtype = "double"
    assert df_product.schema["Price_per_product"].dataType.typeName() == expected_dtype, "Price_per_product column cast to DoubleType failed"
    
    return df_product

# Test 1: col_name_formatter
df_product = spark.read.option("delimiter",",").csv("dbfs:/FileStore/PEI-Assessment/Product.csv", header=True)
df_product = test_col_name_formatter(spark, df_product)

# Call UDF to format Customer_Name
format_states_udf = udf(format_states, StringType())
df_product = df_product.withColumn("State", format_states_udf("State"))

# Test 2: Cast column to DoubleType
df_product = test_cast_column_to_double(df_product)

# display(df_product)
df_product.show(truncate=False)

+---------------+---------------+------------+---------------------------------------------------------------+------------+-----------------+
|Product_ID     |Category       |Sub_Category|Product_Name                                                   |State       |Price_per_product|
+---------------+---------------+------------+---------------------------------------------------------------+------------+-----------------+
|FUR-CH-10002961|Furniture      |Chairs      |Leather Task Chair, Black                                      |New York    |81.882           |
|TEC-AC-10004659|Technology     |Accessories |Imation Secure+ Hardware Encrypted USB 2.0 Flash Drive; 16GB   |Oklahoma    |72.99            |
|OFF-BI-10002824|Office Supplies|Binders     |Recycled Easel Ring Binders                                    |Colorado    |4.25             |
|OFF-PA-10003349|Office Supplies|Paper       |Xerox 1957                                                     |Florida     |5.184            |
|TEC-A

#Question:1

In [0]:
def test_save_as_table(df_order, df_customer, df_product):    
    # Save dataframes as Delta tables
    df_order.write.format('delta').mode('overwrite').option("overwriteSchema", "true").saveAsTable("Order")
    df_customer.write.format('delta').mode('overwrite').option("overwriteSchema", "true").saveAsTable("Customer")
    df_product.write.format('delta').mode('overwrite').option("overwriteSchema", "true").saveAsTable("Product")
    
    # Check if the tables are created
    assert spark._jsparkSession.catalog().tableExists("Order")
    assert spark._jsparkSession.catalog().tableExists("Customer")
    assert spark._jsparkSession.catalog().tableExists("Product")
    
    # Check the count of the tables
    assert spark.sql("SELECT COUNT(*) FROM Order").first()[0] == df_order.count()
    assert spark.sql("SELECT COUNT(*) FROM Customer").first()[0] == df_customer.count()
    assert spark.sql("SELECT COUNT(*) FROM Product").first()[0] == df_product.count()
    
    # Drop the tables
    # spark.sql("DROP TABLE IF EXISTS Order")
    # spark.sql("DROP TABLE IF EXISTS Customer")
    # spark.sql("DROP TABLE IF EXISTS Product")
    
    # Check if the tables are dropped
    # assert not spark._jsparkSession.catalog().tableExists("Order")
    # assert not spark._jsparkSession.catalog().tableExists("Customer")
    # assert not spark._jsparkSession.catalog().tableExists("Product")
    
# Run the test function
test_save_as_table(df_order, df_customer, df_product)


#Question:2

In [0]:
df_enriched_customer_products = spark.sql(
    """
    Select 
    c.Customer_ID
    ,c.Customer_Name
    ,c.email
    ,c.phone
    ,c.address
    ,c.Segment
    ,c.Country
    ,c.City
    ,c.State as Customer_State
    ,c.Postal_Code
    ,c.Region
    ,p.Product_ID
    ,p.Category
    ,p.Sub_Category
    ,p.Product_Name
    ,p.State as Product_State
    ,p.Price_per_product
    -- ,o.Order_ID
    from customer as c
    join `order` as o
    join product as p
    on c.Customer_ID = o.Customer_ID
    and p.Product_ID = o.Product_ID
    """)

# display(df_enriched_customer_products)
df_enriched_customer_products.show(truncate=False)

+-----------+-------------+--------------------------+----------+-------------------------------------------------+-----------+-------------+----------+--------------+-----------+-------+---------------+---------------+------------+-----------------------------------------------------------------------------+--------------+-----------------+
|Customer_ID|Customer_Name|email                     |phone     |address                                          |Segment    |Country      |City      |Customer_State|Postal_Code|Region |Product_ID     |Category       |Sub_Category|Product_Name                                                                 |Product_State |Price_per_product|
+-----------+-------------+--------------------------+----------+-------------------------------------------------+-----------+-------------+----------+--------------+-----------+-------+---------------+---------------+------------+-----------------------------------------------------------------------------+--

In [0]:
def test_enriched_customer_products_table():
    # Save dataframe as Delta table
    df_enriched_customer_products.write.format('delta').mode('overwrite').option("overwriteSchema", "true").saveAsTable("enriched_customer_products")

    # Test case: check if the table exists
    table_exists = spark._jsparkSession.catalog().tableExists("enriched_customer_products")
    assert table_exists

    # Test case: check if the table has correct schema
    expected_schema = [
        'Customer_ID', 'Customer_Name', 'email', 'phone', 'address', 'Segment', 'Country', 'City', 'Customer_State',
        'Postal_Code', 'Region', 'Product_ID', 'Category', 'Sub_Category', 'Product_Name', 'Product_State',
        'Price_per_product'
    ]
    actual_schema = [field.name for field in spark.catalog.listColumns("enriched_customer_products")]
    assert actual_schema == expected_schema

    # Test case: check if the table has data
    row_count = spark.sql("SELECT COUNT(*) FROM enriched_customer_products").collect()[0][0]
    assert row_count > 0

test_enriched_customer_products_table()


In [0]:
# Duplicates on Product ID was observed
display(spark.sql("""
          select * from product
          where Product_ID='FUR-CH-10002961'
          """))
# duplicates in product id observed

Product_ID,Category,Sub_Category,Product_Name,State,Price_per_product
FUR-CH-10002961,Furniture,Chairs,"Leather Task Chair, Black",New York,81.882
FUR-CH-10002961,Furniture,Chairs,"Leather Task Chair, Black",Pennsylvania,63.686


#Question:3

In [0]:
df_enriched_order_customer_product = spark.sql(
    """
    Select 
    c.Customer_Name
    ,c.Country
    ,Round(o.Profit, 2) as Profit
    ,p.Category
    ,p.Sub_Category
    from `order` as o
    join customer as c
    join product as p
    on c.Customer_ID = o.Customer_ID
    and p.Product_ID = o.Product_ID
    """)
    
# display(df_enriched_order_customer_product)
df_enriched_order_customer_product.show(truncate=False)

+------------------+-------------+------+---------------+------------+
|Customer_Name     |Country      |Profit|Category       |Sub_Category|
+------------------+-------------+------+---------------+------------+
|Jay Kimmel        |United States|63.69 |Furniture      |Chairs      |
|Jay Kimmel        |United States|63.69 |Furniture      |Chairs      |
|Bil Donatelli     |United States|102.19|Technology     |Accessories |
|Laurel Beltran    |United States|-14.92|Office Supplies|Binders     |
|Karl Braun        |United States|5.64  |Office Supplies|Paper       |
|Denny Ordway      |United States|-3.0  |Technology     |Accessories |
|Cassandra Brandow |United States|38.38 |Office Supplies|Binders     |
|Sally Matthias    |United States|5.23  |Office Supplies|Paper       |
|Rick Duston       |United States|5.19  |Furniture      |Furnishings |
|Justin MacKendrick|United States|214.0 |Office Supplies|Storage     |
|Scot Coram        |United States|4.18  |Office Supplies|Storage     |
|Bil O

In [0]:
def test_enriched_order_customer_product_table():
    # Save dataframe as Delta table
    df_enriched_order_customer_product.write.format('delta').mode('overwrite').option("overwriteSchema", "true").saveAsTable("enriched_order_customer_product")

    # Test case: check if the table exists
    table_exists = spark._jsparkSession.catalog().tableExists("enriched_order_customer_product")
    assert table_exists

    # Test case: check if the table has correct schema
    expected_schema = ["Customer_Name", "Country", "Profit", "Category", "Sub_Category"]
    actual_schema = [field.name for field in spark.catalog.listColumns("enriched_order_customer_product")]
    assert actual_schema == expected_schema

    # Test case: check if the table has data
    row_count = spark.sql("SELECT COUNT(*) FROM enriched_order_customer_product").collect()[0][0]
    assert row_count > 0

test_enriched_order_customer_product_table()


#Question:4

In [0]:
df_profit_by_year_pc_psc_customer = spark.sql("""
                  Select 
                  sum(Profit) as Aggregate_Profit
                  ,year(o.Order_Date) as Year
                  ,p.Category
                  ,p.Sub_Category
                  ,c.Customer_ID
                  ,c.Customer_Name
                  from `order` as o
                  join product as p
                  join customer as c
                  on c.Customer_ID = o.Customer_ID
                  and p.Product_ID = o.Product_ID 
                  group by
                  year(o.Order_Date)
                  ,p.Category
                  ,p.Sub_Category
                  ,c.Customer_ID
                  ,c.Customer_Name
                  order by Year desc
                  """)

# display(df_profit_by_year_pc_psc_customer)
df_profit_by_year_pc_psc_customer.show(truncate=False)

+-----------------+----+---------------+------------+-----------+--------------------+
|Aggregate_Profit |Year|Category       |Sub_Category|Customer_ID|Customer_Name       |
+-----------------+----+---------------+------------+-----------+--------------------+
|133.0            |2017|Furniture      |Furnishings |MH-17290   |Marc Harrigan       |
|99.9012          |2017|Office Supplies|Appliances  |BM-11140   |Becky Martin        |
|5.1042           |2017|Office Supplies|Envelopes   |CC-12220   |Chris Cortes        |
|2.0416           |2017|Office Supplies|Art         |TA-21385   |Tom Ashbrook        |
|11.998           |2017|Technology     |Accessories |HP-14815   |Harold Pawlan       |
|-32.2192         |2017|Furniture      |Bookcases   |DW-13585   |Dorothy Wardle      |
|107.9892         |2017|Technology     |Accessories |CS-12355   |Christine Sundaresam|
|268.347          |2017|Office Supplies|Paper       |TH-21550   |Tracy Hopkins       |
|170.2833         |2017|Office Supplies|Sto

In [0]:
def test_profit_by_year_pc_psc_customer_table():
    # Save dataframe as Delta table
    df_profit_by_year_pc_psc_customer.write.format('delta').mode('overwrite').option("overwriteSchema", "true").saveAsTable("profit_by_year_pc_psc_customer")

    # Test case: check if the table exists
    table_exists = spark._jsparkSession.catalog().tableExists("profit_by_year_pc_psc_customer")
    assert table_exists

    # Test case: check if the table has correct schema
    expected_schema = ["Aggregate_Profit", "Year", "Category", "Sub_Category", "Customer_ID", "Customer_Name"]
    actual_schema = [field.name for field in spark.catalog.listColumns("profit_by_year_pc_psc_customer")]
    assert actual_schema == expected_schema

    # Test case: check if the table has data
    row_count = spark.sql("SELECT COUNT(*) FROM profit_by_year_pc_psc_customer").collect()[0][0]
    assert row_count > 0

test_profit_by_year_pc_psc_customer_table()

#Question:5a

In [0]:
def test_profit_by_year():
    # Run SQL Query
    df_profit_by_year = spark.sql(
      """
      Select 
      sum(Aggregate_Profit) as Profit_By_Year
      ,Year
      from profit_by_year_pc_psc_customer
      group by
      Year
      order by Year desc
      """)
    # display(df_profit_by_year)
    df_profit_by_year.show(truncate=False)

    # Test case: check if the dataframe has correct schema
    expected_schema = ["Profit_By_Year", "Year"]
    actual_schema = [field for field in df_profit_by_year.columns]
    assert actual_schema == expected_schema

    # Test case: check if the dataframe has data
    assert df_profit_by_year.select('Year').distinct().count() == spark.sql("Select count(distinct year(Order_Date)) from order").collect()[0][0]

test_profit_by_year()

+------------------+----+
|Profit_By_Year    |Year|
+------------------+----+
|127175.11320000011|2017|
|68161.4049000001  |2016|
|65706.34270000012 |2015|
|40975.45719999994 |2014|
+------------------+----+



#Question:5b

In [0]:


def test_profit_by_year_cat():
    # Run SQL Query
    df_profit_by_year_cat = spark.sql(
      """
      Select 
      sum(Aggregate_Profit) as Profit_By_Year_And_Prd_Cat
      ,Year
      ,Category as Product_Category
      from profit_by_year_pc_psc_customer
      group by
      Year
      ,Product_Category
      order by Year desc
      """)
    # display(df_profit_by_year_cat)
    df_profit_by_year_cat.show(truncate=False)

    # Test case: check if the dataframe has correct schema
    expected_schema = ["Profit_By_Year_And_Prd_Cat", "Year", "Product_Category"]
    actual_schema = [field for field in df_profit_by_year_cat.columns]
    assert actual_schema == expected_schema

    # Test case: check if the dataframe has data
    assert df_profit_by_year_cat.select('Year','Product_Category').distinct().count() == spark.sql("Select count(distinct Year, Category) from profit_by_year_pc_psc_customer").collect()[0][0]

test_profit_by_year_cat()

+--------------------------+----+----------------+
|Profit_By_Year_And_Prd_Cat|Year|Product_Category|
+--------------------------+----+----------------+
|45330.59050000009         |2017|Office Supplies |
|78482.83109999997         |2017|Technology      |
|3361.6915999999983        |2017|Furniture       |
|24437.399599999986        |2016|Technology      |
|7750.212200000004         |2016|Furniture       |
|35973.79310000002         |2016|Office Supplies |
|25490.43370000002         |2015|Office Supplies |
|3392.1574                 |2015|Furniture       |
|36823.75160000004         |2015|Technology      |
|23486.18540000001         |2014|Technology      |
|-5174.653799999998        |2014|Furniture       |
|22663.92559999998         |2014|Office Supplies |
+--------------------------+----+----------------+



#Question:5c

In [0]:
def test_profit_by_customer():
    # Run SQL Query
    df_profit_by_customer = spark.sql(
      """
      Select 
      sum(Aggregate_Profit) as Profit_By_Customer
      ,Customer_ID
      ,Customer_Name
      from profit_by_year_pc_psc_customer
      group by
      Customer_ID
      ,Customer_Name
      order by Customer_ID
      """)
    # display(df_profit_by_customer)
    df_profit_by_customer.show(truncate=False)

    # Test case: check if the dataframe has correct schema
    expected_schema = ["Profit_By_Customer", "Customer_ID", "Customer_Name"]
    actual_schema = [field for field in df_profit_by_customer.columns]
    assert actual_schema == expected_schema

    # Test case: check if the dataframe has data
    assert df_profit_by_customer.select('Customer_ID').distinct().count() == spark.sql("Select count(distinct Customer_ID) from profit_by_year_pc_psc_customer").collect()[0][0]

test_profit_by_customer()

+-------------------+-----------+--------------------+
|Profit_By_Customer |Customer_ID|Customer_Name       |
+-------------------+-----------+--------------------+
|-273.40890000000013|AA-10315   |Alex Avila          |
|277.3824           |AA-10375   |Allen Armold        |
|445.96940000000006 |AA-10480   |Andrew Allen        |
|807.8329           |AA-10645   |Anna Andreadi       |
|129.6821           |AB-10015   |Aaron Bergman       |
|2047.8491000000001 |AB-10060   |Adam Bellavance     |
|5483.749           |AB-10105   |Adrian Barton       |
|320.6815           |AB-10150   |Aimee Bixby         |
|215.36410000000004 |AB-10165   |Alan Barnes         |
|264.56749999999994 |AB-10255   |Alejandro Ballentine|
|-275.286           |AB-10600   |Ann Blume           |
|-62.1342           |AC-10420   |Alyssa Crouse       |
|1730.0927000000001 |AC-10450   |Amy Cox             |
|298.61330000000004 |AC-10615   |Ann Chong           |
|-28.700399999999995|AC-10660   |Anna Chung          |
|1869.9293

#Question:5d

In [0]:
def test_profit_by_year_customer():
    # Run SQL Query
    df_profit_by_year_customer = spark.sql(
      """
      Select 
      sum(Aggregate_Profit) as Profit_By_Year_And_Customer
      ,Year
      ,Customer_ID
      ,Customer_Name
      from profit_by_year_pc_psc_customer
      group by
      Year
      ,Customer_ID
      ,Customer_Name
      order by Year desc
      """)
    # display(df_profit_by_year_customer)
    df_profit_by_year_customer.show(truncate=False)

    # Test case: check if the dataframe has correct schema
    expected_schema = ["Profit_By_Year_And_Customer", "Year", "Customer_ID", "Customer_Name"]
    actual_schema = [field for field in df_profit_by_year_customer.columns]
    assert actual_schema == expected_schema

    # Test case: check if the dataframe has data
    assert df_profit_by_year_customer.select('Year', 'Customer_ID').distinct().count() == spark.sql("Select count(distinct Year, Customer_ID) from profit_by_year_pc_psc_customer").collect()[0][0]

test_profit_by_year_customer()

+---------------------------+----+-----------+------------------+
|Profit_By_Year_And_Customer|Year|Customer_ID|Customer_Name     |
+---------------------------+----+-----------+------------------+
|10.4754                    |2017|EM-13810   |NULL              |
|33.7788                    |2017|CC-12610   |Corey Catlett     |
|221.47639999999998         |2017|MK-18160   |Mike Kennedy      |
|74.228                     |2017|CB-12415   |Christy Brittain  |
|108.05359999999999         |2017|MH-17620   |Matt Hagelstein   |
|397.7298                   |2017|MG-18145   |Mike Gockenbach   |
|1210.7213                  |2017|KF-16285   |Karen Ferguson    |
|36.9728                    |2017|JF-15490   |Jeremy Farry      |
|423.7296                   |2017|EJ-13720   |Ed Jacobs         |
|56.8938                    |2017|EP-13915   |Emily Phan        |
|196.45880000000002         |2017|VP-21730   |Victor Preis      |
|-83.4279                   |2017|SC-20770   |Stewart Carmichael|
|164.91330

In [0]:
# spark.stop()