# Python - Getting Started with Spark

As part of this module, we will take a simple use case and scratch the surface of Spark. We will use it to demonstrate an end-to-end Data Engineering pipeline.

* Understand Data Model
* Define Problem Statement
* Creating Spark Context
* Setting Run Time Job Properties
* Reading data from CSV Files
* Apply Filtering
* Row Level Transformations
* Perform Joins
* Aggregate Data
* Perform Sorting
* Write output to Files
* Complete Script
* Validating Output

## Understand Data Model

Let us understand the data model and also characteristics of the data.

* Base directory for **retail_db** data sets is **/public/retail_db**.
* It has six folders; each folder represents a separate table.
  * Product Catalog Tables
    * products
    * categories
    * departments
  * Customers Table
    * customers
  * Transactional Tables
    * orders
    * order_items
* **orders** and **order_items** are related: **orders** is the parent table and **order_items** is its child table.
* All folders have one or more files under them.
* Each line represents a record and holds the values of multiple columns. Fields within a record are delimited by a **comma (,)**.
* The first field in each orders record is order_id, which is the primary key (unique and not null).
* The second field in each order_items record is order_item_order_id, which is a foreign key referencing order_id in orders.
* There are other relationships as well, but they are not relevant for getting started. We will primarily focus on orders and order_items data.
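
To make the record layout concrete, here is a minimal plain-Python sketch (no Spark involved) that splits one sample line from each table on commas. The sample values mirror the previews shown later in this notebook; the full timestamp text `2013-07-25 00:00:00.0` is an assumption, since the previews truncate it, and `split_record` is a hypothetical helper.

```python
import csv
import io

# Sample lines; the timestamp format is assumed (the notebook previews truncate it).
orders_line = "1,2013-07-25 00:00:00.0,11599,CLOSED"
order_items_line = "1,1,957,1,299.98,299.98"


def split_record(line):
    """Split one comma-delimited record into a list of string fields."""
    return next(csv.reader(io.StringIO(line)))


order = split_record(orders_line)
order_item = split_record(order_items_line)

order_id = order[0]                   # first field of orders: primary key
order_item_order_id = order_item[1]   # second field of order_items: foreign key

print(order_id, order_item_order_id)
```

Both values are still strings at this point; assigning proper data types is a separate step.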

## Define Problem Statement

Get monthly revenue, considering only COMPLETE or CLOSED orders.

* We will use orders and order_items data.
* **orders** is available at **/public/retail_db/orders**
* **order_items** is available at **/public/retail_db/order_items**
* We need to consider orders with COMPLETE or CLOSED status.
* Revenue can be computed using **order_item_subtotal**, which is the 5th attribute in order_items.

## Creating Spark Context

Let us understand how to create Spark Context using `SparkSession` from `pyspark.sql`.

* We need a Spark Context to leverage both the APIs and the distributed computing framework.
* `SparkSession` is a wrapper class; `getOrCreate` will use the existing Spark Context or create a new one.
* We can customize the behavior of the Spark Context by passing properties using `config` or by using builder methods such as `appName` and `master`.
* Dedicated methods are provided only for the most commonly used properties.

In [3]:
import findspark as fs
import os
fs.init()
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession. \
    builder. \
    appName('Getting Started - Monthly Revenue'). \
    master('local'). \
    getOrCreate()

In [6]:
spark

## Setting Run Time Job Properties

Let us understand how to customize run time behavior of submitted jobs.

* Once Spark Context is created, we can customize run time behavior by using `spark.conf.set`.
* In our case, let us set the property `spark.sql.shuffle.partitions` to 2.
* If we do not set this property, Spark uses 200 partitions by default for shuffle operations such as joins and aggregations.

In [7]:
spark.conf.set('spark.sql.shuffle.partitions', '2')
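
As a rough analogy for what the shuffle partition count controls, the plain-Python sketch below assigns grouping keys to one of two buckets by hashing. Spark's own partitioner works differently internally; the `partition_for` helper and the use of `zlib.crc32` are illustrative assumptions, not Spark APIs.

```python
import zlib

NUM_PARTITIONS = 2  # mirrors spark.sql.shuffle.partitions = 2


def partition_for(key, num_partitions=NUM_PARTITIONS):
    """Map a key to a partition index with a deterministic hash (hypothetical helper)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Rows with the same key always land in the same partition,
# which is what makes per-key aggregation possible after a shuffle.
keys = ["201307", "201308", "201309", "201307"]
assignments = [(key, partition_for(key)) for key in keys]
print(assignments)
```

With a small data set, a large partition count would leave most buckets empty, which is why a small value is a reasonable choice here.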

* When using Jupyter Notebook, you can set `spark.sql.repl.eagerEval.enabled` to `True` so that a Data Frame is rendered as a readable table when it is evaluated as the last expression in a cell, without calling `show`.

In [8]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

## Reading data from CSV Files

Let us quickly see how we can read data from CSV Files.

* Spark provides several APIs to read files of different file formats.
* All the out-of-the-box APIs are available under `spark.read`.
* In our case, we have to read text files where the fields of each record are separated by a comma (',').
* To create Data Frames for `orders` and `order_items`, we can pass the path to `spark.read.csv`.
* There are other options as well, which can be passed using keyword arguments. You can run help on `spark.read.csv`.
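
The cells below pass a schema string such as `"order_id INT, order_date STRING, ..."` to `spark.read.csv`. As a plain-Python illustration of what such a string conveys (column names plus the type each field is cast to), here is a minimal sketch. `parse_schema` and `apply_schema` are hypothetical helpers that handle only the three types used in this notebook; Spark does this parsing internally and far more generally.

```python
# Plain-Python illustration only; this is not how Spark is implemented.
CASTS = {"INT": int, "FLOAT": float, "STRING": str}


def parse_schema(ddl):
    """Turn 'name TYPE, name TYPE' into a list of (name, cast) pairs."""
    columns = []
    for column_def in ddl.split(","):
        name, type_name = column_def.split()
        columns.append((name, CASTS[type_name.upper()]))
    return columns


def apply_schema(line, columns):
    """Split a comma-delimited line and cast each field according to the schema."""
    values = line.split(",")
    return {name: cast(value) for (name, cast), value in zip(columns, values)}


schema = ("order_id INT, order_date STRING, "
          "order_customer_id INT, order_status STRING")

# The full timestamp text is an assumption; the notebook previews truncate it.
row = apply_schema("1,2013-07-25 00:00:00.0,11599,CLOSED", parse_schema(schema))
print(row)
```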

In [9]:
spark.read.csv?

Signature: spark.read.csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, ...

In [7]:
# Reading orders
orders_path = '/public/retail_db/orders'
orders = spark. \
    read. \
    csv(orders_path, 
        schema="order_id INT, order_date STRING, " +
               "order_customer_id INT, order_status STRING"
       )

In [10]:
# Reading orders
orders_path = "D://Bigdata Tutorials//data//retail_db//orders"
orders = spark. \
    read. \
    csv(orders_path, 
        schema="order_id INT, order_date STRING, " +
               "order_customer_id INT, order_status STRING"
       )

In [11]:
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [12]:
orders.show()

+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
|       4|2013-07-25 00:00:...|             8827|         CLOSED|
|       5|2013-07-25 00:00:...|            11318|       COMPLETE|
|       6|2013-07-25 00:00:...|             7130|       COMPLETE|
|       7|2013-07-25 00:00:...|             4530|       COMPLETE|
|       8|2013-07-25 00:00:...|             2911|     PROCESSING|
|       9|2013-07-25 00:00:...|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|             1837|         CLOSED|
|      13|

* With `spark.sql.repl.eagerEval.enabled` set to `True`, we can also preview the data by running just the Data Frame name in a Jupyter Notebook cell.

In [13]:
orders

order_id,order_date,order_customer_id,order_status
1,2013-07-25 00:00:...,11599,CLOSED
2,2013-07-25 00:00:...,256,PENDING_PAYMENT
3,2013-07-25 00:00:...,12111,COMPLETE
4,2013-07-25 00:00:...,8827,CLOSED
5,2013-07-25 00:00:...,11318,COMPLETE
6,2013-07-25 00:00:...,7130,COMPLETE
7,2013-07-25 00:00:...,4530,COMPLETE
8,2013-07-25 00:00:...,2911,PROCESSING
9,2013-07-25 00:00:...,5657,PENDING_PAYMENT
10,2013-07-25 00:00:...,5648,PENDING_PAYMENT


In [40]:
# Reading order_items
order_items_path = '/public/retail_db/order_items'
order_items = spark. \
    read. \
    csv(order_items_path, 
        schema="order_item_id INT, order_item_order_id INT, " +
               "order_item_product_id INT, order_item_quantity INT, " +
               "order_item_subtotal FLOAT, order_item_product_price FLOAT"
       )

In [14]:
# Reading order_items
order_items_path = "D://Bigdata Tutorials//data//retail_db//order_items"
order_items = spark. \
    read. \
    csv(order_items_path, 
        schema="order_item_id INT, order_item_order_id INT, " +
               "order_item_product_id INT, order_item_quantity INT, " +
               "order_item_subtotal FLOAT, order_item_product_price FLOAT"
       )

In [15]:
order_items.printSchema()

root
 |-- order_item_id: integer (nullable = true)
 |-- order_item_order_id: integer (nullable = true)
 |-- order_item_product_id: integer (nullable = true)
 |-- order_item_quantity: integer (nullable = true)
 |-- order_item_subtotal: float (nullable = true)
 |-- order_item_product_price: float (nullable = true)



In [16]:
order_items.show()

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|
|            2|                  2|                 1073|                  1|             199.99|                  199.99|
|            3|                  2|                  502|                  5|              250.0|                    50.0|
|            4|                  2|                  403|                  1|             129.99|                  129.99|
|            5|                  4|                  897|                  2|              49.98|                   24.99|
|            6| 

## Apply Filtering

Let us see how we can filter records in a Data Frame.

* We can use either `filter` or `where` to filter the data. Both of them serve the same purpose.
* We can pass the conditions either in SQL Style or API Style.
* In this case, we have used SQL Style to check `order_status` for `COMPLETE` or `CLOSED` orders.
* We can perform all standard filtering conditions.

In [None]:
orders_filtered = orders. \
    filter('order_status in ("COMPLETE", "CLOSED")')

In [None]:
orders_filtered.show()

* Here is an example for API Style.

In [None]:
orders. \
    filter(orders.order_status.isin('COMPLETE', 'CLOSED')). \
    show()
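
For intuition, the same predicate can be sketched in plain Python over a few in-memory records (no Spark involved). The three rows mirror the first orders shown in the earlier preview; representing records as dictionaries is an assumption made only for this illustration.

```python
# Plain-Python analogue of filtering on order_status.
orders_sample = [
    {"order_id": 1, "order_status": "CLOSED"},
    {"order_id": 2, "order_status": "PENDING_PAYMENT"},
    {"order_id": 3, "order_status": "COMPLETE"},
]

WANTED_STATUSES = {"COMPLETE", "CLOSED"}

# Keep only the records whose status is one of the wanted values.
orders_sample_filtered = [
    order for order in orders_sample
    if order["order_status"] in WANTED_STATUSES
]

print([order["order_id"] for order in orders_sample_filtered])
```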

## Row Level Transformations

Let us see how we can project and also derive new fields out of existing fields leveraging functions.

* One of the ways to project data is by using `select` on top of Data Frame.
* Spark provides almost 300 predefined functions as part of `pyspark.sql.functions`.
* In our case, we need to import and use the `date_format` function to extract the year and month from the existing date.
* Later, we will also import and use functions such as `sum` and `round` while aggregating the data. Note that importing them by name shadows Python's built-in `sum` and `round` in the notebook.
* We can also provide meaningful names to derived fields using `alias`.

In [None]:
from pyspark.sql.functions import date_format

orders_transformed = orders_filtered. \
    select('order_id', date_format('order_date', 'yyyyMM').alias('order_month'))

In [None]:
orders_transformed.show()
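
To see what the `yyyyMM` pattern produces, here is a plain-Python sketch of the same derivation using `datetime` (no Spark involved). The `order_month` helper is hypothetical, and the input format `2013-07-25 00:00:00.0` is an assumption, since the notebook previews truncate the timestamp.

```python
from datetime import datetime


def order_month(order_date):
    """Return the year and month of a timestamp string in 'yyyyMM' form."""
    parsed = datetime.strptime(order_date, "%Y-%m-%d %H:%M:%S.%f")
    return parsed.strftime("%Y%m")


print(order_month("2013-07-25 00:00:00.0"))  # 201307
```

Keeping the year in the derived value matters: grouping by month alone would merge, for example, July 2013 and July 2014.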

## Perform Joins

Let us join both the data sets which have the fields we are interested in. 

* We can join data sets using `join`.
* We have to pass a join condition when the key column names differ between the data sets.
* In our case we have to join `orders` and `order_items` using `orders.order_id` and `order_items.order_item_order_id`.

We can join original Data Frames as well and generate order_month while grouping the data as demonstrated in the **Complete Script** Section.

In [None]:
# Reference the key through orders_transformed, the Data Frame actually being joined
order_details_by_month = orders_transformed. \
    join(order_items, 
         orders_transformed.order_id == order_items.order_item_order_id
        )

In [None]:
order_details_by_month.show()
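
The join used here is an inner join: only order items whose parent order survived the filter are kept. A plain-Python sketch of that behavior follows (no Spark involved). The sample rows mirror the earlier previews, and the dictionary-based lookup is only an illustration, not how Spark executes joins.

```python
# Parent rows (already filtered and transformed) and child rows.
orders_sample = [
    {"order_id": 1, "order_month": "201307"},
    {"order_id": 4, "order_month": "201307"},
]
order_items_sample = [
    {"order_item_id": 1, "order_item_order_id": 1, "order_item_subtotal": 299.98},
    {"order_item_id": 2, "order_item_order_id": 2, "order_item_subtotal": 199.99},
    {"order_item_id": 5, "order_item_order_id": 4, "order_item_subtotal": 49.98},
]

# Index the parent rows by key, then keep only child rows with a matching parent.
orders_by_id = {order["order_id"]: order for order in orders_sample}
joined = [
    {**orders_by_id[item["order_item_order_id"]], **item}
    for item in order_items_sample
    if item["order_item_order_id"] in orders_by_id
]

for row in joined:
    print(row)
```

The item belonging to order 2 is dropped because order 2 is not among the filtered orders.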

## Aggregate Data

As we have joined `orders` and `order_items`, let us perform the aggregation.

* In this case, we want to compute revenue for each month.
* `order_month` is a derived field which contains both year and month.
* We can use `order_month` as part of `groupBy` so that data can be grouped. It will generate a special object of type `GroupedData`.

In [None]:
order_details_by_month. \
    groupBy('order_month')

* We can now invoke aggregate functions such as `sum` and pass the field we want to aggregate (in this case, `order_item_subtotal`).

In [None]:
from pyspark.sql.functions import sum, round

monthly_revenue = order_details_by_month. \
    groupBy('order_month'). \
    agg(round(sum('order_item_subtotal'), 2).alias('revenue'))

In [None]:
monthly_revenue.show()
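
The grouping and summing can be sketched in plain Python as follows (no Spark involved). The two `201307` rows mirror subtotals from the earlier preview; the `201308` row is made up for illustration.

```python
from collections import defaultdict

joined_sample = [
    {"order_month": "201307", "order_item_subtotal": 299.98},
    {"order_month": "201307", "order_item_subtotal": 49.98},
    {"order_month": "201308", "order_item_subtotal": 250.0},  # hypothetical row
]

# Accumulate one running total per group key.
totals = defaultdict(float)
for row in joined_sample:
    totals[row["order_month"]] += row["order_item_subtotal"]

# Round once, after summing, so rounding errors do not accumulate.
monthly_revenue_sample = {month: round(total, 2) for month, total in totals.items()}
print(monthly_revenue_sample)
```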

## Perform Sorting

Now that we have the revenue for each month, let us sort the data so that we can review the output for validation.

* We can use `orderBy` or `sort` to sort the data.
* By default data will be sorted in ascending order.
* In this case we are sorting the data by `order_month`.

In [None]:
monthly_revenue_sorted = monthly_revenue. \
    orderBy('order_month')

In [None]:
monthly_revenue_sorted.show()
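
Sorting by `order_month` gives chronological order even though the values are strings, because zero-padded `yyyyMM` values compare the same way lexicographically and by date. A small plain-Python check, with made-up revenue figures:

```python
# Revenue values here are hypothetical; only the ordering is of interest.
monthly_revenue_sample = [
    ("201401", 120.0),
    ("201307", 349.96),
    ("201312", 80.5),
]

# Ascending sort on the month string, the default direction.
monthly_revenue_sample_sorted = sorted(monthly_revenue_sample, key=lambda row: row[0])
print(monthly_revenue_sample_sorted)
```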

## Write output to Files

Now that the data is read, processed and sorted, it is time to write it to files in the underlying file system.

* In our environment, **/public** is a read-only folder. You will not be able to add files under subdirectories of **/public**.
* Assuming you have write access to **/user/[OS_USER_NAME]**, we use **/user/{username}/retail_db/monthly_revenue** as the target folder.
* `{username}` is replaced by the OS user name used for login, which is obtained with `getpass.getuser()`.
* `coalesce(1)` is used to write the output to one file.
* If the folder and files already exist, `mode('overwrite')` will replace the existing folder with new files.

In [None]:
import getpass
username = getpass.getuser()

monthly_revenue_sorted. \
    coalesce(1). \
    write. \
    mode('overwrite'). \
    csv(f'/user/{username}/retail_db/monthly_revenue',
        header=True
       )
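
As a local, plain-Python analogue of writing a single CSV file with a header, here is a minimal sketch using the `csv` module (no Spark or HDFS involved). The temporary directory, the file name `part-00000.csv`, and the revenue figures are all assumptions made for illustration.

```python
import csv
import os
import tempfile

# Hypothetical local target folder standing in for the HDFS target folder.
target_dir = os.path.join(tempfile.mkdtemp(), "retail_db", "monthly_revenue")
os.makedirs(target_dir, exist_ok=True)
target_file = os.path.join(target_dir, "part-00000.csv")

rows = [("201307", 349.96), ("201308", 250.0)]  # made-up figures

# Mode "w" replaces any existing file, similar in spirit to mode('overwrite').
with open(target_file, "w", newline="") as output:
    writer = csv.writer(output)
    writer.writerow(["order_month", "revenue"])  # header line
    writer.writerows(rows)

# Read the file back to confirm what was written.
with open(target_file, newline="") as written_file:
    written = list(csv.reader(written_file))
print(written)
```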

## Complete Script

Here is the complete script or program which takes care of the following:

* Create Spark Context and set the properties.
* Read the data related to different tables.
* Process the data using relevant Spark Data Frame APIs.
* Write the data back to the file system.

The entire data processing, including writing the data back to the file system, is developed as a single chain of Data Frame calls (a piped approach).

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, sum, round

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    appName('Getting Started - Monthly Revenue'). \
    master('yarn'). \
    getOrCreate()

spark.conf.set('spark.sql.shuffle.partitions', '2')

# Reading orders
orders_path = '/public/retail_db/orders'
orders = spark. \
    read. \
    csv(orders_path, 
        schema="order_id INT, order_date STRING, " +
               "order_customer_id INT, order_status STRING"
       )

# Reading order_items
order_items_path = '/public/retail_db/order_items'
order_items = spark. \
    read. \
    csv(order_items_path, 
        schema="order_item_id INT, order_item_order_id INT, " +
               "order_item_product_id INT, order_item_quantity INT, " +
               "order_item_subtotal FLOAT, order_item_product_price FLOAT"
       )

orders. \
    filter('order_status in ("COMPLETE", "CLOSED")'). \
    join(order_items, orders.order_id == order_items.order_item_order_id). \
    groupBy(date_format('order_date', 'yyyyMM').alias('order_month')). \
    agg(round(sum('order_item_subtotal'), 2).alias('revenue')). \
    orderBy('order_month'). \
    coalesce(1). \
    write. \
    mode('overwrite'). \
    csv(f'/user/{username}/retail_db/monthly_revenue',
        header=True
       )

In [None]:
spark

## Validating Output

Let us go ahead and validate the output.

* In our case we are using HDFS and hence we should be able to use HDFS commands to validate.
* Let us first list the files, which will give some idea about when they were created.
* For some file formats, we will also see extension as well as compression algorithm used.

In [None]:
%%sh

hdfs dfs -ls /user/`whoami`/retail_db/monthly_revenue/part*

* In case of small text files we can use `cat` to see the contents. It might not work if the files are compressed.
* Also, it is not a good practice to use `cat` for larger text files.

In [None]:
%%sh

hdfs dfs -cat /user/`whoami`/retail_db/monthly_revenue/part*
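
Beyond eyeballing the output, a few structural checks can be automated. The sketch below is plain Python operating on the file contents as text; `validate_monthly_revenue` is a hypothetical helper, and the sample text uses made-up figures in the layout the script is expected to produce (a header line followed by one line per month).

```python
import csv
import io


def validate_monthly_revenue(text):
    """Check the header, ascending month order, and one row per month."""
    rows = list(csv.reader(io.StringIO(text)))
    header, data = rows[0], rows[1:]
    months = [row[0] for row in data]
    return (
        header == ["order_month", "revenue"]
        and months == sorted(months)
        and len(months) == len(set(months))
    )


# Made-up sample in the expected layout.
sample_output = "order_month,revenue\n201307,349.96\n201308,250.0\n"
print(validate_monthly_revenue(sample_output))
```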