In [None]:
# Install the extra packages this notebook imports further down:
# mord (LogisticIT ordinal regression) and tensorflow (Keras network).
!pip install mord
!pip install tensorflow

**Load Data**

In [None]:
# Mount Google Drive so the dataset archive stored there can be read.
from google.colab import drive
drive.mount('/content/drive')

# Unpack the top-level archive into cs229-data, then each nested archive
# into a folder of the same name.
!unzip /content/drive/MyDrive/CS229/7606611.zip -d cs229-data
!unzip cs229-data/SubjData.zip -d cs229-data/SubjData
!unzip cs229-data/P80.zip -d cs229-data/P80
# The nested loops build the two-digit names P00..P79; P80 is handled on its
# own line above because it falls outside the 8 x 10 grid.
for i in range(8):
  for j in range(10):
    !unzip cs229-data/P{i}{j}.zip -d cs229-data/P{i}{j}

In [None]:
# Delete the nested .zip files now that they have been extracted
# (same name pattern as the unzip step: SubjData, P80, and P00..P79).
!rm cs229-data/SubjData.zip
!rm cs229-data/P80.zip
for i in range(8):
  for j in range(10):
    !rm cs229-data/P{i}{j}.zip

**Code Parameters**

In [None]:
import pytz
import os
import pandas as pd

# Fixed UTC offset of 540 minutes (+09:00).
DEFAULT_TZ = pytz.FixedOffset(540)  # GMT+09:00; Asia/Seoul

# Locations of the subject-level CSV files inside the extracted dataset.
PATH_DATA = 'cs229-data/SubjData/'
PATH_ESM = os.path.join(PATH_DATA, 'EsmResponse.csv')
PATH_PARTICIPANT = os.path.join(PATH_DATA, 'UserInfo.csv')

# NOTE(review): not referenced in the visible part of the notebook;
# presumably a scratch directory for cached intermediates — confirm.
PATH_INTERMEDIATE = './intermediate'

# Time-unit helpers; MINUTE_MS is subtracted from sensor `timestamp` values
# when building lagged features (timestamps are presumably epoch
# milliseconds — TODO confirm against the dataset documentation).
SECOND_MS = 1000
MINUTE_MS = 60*SECOND_MS
# Maps each per-participant CSV base name to a three-letter abbreviation.
# NOTE(review): not referenced in the visible part of the notebook.
DATA_TYPES = {
    'Acceleration': 'ACC',
    'AmbientLight': 'AML',
    'Calorie': 'CAL',
    'Distance': 'DST',
    'EDA': 'EDA',
    'HR': 'HRT',
    'RRI': 'RRI',
    'SkinTemperature': 'SKT',
    'StepCount': 'STP',
    'UltraViolet': 'ULV',
    'ActivityEvent': 'ACE',
    'ActivityTransition': 'ACT',
    'AppUsageEvent': 'APP',
    'BatteryEvent': 'BAT',
    'CallEvent': 'CAE',
    'Connectivity': 'CON',
    'DataTraffic': 'DAT',
    'InstalledApp': 'INS',
    'Location': 'LOC',
    'MediaEvent': 'MED',
    'MessageEvent': 'MSG',
    'WiFi': 'WIF',
    'ScreenEvent': 'SCR',
    'RingerModeEvent': 'RNG',
    'ChargeEvent': 'CHG',
    'PowerSaveEvent': 'PWS',
    'OnOffEvent': 'ONF'
}

**Read in emotional response data**

In [None]:
# Load the self-reported (ESM) responses. PATH_ESM is the configured location
# of EsmResponse.csv, so the path lives in exactly one place.
esm_response = pd.read_csv(PATH_ESM)
# Preview only the first rows instead of rendering the whole table.
esm_response.head()

In [None]:
def remove_mul_deltas(df):
  """Return `df` without the columns whose name contains more than one "-".

  After chained lag merges a column can pick up several lag suffixes
  (e.g. "x-1min-5min"); those compound columns are discarded while plain
  and single-suffix columns are kept. The input frame is not modified, and
  it is returned as-is when there is nothing to remove.
  """
  # De-duplicate labels first: dropping a repeated label once already removes
  # every column carrying it.
  compound = [name for name in dict.fromkeys(df.columns) if name.count("-") > 1]
  if not compound:
    return df
  return df.drop(columns=compound)

**Read in sensor data**

In [None]:
# Build one wide table of sensor readings (plus 1/5/10-minute lagged copies)
# for every participant folder P00..P80.
pcodes = [f"P{str(i).zfill(2)}" for i in range(81)]
all_data_df = pd.DataFrame()
for pcode in pcodes:
  # Accumulates this participant's sensors, merged side by side on timestamp.
  user_df = pd.DataFrame()

  for datatype in ["HR", "SkinTemperature", "Acceleration", "AmbientLight"]: # the four sensor streams used for now
    try:
      df = pd.read_csv(f"cs229-data/{pcode}/{datatype}.csv")
    except FileNotFoundError:
      # Participant folder or sensor file is absent: skip this sensor.
      continue

    df['pcode'] = pcode

    # Lag features: for each row, look up the reading nearest to
    # (timestamp - lag) in the same frame, within a tolerance of 1500
    # timestamp units, and attach it with a "-<lag>" suffix.
    # NOTE(review): merge_asof requires sorted keys — this assumes each CSV
    # is already ordered by `timestamp`; confirm.
    # NOTE(review): the right-hand frame's own `timestamp` appears to be
    # suffixed to the same name as the helper column, so the drop after each
    # merge presumably removes both — confirm.
    df["timestamp-1min"] = df["timestamp"] - MINUTE_MS
    df = pd.merge_asof(df, df[df.columns.difference(['pcode', "timestamp-1min"])], left_on="timestamp-1min", right_on="timestamp", suffixes=["", "-1min"], direction="nearest", tolerance=1500)
    df = remove_mul_deltas(df)
    df = df.drop("timestamp-1min", axis=1)

    # From the second lag onward the right-hand frame already contains lagged
    # columns, which become doubly suffixed ("x-1min-5min");
    # remove_mul_deltas discards those.
    df["timestamp-5min"] = df["timestamp"] - 5*MINUTE_MS
    df = pd.merge_asof(df, df[df.columns.difference(['pcode', "timestamp-5min"])], left_on="timestamp-5min", right_on="timestamp", suffixes=["", "-5min"], direction="nearest", tolerance=1500)
    df = remove_mul_deltas(df)
    df = df.drop("timestamp-5min", axis=1)

    df["timestamp-10min"] = df["timestamp"] - 10*MINUTE_MS
    df = pd.merge_asof(df, df[df.columns.difference(['pcode', "timestamp-10min"])], left_on="timestamp-10min", right_on="timestamp", suffixes=["", "-10min"], direction="nearest", tolerance=1500)
    df = remove_mul_deltas(df)
    df = df.drop("timestamp-10min", axis=1)

    if user_df.empty:
        user_df = df
    else:
        # Align this sensor with the ones merged so far: nearest timestamp
        # within the same tolerance, matched per participant.
        user_df = pd.merge_asof(user_df, df, on=["timestamp"], by=["pcode"], direction="nearest", tolerance=1500)
        # Disabled alternative: exact inner join on identical timestamps.
        # user_df = pd.merge(
        #   user_df,
        #   df,
        #   how="inner",
        #   on=['pcode', 'timestamp'],
        # )
        # Keep only rows where every sensor (and every lag) found a match.
        user_df = user_df.dropna()

    # NOTE(review): this concat + dropna sits inside the per-sensor loop, so
    # it runs after every sensor file, not once per participant. Partially
    # merged rows are appended and then removed by dropna once wider rows
    # add new columns. Hoisting it to the participant loop looks equivalent
    # and far cheaper — verify before changing.
    # all_data_df.isnull().mean() * 100
    all_data_df = pd.concat([all_data_df, user_df])
    all_data_df = all_data_df.dropna()

In [None]:
# Display the combined per-participant sensor table.
all_data_df

**Join Sensor Data with Emotional Response Data**

In [None]:
# Pair every sensor row with every ESM response of the same participant.
# The join key is `pcode` only, so each participant contributes
# (sensor rows x responses) pairs.
joined_df = all_data_df.merge(
    esm_response,
    how="inner",
    left_on=['pcode'],
    right_on=['pcode'],
)

In [None]:
# Keep only sensor/response pairs whose timestamps are at most `threshold`
# apart, i.e. the questionnaire was completed within a minute of the reading.
threshold = 60_000
joined_df = joined_df[(joined_df['timestamp'] - joined_df['responseTime']).abs() <= threshold]
# Renumber the surviving rows and discard the time and bookkeeping columns.
df = (
    joined_df
    .reset_index(drop=True)
    .drop(columns=['timestamp', 'responseTime', 'scheduledTime', 'duration', 'disturbance', 'change'])
)

In [None]:
# Display the filtered sensor/response table.
df

**Calculate total acceleration**

In [None]:
import numpy as np

# One suffix per time offset: "" is the current reading, the others are lags.
lag_suffixes = ("", "-1min", "-5min", "-10min")

# Collapse each (x, y, z) triple into its Euclidean magnitude, then drop the
# per-axis columns so only the magnitudes remain.
df = df.assign(**{
    f"accel{lag}": np.sqrt(df[f"x{lag}"] ** 2 + df[f"y{lag}"] ** 2 + df[f"z{lag}"] ** 2)
    for lag in lag_suffixes
}).drop(columns=[f"{axis}{lag}" for lag in lag_suffixes for axis in "xyz"])

**Shuffle Data**

In [None]:
# Shuffle the rows with a fixed seed so the train/test partition is
# reproducible across runs, and renumber the index so row labels match row
# positions (sample() otherwise keeps the pre-shuffle labels).
df = df.sample(frac=1, random_state=42).reset_index(drop=True)

**Split into train and test sets**

In [None]:
# Split into train (first 70% of rows) and test (remaining 30%).
seventy_pct = int(df.shape[0] * 0.7)

# Slice by POSITION with .iloc. Label-based .loc on a shuffled index cuts at
# wherever the label `seventy_pct` happens to sit (a random split size) and,
# being end-inclusive, would put that boundary row in both sets.
train_set = df.iloc[:seventy_pct, :]
test_set = df.iloc[seventy_pct:, :]

train_set

In [None]:
# Feature columns fed to the models; every other column stays on the y side.
columns_to_split = ['bpm', 'temperature']

# Inputs: just the selected feature columns.
x_train, x_test = train_set[columns_to_split], test_set[columns_to_split]

# Targets (plus all remaining columns): everything except the features.
y_train = train_set.drop(columns=columns_to_split)
y_test = test_set.drop(columns=columns_to_split)

In [None]:
import numpy as np
from mord import LogisticIT  # LogisticIT is for immediate-threshold ordinal regression
from sklearn.metrics import mean_squared_error, confusion_matrix, accuracy_score

In [None]:
models, predictions = {}, {}
softmax_mses, softmax_errors = {}, {}

# Fit one independent LogisticIT model per self-report target and report
# MSE and misclassification rate (1 - accuracy) on both splits.
for i, target in enumerate(['valence', 'arousal', 'attention', 'stress']):
    ordinal_model = LogisticIT()
    ordinal_model.fit(x_train, y_train[target])
    models[target] = ordinal_model

    train_preds = ordinal_model.predict(x_train)
    test_preds = ordinal_model.predict(x_test)
    predictions[target] = test_preds

    train_mse = mean_squared_error(y_train[target], train_preds)
    train_error = 1 - accuracy_score(y_train[target], train_preds)
    test_mse = mean_squared_error(y_test[target], predictions[target])
    test_error = 1 - accuracy_score(y_test[target], predictions[target])

    # Only the TRAINING metrics are kept for later comparison.
    softmax_errors[target] = train_error
    softmax_mses[target] = train_mse

    print("\n".join([
        f"\nMetrics for {target}:",
        f"Train MSE: {train_mse:.4f}",
        f"Train error: {train_error:.2f}",
        f"Test MSE: {test_mse:.4f}",
        f"Test error: {test_error:.2f}",
    ]))

In [None]:
from sklearn.ensemble import RandomForestClassifier

models = {}
predictions = {}
forest_errors = {}
forest_mses = {}
# Train one random forest classifier per output (valence, arousal, attention, stress)
for i, target in enumerate(['valence', 'arousal', 'attention', 'stress']):
    # Fixed seed so the bootstrapped trees, and hence the metrics, are reproducible.
    ordinal_model = RandomForestClassifier(random_state=42)
    ordinal_model.fit(x_train, y_train[target])

    models[target] = ordinal_model

    test_preds = ordinal_model.predict(x_test)
    predictions[target] = test_preds

    train_preds = ordinal_model.predict(x_train)
    train_mse = mean_squared_error(y_train[target], train_preds)
    train_error = 1 - accuracy_score(y_train[target], train_preds)
    test_mse = mean_squared_error(y_test[target], predictions[target])
    test_error = 1 - accuracy_score(y_test[target], predictions[target])
    # Only the TRAINING metrics are kept for the comparison plot.
    forest_errors[target] = train_error
    forest_mses[target] = train_mse
    print(f"\nMetrics for {target}:")
    print(f"Train MSE: {train_mse:.4f}")
    print(f"Train error: {train_error:.2f}")
    print(f"Test MSE: {test_mse:.4f}")
    print(f"Test error: {test_error:.2f}")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Data: per-target TRAINING misclassification rate (1 - accuracy) stored by
# the two training loops. The *_errors dicts hold error rates, not MSE, so the
# axis label and title below say so.
models = list(forest_mses.keys())
mse_random_forest = [forest_errors[x] for x in models]
mse_softmax = [softmax_errors[x] for x in models]

# X-axis positions (one group per target)
x = np.arange(len(models))

# Bar width
bar_width = 0.35

# Create the plot
fig, ax = plt.subplots(figsize=(8, 6))

# Bars
# NOTE(review): the 'Softmax Regression' series comes from softmax_errors,
# which is filled by the mord LogisticIT loop — confirm the legend wording.
rf_bars = ax.bar(x - bar_width/2, mse_random_forest, bar_width, label='Random Forest', color='skyblue')
softmax_bars = ax.bar(x + bar_width/2, mse_softmax, bar_width, label='Softmax Regression', color='salmon')

# Labels and title
ax.set_xlabel('Target')
ax.set_ylabel('Training error (1 - accuracy)')
ax.set_title('Training Error by Target and Algorithm')
ax.set_xticks(x)
ax.set_xticklabels(models)
ax.legend()

# Add values on top of bars
for bars in [rf_bars, softmax_bars]:
    for bar in bars:
        height = bar.get_height()
        ax.annotate(f'{height:.2f}',
                    xy=(bar.get_x() + bar.get_width() / 2, height),
                    xytext=(0, 3),  # Offset text slightly above the bar
                    textcoords="offset points",
                    ha='center', va='bottom', fontsize=10)

# Show plot
plt.tight_layout()
plt.show()


In [None]:
# The four self-report targets, in the column order used for the y arrays.
y_cols = ['valence', 'arousal', 'attention', 'stress']

# Feature matrices as plain numpy arrays.
x_trainnp, x_testnp = x_train.to_numpy(), x_test.to_numpy()

# Target matrices restricted to the four target columns.
y_trainnp = y_train[y_cols].to_numpy()
y_filtered = y_test[y_cols]
y_testnp = y_filtered.to_numpy()

# Shape check for the test split.
print(x_testnp.shape, y_testnp.shape, sep="\n")

In [None]:
def calculate_accuracy(y_true, y_pred, tolerance=0):
    """
    Calculate the fraction of predictions within a tolerance for each column.

    Args:
        y_true: True target values, array-like with four columns ordered
            valence, arousal, attention, stress.
        y_pred: Predicted target values, same shape as y_true.
        tolerance: Maximum allowed absolute difference between true and
            predicted values for a prediction to count as correct.

    Returns:
        A dictionary with the accuracy (a fraction in [0, 1]) for each column
        and the overall accuracy across all entries.
    """
    # Coerce to arrays so lists and DataFrames behave the same as ndarrays
    # (element-wise difference, positional column access below).
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    within_tolerance = np.abs(y_true - y_pred) <= tolerance
    column_accuracies = np.mean(within_tolerance, axis=0)  # Accuracy for each column
    overall_accuracy = np.mean(within_tolerance)  # Overall accuracy
    return {
        "valence_accuracy": column_accuracies[0],
        "arousal_accuracy": column_accuracies[1],
        "attention_accuracy": column_accuracies[2],
        "stress_accuracy": column_accuracies[3],
        "overall_accuracy": overall_accuracy
    }

def postprocess_predictions(predictions, min_val=-3, max_val=3):
    """
    Snap continuous predictions onto the integer rating scale.

    Args:
        predictions: Array of predicted values.
        min_val: Lowest rating allowed in the output.
        max_val: Highest rating allowed in the output.

    Returns:
        Integer array of the same shape: each value rounded to the nearest
        integer, then limited to [min_val, max_val].
    """
    # Round first, then clip, so out-of-range values land on the boundary.
    return np.clip(np.rint(predictions), min_val, max_val).astype(int)

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

# Define the model: a small fully connected network that regresses all four
# targets at once. The input width is taken from the training matrix so the
# network keeps working if the set of feature columns changes.
n_features = x_trainnp.shape[1]
model = Sequential([
    Dense(64, activation='relu', input_shape=(n_features,)),  # Input layer, one unit of input per feature
    Dense(128, activation='relu'),                   # Hidden layer 1
    Dense(64, activation='relu'),                    # Hidden layer 2
    Dense(4, activation='linear')                    # Output layer for 4 targets
])

# Compile the model: MSE loss, with MAE reported as an extra metric
model.compile(optimizer='adam', loss='mse', metrics=['mae'])

# Train the model; the last 20% of the training rows are held out for validation
history = model.fit(x=x_trainnp, y=y_trainnp, epochs=100, batch_size=32, validation_split=0.2)

# Evaluate the model on the held-out test set
loss, mae = model.evaluate(x_testnp, y_testnp)
print(f"Test Loss: {loss}, Test MAE: {mae}")

# Calculate predictions (continuous values)
train_predictions = model.predict(x_trainnp)
test_predictions = model.predict(x_testnp)

# Post-process predictions: round and clip onto the integer rating scale
train_predictions_processed = postprocess_predictions(train_predictions)
test_predictions_processed = postprocess_predictions(test_predictions)

# Calculate exact-match accuracies using processed predictions
train_accuracy = calculate_accuracy(y_trainnp, train_predictions_processed)
test_accuracy = calculate_accuracy(y_testnp, test_predictions_processed)

# Print accuracies
print("Training Accuracy:")
for key, value in train_accuracy.items():
    print(f"{key}: {value:.2f}")

print("\nTesting Accuracy:")
for key, value in test_accuracy.items():
    print(f"{key}: {value:.2f}")

# Repeat the headline test metrics so they appear at the end of the output.
print(f"Test Loss: {loss}, Test MAE: {mae}")

In [None]:
# Sanity check: row/column counts of the training features and targets.
print(x_train.shape)
print(y_train.shape)

In [None]:
# Display the training feature table.
x_train

In [None]:
# Display the test-side table (targets plus the remaining non-feature columns).
y_test