In [1]:
import pandas as pd
import numpy as np
import glob
import tqdm.auto
import os
import re
import warnings

import dask.distributed

#### Parameters

In [2]:
WINDOW_DURATION_SECONDS = 15

#### Start a Dask client

In [3]:
client = dask.distributed.Client()

client

0,1
Client  Scheduler: tcp://127.0.0.1:59961  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 6  Cores: 24  Memory: 63.93 GiB


#### Generate Windowed Raw Data

In [4]:
def process_subject_raw_data_files(subject_id):
    activity_map = pd.read_table(
        'wisdm-dataset/activity_key.txt',
        sep = ' = ',
        header = None,
        engine = 'python',
        names = ['activity_name', 'activity_id']
    ).set_index('activity_id')['activity_name'].to_dict()
    
    def process_raw_data_file(sensor, device):
        df = pd.read_table(
            f'wisdm-dataset/raw/{device}/{sensor}/data_{subject_id}_{sensor}_{device}.txt',
            sep = ',',
            header = None, 
            names = ['subject_id', 'activity_id', 'timestamp', 'x', 'y', 'z'],
        )

        # The end of each line has a semi-colon which is being stored in the "z" column; remove this semi-colon.
        df['z'] = df['z'].str[0:-1].astype(float)

        # Rename x, y, z columns
        df.rename(columns={'x': f'{sensor}_{device}_x', 'y': f'{sensor}_{device}_y', 'z': f'{sensor}_{device}_z'}, inplace=True)

        # Map activity_id to activity name
        df['activity_name'] = df['activity_id'].map(activity_map)

        # Calculate t0 (unique for each subject, device, sensor, and activity)
        df = df.merge(df.groupby(['activity_id'])['timestamp'].min().reset_index().rename(columns={'timestamp': 't0'}))

        # Calculate t (from t0) (Units = milliseconds)
        df['t_ms'] = ( df['timestamp'] - df['t0'] ) / 1_000_000

        # Round t_ms rounded to nearest 50 ms (because sensor sampling rate = 20Hz)
        base = 50
        df['t_ms_rounded'] = df['t_ms'].apply(lambda x: base * round(float(x) / base)).astype('int')

        # Compute absolute delta between t_ms and t_ms_rounded
        df['abs_delta_ms'] = ( df['t_ms'] - df['t_ms_rounded'] ).abs()

        # Sort by time
        df.sort_values(['activity_id', 't_ms', 'abs_delta_ms'], ascending=[True, True, False], inplace=True)

        # Remove duplicates (for each activity, by t_ms_rounded)
        df.drop_duplicates(subset=['activity_id', 't_ms_rounded'], keep='first', inplace=True)

        # Calculate segment sequence number
        df['segment_sequence_number'] = np.floor(df['t_ms'] / 1_000 / WINDOW_DURATION_SECONDS).astype(int) + 1

        # Remove irrelevant columns
        df.drop(columns=['timestamp', 't0', 't_ms', 'abs_delta_ms'], inplace=True)

        activity_dfs = []
        # Add dataframe to subject_dfs
        for activity_id in df['activity_id'].unique():
            activity_df = df[df['activity_id'] == activity_id].copy()
            assert activity_df['t_ms_rounded'].is_monotonic_increasing
            activity_df.set_index(['subject_id', 'activity_id', 'activity_name', 'segment_sequence_number', 't_ms_rounded'], inplace=True)
            activity_dfs.append(activity_df)

        return pd.concat(activity_dfs, axis=0)
    
    subject_dfs = []
    for sensor in ('accel', 'gyro'):
        for device in ('phone', 'watch'):
            subject_dfs.append(process_raw_data_file(sensor, device))
            
    # In subject_dfs,
    #   - The 1st dataframe is accel_phone
    #   - The 2nd dataframe is accel_watch
    #   - The 3rd dataframe is gyro_phone
    #   - The 4th dataframe is gyro_watch
    return pd.concat(subject_dfs, axis=1)

In [5]:
subject_ids = pd.Series(
    glob.glob('wisdm-dataset/raw/*/*/*.txt', recursive=True)
).str.extract(r'^.+data_(\d+?)_.+$', expand=False).astype('int').sort_values().unique()

subject_data_futures = []
for subject_id in tqdm.auto.tqdm(subject_ids, desc='Submitting Jobs to Dask Client'):
    subject_data_futures.append(client.submit(process_subject_raw_data_files, subject_id))

print('Waiting for all Jobs to Complete...', end='')
df = pd.concat(client.gather(subject_data_futures), axis=0)
print('done!' + '\n')

df.info()

Submitting Jobs to Dask Client:   0%|          | 0/51 [00:00<?, ?it/s]

Waiting for all Jobs to Complete...done!

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 3298350 entries, (1600, 'A', 'walking', 1, 0) to (1650, 'S', 'folding', 12, 180000)
Data columns (total 12 columns):
 #   Column         Dtype  
---  ------         -----  
 0   accel_phone_x  float64
 1   accel_phone_y  float64
 2   accel_phone_z  float64
 3   accel_watch_x  float64
 4   accel_watch_y  float64
 5   accel_watch_z  float64
 6   gyro_phone_x   float64
 7   gyro_phone_y   float64
 8   gyro_phone_z   float64
 9   gyro_watch_x   float64
 10  gyro_watch_y   float64
 11  gyro_watch_z   float64
dtypes: float64(12)
memory usage: 321.0+ MB


In [6]:
# Add unique segment IDs to each time segment
segment_ids = pd.Series(index=df.index.droplevel(4).drop_duplicates(), data=np.arange(len(df.index.droplevel(4).drop_duplicates()))+1)
segment_ids.name = 'segment_id'

df = pd.merge(
    left = df,
    right = segment_ids,
    how = 'left',
    left_index = True,
    right_index = True,
)

df.set_index('segment_id', append=True, inplace=True)

df.info()

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 3298350 entries, (1600, 'A', 'walking', 1, 0, 1) to (1650, 'S', 'folding', 12, 180000, 11138)
Data columns (total 12 columns):
 #   Column         Dtype  
---  ------         -----  
 0   accel_phone_x  float64
 1   accel_phone_y  float64
 2   accel_phone_z  float64
 3   accel_watch_x  float64
 4   accel_watch_y  float64
 5   accel_watch_z  float64
 6   gyro_phone_x   float64
 7   gyro_phone_y   float64
 8   gyro_phone_z   float64
 9   gyro_watch_x   float64
 10  gyro_watch_y   float64
 11  gyro_watch_z   float64
dtypes: float64(12)
memory usage: 327.6+ MB


#### Write `df` to disk

In [7]:
df.to_parquet('processed_raw_data.parquet', index=True)

#### Shutdown Dask Client

In [8]:
client.shutdown()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
