In [6]:
!pip install --user avro


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m


In [1]:
import sys
print(sys.path)


['/usr/lib/python38.zip', '/usr/lib/python3.8', '/usr/lib/python3.8/lib-dynload', '', '/home/jovyan/.local/lib/python3.8/site-packages', '/usr/local/lib/python3.8/dist-packages', '/usr/lib/python3/dist-packages']


In [None]:
import sys
sys.path.append('/home/jovyan/.local/lib/python3.8/site-packages')

from avro.datafile import DataFileReader
from avro.io import DatumReader
import json
import csv
import os
import time

# Base directory for participant data
base_dir = "Participant data (participant id,time)"
output_dir_base = "Processed_Output"

# Count total files for progress reporting
total_files = sum(len(files) for _, _, files in os.walk(base_dir) if files)
processed_files = 0

# Traverse through the directory structure and process all avro files
for root, dirs, files in os.walk(base_dir):
    for file in files:
        if file.endswith(".avro"):
            avro_file_path = os.path.join(root, file)
            
            # Print progress
            processed_files += 1
            print(f"[{processed_files}/{total_files}] Processing {avro_file_path}...")

            # Generate the output directory structure
            participant_id = root.split(os.sep)[1]  # Get participant ID from path
            date_folder = root.split(os.sep)[2]  # Get date folder from path
            output_dir = os.path.join(output_dir_base, participant_id, date_folder)
            os.makedirs(output_dir, exist_ok=True)

            try:
                # Process the AVRO file
                with DataFileReader(open(avro_file_path, "rb"), DatumReader()) as reader:
                    schema = json.loads(reader.meta.get('avro.schema').decode('utf-8'))
                    data = next(reader)

                    # Function to write CSV data
                    def write_csv(filename, headers, rows):
                        with open(os.path.join(output_dir, filename), 'w', newline='') as f:
                            writer = csv.writer(f)
                            writer.writerow(headers)
                            writer.writerows(rows)

                    # Accelerometer processing
                    acc = data["rawData"]["accelerometer"]
                    timestamp = [round(acc["timestampStart"] + i * (1e6 / acc["samplingFrequency"]))
                                 for i in range(len(acc["x"]))]
                    delta_physical = acc["imuParams"]["physicalMax"] - acc["imuParams"]["physicalMin"]
                    delta_digital = acc["imuParams"]["digitalMax"] - acc["imuParams"]["digitalMin"]
                    x_g = [val * delta_physical / delta_digital for val in acc["x"]]
                    y_g = [val * delta_physical / delta_digital for val in acc["y"]]
                    z_g = [val * delta_physical / delta_digital for val in acc["z"]]
                    write_csv('accelerometer.csv', ["unix_timestamp", "x", "y", "z"],
                              zip(timestamp, x_g, y_g, z_g))

                    # Gyroscope processing
                    gyro = data["rawData"]["gyroscope"]
                    timestamp = [round(gyro["timestampStart"] + i * (1e6 / gyro["samplingFrequency"]))
                                 for i in range(len(gyro["x"]))]
                    delta_physical = gyro["imuParams"]["physicalMax"] - gyro["imuParams"]["physicalMin"]
                    delta_digital = gyro["imuParams"]["digitalMax"] - gyro["imuParams"]["digitalMin"]
                    x_dps = [val * delta_physical / delta_digital for val in gyro["x"]]
                    y_dps = [val * delta_physical / delta_digital for val in gyro["y"]]
                    z_dps = [val * delta_physical / delta_digital for val in gyro["z"]]
                    write_csv('gyroscope.csv', ["unix_timestamp", "x", "y", "z"],
                              zip(timestamp, x_dps, y_dps, z_dps))

                    # EDA processing
                    eda = data["rawData"]["eda"]
                    timestamp = [round(eda["timestampStart"] + i * (1e6 / eda["samplingFrequency"]))
                                 for i in range(len(eda["values"]))]
                    write_csv('eda.csv', ["unix_timestamp", "eda"], zip(timestamp, eda["values"]))

                    # Temperature processing
                    tmp = data["rawData"]["temperature"]
                    timestamp = [round(tmp["timestampStart"] + i * (1e6 / tmp["samplingFrequency"]))
                                 for i in range(len(tmp["values"]))]
                    write_csv('temperature.csv', ["unix_timestamp", "temperature"], zip(timestamp, tmp["values"]))

                    # Tags processing
                    tags = data["rawData"]["tags"]
                    write_csv('tags.csv', ["tags_timestamp"], [[tag] for tag in tags["tagsTimeMicros"]])

                    # BVP processing
                    bvp = data["rawData"]["bvp"]
                    timestamp = [round(bvp["timestampStart"] + i * (1e6 / bvp["samplingFrequency"]))
                                 for i in range(len(bvp["values"]))]
                    write_csv('bvp.csv', ["unix_timestamp", "bvp"], zip(timestamp, bvp["values"]))

                    # Systolic peaks processing
                    sps = data["rawData"]["systolicPeaks"]
                    write_csv('systolic_peaks.csv', ["systolic_peak_timestamp"], [[sp] for sp in sps["peaksTimeNanos"]])

                    # Steps processing
                    steps = data["rawData"]["steps"]
                    timestamp = [round(steps["timestampStart"] + i * (1e6 / steps["samplingFrequency"]))
                                 for i in range(len(steps["values"]))]
                    write_csv('steps.csv', ["unix_timestamp", "steps"], zip(timestamp, steps["values"]))

                print(f"Finished processing {avro_file_path}, output saved to {output_dir}")

            except Exception as e:
                print(f"Error processing {avro_file_path}: {e}")

            # Optional: Simulate processing time for visibility (remove or adjust for actual use)
            time.sleep(0.5)


In [None]:
from avro.datafile import DataFileReader
from avro.io import DatumReader
import json
import csv
import os

# Base directory for participant data
base_dir = "Participant data (participant id,time)"
output_dir_base = "Processed_Output"
log_file = "processed_files.log"

# Load the list of already processed files
if os.path.exists(log_file):
    with open(log_file, 'r') as log:
        processed_files = set(log.read().splitlines())
else:
    processed_files = set()

# Traverse through the directory structure and process all avro files
for root, dirs, files in os.walk(base_dir):
    for file in files:
        if file.endswith(".avro"):
            avro_file_path = os.path.join(root, file)

            # Skip if this file has already been processed
            if avro_file_path in processed_files:
                print(f"Skipping already processed file: {avro_file_path}")
                continue

            # Generate the output directory structure
            participant_id = root.split(os.sep)[1]  # Get participant ID from path
            date_folder = root.split(os.sep)[2]  # Get date folder from path
            output_dir = os.path.join(output_dir_base, participant_id, date_folder)
            os.makedirs(output_dir, exist_ok=True)

            try:
                # Process the AVRO file
                with DataFileReader(open(avro_file_path, "rb"), DatumReader()) as reader:
                    schema = json.loads(reader.meta.get('avro.schema').decode('utf-8'))
                    data = next(reader)

                    # Function to write CSV data
                    def write_csv(filename, headers, rows):
                        with open(os.path.join(output_dir, filename), 'a', newline='') as f:
                            writer = csv.writer(f)
                            if os.stat(os.path.join(output_dir, filename)).st_size == 0:  # Check if file is empty
                                writer.writerow(headers)  # Write headers only if the file is empty
                            writer.writerows(rows)



                    # Accelerometer processing
                    acc = data["rawData"]["accelerometer"]
                    timestamp = [round(acc["timestampStart"] + i * (1e6 / acc["samplingFrequency"]))
                                 for i in range(len(acc["x"]))]
                    delta_physical = acc["imuParams"]["physicalMax"] - acc["imuParams"]["physicalMin"]
                    delta_digital = acc["imuParams"]["digitalMax"] - acc["imuParams"]["digitalMin"]
                    x_g = [val * delta_physical / delta_digital for val in acc["x"]]
                    y_g = [val * delta_physical / delta_digital for val in acc["y"]]
                    z_g = [val * delta_physical / delta_digital for val in acc["z"]]
                    write_csv('accelerometer.csv', ["unix_timestamp", "x", "y", "z"],
                              zip(timestamp, x_g, y_g, z_g))

                    # Gyroscope processing
                    gyro = data["rawData"]["gyroscope"]
                    timestamp = [round(gyro["timestampStart"] + i * (1e6 / gyro["samplingFrequency"]))
                                 for i in range(len(gyro["x"]))]
                    delta_physical = gyro["imuParams"]["physicalMax"] - gyro["imuParams"]["physicalMin"]
                    delta_digital = gyro["imuParams"]["digitalMax"] - gyro["imuParams"]["digitalMin"]
                    x_dps = [val * delta_physical / delta_digital for val in gyro["x"]]
                    y_dps = [val * delta_physical / delta_digital for val in gyro["y"]]
                    z_dps = [val * delta_physical / delta_digital for val in gyro["z"]]
                    write_csv('gyroscope.csv', ["unix_timestamp", "x", "y", "z"],
                              zip(timestamp, x_dps, y_dps, z_dps))

                    # EDA processing
                    eda = data["rawData"]["eda"]
                    timestamp = [round(eda["timestampStart"] + i * (1e6 / eda["samplingFrequency"]))
                                 for i in range(len(eda["values"]))]
                    write_csv('eda.csv', ["unix_timestamp", "eda"], zip(timestamp, eda["values"]))

                    # Temperature processing
                    tmp = data["rawData"]["temperature"]
                    timestamp = [round(tmp["timestampStart"] + i * (1e6 / tmp["samplingFrequency"]))
                                 for i in range(len(tmp["values"]))]
                    write_csv('temperature.csv', ["unix_timestamp", "temperature"], zip(timestamp, tmp["values"]))

                    # Tags processing
                    tags = data["rawData"]["tags"]
                    write_csv('tags.csv', ["tags_timestamp"], [[tag] for tag in tags["tagsTimeMicros"]])

                    # BVP processing
                    bvp = data["rawData"]["bvp"]
                    timestamp = [round(bvp["timestampStart"] + i * (1e6 / bvp["samplingFrequency"]))
                                 for i in range(len(bvp["values"]))]
                    write_csv('bvp.csv', ["unix_timestamp", "bvp"], zip(timestamp, bvp["values"]))

                    # Systolic peaks processing
                    sps = data["rawData"]["systolicPeaks"]
                    write_csv('systolic_peaks.csv', ["systolic_peak_timestamp"], [[sp] for sp in sps["peaksTimeNanos"]])

                    # Steps processing
                    steps = data["rawData"]["steps"]
                    timestamp = [round(steps["timestampStart"] + i * (1e6 / steps["samplingFrequency"]))
                                 for i in range(len(steps["values"]))]
                    write_csv('steps.csv', ["unix_timestamp", "steps"], zip(timestamp, steps["values"]))

                # Log the processed file
                with open(log_file, 'a') as log:
                    log.write(avro_file_path + '\n')

                print(f"Processed {avro_file_path} and saved output to {output_dir}")

            except Exception as e:
                print(f"Error processing {avro_file_path}: {e}")


Skipping already processed file: Participant data (participant id,time)/10-3YK3H151PR/2024-03-19/raw_data/v6/1-1-10_1710855484.avro
Skipping already processed file: Participant data (participant id,time)/10-3YK3H151PR/2024-03-19/raw_data/v6/1-1-10_1710857290.avro
Skipping already processed file: Participant data (participant id,time)/10-3YK3H151PR/2024-03-19/raw_data/v6/1-1-10_1710859096.avro
Skipping already processed file: Participant data (participant id,time)/10-3YK3H151PR/2024-03-19/raw_data/v6/1-1-10_1710860901.avro
Skipping already processed file: Participant data (participant id,time)/10-3YK3H151PR/2024-03-19/raw_data/v6/1-1-10_1710862701.avro
Skipping already processed file: Participant data (participant id,time)/10-3YK3H151PR/2024-03-19/raw_data/v6/1-1-10_1710864502.avro
Skipping already processed file: Participant data (participant id,time)/10-3YK3H151PR/2024-03-19/raw_data/v6/1-1-10_1710866300.avro
Skipping already processed file: Participant data (participant id,time)/10-3