In [94]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Demo').getOrCreate()

In [95]:
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql import functions as func

In [96]:
currency  = 'dim_currency.csv'
customers = 'dim_customer.csv'
dates = 'dim_date.csv'
product = 'dim_product.csv'
intsales = 'fact_internet_sales.csv'

In [98]:
df = spark.read.csv('fact_internet_sales.csv', header=True, inferSchema=True)
df.show(2)

+----------+------------+-----------+-----------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+
|ProductKey|OrderDateKey|CustomerKey|CurrencyKey|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|SalesAmount|  TaxAmt|Freight|
+----------+------------+-----------+-----------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+
|       310|    20101229|      21768|         19|            1|  3578.27|       3578.27|                 0.0|           0.0|          2171.2942|       2171.2942|    3578.27|286.2616|89.4568|
|       346|    20101229|      28389|         39|            1|  3399.99|       3399.99|                 0.0|           0.0|          1912.1544|       1912.1544|    3399.99|271.9992|84.9998|
+----------+------------+-----------+--------

In [99]:
dfcurrency = spark.read.csv(currency, header=True, inferSchema=True)
dfcustomers = spark.read.csv(customers, header=True, inferSchema=True)
dfdates = spark.read.csv(dates, header=True, inferSchema=True)
dfproducts = spark.read.csv(product, header=True, inferSchema=True)
dfsales = spark.read.csv(intsales, header=True, inferSchema=True)

In [100]:
dfcurrency.show(2)
dfcustomers.show(2)
dfdates.show(2)
dfproducts.show(2)
dfsales.show(2)

+-----------+--------------------+--------------+
|CurrencyKey|CurrencyAlternateKey|  CurrencyName|
+-----------+--------------------+--------------+
|          1|                 AFA|       Afghani|
|          2|                 DZD|Algerian Dinar|
+-----------+--------------------+--------------+
only showing top 2 rows

+-----------+---------+----------+--------+---------+----------+-------------+------+------+--------------------+------------+-------------+---------------+------------+-------------------+
|CustomerKey|FirstName|MiddleName|LastName|NameStyle| BirthDate|MaritalStatus|Suffix|Gender|        EmailAddress|YearlyIncome|TotalChildren|   AddressLine1|AddressLine2|              Phone|
+-----------+---------+----------+--------+---------+----------+-------------+------+------+--------------------+------------+-------------+---------------+------------+-------------------+
|      11000|      Jon|         V|    Yang|    false|1971-10-06|            M|  NULL|     M|jon24@adventu

#### **Date regulation**

In [101]:
dfdates = dfdates.filter(col('EnglishDayNameOfWeek')=='Sunday')

#### **Product Regulation**
Include only products where:- `Color` is **Silver**.
- Has **Size** information.
- `Weight` is greater than **10**.

In [102]:
dfproducts = dfproducts.filter((col('Color') == 'Silver') & (col('Size').isNotNull()) & (col('Weight')>10))
dfproducts.show(10)

+----------+-------------------+--------------------+--------------------+------+---------+----+------+-----------------+-------+
|ProductKey|ProductAlternateKey|  EnglishProductName|  SpanishProductName| Color|ListPrice|Size|Weight|DaysToManufacture| Status|
+----------+-------------------+--------------------+--------------------+------+---------+----+------+-----------------+-------+
|       344|         BK-M82S-38|Mountain-100 Silv...|Montaña: 100, pla...|Silver|  3399.99|  38| 20.35|                4|   NULL|
|       345|         BK-M82S-42|Mountain-100 Silv...|Montaña: 100, pla...|Silver|  3399.99|  42| 20.77|                4|   NULL|
|       346|         BK-M82S-44|Mountain-100 Silv...|Montaña: 100, pla...|Silver|  3399.99|  44| 21.13|                4|   NULL|
|       347|         BK-M82S-48|Mountain-100 Silv...|Montaña: 100, pla...|Silver|  3399.99|  48| 21.42|                4|   NULL|
|       352|         BK-M68S-38|Mountain-200 Silv...|Montaña: 200, pla...|Silver|2071.4196

#### **Customer regulations**
Include customers who:- Have a `YearlyIncome` greater than **100,000.0**.
- Have more than **1 child**.


In [103]:
dfcustomers.show(2)

+-----------+---------+----------+--------+---------+----------+-------------+------+------+--------------------+------------+-------------+---------------+------------+-------------------+
|CustomerKey|FirstName|MiddleName|LastName|NameStyle| BirthDate|MaritalStatus|Suffix|Gender|        EmailAddress|YearlyIncome|TotalChildren|   AddressLine1|AddressLine2|              Phone|
+-----------+---------+----------+--------+---------+----------+-------------+------+------+--------------------+------------+-------------+---------------+------------+-------------------+
|      11000|      Jon|         V|    Yang|    false|1971-10-06|            M|  NULL|     M|jon24@adventure-w...|     90000.0|            2|3761 N. 14th St|        NULL|1 (11) 500 555-0162|
|      11001|   Eugene|         L|   Huang|    false|1976-05-10|            S|  NULL|     M|eugene10@adventur...|     60000.0|            3|     2243 W St.|        NULL|1 (11) 500 555-0110|
+-----------+---------+----------+--------+-------

In [104]:
dfcustomers = dfcustomers.filter((col('YearlyIncome') >100_000) & (col('TotalChildren') == 1))


#### **Merging data frames**

In [105]:
dff = (dfproducts
          .join(dfsales, on="ProductKey", how="inner")
          .join(dfcustomers, on="CustomerKey", how="inner")
          .join(dfcurrency, on="CurrencyKey", how="inner")
          .join(dfdates, dfsales.OrderDateKey == dfdates.DateKey, how="inner"))

dff.show(2, truncate=False)


+-----------+-----------+----------+-------------------+-----------------------+--------------------------+------+---------+----+------+-----------------+------+------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+---------+----------+--------+---------+----------+-------------+------+------+-----------------------------+------------+-------------+--------------------+------------+-------------------+--------------------+-----------------+--------+--------------------+---------------+--------------------+--------------------+----------------+---------------+----------------+----------------+----------------+-----------------+---------------+------------+----------------+-------------+----------+--------------+
|CurrencyKey|CustomerKey|ProductKey|ProductAlternateKey|EnglishProductName     |SpanishProductName        |Color |ListPrice|Size|Weight|DaysToManufacture|Status|OrderDateKey|Order

#### **Aggregations**

In [112]:
newdff = (dff.groupBy("CustomerKey", "FirstName") 
             .agg(
                 func.sum("TaxAmt").alias("Total TaxAmt"),
                 func.avg("SalesAmount").alias("Average SalesAmount"),
                 func.avg("TotalProductCost").alias("Average TotalProductCost") 
             ))

newdff.show(20, truncate=False)

+-----------+---------+------------+-------------------+------------------------+
|CustomerKey|FirstName|Total TaxAmt|Average SalesAmount|Average TotalProductCost|
+-----------+---------+------------+-------------------+------------------------+
|15587      |Jessica  |165.7136    |2071.4196          |1117.8559               |
|19696      |Adam     |45.1992     |564.99             |308.2179                |
|11290      |Katelyn  |185.5992    |2319.99            |1265.6195               |
|14673      |Willie   |165.7136    |2071.4196          |1117.8559               |
|15586      |Victor   |185.5992    |2319.99            |1265.6195               |
|14433      |Jennifer |185.5992    |2319.99            |1265.6195               |
+-----------+---------+------------+-------------------+------------------------+



In [107]:
finaldf = newdff.orderBy('FirstName', ascending = True)
finaldf.show()


+-----------+---------+------------+-------------------+------------------------+
|CustomerKey|FirstName|Total TaxAmt|Average SalesAmount|Average TotalProductCost|
+-----------+---------+------------+-------------------+------------------------+
|      19696|     Adam|     45.1992|             564.99|                308.2179|
|      14433| Jennifer|    185.5992|            2319.99|               1265.6195|
|      15587|  Jessica|    165.7136|          2071.4196|               1117.8559|
|      11290|  Katelyn|    185.5992|            2319.99|               1265.6195|
|      15586|   Victor|    185.5992|            2319.99|               1265.6195|
|      14673|   Willie|    165.7136|          2071.4196|               1117.8559|
+-----------+---------+------------+-------------------+------------------------+



#### **Data Presentation**

In [108]:
finaldf = finaldf.drop('CustomerKey')
finaldf.show()

+---------+------------+-------------------+------------------------+
|FirstName|Total TaxAmt|Average SalesAmount|Average TotalProductCost|
+---------+------------+-------------------+------------------------+
|     Adam|     45.1992|             564.99|                308.2179|
| Jennifer|    185.5992|            2319.99|               1265.6195|
|  Jessica|    165.7136|          2071.4196|               1117.8559|
|  Katelyn|    185.5992|            2319.99|               1265.6195|
|   Victor|    185.5992|            2319.99|               1265.6195|
|   Willie|    165.7136|          2071.4196|               1117.8559|
+---------+------------+-------------------+------------------------+

