In [2]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os
from ydata_profiling import ProfileReport

In [4]:
import pyspark
sc = pyspark.SparkContext(appName="EEG_Analysis")
sc

In [3]:
spark = SparkSession.builder \
    .appName("EEG_Analysis") \
    .master("local[*]") \
    .getOrCreate()

In [4]:
# base_path = r"C:\Users\sachi\pyspark_tutorial\muse_pipeline\Telepathic-Navigation\muse_dataset\Trial_1"
body_parts = [(0,"Right_hand"), (1,"Left_hand"), (2,"Right_leg"), (3,"Left_leg")]

In [5]:
#  Select relevant columns and drop NA values
columns_to_analyze = [
    "TimeStamp", "Delta_TP9", "Delta_AF7", "Delta_AF8", "Delta_TP10",
    "Theta_TP9", "Theta_AF7", "Theta_AF8", "Theta_TP10",
    "Alpha_TP9", "Alpha_AF7", "Alpha_AF8", "Alpha_TP10",
    "Beta_TP9", "Beta_AF7", "Beta_AF8", "Beta_TP10",
    "Gamma_TP9", "Gamma_AF7", "Gamma_AF8", "Gamma_TP10"]

from pyspark.sql.types import *

data_schema = [
                StructField('TimeStamp', DoubleType(), True),
                StructField('Delta_TP9', DoubleType(), True),
                StructField('Delta_AF7', DoubleType(), True),
                StructField('Delta_AF8', DoubleType(), True),
                StructField('Delta_TP10', DoubleType(), True),
                StructField('Theta_TP9', DoubleType(), True),
                StructField('Theta_AF7', DoubleType(), True),
                StructField('Theta_AF8', DoubleType(), True),
                StructField('Theta_TP10', DoubleType(), True),
                StructField('Alpha_TP9', DoubleType(), True),
                StructField('Alpha_AF7', DoubleType(), True),
                StructField('Alpha_AF8', DoubleType(), True),
                StructField('Alpha_TP10', DoubleType(), True),
                StructField('Beta_TP9', DoubleType(), True),
                StructField('Beta_AF7', DoubleType(), True),
                StructField('Beta_AF8', DoubleType(), True),
                StructField('Beta_TP10', DoubleType(), True),
                StructField('Gamma_TP9', DoubleType(), True),
                StructField('Gamma_AF7', DoubleType(), True),
                StructField('Gamma_AF8', DoubleType(), True),
                StructField('Gamma_TP10', DoubleType(), True)
                # StructField('label', IntegerType(), True)
            ]

final_struct = StructType(fields=data_schema)

In [6]:
from pyspark.sql.functions import lit
"""
Load data from a folder containing multiple CSV files.
"""
def load_data(index, folder_path, part):

    csv_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')]
    # Initialize an empty list to store individual DataFrames
    dfs = []
    # Read each CSV file and create a DataFrame
    for file in csv_files:
        file_path = os.path.join(folder_path, file)
        df = spark.read.csv(file_path, header=True, schema=final_struct)
        df.shape = (df.count(), len(df.columns))
        print(f"Loaded {file} of {part} with shape {df.shape}")
        dfs.append(df)

    # Union all DataFrames
    combined_df = dfs[0]
    for df in dfs[1:]:
        combined_df = combined_df.union(df)
    combined_df = combined_df.withColumn("label", lit(index))
    return combined_df

# # Load data for each body part
# data_dict = {}
# for index, part in body_parts:
#     folder_path = os.path.join(base_path, part)
#     data_dict[part] = load_data(index, folder_path, part)

In [7]:
def create_dataset(start_time, end_time, cols, granularity=0.100):
    timestamps = np.arange(start_time, end_time, granularity)
    pdf = pd.DataFrame(index=timestamps, columns=cols)
    pdf.reset_index(inplace=True)
    pdf.columns = ['TimeStamp'] + cols
    data_table = spark.createDataFrame(pdf)
    for col in cols:
        data_table = data_table.withColumn(col, F.lit(None).cast("double"))
    return data_table


# Create a dataset with the desired granularity
data_table = create_dataset(start_time, end_time, columns_to_analyze[1:], granularity)
# data_table = data_table.withColumn("TimeStamp", dataset["TimeStamp"])


NameError: name 'start_time' is not defined

In [8]:
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from datetime import timedelta



file_path = r"C:\Users\sachi\pyspark_tutorial\muse_pipeline\Telepathic-Navigation\muse_dataset\Trial_2\Right_hand\museMonitor_2024-10-11--20-47-37_8793473853970757967.csv"

dataset = pd.read_csv(file_path, header=0).drop(columns=['Elements'])
dataset = dataset.dropna()

dataset['TimeStamp'] = pd.to_datetime(dataset['TimeStamp'])
dataset['TimeStamp'] = ((dataset['TimeStamp'] - dataset['TimeStamp'].min()).dt.total_seconds().round(3)* 1000)
# dataset['TimeStamp'] = (dataset['TimeStamp'].astype(float))
dataset = dataset[columns_to_analyze]

dataset = spark.createDataFrame(dataset, schema=final_struct)

from pyspark.sql.window import Window
# from pyspark.sql.functions import col, monotonically_increasing_id

granularity = 100 # Change this to your desired granularity in milliseconds

start_time = dataset.select('TimeStamp').collect()[0][0]
end_time = dataset.select('TimeStamp').collect()[-1][0]

# Create a window
window_spec = Window.orderBy('TimeStamp').rowsBetween(0, granularity)

# Bucket the data by the time interval (granularity)
dataset = dataset.withColumn('TimeBucket', (F.col('TimeStamp') / granularity).cast('int') * granularity)

# Group by the buckets and calculate the average for each bucket
aggregations = {col: 'avg' for col in columns_to_analyze}
dataset = dataset.groupBy('TimeBucket').agg(aggregations)

# Rename the columns to remove "avg()"
for col in columns_to_analyze[1:]:
    dataset = dataset.withColumnRenamed(f'avg({col})', col)

# Drop the TimeBucket column
dataset = dataset.drop('TimeBucket')
# Show the resulting dataset
# dataset.show()

dataset.shape = (dataset.count(), len(dataset.columns))

print(f"Dataset shape: {dataset.shape}")


Dataset shape: (26, 21)


In [9]:
print(dataset.count())
step = 0.100
values = [(i * step,) for i in range(dataset.count())]
print(len(values))

26
26


In [10]:

# Create a Spark DataFrame with the new TimeStamp values
time_df = spark.createDataFrame([(val,) for val in values], ['TimeStamp'])

time_df.count()

26

In [11]:

dataset = dataset.withColumn('row_id', F.monotonically_increasing_id())

time_df = time_df.withColumn('row_id', F.monotonically_increasing_id())

dataset.count()
time_df.count()

26

In [15]:
window_spec = Window.orderBy(F.lit(1))  # Use F.lit(1) to avoid sorting issues

# Calculate the number of rows in the dataset
num_rows = dataset.count()

# Generate the desired timestamp values with step 0.100
step = 0.100
values = [(i * step,) for i in range(num_rows)]

time_df = spark.createDataFrame(values, ['TimeStamp'])

# Add a consecutive row number to both the original dataset and the time_df
dataset = dataset.withColumn('row_id', F.row_number().over(window_spec))
time_df = time_df.withColumn('row_id', F.row_number().over(window_spec))

reslt = dataset.join(time_df, 'row_id', 'inner').drop('row_id')

reslt.show()

+--------------------+-------------------+--------------------+------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+--------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|           Theta_AF7|          Alpha_AF8|           Delta_AF8|    avg(TimeStamp)|           Gamma_AF7|         Delta_TP10|          Gamma_AF8|          Delta_AF7|         Gamma_TP10|           Beta_AF8|           Beta_TP9|         Delta_TP9|           Theta_AF8|          Gamma_TP9|          Beta_TP10|            Beta_AF7|         Theta_TP10|          Alpha_AF7|         Alpha_TP10|           Theta_TP9|          Alpha_TP9|          TimeStamp|
+--------------------+-------------------+--------------------+------------------+--------------------+-----