In [1]:
# This is the template for the submission. If you want, you can develop your algorithm in a regular Python script and copy the code here for submission.

# Team members (e-mail, legi):
# chozhang@student.ethz.ch, 22-945-562
# minghli@student.ethz.ch, 22-953-293
# changli@student.ethz.ch, 22-944-474

In [2]:
import os
import sys
# Kaggle kernels export KAGGLE_KERNEL_RUN_TYPE; when it is absent we treat the
# run as a local one and read everything relative to the working directory.
curr_environ = os.environ.get('KAGGLE_KERNEL_RUN_TYPE', 'Localhost')
if curr_environ == 'Localhost':
    input_dir = os.path.abspath('')
else:
    input_dir = '/kaggle/input/mobile-health-2023-path-detection'
    # make the modules shipped with the competition data importable
    sys.path.append(input_dir)

In [3]:
import numpy as np
import pandas as pd

from Lilygo.Recording import Recording
from Lilygo.Dataset import Dataset
from os import listdir
from os.path import isfile, join



In [4]:
# for signal processing and calculations
from scipy import signal

# for tuning parameters
from sklearn.model_selection import GridSearchCV
from sklearn.base import BaseEstimator

# Tasks

## Step Count

In [5]:
### signal processing functions ###
def parse(signal, ds_freq:float=20.0, zero_mean:bool=False):
    """Resample a signal onto a uniform time grid by linear interpolation.

    Args:
        signal: object exposing ``timestamps`` and ``values`` sequences.
        ds_freq: target sampling frequency; the grid spacing is ``1/ds_freq``
            in the unit of the timestamps.
        zero_mean: when True, the mean of the original values is subtracted
            before resampling.

    Returns:
        ``(time_seq, value_seq)``: the uniform grid, which starts at the
        smallest timestamp and stops before the largest one, and the values
        interpolated onto it.
    """
    raw_times = np.array(signal.timestamps)
    raw_values = np.array(signal.values)
    if zero_mean:
        raw_values = raw_values - np.mean(raw_values)
    step = 1./ds_freq
    grid = np.arange(start=np.min(raw_times), stop=np.max(raw_times), step=step)
    return grid, np.interp(grid, raw_times, raw_values)
    
def bp_filter(value_seq, fp:float=3, fs:float=20.0):
    """Band-pass filter a sequence with a 4th-order Butterworth filter.

    Args:
        value_seq: samples to filter.
        fp: upper edge of the pass band; the lower edge is fixed at 0.5.
        fs: sampling frequency of ``value_seq`` (same unit as ``fp``).

    Returns:
        The filtered sequence, obtained with a single forward pass
        (``sosfilt``) over second-order sections.
    """
    pass_band = [0.5, fp]
    sections = signal.butter(N=4, Wn=pass_band, btype='bandpass', fs=fs, output='sos')
    return signal.sosfilt(sections, value_seq)
    
def get_envelop(value_seq,
                fs:float=20,
                half_window_size:float=0.5,
                _min:float=20.,
                _max:float=500.):
    """
    Compute a clipped running-maximum envelope of ``value_seq``.

    Each output sample is the maximum over a centred window that reaches
    ``int(fs*half_window_size)`` samples to either side. The sequence is padded
    with zeros at both ends so the output has the same length as the input.
    The result is clipped to ``[_min, _max]``. It is intended for
    non-negative input such as a vector magnitude.
    """
    half_win = int(fs*half_window_size)
    padding = np.zeros((half_win,))
    padded = np.concatenate([padding, value_seq, padding])
    window = 2*half_win + 1
    # In the padded array, the window centred on sample i starts at index i.
    running_max = np.array([np.max(padded[start:start+window])
                            for start in range(len(value_seq))])
    return np.clip(running_max, _min, _max)

In [6]:
class StepCounter(BaseEstimator):
    """Step counter that counts peaks in a normalised, band-passed sensor magnitude.

    The class exposes the scikit-learn estimator interface (``fit`` /
    ``predict`` / ``score``) so that its hyper-parameters can be tuned with
    tools such as ``GridSearchCV``. Nothing is learned from data.

    Parameters
    ----------
    acc_min, acc_max : float
        Clip bounds for the local envelope of the accelerometer magnitude.
    acc_height : float
        Minimum peak height used when counting accelerometer peaks.
    gyro_min, gyro_max, gyro_height : float
        Same as the ``acc_*`` parameters, for the gyroscope magnitude.
    acc_weight : float
        Weight of the accelerometer count in the final result; the gyroscope
        count gets ``1 - acc_weight``. When it is exactly 1.0 the gyroscope is
        not evaluated at all.
    half_window_size : float
        Half width, in seconds, of the envelope window.
    width : float
        Forwarded to ``_count_steps``, which currently does not use it.
    """

    def __init__(self, acc_min=2.,  acc_max=3., acc_height=0.25,
                 gyro_min=20., gyro_max=500., gyro_height=0.5,
                 acc_weight=1.0, half_window_size=0.5, width=0.5):
        self.acc_min = acc_min
        self.acc_max = acc_max
        self.acc_height = acc_height
        self.gyro_min = gyro_min
        self.gyro_max = gyro_max
        self.gyro_height = gyro_height
        self.acc_weight = acc_weight
        self.half_window_size = half_window_size
        self.width = width

    def fit(self, data, labels):
        """No-op that only exists to satisfy the estimator interface; returns ``self``."""
        return self

    def score(self, X, y_true, sample_weight=None, normalize=True) -> float:
        '''
        Score the predicted step counts against the true ones.

        Each trace scores ``-|predicted - true|``, so 0 is a perfect score and
        larger errors are more negative.

        Returns the (optionally weighted) average of the per-trace scores when
        ``normalize`` is True. Otherwise returns their weighted sum if
        ``sample_weight`` is given, or their plain sum if it is not.
        '''
        y_predicted = self.predict(X)
        # predict() collapses a single result to a scalar, so promote the
        # difference back to 1-D. Converting to arrays first keeps the
        # comparison positional even when y_true is a pandas Series.
        diff = np.atleast_1d(np.asarray(y_predicted) - np.asarray(y_true))
        scores = -np.abs(diff).astype(float)
        if normalize:
            return np.average(scores, weights=sample_weight)
        elif sample_weight is not None:
            return np.dot(scores, sample_weight)
        else:
            return scores.sum()

    def predict(self, traces):
        """Estimate the step count of one trace or of a sequence of traces.

        ``traces`` is treated as a sequence when it has ``__len__``; anything
        else is treated as a single trace. Each trace must expose a ``data``
        mapping with the keys ``ax``/``ay``/``az`` and ``gx``/``gy``/``gz``.

        Returns an integer array with one count per trace, or a single integer
        when exactly one count was produced.
        """
        if hasattr(traces, '__len__'):
            _traces = traces
            res = np.zeros(len(traces), dtype=int)
        else:
            _traces = [traces]
            res = np.zeros(1, dtype=int)
        for i, trace in enumerate(_traces):
            data = trace.data
            # accelerometer data
            ax, ay, az = data['ax'], data['ay'], data['az']
            # gyroscope data
            gx, gy, gz = data['gx'], data['gy'], data['gz']
            acc_step_counts = self._count_steps(ax, ay, az,
                                                _max=self.acc_max,
                                                _min=self.acc_min,
                                                _height=self.acc_height,
                                                half_window_size=self.half_window_size,
                                                width=self.width)
            gyro_step_counts = 0
            # the gyroscope only contributes when it has a non-zero weight
            if self.acc_weight != 1.0:
                gyro_step_counts = self._count_steps(gx, gy, gz,
                                                     _max=self.gyro_max,
                                                     _min=self.gyro_min,
                                                     _height=self.gyro_height,
                                                     half_window_size=self.half_window_size,
                                                     width=self.width)
            # weighted blend of both counts, truncated towards zero by int()
            res[i] = int(self.acc_weight * acc_step_counts +
                         (1.0 - self.acc_weight) * gyro_step_counts)
        if len(res) == 1:
            return res[0]
        else:
            return res

    def _count_steps(self, ax, ay, az, _max, _min, _height,
                     half_window_size=0.5, width=0.5):
        """Count steps as peaks in the normalised, band-passed magnitude of a 3-axis signal.

        ``ax``/``ay``/``az`` are the three axis signals (accelerometer or
        gyroscope), each exposing ``timestamps`` and ``values``. ``_min`` and
        ``_max`` clip the envelope used for normalisation, and ``_height`` is
        the minimum peak height.

        Returns 0 when any axis has no samples or nothing is left after
        resampling.

        NOTE(review): ``width`` is accepted but not used anywhere in this
        method, so tuning it has no effect - confirm the intended use.
        """
        # Original author's notes: the sampling interval of m and temp is
        # 80 ms, others 50 ms; acc is in unit "g"; gyro should be within
        # -255, 255.

        # Missing or empty sensor data must not crash the pipeline: report
        # zero steps instead of failing inside the resampling step.
        if any(np.size(axis.timestamps) == 0 for axis in (ax, ay, az)):
            return 0

        _, x_v = parse(ax)
        _, y_v = parse(ay)
        _, z_v = parse(az)

        # The axes are resampled independently, so their lengths can differ
        # when their time spans differ; keep only the common prefix.
        n = min(len(x_v), len(y_v), len(z_v))
        if n == 0:
            return 0

        # magnitude of the 3-axis vector
        g_v = np.sqrt(np.sum(np.square([x_v[:n], y_v[:n], z_v[:n]]), axis=0))
        # divide by an adaptive local norm (the clipped running maximum)
        g_v = g_v / get_envelop(g_v,
                                half_window_size=half_window_size,
                                _min=_min,
                                _max=_max)
        # band pass
        filtered_gv = bp_filter(g_v)
        # Keep only the positive half-wave. Per the original note, the
        # amplitude drops to about 1/4 after filtering, hence the 4x gain.
        filtered_gv = filtered_gv * (filtered_gv > 0) * 4

        # Peaks must be at least 20 * 0.2 = 4 samples apart.
        # Original tuning notes: height 0.5 was optimal for gyro and was not
        # tuned for acc; with _min=20 for acc, height=0.01 looked good; the
        # best setting was _min=1, height=0.25.
        peaks, _ = signal.find_peaks(filtered_gv,
                                     height=_height,
                                     distance=20 * 0.2)
        return len(peaks)  # each peak is one step


---

## Smart Watch Location

According to [Kunze et al.](https://kaikunze.de/papers/pdf/kunze2005recognizing.pdf), a C4.5 decision tree on accelerometer data is sufficient for watch location detection. (But that paper is very rudimentary.)

In [7]:
class WatchLocPredictor(BaseEstimator):
    """Placeholder predictor for the smart watch location; always predicts 0."""

    # required
    def __init__(self):
        pass

    # required
    def fit(self, data, labels):
        """No-op; returns ``self`` as scikit-learn estimators are expected to."""
        return self

    # required
    def predict(self, trace: Recording):
        """Return the constant placeholder label 0 for any trace."""
        return 0
    

---

## Path Index

In [8]:
class PathDetector(BaseEstimator):
    """Placeholder predictor for the path index; always predicts 0."""

    # required
    def __init__(self):
        pass

    # required
    def fit(self, data, labels):
        """No-op; returns ``self`` as scikit-learn estimators are expected to."""
        return self

    # required
    def predict(self, trace):
        """Return the constant placeholder path index 0 for any trace."""
        return 0

---

## Activity

Predict the activities that are contained in the data trace and were performed for more than 60 s without interruption.

Output as a list of integers: e.g., `[0, 3]` (`0`: standing still, `1`: walk, `2`: run, `3`: cycle). 

The activities do not need to be listed in the order in which they occur, and an activity that occurs several times only needs to be listed once.

In [9]:
class ActivityPredictor(BaseEstimator):
    """Placeholder predictor for the activities contained in a trace (not implemented yet)."""

    # required
    def __init__(self):
        pass

    # required
    def fit(self, data, labels):
        """No-op; returns ``self`` as scikit-learn estimators are expected to."""
        return self

    # required
    def predict(self, trace):
        """Not implemented yet: returns None for any trace.

        TODO: return the list of activity codes once this is implemented.
        """
        return None

---

# Prediction

In [10]:
# Get the sorted paths of all test traces. The directory is derived from
# `input_dir` (set in the environment cell) so the same cell works on Kaggle
# and on a local checkout that keeps the data under ./data/test.
dir_traces = join(input_dir, 'data', 'test')
filenames = sorted(
    join(dir_traces, f) for f in listdir(dir_traces) if isfile(join(dir_traces, f))
)

In [11]:
# Initialize the predictors. Only the step counter is instantiated here; it is
# created with the default hyper-parameters declared in StepCounter.__init__.
step_counter = StepCounter()

In [12]:
# Loop through all traces and calculate the step count for each trace.
# Every test trace must be processed here: an earlier version only iterated
# over the first ten files, which leaves the submission incomplete.
solution_file = []
for filename in filenames:
    trace = Recording(filename, no_labels=True, mute=True)
    # One entry per submission category; the insertion order defines the
    # numeric suffix of the row ids written below.
    categorization_results = {'watch_loc': 0, 'path_idx': 0, 'step_count': 0, 'stand': 0, 'walk': 0, 'run': 0, 'cycle': 0}

    #
    # Your algorithm goes here
    # You can access the variable 'watch_loc' in the dictionary 'categorization_results' for example with
    # categorization_results['watch_loc'] = 1
    # Make sure you do not use the GPS data and are tolerant of missing data (see task set).
    # Your program must not crash when single smartphone data traces are missing.
    #
    categorization_results['step_count'] = step_counter.predict(trace)

    # Append the calculated results and the id of each trace and category to the solution file.
    # The trace id is the three characters that precede the last five characters of the file path.
    trace_id = filename[-8:-5]
    for counter_label, category in enumerate(categorization_results, start=1):
        solution_file.append([f'{trace_id}_{counter_label}', categorization_results[category]])


In [13]:
# Write the results into a .csv file to then upload the .csv file to Kaggle.
# When cross-checking the .csv file on your computer, we recommend using a text editor and NOT Excel so that the results are displayed correctly.
# IMPORTANT: Do NOT change the name of the columns ('Id' and 'Category') of the .csv file
# On Kaggle the file goes to /kaggle/working as before; on a local run that
# directory does not exist, so the file is written to the working directory.
output_dir = '/kaggle/working' if curr_environ != 'Localhost' else os.path.abspath('')
submission_file_df = pd.DataFrame(np.asarray(solution_file), columns=['Id', 'Category'])
submission_file_df.to_csv(join(output_dir, 'submission.csv'), header=['Id', 'Category'], index=False)