# Flint Workbook

In [1]:
import findspark
findspark.init()
findspark.find()

import pyspark
from pyspark.sql import SQLContext
from ts.flint import FlintContext, summarizers, windows
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Reuse the active SparkContext if there is one, otherwise start a new one.
# getOrCreate must be called on the class: instantiating SparkContext() first
# fails when a context is already running (for example when this cell is
# re-run in the same kernel), which defeats the purpose of getOrCreate.
sc = pyspark.SparkContext.getOrCreate()

# FlintContext is built on top of a SQLContext wrapping that SparkContext.
sqlContext = SQLContext(sc)
flintContext = FlintContext(sqlContext)

# SparkSession handle used by the cells below to create DataFrames.
spark = SparkSession.builder.getOrCreate()

## 1. Create a flint dataframe

In [2]:
# Three (time, v) observations; the time values start out as plain strings.
samples = [('2018-08-19', 1.0), ('2018-08-21', 2.0), ('2018-08-24', 3.0)]

# Distribute the tuples, wrap each one in a Row, and build a Spark DataFrame.
df = spark.createDataFrame(
    data=sc.parallelize(samples).map(lambda rec: Row(time=rec[0], v=rec[1]))
)

# Replace the string 'time' column with from_utc_timestamp(time, 'UTC');
# the recorded output shows the column rendered as timestamps afterwards.
df = df.withColumn('time', from_utc_timestamp(col('time'), 'UTC'))

# Hand the Spark DataFrame to Flint and inspect the result and its type.
flint_df = flintContext.read.dataframe(df)
flint_df.show()
print(type(flint_df))

+-------------------+---+
|               time|  v|
+-------------------+---+
|2018-08-19 00:00:00|1.0|
|2018-08-21 00:00:00|2.0|
|2018-08-24 00:00:00|3.0|
+-------------------+---+

<class 'ts.flint.dataframe.TimeSeriesDataFrame'>


## 2. Compute sum of column v

In [3]:
# Summarize the whole series with a sum over column v. In the recorded
# output the result is a single row (v_sum) stamped 1970-01-01 00:00:00.
sum_of_v = summarizers.sum("v")
value_sum = flint_df.summarize(sum_of_v)
value_sum.show()

+-------------------+-----+
|               time|v_sum|
+-------------------+-----+
|1970-01-01 00:00:00|  6.0|
+-------------------+-----+



## 3.1 Compute 3-day moving average using Flint

In [7]:
# Look back 2 days from each row. The bound is inclusive: in the recorded
# output the 2018-08-21 row averages in the 2018-08-19 row, which lies
# exactly 2 days earlier, so the window covers three calendar days.
lookback = '2day'
w = windows.past_absolute_time(lookback)

# Append the windowed mean of v (column v_mean) to every row.
threedayMA = flint_df.summarizeWindows(w, summarizers.mean('v'))
threedayMA.show()


+-------------------+---+------+
|               time|  v|v_mean|
+-------------------+---+------+
|2018-08-19 00:00:00|1.0|   1.0|
|2018-08-21 00:00:00|2.0|   1.5|
|2018-08-24 00:00:00|3.0|   3.0|
+-------------------+---+------+



## 3.2 Compute 3-day moving average using Spark SQL functions and a window

In [8]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window


def days(i):
    """Return the number of seconds in ``i`` days."""
    return i * 86400


# Order rows by the timestamp cast to long (number of seconds) so the frame
# can be expressed as a range in seconds: from two days before the current
# row up to and including the current row.
time_as_seconds = F.col("time").cast('long')
w = Window.orderBy(time_as_seconds).rangeBetween(-days(2), 0)

# Average v over that range and attach it as a new column.
rolling_mean = F.avg("v").over(w)
df = df.withColumn('rolling_average', rolling_mean)
df.show()

+-------------------+---+---------------+
|               time|  v|rolling_average|
+-------------------+---+---------------+
|2018-08-19 00:00:00|1.0|            1.0|
|2018-08-21 00:00:00|2.0|            1.5|
|2018-08-24 00:00:00|3.0|            3.0|
+-------------------+---+---------------+



## 3.3 Compute 2-minute moving average using Flint

In [9]:
# Sample series with one observation at 23:59:00, one minute before the
# 2018-08-21 00:00:00 row, so a minute-scale window has something to pick up.
l = [('2018-08-19', 1.0), ('2018-08-20 23:59:00', 1.0), ('2018-08-21', 2.0), ('2018-08-24', 3.0)]
rdd = sc.parallelize(l)
dat = rdd.map(lambda x: Row(time=x[0], v=x[1]))
df = spark.createDataFrame(data=dat)
# Replace the string 'time' column with from_utc_timestamp(time, 'UTC').
df = df.withColumn('time', from_utc_timestamp(col('time'), 'UTC'))
flint_df = flintContext.read.dataframe(df)

# Look back 1 minute from each row. In the recorded output the 2018-08-21
# row averages in the 23:59:00 row, which lies exactly 1 minute earlier.
w = windows.past_absolute_time('1min')

# Named after the minute-scale window; the previous name (onedayMA) was a
# leftover that did not match the '1min' window used here.
twoMinuteMA = flint_df.summarizeWindows(w, summarizers.mean('v'))
twoMinuteMA.show()

+-------------------+---+------+
|               time|  v|v_mean|
+-------------------+---+------+
|2018-08-19 00:00:00|1.0|   1.0|
|2018-08-20 23:59:00|1.0|   1.0|
|2018-08-21 00:00:00|2.0|   1.5|
|2018-08-24 00:00:00|3.0|   3.0|
+-------------------+---+------+



## 4. Left join two dataframes with misaligned timestamps

In [10]:
def _flint_frame(rows, value_col):
    """Build a Flint frame from (time_string, value) tuples.

    rows: list of 2-tuples; the first element is the time as a string and
        the second is the value.
    value_col: name of the column that receives the second tuple element.

    Returns whatever flintContext.read.dataframe returns for the resulting
    Spark DataFrame, whose 'time' column has been passed through
    from_utc_timestamp(..., 'UTC').
    """
    # Row(**{...}) is the keyword form Row(time=..., <value_col>=...) with a
    # column name that is only known at call time.
    records = sc.parallelize(rows).map(
        lambda rec: Row(**{'time': rec[0], value_col: rec[1]})
    )
    frame = spark.createDataFrame(data=records)
    frame = frame.withColumn('time', from_utc_timestamp(col('time'), 'UTC'))
    return flintContext.read.dataframe(frame)


# The two series share only the 2019-11-14 timestamp; the other timestamps
# (2019-11-13 on the left, 2019-11-11 on the right) have no exact counterpart.
left = _flint_frame([('2019-11-13', 5.0), ('2019-11-14', 8.0)], 'left_v')
right = _flint_frame([('2019-11-11', 1.0), ('2019-11-14', 4.0)], 'right_v')

left.show()

+------+-------------------+
|left_v|               time|
+------+-------------------+
|   5.0|2019-11-13 00:00:00|
|   8.0|2019-11-14 00:00:00|
+------+-------------------+



In [11]:
# Display the right-hand Flint frame built in the previous cell.
right.show()

+-------+-------------------+
|right_v|               time|
+-------+-------------------+
|    1.0|2019-11-11 00:00:00|
|    4.0|2019-11-14 00:00:00|
+-------+-------------------+



In [12]:
# Left-join on time with a 3-day tolerance. In the recorded output the
# 2019-11-13 left row picks up right_v from the 2019-11-11 right row (two
# days earlier), while the 2019-11-14 rows match on the exact timestamp.
match_tolerance = '3day'
joined = left.leftJoin(right, tolerance=match_tolerance)
joined.show()

+-------------------+------+-------+
|               time|left_v|right_v|
+-------------------+------+-------+
|2019-11-13 00:00:00|   5.0|    1.0|
|2019-11-14 00:00:00|   8.0|    4.0|
+-------------------+------+-------+



In [13]:
# Shut down the SparkContext now that the workbook is finished.
sc.stop()