In [1]:
import os
import sys
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt

In [2]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

#pd.set_option('display.height', 1000)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
pd.set_option('display.expand_frame_repr', False)
pd.set_option('max_colwidth', 800)
pd.set_option('display.max_colwidth', 500)
pd.set_option('expand_frame_repr', True)

In [3]:
APP_NAME = "Policy Evaluation - Chessy II"

try:
    sc.stop()
    spark.stop()
except:
    pass

from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

    
sc = SparkContext(conf = SparkConf() 
                  .set("spark.driver.maxResultSize", "96g") 
                  .set("spark.sql.execution.arrow.enabled", "true") 
                  .set('spark.sql.broadcastTimeout', 1000) 
                  .set('spark.local.dir', '/data_data/session_length/spark_tmp/') 
                  .set('spark.driver.memory', '96G') 
                  .set("spark.executor.instances", "20") 
                  .set("spark.executor.cores", 16) 
                  .set("spark.executor.memory", "8G")).getOrCreate()
spark = SparkSession(sc)
spark.sparkContext.setLogLevel("ERROR")

In [4]:
from pyspark.sql.types import StructField, StructType, StringType, LongType, DateType, DoubleType, IntegerType, ArrayType, BooleanType
from pyspark.sql.functions import count, mean, stddev_pop, min, max, lit, round, bround, pow, col, corr, lower, upper, avg, stddev, abs, log
from pyspark.sql.functions import lit, trim, rtrim, rpad, trim, coalesce
from pyspark.sql.functions import current_date, current_timestamp, date_add, date_sub, months_between, to_date
from pyspark.sql.functions import udf, col, sum, from_json,lag, lead, monotonically_increasing_id, row_number, array, explode, desc
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, dense_rank, rank, expr, split, regexp_replace


from pyspark.ml import Pipeline
from pyspark.ml.feature import RFormula
from pyspark.ml.classification import LogisticRegression, GBTClassifier, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from datetime import datetime

#### Import Data

In [5]:
!ls -alrth /data_data/reinforcement_learning/results | grep history


-rw-rw-r-- 1 ubuntu ubuntu    1 Apr 27 18:33 history_file_1000_trials_2_sides_Apr_02_2020_1648
-rw-rw-r-- 1 ubuntu ubuntu 4.9M Apr 27 18:33 history_file_100_trials_2_sides_Mar_27_2020_1153
-rw-rw-r-- 1 ubuntu ubuntu 525K Apr 27 18:39 history_sarsa_1k_test
-rw-rw-r-- 1 ubuntu ubuntu    0 Apr 27 18:39 history_sarsa_1M_test
-rw-rw-r-- 1 ubuntu ubuntu 4.9M Apr 27 18:39 history_file_100_trials_2_sides_Mar_27_2020_1235
-rw-rw-r-- 1 ubuntu ubuntu    0 Apr 27 18:39 history_sarsa_full.tsv
-rw-rw-r-- 1 ubuntu ubuntu  16M Apr 27 18:39 history_file_1000000_trials_2_sides_Apr_02_2020_1629
-rw-r--r-- 1 ubuntu ubuntu 2.7M Apr 27 18:39 history_file_100.csv
-rw-rw-r-- 1 ubuntu ubuntu 4.9M Apr 27 19:15 history_file_100_trials_2_sides_Mar_27_2020_1220
-rw-rw-r-- 1 ubuntu ubuntu 4.9M Apr 27 19:22 history_file_100_trials_2_sides_Mar_27_2020_1228
-rw-rw-r-- 1 ubuntu ubuntu    1 Apr 27 19:22 history_file_100_trials_2_sides_Mar_27_2020_1218
-rw-rw-r-- 1 ubuntu ubuntu    1 Apr 27 19:29 history_file_

In [None]:
!head /data_data/reinforcement_learning/results/history_file_2M_sample_of_20K

In [None]:
df = spark.read.format('com.databricks.spark.csv').options(header='false', inferschema='true', delimiter='\t').load('/data_data/reinforcement_learning/results/history_file_2M_sample_of_20K')
df.show(5,False)

In [None]:
df.describe()

### Set up schema for episode data

In [6]:
file_to_import = "/data_data/reinforcement_learning/results/history_file_2000000_trials_2_sides_May_11_2020_1618"

In [7]:
SARF = None
def import_episode_data():
    print("====================================================================================================================")
    print("====================================state_avg_value_sarsa_2M.tsv====================================================")
    print("==================================================================================================================\n\n")
    global SARF
    DRLSchema = StructType([\
        StructField('episode',  IntegerType(), True),\
        StructField('step',     IntegerType(), True),\
        StructField('_state_prior_',   StringType(),  True),\
        StructField('reward_',   IntegerType(), True),\
        StructField('action',   StringType(),  True),\
        StructField('action_verbose',   StringType(),  True),\
        StructField('a',        LongType(),    True),\
        StructField('b',        LongType(),    True)])  
    
    udf_lr_trim = udf(lambda str_arr:str_arr[1:-1], StringType())
    
    SARF = spark.read.format('csv').schema(DRLSchema).option("sep","\t").load(file_to_import).withColumn('_state_prior',udf_lr_trim('_state_prior_')).drop('_state_prior_').drop('a').drop('b')
    
    
    

### Import episode data

In [8]:
import_episode_data()





In [9]:
SARF.show(5,False)

+-------+----+-------+---------+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|episode|step|reward_|action   |action_verbose            |_state_prior                                                                                                                                                 |
+-------+----+-------+---------+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0      |0   |-1     |14,1,-1  |(Pawn,(1,-1),(1,6),(2,5)) |1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,-16,-15,-14,-13,-12,-11,-10,-9,-8,-7,-6,-5,-4,-3,-2,-1|
|0      |1   |-1     |-14,-1,-1|(Pawn,(-1,-1),(6,3),(5,2))|1,2,3,4,5,6,7,8,9,10,11,12,13,14,0,15,0,0,0,0,0,16,0,0,0,0,0,0,0,0,0,

In [10]:
SARF.printSchema()

root
 |-- episode: integer (nullable = true)
 |-- step: integer (nullable = true)
 |-- reward_: integer (nullable = true)
 |-- action: string (nullable = true)
 |-- action_verbose: string (nullable = true)
 |-- _state_prior: string (nullable = true)



### Create unique ID for each turn

###### Note: Agent moves are ODD numbered whereas Environment moves are EVEN numbered
###### Here we add an index column (ID) 

In [11]:
cols = ['episode','step']

w = Window.orderBy(cols)

SARF_df = SARF.withColumn("id", row_number().over(w))

SARF_df.show(10,False)

+-------+----+-------+---------+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|episode|step|reward_|action   |action_verbose            |_state_prior                                                                                                                                                 |id |
+-------+----+-------+---------+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|0      |0   |-1     |14,1,-1  |(Pawn,(1,-1),(1,6),(2,5)) |1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,-16,-15,-14,-13,-12,-11,-10,-9,-8,-7,-6,-5,-4,-3,-2,-1|1  |
|0      |1   |-1     |-14,-1,-1|(Pawn,(-1,-1),(6,3),(5,2))|1,2,3,4,5,6,7,8,9,10,11,12,13,14,0,15,0,0,0,0,0,16,0,

### Create a column that captures BOTH Agent and Environment turns into a single row [SARS]
#### Drop the odd rows (which only capture Agent moves)

In [12]:
#SARF_df.registerTempTable("SARF")

In [13]:
#spark.sql("select * from SARF").show(5, False)

In [14]:
#spark.sql("select count(*) from SARF").show()

In [15]:
#a_df = spark.sql('select id,episode,turn,_state_prior,action,lead(_state_prior,3) over(partition by episode order by turn) as state_prime from SARF').show(150,False)

In [16]:
w_step = Window.partitionBy('episode').orderBy(col("id"))

initial_state = "1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,-16,-15,-14,-13,-12,-11,-10,-9,-8,-7,-6,-5,-4,-3,-2,-1"

SARF_env_step_df = SARF_df.withColumn('_state_prime',lead(col('_state_prior'),3,initial_state).over(w_step)).filter(SARF_df["id"] % 2 == 0).withColumn('state_prior', array(col('_state_prior'))).drop('_state_prior').drop('_state').withColumn('state_prime', array(col('_state_prime'))).drop('_state_prime')

SARF_env_step_df.show(200,False)

+-------+----+-------+---------+----------------------------+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|episode|step|reward_|action   |action_verbose              |id |state_prior                                                                                                                                                    |state_prime                                                                                                                                                    |
+-------+----+-------+---------+----------------------------+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------+---

In [17]:
check_lag_udf = udf(lambda a,b: a==b, BooleanType())

In [18]:
#SARF_test_lag_df = SARF_env_step_df.withColumn("lag_check", check_lag_udf(col('state_prime'),col('state_prior')))
#SARF_test_lag_df.show(300, False)

### Set up Hyperparameters for MDPs

In [19]:
#Discount rate
gamma = 0.7

In [20]:
#Learning rate
alpha = 1.0

### Calculate reward for each SAS transition using the SUM_Diffs of the respective States 
#### NOTE: Here we lookup player/piece actual values in a dictusing their board_ids  (16 -> 16)  

In [21]:
piece_value_dict = {"1":5,"2":3,"3":3,"4":9,"5":20,"6":3,"7":3,"8":5,"9":1,"10":1,"11":1,"12":1,"13":1,"14":1,"15":1,"16":1,"-16":-1,"-15":-1,"-14":-1,"-13":-1,"-12":-1,"-11":-1,"-10":-1,"-9":-1,"-8":-5,"-7":-3,"-6":-3,"-5":-9,"-4":-20,"-3":-3,"-2":-3,"-1":-5}


In [22]:
def normalized_reward(reward_):
    reward = reward_
    if reward <= -1:
        return -1
    elif reward >= 1:
        return 1
    else:
        return reward

In [23]:
get_normalized_reward = udf(lambda x: normalized_reward(x), IntegerType())

In [24]:
SASR_df = SARF_env_step_df.withColumn('reward', get_normalized_reward(col('reward_'))).drop('reward_')
#SASR_df = SARF_env_step_df
#SASR_df.show(150,False)

In [25]:
#get_discounted_reward = udf(lambda reward, step, episode_length, discount: int(reward)**(int(episode_length) - int(step) ), DoubleType())

In [26]:
get_discounted_reward = udf(lambda reward, step, episode_length, discount: int(reward)*discount**(episode_length - step), DoubleType())

In [27]:
byEpisode = Window.partitionBy('episode')
SASR_discounted_rewards_df = SASR_df.withColumn('discounted_reward',get_discounted_reward(col('reward'),col('step'), max('step').over(byEpisode),lit(gamma)))
#SASR_discounted_rewards_df.show(200,False)

In [28]:
byEpisode_desc = Window.partitionBy('episode').orderBy(col('step'))

get_state_value = udf(lambda reward, discounted_rewards: normalized_reward(reward) + discounted_rewards, DoubleType())

SASR_cumsum_discounted_rewards_df = SASR_discounted_rewards_df.withColumn('value',get_state_value(col('reward'),sum('discounted_reward').over(byEpisode_desc)))
#SASR_cumsum_discounted_rewards_df.show(400,False)

### Calculate Average Value for a state V(s)

In [29]:
State_Value_df = SASR_cumsum_discounted_rewards_df.groupBy('state_prior').agg(expr('avg(value) as state_value')).withColumnRenamed('state_prior','state') 
State_Value_df.show(10,False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|state                                                                                                                                                          |state_value        |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|[1,2,0,0,0,3,0,4,0,0,0,5,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,6,0,7,8,0,0,0,9,0,10,0,11,12,13,14,0,0,15,0,0,0,0,16,0,0,0,0,0,-16,-15,0,0,0,0,-14,0,0]                 |-1.0000000014230304|
|[1,0,0,0,2,3,0,4,5,6,0,0,0,0,7,8,0,9,0,10,11,0,0,0,0,12,13,0,14,15,0,16,0,0,-16,0,-15,0,0,-14,0,-13,-12,0,0,-11,0,-10,-9,0,-8,0,-7,-6,-5,0,0,-4,-3,0,0,0,-2,-1]|-1.2306817659840892|
|[1,2,3,0,4,5,0,6,0,0,7,0,8,9,10,0,11,0,0,0,0,0,12,0,0,13,0,14,0,0,15,0,0,16,0,-16,-15,0,0

### Calculate Q(s,a) == State_Action Value

In [30]:
State_Action_Value_df = SASR_cumsum_discounted_rewards_df.groupBy(['state_prior','action','action_verbose','reward']).agg(expr('avg(value) as V_state_action'),expr('stddev_pop(value) as std_value'),expr('count(state_prior) as visits'))
#State_Action_Value_df.show(10,False)

### Calculate T(s,a,s_) == State_Action_State' Transition Matrix

In [31]:
State_Action_State_df_ = SASR_cumsum_discounted_rewards_df.groupBy(['state_prior','action','action_verbose','state_prime','reward']).agg(expr('count(state_prime) as transitions'))
#State_Action_State_df_.show(10,False)

In [32]:
sas_window_spec = Window.partitionBy('state_prior')

get_transition_frequency_udf = udf(lambda transitions, state_visits: 1.0 if transitions > state_visits else transitions/state_visits, DoubleType())

State_Action_State_df = State_Action_State_df_.withColumn('state_visits', count('state_prior').over(sas_window_spec)).withColumn('trans_freq',get_transition_frequency_udf('transitions','state_visits') )
#State_Action_State_df.show(10,False)


### Generate Policy == P(s,a) for MDP modelling of Environment 
###### Assume agent would use equiprobable action selection or random selection from feasible actions to teleport to another state if it encounters an unknown state

In [33]:
#State_Value_df.show(10,False)

#### Join state_value table with state_transition_matrix to create Policy_Calc_Table

In [34]:
t = State_Action_State_df.alias('t')
s = State_Value_df.alias('s')
join_condition = [ (s.state == t.state_prime) ]
policy_calc_df_ = t.join(s, join_condition, 'inner').drop('state')
#policy_calc_df_.show(20000,False)

##### Calculate Expected Value (trans_freq * state_value) for each SAS transitions

In [35]:
get_expected_sas_value = udf(lambda alpha,gamma,reward,trans_freq,state_value: trans_freq*alpha*(reward + gamma*state_value),DoubleType())

policy_calc_df = policy_calc_df_.withColumn('expected_value', get_expected_sas_value(lit(alpha),lit(gamma),col('reward'),col('trans_freq'), col('state_value')))

#policy_calc_df.show(1000, False)

In [36]:
policy_df = policy_calc_df.select('state_prior', max('action_verbose').over(sas_window_spec).alias('policy'))


+---------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|state_prior                                                                                                                                                    |policy                      |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,2,3,4,5,0,0,6,7,0,0,0,8,0,9,10,0,0,0,0,0,0,0,11,0,0,0,0,0,0,0,12,0,0,0,13,0,14,0,15,0,16,0,-16,0,0,-15]                   |(Pawn,(-1,0),(3,1),(2,1))   |
|[0,0,0,0,0,0,0,0,0,0,0,0,1,2,0,3,0,4,5,6,0,0,7,0,8,9,0,0,10,11,12,13,0,0,0,0,14,15,0,0,0,0,16,0,0,0,0,-16,-15,0,0,-14,0,0,0,-13,0,0,0,-12,-11,-10,0,-9]        |(Queen,(1,0),(1,5),(2,5))   |
|[0,0,0,0,0,0,0,0,0,0,0,0,1,2,3,4,5,0,0,0,6,7

In [37]:

policy_df.show(1000, False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|state_prior                                                                                                                                                    |policy                      |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,2,3,4,5,0,0,6,7,0,0,0,8,0,9,10,0,0,0,0,0,0,0,11,0,0,0,0,0,0,0,12,0,0,0,13,0,14,0,15,0,16,0,-16,0,0,-15]                   |(Pawn,(-1,0),(3,1),(2,1))   |
|[0,0,0,0,0,0,0,0,0,0,0,0,1,2,0,3,0,4,5,6,0,0,7,0,8,9,0,0,10,11,12,13,0,0,0,0,14,15,0,0,0,0,16,0,0,0,0,-16,-15,0,0,-14,0,0,0,-13,0,0,0,-12,-11,-10,0,-9]        |(Queen,(1,0),(1,5),(2,5))   |
|[0,0,0,0,0,0,0,0,0,0,0,0,1,2,3,4,5,0,0,0,6,7

In [38]:

policy_dict = policy_df.rdd.map(lambda row: {row[0]: row[1]}).collect()
policy_dict

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 30.0 failed 1 times, most recent failure: Lost task 12.0 in stage 30.0 (TID 3551, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-38-ba4a08cbf9c9>", line 1, in <lambda>
TypeError: unhashable type: 'list'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-38-ba4a08cbf9c9>", line 1, in <lambda>
TypeError: unhashable type: 'list'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
