In [1]:
pip install git+https://github.com/MetaGuard/xror.git#egg=xror

Collecting xror
  Cloning https://github.com/MetaGuard/xror.git to /tmp/pip-install-or09pvt9/xror_c7b3aa18edf6481493bf6a9bd94da2b6
  Running command git clone --filter=blob:none --quiet https://github.com/MetaGuard/xror.git /tmp/pip-install-or09pvt9/xror_c7b3aa18edf6481493bf6a9bd94da2b6
  Resolved https://github.com/MetaGuard/xror.git to commit b2177253d97066038ab52bf59714f1966ece903e
  Preparing metadata (setup.py) ... [?25ldone
Note: you may need to restart the kernel to use updated packages.


In [2]:
import pandas as pd
import os
import glob
import logging
from xror import XROR  # Ensure xror module is available in the notebook environment

logging.basicConfig(filename= 'xror_parsing_errors.log', level=logging.INFO, format= '%(asctime)s - %(levelname)s - %(message)s')
# Define the column names based on the number of sensors/measurements
frame_columns = [
    "spawnTime", "saberSpeed", "saberDirX", "saberDirY", "saberDirZ",
    "cutDirDeviation", "cutPointX", "cutPointY", "cutPointZ",
    "cutNormalX", "cutNormalY", "cutNormalZ", "cutDistanceToCenter",
    "cutAngle", "beforeCutRating", "afterCutRating", "noteID",
    "speedOK", "directionOK", "saberTypeOK", "wasCutTooSoon", "saberType"
]

# List of root folders containing subdirectories with XROR files
root_folders = ['chunk1', 'chunk2', 'chunk3']

In [3]:
def process_xror_files(root_folder_path):
    # Recursively find all .xror files in the root folder and its subdirectories
    xror_files = glob.glob(os.path.join(root_folder_path, '**/*.xror'), recursive=True)

    # Process each XROR file
    for file_path in xror_files:
        try:
            base_name = os.path.splitext(file_path)[0]
            csv_file_path = base_name + '.csv'

            if os.path.exists(csv_file_path):
                logging.info(f"CSV file already exists for {file_path}. Skipping.")
                continue

            with open(file_path, 'rb') as f:
                binary_data = f.read()

            xror_data = XROR.unpack(binary_data)

            if len(xror_data.data['frames'][0]) != len(frame_columns):
                logging.warning(f"Column mismatch in {file_path}. Expected {len(frame_columns)} columns, found {len(xror_data.data['frames'][0])}.")
                continue

            df_frames = pd.DataFrame(xror_data.data['frames'], columns=frame_columns)
            df_frames['directionOK'] = df_frames.apply(lambda row: row['saberDirX'] > 0 and row['saberDirY'] > 0, axis=1)
            df_frames.to_csv(csv_file_path, index=False)
            logging.info(f"DataFrame saved to CSV successfully for file: {file_path}")

        except Exception as e:
            logging.error(f"Error processing {file_path}: {e}")

# Process files in all specified root folders
for root_folder in root_folders:
    process_xror_files(root_folder)

In [4]:
# Verify that the CSV files were created successfully
created_csv_files = []
for root_folder in root_folders:
    csv_files = glob.glob(os.path.join(root_folder, '**/*.csv'), recursive=True)
    created_csv_files.extend(csv_files)

print(f"Total CSV files created: {len(created_csv_files)}")

Total CSV files created: 241196


In [5]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
import os
import glob

def normalize_csv_files(root_folders):
    for root_folder in root_folders:
        # Recursively find all .csv files in the root folder and its subdirectories
        csv_files = glob.glob(os.path.join(root_folder, '**/*.csv'), recursive=True)
        csv_files = [f for f in csv_files if '_normalized.csv' not in f]
        
        # Process each CSV file
        for file_path in csv_files:
            try:
                # Construct the path for the normalized file
                normalized_file_path = file_path.replace('.csv', '_normalized.csv')
                
                # If normalized file already exists, skip to avoid reprocessing
                if os.path.exists(normalized_file_path):
                    #print(f"Normalized file already exists for {file_path}. Skipping.")
                    continue
                
                # Load the data
                data = pd.read_csv(file_path)
                
                # Normalize the data
                scaler = StandardScaler()
                numeric_cols = data.select_dtypes(include=['float64', 'int']).columns
                data[numeric_cols] = scaler.fit_transform(data[numeric_cols])
                
                # Save the normalized data back to disk
                data.to_csv(normalized_file_path, index=False)
                print(f"Data normalized and saved successfully for file: {file_path}")

            except Exception as e:
                print(f"Error normalizing file {file_path}: {e}")

# Specify the root folder paths where your CSV files are located
root_folders = ['chunk1', 'chunk2', 'chunk3']  # Update this path to where your CSV files are stored
normalize_csv_files(root_folders)

In [6]:
import dask

# Print current Dask configuration
print("Current Dask configuration:")
print(dask.config.config)

Current Dask configuration:
{'temporary-directory': None, 'visualization': {'engine': None}, 'tokenize': {'ensure-deterministic': False}, 'dataframe': {'shuffle-compression': None, 'parquet': {'metadata-task-size-local': 512, 'metadata-task-size-remote': 16}}, 'array': {'svg': {'size': 120}, 'slicing': {'split-large-chunks': None}}, 'optimization': {'fuse': {'active': None, 'ave-width': 1, 'max-width': None, 'max-height': inf, 'max-depth-new-edges': None, 'subgraphs': None, 'rename-keys': True}}}


In [None]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import os

def aggregate_and_feature_engineer(root_folders):
    file_patterns = [os.path.join(root, '**/*_normalized.csv') for root in root_folders]
    ddf = dd.read_csv(file_patterns, include_path_column=True)
    
    ddf['user_id'] = ddf['path'].str.extract(r'/([^/]+)/[^/]+\.csv$')[0]
    ddf['chunk_id'] = ddf['path'].str.extract(r'/(chunk\d+)/')[0]
    
    ddf = ddf.drop('path', axis=1)
    
    aggregations = {
        'saberSpeed': ['mean', 'std', 'min', 'max'],
        'saberDirX': ['mean', 'std', 'min', 'max'],
        'saberDirY': ['mean', 'std', 'min', 'max'],
        'saberDirZ': ['mean', 'std', 'min', 'max']
    }
    
    grouped_ddf = ddf.groupby(['chunk_id', 'user_id']).agg(aggregations)
    grouped_ddf.columns = ['_'.join(col).strip() for col in grouped_ddf.columns.values]

    with ProgressBar():
        # Save each part after computing
        results = []
        for i, part in enumerate(grouped_ddf.to_delayed()):
            part_df = part.compute()
            if not part_df.empty:
                part_df.to_csv(f'aggregated_and_featured_data_part_{i}.csv', index=False)
                results.append(part_df)
                print(f"Part {i} processed and saved.")

            # Optionally save a checkpoint for every 10000 parts processed
            if i % 10000 == 0 and results:
                dd.concat(results).to_csv('aggregated_and_featured_data_checkpoint.csv', index=False)
                results = []  # Clear the results list to free up memory

        # Final save
        if results:
            dd.concat(results).to_csv('aggregated_and_featured_data_final.csv', index=False)
            print("Final data processing complete and saved.")

root_folders = ['chunk1', 'chunk2', 'chunk3']
aggregate_and_feature_engineer(root_folders)

[########################################] | 100% Completed | 30hr 27m
Part 0 processed and saved.
