In [49]:
import numpy as np
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler

# Obtain (or reuse) the Spark session, then take its underlying SparkContext
# so RDD-level APIs are available via `sc`.
ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext

In [2]:
# Load the training features: the first line is the header, and Spark infers
# each column's type from the data.
train = ss.read.options(header=True, inferSchema=True).csv("X_train.csv")

In [16]:
# Inspect the column names and types Spark inferred from the CSV (inferSchema=True).
train.printSchema()

root
 |-- num_prev_so: integer (nullable = true)
 |-- mean_amp_prev_so: double (nullable = true)
 |-- mean_duration_prev_so: double (nullable = true)
 |-- amp_curr_so: double (nullable = true)
 |-- duration_curr_so: integer (nullable = true)
 |-- curr_sleep_stage: integer (nullable = true)
 |-- time_elapsed_since_asleep: integer (nullable = true)
 |-- time_deep_sleep: integer (nullable = true)
 |-- time_light_sleep: integer (nullable = true)
 |-- time_rem_sleep: integer (nullable = true)
 |-- time_wake_sleep: integer (nullable = true)
 |-- eeg1: double (nullable = true)
 |-- eeg2: double (nullable = true)
 |-- eeg3: double (nullable = true)
 |-- eeg4: double (nullable = true)
 |-- eeg5: double (nullable = true)
 |-- eeg6: double (nullable = true)
 |-- eeg7: double (nullable = true)
 |-- eeg8: double (nullable = true)
 |-- eeg9: double (nullable = true)
 |-- eeg10: double (nullable = true)
 |-- eeg11: double (nullable = true)
 |-- eeg12: double (nullable = true)
 |-- eeg13: double (null

In [61]:
# Number of leading non-EEG (metadata) columns in `train`, i.e. num_prev_so
# through time_wake_sleep as shown by train.printSchema(); everything after
# them is treated as raw EEG samples.
N_META_COLS = 11


def summarize_eeg_row(row):
    """Keep a row's metadata columns and collapse its EEG samples into 4 statistics.

    Parameters
    ----------
    row : pyspark.sql.Row (or any sequence)
        One record of ``train``. Positions ``[0, N_META_COLS)`` are metadata;
        the remaining positions are EEG samples.

    Returns
    -------
    list
        ``list(row[:N_META_COLS])`` followed by
        ``[eeg_min, eeg_max, eeg_mean_abs, eeg_median]``, matching the column
        order of ``schema``.
    """
    eeg = row[N_META_COLS:]
    return list(row[:N_META_COLS]) + [
        min(eeg),
        max(eeg),
        # Mean absolute amplitude. Divide by the real sample count rather than
        # a hard-coded 1250 so the value stays a true mean if the number of
        # EEG columns differs.
        sum(abs(sample) for sample in eeg) / len(eeg),
        # np.median returns numpy.float64; convert so Spark's DoubleType accepts it.
        float(np.median(eeg)),
    ]


train_pre = train.rdd.map(summarize_eeg_row)

In [62]:
# Output schema for `train_pre`: the 11 metadata columns carried over unchanged
# from `train`, followed by four per-row EEG summary statistics (min, max,
# mean absolute value, median), in the order the map above produces them.
# All fields are nullable.
# NOTE(review): the "egg_" prefix looks like a typo for "eeg_"; kept unchanged
# because later cells may reference these column names.
schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in (
        ("num_prev_so", IntegerType()),
        ("mean_amp_prev_so", DoubleType()),
        ("mean_duration_prev_so", DoubleType()),
        ("amp_curr_so", DoubleType()),
        ("duration_curr_so", IntegerType()),
        ("curr_sleep_stage", IntegerType()),
        ("time_elapsed_since_asleep", IntegerType()),
        ("time_deep_sleep", IntegerType()),
        ("time_light_sleep", IntegerType()),
        ("time_rem_sleep", IntegerType()),
        ("time_wake_sleep", IntegerType()),
        ("egg_min", DoubleType()),
        ("egg_max", DoubleType()),
        ("egg_mean", DoubleType()),
        ("egg_median", DoubleType()),
    )
])

In [63]:
# Turn the summarized RDD back into a DataFrame with the explicit schema above,
# so column names and types are fixed rather than inferred.
train_pre_df = ss.createDataFrame(data=train_pre, schema=schema)

In [65]:
# Preview the first five summarized rows.
train_pre_df.show(n=5)

+-----------+------------------+---------------------+-----------+----------------+----------------+-------------------------+---------------+----------------+--------------+---------------+-------------------+------------------+------------------+------------------+
|num_prev_so|  mean_amp_prev_so|mean_duration_prev_so|amp_curr_so|duration_curr_so|curr_sleep_stage|time_elapsed_since_asleep|time_deep_sleep|time_light_sleep|time_rem_sleep|time_wake_sleep|            egg_min|           egg_max|          egg_mean|        egg_median|
+-----------+------------------+---------------------+-----------+----------------+----------------+-------------------------+---------------+----------------+--------------+---------------+-------------------+------------------+------------------+------------------+
|        237|           152.659|              341.523|    128.017|             429|               3|                    11379|           2730|            3780|             0|            480|      