### PySpark ETL
This example will show how to use SparkSql to perform ETL on a set of relational tables
<br>
<br>
**Tables**:
1. Customer: Information such as State, Gender and Customer ID
2. Product: List of SKUs the retail chain s ells
3. Transaction: Point of Sale (POS) data for customers

### Create a Spark Session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, regexp_replace, udf
from pyspark.sql.types import DoubleType

#Create Spark Session
spark=SparkSession.builder.appName("simple").getOrCreate()

### Load in Files from hdfs

In [3]:
#Read in Customer file from HDFS
Customer = spark.read.csv('hdfs://localhost:54310/user/andrew/data/relational_example/Customer.csv', header=True)

#Register for SQL Use
Customer.createOrReplaceTempView("Customer")

#See Head
Customer.show(10)

+----------+-----+------+-----------+--------------------+----------+--------------------+--------------+
|CustomerID|State|Gender|  FirstName|            LastName|Birth_Date|             Address|Member_Type_ID|
+----------+-----+------+-----------+--------------------+----------+--------------------+--------------+
|         1|   KY|     M|     Albert|              Collet| 24Nov1940|Square Edouard Vii 1|          2030|
|         2|   ME|     F|   Mercedes|            Mart�nez| 15Jan1955|          Edificio 2|          1010|
|         3|   IN|     M|Pier Egidio|              Boeris| 01Jul1970|Via M. Di Monteso...|          1040|
|         4|   NY|     M|      James|             Kvarniq| 27Jun1970|      4382 Gralyn Rd|          1020|
|         5|   NY|     F|   Sandrina|            Stephano| 09Jul1975|    6468 Cog Hill Ct|          2020|
|         6|   OH|     M|       Rent|            Van Lint| 23Dec1945|         Mispadstr 2|          1030|
|         7|   ME|     F|     Juli�n|Escorihue

In [4]:
#Read in Productfile from HDFS
Product = spark.read.csv('hdfs://localhost:54310/user/andrew/data/relational_example/Product.csv', header=True)


#Strip dollar signs to make it numeric
strip_dollar=udf(lambda s: s.replace("$", ""))
strip_comma=udf(lambda s: s.replace(",", ""))

Product_2=Product.withColumn("Amount", strip_dollar("Amount"))
Product_3=Product_2.withColumn("Amount", strip_comma("Amount"))
Product_4=Product_3.withColumn("Amount", Product_3["Amount"].cast("double"))

#Register for SQL Use
Product_4.createOrReplaceTempView("Product")

#See head
Product_4.show(10)

+--------+--------------------+-----------+------+
|itemcode|                item|   category|Amount|
+--------+--------------------+-----------+------+
|     111|             SEP IRA| Retirement| 550.0|
|     112|              Keough| Retirement| 325.0|
|     113|          Simple IRA| Retirement| 125.0|
|     114|         US Equities|Mutual Fund|   2.8|
|     115|International Equ...|Mutual Fund| 325.0|
|     116|      Lifestyle Fund|Mutual Fund| 245.0|
|     117|        Money Market|Mutual Fund| 268.0|
|     118|Mutual Fund - US ...|Mutual Fund|  75.0|
|     121|    Asset Allocation|Mutual Fund| 850.0|
|     122|        Fixed Income|Mutual Fund| 268.0|
+--------+--------------------+-----------+------+
only showing top 10 rows



In [16]:
#See the data typs
Product_4.describe().show()
Product_4.dtypes

#Alternative syntax to replace
#Product4=Product.withColumn("category", regexp_replace(Product["category"],"Retir","abc"))
#Product4.show()

+-------+-----------------+------------------+----------+------------------+
|summary|         itemcode|              item|  category|            Amount|
+-------+-----------------+------------------+----------+------------------+
|  count|               34|                34|        34|                34|
|   mean|            196.0|              null|      null| 314.9647058823529|
| stddev|81.54604573955487|              null|      null|321.16340585756666|
|    min|              111|       529 College|   Banking|               2.8|
|    max|              324|Variable Universal|Retirement|            1595.0|
+-------+-----------------+------------------+----------+------------------+



[('itemcode', 'string'), ('item', 'string'), ('category', 'string'), ('Amount', 'double')]

In [6]:
#Read in Transaction file from HDFS
Transaction = spark.read.csv('hdfs://localhost:54310/user/andrew/data/relational_example/Transaction.csv', header=True)

#Register for SQL Use
Transaction.createOrReplaceTempView("Transaction")

Transaction.show(10)

+------------+--------+--------+----------+-----------+---------+
|order_number|quantity|itemcode|order_type|Customer_ID|     date|
+------------+--------+--------+----------+-----------+---------+
|        1002|       1|     324|         1|          2|02Jan2016|
|        1002|       1|     322|         1|          2|02Jan2016|
|        1001|       1|     322|         0|          1|02Jan2016|
|        1003|       1|     324|         1|          3|04Jan2016|
|        1004|       1|     324|         1|          4|04Jan2016|
|        1005|       2|     314|         1|          5|04Jan2016|
|        1006|       1|     314|         0|          6|05Jan2016|
|        1007|       1|     323|         0|          7|05Jan2016|
|        1008|       1|     323|         0|          8|08Jan2016|
|        1009|       4|     221|         1|          9|08Jan2016|
+------------+--------+--------+----------+-----------+---------+
only showing top 10 rows



### See Tables

In [7]:
spark.sql("show tables").show()

+--------+-----------+-----------+
|database|  tableName|isTemporary|
+--------+-----------+-----------+
|        |   customer|       true|
|        |    product|       true|
|        |transaction|       true|
+--------+-----------+-----------+



## Let's Join the Data

In [8]:
#Merge all 3 tables
complete_history = spark.sql('''
                             SELECT t1.*,
                                    t2.*,
                                    t3.item,
                                    t3.category,
                                    t3.Amount
                             FROM customer t1
                             INNER JOIN transaction t2
                             INNER JOIN product t3
                             ON t1.CustomerID=t2.Customer_ID
                             AND t2.itemcode=t3.itemcode''')

complete_history[['CustomerID','Gender','State','date','item','category','quantity','Amount']].show()

+----------+------+-----+---------+--------------------+---------------+--------+------+
|CustomerID|Gender|State|     date|                item|       category|quantity|Amount|
+----------+------+-----+---------+--------------------+---------------+--------+------+
|         1|     M|   KY|17Jan2016|        Fixed Income|    Mutual Fund|       1| 268.0|
|         1|     M|   KY|02Jan2016|           Refinance|         Borrow|       1|1595.0|
|         2|     F|   ME|02Jan2016|           Refinance|         Borrow|       1|1595.0|
|         2|     F|   ME|02Jan2016|       Grow my money|Managed Account|       1|  95.0|
|         3|     M|   IN|11Feb2016|       Grow my money|Managed Account|       3|  95.0|
|         3|     M|   IN|01Feb2016|       Grow my money|Managed Account|       3|  95.0|
|         3|     M|   IN|28Jan2016|             SEP IRA|     Retirement|       1| 550.0|
|         3|     M|   IN|04Jan2016|       Grow my money|Managed Account|       1|  95.0|
|         4|     M|  

In [9]:
complete_history.columns

['CustomerID', 'State', 'Gender', 'FirstName', 'LastName', 'Birth_Date', 'Address', 'Member_Type_ID', 'order_number', 'quantity', 'itemcode', 'order_type', 'Customer_ID', 'date', 'item', 'category', 'Amount']

In [10]:
#Customer Transaction History
customer_history = spark.sql('''
                             SELECT t1.CustomerID,
                                    COUNT(distinct t2.order_number) AS order_count,
                                    COUNT(*)/COUNT(distinct t2.order_number) AS basket_size,
                                    count(t3.Amount) AS total_spend,
                                    SUM(t3.Amount)/COUNT(distinct t2.order_number) AS avg_basket_spend
                             FROM customer t1
                             INNER JOIN transaction t2
                             INNER JOIN product t3
                             ON t1.CustomerID=t2.Customer_ID
                             AND t2.itemcode=t3.itemcode
                             GROUP BY t1.CustomerID''')

customer_history.show()

+----------+-----------+-----------+-----------+----------------+
|CustomerID|order_count|basket_size|total_spend|avg_basket_spend|
+----------+-----------+-----------+-----------+----------------+
|         7|          2|        1.5|          3|           247.5|
|        15|          2|        1.0|          2|            91.0|
|        11|          3|        2.0|          6|           285.0|
|        29|          1|        2.0|          2|           912.0|
|        42|          1|        1.0|          1|           325.0|
|         3|          4|        1.0|          4|          208.75|
|        30|          1|        1.0|          1|          1595.0|
|        34|          1|        2.0|          2|          1448.0|
|         8|          4|       2.25|          9|           299.0|
|        28|          1|        2.0|          2|           144.0|
|        22|          1|        1.0|          1|            17.0|
|        16|          1|        1.0|          1|           165.0|
|        3