In [254]:
import os 
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import lit
from datetime import datetime

<h2> Start Spark Session </h2>

In [23]:
spark = SparkSession.builder.appName("Challenge").master("local").getOrCreate()

<h2>Load files</h2>

In [258]:
wti_df = spark.createDataFrame(pd.read_excel(os.getcwd()+'/Files/wti-daily_csv.xlsx'))
brent_df = spark.createDataFrame(pd.read_excel(os.getcwd()+'/Files/brent-daily_csv.xlsx'))

<h2>Filter dates</h3>

In [259]:
wti_df = wti_df.filter(wti_df.Date > '2000-01-01')
brent_df = brent_df.filter(brent_df.Date > '2000-01-01')

<h2>Moving average</h2>

In [260]:
days = lambda i: i * 86400
#Create 7 day window
w = (Window.orderBy(F.col("Date").cast('long')).rangeBetween(-days(7), 0))
wti_df = wti_df.withColumn('moving_average', F.avg("Price").over(w))
brent_df = brent_df.withColumn('moving_average', F.avg("Price").over(w))

<h2>Create Dataframe </h2>

In [261]:
wti_rdd = wti_df.select("moving_average").rdd.map(lambda l : round(l[0],4))
brent_rdd = brent_df.select("moving_average").rdd.map(lambda l : round(l[0],4))

date_wti = wti_df.select("Date").rdd.map(lambda l : l[0])
date_brent = brent_df.select("Date").rdd.map(lambda l : l[0])

#Append date, moving average and type to df
wti = spark.createDataFrame(date_wti.zip(wti_rdd))\
    .toDF("Date","moving_average")\
    .withColumn("oil_type", lit('WTI'))

brent = spark.createDataFrame(date_brent.zip(brent_rdd))\
    .toDF("Date","moving_average")\
    .withColumn("oil_type", lit('Brent'))

<h2>Export file </h2>

<p>Parquet format bring efficiency compared to csv, aggregation queries are less time consuming and provide savings in cloud storage </p>

In [262]:
oil_df = wti.union(brent).sort("Date")
oil_df.write.parquet(os.getcwd()+'/moving_average_oil_type.parquet')

In [None]:
spark.stop()

<h2>Read File</h2>

In [263]:
parDF=pd.read_parquet(os.getcwd()+'/moving_average_oil_type.parquet')
display(parDF)

Unnamed: 0,Date,moving_average,oil_type
0,2000-01-04 06:00:00,25.5600,WTI
1,2000-01-04 06:00:00,23.9500,Brent
2,2000-01-05 06:00:00,25.1050,WTI
3,2000-01-05 06:00:00,23.8350,Brent
4,2000-01-06 06:00:00,25.0000,WTI
...,...,...,...
10431,2020-08-27 05:00:00,42.7733,WTI
10432,2020-08-27 05:00:00,44.9283,Brent
10433,2020-08-28 05:00:00,42.8300,WTI
10434,2020-08-28 05:00:00,45.0383,Brent
