In [None]:
AWS_ACCESS_KEY = "AK..."
AWS_SECRET_KEY = "dC..."

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", AWS_SECRET_KEY)

In [None]:
games_df = spark.read.csv("s3://spark_pipeline/csv/games-expand.csv", header=True, inferSchema = True)
display(games_df)

stats_df = spark.read.csv("s3://spark_pipeline/csv/game_skater_stats.csv", header=True, inferSchema=True)
display(stats_df)

In [None]:
# Format Avro, Parquet, ORC, csv
# Can convert between one another


# AVRO write
avro_path = "s3://spark_pipeline/avro/game_skater_stats/"
stats_df.write.mode('overwrite').format("com.databricks.spark.avro").save(avro_path)

# AVRO read 
avro_df = sqlContext.read.format("com.databricks.spark.avro").load(avro_path)

# parquet out
parquet_path = "s3a://spark_pipeline/games-parquet/"
avro_df.write.mode('overwrite').parquet(parquet_path)

# parquet in
parquet_df = sqlContext.read.parquet(parquet_path)

# orc out
orc_path = "s3a://spark_pipeline/games-orc/"
parquet_df.write.mode('overwrite').orc(orc_path)

# orc in
orc_df = sqlContext.read.orc(orc_path)

# CSV out
csv_path = "s3a://spark_pipeline/games-csv-out/"
orc_df.coalesce(1).write.mode('overwrite').format(
 "com.databricks.spark.csv").option("header","true").save(csv_path)
  
# and CSV read to finish the round trip 
csv_df = spark.read.csv(csv_path, header=True, inferSchema=True)

In [None]:
# Koalas
import databricks.koalas as ks

stats_ks = stats_df.to_koalas()
stats_df = stats_ks.to_spark()

print(stats_ks['timeOnIce'].mean())
print(stats_ks.iloc[:1, 1:2])

In [None]:
# Spark SQL
new_df = spark.sql("""
  select player_id, sum(1) as games, sum(goals) as goals
  from stats
  group by 1
  order by 3 desc
  limit 5
""")

display(new_df)


display(spark.sql("""
  select cast(goals/shots * 50 as int)/50.0 as Goals_per_shot
      ,sum(1) as Players 
  from (
    select player_id, sum(shots) as shots, sum(goals) as goals
    from stats
    group by 1
    having goals >= 5
  )  
  group by 1
  order by 1
"""))


from pyspark.sql.functions import lit

# dropping columns
copy_df = stats_df.drop('game_id', 'player_id')

# selection columns 
copy_df = copy_df.select('assists', 'goals', 'shots')

# adding columns
copy_df = copy_df.withColumn("league", lit('NHL'))
display(copy_df)

copy_df = stats_df.select('game_id', 'player_id').withColumn("league", lit('NHL'))
df = copy_df.join(stats_df, ['game_id', 'player_id'])
display(df)

In [None]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
import pandas as pd

# Create the schema for the resulting dataframe
schema = StructType([StructField('ID', LongType(), True),
                     StructField('p0', DoubleType(), True),
                     StructField('p1', DoubleType(), True)])

# Define the UDF, input and outputs are Pandas DFs
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def analyze_player(sample_pd):
    
    # return empty params in not enough data
    if (len(sample_pd.shots) <= 1):
        return pd.DataFrame({'ID': [sample_pd.player_id[0]], 
                                   'p0': [ 0 ], 'p1': [ 0 ]})
     
    # Perform curve fitting     
    result = leastsq(fit, [1, 0], args=(sample_pd.shots, 
                                  sample_pd.hits))
    
    # Return the parameters as a Pandas DF 
    return pd.DataFrame({'ID': [sample_pd.player_id[0]], 
                       'p0': [result[0][0]], 'p1': [result[0][1]]})
    
# perform the UDF and show the results 
player_df = stats_df.groupby('player_id').apply(analyze_player)
display(player_df)

In [None]:
# MLlib
games_df.createOrReplaceTempView("games_df")

games_df = spark.sql("""
  select *, row_number() over (order by rand()) as user_id
    ,case when rand() > 0.7 then 1 else 0 end as test
  from games_df
""")
trainDF = games_df.filter("test == 0")
testDF = games_df.filter("test == 1")
print("Train " + str(trainDF.count()))
print("Test " + str(testDF.count()))
from pyspark.ml.feature import VectorAssembler

# create a vector representation
assembler = VectorAssembler(inputCols=trainDF.schema.names[0:10], outputCol="features")

trainVec = assembler.transform(trainDF).select('label', 'features')
testVec = assembler.transform(testDF).select('label', 'features', 'user_id')
display(testVec)

from pyspark.ml.classification import LogisticRegression

# specify the columns for the model
lr = LogisticRegression(featuresCol='features', labelCol='label')

# fit on training data
model = lr.fit(trainVec)

# predict on test data 
predDF = model.transform(testVec)
display(predDF)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

roc = BinaryClassificationEvaluator().evaluate(predDF)
print(roc)

# Retrieving the results
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# split out the array into a column 
secondElement = udf(lambda v:float(v[1]),FloatType())
predDF = predDF.select("*", secondElement("probability").alias("propensity"))
display(predDF)

# save results to S3
results_df = predDF.select("user_id", "propensity")
results_path = "s3a://spark_pipeline/game-predictions/"
results_df.write.mode('overwrite').parquet(results_path)

# plot the predictions 
predDF.createOrReplaceTempView("predDF")

plotDF = spark.sql("""
  select cast(propensity*100 as int)/100 as propensity, 
         label, sum(1) as users
  from predDF 
  group by 1, 2
  order by 1, 2  
""")

# table output
display(plotDF)

In [None]:
# Deep learning model
import tensorflow as tf
import keras
from keras import models, layers

model = models.Sequential()
model.add(layers.Dense(64, activation='relu', input_shape=(10,)))
model.add(layers.Dropout(0.1))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(1, activation='sigmoid'))

model.compile(optimizer='rmsprop', loss='binary_crossentropy')
history = model.fit(x_train, y_train, epochs=100, batch_size=100, validation_split = .2, verbose=0)

import matplotlib.pyplot as plt

loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(loss) + 1)

fig = plt.figure(figsize=(10,6) )
plt.plot(epochs, loss, 'bo', label='Training Loss')
plt.plot(epochs, val_loss, 'b', label='Validation Loss')
plt.legend()
plt.show()
display(fig)

In [None]:
# DF partition and model application
testDF.createOrReplaceTempView("testDF ")

partitionedDF = spark.sql("""
  select *, cast(rand()*100 as int) as partition_id
  from testDF 
""")

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

schema = StructType([StructField('user_id', LongType(), True),
                     StructField('propensity', DoubleType(),True)])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_keras(pd):
    pd['propensity'] = model.predict(pd.iloc[:,0:10])
    return pd[['user_id', 'propensity']]

results_df=partitionedDF.groupby('partition_id').apply(apply_keras)
display(results_df)    

In [None]:
# Automated feature engineering
plays_df = spark.read.csv("s3://spark_pipeline/csv/game_plays.csv", 
            header=True, inferSchema = True).drop(
            'secondaryType', 'periodType', 'dateTime', 'rink_side')
plays_pd = plays_df.filter("rand() < 0.003").toPandas()
plays_pd.shape
# pip3 install featuretools
import featuretools as ft
from featuretools import Feature

es = ft.EntitySet(id="plays")
es = es.entity_from_dataframe(entity_id="plays",dataframe=plays_pd,
                    index="player_id", variable_types = {
                    "event": ft.variable_types.Categorical,
                    "description": ft.variable_types.Categorical })


f1 = Feature(es["plays"]["event"])
f2 = Feature(es["plays"]["description"])

encoded, defs = ft.encode_features(plays_pd, [f1, f2], top_n=10)
encoded.reset_index(inplace=True)

es = ft.EntitySet(id="plays")
es = es.entity_from_dataframe(entity_id="plays", 
                                dataframe=encoded, index="play_id")

es = es.normalize_entity(base_entity_id="plays", 
                            new_entity_id="games", index="game_id")
features, transform=ft.dfs(entityset=es, 
                                 target_entity="games",max_depth=2)
features.reset_index(inplace=True)

features.columns = features.columns.str.replace("[(). =]", "")
schema = sqlContext.createDataFrame(features).schema
features.columns

In [None]:
# bucket IDs 
plays_df.createOrReplaceTempView("plays_df")
plays_df = spark.sql("""
  select *, abs(hash(game_id))%1000 as partition_id 
  from plays_df 
""")

# Full feature transformation
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def gen_features(plays_pd):

    es = ft.EntitySet(id="plays")
    es = es.entity_from_dataframe(entity_id="plays",dataframe=plays_pd,
                    index="player_id", variable_types = {
                    "event": ft.variable_types.Categorical,
                    "description": ft.variable_types.Categorical })
                    
    encoded_features = ft.calculate_feature_matrix(defs, es)    
    encoded_features.reset_index(inplace=True)
  
    es = ft.EntitySet(id="plays")
    es = es.entity_from_dataframe(entity_id="plays", 
                                dataframe=encoded, index="play_id")
    es = es.normalize_entity(base_entity_id="plays",
                            new_entity_id="games", index="game_id")
    generated = ft.calculate_feature_matrix(transform,es).fillna(0)
    
    generated.reset_index(inplace=True)
    generated.columns = generated.columns.str.replace("[(). =]","")
    return generated 
  
features_df = plays_df.groupby('partition_id').apply(gen_features)
display(features_df)      