# Question 2 - PySpark

### Spark Environment Check
Run the code below to check whether your initial environment configuration (set up with `bash set_local_env.bash`) worked.

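```python
import random

import findspark
findspark.init()  # locate the local Spark installation before importing pyspark

import pyspark

num_samples = 100000000


def inside(_):
    """Return True if a random point in the unit square lands inside the quarter circle."""
    x, y = random.random(), random.random()
    return x * x + y * y < 1


sc = pyspark.SparkContext(appName="Pi")
try:
    # Monte Carlo estimate: the fraction of points inside the quarter circle approximates pi / 4
    count = sc.parallelize(range(0, num_samples)).filter(inside).count()
    pi = 4 * count / num_samples
    print(pi)
finally:
    sc.stop()
```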

```text
23/12/27 21:34:33 WARN Utils: Your hostname, D275063 resolves to a loopback address: 127.0.1.1; using 172.22.196.0 instead (on interface eth0)
23/12/27 21:34:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/27 21:34:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

3.141315
```


#### Scenario
A 70-year-old and very oddly placed [silo](https://en.wikipedia.org/wiki/Silo) in a refinery circuit has no digital weight or volume gauge sensors, so an operator has to climb up and [measure the silo contents manually](https://github.com/adilBaigSCE/SCE-Innovation-Lab/blob/main/assets/data/silo_actuals.csv) (both weight and volume).

![refinary_silo.png](assets/images/refinary_silo.png)

Due to limited resources and ever-increasing higher-priority concerns in the plant, this manual measurement happens irregularly, whenever refinery operations permit. A [historical average from past data](assets/data/historical_averages.csv) records how much product is in the silo on any given day of the week. In addition, the refinery's Inventory Reconciliation team holds its "Weekly Team Huddle" every Thursday and reports "weekly inventory" on a custom week _(last Thursday through this Wednesday)_.
Using these two data sources and PySpark, print a monthly report that exactly matches [output_reference.csv](https://github.com/adilBaigSCE/SCE-Innovation-Lab/blob/main/assets/data/output_reference.csv).

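The custom reporting week (Thursday through the following Wednesday) can be pinned down by mapping every date to the Wednesday that closes its week. The following is a minimal pure-Python sketch, assuming the standard `datetime.date.weekday()` convention (Monday is 0, Wednesday is 2). The function name `week_ending_wednesday` is hypothetical and not part of the assignment.

```python
from datetime import date, timedelta

WEDNESDAY = 2  # datetime.date.weekday(): Monday == 0 ... Sunday == 6


def week_ending_wednesday(day: date) -> date:
    """Return the Wednesday that closes the Thursday-to-Wednesday week containing `day`."""
    days_until_wednesday = (WEDNESDAY - day.weekday()) % 7
    return day + timedelta(days=days_until_wednesday)


# 6/1/2023 is a Thursday, so it opens the week that closes on 6/7/2023.
print(week_ending_wednesday(date(2023, 6, 1)))   # 2023-06-07
print(week_ending_wednesday(date(2023, 6, 7)))   # 2023-06-07
print(week_ending_wednesday(date(2023, 6, 30)))  # 2023-07-05 (this week crosses into July)
```

Because the mapping depends only on the weekday, the week that starts on Thursday 6/29/2023 closes in July. This is the month-boundary case the weekly total has to handle.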
#### Available Data below

##### Actual Silo Readings:

```python
import pandas as pd

df = pd.read_csv("assets/data/silo_actuals.csv", sep=",", keep_default_na=False)
df.head(31)
```

| Unnamed: 0 | date | silo_wt_in_tons |
|---|---|---|
| 0 | 6/1/2023 | 70 |
| 1 | 6/5/2023 | 53 |
| 2 | 6/6/2023 | 62 |
| 3 | 6/10/2023 | 43 |
| 4 | 6/14/2023 | 66 |
| 5 | 6/21/2023 | 78 |
| 6 | 6/24/2023 | 41 |
| 7 | 6/25/2023 | 35 |
| 8 | 6/30/2023 | 83 |


##### Reference Tons from Historical Daily Averages:

```python
df = pd.read_csv("assets/data/historical_averages.csv", sep=",", keep_default_na=False)
df.head(31)
```

| Unnamed: 0 | day | average_tons |
|---|---|---|
| 0 | Monday | 50 |
| 1 | Tuesday | 65 |
| 2 | Wednesday | 70 |
| 3 | Thursday | 80 |
| 4 | Friday | 65 |
| 5 | Saturday | 45 |
| 6 | Sunday | 40 |


Write PySpark code to generate an output report table __for the entire month of June 2023__ with the following columns, _subject to these coding restrictions_:
* __silo_wt_in_tons__ : 
    * use actual measured tons where available
    * fall back to the historical average on dates where no measurement is available
    * <em>SPARK CHALLENGE: DO NOT use a sequential for loop to backfill dates from <strong>6/1/2023</strong> through <strong>6/30/2023</strong></em>
* __weekly_total_tons__ :
    * use a custom week starting on Thursday and ending on the following Wednesday
    * report the week's total tons against its last day, i.e. Wednesday
    * a weekly total can cross month boundaries, i.e. span two months.
    * <em>SPARK CHALLENGE: DO NOT use a separate dataset to calculate the groupBy sum of tons for the week and then join it back in</em>
* __mtd_running_total_tons__ :
    * A running total of tons for each day within a monthly window
* __monthly_grand_total__ :
    * the total tons incoming per month.
* <strong><em>CODING PARADIGM</em></strong>
    * As these are small datasets, do not worry about skew or partitioning best practices for now
    * Production-ready code with good documentation and comments would score extra points
    * Write clean [pythonic code](https://docs.python-guide.org/writing/style/) as much as possible
    * OPTIONAL: Fully embrace [functional programming](https://realpython.com/python-functional-programming/) for separation of concerns if you can
    * AVOID USING UDFs FOR ANY CALCULATIONS
    * Use Spark 3.4.*'s [pyspark.sql.DataFrame.transform](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.transform.html#pyspark.sql.DataFrame.transform) to write cleaner code


The output should look something like this:

```python
df = pd.read_csv("assets/data/output_reference.csv", sep=",", keep_default_na=False)
# show the top 10 rows
df.head(10)
```

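| Unnamed: 0 | date | silo_wt_in_tons | weekly_total_tons | mtd_running_total_tons | monthly_grand_total |
|---|---|---|---|---|---|
| 0 | 6/1/2023 | 70 |  | 70 |  |
| 1 | 6/2/2023 | 65 |  | 135 |  |
| 2 | 6/3/2023 | 45 |  | 180 |  |
| 3 | 6/4/2023 | 40 |  | 220 |  |
| 4 | 6/5/2023 | 53 |  | 273 |  |
| 5 | 6/6/2023 | 62 |  | 335 |  |
| 6 | 6/7/2023 | 70 | 405.0 | 405 |  |
| 7 | 6/8/2023 | 80 |  | 485 |  |
| 8 | 6/9/2023 | 65 |  | 550 |  |
| 9 | 6/10/2023 | 43 |  | 593 |  |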


### SOLUTION

##### ETL Logic

```python
import traceback

import utils as U  # helper module whose source is not shown in this notebook

with U.SparkApplication("Silo_Report") as _spark:
    try:
        df_historical, df_actual = _spark.get_dataframes()
        df_backfilled_dates = _spark.backfill_dates(df_actual)
        df_final = (
            df_backfilled_dates
            .transform(_spark.get_date_related_columns)
            .transform(_spark.merge_historical_and_actual, df_historical, df_actual)
            .transform(_spark.get_silo_wt_in_tons)
            .transform(_spark.reassign_week_no)
            .transform(_spark.get_weekly_total_tons)
            .transform(_spark.get_monthly_report_columns)
            .transform(_spark.select_column_order)
        )
        write_loc = "assets/data/output_actual"
        _spark.write_to_filesystem(df_final, write_loc)
        _spark.print_dataframe(write_loc)
    except Exception as e:
        traceback.print_exception(type(e), e, e.__traceback__)
```

```text
23/06/27 15:01:00 WARN Utils: Your hostname, D275063 resolves to a loopback address: 127.0.1.1; using 172.27.166.31 instead (on interface eth0)
23/06/27 15:01:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/27 15:01:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
```

| Unnamed: 0 | date | silo_wt_in_tons | weekly_total_tons | mtd_running_total_tons | monthly_grand_total |
|---|---|---|---|---|---|
| 0 | 2023-06-01 | 70 |  | 70.0 |  |
| 1 | 2023-06-02 | 65 |  | 135.0 |  |
| 2 | 2023-06-03 | 45 |  | 180.0 |  |
| 3 | 2023-06-04 | 40 |  | 220.0 |  |
| 4 | 2023-06-05 | 53 |  | 273.0 |  |
| 5 | 2023-06-06 | 62 |  | 335.0 |  |
| 6 | 2023-06-07 | 70 | 405.0 | 405.0 |  |
| 7 | 2023-06-08 | 80 |  | 485.0 |  |
| 8 | 2023-06-09 | 65 |  | 550.0 |  |
| 9 | 2023-06-10 | 43 |  | 593.0 |  |
