In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

from user_definition import *
# Please do not add anything extra than pyspark related packages/libraries.

In [2]:
ss = SparkSession.builder.config('spark.driver.extraClassPath',
                                 'postgresql-42.2.18.jar').getOrCreate()

In [4]:
ss

### Upload Data

In [3]:
wisdm_file = file_rdd(ss=ss, files=files)

### Set Schema

In [4]:
schema = StructType([StructField("subject_id", IntegerType(), False),
                     StructField("sensor", StringType(), False),
                     StructField("device", StringType(), False),
                     StructField("activity_code", StringType(), False),
                     StructField("timestamp", LongType(), False),
                     StructField("x", FloatType(), False),
                     StructField("y", FloatType(), False),
                     StructField("z", FloatType(), False)])

In [5]:
wisdm_df = create_activity_df(ss=ss,
                              files_rdd=wisdm_file,
                              schema=schema).cache()

## Create Eat Variable

In [6]:
def check_eat(x):
    for string in eating_strings:
        if string.lower() in x.lower():
            return 1
    return 0


check_eat_udf = udf(check_eat, IntegerType())

In [7]:
activity_code = ss.read.jdbc(url=url, table=table, properties=properties)

In [8]:
# Check Codes that relate to eating

activity_code.filter(check_eat_udf(activity_code['activity']) == 1)\
             .select('code')\
             .show()

+----+
|code|
+----+
|   H|
|   I|
|   J|
|   L|
+----+



In [9]:
activity_code = activity_code.withColumn("eating", check_eat_udf(activity_code['activity']))\
                             .cache()

In [15]:
wisdm_df = wisdm_df.join(activity_code, wisdm_df.activity_code == activity_code.code)\
                   .drop('activity', 'code')\
                   .orderBy(["subject_id", "timestamp", "device", "sensor"])\
        

wisdm_df.show(n)

+----------+------+------+-------------+--------------+----------+----------+----------+------+
|subject_id|sensor|device|activity_code|     timestamp|         x|         y|         z|eating|
+----------+------+------+-------------+--------------+----------+----------+----------+------+
|      1600| accel| watch|            D|79817308824838|-0.1666963| 1.5316905| 10.057592|     0|
|      1600| accel| watch|            D|79817358500488|  3.613748|-1.0540473| 11.779023|     0|
|      1600|  gyro| watch|            D|79817358500488| -1.365979|-1.5444704|-1.6969953|     0|
|      1600| accel| watch|            D|79817408176138| 2.0886416|-3.4386723|  12.97373|     0|
|      1600|  gyro| watch|            D|79817408176138|-1.9071333|-1.2696322|-1.8173702|     0|
+----------+------+------+-------------+--------------+----------+----------+----------+------+
only showing top 5 rows



In [19]:
acc = wisdm_df.filter(wisdm_df.sensor == 'accel')\
              .select('subject_id', 'device',
                      'timestamp', 'activity_code', 'eating',
                      wisdm_df.x.alias('accel_x'),
                      wisdm_df.y.alias('accel_y'),
                      wisdm_df.z.alias('accel_z'))

gyr = wisdm_df.filter(wisdm_df.sensor == 'gyro')\
              .select('subject_id', 'device',
                      'timestamp', 'activity_code', 'eating',
                      wisdm_df.x.alias('gyro_x'),
                      wisdm_df.y.alias('gyro_y'),
                      wisdm_df.z.alias('gyro_z'))

# concatenate two device values together

acc_gyr = acc.join(gyr, [acc.timestamp == gyr.timestamp,
                         acc.activity_code == gyr.activity_code,
                         acc.device == gyr.device], 'inner')\
             .select(acc.activity_code, acc.subject_id,
                     acc.device, acc.timestamp, acc.accel_x,
                     acc.accel_y, acc.accel_z, 'gyro_x', 'gyro_y',
                     'gyro_z', acc.eating).distinct().cache()

new_df = acc_gyr.select('activity_code', 'subject_id', 'timestamp', 'device',
                        'eating', 'accel_x', 'accel_y', 'accel_z', 'gyro_x', 
                        'gyro_y', 'gyro_z').cache()

In [20]:
print(new_df.count())

5901089


In [21]:
w = Window\
    .partitionBy("subject_id", "activity_code", "device") \
    .orderBy("timestamp")

for i in range(1, window_size + 1):
    new_df = new_df.withColumn(f"lead_{i}_accel_x", lead("accel_x", i).over(w))\
                   .withColumn(f"lead_{i}_accel_y", lead("accel_y", i).over(w))\
                   .withColumn(f"lead_{i}_accel_z", lead("accel_z", i).over(w))\
                   .withColumn(f"lead_{i}_gyro_x", lead("gyro_x", i).over(w))\
                   .withColumn(f"lead_{i}_gyro_y", lead("gyro_y", i).over(w))\
                   .withColumn(f"lead_{i}_gyro_z", lead("gyro_z", i).over(w))\
                   .orderBy(["subject_id", "activity_code", "device", "timestamp"])\

new_df = new_df.drop("activity_code").cache()

In [22]:
new_df.show(n)

+----------+---------------+------+------+-----------+---------+-----------+-----------+-----------+-----------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+-------------+-------------+-------------+
|subject_id|      timestamp|device|eating|    accel_x|  accel_y|    accel_z|     gyro_x|     gyro_y|     gyro_z|lead_1_accel_x|lead_1_accel_y|lead_1_accel_z|lead_1_gyro_x|lead_1_gyro_y|lead_1_gyro_z|lead_2_accel_x|lead_2_accel_y|lead_2_accel_z|lead_2_gyro_x|lead_2_gyro_y|lead_2_gyro_z|lead_3_accel_x|lead_3_accel_y|lead_3_accel_z|lead_3_gyro_x|lead_3_gyro_y|lead_3_gyro_z|
+----------+---------------+------+------+-----------+---------+-----------+-----------+-----------+-----------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+--------

## Encode Variables

In [23]:
si = StringIndexer(inputCol="device", outputCol="device-num")
sm = si.fit(new_df)
new_df = sm.transform(new_df).drop("device")
new_df = new_df.withColumnRenamed("device-num", "device")

ohe = OneHotEncoder(inputCol="device",
                    outputCol="device-onehot", dropLast=False)
ohe_model = ohe.fit(new_df)

new_df = ohe_model.transform(new_df).drop("device")
new_df = new_df.withColumnRenamed("device-onehot", "device")

In [24]:
col_order = new_df.columns[:2] + ["device"] + new_df.columns[2:-1]

new_df = new_df.orderBy(["subject_id", "timestamp", "device"])\
               .select(col_order).drop('eating')

new_df.show(n)

+----------+--------------+-------------+---------+----------+---------+-----------+-----------+------------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+-------------+-------------+-------------+
|subject_id|     timestamp|       device|  accel_x|   accel_y|  accel_z|     gyro_x|     gyro_y|      gyro_z|lead_1_accel_x|lead_1_accel_y|lead_1_accel_z|lead_1_gyro_x|lead_1_gyro_y|lead_1_gyro_z|lead_2_accel_x|lead_2_accel_y|lead_2_accel_z|lead_2_gyro_x|lead_2_gyro_y|lead_2_gyro_z|lead_3_accel_x|lead_3_accel_y|lead_3_accel_z|lead_3_gyro_x|lead_3_gyro_y|lead_3_gyro_z|
+----------+--------------+-------------+---------+----------+---------+-----------+-----------+------------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+-------------+---

## Vectorize

In [112]:
input_cols = [col for col in new_df.columns if "gyro" in col or "accel" in col]

va = VectorAssembler(outputCol="features",
                     inputCols=input_cols, handleInvalid="skip")

df_trans = va.transform(new_df)

## Scale

In [113]:
scaler = StandardScaler(inputCol="features", outputCol="features_scaled",
                        withStd=True, withMean=True)

scaler_model = scaler.fit(df_trans)

df_scaled = scaler_model.transform(df_trans)\
                        .drop("features")\
                        .withColumnRenamed("features_scaled", "features")

In [125]:
df_scaled = df_scaled.select("eating", "device", "features")\
                     .orderBy(["subject_id", "activity_code",
                               "device", "timestamp"])

df_scaled.show(n)

+------+-------------+--------------------+
|eating|       device|            features|
+------+-------------+--------------------+
|     0|(2,[0],[1.0])|[0.69546612359028...|
|     0|(2,[0],[1.0])|[0.42894076955912...|
|     0|(2,[0],[1.0])|[0.35878297556336...|
|     0|(2,[0],[1.0])|[0.50912109499279...|
|     0|(2,[0],[1.0])|[0.64720949074113...|
+------+-------------+--------------------+
only showing top 5 rows



In [126]:
va2 = VectorAssembler(outputCol="final_feature", inputCols=["features", "device"],
                      handleInvalid="skip")

final_df = va2.transform(df_scaled)

In [127]:
final_df = final_df.select("final_feature", "eating")\
                   .withColumnRenamed("final_feature", "features")\
                   .withColumnRenamed("eating", "label")\
                   .cache()

## Train/Valid Split

In [128]:
splits = final_df.randomSplit([0.8, 0.2], 1)

train = splits[0].cache()
valid = splits[1].cache()

In [129]:
train.show(n)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-3.1091863058572...|    0|
|[-3.0893114309134...|    0|
|[-3.0384874887138...|    0|
|[-3.0241532585882...|    0|
|[-3.0091802692394...|    0|
+--------------------+-----+
only showing top 5 rows



In [130]:
valid.show(n)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-3.0236777378331...|    0|
|[-2.6475242601134...|    0|
|[-2.6327949455794...|    0|
|[-2.6230195636882...|    0|
|[-2.5604235255809...|    0|
+--------------------+-----+
only showing top 5 rows



## Grid Search

In [132]:
lr = LogisticRegression()
bceval = BinaryClassificationEvaluator()

cv = CrossValidator().setEstimator(lr).setEvaluator(bceval).setNumFolds(n_fold)

paramGrid = ParamGridBuilder().addGrid(lr.maxIter, max_iter)\
                              .addGrid(lr.regParam, reg_params).build()

cv.setEstimatorParamMaps(paramGrid)
cvmodel = cv.fit(train)

Exception ignored in: <function JavaWrapper.__del__ at 0x7fa7c3fc4c10>
Traceback (most recent call last):
  File "/Users/matthewhui/opt/anaconda3/lib/python3.8/site-packages/pyspark/ml/wrapper.py", line 42, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'StandardScaler' object has no attribute '_java_obj'


In [133]:
print(cvmodel.bestModel.coefficients)
print()
print(cvmodel.bestModel.intercept)
print()
print(cvmodel.bestModel.getMaxIter())
print()
print(cvmodel.bestModel.getRegParam())
print()

[-0.14065045738207851,-0.029078747642291694,0.031978120510682696,0.01659939995186924,-0.026075269240005656,-0.011546305556566676,-0.049485697263672246,-0.018095208246746586,0.010091025746710117,0.0006447343097696235,-0.0019262657412809562,-0.003017908792742433,-0.05295796713300515,-0.019173179224324894,0.011534458377788518,0.0056003107889840975,-0.005280181522858189,-0.0017075021535317373,-0.13730588107953515,-0.03194979180208861,0.038443855711314014,0.002378249924257801,0.0170903141671498,-0.024152714008608092,-0.06222799707487454,0.032856013792252454]

-1.2679903453341719

100

0.001



## Best Model AUC

In [135]:
print(BinaryClassificationEvaluator().setMetricName("areaUnderROC")
                                     .evaluate(cvmodel.bestModel
                                                      .transform(valid)))

0.6111573792775656


In [136]:
ss.stop()