#1.INSTALL NECESSARY LIBRARIES


In [8]:
! sudo pip install pyspark
! sudo pip install polars
! pip install selenium
! sudo apt-get update -y
! sudo apt install chromium-chromedriver -y
! sudo pip install apscheduler

! sudo cp /usr/lib/chromium-browser/chromedriver /usr/bin

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists... Done
Building dependency tree... Done
Reading

#2.IMPORT LIBRARIES


In [9]:
# Import regex for text parsing, sys for path specification and time for delaying execution
import re
import sys
import time

# import selenium sub-modules and functions for web scraping
import selenium
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException

# import pyspark functions
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import *

import random
from datetime import date, timedelta

# Import polars for automatic schema infering (suitable for big data, unlike pandas.)
# has similar syntax to pyspark
import polars as pl

# Import logging to monitor data pipelines errors, info, debug and warnings
import logging
# Import apscheduler to schedule pyspark jobs in Notebooks
from apscheduler.schedulers.blocking import BlockingScheduler
## Note: Although ETL Data pipeline is easier and faster in Notebooks,
## a proper scheduling tool like Airflow is recommended.


#3.SETUP CONFIGURATION (Selenium and Pyspark)


In [10]:
# Selenium configuration
sys.path.insert(0,'/usr/lib/chromium-browser/chromedriver')
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.binary_location = '/usr/bin/chromium-browser'
chrome_options.add_argument("start-maximized")
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
wd = webdriver.Chrome(options=chrome_options )

# Spark configuration
spark = SparkSession.builder.master('local[*]') \
                    .appName('test') \
                    .enableHiveSupport() \
                    .getOrCreate()

# Loggings configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

#TASK A: MONTH-OVER-MONTH (MoM) GROWTH RATE OF REVENUE

##i. Extract Data
(Web scrape Schema queries and tables with selenium)

In [11]:
# Chrome driver to visit clickup site to scrape queries and table
wd = webdriver.Chrome(options=chrome_options )
wd.get('https://doc.clickup.com/3627772/p/h/3epqw-172805/3c092c33c63dada')
# Wait for all page elements to load
time.sleep(5)
# FInd the particular schema in web page
query1 = wd.find_elements(By.XPATH, "/html/body/app-root/cu-document-page/cu-dashboard-doc-container/div/div[2]/div/div/cu-dashboard-doc-main/div/div/div/cu-document-page-content/div/div/div/div/pre[1]")
extracted_query = (' '.join([element.text for element in query1])).replace('PRIMARY KEY', "")
print(extracted_query)


    CREATE TABLE Sales (
        sale_id INT ,
        sale_date DATE,
        revenue DECIMAL(10, 2)
    );


##ii. Clean and parse data from schema

In [12]:
# Parse Table name from schema
pattern1 = r"CREATE\s+TABLE\s+(\w+)"
table_name = re.search(pattern1, extracted_query)
table_name = table_name.group(1)
print(table_name)
# Parse Columns from Schema
pattern2 = r'\b([a-z]\w+)\b'
matches = re.findall(pattern2, extracted_query, re.MULTILINE)
column_names = [col for col in matches]
print(column_names)

# Generate random data
num_rows = 100
data = []
for _ in range(num_rows):
  sale_id = random.randint(1, 1000)
  sale_date = date.today() - timedelta(days=random.randint(0, 365))
  revenue = random.uniform(100, 1000)
  data.append((sale_id, sale_date, revenue))
# Create Spark Dataframe from random Data
sales_df = pl.DataFrame({'sale_id': [row[0] for row in data],
                         'sale_date': [row[1] for row in data],
                         'revenue': [row[2] for row in data]
                         })

sales_df = spark.createDataFrame(sales_df.to_pandas(), column_names)
sales_df.show()
# Convert spark dataframe to SQL table
sales_df.createOrReplaceTempView(table_name)

Sales
['sale_id', 'sale_date', 'revenue']
+-------+-------------------+------------------+
|sale_id|          sale_date|           revenue|
+-------+-------------------+------------------+
|    886|2024-06-24 00:00:00| 960.8227733359782|
|    189|2024-05-05 00:00:00|444.08128089647465|
|     66|2023-11-23 00:00:00|  534.991458234937|
|    854|2024-09-04 00:00:00| 982.8353666217429|
|    353|2024-04-20 00:00:00| 835.5696439984451|
|    910|2024-07-15 00:00:00| 952.3920479357321|
|    742|2024-05-17 00:00:00| 737.5079645460632|
|    518|2024-08-08 00:00:00| 710.3577563934969|
|    130|2024-04-11 00:00:00|466.19366746945224|
|    841|2024-07-21 00:00:00|196.80438299879847|
|    501|2024-10-20 00:00:00| 736.0120205536888|
|    176|2023-12-15 00:00:00| 610.1055704569328|
|    589|2024-07-07 00:00:00| 573.1140479317834|
|    706|2024-06-14 00:00:00| 918.3445102735355|
|    990|2023-12-12 00:00:00| 923.5539593056792|
|     21|2024-01-05 00:00:00| 469.5571565300676|
|     20|2024-04-03 00:00:0

##iii. Calculate the month-over-month (MoM) growth rate of revenue



In [13]:

# Write SQL queries
## SQL query to extract month from datetime and aggregate by month
sales_monthly_revenue = spark.sql(''' SELECT
                                           DATE( DATE_TRUNC('month', sale_date)) AS month,
                                            SUM(revenue) AS total_revenue
                                      FROM
                                          Sales
                                      GROUP BY
                                          DATE_TRUNC('month', sale_date)
                                  ''')
### save df to sql table
sales_monthly_revenue.createOrReplaceTempView('Sales_monthly_revenue')

## Extract Previous month revenue from the sales_monthly_revenue
sales_previous_month_revenue = spark.sql('''
                                            SELECT
                                                month,
                                                total_revenue,
                                                LAG(total_revenue, 1) OVER (ORDER BY month) AS previous_month_revenue
                                            FROM
                                                Sales_monthly_revenue;''')
### save df to sql table
sales_previous_month_revenue.createOrReplaceTempView('Sales_previous_month_revenue')

## Extract growth rate from sales_previous_month_revenue
sales_growth_rate = spark.sql('''
                                SELECT
                                    month,
                                    total_revenue,
                                    previous_month_revenue,
                                    (total_revenue - previous_month_revenue) / previous_month_revenue * 100 AS growth_rate
                                FROM
                                    Sales_previous_month_revenue;''')
sales_growth_rate.show()


+----------+------------------+----------------------+-------------------+
|     month|     total_revenue|previous_month_revenue|        growth_rate|
+----------+------------------+----------------------+-------------------+
|2023-11-01|1219.5695387826572|                  NULL|               NULL|
|2023-12-01| 5905.270696372385|    1219.5695387826572|  384.2094287027596|
|2024-01-01| 3502.524719683244|     5905.270696372385| -40.68815978520936|
|2024-02-01| 4588.023529640132|     3502.524719683244| 30.991895756129235|
|2024-03-01|  1561.78862927706|     4588.023529640132| -65.95944595341776|
|2024-04-01|  6500.80446059534|      1561.78862927706|  316.2409905368893|
|2024-05-01| 4854.591571684812|      6500.80446059534|-25.323218055381542|
|2024-06-01|6550.3153001863975|     4854.591571684812| 34.930306771679156|
|2024-07-01| 6407.765337055567|    6550.3153001863975|-2.1762305568218046|
|2024-08-01| 8006.938173879222|     6407.765337055567|  24.95679465001274|
|2024-09-01| 4134.8396538

## Or write the SQL in a single query

In [14]:

# Write SQL queries
sales_df = spark.sql('''WITH Revenue_Per_Month AS (
    SELECT
        DATE(DATE_TRUNC('month', sale_date)) AS month_only,
        SUM(revenue) AS revenue_sum
    FROM
        Sales
    GROUP BY
        DATE_TRUNC('month', sale_date)
),
Revenue_previous AS (
    SELECT
        month_only,
        revenue_sum,
        LAG(revenue_sum, 1) OVER (ORDER BY month_only) AS previous_month_revenue
    FROM
        Revenue_Per_Month
)
SELECT
    month_only,
    revenue_sum,
    previous_month_revenue,
    (revenue_sum - previous_month_revenue) / previous_month_revenue * 100 AS growth_rate
FROM
    Revenue_previous;;''')
#sales_df = sales_df.drop
sales_df.show()

+----------+------------------+----------------------+-------------------+
|month_only|       revenue_sum|previous_month_revenue|        growth_rate|
+----------+------------------+----------------------+-------------------+
|2023-11-01|1219.5695387826572|                  NULL|               NULL|
|2023-12-01| 5905.270696372385|    1219.5695387826572|  384.2094287027596|
|2024-01-01| 3502.524719683244|     5905.270696372385| -40.68815978520936|
|2024-02-01| 4588.023529640132|     3502.524719683244| 30.991895756129235|
|2024-03-01|  1561.78862927706|     4588.023529640132| -65.95944595341776|
|2024-04-01|  6500.80446059534|      1561.78862927706|  316.2409905368893|
|2024-05-01| 4854.591571684812|      6500.80446059534|-25.323218055381542|
|2024-06-01|6550.3153001863975|     4854.591571684812| 34.930306771679156|
|2024-07-01| 6407.765337055567|    6550.3153001863975|-2.1762305568218046|
|2024-08-01| 8006.938173879222|     6407.765337055567|  24.95679465001274|
|2024-09-01| 4134.8396538

##iv. Load data to csv format

In [15]:
sales_df.write.csv("RevenueGrowthRate(MOM).csv", mode='Overwrite')


# TASK B: DAILY UTILIZATION OF EACH VEHICLE

##i. Extract Data

In [16]:
# # Chrome driver to visit clickup site to scrape queries and table
extract = wd.get('https://doc.clickup.com/3627772/p/h/3epqw-172805/3c092c33c63dada')
# Wait for all page elements to load
time.sleep(5)
# Find specific tables in the web page
rows = wd.find_elements(By.XPATH, "//table[@class='clickup-table']/tbody/tr")
table_data = []
for row in rows:
  cols = row.find_elements(By.TAG_NAME, "td")
  row_data = [col.text for col in cols]
  table_data.append(row_data)


In [17]:
# Extract Vehicles columns and data
table_1 = table_data[0][0]
table_1_cols = table_data[1]
table_1_data = table_data[2]
table_1_df = pl.DataFrame({str(table_1_cols[0]): [table_1_data[0]],
                           str(table_1_cols[1]): [table_1_data[1]],
                           str(table_1_cols[2]): [table_1_data[2]],
                           str(table_1_cols[3]): [table_1_data[3]],
                           str(table_1_cols[4]): [table_1_data[4]],
                           str(table_1_cols[5]): [table_1_data[5]],
                           str(table_1_cols[6]): [table_1_data[6]],
                           str(table_1_cols[7]): [table_1_data[7]]
                           })
table_1_df = spark.createDataFrame(table_1_df.to_pandas(), table_1_cols)
vehicles_df = table_1_df
vehicles_df.show()

# Extract Vehicle History columns and data
table_2 = table_data[3][0]
table_2_cols = table_data[4]
table_2_data = table_data[5:14]
table_2_df = pl.DataFrame({
    str(table_2_cols[0]): [row[0] for row in table_2_data],
    str(table_2_cols[1]): [row[1] for row in table_2_data],
    str(table_2_cols[2]): [row[2] for row in table_2_data],
    str(table_2_cols[3]): [row[3] for row in table_2_data],
    str(table_2_cols[4]): [row[4] for row in table_2_data],
    str(table_2_cols[5]): [row[5] for row in table_2_data],
    str(table_2_cols[6]): [row[6] for row in table_2_data],
    str(table_2_cols[7]): [row[7] for row in table_2_data],
    str(table_2_cols[8]): [row[8] for row in table_2_data]
})
table_2_df = spark.createDataFrame(table_2_df.to_pandas())
vehicles_history_df = table_2_df
vehicles_history_df.show()

# Extract location table data and columns
table_3 = table_data[14][0]
table_3_cols = table_data[15]
table_3_data = table_data[16]
table_3_df = pl.DataFrame({
    str(table_3_cols[0]): [table_3_data[0]],
    str(table_3_cols[1]): [table_3_data[1]],
    str(table_3_cols[2]): [table_3_data[2]],
    str(table_3_cols[3]): [table_3_data[3]],
    str(table_3_cols[4]): [table_3_data[4]],
    str(table_3_cols[5]): [table_3_data[5]],
    str(table_3_cols[6]): [table_3_data[6]]
     })
table_3_df = table_3_df.to_pandas()
table_3_df = spark.createDataFrame(table_3_df)
location_df = table_3_df
location_df.show()

+-----------+------------+--------------------+------+-----------+-------------------+-------------------+----------+
|vehicle_id |vehicle_name|license_plate_number|status|location_id|         created_at|         updated_at|deleted_at|
+-----------+------------+--------------------+------+-----------+-------------------+-------------------+----------+
|          2|Ford Everest|              1Q9243|  YARD|          4|2022-09-04 23:37:56|2022-11-15 00:00:16|          |
+-----------+------------+--------------------+------+-----------+-------------------+-------------------+----------+

+-----+----------+-----------+----------+-------------------+-------------------+----------+-------------------+-------------------+
|   id|vehicle_id|location_id|    status|         created_at|         updated_at|deleted_at|         start_time|           end_time|
+-----+----------+-----------+----------+-------------------+-------------------+----------+-------------------+-------------------+
|10618|   

##ii. Clean and Join tables

In [18]:
# Remove any leading/trailing spaces from column names in both DataFrames
vehicles_df = vehicles_df.toDF(*[c.strip() for c in vehicles_df.columns])
vehicles_history_df = vehicles_history_df.toDF(*[c.strip() for c in vehicles_history_df.columns])
location_df = location_df.toDF(*[c.strip() for c in location_df.columns])

## Rename Vehicle columns
vehicles_df = vehicles_df.withColumnRenamed("status", "status(vehicle)") \
                         .withColumnRenamed("created_at", "created_at(vehicle)") \
                          .withColumnRenamed("updated_at", "updated_at(vehicle)") \
                          .withColumnRenamed("deleted_at", "deleted_at(vehicle)") \
                          .withColumnRenamed("location_id", "location_id(vehicle)")
# Rename Vehicle History columns
vehicles_history_df = vehicles_history_df.withColumnRenamed("created_at", "created_at(vehicle_history)") \
                                          .withColumnRenamed("updated_at", "updated_at(vehicle_history)")\
                                          .withColumnRenamed("deleted_at", "deleted_at(vehicle_history)")\
                                          .withColumnRenamed("status", "status(vehicle_history)")\
                                          .withColumnRenamed("start_time", "start_time(vehicle_history)")\
                                          .withColumnRenamed("end_time", "end_time(vehicle_history)")\
                                          .withColumnRenamed("id", "history_id")
# Rename Location Columns
location_df = location_df.withColumnRenamed("created_at", "created_at(location)")\
                          .withColumnRenamed("updated_at", "updated_at(location)")\
                          .withColumnRenamed("deleted_at", "deleted_at(location)")\


# Right join vehicles and vehicle_history tables
vehicles_joined = vehicles_df.join(vehicles_history_df, on='vehicle_id', how="right")
# Left join vehicles_joined and location tables
vehicles_location = vehicles_joined.join(location_df, on='location_id', how="left")

# Change columns data type
column_type_mapping = {
    "vehicle_id": IntegerType(),
    "location_id": IntegerType(),
    "latitude": DoubleType(),
    "longitude": DoubleType(),
    "vehicle_name": StringType(),
    "license_plate_number": StringType(),
    "status(vehicle)": StringType(),
    "created_at(vehicle)": TimestampType(),
    "updated_at(vehicle)": TimestampType(),
    "deleted_at(vehicle)": TimestampType(),
    "history_id": IntegerType(),
    "vehicle_id": IntegerType(),
    "location_id(vehicle)": IntegerType(),
    "start_time(vehicle_history)": TimestampType(),
    "end_time(vehicle_history)": TimestampType(),
    "status(vehicle_history)": StringType(),
    "created_at(vehicle_history)": TimestampType(),
    "updated_at(vehicle_history)": TimestampType(),
    "deleted_at(vehicle_history)": TimestampType(),
    "location_name": StringType(),
    "created_at(location)": TimestampType(),
    "updated_at(location)": TimestampType(),
    "deleted_at(location)": TimestampType()
}

# Iterate through the column type mapping and cast each column
for column_name, data_type in column_type_mapping.items():
    vehicles_location = vehicles_location.withColumn(
        column_name, vehicles_location[column_name].cast(data_type)
    )
vehicles_location.show()


+-----------+----------+------------+--------------------+---------------+--------------------+-------------------+-------------------+-------------------+----------+-----------------------+---------------------------+---------------------------+---------------------------+---------------------------+-------------------------+----------------+--------+---------+--------------------+--------------------+--------------------+
|location_id|vehicle_id|vehicle_name|license_plate_number|status(vehicle)|location_id(vehicle)|created_at(vehicle)|updated_at(vehicle)|deleted_at(vehicle)|history_id|status(vehicle_history)|created_at(vehicle_history)|updated_at(vehicle_history)|deleted_at(vehicle_history)|start_time(vehicle_history)|end_time(vehicle_history)|   location_name|latitude|longitude|created_at(location)|updated_at(location)|deleted_at(location)|
+-----------+----------+------------+--------------------+---------------+--------------------+-------------------+-------------------+---------

##iii. Calculate daily utilization of each vehicle

In [19]:
# Define status and filter
all_status = ['YARD', 'ONRENT', 'REPLACEMENT', 'DRIVE_CAR', 'CLEANING', 'RELOCATION', 'TRANSIT', 'PANELSHOP', 'MAINTENANCE', 'PENDING', 'OVERDUE']
yard_status = ['YARD']
all_status_df = vehicles_location.filter(vehicles_location['status(vehicle)'].isin(all_status))

# Calculate duration column from start time and end time
duration_df = all_status_df.withColumn(
    "duration",
    (unix_timestamp(col("end_time(vehicle_history)")) - unix_timestamp(col("start_time(vehicle_history)"))) /3600
)
# Group by date and sum of(duration, status) and calculate the total duration for each vehicle
duration_by_vehicle = duration_df.groupBy([(to_date("created_at(vehicle_history)")).alias('created_date'),"vehicle_id", "vehicle_name", "license_plate_number", "location_name"]) \
                                .agg(sum("duration").alias("available_utilization_hours"),
                                     sum(when(col("status(vehicle_history)") == "YARD", col("duration"))).alias("idle_in_yard(hrs)"),
                                     sum(when(col("status(vehicle_history)") == "ONRENT", col("duration"))).alias("on_rent(hrs)"),
                                     sum(when(col("status(vehicle_history)") == "REPLACEMENT", col("duration"))).alias("replacement(hrs)"),
                                     sum(when(col("status(vehicle_history)") == "DRIVE_CAR", col("duration"))).alias("drive_car(hrs)"),
                                     sum(when(col("status(vehicle_history)") == "CLEANING", col("duration"))).alias("cleaning(hrs)"),
                                     sum(when(col("status(vehicle_history)") == "RELOCATION", col("duration"))).alias("relocation(hrs)"),
                                     sum(when(col("status(vehicle_history)") == "TRANSIT", col("duration"))).alias("transit(hrs)"),
                                     sum(when(col("status(vehicle_history)") == "PANELSHOP", col("duration"))).alias("panelshop(hrs)"),
                                     sum(when(col("status(vehicle_history)") == "MAINTENANCE", col("duration"))).alias("maintenance(hrs)"),
                                     sum(when(col("status(vehicle_history)") == "PENDING", col("duration"))).alias("pending(hrs)"),
                                     sum(when(col("status(vehicle_history)") == "OVERDUE", col("duration"))).alias("overdue(hrs)"),
                                )

# fill NULL data with 0
duration_by_vehicle = duration_by_vehicle.fillna(0)

# Round float data to 4 decimal places
duration_by_vehicle = duration_by_vehicle.withColumn("available_utilization_hours", round(col("available_utilization_hours"), 4)) \
                                          .withColumn("idle_in_yard(hrs)", round(col("idle_in_yard(hrs)"), 4)) \
                                          .withColumn("on_rent(hrs)", round(col("on_rent(hrs)"), 4)) \
                                          .withColumn("replacement(hrs)", round(col("replacement(hrs)"), 4)) \
                                          .withColumn("drive_car(hrs)", round(col("drive_car(hrs)"), 4)) \
                                          .withColumn("cleaning(hrs)", round(col("cleaning(hrs)"), 4)) \
                                          .withColumn("relocation(hrs)", round(col("relocation(hrs)"), 4)) \
                                          .withColumn("transit(hrs)", round(col("transit(hrs)"), 4)) \
                                          .withColumn("panelshop(hrs)", round(col("panelshop(hrs)"), 4)) \
                                          .withColumn("maintenance(hrs)", round(col("maintenance(hrs)"), 4)) \
                                          .withColumn("pending(hrs)", round(col("pending(hrs)"), 4)) \
                                          .withColumn("overdue(hrs)", round(col("overdue(hrs)"), 4))

# Calaculate the utilations based on duration status
duration_by_vehicle = duration_by_vehicle.withColumn("utilization(%)", (col("on_rent(hrs)") + col("replacement(hrs)") + col("overdue(hrs)")) / col('available_utilization_hours') * 100) \
                                          .withColumn("drive_car_utilization(%)", (col("drive_car(hrs)")) / col('available_utilization_hours') * 100) \
                                          .withColumn("ops_utilization(%)", (col("cleaning(hrs)") + col("relocation(hrs)") + col("transit(hrs)") + col("panelshop(hrs)") + col("maintenance(hrs)") + col("pending(hrs)")) / col('available_utilization_hours') * 100) \
                                          .withColumn("idle_utilization(%)", col("idle_in_yard(hrs)") / col('available_utilization_hours') * 100)

# COnvert utilizations columns to 1 decimal place
duration_by_vehicle = duration_by_vehicle.withColumn("utilization(%)", format_number(col("utilization(%)"), 0)) \
                                          .withColumn("drive_car_utilization(%)", format_number(col("drive_car_utilization(%)"), 0)) \
                                          .withColumn("ops_utilization(%)", format_number(col("ops_utilization(%)"), 0)) \
                                          .withColumn("idle_utilization(%)", format_number(col("idle_utilization(%)"), 0))


duration_by_vehicle.show()



+------------+----------+------------+--------------------+----------------+---------------------------+-----------------+------------+----------------+--------------+-------------+---------------+------------+--------------+----------------+------------+------------+--------------+------------------------+------------------+-------------------+
|created_date|vehicle_id|vehicle_name|license_plate_number|   location_name|available_utilization_hours|idle_in_yard(hrs)|on_rent(hrs)|replacement(hrs)|drive_car(hrs)|cleaning(hrs)|relocation(hrs)|transit(hrs)|panelshop(hrs)|maintenance(hrs)|pending(hrs)|overdue(hrs)|utilization(%)|drive_car_utilization(%)|ops_utilization(%)|idle_utilization(%)|
+------------+----------+------------+--------------------+----------------+---------------------------+-----------------+------------+----------------+--------------+-------------+---------------+------------+--------------+----------------+------------+------------+--------------+---------------------

##iv. Load data to csv format

In [24]:
duration_by_vehicle.write.csv("DailyUtilizationOfEachVehicle.csv", mode='Overwrite')

##v. To Schedule basics daily in Notebooks

In [22]:
# Specify functions to schedule
def run_spark_job():
  return duration_by_vehicle.write.parquet("DailyUtilizationOfEachVehicle.parquet", mode='Overwrite')

sched = BlockingScheduler()
logging.info("Starting Spark job")
sched.add_job(run_spark_job, 'cron', hour=0, minute=0)
#sched.start() # Uncomment to start schedule job
logging.info("Spark job completed")