<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Imports-&amp;-Env-Setup" data-toc-modified-id="Imports-&amp;-Env-Setup-1">Imports &amp; Env Setup</a></span></li><li><span><a href="#Grab-Data-From-s3-Using-Collin's-Functions" data-toc-modified-id="Grab-Data-From-s3-Using-Collin's-Functions-2">Grab Data From s3 Using Collin's Functions</a></span></li><li><span><a href="#Kevin's-Pre-processing-Functions" data-toc-modified-id="Kevin's-Pre-processing-Functions-3">Kevin's Pre-processing Functions</a></span></li><li><span><a href="#Edited-Pre-processing-Functions" data-toc-modified-id="Edited-Pre-processing-Functions-4">Edited Pre-processing Functions</a></span></li><li><span><a href="#Generate-DFs-in-Mini-batches" data-toc-modified-id="Generate-DFs-in-Mini-batches-5">Generate DFs in Mini-batches</a></span></li><li><span><a href="#Transform-Data" data-toc-modified-id="Transform-Data-6">Transform Data</a></span></li><li><span><a href="#Write-Mini-DFs-to-Disk" data-toc-modified-id="Write-Mini-DFs-to-Disk-7">Write Mini-DFs to Disk</a></span></li><li><span><a href="#Join-Mini-DFs-to-Get-BigDF" data-toc-modified-id="Join-Mini-DFs-to-Get-BigDF-8">Join Mini-DFs to Get BigDF</a></span></li><li><span><a href="#Training" data-toc-modified-id="Training-9">Training</a></span></li></ul></div>

# Imports & Env Setup

In [1]:
import os
import math
import numpy as np

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
# Configure pyspark env
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.hadoop:hadoop-aws:2.7.4" pyspark-shell'

In [3]:
# SparkContet must come AFTER env set-up!
sc = SparkContext.getOrCreate()
ss = SparkSession.builder.getOrCreate()

sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', 'AKIAW7CYB6L5UCPV525G')    # Access Key
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', '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') # Secret Key

-----------

# Grab Data From s3 Using Collin's Functions

In [4]:
import boto3
from collections import defaultdict

# here for testing purposes
session = boto3.session.Session(profile_name='sparkle')
s3_client = session.client('s3')

def get_s3_page_iterator(profile='sparkle', bucket='msds-sparkle', prefix='data/output/pills_all_data/'):
    """
    Assuming aws keys are stored in specified profile,
    this fn retrieves a paginator (iterable) in the specified s3 location,
    note: pagination approach solves problem of maxing out at 1000 csv's
          https://adamj.eu/tech/2018/01/09/using-boto3-think-pagination/
    note: w/o a prefix, it can recursively reach all paths! Though,
          this would break our `npills` splitting at bottom of `get_paths`
          function, though this could be easily adjusted.
    """
    session = boto3.session.Session(profile_name=profile)
    s3_client = session.client('s3')
    paginator = s3_client.get_paginator('list_objects')
    page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
    return page_iterator

def get_paths(page_iterator):
    """
    Given an s3 paginator, this fn retrieves the path to each file in
    the s3 location, then returns them as a dict with each key being the # of pills
    that data observed and each value being a list of s3_paths.
    """
    s3_paths = defaultdict(list)
    for page in page_iterator:
        for obj in page['Contents']:
            path = obj["Key"]
            # Split on number of slashes in prefix
            npills = path.split("/")[2].split("-")[2]
            s3_paths[int(npills)].append(path)
    return s3_paths

In [5]:
# Grab file paths from s3

bucket = 'msds-twinkle'
prefix = 'data/processed/'

s3_iter = get_s3_page_iterator(bucket=bucket, prefix=prefix)
s3_paths = get_paths(s3_iter)

In [6]:
# Should be 8, one for each level of pill counts
list(s3_paths.values())

[['data/processed/pills-01-00-01.csv',
  'data/processed/pills-01-00-02.csv',
  'data/processed/pills-01-00-03.csv',
  'data/processed/pills-01-00-04.csv',
  'data/processed/pills-01-00-05.csv',
  'data/processed/pills-02-00-01.csv',
  'data/processed/pills-02-00-02.csv',
  'data/processed/pills-02-00-03.csv',
  'data/processed/pills-02-00-04.csv',
  'data/processed/pills-02-00-05.csv',
  'data/processed/pills-03-00-01.csv',
  'data/processed/pills-03-00-02.csv',
  'data/processed/pills-03-00-03.csv',
  'data/processed/pills-03-00-04.csv',
  'data/processed/pills-03-00-05.csv',
  'data/processed/pills-04-00-01.csv',
  'data/processed/pills-04-00-02.csv',
  'data/processed/pills-04-00-03.csv',
  'data/processed/pills-04-00-04.csv',
  'data/processed/pills-04-00-05.csv',
  'data/processed/pills-05-00-01.csv',
  'data/processed/pills-05-00-02.csv',
  'data/processed/pills-05-00-03.csv',
  'data/processed/pills-05-00-04.csv',
  'data/processed/pills-05-00-05.csv',
  'data/processed/pills-0

-----------

# Kevin's Pre-processing Functions

In [7]:
def renameColumns(df):
    
    df_renamed = None
    
    try:
        
        df = df.select('loggingSample(N)', 
                        'gyroRotationX(rad/s)',
                        'gyroRotationY(rad/s)',
                        'gyroRotationZ(rad/s)',                       
                        'avAudioRecorderPeakPower(dB)',
                        'avAudioRecorderAveragePower(dB)')
    except:
        df = df.select('loggingSample', 
                       'gyroRotationX',
                       'gyroRotationY',
                       'gyroRotationZ', 
                       'avAudioRecorderPeakPower',
                       'avAudioRecorderAveragePower')
                        
    renamed_df = df.toDF('loggingSample',
                         'gyro_x',
                         'gyro_y',
                         'gyro_z',
                         'audio_peak_power',
                         'audio_average_power')
    return renamed_df


def trimData(df, trim_start, trim_end):
    
    count = df.count()
    start_sample = math.ceil(count * trim_start)
    end_sample = math.ceil(count * (1 - trim_end))

    trimmed_df = df.where(f"loggingSample > {start_sample} and loggingSample < {end_sample}")
    trimmed_df = trimmed_df.withColumn("loggingSampleAdjusted", trimmed_df["loggingSample"] - start_sample)
    trimmed_df = trimmed_df.drop("loggingSample").withColumnRenamed("loggingSampleAdjusted","loggingSample")
    
    return trimmed_df
    
def windowData(df, num_windows):
    count = df.count()
    df = df.where(f"loggingSample < {count}")
    
    window_size = count / num_windows
    label_window = udf(lambda x : int(x // window_size), IntegerType())
    
    windowed_df = df.select(label_window('loggingSample'),
                           'loggingSample',
                           'gyro_x',
                           'gyro_y',
                           'gyro_z',
                           'audio_peak_power',
                           'audio_average_power'
                          )
    
    windowed_df = windowed_df.withColumnRenamed("<lambda>(loggingSample)", "window")
    return windowed_df
    
    
def transformData(df):
    
    df = df.withColumn("foo", lit("foo"))
    transformed_df = df.groupBy("foo").pivot("window").agg(
                                              round(min('gyro_x'),5),
                                              round(max('gyro_x'),5), 
                                              round(avg('gyro_x'),5), 
                                              round(stddev('gyro_x'),5),
                                              round(min('gyro_y'),5),
                                              round(max('gyro_y'),5), 
                                              round(avg('gyro_y'),5), 
                                              round(stddev('gyro_y'),5),
                                              round(min('gyro_z'),5),
                                              round(max('gyro_z'),5), 
                                              round(avg('gyro_z'),5), 
                                              round(stddev('gyro_z'),5),
                                              round(min('audio_peak_power'),5),
                                              round(max('audio_peak_power'),5), 
                                              round(avg('audio_peak_power'),5), 
                                              round(stddev('audio_peak_power'),5),
                                              round(min('audio_average_power'),5),
                                              round(max('audio_average_power'),5), 
                                              round(avg('audio_average_power'),5), 
                                              round(stddev('audio_average_power'),5),
                                             ).drop('foo')
    
    renamed_columns = [c.replace("(","_").replace(")","_").replace("round_","")[:-5] for c in transformed_df.columns]
    final_df = transformed_df.toDF(*renamed_columns)
    
    return final_df

def appendMetaInfo(df, tdf):
    
    watchOnTwistHand, n_pills = df.select('watchOnTwistHand','n_pills').first()[:]
    tdf = tdf.withColumn('watchOnTwistHand', lit(watchOnTwistHand)).withColumn('n_pills', lit(n_pills))
    return tdf


def preprocess(df, trim_start=0.05, trim_end=0.10, num_windows=10):
        
    # Select out and rename columns of interest
    renamed_df = renameColumns(df)
    
    # Trim off ends of data
    trimmed_df = trimData(renamed_df, trim_start, trim_end)

    # Window data by labeling rows with window assignments 
    windowed_df = windowData(trimmed_df, num_windows)
    
    # Transform from long to wide with summarized statistics
    transformed_df = transformData(windowed_df)
    
    # Append a feature and the data's label to the row
    final_df = appendMetaInfo(df, transformed_df)
    
    return final_df.withColumnRenamed('n_pills', 'label')

-------------------

# Generate DFs in Mini-batches

In [8]:
# Run on flattened list of file paths
s3_paths = list(s3_paths.values())
s3_paths = [x for path in s3_paths for x in path]

In [9]:
# Should be 670 in total
len(s3_paths)

670

In [10]:
# Need to append s3 prefix to read from bucket
s3_prefix = "s3://msds-twinkle/"
s3_paths = [s3_prefix + path for path in s3_paths]

In [11]:
# Check for correct path
s3_paths[0]

's3://msds-twinkle/data/processed/pills-01-00-01.csv'

In [12]:
# Number of windows to use in pre-processing
num_windows = 5

# Initial path index & df
start_idx = 0
df_lite = ss.read.csv(s3_paths[start_idx], header=True, inferSchema=True)
df_lite = preprocess(df_lite, num_windows=num_windows)

# Adjust range to create smaller DFs, which we'll aggregate later
for i in range(1):
    print(i)  # DEBUG
    df_i = ss.read.csv(s3_paths[i], header=True, inferSchema=True)
    df_lite = df_lite.union(preprocess(df_i, num_windows=num_windows))

0


In [13]:
# Check n_features
len(df_lite.columns)

102

-------------

# Transform Data

Perform the following transformation on each mini-DF
* Min-max transform each column
* Shift each value by 0.001 (to miss 0)
* Log-transform

------------

# Write Mini-DFs to Disk

In [None]:
len(df_lite.columns)

In [None]:
# writes locally, this takes some time

# Outpout path
output_folder = "~/DataScience/MSDS/sparkle/sparkle/data/training/"
filename = ""
output_path - output_folder + filename

df_lite.coalesce(1).write.option("header", "true").csv("/Users/kevin/desktop/lite_window_10_2")

---

# Join Mini-DFs to Get BigDF

In [None]:
import pandas as pd

In [None]:
df_0 = pd.read_csv('/users/kevin/desktop/lite_window_10_0/l0.csv')
df_0.shape

In [None]:
df_1 = pd.read_csv('/users/kevin/desktop/lite_window_10_0/l1.csv')
df_1.shape

In [None]:
df_2 = pd.read_csv('/users/kevin/desktop/lite_window_10_0/l2.csv')
df_2.shape

In [None]:
df_0['label'] = (df_0['label'] <= 10).astype(int)
big_df = df_0.copy()

for df in [df_1, df_2]:
    df['label'] = (df['label'] <= 10).astype(int)
    print(df.label.value_counts())
    big_df = big_df.append(df, ignore_index=True)

In [None]:
big_df

In [None]:
big_df.label.value_counts()

In [None]:
big_df.to_csv('~/desktop/lite_629_w10.csv')

---

# Training

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, GBTClassifier, MultilayerPerceptronClassifier
import time

In [None]:
data_path = '/users/kevin/desktop/lite_629_w10.csv'
df_pills = ss.read.csv(data_path, header=True, inferSchema=True)

In [None]:
# Train-test split
df_train, df_test = df_pills.randomSplit(weights=[0.80, 0.20])

In [None]:
train_cols = df_train.drop("label")

# Transformer; excludes "label" col
va = VectorAssembler(outputCol="features", inputCols=df_train.columns[:-1])

# Estimators
gbt = GBTClassifier(maxIter=150, maxDepth=7, stepSize=0.3)
lr = LogisticRegression(maxIter=2000, fitIntercept=True)

In [None]:
# Assemble features
train_lpoints = va.transform(df_train).select("features", "label").cache()
test_lpoints = va.transform(df_test).select("features", "label").cache()

In [None]:
train_lpoints.groupBy("label").count().show(2)

In [None]:
# Train Gradient Boosted Tree Model
start = time.time()
gbt_model = gbt.fit(train_lpoints)
print(time.time() - start)

In [None]:
# Train Logistic Regression
start = time.time()
lr_model = lr.fit(train_lpoints)
print(time.time() - start)

In [None]:
# Predict
gbt_predict = gbt_model.transform(test_lpoints)
lr_predict = lr_model.transform(test_lpoints)

In [None]:
metrics = MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction")

metrics.setMetricName("accuracy")
print(f"Gradient Boosted Accuracy: {metrics.evaluate(gbt_predict):.3f}")
print(f"Logistic Reg Accuracy: {metrics.evaluate(lr_predict):.3f}\n")

metrics.setMetricName("f1")
print(f"Gradient Boosted F1: {metrics.evaluate(gbt_predict):.3f}")
print(f"Logistic Reg F1: {metrics.evaluate(lr_predict):.3f}")

<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Imports-&amp;-Env-Setup" data-toc-modified-id="Imports-&amp;-Env-Setup-1">Imports &amp; Env Setup</a></span></li><li><span><a href="#Import-Collin's-Function-for-Grabbing-Data-From-s3" data-toc-modified-id="Import-Collin's-Function-for-Grabbing-Data-From-s3-2">Import Collin's Function for Grabbing Data From s3</a></span></li><li><span><a href="#Kevin's-Pre-processing-Code" data-toc-modified-id="Kevin's-Pre-processing-Code-3">Kevin's Pre-processing Code</a></span></li></ul></div>