In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

In [2]:
# Alternative parameter set used to produce output_2.txt. It is kept
# commented out for reference; to use it, uncomment this block and comment
# out the output_3.txt block below.
# database = 'postgres'
# eating_strings = ["eating"]
# endpoint = 'msds694.cmxsootjz10m.us-west-2.rds.amazonaws.com'
# files = './WISDM/*/*/'
# n = 10
# properties = {'user': 'students', 'password': 'msdsstudents'}
# table = 'activity_code'
# url = 'jdbc:postgresql://%s/%s' % (endpoint, database)
# window_size = 5
# n_fold = 5
# max_iter = [100]
# reg_params = [0.001]


# Active parameter set used to produce output_3.txt.
# NOTE(review): the database user and password are hardcoded in `properties`;
# consider reading them from the environment before sharing this notebook.
database = 'postgres'
# Substrings that mark an activity description as "eating" (see check_eating).
eating_strings = ["eating"]
endpoint = 'msds694.cmxsootjz10m.us-west-2.rds.amazonaws.com'
# Glob passed to wholeTextFiles to locate the raw sensor files.
files = './WISDM/*/*/'
# Number of rows printed by the .show(n) calls.
n = 5
# Connection properties passed to the JDBC reader.
properties = {'user': 'students', 'password': 'msdsstudents'}
# Table read over JDBC into the activity_code dataframe.
table = 'activity_code'
url = 'jdbc:postgresql://%s/%s' % (endpoint, database)
# Number of lead (look-ahead) offsets generated for every reading column.
window_size = 3
# Number of cross-validation folds.
n_fold = 5
# Hyper-parameter grid (maxIter x regParam) searched for logistic regression.
max_iter = [10, 100]
reg_params = [0.01, 0.001]


In [3]:
def retrive_file_name(x):
    """
    Split a (file path, reading) pair into its descriptive parts.

    The file name (last path component, without the ".txt" suffix) is
    split on "_"; its second, third and fourth pieces are returned as
    subject_id, sensor and device, followed by the untouched reading.
    """
    base_name = x[0].rsplit("/", 1)[-1]
    stem = base_name.split(".txt")[0]
    parts = stem.split("_")
    subject_id, sensor, device = parts[1], parts[2], parts[3]
    return (subject_id, sensor, device, x[1])


def convert_to_integer(x):
    """
    Convert a value to an integer.

    Returns int(x), or None when the value cannot be converted: either a
    string that is not a valid integer literal (ValueError) or a value of
    an unsupported type such as None (TypeError).
    """
    try:
        return int(x)
    except (TypeError, ValueError):
        return None


def convert_to_float(x):
    """
    Convert a value to a float.

    Returns float(x), or None when the value cannot be converted: either
    a string that is not a valid number (ValueError) or a value of an
    unsupported type such as None (TypeError).
    """
    try:
        return float(x)
    except (TypeError, ValueError):
        return None


def conver_to_string(x):
    """Return str(x), or None if the conversion raises ValueError."""
    try:
        text = str(x)
    except ValueError:
        text = None
    return text


def check_same_user(x):
    """
    Keep a row only when both of its subject ids agree.

    x[0] is the subject_id taken from the file name and x[3] the one read
    from the file content. When they match, the row is returned without
    the duplicated id at position 3; otherwise (or when x is None) the
    result is None.
    """
    if x is None or x[0] != x[3]:
        return None
    return tuple(x[i] for i in (0, 1, 2, 4, 5, 6, 7, 8))


def return_no_none_rows(x):
    """
    Return True only for a row whose first eight fields are all set.

    A row that is itself None, or that has None in any of positions 0-7,
    yields False.
    """
    if x is None:
        return False
    return all(x[i] is not None for i in range(8))


def create_flat_rdd(x):
    """
    Flatten one (subject_id, sensor, device, line) record.

    The line x[3] is split on commas. When it has exactly six fields the
    result is a 9-tuple: the file-name subject_id as an integer, sensor,
    device, then the six fields converted to integer, string, integer,
    float, float and float. Any field that fails conversion becomes None.
    Lines with a different number of fields yield None.
    """
    fields = x[3].split(",")
    if len(fields) != 6:
        return None

    file_subject = convert_to_integer(x[0])
    content_subject = convert_to_integer(fields[0])
    activity = conver_to_string(fields[1])
    timestamp = convert_to_integer(fields[2])
    axis_x = convert_to_float(fields[3])
    axis_y = convert_to_float(fields[4])
    axis_z = convert_to_float(fields[5])
    return (file_subject, x[1], x[2],
            content_subject, activity, timestamp,
            axis_x, axis_y, axis_z)


def file_rdd(ss, files):
    """Read every file matching `files` into a pair RDD via wholeTextFiles."""
    spark_context = ss.sparkContext
    return spark_context.wholeTextFiles(files)


def create_activity_df(ss, files_rdd, schema):
    """
    Build the readings dataframe from a (path, content) pair RDD.

    Each file's content is split into lines on ";\\n", every line is
    paired with the parts of its file name, parsed into typed fields,
    checked for matching subject ids and dropped if any field is missing.
    The surviving rows are loaded into a dataframe using `schema`.
    """
    lines_per_file = files_rdd.mapValues(lambda content: content.split(";\n"))
    one_line_per_record = lines_per_file.flatMapValues(lambda lines: lines)
    parsed_rows = (
        one_line_per_record
        .map(retrive_file_name)
        .map(create_flat_rdd)
        .map(check_same_user)
    )
    activity_data_rdd = parsed_rows.filter(return_no_none_rows)
    return ss.createDataFrame(activity_data_rdd, schema)


In [4]:
# Start (or reuse) a Spark session with the PostgreSQL JDBC driver jar on the
# driver class path and 5g of memory configured for driver and executors.
ss = (
    SparkSession.builder
    .config('spark.driver.extraClassPath', 'postgresql-42.2.18.jar')
    .config("spark.executor.memory", "5g")
    .config("spark.driver.memory", "5g")
    .getOrCreate()
)

# QUESTION 1



In [5]:
# Load the activity lookup table over JDBC, reduce it to at most 8
# partitions and cache it because several later cells reuse it.
activity_code = (
    ss.read
    .jdbc(url=url, table=table, properties=properties)
    .coalesce(8)
    .cache()
)

In [6]:
# activity_code.show(5)

In [7]:
# Column layout of the parsed sensor readings. Every field is declared
# non-nullable (third StructField argument is False).
schema = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in [
        ('subject_id', IntegerType()),
        ('sensor', StringType()),
        ('device', StringType()),
        ('activity_code', StringType()),
        ('timestamp', LongType()),
        ('x', FloatType()),
        ('y', FloatType()),
        ('z', FloatType()),
    ]
])

In [8]:
# Read every file matching the `files` glob into a (path, content) pair RDD.
files_rdd = file_rdd(ss, files)
# Parse the raw text into a dataframe that follows `schema`, reduce it to at
# most 8 partitions and cache it for reuse by later cells.
files_df = create_activity_df(ss, files_rdd, schema).coalesce(8).cache()

In [9]:
# files_df.show(5)

# QUESTION 2

In [10]:
# activity_code.show(5)

In [11]:
def check_eating(x):
    """
    Return True if the activity description contains an eating keyword.

    x is tested for substring membership against every entry of the
    module-level eating_strings list. A missing description (None, which
    is how a null column value arrives inside a Python UDF) is reported
    as not-eating instead of raising TypeError.
    """
    if x is None:
        return False
    return any(keyword in x for keyword in eating_strings)
# Wrap check_eating as a Spark UDF whose declared return type is boolean.
check_eating_udf = udf(check_eating, BooleanType())  

In [12]:
# Flag every activity whose lower-cased description matches an eating keyword.
eating_df = activity_code.withColumn(
    'eating',
    check_eating_udf(
        lower(activity_code['activity'])
    ),
)

In [13]:
# 'eating' is a boolean column, so its name alone works as the filter
# condition. List the distinct codes of the eating activities in order.
result2 = (
    eating_df
    .filter('eating')
    .select('code')
    .distinct()
    .sort('code')
)
result2.show()

+----+
|code|
+----+
|   H|
|   I|
|   J|
|   L|
+----+



# QUESTION 3

In [14]:
# files_df.show(5)

In [15]:
# Keep only the lookup columns and turn the boolean 'eating' flag into 0/1.
# No ordering is applied here: a sort with an empty key list cannot impose
# any order, and result3 is sorted explicitly when it is built.
eating_df = eating_df.select(
    ['activity', 'code', col('eating').cast("integer")])
# eating_df.show(n)

In [16]:
# Attach the activity description and eating flag to every sensor reading
# by matching the lookup code against the reading's activity_code.
joined_df = (
    eating_df
    .join(files_df, eating_df.code == files_df.activity_code)
    .cache()
)

In [17]:
# Readings with their eating flag, sorted by subject, time, device and sensor.
result3 = (
    joined_df
    .select('subject_id', 'sensor', 'device', 'activity_code',
            'timestamp', 'x', 'y', 'z', 'eating')
    .orderBy(['subject_id', 'timestamp', 'device', 'sensor'])
    .cache()
)

In [18]:
# Display the first n rows of the sorted readings.
result3.show(n)

+----------+------+------+-------------+--------------+----------+----------+----------+------+
|subject_id|sensor|device|activity_code|     timestamp|         x|         y|         z|eating|
+----------+------+------+-------------+--------------+----------+----------+----------+------+
|      1600| accel| watch|            D|79817308824838|-0.1666963| 1.5316905| 10.057592|     0|
|      1600| accel| watch|            D|79817358500488|  3.613748|-1.0540473| 11.779023|     0|
|      1600|  gyro| watch|            D|79817358500488| -1.365979|-1.5444704|-1.6969953|     0|
|      1600| accel| watch|            D|79817408176138| 2.0886416|-3.4386723|  12.97373|     0|
|      1600|  gyro| watch|            D|79817408176138|-1.9071333|-1.2696322|-1.8173702|     0|
+----------+------+------+-------------+--------------+----------+----------+----------+------+
only showing top 5 rows



# QUESTION 4

In [19]:
# joined_df.show(5)

In [20]:
# Find the (activity_code, device, timestamp) keys for which two distinct
# sensors reported a reading.
# NOTE(review): subject_id is not part of the grouping key, so readings of
# different subjects that share a key are counted together -- confirm that
# this is intended.
both_sensor_df = (
    joined_df
    .groupBy('activity_code', 'device', 'timestamp')
    .agg(countDistinct('sensor').alias('sensor_count'))
    .filter('sensor_count==2')
    .cache()
)

In [21]:
# both_sensor_df.count()

In [22]:
# both_sensor_df.show(5)

In [23]:
# Keep only the readings whose (activity_code, device, timestamp) key is
# present in both_sensor_df. The left-semi join filters joined_df without
# adding columns; exact duplicate rows are then removed.
result4_joined_df = (
    joined_df
    .join(both_sensor_df,
          ['activity_code', 'device', 'timestamp'],
          'leftsemi')
    .select('sensor', 'activity', 'activity_code', 'subject_id', 'device',
            'timestamp', 'x', 'y', 'z', 'eating')
    .distinct()
    .cache()
)

In [24]:
# Split the readings by sensor and prefix the axis columns with the sensor
# name so both sets can sit side by side after the join.
accel = (
    result4_joined_df
    .filter("sensor == 'accel'")
    .withColumnRenamed('x', 'accel_x')
    .withColumnRenamed('y', 'accel_y')
    .withColumnRenamed('z', 'accel_z')
)

gyro = (
    result4_joined_df
    .filter("sensor == 'gyro'")
    .withColumnRenamed('x', 'gyro_x')
    .withColumnRenamed('y', 'gyro_y')
    .withColumnRenamed('z', 'gyro_z')
)

In [25]:
# Pair each accelerometer reading with the gyroscope reading that shares
# its activity, device and timestamp, giving one row with all six axes.
# NOTE(review): subject_id is not one of the join keys, so readings from
# different subjects with the same key would be paired -- confirm that
# this is intended.
result4_df = (
    accel
    .join(gyro, ['activity', 'device', 'timestamp'])
    .select(gyro.activity_code, accel.subject_id, 'timestamp', 'device',
            accel.eating,
            'accel_x', 'accel_y', 'accel_z',
            'gyro_x', 'gyro_y', 'gyro_z')
    .cache()
)

In [26]:
# Count the paired accel/gyro rows and print the total.
result4_count = result4_df.count()
print(result4_count)

5901089


# QUESTION 5

In [27]:
# Start question 5 from the paired accel/gyro dataframe of question 4.
result5_df = result4_df

In [28]:
# result5_df.show(5)

In [29]:
# Every lead column uses the same window: the readings of one subject,
# activity and device, ordered by timestamp. Build the spec once instead of
# repeating it for each column.
lead_window = (
    Window
    .partitionBy(['subject_id', 'activity_code', 'device'])
    .orderBy(['subject_id', 'activity_code', 'device', 'timestamp'])
)

# For each offset i in 1..window_size add lead_{i}_<column>, holding the
# value of <column> found i rows later in the same window. Columns are added
# offset by offset, in accel x/y/z then gyro x/y/z order.
for i in range(1, window_size+1):
    for reading_col in ['accel_x', 'accel_y', 'accel_z',
                        'gyro_x', 'gyro_y', 'gyro_z']:
        result5_df = result5_df.withColumn(
            f"lead_{i}_{reading_col}",
            lead(reading_col, i).over(lead_window))

In [30]:
# Sort the windowed readings, then drop activity_code from the displayed
# result (it is still used as a sort key before being removed).
result5_df_new = (
    result5_df
    .orderBy(['subject_id', 'activity_code', 'device', 'timestamp'])
    .drop('activity_code')
    .cache()
)

In [31]:
# Display the first n rows, including the generated lead_* columns.
result5_df_new.show(n)

+----------+---------------+------+------+-----------+---------+-----------+-----------+-----------+-----------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+-------------+-------------+-------------+
|subject_id|      timestamp|device|eating|    accel_x|  accel_y|    accel_z|     gyro_x|     gyro_y|     gyro_z|lead_1_accel_x|lead_1_accel_y|lead_1_accel_z|lead_1_gyro_x|lead_1_gyro_y|lead_1_gyro_z|lead_2_accel_x|lead_2_accel_y|lead_2_accel_z|lead_2_gyro_x|lead_2_gyro_y|lead_2_gyro_z|lead_3_accel_x|lead_3_accel_y|lead_3_accel_z|lead_3_gyro_x|lead_3_gyro_y|lead_3_gyro_z|
+----------+---------------+------+------+-----------+---------+-----------+-----------+-----------+-----------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+--------

# QUESTION 6

In [32]:
# Question 6 starts from the windowed readings (activity_code still present),
# sorted by subject, activity, device and time.
result6_df = (
    result5_df
    .orderBy(['subject_id', 'activity_code', 'device', 'timestamp'])
    .cache()
)

In [33]:
def indexStringColumns(df, cols):
    """
    Replace each column named in `cols` with its StringIndexer encoding.

    For every column a StringIndexer is fitted on the current dataframe,
    the indexed values are written to a temporary '<name>-num' column,
    the original column is dropped and the temporary column takes over
    the original name. Returns the resulting dataframe.
    """
    indexed_df = df
    for name in cols:
        temp_name = name + '-num'
        indexer_model = StringIndexer(
            inputCol=name, outputCol=temp_name).fit(indexed_df)
        indexed_df = (
            indexer_model.transform(indexed_df)
            .drop(name)
            .withColumnRenamed(temp_name, name)
        )
    return indexed_df

# Replace the string 'device' column with its numeric StringIndexer index.
result6_df_numeric = indexStringColumns(result6_df, ['device'])

In [34]:
# result6_df_numeric.show(5)

In [35]:
def oneHotEncodeColumns(df, cols):
    """
    Replace each column named in `cols` with its one-hot encoding.

    For every column a OneHotEncoder (dropLast=False) is fitted on the
    current dataframe, the encoded vector is written to a temporary
    '<name>-onehot' column, the original column is dropped and the
    temporary column takes over the original name. Returns the resulting
    dataframe.
    """
    encoded_df = df
    for name in cols:
        temp_name = name + '-onehot'
        encoder_model = OneHotEncoder(
            inputCol=name, outputCol=temp_name, dropLast=False).fit(encoded_df)
        encoded_df = (
            encoder_model.transform(encoded_df)
            .drop(name)
            .withColumnRenamed(temp_name, name)
        )
    return encoded_df


# One-hot encode the indexed 'device' column, then sort the rows. Note that
# the 'device' sort key is the encoded vector column at this point.
result6_df_onehot = oneHotEncodeColumns(result6_df_numeric, ['device']).orderBy([
    'subject_id', 'timestamp', 'device'])

In [36]:
# result6_df_onehot.show(5)

In [37]:
# Rearrange the order of the columns: the encoded 'device' column, which the
# encoding step left in last position, is moved to fourth place, right after
# the first three columns.
cols = result6_df_onehot.columns  # this is a list of columns
sorted_cols = cols[:3]
sorted_cols.append(cols[-1])
sorted_cols.extend(cols[3:-1])
# result6_df_new keeps every column; question 7 continues from it.
result6_df_new = result6_df_onehot.select(sorted_cols).cache()
# NOTE(review): result6_df_onehot is rebound here to a frame without
# 'activity_code' and 'eating', so running this cell a second time on its own
# starts from a different column list. Re-run the cell that builds
# result6_df_onehot first.
result6_df_onehot = result6_df_new.drop('activity_code', 'eating').cache()
result6_df_onehot.show(n)

+----------+--------------+-------------+---------+----------+---------+-----------+-----------+------------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+-------------+-------------+-------------+
|subject_id|     timestamp|       device|  accel_x|   accel_y|  accel_z|     gyro_x|     gyro_y|      gyro_z|lead_1_accel_x|lead_1_accel_y|lead_1_accel_z|lead_1_gyro_x|lead_1_gyro_y|lead_1_gyro_z|lead_2_accel_x|lead_2_accel_y|lead_2_accel_z|lead_2_gyro_x|lead_2_gyro_y|lead_2_gyro_z|lead_3_accel_x|lead_3_accel_y|lead_3_accel_z|lead_3_gyro_x|lead_3_gyro_y|lead_3_gyro_z|
+----------+--------------+-------------+---------+----------+---------+-----------+-----------+------------+--------------+--------------+--------------+-------------+-------------+-------------+--------------+--------------+--------------+-------------+---

# QUESTION 7

In [38]:
# Question 7 starts from the reordered dataframe that still has every column.
result7_df = result6_df_new
# result7_df.show(1)

In [39]:
cols = result7_df.columns
# Everything after the first five columns goes into the feature vector.
# NOTE(review): the skipped columns are presumably activity_code, subject_id,
# timestamp, device and eating -- this relies on the column order produced by
# the reordering cell; confirm if that order changes.
input_cols = cols[5:]
# handleInvalid="skip" asks the assembler to leave out rows with invalid
# input values instead of failing (presumably the rows whose lead_* columns
# are null at the end of a window -- TODO confirm).
va = VectorAssembler(outputCol='features',
                     inputCols=input_cols, handleInvalid="skip")

In [40]:
# Assemble the feature vector and keep it with the identifying columns.
result7_transformed = (
    va.transform(result7_df)
    .select('activity_code', 'subject_id', 'timestamp',
            'eating', 'device', 'features')
)
# result7_transformed.show(5)

In [41]:
def standard_scaler(input_df):
    """
    Standardize the 'features' vector column of `input_df`.

    A StandardScaler with withMean=True and withStd=True is fitted on
    `input_df`; the scaled vectors are written to 'features_Scaled', the
    original 'features' column is dropped and the scaled column is
    renamed to 'features'. Returns the transformed dataframe.
    """
    scaler_model = StandardScaler(
        inputCol='features',
        outputCol='features_Scaled',
        withMean=True,
        withStd=True,
    ).fit(input_df)

    # Replace the raw vector with its standardized version under the same name.
    scaled_df = scaler_model.transform(input_df).drop('features')
    return scaled_df.withColumnRenamed('features_Scaled', 'features')


# Standardize the assembled feature vector and cache the result.
# NOTE(review): the scaler is fitted on all rows, before the train/validation
# split performed in question 9 -- confirm that this is intended.
result7_standard = standard_scaler(result7_transformed).cache()

In [42]:
# Keep the label, the encoded device and the scaled features. The sort keys
# below are columns of result7_standard that are not part of the selection.
result7_final = (
    result7_standard
    .select('eating', 'device', 'features')
    .orderBy(['subject_id', 'activity_code', 'device', 'timestamp'])
)

In [43]:
# Display the first n rows of the standardized dataset.
result7_final.show(n)

+------+-------------+--------------------+
|eating|       device|            features|
+------+-------------+--------------------+
|     0|(2,[0],[1.0])|[0.69546612359028...|
|     0|(2,[0],[1.0])|[0.42894076955912...|
|     0|(2,[0],[1.0])|[0.35878297556336...|
|     0|(2,[0],[1.0])|[0.50912109499279...|
|     0|(2,[0],[1.0])|[0.64720949074113...|
+------+-------------+--------------------+
only showing top 5 rows



# QUESTION 8

In [44]:
result8_df = result7_final

# Append the one-hot device vector to the scaled sensor features so the model
# receives a single 'features' vector.
input_cols_8 = ['features', 'device']
va8 = VectorAssembler(
    inputCols=input_cols_8,
    outputCol='features_new',
    handleInvalid="skip",
)

# Swap in the combined vector under the name 'features' and rename the
# target column 'eating' to 'label'.
result8_transformed = (
    va8.transform(result8_df)
    .drop('features', 'device')
    .withColumnRenamed('features_new', 'features')
    .withColumnRenamed('eating', 'label')
    .select('features', 'label')
)

In [45]:
# result8_transformed.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.69546612359028...|    0|
|[0.42894076955912...|    0|
|[0.35878297556336...|    0|
|[0.50912109499279...|    0|
|[0.64720949074113...|    0|
+--------------------+-----+
only showing top 5 rows



# QUESTION 9

In [46]:
# Question 9 starts from the (features, label) dataframe of question 8.
result9_df = result8_transformed

In [47]:
# Random split with weights 0.8 and 0.2, using a fixed seed.
splits = result9_df.randomSplit([0.8, 0.2], seed=1)

In [48]:
# The first split (weight 0.8) is the training set and the second (weight
# 0.2) the validation set; both are cached.
train = splits[0].cache()
valid = splits[1].cache()

In [49]:
# Display the first n rows of the training set.
train.show(n)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-3.1091863058572...|    0|
|[-3.0893114309134...|    0|
|[-3.0384874887138...|    0|
|[-3.0241532585882...|    0|
|[-3.0091802692393...|    0|
+--------------------+-----+
only showing top 5 rows



In [50]:
# Display the first n rows of the validation set.
valid.show(n)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-3.0236777378331...|    0|
|[-2.6475242601134...|    0|
|[-2.6327949455794...|    0|
|[-2.6230195636882...|    0|
|[-2.5604235255809...|    0|
+--------------------+-----+
only showing top 5 rows



# QUESTION 10

In [51]:
# Base estimator for the cross-validation below. The parameter grid built
# there supplies its own maxIter and regParam values for every candidate.
lr = LogisticRegression(regParam=0.01, maxIter=100, fitIntercept=True)

In [52]:
# Candidate settings: every combination of max_iter and reg_params.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.maxIter, max_iter)
    .addGrid(lr.regParam, reg_params)
    .build()
)

# n_fold-fold cross-validation of the logistic regression over the grid,
# scored with a binary classification evaluator and fitted on the training set.
bceval = BinaryClassificationEvaluator()
cv = CrossValidator(
    estimator=lr,
    evaluator=bceval,
    numFolds=n_fold,
    estimatorParamMaps=paramGrid,
)
cvmodel = cv.fit(train)

In [53]:
# Report the best model: coefficients, intercept and the selected maxIter
# and regParam, each followed by a blank line.
print(cvmodel.bestModel.coefficients, end='\n\n')
print(cvmodel.bestModel.intercept, end='\n\n')
print(cvmodel.bestModel.getMaxIter(), end='\n\n')
print(cvmodel.bestModel.getRegParam(), end='\n\n')

[-0.14149340055629775,-0.028753314596476485,0.031305889458889,0.016840977623705518,-0.026880537872969904,-0.010666727781817062,-0.049584748298492735,-0.019307188434037465,0.009373638660564823,0.0012064385140092275,-0.0008479007209928024,-0.004665757615142812,-0.05297842660283901,-0.0184727086846613,0.011651564381378748,0.004661524703264557,-0.00582648697753709,-0.0005027034419519082,-0.13646654071498415,-0.032139813749269094,0.03934262861168735,0.0034492003597415715,0.0171438279970519,-0.024790904134362253,-0.06132009617685819,0.03152883740675802]

-1.2678821040195134

100

0.001



# QUESTION 11

In [54]:
# Score the best cross-validated model on the validation split using the
# area under the ROC curve. The metric name is set on the same bceval
# evaluator object that was handed to the cross-validator.
result11 = bceval.setMetricName('areaUnderROC').evaluate(
    cvmodel.bestModel.transform(valid))
print(result11)

0.610966669608778


In [55]:
# Shut down the Spark session now that all results are computed.
ss.stop()