In [29]:
import os
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset



In [30]:
# ✅ Ensure the save directory exists
SAVE_DIR = "saved_models"
os.makedirs(SAVE_DIR, exist_ok=True)

# ✅ Load the CSV file
data_path = "data/final_data/BU4707_processed.csv"
df = pd.read_csv(data_path)

# ✅ Prepare features and target
features = ["Pe_results", "Comp_results", "TAC_Reading"]
target = "Sober_classification"



In [31]:
X = df[features].values
y = df[target].values

# ✅ Normalize features
scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X)

# ✅ Convert to PyTorch tensors
X_tensor = torch.tensor(X_scaled, dtype=torch.float32).view(-1, 1, 3)  # Reshape for RNN/LSTM
y_tensor = torch.tensor(y, dtype=torch.float32).view(-1, 1)

# ✅ Split data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X_tensor, y_tensor, test_size=0.2, random_state=42)

# ✅ Create DataLoader
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=16, shuffle=True)
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=16, shuffle=False)



In [32]:
# ✅ Define Model Architectures
class RNN(nn.Module):
    def __init__(self):
        super(RNN, self).__init__()
        self.rnn = nn.RNN(input_size=3, hidden_size=50, num_layers=2, batch_first=True)
        self.fc = nn.Linear(50, 1)

    def forward(self, x):
        out, _ = self.rnn(x)
        return self.fc(out[:, -1, :])  # Last time step



In [33]:
class LSTM(nn.Module):
    def __init__(self):
        super(LSTM, self).__init__()
        self.lstm = nn.LSTM(input_size=3, hidden_size=50, num_layers=2, batch_first=True)
        self.fc = nn.Linear(50, 1)

    def forward(self, x):
        out, _ = self.lstm(x)
        return self.fc(out[:, -1, :])



In [34]:
class FNN(nn.Module):
    def __init__(self):
        super(FNN, self).__init__()
        self.fc1 = nn.Linear(3, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)

    def forward(self, x):
        x = x.view(x.shape[0], -1)  # Flatten
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)



In [35]:
# ✅ Function to Save Models and Losses
def save_model_and_losses(model_name, model, train_losses, test_losses):
    torch.save(model.state_dict(), os.path.join(SAVE_DIR, f"{model_name}.pth"))  # Save model
    losses_df = pd.DataFrame({"Epoch": range(1, len(train_losses) + 1), "Train_Loss": train_losses, "Test_Loss": test_losses})
    losses_df.to_csv(os.path.join(SAVE_DIR, f"{model_name}_losses.csv"), index=False)
    print(f"✅ {model_name} model & losses saved in '{SAVE_DIR}'!")

# ✅ Train and Evaluate Function
def train_and_evaluate(model, train_loader, test_loader, epochs=10):
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    train_losses, test_losses = [], []

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0

        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            if torch.isnan(loss):  # Stop if NaN detected
                print("⚠️ NaN detected in loss. Stopping training!")
                return

            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.5)
            optimizer.step()
            running_loss += loss.item()

        train_losses.append(running_loss / len(train_loader))
        print(f"Epoch [{epoch+1}/{epochs}] - Train Loss: {train_losses[-1]:.4f}")

        # Evaluate
        model.eval()
        test_loss = sum(criterion(model(inputs), labels).item() for inputs, labels in test_loader) / len(test_loader)
        test_losses.append(test_loss)
        print(f"Epoch [{epoch+1}/{epochs}] - Test Loss: {test_loss:.4f}")

    save_model_and_losses(model.__class__.__name__, model, train_losses, test_losses)



In [37]:
# ✅ Train all models
models = {"RNN": RNN(), "LSTM": LSTM(), "FNN": FNN()}
for name, model in models.items():
    print(f"\n🚀 Training {name}...")
    train_and_evaluate(model, train_loader, test_loader, epochs=10)

print("\n✅ All models trained and saved!\n")

# ✅ Load and Use Models for Predictions
def load_model(model_class, model_name):
    model_path = os.path.join(SAVE_DIR, f"{model_name}.pth")
    if not os.path.exists(model_path):
        print(f"⚠️ Model file '{model_path}' not found!")
        return None
    model = model_class()
    model.load_state_dict(torch.load(model_path))
    model.eval()
    return model

rnn_model = load_model(RNN, "RNN")
lstm_model = load_model(LSTM, "LSTM")
fnn_model = load_model(FNN, "FNN")

# ✅ Load new data and make predictions
new_data_path = "data/final_data/new_data.csv"
if os.path.exists(new_data_path):
    df_new = pd.read_csv(new_data_path)
    X_new_scaled = scaler.transform(df_new[features].values)
    X_new_tensor = torch.tensor(X_new_scaled, dtype=torch.float32).view(-1, 1, 3)

    with torch.no_grad():
        rnn_preds = (torch.sigmoid(rnn_model(X_new_tensor)) > 0.5).float()
        lstm_preds = (torch.sigmoid(lstm_model(X_new_tensor)) > 0.5).float()
        fnn_preds = (torch.sigmoid(fnn_model(X_new_tensor)) > 0.5).float()

    print("\n📊 Predictions on New Data:")
    print("RNN:", rnn_preds.numpy().flatten())
    print("LSTM:", lstm_preds.numpy().flatten())
    print("FNN:", fnn_preds.numpy().flatten())
else:
    print("\n⚠️ No new data file found for predictions.")



🚀 Training RNN...
Epoch [1/10] - Train Loss: 0.6952
Epoch [1/10] - Test Loss: 0.6981
Epoch [2/10] - Train Loss: 0.6947
Epoch [2/10] - Test Loss: 0.6972
Epoch [3/10] - Train Loss: 0.6941
Epoch [3/10] - Test Loss: 0.6963
Epoch [4/10] - Train Loss: 0.6935
Epoch [4/10] - Test Loss: 0.6953
Epoch [5/10] - Train Loss: 0.6929
Epoch [5/10] - Test Loss: 0.6944
Epoch [6/10] - Train Loss: 0.6924
Epoch [6/10] - Test Loss: 0.6935
Epoch [7/10] - Train Loss: 0.6918
Epoch [7/10] - Test Loss: 0.6926
Epoch [8/10] - Train Loss: 0.6913
Epoch [8/10] - Test Loss: 0.6917
Epoch [9/10] - Train Loss: 0.6907
Epoch [9/10] - Test Loss: 0.6907
Epoch [10/10] - Train Loss: 0.6901
Epoch [10/10] - Test Loss: 0.6898
✅ RNN model & losses saved in 'saved_models'!

🚀 Training LSTM...
Epoch [1/10] - Train Loss: 0.6991
Epoch [1/10] - Test Loss: 0.7033
Epoch [2/10] - Train Loss: 0.6990
Epoch [2/10] - Test Loss: 0.7031
Epoch [3/10] - Train Loss: 0.6989
Epoch [3/10] - Test Loss: 0.7030
Epoch [4/10] - Train Loss: 0.6988
Epoch [4

In [9]:
import pandas as pd
import numpy as np
from pathlib import Path

class DataLoader:
    def __init__(self, data_dir="data"):
        self.data_dir = Path(data_dir)
        
    def load_all_participants(self):
        """Load all participant CSV files"""
        participant_files = list(self.data_dir.glob("final_data/*_processed.csv"))
        return {f.stem: pd.read_csv(f) for f in participant_files}
    
    def load_merged_data(self):
        """Load merged accelerometer and TAC data"""
        return {
            'accelerometer': pd.read_csv(self.data_dir/'merged_data/ccelerometer_data_processed.csv'),
            'tac': pd.read_csv(self.data_dir/'merged_data/tac_data_processed.csv')
        }

In [11]:
import torch
import torch.nn as nn
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt

class SobrietyModelTrainer:
    def __init__(self, models):
        self.models = models  # Dictionary of model instances
        self.results = {}
        
    def train_all_models(self, X_train, y_train, X_test, y_test, epochs=100):
        for name, model in self.models.items():
            train_loss, test_loss = self._train_model(model, X_train, y_train, X_test, y_test, epochs)
            self.results[name] = {
                'train_loss': train_loss,
                'test_loss': test_loss,
                'total_loss': [t+te for t,te in zip(train_loss, test_loss)]
            }
            # Save model and learning curves
            torch.save(model.state_dict(), f"{name}.pth")
            pd.DataFrame(self.results[name]).to_csv(f"{name}_learning_curves.csv")
    
    def _train_model(self, model, X_train, y_train, X_test, y_test, epochs):
        # Implementation of training loop
        pass
    
    def generate_plots(self, original_data, predictions):
        """Generate required visualization plots"""
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
        
        # Plot 1: Time-series with TAC change
        ax1.plot(original_data['timestamp'], original_data['value'], label='Original')
        ax1.plot(original_data['timestamp'], predictions, label='Prediction', linestyle='--')
        ax1.set_title("Time-series with TAC Change")
        ax1.legend()
        
        # Plot 2: Sobriety metric comparison
        ax2.bar(['Sober 1 original', 'Sober 0 original', 'Sober 1 pred', 'Sober 0 pred'],
                [original_sober1, original_sober0, pred_sober1, pred_sober0])
        ax2.set_title("Sobriety Metric Comparison")
        
        plt.savefig("sobriety_comparison.png")
        plt.close()
    
    def generate_confusion_matrices(self, y_true, y_preds):
        """Generate confusion matrices for all models"""
        for name, y_pred in y_preds.items():
            cm = confusion_matrix(y_true, y_pred)
            # Plot and save confusion matrix
            plt.figure()
            sns.heatmap(cm, annot=True)
            plt.title(f"Confusion Matrix - {name}")
            plt.savefig(f"confusion_matrix_{name}.png")
            plt.close()

In [15]:
import torch
import torch.nn as nn

class RCModel(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.reservoir = nn.Linear(input_size, hidden_size, bias=False)
        self.readout = nn.Linear(hidden_size, 1)
        
    def forward(self, x):
        reservoir_out = torch.tanh(self.reservoir(x))
        return self.readout(reservoir_out)

class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.linear = nn.Linear(hidden_size, 1)
        
    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        return self.linear(lstm_out[:, -1, :])

class RNNModel(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.linear = nn.Linear(hidden_size, 1)
        
    def forward(self, x):
        rnn_out, _ = self.rnn(x)
        return self.linear(rnn_out[:, -1, :])

class CustomNN(nn.Module):
    def __init__(self, input_size):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_size, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1)
        )
        
    def forward(self, x):
        # Flatten time series if needed
        if x.dim() > 2:
            x = x.view(x.size(0), -1)
        return self.net(x)

In [13]:
class EvaluationPipeline:
    def __init__(self, data_loader):
        self.data_loader = data_loader
        self.complexity_metrics = {}
        self.entropy_metrics = {}
        
    def calculate_complexity(self, signal):
        """Calculate complexity metric"""
        pass
    
    def calculate_permutation_entropy(self, signal):
        """Calculate permutation entropy"""
        pass
    
    def evaluate_all_participants(self, models):
        results = []
        for pid, data in self.data_loader.load_all_participants().items():
            pid_results = {'PID': pid}
            for name, model in models.items():
                # Make predictions
                predictions = model.predict(data)
                
                # Calculate metrics
                pid_results[name] = self._calculate_metrics(data, predictions)
                
                # Generate plots
                self._generate_pid_plots(pid, data, predictions, name)
            
            results.append(pid_results)
        
        # Save results as table
        pd.DataFrame(results).to_csv("participant_results.csv")
        return results
    
    def _calculate_metrics(self, data, predictions):
        """Calculate all required metrics"""
        pass

In [16]:
# Define model hyperparameters
input_size = 3  # For x, y, z accelerometer data
hidden_size = 50  # Size of hidden layers
num_layers = 2  # For stacked RNN/LSTM
dropout = 0.2  # Regularization
output_size = 1  # For regression (or 2 for binary classification)

# Initialize all models
models = {
    'RC': RCModel(
        input_size=input_size,
        hidden_size=hidden_size,
        output_size=output_size
    ),
    'LSTM': LSTMModel(
        input_size=input_size,
        hidden_size=hidden_size,
        num_layers=num_layers,
        dropout=dropout,
        output_size=output_size
    ),
    'RNN': RNNModel(
        input_size=input_size,
        hidden_size=hidden_size,
        num_layers=num_layers,
        dropout=dropout,
        output_size=output_size
    ),
    'CustomNN': CustomNN(
        input_size=input_size,
        hidden_sizes=[64, 32],  # Two hidden layers
        output_size=output_size,
        dropout=dropout
    )
}

TypeError: RCModel.__init__() got an unexpected keyword argument 'output_size'