In [1]:
!sed -i '$a\# Add the line for suppressing the NativeCodeLoader warning \nlog4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR,console' /$HADOOP_HOME/etc/hadoop/log4j.properties

## Initialization and Load Data

In [2]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.window import Window

# need these for the feature generation and UDFs and functions.col for brevity 
from pyspark.sql.functions import pandas_udf, PandasUDFType, col
from pyspark.sql.types import IntegerType, DoubleType, ArrayType, StructType, StructField
import pandas as pd
import numpy as np
from scipy.signal import find_peaks


In [3]:
# Session for this notebook: master "local[*]", application name "EDA".
# getOrCreate() reuses an existing session when one is already active.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("EDA")
    .getOrCreate()
)

In [4]:
###---------- Change path if necessary ----------###
path = "file:///home/work/Final/Dataset/raw"

# One glob per device/sensor pair. The files carry no header row, so Spark names
# the columns _c0.._c5 and infers their types while reading.
file = "/phone/accel/*.txt"
df_phone_a = spark.read.options(header=False, inferSchema=True).csv(f"{path}{file}")

file = "/phone/gyro/*.txt"
df_phone_g = spark.read.options(header=False, inferSchema=True).csv(f"{path}{file}")

file = "/watch/accel/*.txt"
df_watch_a = spark.read.options(header=False, inferSchema=True).csv(f"{path}{file}")

file = "/watch/gyro/*.txt"
df_watch_g = spark.read.options(header=False, inferSchema=True).csv(f"{path}{file}")


                                                                                

In [5]:
SAMPLE_RATE = 20 # Hz. The same value is hard-coded again inside fft_features_udf -- keep the two in sync.
SECONDS = 10 # Length of one segment in seconds; SAMPLE_RATE*SECONDS = 200 samples per segment.

In [6]:
# print(f"Phone Accel: {df_phone_a.count()}")
# print(f"Phone Gyro: {df_phone_g.count()}")
# print(f"Watch Accel: {df_watch_a.count()}")
# print(f"Watch Gyro: {df_watch_g.count()}")

## Cleaning

In [7]:

def clean_df(df):
    """Name the raw columns, make ``z`` numeric, drop incomplete rows and sort.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame read from a header-less CSV, i.e. with the positional column
        names ``_c0`` .. ``_c5``.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``sub_id, activity, time, x, y, z`` with ``z`` cast to double,
        rows containing null/NaN removed, sorted by
        ``(sub_id, activity, time)``.

    Notes
    -----
    Prints ``rows_dropped: <n>``. Producing that number runs two ``count()``
    actions, so the input is scanned twice.
    """
    # Rename the positional CSV columns.
    col_map = {'_c0': 'sub_id',
               '_c1': 'activity',
               '_c2': 'time',
               '_c3': 'x',
               '_c4': 'y',
               '_c5': 'z'
               }
    df = df.withColumnsRenamed(col_map)

    # Strip the ';' from z and convert it to double.
    df = df.withColumn('z', func.regexp_replace("z", ";", "").cast("double"))

    # Drop any na/null and report how many rows that removed. This happens
    # before the sort so that neither count has a global sort in its plan.
    rows_before = df.count()
    df = df.dropna()
    print(f"rows_dropped: {rows_before - df.count()}")

    # Sort last: only the surviving rows are ordered, and the returned frame is
    # the same as sorting first and dropping afterwards.
    return df.sort("sub_id", "activity", "time")



## Segmenting

Segmenting the samples into ~10 second segments will let us do some more feature engineering

3 mins / 10s = 18 segments per activity per subject

51 subjects * 18 segments = 918 segments per activity

918 segments * 18 activities = 16,524 segments across all activities


In [8]:
# Segment into 10s of data (200 samples) 

def label_segments(df, num_s):
    """Number each recording's samples and assign them to segments of ``num_s`` rows.

    Rows are numbered by ascending ``time`` within every ``(sub_id, activity)``
    partition. Two columns are appended to ``df``:

    * ``row_num``  -- 1-based position of the sample inside its partition
    * ``group_id`` -- ``(row_num - 1) / num_s`` cast to int, so rows 1..num_s
      get 0, the next ``num_s`` rows get 1, and so on

    df must have cols as named in clean_df()
    """
    per_recording = Window.partitionBy("sub_id", "activity").orderBy("time")

    sample_position = func.row_number().over(per_recording)
    # label for grouping; resolved against the row_num column added just before it
    segment_label = ((func.col("row_num") - 1) / num_s).cast("int")

    return (
        df.withColumn("row_num", sample_position)
          .withColumn("group_id", segment_label)
    )

In [9]:
# UDFs for gen_features 
# I started trying to keep them short and single purpose... but I called find_peaks 5 times
# for the fft features there is one large UDF for 4 features

# Element-wise summation of x, y, z streams
@pandas_udf(ArrayType(DoubleType())) #schema returned
def sum_xyz_udf(x_col: pd.Series, y_col: pd.Series, z_col: pd.Series) -> pd.Series:
    """Add the three axis arrays of every row position by position.

    Each input Series holds one array per row; the result holds, per row, a
    list whose i-th entry is ``x[i] + y[i] + z[i]``.
    """
    def add_axes(x, y, z):
        return (np.array(x) + np.array(y) + np.array(z)).tolist()

    # map() walks the three Series in lockstep, one row at a time.
    return pd.Series(list(map(add_axes, x_col, y_col, z_col)))

@pandas_udf(IntegerType())
def num_peaks_udf(col: pd.Series) -> pd.Series:
    """Per row, count the peaks scipy's find_peaks reports with ``distance=10``."""
    def count_peaks(samples):
        peak_idx, _ = find_peaks(np.array(samples), distance=10)
        return len(peak_idx)

    return col.apply(count_peaks)

@pandas_udf(IntegerType())
def num_peaks_udf_t_1(col: pd.Series) -> pd.Series:
    """Per row, count the peaks scipy's find_peaks reports with ``threshold=1``."""
    def count_peaks(samples):
        peak_idx, _ = find_peaks(np.array(samples), threshold=1)
        return len(peak_idx)

    return col.apply(count_peaks)

@pandas_udf(IntegerType())
def num_peaks_udf_t_pt1(col: pd.Series) -> pd.Series:
    """Per row, count the peaks scipy's find_peaks reports with ``threshold=0.1``."""
    def count_peaks(samples):
        peak_idx, _ = find_peaks(np.array(samples), threshold=0.1)
        return len(peak_idx)

    return col.apply(count_peaks)

@pandas_udf(ArrayType(IntegerType()))
def peak_locs_udf(col: pd.Series) -> pd.Series:
    """Per row, list the sample indices of the peaks found with ``distance=10``."""
    def locate_peaks(samples):
        peak_idx, _ = find_peaks(np.array(samples), distance=10)
        return peak_idx.tolist()

    return col.apply(locate_peaks)

@pandas_udf(IntegerType())
def sum_peak_locs_udf(col: pd.Series) -> pd.Series:
    """Per row, sum the sample indices of the peaks found with ``distance=10``.

    The peak criterion is the same one num_peaks_udf and peak_locs_udf use.
    gen_features divides this sum by the num_peaks_udf count to get an average
    peak position, which is only a true average when both refer to the same set
    of peaks. Returns 0 for a row without peaks.
    """
    def sum_locations(samples):
        peak_idx, _ = find_peaks(np.array(samples), distance=10)
        # int() turns the numpy integer into a plain Python int.
        return int(peak_idx.sum())

    return col.apply(sum_locations)


@pandas_udf(StructType([
        StructField("dom_freq", DoubleType()),
        StructField("second_freq", DoubleType()),
        StructField("third_freq", DoubleType()),
        StructField("mean_freq", DoubleType())
    ]))
def fft_features_udf(col: pd.Series) -> pd.DataFrame:
    """Frequency features of every sample array in ``col``.

    Returns one row per input row with the columns

    * ``dom_freq``, ``second_freq``, ``third_freq`` -- frequencies (Hz) of the
      three largest-magnitude FFT bins, strongest first
    * ``mean_freq`` -- magnitude-weighted mean frequency

    If the spectrum has fewer than three positive-frequency bins the missing
    slots are 0.0; ``mean_freq`` is 0.0 when the summed magnitude is not
    positive. All four values are null when the array is null or has fewer
    than 2 samples.
    """
    def extract_features(x):
        """
        expects one array of samples,
        returns a tuple of 4 doubles: (dom_freq, second_freq, third_freq, mean_freq)
        """
        # Duplicated from the notebook-level constant of the same name.
        SAMPLE_RATE = 20 #Hz

        # value used when a frequency cannot be determined
        output = 0.0

        # A null array used to fall through to len() and raise TypeError;
        # report it the same way as an array that is too short.
        if x is None:
            return (None, None, None, None)

        x = np.array(x)
        if len(x) < 2:
            return (None, None, None, None)

        x = x - np.mean(x) #remove offset

        # FFT
        fft_vals = np.fft.fft(x)
        fft_freq = np.fft.fftfreq(len(x), d=1.0/SAMPLE_RATE)

        # Use only the strictly positive frequencies
        pos = fft_freq > 0
        freqs = fft_freq[pos]
        mag_s = np.abs(fft_vals[pos])  # Magnitude spectrum

        # Up to three strongest bins, strongest first, padded with `output`
        # when the spectrum has fewer than three bins.
        n_found = min(3, len(mag_s))
        by_magnitude = np.argsort(mag_s)
        strongest_first = by_magnitude[len(by_magnitude) - n_found:][::-1]
        ranked = list(freqs[strongest_first]) + [output] * (3 - n_found)
        dom_freq, second_freq, third_freq = ranked

        # Mean freq (weighted)
        total_mag = np.sum(mag_s)
        mean_freq = np.sum(freqs * mag_s) / total_mag if total_mag > 0 else output

        return (dom_freq, second_freq, third_freq, mean_freq)

    return pd.DataFrame(col.apply(extract_features).tolist(), columns=["dom_freq", "second_freq", "third_freq", "mean_freq"])



def gen_features(df):
    """Aggregate labelled samples into one feature row per segment.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must have the columns "sub_id", "activity", "group_id", "time", "x",
        "y" and "z".

    Returns
    -------
    pyspark.sql.DataFrame
        One row per (sub_id, activity, group_id) holding, in this order:

        * the per-segment arrays ``time``, ``x``, ``y``, ``z``
        * avg / sum / stddev of each axis, the pairwise axis correlations and
          ``resultant`` (mean of sqrt(x^2 + y^2 + z^2))
        * ``sum_xyz`` (element-wise x + y + z)
        * three peak counts for each of x, y, z and sum_xyz
        * ``*_avg_peak_locs`` (summed peak index / peak count; null when the
          count is 0)
        * four FFT features for each of x, y, z and sum_xyz
    """
    keys = ("sub_id", "activity", "group_id")
    axes = ("x", "y", "z")
    raw_fields = ("time",) + axes

    # collect_list() gives no guarantee about element order, but the peak and
    # FFT features below depend on it. Collect whole samples as structs led by
    # `time` and sort them, so every array is in time order and the four arrays
    # stay aligned with each other.
    ordered_samples = func.sort_array(func.collect_list(func.struct(*raw_fields)))

    # (output name, aggregate expression) in output-column order
    stats = []
    for suffix, agg_fn in (("avg", func.avg), ("sum", func.sum), ("stddev", func.stddev)):
        stats.extend((f"{axis}_{suffix}", agg_fn(axis)) for axis in axes)
    stats.extend(
        (f"corr_{a}_{b}", func.corr(a, b))
        for a, b in (("x", "y"), ("x", "z"), ("y", "z"))
    )
    magnitude = func.sqrt(func.pow("x", 2) + func.pow("y", 2) + func.pow("z", 2))
    stats.append(("resultant", func.avg(magnitude)))

    df = df.groupBy(*keys).agg(
        ordered_samples.alias("_samples"),
        *[expr.alias(name) for name, expr in stats],
    )

    # Unpack the sorted structs back into one array column per field.
    df = df.select(
        *keys,
        *[func.col(f"_samples.{field}").alias(field) for field in raw_fields],
        *[name for name, _ in stats],
    )

    df = df.withColumn("sum_xyz", sum_xyz_udf("x", "y", "z"))

    # (output prefix, source array column)
    signals = (("x", "x"), ("y", "y"), ("z", "z"), ("xyz", "sum_xyz"))

    # find_peaks
    for prefix, source in signals:
        df = (
            df.withColumn(f"{prefix}_num_peaks", num_peaks_udf(source))
              .withColumn(f"{prefix}_num_peaks_t_1", num_peaks_udf_t_1(source))
              .withColumn(f"{prefix}_num_peaks_t_pt1", num_peaks_udf_t_pt1(source))
        )

    # might have some information on where in the segment the peaks are found relative
    for prefix, source in signals:
        n_peaks = func.col(f"{prefix}_num_peaks")
        # Without the guard a segment with zero peaks divides by zero, which
        # raises when spark.sql.ansi.enabled is on. when() without otherwise()
        # yields null there -- the value non-ANSI division produced.
        df = df.withColumn(
            f"{prefix}_avg_peak_locs",
            func.when(n_peaks > 0, sum_peak_locs_udf(source) / n_peaks),
        )

    # frequency features
    # each pass unpacks the udf's struct result into 4 features
    fft_fields = (
        ("dom_freq", "dom_freq"),
        ("2nd_freq", "second_freq"),
        ("3rd_freq", "third_freq"),
        ("mean_freq", "mean_freq"),
    )
    for prefix, source in signals:
        packed = f"{prefix}_fft"
        df = df.withColumn(packed, fft_features_udf(source))
        for out_suffix, field in fft_fields:
            df = df.withColumn(f"{prefix}_{out_suffix}", func.col(f"{packed}.{field}"))
        df = df.drop(packed)

    return df




# df_phone_a_seg.show()

### Apply Cleaning

In [10]:
# Clean the four raw frames in the order phone accel, phone gyro, watch accel,
# watch gyro; each call prints its own "rows_dropped" line.
df_phone_a, df_phone_g, df_watch_a, df_watch_g = [
    clean_df(raw_df)
    for raw_df in (df_phone_a, df_phone_g, df_watch_a, df_watch_g)
]

# df_phone_a_seg.printSchema()

                                                                                

rows_dropped: 0


                                                                                

rows_dropped: 0


                                                                                

rows_dropped: 0


[Stage 37:>                                                         (0 + 4) / 4]

rows_dropped: 0


                                                                                

### Apply Feature Generation 

In [11]:
# here SAMPLE_RATE = 20Hz and SECONDS = 10 --> 200 sample segments
# Label every cleaned frame with segment ids, then aggregate it to one feature
# row per segment.
df_phone_a_seg, df_phone_g_seg, df_watch_a_seg, df_watch_g_seg = [
    gen_features(label_segments(clean_sensor_df, SAMPLE_RATE*SECONDS))
    for clean_sensor_df in (df_phone_a, df_phone_g, df_watch_a, df_watch_g)
]


### Clean Up Again

In [12]:
# Remove rows that have too few samples: keep only segments whose x array holds
# exactly SAMPLE_RATE*SECONDS values.
min_samples = SAMPLE_RATE*SECONDS

df_phone_a_seg, df_phone_g_seg, df_watch_a_seg, df_watch_g_seg = [
    seg_df.filter(func.size(col("x")) == min_samples)
    for seg_df in (df_phone_a_seg, df_phone_g_seg, df_watch_a_seg, df_watch_g_seg)
]


In [13]:
# Keep the activity label plus every Integer/Double column. The column list is
# taken from the watch-accel schema and reused for all four frames.
numeric_cols = [
    field.name
    for field in df_watch_a_seg.schema.fields
    if isinstance(field.dataType, (IntegerType, DoubleType))
]
to_select = ["activity", *numeric_cols]

# Bring each selection to the driver as a pandas DataFrame.
pddf_phone_a, pddf_watch_a, pddf_phone_g, pddf_watch_g = [
    seg_df.select(to_select).toPandas()
    for seg_df in (df_phone_a_seg, df_watch_a_seg, df_phone_g_seg, df_watch_g_seg)
]




                                                                                

In [14]:
#Save CSV for future work
# `path` is a file-name prefix here (it replaces the dataset path set earlier);
# each frame is written to <path><device>_<sensor>.csv without the pandas index.
path = "/home/work/Final/features_"

pddf_phone_a.to_csv(f"{path}phone_a.csv", index=False)
pddf_watch_a.to_csv(f"{path}watch_a.csv", index=False)
pddf_phone_g.to_csv(f"{path}phone_g.csv", index=False)
pddf_watch_g.to_csv(f"{path}watch_g.csv", index=False)


In [15]:
# Shut down the Spark session created at the top of the notebook. The Spark
# DataFrames above cannot be evaluated afterwards; the pandas frames and the
# written CSV files are unaffected.
spark.stop()