In [48]:
# This is the template for the submission. If you want, you can develop your algorithm in a regular Python script and copy the code here for submission.

# Team members (e-mail, legi):
# zhisun@ethz.ch, 22-958-227
# enjcao@ethz.ch, 22-942-700
# yifzhou@ethz.ch, 22-940-381

In [49]:
import pandas as pd
import math
from Lilygo.Recording import Recording
from Lilygo.Dataset import Dataset
from os import listdir
from os.path import isfile, join
from math import sqrt
import numpy as np
from scipy import signal
import matplotlib.pyplot as plt

In [50]:
# Collect the full path of every regular file in the traces directory, in sorted order
dir_traces = '/kaggle/input/mobile-health-2023-step-count/data/traces'
filenames = sorted(
    path
    for path in (join(dir_traces, entry) for entry in listdir(dir_traces))
    if isfile(path)
)
# print(filenames)

In [51]:
def get_peaks(input_signal, prominence):
    """Locate the peaks of a 1-D signal and return their positions and values.

    Thin wrapper around ``scipy.signal.find_peaks``.

    Args:
        input_signal: 1-D sequence of samples (list, tuple or NumPy array).
        prominence: Required peak prominence, forwarded unchanged to
            ``scipy.signal.find_peaks``.

    Returns:
        A tuple ``(peak_locations, peak_values)``: the array of peak indices
        and the array of signal values at those indices.
    """
    # Convert once so that the index-array lookup below also works for plain
    # Python sequences, which cannot be indexed with an array of indices.
    samples = np.asarray(input_signal)
    peak_locations, _ = signal.find_peaks(samples, prominence=prominence)
    return peak_locations, samples[peak_locations]

In [52]:
def get_gravity(data):
    """Estimate the slowly varying (near 0 Hz) gravity component of a signal.

    Applies a fixed second-order IIR low-pass section:

        y[i] = b0*x[i] + b1*x[i-1] + b2*x[i-2] - a1*y[i-1] - a2*y[i-2]

    With the coefficients below the steady-state gain for a constant input is
    about 0.891 (-1 dB).

    Args:
        data: 1-D sequence of samples (list or NumPy array).

    Returns:
        Float NumPy array with the same length as ``data``. The first two
        samples are left at zero because the recursion starts at index 2.
    """
    samples = np.asarray(data, dtype=float)
    # The output buffer must be float: deriving it from integer input would
    # create an integer array and silently truncate every filtered sample.
    filtered_data = np.zeros_like(samples)

    # alpha: feedback (denominator) coefficients, beta: feed-forward (numerator).
    alpha = [1, -1.979133761292768, 0.979521463540373]
    # The numerator is symmetric with b1 = 2 * b0. The middle coefficient used
    # to read 0.00012769995947004, i.e. the value below with one digit dropped,
    # which lowered the DC gain to about 0.775. Thresholds tuned against the
    # old value may need to be re-checked.
    beta = [0.000086384997973502, 0.000172769995947004, 0.000086384997973502]

    a0, a1, a2 = alpha
    b0, b1, b2 = beta
    for i in range(2, len(samples)):
        filtered_data[i] = a0 * (
            samples[i] * b0
            + samples[i - 1] * b1
            + samples[i - 2] * b2
            - filtered_data[i - 1] * a1
            - filtered_data[i - 2] * a2
        )
    return filtered_data


In [53]:
def get_lowpass(data):
    """Remove the slow (below walking rate) content of a signal.

    NOTE: despite its name, the coefficients below describe a HIGH-pass
    section: the gain is about 0.1 for a constant input and about 1 for an
    input alternating every sample. The name is kept so existing callers keep
    working. The original comment gives 1 Hz as the cutoff (roughly the
    slowest human walking rate); the actual cutoff depends on the sampling
    rate the coefficients were designed for -- TODO confirm.

    The recursion is:

        y[i] = b0*x[i] + b1*x[i-1] + b2*x[i-2] - a1*y[i-1] - a2*y[i-2]

    Args:
        data: 1-D sequence of samples (list or NumPy array).

    Returns:
        Float NumPy array with the same length as ``data``. The first two
        samples are left at zero because the recursion starts at index 2.
    """
    samples = np.asarray(data, dtype=float)
    # The output buffer must be float: deriving it from integer input would
    # create an integer array and silently truncate every filtered sample.
    filtered_data = np.zeros_like(samples)

    # alpha: feedback (denominator) coefficients, beta: feed-forward (numerator).
    alpha = [1, -1.905384612118461, 0.910092542787947]
    beta = [0.953986986993339, -1.907503180919730, 0.953986986993339]

    a0, a1, a2 = alpha
    b0, b1, b2 = beta
    for i in range(2, len(samples)):
        filtered_data[i] = a0 * (
            samples[i] * b0
            + samples[i - 1] * b1
            + samples[i - 2] * b2
            - filtered_data[i - 1] * a1
            - filtered_data[i - 2] * a2
        )
    return filtered_data

In [54]:
def get_highpass(data):
    """Remove the fast (above walking rate) content of a signal.

    NOTE: despite its name, the coefficients below describe a LOW-pass
    section: the gain is 1 for a constant input and about 0.1 for an input
    alternating every sample. The name is kept so existing callers keep
    working. The original comment gives 5 Hz as the cutoff (roughly the
    fastest human walking rate); the actual cutoff depends on the sampling
    rate the coefficients were designed for -- TODO confirm.

    The recursion is:

        y[i] = b0*x[i] + b1*x[i-1] + b2*x[i-2] - a1*y[i-1] - a2*y[i-2]

    Args:
        data: 1-D sequence of samples (list or NumPy array).

    Returns:
        Float NumPy array with the same length as ``data``. The first two
        samples are left at zero because the recursion starts at index 2.
    """
    samples = np.asarray(data, dtype=float)
    # The output buffer must be float: deriving it from integer input would
    # create an integer array and silently truncate every filtered sample.
    filtered_data = np.zeros_like(samples)

    # alpha: feedback (denominator) coefficients, beta: feed-forward (numerator).
    alpha = [1, -1.80898117793047, 0.827224480562408]
    # The numerator is symmetric (b0 == b2), which makes the DC gain exactly 1.
    # b0 used to read 0.096665967120306, a typo that broke the symmetry and
    # raised the DC gain to about 1.066. Thresholds tuned against the old
    # value may need to be re-checked.
    beta = [0.095465967120306, -0.172688631608676, 0.095465967120306]

    a0, a1, a2 = alpha
    b0, b1, b2 = beta
    for i in range(2, len(samples)):
        filtered_data[i] = a0 * (
            samples[i] * b0
            + samples[i - 1] * b1
            + samples[i - 2] * b2
            - filtered_data[i - 1] * a1
            - filtered_data[i - 2] * a2
        )
    return filtered_data

In [55]:
stepCounts = []
ids = []

# Tunable parameters of the peak-based step detector (same for every trace).
filter_window_size = 40  # length of the moving-average window, in samples
prominence = 0.1         # minimum peak prominence for a peak to count as a step

for filename in filenames:
    stepCount = 0

    # Only data from the LilyGo wristband may be used, namely the following
    # 10 keys (as in trace.data[key]):
    # 3-axis accelerometer: key in [ax, ay, az]
    # 3-axis gyro: key in [gx, gy, gz]
    # 3-axis magnetometer: key in [mx, my, mz]
    # IMU temperature: key==temperature

    # Get the original data from Lilygo -----------------------------------------------------------------------
    # Load the recording once; this algorithm only reads the accelerometer.
    trace = Recording(filename, no_labels=True, mute=True)
    ax = trace.data['ax']
    ay = trace.data['ay']
    az = trace.data['az']

    # Raw magnitude of the 3-axis accelerometer signal, one value per sample
    amagn = [sqrt(x**2 + y**2 + z**2) for x, y, z in zip(ax.values, ay.values, az.values)]
    # Store the magnitude on the trace next to the raw channels (timestamps
    # scaled by 1000, as before); it is not read again in this cell.
    timestamp_ms = ax.timestamps*1000
    trace.data['amagn'] = Dataset.fromLists('Accelerometer magnitude', amagn, timestamp_ms)

    # Filter the signal to get more accurate results -----------------------------------------------------------
    magnitude = np.asarray(amagn, dtype=float)
    # An empty trace cannot contain steps (and np.convolve rejects empty input).
    if magnitude.size > 0:
        # Split the magnitude into the slowly varying gravity part and the
        # remaining user-induced part
        data_gravity = get_gravity(magnitude)
        data_user = magnitude - data_gravity
        # Element-wise product of user and gravity components. Both are scalar
        # magnitudes here, so this is a plain product, not a vector dot product.
        data_a = data_user * data_gravity
        # Band-limit the signal to the expected walking-rate range (about
        # 1 - 5 Hz according to the filter comments) with the two IIR filters
        data_filtered = get_highpass(data_a)
        data_filtered = get_lowpass(data_filtered)
        # Moving average to smooth out the remaining noise; mode='valid' drops
        # the partially covered window positions at both ends
        smoothing_kernel = np.ones((filter_window_size,))/filter_window_size
        data_filtered = np.convolve(data_filtered, smoothing_kernel, mode='valid')

        # Find peaks in the filtered signal and realize our stepcount -------------------------------------------
        # Every sufficiently prominent peak is counted as one step
        peak_locations, _ = get_peaks(data_filtered, prominence)
        stepCount = len(peak_locations)

    # Visualize the original signal, filtered signal and detected peaks -----------------------------------------
    # title = "StepCount: " + str(stepCount)
    # print(title)
    # fig, axes1 = plt.subplots(2,1, figsize=(60, 5)) #figsize is width, height
    # axes1[0].set_title(title)
    # axes1[0].plot(data_user, alpha=1, label="original")
    # axes1[1].plot(data_filtered, alpha=1, label="Filtered mag")
    # axes1[1].plot(peak_locations, data_filtered[peak_locations], 'y+', color="red", label="Peak Locations")

    # Append the calculated step count and the id of the trace to the corresponding lists.
    # The id is the first two characters after the first underscore of the path,
    # which relies on the directory part of the path containing no underscore.
    stepCounts.append(stepCount)
    ids.append(filename.split('_')[1][:2])

In [56]:
# Persist the detected step counts as ./submission.csv, the file that is then uploaded to Kaggle.
# To cross-check the .csv file on your computer, open it in a text editor and NOT in Excel so that the results are displayed correctly.
# IMPORTANT: the column names 'Id' and 'Predicted' of the .csv file must NOT be changed.
submission_columns = {'Id': ids, 'Predicted': stepCounts}
df = pd.DataFrame(data=submission_columns)
df.to_csv('./submission.csv', index=False)