In [7]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window

In [8]:
# Spark session for this notebook; getOrCreate() hands back an already-running
# session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("TimeSeries")
    .getOrCreate()
)

In [9]:
# Daily price history for each company. The first CSV row supplies column
# names; no schema inference is requested, so every column loads as a string.
amz = spark.read.option('header', True).csv('timeseries/Amazon.csv')
net = spark.read.option('header', True).csv('timeseries/netflix.csv')

In [10]:
# Inspect the loaded schema: every column (including Date and the numeric
# price/volume columns) is a string, so numeric columns must be converted later.
amz.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)



In [11]:
# Tag every daily row with its calendar month ('yyyy-MM'). Date is still a
# string column here; date_format parses it while formatting.
amz, net = (
    frame.withColumn('Period', F.date_format('Date', 'yyyy-MM'))
    for frame in (amz, net)
)

In [12]:
# First Amazon trading day of each month. Dates are zero-padded 'yyyy-MM-dd'
# strings, so the lexicographic minimum is also the chronological one.
amz_periods = (
    amz
    .groupBy('Period')
    .agg(F.min(F.col('Date')).alias('OpeningDate'))
)

In [13]:
# Pair each Amazon month with every Netflix trading date in that same month
# (left join: months with no Netflix data keep a single null Date) and record
# how many days each Netflix date lies from Amazon's opening date.
amz_stg = (
    amz_periods
    .join(net.select('Period', 'Date'), on='Period', how='left')
    .withColumn('Diff', F.abs(F.datediff('OpeningDate', 'Date')))
)

amz_stg.where("Period = '2002-05'").show()

+-------+-----------+----------+----+
| Period|OpeningDate|      Date|Diff|
+-------+-----------+----------+----+
|2002-05| 2002-05-01|2002-05-31|  30|
|2002-05| 2002-05-01|2002-05-30|  29|
|2002-05| 2002-05-01|2002-05-29|  28|
|2002-05| 2002-05-01|2002-05-28|  27|
|2002-05| 2002-05-01|2002-05-24|  23|
|2002-05| 2002-05-01|2002-05-23|  22|
+-------+-----------+----------+----+



In [14]:
# Keep exactly one Netflix trading date per month: the one closest to Amazon's
# opening date. row_number() guarantees a single row per Period even when two
# Netflix dates are equally distant. A min-and-equality filter would keep both
# tied rows and duplicate data in the later joins. Ties go to the earlier date.
# Months with no Netflix data have one row with a null Diff/Date; it ranks 1
# and is kept.
w = Window.partitionBy('Period').orderBy(F.col('Diff').asc(), F.col('Date').asc())
amz_stg = (
    amz_stg
    .withColumn('_closest_rank', F.row_number().over(w))
    .where(F.col('_closest_rank') == 1)
    .drop('_closest_rank', 'Diff')
)

amz_stg.where("Period = '2002-05'").show()

+-------+-----------+----------+
| Period|OpeningDate|      Date|
+-------+-----------+----------+
|2002-05| 2002-05-01|2002-05-23|
+-------+-----------+----------+



In [15]:
# Amazon's full row on each month's opening date, labelled for the combined
# output. Period comes from `amz`; the join-key date columns are removed.
amz_period = (
    amz_stg.select('OpeningDate')
    .join(amz, amz_stg.OpeningDate == amz.Date, how='inner')
    .drop('OpeningDate', 'Date')
    .withColumn('Category', F.lit('Amazon'))
)

In [16]:
# Netflix's full row on the date matched to each Amazon opening date, labelled
# for the combined output. Months whose matched Date is null find no partner
# in the inner join and drop out.
net_period = (
    amz_stg.select('Date')
    .join(net, on='Date', how='inner')
    .drop('Date')
    .withColumn('Category', F.lit('Netflix'))
)

In [19]:
# Stack Amazon and Netflix rows (Netflix columns reordered to match Amazon's,
# since union matches columns by position) and persist them as parquet for the
# pandas/plotly section.
(
    amz_period
    .union(net_period.select(amz_period.columns))
    .write
    .mode('overwrite')
    .parquet('temp/timeseries')
)

## Plot: monthly opening-day Volume and High, Amazon vs Netflix

In [20]:
import pandas as pd
import plotly.express as px

In [21]:
# Load the combined Spark output into pandas, ordered by month for plotting.
df = (
    pd.read_parquet('temp/timeseries')
    .sort_values('Period')
)

In [33]:
# The price and volume columns arrive as text: Spark read the CSVs without
# schema inference, so the parquet stores strings. Convert them all at once
# before plotting. 'Adj Close' is included so no price column stays a string.
NUMERIC_COLUMNS = ['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']
df[NUMERIC_COLUMNS] = df[NUMERIC_COLUMNS].apply(pd.to_numeric)

In [29]:
# Opening-day trading volume per month, one line per company.
px.line(
    data_frame=df,
    x='Period',
    y='Volume',
    color='Category',
    title='Volume x Period',
)

In [31]:
# Opening-day intraday high per month, one line per company.
px.line(
    data_frame=df,
    x='Period',
    y='High',
    color='Category',
    title='High x Period',
)