In [13]:
import math
import os
import re

import numpy as np
from pyspark.sql import SparkSession
# NOTE: the wildcard import below may shadow Python builtins such as min/max/sum/abs/round
# with Spark column functions; notebook code should not rely on those builtins.
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.conf import *

In [14]:
# Build (or reuse) a local Spark session that uses every available core.
# Despite its name, `sqlContext` is bound to a SparkSession object.
sqlContext = (
    SparkSession.builder
    .master("local[*]")
    .appName("appName")
    .config("spark.sql.warehouse.dir", "./spark-warehouse")
    .getOrCreate()
)

In [15]:
# --- Input -----------------------------------------------------------------
# Root directory that the driver cell walks (os.walk) looking for sensor CSV files.
path = "/Users/dwoodbridge/Research/Medhere/Data"

# Fraction of lines removed from EACH end of a file by trim_file()
# (0.25 keeps only the middle half of the recording).
percentage_to_drop_in_file = 0.25

# --- Scratch files (rewritten for every processed input file) ---------------
temp_trimmed_file = "temp.csv"
temp_accelerometer_file = "temp_accelerometer.csv"
temp_gyroscope_file = "temp_gyroscope.csv"

# --- Output ------------------------------------------------------------------
# One line of discretized features per input file is meant to end up here.
final_output_file = "discretizaed_data.csv"

# --- Discretization parameters -----------------------------------------------
new_bin_no = 5        # number of bins the imputed series is divided by in moving_stat()
time_unit = 10 ** 6   # step added to the running timestamp during imputation

In [16]:
def trim_file(file_name, percentage_to_drop_in_file):
    """Copy the middle part of ``file_name`` into ``temp_trimmed_file``.

    The input is split on newlines and the leading and trailing
    ``percentage_to_drop_in_file`` fraction of lines is discarded; the
    remaining lines are written, one per line, to the module-level
    ``temp_trimmed_file`` path (overwriting it).

    Args:
        file_name: path of the raw file to trim.
        percentage_to_drop_in_file: fraction (e.g. 0.25) of the total line
            count to drop from EACH end of the file.

    Side effects:
        Prints the total number of lines found and rewrites
        ``temp_trimmed_file``.
    """
    # Read from the path we were given (not from loop variables of the caller),
    # in plain read mode, and make sure the handle is closed again.
    with open(file_name, "r") as source:
        lines = source.read().split("\n")

    total_line = len(lines)
    print(total_line)

    first_kept = int(total_line * percentage_to_drop_in_file)
    end_kept = int(total_line * (1 - percentage_to_drop_in_file))

    with open(temp_trimmed_file, 'w') as file_handler:
        for item in lines[first_kept:end_kept]:
            file_handler.write("{}\n".format(item))

In [17]:
def read_trimmed_file_as_df():
    """Load ``temp_trimmed_file`` as a Spark DataFrame with a fixed schema.

    Returns:
        A DataFrame with the nullable columns ``time_stamp`` (double),
        ``sensor_type`` (int), ``sensor_name`` (string) and ``x_val``,
        ``y_val``, ``z_val`` (double), read from the comma-delimited file.
        The frame is not cached here; callers decide whether to cache it.
    """
    session = (
        SparkSession.builder
        .appName("Data_Prep")
        .master("local[*]")
        .config(conf=SparkConf())
        .getOrCreate()
    )

    # (column name, Spark type class) in file order; every column is nullable.
    column_types = [
        ("time_stamp", DoubleType),
        ("sensor_type", IntegerType),
        ("sensor_name", StringType),
        ("x_val", DoubleType),
        ("y_val", DoubleType),
        ("z_val", DoubleType),
    ]
    motion_schema = StructType(
        [StructField(col_name, col_type(), True) for col_name, col_type in column_types]
    )

    csv_reader = session.read.format("csv").option("delimiter", ",")
    return csv_reader.load(temp_trimmed_file, schema=motion_schema)

In [18]:
def time_min_max(df_motion):
    """Return the smallest and largest ``time_stamp`` of ``df_motion``.

    Both bounds are computed in a single aggregation instead of two full
    sorts. The SQL ``min``/``max`` aggregates ignore NULLs, whereas an
    ascending sort places NULL first, so a row with an unparsable timestamp
    no longer turns ``time_min`` into ``None``.

    Args:
        df_motion: DataFrame with a ``time_stamp`` column.

    Returns:
        dict with keys ``'time_min'`` and ``'time_max'``. Both values are
        ``None`` when the frame has no non-null timestamps.
    """
    # A global aggregation always yields exactly one row, so first() is never None.
    bounds = df_motion.selectExpr("min(time_stamp)", "max(time_stamp)").first()
    return {'time_min': bounds[0], 'time_max': bounds[1]}

In [19]:
def averaging_sensor_data(df_motion,sensor_type):
    """Average the x/y/z readings of one sensor per timestamp.

    Rows whose ``sensor_type`` equals the given value are grouped by
    ``time_stamp``, the three axis columns are averaged, the result is
    sorted by ascending ``time_stamp`` and the ``avg(...)`` columns are
    renamed back to ``x_val``, ``y_val`` and ``z_val``.
    """
    axis_columns = ('x_val', 'y_val', 'z_val')
    sensor_filter = 'sensor_type=' + str(sensor_type)

    averaged = (
        df_motion
        .filter(sensor_filter)
        .groupBy('time_stamp')
        .avg(*axis_columns)
        .sort("time_stamp", ascending=1)
    )

    # Spark names the aggregated columns "avg(<col>)"; restore the plain names.
    for axis in axis_columns:
        averaged = averaged.withColumnRenamed('avg({})'.format(axis), axis)
    return averaged

In [20]:
def missing_data_imputation(sensor_values, sensor_type):
    """Write a gap-free series for one sensor to its temporary CSV file.

    Values are NOT interpolated: the sensor is only written when a data point
    changes, so every missing step repeats the most recent reading
    (steps before the first reading repeat the first reading).

    The running timestamp starts at the smaller of the module-level
    ``time_min`` and the first sample's timestamp, advances by the
    module-level ``time_unit`` after every written line, and lines are
    written until it exceeds the module-level ``time_max``. Each line is
    ``<time>,<x>,<y>,<z>``.

    Args:
        sensor_values: DataFrame whose rows are (time_stamp, x, y, z),
            ordered by ascending ``time_stamp``.
        sensor_type: 1 (accelerometer) or 4 (gyroscope); selects the
            output file.

    Raises:
        ValueError: for an unsupported ``sensor_type`` or when
            ``sensor_values`` has no rows.
    """
    if sensor_type == 1:
        file_name = temp_accelerometer_file
    elif sensor_type == 4:
        file_name = temp_gyroscope_file
    else:
        raise ValueError("unsupported sensor_type: {!r} (expected 1 or 4)".format(sensor_type))

    # Collect once and reuse the rows instead of running first() and collect()
    # as two separate Spark jobs.
    rows = sensor_values.collect()
    if not rows:
        raise ValueError("no samples available for sensor_type {!r}".format(sensor_type))
    first = rows[0]

    print(time_min)
    print(time_max)

    def _values_of(row):
        # Columns 1..3 hold the x, y and z readings.
        return str(row[1]) + "," + str(row[2]) + "," + str(row[3])

    with open(file_name, 'w') as file_handler:

        def _write(stamp, values):
            file_handler.write("{}\n".format(str(stamp) + "," + values))

        # Start at whichever comes first: the global start time or the first sample.
        if first[0] > time_min:
            time = time_min
        else:
            time = first[0]

        # Leading gap: repeat the first reading until its own timestamp is reached.
        item = _values_of(first)
        while first[0] > time:
            _write(time, item)
            time = time + time_unit

        for row in rows:
            # Gap before this sample: repeat the previous reading.
            while time < row.time_stamp:
                _write(time, item)
                time = time + time_unit
            item = _values_of(row)
            _write(time, item)
            time = time + time_unit

        # Trailing gap: repeat the last reading up to and including time_max.
        while time <= time_max:
            _write(time, item)
            time = time + time_unit

In [21]:
def global_stat(sensor_type):
    """Summarise the whole imputed series of one sensor as a CSV fragment.

    Reads the temporary file written for the sensor and returns 15
    comma-separated values in this order (each for x, y, z):
    min, median, max, mean, standard deviation (Spark ``stddev``).

    Args:
        sensor_type: 1 (accelerometer) or 4 (gyroscope).

    Returns:
        str: ``min_x,min_y,min_z,med_x,...,std_x,std_y,std_z``.

    Raises:
        ValueError: for an unsupported ``sensor_type``.
    """
    if sensor_type == 1:
        file_name = temp_accelerometer_file
    elif sensor_type == 4:
        file_name = temp_gyroscope_file
    else:
        raise ValueError("unsupported sensor_type: {!r} (expected 1 or 4)".format(sensor_type))

    schema = StructType([
                StructField("time_stamp", DoubleType(), True),
                StructField("x_val", DoubleType(), True),
                StructField("y_val", DoubleType(), True),
                StructField("z_val", DoubleType(), True)])

    # Use the notebook-level session object (`sqlContext`); the only `spark`
    # binding in the visible cells is a local inside read_trimmed_file_as_df().
    df_after_missing_data_imputation = sqlContext.read \
                                                 .format("csv") \
                                                 .option("delimiter", ",") \
                                                 .load(file_name, schema=schema)

    axes = ("x_val", "y_val", "z_val")

    # One aggregation job for min/max/avg/stddev of all three axes.
    # Result layout: [min x,y,z | max x,y,z | avg x,y,z | stddev x,y,z].
    aggregate_exprs = ["{}({})".format(function_name, axis)
                       for function_name in ("min", "max", "avg", "stddev")
                       for axis in axes]
    aggregates = list(df_after_missing_data_imputation.selectExpr(*aggregate_exprs).first())
    global_min = aggregates[0:3]
    global_max = aggregates[3:6]
    global_avg = aggregates[6:9]
    global_std = aggregates[9:12]

    # relativeError=0 asks Spark for the exact median.
    global_med = [df_after_missing_data_imputation.approxQuantile(axis, [0.5], 0)[0]
                  for axis in axes]

    ordered_stats = global_min + global_med + global_max + global_avg + global_std
    return ",".join([str(value) for value in ordered_stats])

In [22]:
def moving_stat(old_bin_no,sensor_type):
    """Compute per-bin statistics over the imputed series of one sensor.

    The rows of the sensor's temporary file are consumed in order in chunks
    of ``int(old_bin_no / new_bin_no)`` rows. For every complete chunk the
    min, median, max, mean and (NumPy, population) standard deviation of
    x, y and z are emitted; a trailing incomplete chunk is ignored.

    Args:
        old_bin_no: number of time steps in the imputed series.
        sensor_type: 1 (accelerometer) or 4 (gyroscope).

    Returns:
        str: the statistics of all complete chunks joined by commas
        (15 values per chunk), or ``''`` if there is no complete chunk.

    Raises:
        ValueError: for an unsupported ``sensor_type`` or when
            ``old_bin_no`` is smaller than ``new_bin_no`` (the chunk size
            would be zero).
    """
    if sensor_type == 1:
        file_name = temp_accelerometer_file
    elif sensor_type == 4:
        file_name = temp_gyroscope_file
    else:
        raise ValueError("unsupported sensor_type: {!r} (expected 1 or 4)".format(sensor_type))

    data_in_each_bin = int(old_bin_no/new_bin_no)
    if data_in_each_bin < 1:
        # Previously this surfaced as a ZeroDivisionError from `count % 0`.
        raise ValueError("old_bin_no ({!r}) must be at least new_bin_no ({!r})"
                         .format(old_bin_no, new_bin_no))

    schema = StructType([
                StructField("time_stamp", DoubleType(), True),
                StructField("x_val", DoubleType(), True),
                StructField("y_val", DoubleType(), True),
                StructField("z_val", DoubleType(), True)])

    # Use the notebook-level session object (`sqlContext`); the only `spark`
    # binding in the visible cells is a local inside read_trimmed_file_as_df().
    df_after_missing_data_imputation = sqlContext.read \
                                                 .format("csv") \
                                                 .option("delimiter", ",") \
                                                 .load(file_name, schema=schema)

    # Output order inside a bin: min, median, max, mean, std - each for x, y, z.
    statistics = (np.min, np.median, np.max, np.mean, np.std)

    bin_stats = []
    x_list = []
    y_list = []
    z_list = []
    for val in df_after_missing_data_imputation.collect():
        x_list.append(val.x_val)
        y_list.append(val.y_val)
        z_list.append(val.z_val)
        if len(x_list) == data_in_each_bin:
            bin_stat = ",".join([str(statistic(axis_values))
                                 for statistic in statistics
                                 for axis_values in (x_list, y_list, z_list)])
            print(bin_stat)
            bin_stats.append(bin_stat)
            x_list = []
            y_list = []
            z_list = []

    return ",".join(bin_stats)


ITERATE OVER EACH FILE TO TRIM IT, IMPUTE MISSING DATA, AND DISCRETIZE THE DATA.

In [None]:
# Walk the data directory and process every "*dominant*.csv" recording.
# (`os` and `re` are imported in the notebook's import cell.)
#
# NOTE: the steps that build and write the output line are still commented out
# below, so this cell currently opens final_output_file for writing (truncating
# it) without writing anything to it.
with open(final_output_file, 'w') as file_handler:
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            # The dot is escaped so that only a real ".csv" extension matches.
            if not re.search(r'.*dominant.*(\.csv)$', name):
                continue

            # Label = first run of lowercase letters/underscores in the file
            # name, minus its last character.
            file_label = re.search('[a-z_]+', name).group(0)[:-1]
            line = file_label
            print(file_label + " : " + name)

            # Keep only the middle part of the recording (written to temp_trimmed_file).
            trim_file(os.path.join(root, name), percentage_to_drop_in_file)

            df_motion = read_trimmed_file_as_df()
            df_motion.cache()  # caching in memory ==> enhances performance
            try:
                # Calculate time_min and time_max (module-level names read by
                # missing_data_imputation) and the covered time span.
                df_time_calcaultion = time_min_max(df_motion)
                time_min = df_time_calcaultion['time_min']
                time_max = df_time_calcaultion['time_max']
                time_diff = time_max - time_min
                print(time_diff)
                #old_bin_no = time_diff/time_unit

                ## missing data imputation
                #sensor_types = [1,4] # accelerometer : 1, gyroscope : 4
                #for sensor_type in sensor_types:
                #    sensor_values = averaging_sensor_data(df_motion, sensor_type)
                #    missing_data_imputation(sensor_values, sensor_type)

                #for sensor_type in sensor_types:
                #    line = line + "," + global_stat(sensor_type)

                #for sensor_type in sensor_types:
                #    line = line + "," + moving_stat(old_bin_no,sensor_type)

                #file_handler.write("{}\n".format(line))
            finally:
                # Release the cached frame before the next file is loaded, even
                # if processing this file failed.
                df_motion.unpersist()