# SETUP

Run these to create the environment.

In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop2.7.tgz
!tar xf spark-3.2.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
import findspark

# Point findspark at the JDK and the Spark distribution unpacked by the setup
# cell, then let it make pyspark importable.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.2.2-bin-hadoop2.7",
})
findspark.init()

In [None]:
from pyspark.sql import SparkSession


def _build_spark_session():
    """Create (or reuse) the local SparkSession used by every cell below.

    Loads the Delta Lake jar and registers its SQL extension and catalog so
    that `USING DELTA` tables work, and pins the Spark UI to port 4050.
    """
    builder = SparkSession.builder.appName("programming").master("local")
    session_settings = [
        ("spark.jars.packages", "io.delta:delta-core_2.12:1.1.0"),
        ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
        ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
        ('spark.ui.port', '4050'),
    ]
    for setting_key, setting_value in session_settings:
        builder = builder.config(setting_key, setting_value)
    return builder.getOrCreate()


spark = _build_spark_session()

In [None]:
string_20210609 = '''worked_date,employee_id,delete_flag,hours_worked
2021-06-09,1001,N,7
2021-06-09,1002,N,3.75
2021-06-09,1003,N,7.5
2021-06-09,1004,N,6.25'''

string_20210610 = '''worked_date,employee_id,delete_flag,hours_worked
2021-06-10,1001,N,8
2021-06-10,1002,N,6
2021-06-10,1003,N,1
2021-06-10,1004,N,10'''

string_20210611 = '''worked_date,employee_id,delete_flag,hours_worked
2021-06-11,1001,N,6
2021-06-11,1002,N,7
2021-06-11,1003,N,9
2021-06-11,1004,N,5
2021-06-10,1003,Y,1
2021-06-10,1004,N,8'''

# One RDD of CSV lines (header line included) per simulated daily file,
# created in date order.
rdd_20210609, rdd_20210610, rdd_20210611 = (
    spark.sparkContext.parallelize(daily_csv.split('\n'))
    for daily_csv in (string_20210609, string_20210610, string_20210611)
)

# START HERE

## Data Load

Load each file consecutively to simulate a daily file feed. Only one file should be read/loaded at a time, because the load is an iterative process.

Rules:

- A worker can only work once per day.
- delete_flag = Y means the record should be removed from the table, if exists.


Create the table

In [None]:
# FILES WILL SHOW UP ON THE LEFT UNDER THE FOLDER ICON IF YOU WANT TO BROWSE THEM
OUTPUT_DELTA_PATH = '/content/output/delta/worked_hours'

# Both statements use IF NOT EXISTS, so re-running this cell leaves an existing
# database/table alone. The table is a Delta table partitioned by day and
# stored at OUTPUT_DELTA_PATH.
spark.sql('CREATE DATABASE IF NOT EXISTS EXERCISE')

spark.sql(f'''
    CREATE TABLE IF NOT EXISTS EXERCISE.WORKED_HOURS(
        worked_date date
        , employee_id int
        , delete_flag string
        , hours_worked double
    ) USING DELTA
    PARTITIONED BY (worked_date)
    LOCATION "{OUTPUT_DELTA_PATH}"
    ''')

DataFrame[]

Load 2021-06-09 dataframe and output table contents

In [None]:
dataframe_20210609 = spark.read.csv(path = rdd_20210609, header=True)

# Upsert this day's file into the Delta table on (worked_date, employee_id),
# since a worker can only work once per day:
#   - matched row with delete_flag = 'Y'  -> delete the stored record
#   - any other matched row              -> replace the stored record
#   - unmatched row with delete_flag = 'N' -> insert (deletes for unknown keys are ignored)
# The CSV columns are all strings, so they are cast to the table's types first.
dataframe_20210609.createOrReplaceTempView('worked_hours_20210609')
spark.sql('''
    MERGE INTO EXERCISE.WORKED_HOURS AS target
    USING (
        SELECT
            CAST(worked_date AS DATE) AS worked_date
            , CAST(employee_id AS INT) AS employee_id
            , delete_flag
            , CAST(hours_worked AS DOUBLE) AS hours_worked
        FROM worked_hours_20210609
    ) AS source
    ON target.worked_date = source.worked_date
        AND target.employee_id = source.employee_id
    WHEN MATCHED AND source.delete_flag = 'Y' THEN DELETE
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED AND source.delete_flag = 'N' THEN INSERT *
''')

spark.table('EXERCISE.WORKED_HOURS').orderBy('worked_date', 'employee_id').show()

Load 2021-06-10 dataframe and output table contents

In [None]:
dataframe_20210610 = spark.read.csv(path = rdd_20210610, header=True)

# Upsert this day's file into the Delta table on (worked_date, employee_id),
# since a worker can only work once per day:
#   - matched row with delete_flag = 'Y'  -> delete the stored record
#   - any other matched row              -> replace the stored record
#   - unmatched row with delete_flag = 'N' -> insert (deletes for unknown keys are ignored)
# The CSV columns are all strings, so they are cast to the table's types first.
dataframe_20210610.createOrReplaceTempView('worked_hours_20210610')
spark.sql('''
    MERGE INTO EXERCISE.WORKED_HOURS AS target
    USING (
        SELECT
            CAST(worked_date AS DATE) AS worked_date
            , CAST(employee_id AS INT) AS employee_id
            , delete_flag
            , CAST(hours_worked AS DOUBLE) AS hours_worked
        FROM worked_hours_20210610
    ) AS source
    ON target.worked_date = source.worked_date
        AND target.employee_id = source.employee_id
    WHEN MATCHED AND source.delete_flag = 'Y' THEN DELETE
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED AND source.delete_flag = 'N' THEN INSERT *
''')

spark.table('EXERCISE.WORKED_HOURS').orderBy('worked_date', 'employee_id').show()

Load 2021-06-11 dataframe and output table contents

In [None]:
dataframe_20210611 = spark.read.csv(path = rdd_20210611, header=True)

# Upsert this day's file into the Delta table on (worked_date, employee_id),
# since a worker can only work once per day:
#   - matched row with delete_flag = 'Y'  -> delete the stored record
#   - any other matched row              -> replace the stored record
#   - unmatched row with delete_flag = 'N' -> insert (deletes for unknown keys are ignored)
# This file also corrects 2021-06-10 (deletes 1003, updates 1004).
# The CSV columns are all strings, so they are cast to the table's types first.
dataframe_20210611.createOrReplaceTempView('worked_hours_20210611')
spark.sql('''
    MERGE INTO EXERCISE.WORKED_HOURS AS target
    USING (
        SELECT
            CAST(worked_date AS DATE) AS worked_date
            , CAST(employee_id AS INT) AS employee_id
            , delete_flag
            , CAST(hours_worked AS DOUBLE) AS hours_worked
        FROM worked_hours_20210611
    ) AS source
    ON target.worked_date = source.worked_date
        AND target.employee_id = source.employee_id
    WHEN MATCHED AND source.delete_flag = 'Y' THEN DELETE
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED AND source.delete_flag = 'N' THEN INSERT *
''')

spark.table('EXERCISE.WORKED_HOURS').orderBy('worked_date', 'employee_id').show()

## Data Analysis

Using the table that was loaded, answer the below questions using either SQL or dataframe API.

1. How many hours does each employee average?

2. Which day has the least amount of hours worked?

3. Which day did each employee work their least amount of hours?

# Data Load (Method 1: Database)

- Historical data will be exported as table "T1" to DB
- Refresh data will be exported to "data" folder
- On Refresh Loads (daily), "T1" from DB will be used as historical data
- Refresh file will have timestamp on the file names
- Refresh file will be moved to "data_archive" folder after processing is complete
<br>
<br>

**All future refresh data should be uploaded to "data" folder**
<br>
<br>
**Note**: A record whose "worked_date" and "employee_id" duplicate an existing record, and whose "delete_flag" is N, updates that record's "hours_worked" field. Because workers can only work once per day, the new record is treated as a correction of the earlier "hours_worked".

### Initial Load (Execute one time only)

In [None]:
import pandas as pd
import shutil
import glob
import os
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, concat, lead, desc, to_date
import warnings
warnings.filterwarnings('ignore')


# Stage each day's feed as a timestamped CSV directory in the "data" folder.
# pd.Timestamp.now() replaces pd.datetime.now(): the pd.datetime alias was
# deprecated in pandas 1.0 and removed in pandas 2.0.
dataframe_20210609 = spark.read.csv(path = rdd_20210609, header=True)
dataframe_20210609.write.csv('/content/spark-warehouse/data/Refresh_20210609_{}.csv'.format(pd.Timestamp.now().strftime("%Y-%m-%d %H%M%S")), header=True)

dataframe_20210610 = spark.read.csv(path = rdd_20210610, header=True)
dataframe_20210610.write.csv('/content/spark-warehouse/data/Refresh_20210610_{}.csv'.format(pd.Timestamp.now().strftime("%Y-%m-%d %H%M%S")), header=True)

dataframe_20210611 = spark.read.csv(path = rdd_20210611, header=True)
dataframe_20210611.write.csv('/content/spark-warehouse/data/Refresh_20210611_{}.csv'.format(pd.Timestamp.now().strftime("%Y-%m-%d %H%M%S")), header=True)


def process_data(df_historical):
  """Seed table T1 with the non-deleted rows of the historical file.

  Displays the rows being stored, then writes them with saveAsTable's
  default save mode ('errorifexists'), so this fails if T1 already exists.
  """
  kept_rows = df_historical.where(df_historical.delete_flag == 'N')
  kept_rows.show()
  kept_rows.write.saveAsTable("T1")


def move_file(source_folder, destination_folder, file_name):
  """Move `file_name` into `destination_folder` (the data_archive folder).

  `file_name` may be an absolute path (callers pass glob() results) or a name
  relative to `source_folder`. The destination folder is created first: if it
  did not exist, shutil.move would rename the entry *to* the destination path
  instead of moving it *into* a folder, so the first archived CSV directory's
  contents would become the archive folder itself.
  """
  os.makedirs(destination_folder, exist_ok=True)
  # os.path.join returns file_name unchanged when it is already absolute.
  shutil.move(os.path.join(source_folder, file_name), destination_folder)


def main():
  """Load the oldest staged file as the historical seed of T1, then archive it."""
  data_dir = '/content/spark-warehouse/data'
  archive_dir = '/content/spark-warehouse/data_archive'

  # The oldest staged entry (by ctime) is the historical baseline.
  staged_entries = glob.glob('/content/spark-warehouse/data/*')
  seed_path = min(staged_entries, key=os.path.getctime)

  seed_rows = spark.read.csv(seed_path, header=True)
  process_data(seed_rows)
  move_file(data_dir, archive_dir, seed_path)

main()

+-----------+-----------+-----------+------------+
|worked_date|employee_id|delete_flag|hours_worked|
+-----------+-----------+-----------+------------+
| 2021-06-09|       1001|          N|           7|
| 2021-06-09|       1002|          N|        3.75|
| 2021-06-09|       1003|          N|         7.5|
| 2021-06-09|       1004|          N|        6.25|
+-----------+-----------+-----------+------------+



### Refresh Load (Daily)

In [None]:
import pandas as pd
import shutil
import glob
import os
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, concat, lead, desc, to_date
import warnings
warnings.filterwarnings('ignore')


def process_data(df_refresh):
  """Merge one refresh batch into table T1 and overwrite T1 with the result.

  For each (worked_date, employee_id) the latest record wins: refresh rows
  override rows already in T1, and a later row in the refresh file overrides
  an earlier one. If the winning record has delete_flag == 'Y', the key is
  removed. The merged rows are displayed before being written.
  """
  from pyspark.sql.functions import lit, monotonically_increasing_id

  columns = ['worked_date', 'employee_id', 'delete_flag', 'hours_worked']
  df_historical = spark.sql("SELECT * FROM T1")

  # Deletion rows are kept in the union, not filtered out up front. Dropping
  # them before the merge meant a 'Y' record could never remove anything.
  # Each row gets an explicit precedence: source (T1 = 0, refresh = 1), then
  # position within the source via monotonically_increasing_id.
  df_union = (
      df_historical.select(columns).withColumn('source_rank', lit(0))
      .unionByName(df_refresh.select(columns).withColumn('source_rank', lit(1)))
      .withColumn('file_order', monotonically_increasing_id())
  )

  # A window ordered only by its own partition keys has no defined order, so
  # "last" would be arbitrary. Order by the precedence columns instead.
  latest_first = (Window.partitionBy('worked_date', 'employee_id')
                  .orderBy(col('source_rank').desc(), col('file_order').desc()))

  df_new = (df_union
            .withColumn('record_rank', row_number().over(latest_first))
            .where((col('record_rank') == 1) & (col('delete_flag') == 'N'))
            .select(columns)
            .orderBy(col('worked_date'), col('employee_id')))

  df_new.show()
  # Stage through Temp_Table before overwriting T1, because this plan reads
  # from T1 itself (Spark rejects overwriting a table that is being read).
  df_new.write.mode("overwrite").saveAsTable("Temp_Table")
  df_temp = spark.sql("SELECT * FROM Temp_Table")
  df_temp.write.mode("overwrite").saveAsTable("T1")


def move_file(source_folder, destination_folder, file_name):
  """Move `file_name` into `destination_folder` (the data_archive folder).

  `file_name` may be an absolute path (callers pass glob() results) or a name
  relative to `source_folder`. The destination folder is created first: if it
  did not exist, shutil.move would rename the entry *to* the destination path
  instead of moving it *into* a folder.
  """
  os.makedirs(destination_folder, exist_ok=True)
  # os.path.join returns file_name unchanged when it is already absolute.
  shutil.move(os.path.join(source_folder, file_name), destination_folder)


def main():
  """Process the oldest staged refresh file into table T1, then archive it.

  An empty refresh file is archived without touching T1.
  """

  # Input -- the oldest staged entry (by ctime) is processed first
  refresh_file = min(glob.glob('/content/spark-warehouse/data/*'), key=os.path.getctime)
  # Directory
  source_folder = '/content/spark-warehouse/data'
  destination_folder = '/content/spark-warehouse/data_archive'

  # Load Data
  df_refresh = spark.read.csv(refresh_file, header=True) # Load Refresh Data

  # Decide emptiness once. head(1) stops at the first row, whereas the previous
  # count() != 0 / count() == 0 pair launched two full Spark jobs.
  if len(df_refresh.head(1)) > 0:
    # Transform Data
    process_data(df_refresh)

    # Move File
    move_file(source_folder, destination_folder, refresh_file)

  else:
    move_file(source_folder, destination_folder, refresh_file)
    print("No content in refresh_file")
    print("No content in refresh_file")


if __name__ == "__main__":
  # Keep processing until no staged refresh directories remain in the data folder.
  refresh_source_folder = '/content/spark-warehouse/data'
  path, dirs, files = next(os.walk(refresh_source_folder))

  if not dirs:
    print('No refresh_file to process')
  else:
    while dirs:
      main()
      path, dirs, files = next(os.walk(refresh_source_folder))

+-----------+-----------+-----------+------------+
|worked_date|employee_id|delete_flag|hours_worked|
+-----------+-----------+-----------+------------+
| 2021-06-09|       1001|          N|           7|
| 2021-06-09|       1002|          N|        3.75|
| 2021-06-09|       1003|          N|         7.5|
| 2021-06-09|       1004|          N|        6.25|
| 2021-06-10|       1001|          N|           8|
| 2021-06-10|       1002|          N|           6|
| 2021-06-10|       1003|          N|           1|
| 2021-06-10|       1004|          N|          10|
+-----------+-----------+-----------+------------+

+-----------+-----------+-----------+------------+
|worked_date|employee_id|delete_flag|hours_worked|
+-----------+-----------+-----------+------------+
| 2021-06-09|       1001|          N|           7|
| 2021-06-09|       1002|          N|        3.75|
| 2021-06-09|       1003|          N|         7.5|
| 2021-06-09|       1004|          N|        6.25|
| 2021-06-10|       1001|     

# Data Load (Method 2: Data Lake -- No Database)

- Historical file will be exported to "output" folder
- Refresh data will be exported to "data" folder
- On Refresh Loads (daily), existing historical file will be moved to "output_archive" folder after processing; updated historical file (historical + refresh) will be exported to "output" folder, which will be ready to use on the next Refresh Load
- Both Historical and Refresh file will have timestamp on the file names
- Refresh file will be moved to "data_archive" folder after processing is complete
<br>
<br>

**All future refresh data should be uploaded to "data" folder**
<br>
<br>
**Note**: A record whose "worked_date" and "employee_id" duplicate an existing record, and whose "delete_flag" is N, updates that record's "hours_worked" field. Because workers can only work once per day, the new record is treated as a correction of the earlier "hours_worked".

### Initial Load (Execute one time only)

In [None]:
import pandas as pd
import shutil
import glob
import os
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, concat, lead, desc, to_date
import warnings
warnings.filterwarnings('ignore')


# pd.Timestamp.now() replaces pd.datetime.now(): the pd.datetime alias was
# deprecated in pandas 1.0 and removed in pandas 2.0.
# FIRST RUN -- Export historical data to output folder
dataframe_20210609 = spark.read.csv(path = rdd_20210609, header=True)
dataframe_20210609.write.csv('/content/output/historical_20210609_{}.csv'.format(pd.Timestamp.now().strftime("%Y-%m-%d %H%M%S")), header=True)
# FIRST RUN -- Export refresh data to data folder
dataframe_20210610 = spark.read.csv(path = rdd_20210610, header=True)
dataframe_20210610.write.csv('/content/data/refresh_20210610_{}.csv'.format(pd.Timestamp.now().strftime("%Y-%m-%d %H%M%S")), header=True)
dataframe_20210611 = spark.read.csv(path = rdd_20210611, header=True)
dataframe_20210611.write.csv('/content/data/refresh_20210611_{}.csv'.format(pd.Timestamp.now().strftime("%Y-%m-%d %H%M%S")), header=True)

### Refresh Load (Daily)

In [None]:
import pandas as pd
import shutil
import glob
import os
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, concat, lead, desc, to_date
import warnings
warnings.filterwarnings('ignore')

def process_data(df_historical, df_refresh):
  """Merge a refresh batch into the historical snapshot and return the result.

  For each (worked_date, employee_id) the latest record wins: refresh rows
  override historical rows, and a later row in the refresh file overrides an
  earlier one. If the winning record has delete_flag == 'Y', the key is dropped.

  Returns a DataFrame with columns worked_date, employee_id, delete_flag and
  hours_worked, ordered by worked_date and employee_id.
  """
  from pyspark.sql.functions import lit, monotonically_increasing_id

  columns = ['worked_date', 'employee_id', 'delete_flag', 'hours_worked']

  # Union by column name rather than position. Tag each row with an explicit
  # precedence: source (historical = 0, refresh = 1), then position within the
  # source. A row_number() over worked_date alone orders rows that share a
  # date arbitrarily, so "latest" would not be well defined.
  df_union = (
      df_historical.select(columns).withColumn('source_rank', lit(0))
      .unionByName(df_refresh.select(columns).withColumn('source_rank', lit(1)))
      .withColumn('file_order', monotonically_increasing_id())
  )

  # Partition by the business key only. Including delete_flag in the partition
  # puts an 'N' row and a newer 'Y' row for the same key in separate groups,
  # both rank 1, so the deletion would never take effect.
  latest_first = (Window.partitionBy(col("worked_date"), col("employee_id"))
                  .orderBy(col('source_rank').desc(), col('file_order').desc()))

  return (df_union
          .withColumn("row_number", row_number().over(latest_first))
          .filter((col('row_number') == 1) & (col('delete_flag') == 'N'))
          .select(columns)
          .orderBy(col('worked_date'), col('employee_id')))


def move_historical_file(historical_source_folder, historical_destination_folder, prefix='historical'):
  """Archive existing historical snapshots from the output folder.

  Only entries whose name starts with `prefix` are moved. Other content in the
  same folder is left in place, e.g. the Delta table this notebook creates at
  /content/output/delta. The destination folder is created first. Otherwise
  shutil.move would rename the first snapshot *to* the archive path instead of
  moving it *into* an archive folder.
  """
  os.makedirs(historical_destination_folder, exist_ok=True)
  for entry_name in os.listdir(historical_source_folder):
    if entry_name.startswith(prefix):
      shutil.move(os.path.join(historical_source_folder, entry_name), historical_destination_folder)


def move_refresh_file(refresh_source_folder, refresh_destination_folder, refresh_file):
  """Move the processed refresh file into the data_archive folder.

  `refresh_file` may be an absolute path (callers pass glob() results) or a
  name relative to `refresh_source_folder`. The destination folder is created
  first, so the move always lands *inside* it rather than renaming the refresh
  directory to the archive path.
  """
  os.makedirs(refresh_destination_folder, exist_ok=True)
  # os.path.join returns refresh_file unchanged when it is already absolute.
  shutil.move(os.path.join(refresh_source_folder, refresh_file), refresh_destination_folder)


def main():
  """Merge the oldest staged refresh file into the newest historical snapshot.

  For a non-empty refresh, the function does four things:
  - displays the merged rows;
  - archives the old snapshot(s) via move_historical_file;
  - archives the consumed refresh file;
  - writes the merged rows as a new timestamped historical CSV under /content/output.

  An empty refresh file is only archived.
  """

  # Input -- only historical snapshots qualify, so other folders under
  # /content/output (e.g. the Delta table at /content/output/delta) can never
  # be picked up as the historical file.
  historical_file = max(glob.glob('/content/output/historical*'), key=os.path.getctime)
  refresh_file = min(glob.glob('/content/data/*'), key=os.path.getctime)
  # Output -- pd.Timestamp.now() replaces pd.datetime.now(), which pandas 2.0 removed
  historical_output = '/content/output/historical_{}.csv'.format(pd.Timestamp.now().strftime("%Y-%m-%d %H%M%S"))
  # Directory
  historical_source_folder = '/content/output'
  historical_destination_folder = '/content/output_archive'
  refresh_source_folder = '/content/data'
  refresh_destination_folder = '/content/data_archive'

  # Load Data
  df_historical = spark.read.csv(historical_file, header=True)
  df_refresh = spark.read.csv(refresh_file, header=True)

  # Decide emptiness once; head(1) stops at the first row instead of counting all.
  if len(df_refresh.head(1)) > 0:
    # localCheckpoint() eagerly materialises the merged rows and truncates
    # their lineage. cache() + show() only guarantees the partitions that
    # show() touched, so the final write could otherwise try to re-read input
    # files that the moves below have already archived.
    df_new = process_data(df_historical, df_refresh).localCheckpoint()
    df_new.show()

    # Archive inputs before writing: move_historical_file sweeps historical
    # entries out of the output folder, so the new snapshot must not exist yet.
    move_historical_file(historical_source_folder, historical_destination_folder)
    move_refresh_file(refresh_source_folder, refresh_destination_folder, refresh_file)

    # Export updated historical file to output folder
    df_new.write.csv(historical_output, header=True)

  else:
    move_refresh_file(refresh_source_folder, refresh_destination_folder, refresh_file)
    print("No content in refresh_file")


if __name__ == "__main__":
  # Keep processing until no staged refresh directories remain in /content/data.
  refresh_source_folder = '/content/data'
  path, dirs, files = next(os.walk(refresh_source_folder))

  if not dirs:
    print('No refresh_file to process')
  else:
    while dirs:
      main()
      path, dirs, files = next(os.walk(refresh_source_folder))

+-----------+-----------+-----------+------------+
|worked_date|employee_id|delete_flag|hours_worked|
+-----------+-----------+-----------+------------+
| 2021-06-09|       1001|          N|           7|
| 2021-06-09|       1002|          N|        3.75|
| 2021-06-09|       1003|          N|         7.5|
| 2021-06-09|       1004|          N|        6.25|
| 2021-06-10|       1001|          N|           8|
| 2021-06-10|       1002|          N|           6|
| 2021-06-10|       1003|          N|           1|
| 2021-06-10|       1004|          N|          10|
+-----------+-----------+-----------+------------+

+-----------+-----------+-----------+------------+
|worked_date|employee_id|delete_flag|hours_worked|
+-----------+-----------+-----------+------------+
| 2021-06-09|       1001|          N|           7|
| 2021-06-09|       1002|          N|        3.75|
| 2021-06-09|       1003|          N|         7.5|
| 2021-06-09|       1004|          N|        6.25|
| 2021-06-10|       1001|     

# Data Analysis

##### 1. How many hours does each employee average?

In [None]:
spark.sql("SELECT * FROM T1").show()

# T1 is written from spark.read.csv without a schema, so hours_worked is a
# string column. Cast explicitly instead of relying on implicit coercion,
# which ANSI mode (spark.sql.ansi.enabled) tightens.
spark.sql('''
SELECT
employee_id
,AVG(CAST(hours_worked AS DOUBLE)) AS average_hours_worked
FROM T1
GROUP BY employee_id
ORDER BY employee_id
''').show()

+-----------+-----------+-----------+------------+
|worked_date|employee_id|delete_flag|hours_worked|
+-----------+-----------+-----------+------------+
| 2021-06-09|       1001|          N|           7|
| 2021-06-09|       1002|          N|        3.75|
| 2021-06-09|       1003|          N|         7.5|
| 2021-06-09|       1004|          N|        6.25|
| 2021-06-10|       1001|          N|           8|
| 2021-06-10|       1002|          N|           6|
| 2021-06-10|       1003|          N|           1|
| 2021-06-10|       1004|          N|           8|
| 2021-06-11|       1001|          N|           6|
| 2021-06-11|       1002|          N|           7|
| 2021-06-11|       1003|          N|           9|
| 2021-06-11|       1004|          N|           5|
+-----------+-----------+-----------+------------+

+-----------+--------------------+
|employee_id|average_hours_worked|
+-----------+--------------------+
|       1001|                 7.0|
|       1002|   5.583333333333333|
|       

##### 2. Which day has the least amount of hours worked?

In [None]:
spark.sql("SELECT * FROM T1").show()

# Rank days by their total hours and keep every day at rank 1, so ties are
# reported (ORDER BY ... LIMIT 1 would silently pick one of them).
# hours_worked is a string column in T1, hence the explicit cast.
spark.sql('''
SELECT
worked_date
,total_hours_worked
FROM
(
SELECT
worked_date
,total_hours_worked
,RANK() OVER (ORDER BY total_hours_worked) AS day_rank
FROM
(
SELECT
worked_date
,SUM(CAST(hours_worked AS DOUBLE)) AS total_hours_worked
FROM T1
GROUP BY worked_date
) daily
) ranked
WHERE day_rank = 1
ORDER BY worked_date
''').show()

+-----------+-----------+-----------+------------+
|worked_date|employee_id|delete_flag|hours_worked|
+-----------+-----------+-----------+------------+
| 2021-06-09|       1001|          N|           7|
| 2021-06-09|       1002|          N|        3.75|
| 2021-06-09|       1003|          N|         7.5|
| 2021-06-09|       1004|          N|        6.25|
| 2021-06-10|       1001|          N|           8|
| 2021-06-10|       1002|          N|           6|
| 2021-06-10|       1003|          N|           1|
| 2021-06-10|       1004|          N|           8|
| 2021-06-11|       1001|          N|           6|
| 2021-06-11|       1002|          N|           7|
| 2021-06-11|       1003|          N|           9|
| 2021-06-11|       1004|          N|           5|
+-----------+-----------+-----------+------------+

+-----------+------------------+
|worked_date|total_hours_worked|
+-----------+------------------+
| 2021-06-10|              23.0|
+-----------+------------------+



##### 3. Which day did each employee work their least amount of hours?

In [None]:
spark.sql("SELECT * FROM T1").show()

# T1 is written from spark.read.csv without a schema, so hours_worked is a
# string. MIN over strings is lexicographic ('10' < '9'), so cast to DOUBLE
# before comparing. RANK keeps every day on which an employee tied their
# minimum, as the previous join-on-MIN did.
spark.sql('''
SELECT
worked_date
,employee_id
,hours_worked AS minimum_hours_worked
FROM
(
SELECT
worked_date
,employee_id
,CAST(hours_worked AS DOUBLE) AS hours_worked
,RANK() OVER (PARTITION BY employee_id ORDER BY CAST(hours_worked AS DOUBLE)) AS hours_rank
FROM T1
) ranked
WHERE hours_rank = 1
ORDER BY employee_id, worked_date
''').show()

+-----------+-----------+-----------+------------+
|worked_date|employee_id|delete_flag|hours_worked|
+-----------+-----------+-----------+------------+
| 2021-06-09|       1001|          N|           7|
| 2021-06-09|       1002|          N|        3.75|
| 2021-06-09|       1003|          N|         7.5|
| 2021-06-09|       1004|          N|        6.25|
| 2021-06-10|       1001|          N|           8|
| 2021-06-10|       1002|          N|           6|
| 2021-06-10|       1003|          N|           1|
| 2021-06-10|       1004|          N|           8|
| 2021-06-11|       1001|          N|           6|
| 2021-06-11|       1002|          N|           7|
| 2021-06-11|       1003|          N|           9|
| 2021-06-11|       1004|          N|           5|
+-----------+-----------+-----------+------------+

+-----------+-----------+--------------------+
|worked_date|employee_id|minimum_hours_worked|
+-----------+-----------+--------------------+
| 2021-06-11|       1001|                 