# John's Spark Personal Development Project

I did this project to learn how to build models with PySpark, using a Kaggle competition dataset provided by personal banking. The first half of this notebook is in-memory feature engineering with pandas; I didn't use Spark for that part because data engineering wasn't within the scope of my personal development goals.


The feature engineering and the LightGBM baseline are taken from this kernel:
https://www.kaggle.com/ceshine/lgbm-starter


One thing to note is that the kernel predicts the next 16 days of sales, but for simplicity I'm only predicting the next day. The baseline was adjusted accordingly.

In [12]:
# basic
from datetime import date, timedelta
import pandas as pd
import numpy as np

# basic pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# in-memory modelling
from sklearn.metrics import mean_squared_error
import lightgbm as lgb

# pyspark modelling
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator 
from pyspark.ml.regression import RandomForestRegressor, LinearRegression, GBTRegressor

In [2]:
# data importing, only really needs to be done once

# pull data from hue
# NOTE(review): the items table is assumed to live in the same schema as the
# other two tables; the previous spelling ("anp_ecda_sanadbox") looks like a
# typo -- confirm against the catalog.
df_labeled = sqlContext.table("anp_ecda_sandbox.fv_train_2017") # main dataframe to model
df_unlabeled = sqlContext.table("anp_ecda_sandbox.fv_test") # test dataset
df_items = sqlContext.table("anp_ecda_sandbox.fv_items") # item dataset

# convert to pandas df (everything after this cell works in memory)
df_labeled = df_labeled.toPandas()
df_unlabeled = df_unlabeled.toPandas()
df_items = df_items.toPandas()

# save to csv so you don't have to redo this part (could've used pkl but eh)
# index=False keeps the pandas row index out of the files, so the positional
# `usecols` used when reading them back refer to the table columns only.
df_labeled.to_csv('train.csv', index=False)
df_unlabeled.to_csv('test.csv', index=False)
df_items.to_csv('items.csv', index=False)

In [3]:
def _log1p_if_positive(raw):
    """Converter for `unit_sales`: log1p of positive values, 0 otherwise.

    The target is log-transformed at load time; non-positive sales are
    mapped to 0.
    """
    value = float(raw)
    if value > 0:
        return np.log1p(value)
    return 0


# Training rows: columns 1-5 of train.csv, with the target log-transformed.
df_train = pd.read_csv(
    'train.csv',
    usecols=[1, 2, 3, 4, 5],
    dtype={'onpromotion': bool},
    converters={'unit_sales': _log1p_if_positive},
    parse_dates=["t_date"],
)

# Test rows: columns 0-4 of test.csv, keyed by (store, item, date).
df_test = pd.read_csv(
    "test.csv",
    usecols=[0, 1, 2, 3, 4],
    dtype={'onpromotion': bool},
    parse_dates=["t_date"],
)
df_test = df_test.set_index(['store_nbr', 'item_nbr', 't_date'])

# Item metadata, keyed by item number.
items = pd.read_csv("items.csv").set_index("item_nbr")

In [5]:
# Keep only the rows whose t_date falls in the 14-week window that starts on
# 2017-05-01 (7 * 14 = 98 daily timestamps), then release the full frame.
df_2017 = df_train.loc[
    df_train["t_date"].isin(pd.date_range("2017-05-01", periods=7 * 14))
].copy()
del df_train

In [6]:
# combine promotion data
def _promo_by_day(indexed):
    """Pivot the `onpromotion` flag to one column per date.

    `indexed` must be indexed by (store_nbr, item_nbr, t_date); the innermost
    level is unstacked into columns and missing cells become False.
    """
    wide = indexed[["onpromotion"]].unstack(level=-1).fillna(False)
    # drop the outer "onpromotion" column level, keeping only the dates
    wide.columns = wide.columns.get_level_values(1)
    return wide


promo_2017_train = _promo_by_day(
    df_2017.set_index(["store_nbr", "item_nbr", "t_date"])
)
# align the test promotions to the (store, item) rows seen in training;
# pairs absent from the test set are treated as "not on promotion"
promo_2017_test = (
    _promo_by_day(df_test)
    .reindex(promo_2017_train.index)
    .fillna(False)
)
# train dates followed by test dates, side by side
promo_2017 = pd.concat([promo_2017_train, promo_2017_test], axis=1)
del promo_2017_test, promo_2017_train

In [7]:
# multi-index data
# Reshape the long sales table into one row per (store_nbr, item_nbr) and one
# column per date; days without a record count as 0 sales.
df_2017 = (
    df_2017
    .set_index(["store_nbr", "item_nbr", "t_date"])[["unit_sales"]]
    .unstack(level=-1)
    .fillna(0)
)
# keep only the date level of the column index
df_2017.columns = df_2017.columns.get_level_values(1)

# one item-metadata row per (store, item) row of df_2017, in the same order
item_per_row = df_2017.index.get_level_values(1)
items = items.reindex(item_per_row)

In [8]:
# get actual features
def get_timespan(df, dt, minus, periods):
    """Select a run of consecutive daily columns from a date-columned frame.

    The window starts `minus` days before `dt` and spans `periods` days.
    Returns `df` restricted to those date columns.
    """
    first_day = dt - timedelta(days=minus)
    wanted_days = pd.date_range(first_day, periods=periods)
    return df[wanted_days]

def prepare_dataset(t2017, is_train=True):
    """Build the feature frame and target array anchored at date `t2017`.

    Reads the module-level `df_2017` (wide sales) and `promo_2017` (wide
    promotion flags) frames.

    Features:
      * mean_{3,7,14}_2017 -- mean target over the 3 / 7 / 14 days before t2017
      * promo_14_2017      -- number of promo days in the 14 days before t2017
      * promo_0..promo_15  -- promo flag for t2017 and each of the next 15 days

    Returns (X, y). y holds the daily target columns starting at t2017:
    16 days when `is_train` is true, otherwise only the first day.
    """
    sales_mean_3 = get_timespan(df_2017, t2017, 3, 3).mean(axis=1).values
    sales_mean_7 = get_timespan(df_2017, t2017, 7, 7).mean(axis=1).values
    sales_mean_14 = get_timespan(df_2017, t2017, 14, 14).mean(axis=1).values
    # number of times the item was on promo in the last 14 days
    promo_days_14 = get_timespan(promo_2017, t2017, 14, 14).sum(axis=1).values

    X = pd.DataFrame({
        "mean_3_2017": sales_mean_3,
        "mean_7_2017": sales_mean_7,
        "mean_14_2017": sales_mean_14,
        "promo_14_2017": promo_days_14
    })

    # these features indicate on which of the upcoming days the item is on promo
    for offset in range(16):
        promo_day = t2017 + timedelta(days=offset)
        X["promo_{}".format(offset)] = promo_2017[promo_day].values.astype(np.uint8)

    horizon = 16 if is_train else 1
    y = df_2017[pd.date_range(t2017, periods=horizon)].values
    return X, y

In [9]:
print("Preparing dataset...")
t2017 = date(2017, 6, 5)

# Training data: four weekly snapshots starting at t2017, stacked row-wise.
X_l, y_l = [], []
for week in range(4):
    anchor = t2017 + timedelta(days=7 * week)
    X_week, y_week = prepare_dataset(anchor)
    print(anchor)
    X_l.append(X_week)
    y_l.append(y_week)
X_train = pd.concat(X_l, axis=0)
y_train = np.concatenate(y_l, axis=0)
del X_l, y_l

# Validation snapshot (16-day target) and test snapshot (1-day target).
X_val, y_val = prepare_dataset(date(2017, 7, 10))
X_test, y_test = prepare_dataset(date(2017, 7, 31), is_train=False)

Preparing dataset...
2017-06-05
2017-06-12
2017-06-19
2017-06-26


In [27]:
# converting data into pyspark

# Spark schema: three float means, the integer promo count and promo flags,
# then the float label (next-day target).
float_cols = ["mean_14_2017", "mean_3_2017", "mean_7_2017"]
int_cols = ["promo_14_2017"] + ["promo_{}".format(day) for day in range(16)]
mySchema = StructType(
    [StructField(name, FloatType(), True) for name in float_cols]
    + [StructField(name, IntegerType(), True) for name in int_cols]
    + [StructField("label", FloatType(), True)]
)
schema_cols = [field.name for field in mySchema.fields]

# Attach the first target column as `label` on copies, so X_train / X_val /
# X_test themselves stay label-free for the in-memory baseline.
# pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
# Selecting `schema_cols` matters: createDataFrame applies the schema by
# column position, so the pandas columns must be in schema order regardless
# of the order in which the feature frame happened to be built.
train = pd.concat(
    [X_train.assign(label=y_train[:, 0]), X_val.assign(label=y_val[:, 0])],
    ignore_index=True,
)[schema_cols]  # val rows are folded into train since we aren't tuning hyperparameters
test = X_test.assign(label=y_test[:, 0])[schema_cols]

train_spark = spark.createDataFrame(train, schema=mySchema)
test_spark = spark.createDataFrame(test, schema=mySchema)

In [28]:
# pyspark ml estimators expect a single vector column holding all of a row's
# features, so pack the individual feature columns into `features`
feature_columns = ['mean_14_2017', 'mean_3_2017', 'mean_7_2017', 'promo_14_2017',
                   'promo_0', 'promo_1', 'promo_2', 'promo_3', 'promo_4',
                   'promo_5', 'promo_6', 'promo_7', 'promo_8', 'promo_9',
                   'promo_10', 'promo_11', 'promo_12', 'promo_13', 'promo_14',
                   'promo_15']
vectorAssembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# assemble, then keep only what the models need
# (note: this overwrites train_spark / test_spark, so the cell cannot be re-run
# without first re-creating them)
train_spark = vectorAssembler.transform(train_spark).select(['features', 'label'])
test_spark = vectorAssembler.transform(test_spark).select(['features', 'label'])

# features with at most 2 distinct values are flagged as categorical and
# written, index-encoded, to `indexedFeatures`
indexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=2)
featureIndexer = indexer.fit(train_spark)

Py4JJavaError: An error occurred while calling o459.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 41, anp-r02wn07.c03.hadoop.td.com, executor 18): java.io.IOException: Cannot run program "/home/aggara3/codes/anaconda/usr/local/anaconda-notebooks/bin/python3": error=2, No such file or directory
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1047)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:89)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:116)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: error=2, No such file or directory
	at java.lang.UNIXProcess.forkAndExec(Native Method)
	at java.lang.UNIXProcess.<init>(UNIXProcess.java:186)
	at java.lang.ProcessImpl.start(ProcessImpl.java:130)
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1028)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1430)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1417)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1417)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:797)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:797)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:797)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1645)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1600)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1589)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:623)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1943)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1956)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2378)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2772)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2377)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2384)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2120)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2119)
	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2802)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2119)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2334)
	at org.apache.spark.ml.feature.VectorIndexer.fit(VectorIndexer.scala:119)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Cannot run program "/home/aggara3/codes/anaconda/usr/local/anaconda-notebooks/bin/python3": error=2, No such file or directory
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1047)
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:89)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:116)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	... 1 more
Caused by: java.io.IOException: error=2, No such file or directory
	at java.lang.UNIXProcess.forkAndExec(Native Method)
	at java.lang.UNIXProcess.<init>(UNIXProcess.java:186)
	at java.lang.ProcessImpl.start(ProcessImpl.java:130)
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1028)
	... 32 more


In [66]:
# linear regression

# The regression reads the raw assembled vector (`features`); the indexer
# stage only adds an `indexedFeatures` column alongside it.
lr = LinearRegression(featuresCol='features', labelCol='label')
lr_pipeline = Pipeline(stages=[featureIndexer, lr])

lr_model = lr_pipeline.fit(train_spark)
lr_predictions = lr_model.transform(test_spark)

lr_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
# Score with the evaluator built in this cell. The bare name `evaluator` is
# not defined in any visible cell, so it only worked through leftover kernel state.
rmse = lr_evaluator.evaluate(lr_predictions)
print("OLS test set RMSE = %g" % rmse)

OLS test set RMSE = 0.614086


In [71]:
# random forest

# Train on `indexedFeatures`, the output column of the featureIndexer stage,
# so the categorical metadata it computes is actually used by the trees.
# (Reading `features` here would make the indexer stage a no-op for the model.)
rf = RandomForestRegressor(featuresCol="indexedFeatures", labelCol='label')
rf_pipeline = Pipeline(stages=[featureIndexer, rf])

rf_model = rf_pipeline.fit(train_spark)
rf_predictions = rf_model.transform(test_spark)

rf_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
# Score with the evaluator built in this cell. The bare name `evaluator` is
# not defined in any visible cell, so it only worked through leftover kernel state.
rmse = rf_evaluator.evaluate(rf_predictions)
print("RF test set RMSE = %g" % rmse)

RF test set RMSE = 0.628987


In [72]:
# GBT's

# Train on `indexedFeatures`, the output column of the featureIndexer stage,
# so the categorical metadata it computes is actually used by the trees.
# (Reading `features` here would make the indexer stage a no-op for the model.)
gbt = GBTRegressor(featuresCol='indexedFeatures', labelCol='label')
gbt_pipeline = Pipeline(stages=[featureIndexer, gbt])

gbt_model = gbt_pipeline.fit(train_spark)
gbt_predictions = gbt_model.transform(test_spark)

gbt_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
# Score with the evaluator built in this cell. The bare name `evaluator` is
# not defined in any visible cell, so it only worked through leftover kernel state.
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("GBT test set RMSE = %g" % rmse)

GBT test set RMSE = 0.621411


In [29]:
# in-memory lightgbm baseline

print("Training and predicting models...")
params = {
    'num_leaves': 2**5 - 1,
    'objective': 'regression_l2',
    'max_depth': 8,
    'min_data_in_leaf': 50,
    'learning_rate': 0.05,
    'feature_fraction': 0.75,
    'bagging_fraction': 0.75,
    'bagging_freq': 1,
    'metric': 'l2',
    'num_threads': 4
}

MAX_ROUNDS = 1000
val_pred = []
test_pred = []
cate_vars = []

# Row weights: perishable items count 1.25, everything else 1.0. The training
# frame stacks four weekly snapshots, so the per-item flags are repeated 4x.
train_weight = pd.concat([items["perishable"]] * 4) * 0.25 + 1
val_weight = items["perishable"] * 0.25 + 1
banner = "=" * 50

# One model per forecast step; step i is trained on target column i.
for i in range(2):
    print(banner)
    print("Step %d" % (i+1))
    print(banner)

    dtrain = lgb.Dataset(
        X_train,
        label=y_train[:, i],
        categorical_feature=cate_vars,
        weight=train_weight,
    )
    dval = lgb.Dataset(
        X_val,
        label=y_val[:, i],
        reference=dtrain,
        weight=val_weight,
        categorical_feature=cate_vars,
    )
    bst = lgb.train(
        params,
        dtrain,
        num_boost_round=MAX_ROUNDS,
        valid_sets=[dtrain, dval],
        early_stopping_rounds=50,
        verbose_eval=50,
    )

    # feature importances by gain, largest first
    ranked_gain = sorted(
        zip(X_train.columns, bst.feature_importance("gain")),
        key=lambda x: x[1],
        reverse=True,
    )
    print("\n".join("%s: %.2f" % pair for pair in ranked_gain))

    # predict with the early-stopped iteration, or all rounds if none was recorded
    rounds_to_use = bst.best_iteration or MAX_ROUNDS
    val_pred.append(bst.predict(X_val, num_iteration=rounds_to_use))
    test_pred.append(bst.predict(X_test, num_iteration=rounds_to_use))

Training and predicting models...
Step 1




Training until validation scores don't improve for 50 rounds.
[50]	training's l2: 0.371724	valid_1's l2: 0.357333
[100]	training's l2: 0.361394	valid_1's l2: 0.34803
[150]	training's l2: 0.359097	valid_1's l2: 0.346826
[200]	training's l2: 0.357812	valid_1's l2: 0.346368
[250]	training's l2: 0.356851	valid_1's l2: 0.34607
[300]	training's l2: 0.356061	valid_1's l2: 0.345894
[350]	training's l2: 0.355348	valid_1's l2: 0.34575
[400]	training's l2: 0.354719	valid_1's l2: 0.345623
[450]	training's l2: 0.354152	valid_1's l2: 0.345556
[500]	training's l2: 0.353631	valid_1's l2: 0.345518
[550]	training's l2: 0.353167	valid_1's l2: 0.345481
[600]	training's l2: 0.352719	valid_1's l2: 0.345465
[650]	training's l2: 0.352298	valid_1's l2: 0.345439
[700]	training's l2: 0.351885	valid_1's l2: 0.345409
[750]	training's l2: 0.351503	valid_1's l2: 0.345408
Early stopping, best iteration is:
[723]	training's l2: 0.3517	valid_1's l2: 0.345392
mean_7_2017: 1832677.58
mean_14_2017: 1641617.31
mean_3_2017:

In [32]:
# test_pred holds one prediction array per step; after the transpose,
# column 0 is the first step, which is compared with the first target column.
first_step_pred = np.array(test_pred).transpose()[:, 0]
baseline_rmse = np.sqrt(mean_squared_error(y_test[:, 0], first_step_pred))
print("Baseline LightGBM test rmse:", baseline_rmse)

Baseline LightGBM test rmse: 0.604506842403


# Conclusions

Our Spark models (test RMSE of roughly 0.61–0.63) came close to the accuracy of the LightGBM baseline (test RMSE of about 0.605), and that baseline was copied and pasted straight from someone else's code. This means PySpark modelling isn't that bad, which is good news!