In [1]:
# Feature Engineer Activity Summary of Actigraphy Data

## Feature Engineer Activity Summary of Actigraphy Data

In this notebook, we process actigraphy data from train and test sets using a custom class, `ActivityProcessor`. The main steps are:

- **Initialize Processor**:
    - Set up paths for train and test data, output directory, and batch size.
    - Create a cumulative DataFrame for storing results.


- **Batch Processing**:
    - **Load Data**: Each batch of parquet files is loaded and concatenated, adding a `student_id` and converting categorical columns as needed.
    - **Validate Wear Flag**: Apply thresholds on ENMO and on the rolling standard deviation of the X/Y/Z accelerometer axes to detect non-wear periods, generating a `valid_wear_flag` (1 = non-wear, 0 = wear).
    - **Summarize Activity**: For each student, calculate wear duration, no-motion periods, and overall wear status.


- **Save Results**:
    - After processing, save separate summary CSVs for train and test data.

In [2]:
import cudf
import glob
import os

class ActivityProcessor:
    """
    Class to process activity data from train and test parquet files, validate wear flags,
    and calculate activity summaries in batches.

    Despite its name, the 'valid_wear_flag' column produced here is 1 for samples
    classified as NON-wear and 0 for samples classified as wear.
    """

    def __init__(self, train_path, test_path, output_dir, batch_size=100):
        """
        Initializes the ActivityProcessor with paths for train and test data, output directory,
        and batch size. The output directory is created if it does not exist.

        Parameters:
            train_path (str): Path to the directory containing train parquet files.
            test_path (str): Path to the directory containing test parquet files.
            output_dir (str): Path to the output directory where results will be saved.
            batch_size (int): Number of files to process per batch (must be >= 1).

        Raises:
            ValueError: If batch_size is smaller than 1.
        """
        if batch_size < 1:
            # A zero step would crash range() later and a negative one would
            # silently process nothing, so fail early with a clear message.
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        self.train_path = train_path
        self.test_path = test_path
        self.output_dir = output_dir
        self.batch_size = batch_size
        self.activity_summary_df = cudf.DataFrame()
        os.makedirs(output_dir, exist_ok=True)

    @staticmethod
    def _count_batches(total_files, batch_size):
        """
        Returns how many batches of `batch_size` are needed to cover `total_files`
        (ceiling division, so an exact multiple does not yield an extra batch).
        """
        return (total_files + batch_size - 1) // batch_size

    def load_and_concatenate_batch(self, batch_files):
        """
        Loads a batch of parquet files, adds a student_id column to each, and concatenates them.

        The student_id is the name of the directory that directly contains the file.
        Rows of one file stay contiguous and in file order in the result.

        Parameters:
            batch_files (list): List of parquet file paths to load and concatenate.

        Returns:
            cudf.DataFrame: Concatenated DataFrame containing data from all files in the batch
            (an empty DataFrame when batch_files is empty).
        """
        frames = []
        for file in batch_files:
            student_id = os.path.basename(os.path.dirname(file))
            df = cudf.read_parquet(file)
            df['student_id'] = student_id

            # Convert categorical columns to strings
            for column in df.columns:
                if df[column].dtype.name == "category":
                    df[column] = df[column].astype("str")

            frames.append(df)

        if not frames:
            return cudf.DataFrame()

        # Concatenate once instead of growing a DataFrame inside the loop,
        # which would re-copy all previously loaded rows on every iteration.
        return cudf.concat(frames, ignore_index=True)

    def validate_non_wear_flag(self, df, enmo_threshold=0.02, std_threshold=0.05, window_size=60):
        """
        Validates wear and non-wear periods by applying thresholds to ENMO and accelerometer std values.

        A sample is classified as non-wear when its ENMO is below `enmo_threshold` AND the
        mean of the rolling standard deviations of X, Y and Z is below `std_threshold`.
        When a 'student_id' column is present, the first `window_size - 1` samples of each
        student have no rolling std (their window would reach into another student's data);
        such samples, like any sample with a missing comparison, are classified as wear.

        Parameters:
            df (cudf.DataFrame): DataFrame containing activity data with columns
                'X', 'Y', 'Z', 'enmo' and 'non-wear_flag'.
            enmo_threshold (float): Threshold for ENMO values indicating no motion.
            std_threshold (float): Threshold for standard deviation indicating no motion.
            window_size (int): Rolling window size for calculating standard deviation.

        Returns:
            cudf.DataFrame: Copy of the input with additional columns 'std_X', 'std_Y',
            'std_Z', 'std_acc', 'valid_wear_flag' (1 = non-wear, 0 = wear) and
            'flag_mismatch' (True where 'valid_wear_flag' differs from 'non-wear_flag').
        """
        df = df.copy()
        std_columns = []
        for axis in ('X', 'Y', 'Z'):
            std_column = f'std_{axis}'
            df[std_column] = df[axis].rolling(window=window_size).std()
            std_columns.append(std_column)
        df['std_acc'] = (df['std_X'] + df['std_Y'] + df['std_Z']) / 3
        std_columns.append('std_acc')

        if 'student_id' in df.columns:
            # The rolling window runs over the whole concatenated batch, so the first
            # window_size - 1 rows of a student would otherwise include samples of the
            # preceding student. Blank those rows out so results do not depend on batch
            # order. Assumes each student's rows are contiguous, as produced by
            # load_and_concatenate_batch.
            position_in_student = df.groupby('student_id').cumcount()
            has_full_window = position_in_student >= (window_size - 1)
            for std_column in std_columns:
                df[std_column] = df[std_column].where(has_full_window)

        # Missing values make the comparison undefined; treat those rows explicitly
        # as "not non-wear" rather than relying on how nulls propagate through mask().
        non_wear_mask = (
            (df['enmo'] < enmo_threshold) & (df['std_acc'] < std_threshold)
        ).fillna(False)
        df['valid_wear_flag'] = non_wear_mask.astype('int64')
        df['flag_mismatch'] = df['valid_wear_flag'] != df['non-wear_flag']

        return df

    def calculate_activity_summary(self, df, sample_interval_seconds=5):
        """
        Calculates activity summary statistics for each student, including wear duration, no-motion periods,
        and overall wear status.

        Parameters:
            df (cudf.DataFrame): DataFrame containing validated wear data with columns
                'student_id', 'valid_wear_flag' (1 = non-wear) and 'enmo'.
            sample_interval_seconds (int | float): Seconds represented by one sample, used to
                convert sample counts to durations. Defaults to 5.

        Returns:
            cudf.DataFrame: One row per student with columns 'student_id',
            'average_activity_level' (mean ENMO), 'total_duration_seconds',
            'non_wear_duration_seconds', 'wear_duration_seconds',
            'no_motion_duration_seconds' (samples with ENMO exactly 0) and
            'overall_wear_status' (1 if worn for more than half of the samples, else 0).
        """
        activity_summary = df.groupby('student_id').agg({
            'valid_wear_flag': ['count', 'sum'],
            'enmo': 'mean'
        })

        # count = all samples, sum = samples flagged as non-wear (flag value 1)
        activity_summary.columns = ['total_duration', 'non_wear_duration', 'average_activity_level']
        activity_summary['wear_duration'] = activity_summary['total_duration'] - activity_summary['non_wear_duration']
        no_motion_counts = df[df['enmo'] == 0].groupby('student_id').size().rename('no_motion_count')
        # Students without any zero-ENMO sample are missing from no_motion_counts;
        # the left merge keeps them and fillna turns their count into 0.
        activity_summary = activity_summary.merge(no_motion_counts, on='student_id', how='left').fillna(0)

        # Convert durations from sample counts to seconds
        activity_summary['total_duration_seconds'] = activity_summary['total_duration'] * sample_interval_seconds
        activity_summary['non_wear_duration_seconds'] = activity_summary['non_wear_duration'] * sample_interval_seconds
        activity_summary['wear_duration_seconds'] = activity_summary['wear_duration'] * sample_interval_seconds
        activity_summary['no_motion_duration_seconds'] = activity_summary['no_motion_count'] * sample_interval_seconds

        # Determine if the device was worn for the majority of the sampling period
        activity_summary['overall_wear_status'] = (activity_summary['wear_duration_seconds'] >
                                                   (activity_summary['total_duration_seconds'] / 2)).astype(int)

        # Drop sample-based columns
        activity_summary = activity_summary.drop(columns=['total_duration', 'non_wear_duration',
                                                          'wear_duration', 'no_motion_count']).reset_index()

        return activity_summary

    def save_to_csv(self, file_name):
        """
        Saves the cumulative activity summary DataFrame to a CSV file inside the output directory.

        Parameters:
            file_name (str): Name of the output CSV file.
        """
        output_path = os.path.join(self.output_dir, file_name)
        self.activity_summary_df.to_csv(output_path)
        print(f"Data saved to {output_path}")

    def process_files(self, mode='train'):
        """
        Processes all parquet files in the specified mode (train or test) in batches. For each batch, loads, validates,
        and summarizes the data, then appends to the cumulative activity summary DataFrame. Finally the
        cumulative summary is written to 'activity_summary_{mode}.csv'.

        Note: the cumulative summary is NOT reset here. Callers that process several modes
        with the same instance must clear `activity_summary_df` in between, otherwise rows
        of the earlier mode are written to the later mode's CSV as well.

        Parameters:
            mode (str): Indicates which data to process. 'train' reads from train_path;
                any other value reads from test_path.
        """
        data_path = self.train_path if mode == 'train' else self.test_path
        # glob returns files in arbitrary order; sort so batches and output rows are reproducible.
        parquet_files = sorted(glob.glob(os.path.join(data_path, '**/*.parquet'), recursive=True))
        total_batches = self._count_batches(len(parquet_files), self.batch_size)

        for batch_number, start in enumerate(range(0, len(parquet_files), self.batch_size), start=1):
            batch_files = parquet_files[start:start + self.batch_size]
            batch_df = self.load_and_concatenate_batch(batch_files)
            validated_df = self.validate_non_wear_flag(batch_df)
            batch_summary = self.calculate_activity_summary(validated_df)
            self.activity_summary_df = cudf.concat([self.activity_summary_df, batch_summary], ignore_index=True)
            print(f"Processed {mode} batch {batch_number}/{total_batches}")

        # Save to CSV with appropriate naming based on mode
        self.save_to_csv(f'activity_summary_{mode}.csv')


In [3]:
# Locations of the train/test actigraphy series and of the generated summaries
train_path = '/kaggle/input/child-mind-institute-problematic-internet-use/series_train.parquet'
test_path = '/kaggle/input/child-mind-institute-problematic-internet-use/series_test.parquet'
output_dir = '/kaggle/working/batch_output'

# A single processor handles both splits, 100 parquet files per batch
processor = ActivityProcessor(
    train_path=train_path,
    test_path=test_path,
    output_dir=output_dir,
    batch_size=100,
)

# Summarise each split in turn. The cumulative summary is emptied before every
# split so that rows from one split never end up in the other split's CSV.
for mode in ('train', 'test'):
    processor.activity_summary_df = cudf.DataFrame()
    processor.process_files(mode=mode)

Processed train batch 1/10
Processed train batch 2/10
Processed train batch 3/10
Processed train batch 4/10
Processed train batch 5/10
Processed train batch 6/10
Processed train batch 7/10
Processed train batch 8/10
Processed train batch 9/10
Processed train batch 10/10
Data saved to /kaggle/working/batch_output/activity_summary_train.csv
Processed test batch 1/1
Data saved to /kaggle/working/batch_output/activity_summary_test.csv
