# Intro to Data Engineering Final Project

Dataframes: Spark vs Pandas

# Project Objective: Compare Spark and Pandas Toolkits

- What
    - Pandas: Python data analysis library
        - Created by Wes McKinney to run statistics on financial data
        - Python with lots of C modules, so many ops run at “C speed”
        - “Just” a fancy Python library
    - Spark DataFrames
        - More robust “big data” platform
- Why
    - When is the overhead and complexity of Spark worth it?
        - Team is dabbling with Spark at work
    - Compare performance and usability with medium data.
    - Hypothesis: Scala Spark DataFrames will return results noticeably faster, but Pandas will be easier to use.


# Quandl Stock Daily Price Table

 - Big data: “data that is too large or complex to process on a single machine”
 - Medium data: data that is too big to work with in Excel, but can still be processed on a single machine.
 - Quandl Stock Price data
     - Stock prices for each market day with metrics
     - 1.8 GB csv file
     - 15,343,013 rows of data for 3196 stocks starting in 1962
     - https://www.quandl.com/databases/WIKIP?filterSelection=all&keyword=wiki
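
Data in this size range can also be streamed through pandas in pieces rather than loaded all at once. The following is a minimal sketch, assuming a made-up in-memory sample in place of the real 1.8 GB file; the helper name `chunked_mean_close`, the sample values, and the `chunksize` are illustrative choices and not part of the project code.

```python
import io
import pandas as pd

# Made-up sample standing in for the real CSV (values are illustrative only).
SAMPLE_CSV = """ticker,date,close
MSFT,2017-01-03,10.0
COF,2017-01-03,20.0
MSFT,2017-01-04,30.0
COF,2017-01-04,40.0
MSFT,2017-01-05,80.0
"""

def chunked_mean_close(csv_source, chunksize):
    """Mean close per ticker, holding only one chunk of rows in memory at a time."""
    sums = pd.Series(dtype="float64")
    counts = pd.Series(dtype="float64")
    for chunk in pd.read_csv(csv_source, chunksize=chunksize):
        grouped = chunk.groupby("ticker")["close"]
        # Accumulate per-ticker sums and row counts across chunks.
        sums = sums.add(grouped.sum(), fill_value=0)
        counts = counts.add(grouped.count(), fill_value=0)
    return sums / counts

means = chunked_mean_close(io.StringIO(SAMPLE_CSV), chunksize=2)
print(means)
```

Keeping running sums and counts, instead of averaging per-chunk averages, gives the same result as a single pass over the whole file.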

# Problem: generate report on stock prices

- Open/Close report with parameters
    - Aggregate average open and close price
        - Subset of stocks by ticker (CBG, MSFT, …)
        - Date subset by range (Start date & end date)
        - Group by Year and month (EX: 2018-03)
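
The report parameters above can be sketched in a few lines of pandas. This is an illustrative version run on a tiny made-up sample, not the project's implementation; the function name `open_close_report` and the sample values are assumptions, and the column names (`ticker`, `date`, `open`, `close`) follow the ones used in the notebook cells.

```python
import io
import pandas as pd

# Tiny made-up sample with the columns the report needs (values are illustrative only).
SAMPLE_CSV = """ticker,date,open,close
MSFT,2018-03-01,10.0,11.0
MSFT,2018-03-02,12.0,13.0
MSFT,2018-04-02,20.0,21.0
CBG,2018-03-01,30.0,31.0
AAPL,2018-03-01,99.0,99.0
MSFT,2017-12-29,1.0,1.0
"""

def open_close_report(df, tickers, start_date, end_date):
    """Average open/close per ticker and year-month for the given tickers and date range."""
    # Keep only the requested tickers and the inclusive date range.
    mask = df["ticker"].isin(tickers) & df["date"].between(start_date, end_date)
    subset = df.loc[mask]
    # Build the year-month key, e.g. "2018-03".
    year_month = subset["date"].dt.strftime("%Y-%m").rename("year_month")
    return (
        subset.groupby(["ticker", year_month])[["open", "close"]]
        .mean()
        .reset_index()
    )

prices = pd.read_csv(io.StringIO(SAMPLE_CSV), parse_dates=["date"])
report = open_close_report(prices, ["CBG", "MSFT"], "2018-01-01", "2018-12-31")
print(report)
```

The AAPL row and the 2017 MSFT row are there to show that both the ticker filter and the date filter drop rows before the aggregation runs.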


# Approach: Implement Report in Spark and Pandas

- Hardware
    - Azure B4ms (4vcpus, 16 GB RAM)
    - HP Notebook (i7-2720QM 4cores 8threads, 8 GB RAM)
- Spark (running from Ambari Docker container)
    - Import csv data, then process with dataframe api
    - Import csv data, then process with spark sql
        - Can save results in HIVE, json, csv …
- Pandas (notebook)
    - Import csv, and process in dataframe
    - Two implementations 

In [4]:
//import stuff for tests
import org.apache.spark.sql.SparkSession
val ss = SparkSession.
builder().
master("local").
appName("Spark in Motion Example").
config("spark.config.option", "some-value").
enableHiveSupport().
getOrCreate()

import ss.implicits._
import org.apache.spark.sql.functions._
import java.util.Date;

In [20]:
//Test 1. Spark with dataframe API 
val operationStart = new Date();

//Import the dataset from CSV 
val bigstocks = spark.read.option("inferSchema", "true").option("header","true").csv("file:///root/data/stocks/WIKI_PRICES_212b326a081eacca455e13140d7bb9db.csv")

//Create Report in two steps
//1. Add new col for YYYY-MM from date, filter by date range and three stock codes
//2. Group By stock code, year/mo then get avg for open price and close price
//Note: "yyyy" is the calendar year; "YYYY" would be the week-based year
val withdates = bigstocks.withColumn("year_month", date_format(col("date"), "yyyy-MM")).filter($"date" >= "2017-01-01" and $"date" <= "2017-06-30").filter($"ticker".isin("GOOGL", "MSFT", "COF"))
val openavg = withdates.groupBy($"ticker", $"year_month").avg("open", "close").orderBy($"ticker", $"year_month")

//print the results
openavg.show(20)

val operationEnd = new Date();
val timeDiff = Math.abs(operationStart.getTime() - operationEnd.getTime());
val timeInSeconds = timeDiff / (1000)
print(timeInSeconds)



+------+----------+------------------+-----------------+
|ticker|year_month|         avg(open)|       avg(close)|
+------+----------+------------------+-----------------+
|   COF|   2017-01|           88.2985|            88.26|
|   COF|   2017-02| 89.85263157894737|90.19578947368421|
|   COF|   2017-03| 89.26782608695652|88.92521739130437|
|   COF|   2017-04| 83.41105263157895|83.23526315789472|
|   COF|   2017-05| 80.64818181818183|80.50863636363636|
|   COF|   2017-06| 80.00666666666666|80.21904761904763|
| GOOGL|   2017-01| 829.8539999999997|830.2495000000001|
| GOOGL|   2017-02| 836.1510526315789|836.7547368421052|
| GOOGL|   2017-03| 853.8582608695652|853.7897826086955|
| GOOGL|   2017-04| 860.0765789473684|861.3776315789474|
| GOOGL|   2017-05|  959.595909090909|961.6545454545453|
| GOOGL|   2017-06| 977.2957142857141|975.4533333333331|
|  MSFT|   2017-01|63.185500000000005|63.19200000000001|
|  MSFT|   2017-02| 64.13447368421052| 64.1136842105263|
|  MSFT|   2017-03| 64.76434782

In [21]:
//Test 2. Spark SQL 
val operationStart = new Date();

//Import the dataset from CSV 
val bigstocks = spark.read.option("inferSchema", "true").option("header","true").csv("file:///root/data/stocks/WIKI_PRICES_212b326a081eacca455e13140d7bb9db.csv")

//Create Report in two steps
//1. Create Temp Table
//2. Run SQL QRY

bigstocks.createOrReplaceTempView("bigstocks2")
val openavgsql = spark.sql("SELECT ticker, TRUNC(date, 'MM'), AVG(open), AVG(close) FROM bigstocks2 WHERE ticker IN ('GOOGL', 'MSFT', 'COF') AND (date BETWEEN '2017-01-01' AND '2017-06-30') GROUP BY ticker, TRUNC(date, 'MM') ORDER BY ticker, TRUNC(date, 'MM')")

//print the results
openavgsql.show(20)

val operationEnd = new Date();
val timeDiff = Math.abs(operationStart.getTime() - operationEnd.getTime());
val timeInSeconds = timeDiff / (1000)
print(timeInSeconds)

+------+-----------------------------+------------------+-----------------+
|ticker|trunc(CAST(date AS DATE), MM)|         avg(open)|       avg(close)|
+------+-----------------------------+------------------+-----------------+
|   COF|                   2017-01-01|           88.2985|            88.26|
|   COF|                   2017-02-01| 89.85263157894737|90.19578947368421|
|   COF|                   2017-03-01| 89.26782608695652|88.92521739130437|
|   COF|                   2017-04-01| 83.41105263157895|83.23526315789472|
|   COF|                   2017-05-01| 80.64818181818183|80.50863636363636|
|   COF|                   2017-06-01| 80.00666666666666|80.21904761904763|
| GOOGL|                   2017-01-01| 829.8539999999997|830.2495000000001|
| GOOGL|                   2017-02-01| 836.1510526315789|836.7547368421052|
| GOOGL|                   2017-03-01| 853.8582608695652|853.7897826086955|
| GOOGL|                   2017-04-01| 860.0765789473684|861.3776315789474|
| GOOGL|    

# Note the results in the Spark UI below
    - Dataframe returned in 86 Seconds
    - SQL returned in 73 Seconds 
    

![title](img/spark_ui_one.png)

# Note: if you run the same commands line by line in the Spark shell, you get different runtimes
    - Dataframe returned in 115 Seconds
    - SQL returned in 80 Seconds
    
    Presumably Spark can apply more optimization under the hood when the commands are submitted together.

![title](img/dataframe1.png)
![title](img/sparksql1.png)

//Save results in Hive as a new table
//Works fine in the Spark shell, but throws a "please enable Hive support" error in the notebook.

spark.sql("show tables").show()

openavgsql.createOrReplaceTempView("tempopenavgsql")

spark.sql("create table openclosesql_3_5_18 as select * from tempopenavgsql");

spark.sql("show tables").show()

# Once More with feeling, I mean Pandas? 

Two Implementations
Code here: https://github.com/jmategk0/quandl_reports

Code copied from open_close_script_a.py and open_close_script_b.py

# Implementation One
```python
import pandas as pd
from price_table_columns import (PRICES_COLUMNS_TO_DROP, OPEN_COL, CLOSE_COL, TICKER_COL)

filename = "WIKI_PRICES_212b326a081eacca455e13140d7bb9db.csv"
stock_codes = ["COF", "GOOGL", "MSFT"]
start_date = "2017-01-01"
end_date = "2017-06-30"

raw_df = pd.read_csv(filepath_or_buffer=filename, parse_dates=["date"])
df_filtered_by_code = raw_df[raw_df.ticker.isin(stock_codes)]
df_filtered_by_date = df_filtered_by_code[
    (df_filtered_by_code.date >= start_date) & (df_filtered_by_code.date <= end_date)
    ]
df_filtered_col_drop = df_filtered_by_date.drop(PRICES_COLUMNS_TO_DROP, axis=1)

df_final_report = df_filtered_col_drop.groupby(
    by=[TICKER_COL, df_filtered_col_drop.date.dt.to_period("M")])[[CLOSE_COL, OPEN_COL]].mean()
print(df_final_report)
# ~50 secs with full data 3/3/2018
```
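
One way to capture runtimes like the one noted in the comment above, rather than timing by hand, is a small context manager around the code under test. This is a sketch using only the standard library; the `timed` helper and the toy workload are hypothetical stand-ins and are not how the numbers in this notebook were recorded.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label, results):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start

timings = {}
with timed("toy_workload", timings):
    # Stand-in for the report code being measured.
    total = sum(i * i for i in range(100_000))

print(f"toy_workload took {timings['toy_workload']:.4f} s")
```

`time.perf_counter` is a monotonic clock, so the measurement is not affected by system clock adjustments during the run.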

# Implementation Two
```python
from config import DEFAULT_STOCK_CODES, START_DATE, END_DATE
from quandl_reports import CsvQuandlReport
from pprint import pprint

filename = "WIKI_PRICES_212b326a081eacca455e13140d7bb9db.csv"
report = CsvQuandlReport(
    filename=filename,
    stock_codes=DEFAULT_STOCK_CODES,
    end_date=END_DATE,
    start_date=START_DATE
)
report_results = report.report_average_open_close()
pprint(report_results)
# ~47 secs with full data 3/3/2018
```

# Results: A Case Study in Humility 

- Hypothesis: Scala Spark DataFrames will return results noticeably faster, but Pandas will be easier to use.
- Totally Wrong
    - Spark Dataframe: ~86 - 115 seconds, 4 lines of code
    - Spark SQL: ~73 - 80 seconds, 4 lines of code
    - Pandas Implementation A: ~50 seconds, 6 lines of code
    - Pandas Implementation B: ~47 seconds, 18 lines of code
- Other Observations
    - Pandas job is single-threaded (80-100% of one core) and uses ~3.2 GB RAM while running
    - Spark job is multi-threaded, maxes out all four cores, and uses ~8.5* GB RAM all the time
    - Both Spark runs were broken down into 4 jobs, 5 stages, and 270 tasks
    - The same Spark commands run together from the notebook finish noticeably faster than when run one by one in the shell
    - Pandas seems to run in about the same amount of time every time. Spark sometimes gets hung up and takes twice as long.
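
One possible way to lower the pandas memory footprint noted above is to load only the columns the report needs. The sketch below has not been run against the real file: the sample data is made up, and the extra columns `volume` and `adj_close` are assumed stand-ins for whatever additional columns the real CSV carries.

```python
import io
import pandas as pd

# Made-up sample; the extra columns stand in for fields the report never uses.
SAMPLE_CSV = """ticker,date,open,close,volume,adj_close
MSFT,2017-01-03,1.0,2.0,100,2.0
COF,2017-01-03,3.0,4.0,200,4.0
GOOGL,2017-01-03,5.0,6.0,300,6.0
"""

NEEDED = ["ticker", "date", "open", "close"]

# Load everything versus only the columns the report uses.
full = pd.read_csv(io.StringIO(SAMPLE_CSV), parse_dates=["date"])
slim = pd.read_csv(io.StringIO(SAMPLE_CSV), usecols=NEEDED, parse_dates=["date"])

# deep=True also counts the memory held by string (object) columns.
full_bytes = full.memory_usage(deep=True).sum()
slim_bytes = slim.memory_usage(deep=True).sum()
print(f"all columns: {full_bytes} bytes, needed columns only: {slim_bytes} bytes")
```

Whether this changes the runtime on the full dataset would need to be measured.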

# Takeaways

- For this specific report 
    - Pandas seems to be noticeably faster
        - Trivial to get up and running with low overhead
    - Spark Scala seems to be easier to read and maintain, especially raw Spark SQL.
        - Non-trivial (painful) to get up and running with high overhead
    - Jump to Scala was less of a barrier than anticipated

# Issues & Future Work

- Issues
    - DataFrame APIs are not the most intuitive
    - Documentation is mediocre
    - Code could be optimized better (somewhat inexperienced with pandas, scala, and spark)
- Future Work (if time was not an issue)
    - Is Ambari Docker overhead hurting Spark performance? Would a bare-metal single-box install be faster?
    - Set up pandas code in the Docker container; no Python 3 :(
    - Why is Spark SQL running so much faster than the DataFrame API? Spark magic?
    - Why is batching commands in the notebook faster than the shell?
    - Set up ETL using JDBC drivers to pull data into Spark and push data out. Hive integration is really convenient.
    - Set up better monitoring and benchmarking tools
    - Find another dataset and test joins in pandas vs Spark. Would the speed results hold up?
    - More datasets to evaluate 
    - Evaluate PySpark as well
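
On the pandas side, the join test mentioned under Future Work could start from something like the following. It is a rough sketch only: the `sectors` lookup table, its labels, and the sample prices are invented for illustration, and no such dataset is part of this project.

```python
import pandas as pd

# Made-up price rows and a made-up lookup table keyed by ticker.
prices = pd.DataFrame(
    {"ticker": ["COF", "MSFT", "GOOGL", "MSFT"], "close": [1.0, 2.0, 3.0, 4.0]}
)
sectors = pd.DataFrame(
    {"ticker": ["COF", "MSFT", "GOOGL"], "sector": ["sector_a", "sector_b", "sector_b"]}
)

# Left join keeps every price row; validate raises if a ticker appears twice in `sectors`.
joined = prices.merge(sectors, on="ticker", how="left", validate="many_to_one")
by_sector = joined.groupby("sector")["close"].mean()
print(by_sector)
```

A comparable Spark run would join two DataFrames on `ticker` and apply the same aggregation, which would allow the timing comparison to be repeated for a join-heavy workload.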
