# MSiA 431 - Big Data - Homework 3 - Iteration 2 

## Kristiyan Dimitrov

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct
from pyspark.sql import Window
from pyspark.sql.types import StructType, StructField, LongType, DoubleType
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, weekofyear, year, month, window, count, lag, first, last, desc 
from operator import add
import pandas as pd
import numpy as np

In [2]:
spark = SparkSession.builder.appName('Problem 3').getOrCreate()

In [3]:
csv_path = 'data.csv'

In [4]:
df = spark.read.csv(csv_path, header = True , inferSchema = True, timestampFormat='YYYY-MM-DD HH:MM:SS a')

## Putting it all together in a for loop

In [6]:
# CONFIGURATION
alpha = .2

In [17]:
## This is the EWMA function, which takes a list of values and an alpha parameters and calculates the exponentially weighted average
## Testing EWMA with a range of values
def ewma_lst(alpha, lst):
    
    res = 0
    
    for ii in range(len(lst)):
        res += alpha * ( (1-alpha)**ii ) * lst[ii]
    
    return res

In [18]:
# Register UDF
ewma = spark.udf.register("ewma_lst", ewma_lst)

In [19]:
# Taking only data after 2008, before that is not relevant
df_2008 = df.filter(df.time_stamp >= '2008-01-01 00:00:00')

# Dropping 'direction' column
df_2008 = df_2008.drop('direction')

# Taking first chunk of data; NEED TO PARAMETRIZE WITH YEAR & MONTH LATER <------------------ <------------------ <------------------ <------------------ <------------------
df_1 = df_2008.filter(df_2008.time_stamp <= '2008-06-01 00:00:00')

# Let's take a subset of columns for ease
df_subset = df_1.select(['bar_num', 'profit', 'trade_id'])

In [20]:
# Verifying the max bar value across all trades
max_bar_per_trade = df_1.groupBy(col("trade_id")).agg({"bar_num": "max"}).alias('max_bar_num')
max_bar = max_bar_per_trade.agg({"max(bar_num)": "max"}).collect()[0][0]
max_bar

120

In [21]:
# Create schema for empty dataframe, which will hold all the calculated ewma profits
schema = StructType([StructField('trade_id', LongType(), False),
                     StructField('profit_ewma', DoubleType(), False), 
                     StructField('bar_num', LongType(), True)])

results = spark.createDataFrame([], schema)

In [22]:
df_subset.cache()

DataFrame[bar_num: int, profit: int, trade_id: int]

In [23]:
# The goal of this for loop is to calculate a feature based on profit

for ii in range(11, max_bar): # For bars 1, 2, 3, ... 10, we don't need to do anything; So, when we do left join, those feature values for bars 1-10 should be null
    
#     print(f'Start {ii}')
    
    if ii % 10 == 0: # This means we are in the situation of taking bar 20, 30, 40, etc.
        bars_to_take = ii - 10 # For bar 20, we want bars 10 and below; for bar 30, we want bars 20 and below...
    else:
        bars_to_take = ii - ii%10 # E.g. if we are at bar 33, we want bars 33 - 3 = 30 and below
    
    # Taking only the part of the dataset, which contains the subset of bars we are interested in
    df_filtered = df_subset.filter(f'bar_num <= {bars_to_take}')
        
    # Collecting all the profits for a given trade_id in one place (list with its corresponding trade_id)
    df_intermediate = df_filtered.groupby('trade_id').agg(F.collect_list('profit'))
        
    # Apply the UDF EWMA function to the collected list of profits
    df_ewma = df_intermediate.select('trade_id', ewma(F.lit(alpha), col("collect_list(profit)")))
        
    # Adding the bar_num we are currently at, so we can properly join later
    final_df = df_ewma.withColumn('bar_num', lit(ii))
        
    results = results.union(final_df)   
    
#     print(f'End {ii}')
    
    

In [14]:
results.show(10)

+--------+-------------------+-------+
|trade_id|        profit_ewma|bar_num|
+--------+-------------------+-------+
|    9900| 14.230217728000003|     11|
|    9852|-45.088219033600005|     11|
|   10081|  42.92491776000001|     11|
|    9879| -64.16702259200002|     11|
|   10121|-16.785674240000006|     11|
|    9946|  9.038351359999998|     11|
|   10032| 15.545000038400003|     11|
|    9775| 28.437628416000003|     11|
|    9914|  93.61254041600002|     11|
|   10090|      -37.236989952|     11|
+--------+-------------------+-------+
only showing top 10 rows



In [21]:
# results.filter(results.trade_id == 9900).show() # Tried to let this run for ~ 10 minutes, but it didn't finish; aborting and hoping this works on distributed

In [None]:
# Now I need to join the original dataframe with the results i.e. to add the profit_ewma feature 

In [24]:
df_1.show()

+-------------------+-------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------+
|         time_stamp|bar_num|profit|var12|var13|var14|var15|var16|var17|var18|var23|var24|var25|var26|var27|var28|var34|var35|var36|var37|var38|var45|var46|var47|var48|var56|var57|var58|var67|var68|var78|trade_id|
+-------------------+-------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+--------+
|2008-04-10 04:19:00|    120|  -131|    9|    7|    7|    7|    7|    9|    9|    6|    6|    6|    7|    7|    7|    6|    6|    7|    9|    9|    9|    9|    9|    9|    9|    9|    9|   -1|    6|    6|    9853|
|2008-04-10 04:18:00|    119|   -97|    3|    2|    2|    2|    2|    3|    3|    1|    1|    1|    1|    2|    2|    1|    1|    2|    3|    3|

In [25]:
df_with_new_column = df_1.join(results, on = ['trade_id', 'bar_num'])

In [26]:
df_with_new_column.show()

Py4JJavaError: An error occurred while calling o4731.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 20437 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
