In [None]:
import numpy as np
import pandas as pd
import os
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score
from torch.utils.data import DataLoader, TensorDataset

In [None]:
# Acquisition site to analyse. It is used below as the key into `centers_dict`,
# i.e. it is matched against the part of each subject key before the first '_'.
p_center = "NYU"

In [None]:
# Colab-only: mount Google Drive at /content/drive. Every data path used below
# lives under '/content/drive/My Drive', so this cell has to run first.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Diagnosis coding after the remap below: 1 = ASD, 0 = healthy control.
df_labels = pd.read_csv('/content/drive/My Drive/Phenotypic_V1_0b_preprocessed1.csv')  # phenotypic table
df_labels['DX_GROUP'] = df_labels['DX_GROUP'].map({1: 1, 2: 0})

# Lookup table FILE_ID -> diagnosis label. Rows whose FILE_ID is the
# 'no_filename' placeholder are left out; a repeated FILE_ID trips the assert.
labels = {}
for _, subject in df_labels.iterrows():
    file_id, y_label = subject['FILE_ID'], subject['DX_GROUP']
    if file_id != 'no_filename':
        assert file_id not in labels
        labels[file_id] = y_label


In [None]:
def get_key(filename):
    """Return the subject key encoded at the start of a time-series file name.

    The name is split on '_'. When the fourth component is 'rois' the key is
    the first three components joined by '_' (site names that themselves
    contain one underscore); otherwise it is the first two components.

    Names with fewer than four components (stray files such as '.DS_Store')
    used to raise IndexError; they now fall into the two-component branch.

    Parameters
    ----------
    filename : str
        File name (not a full path).

    Returns
    -------
    str
        The '_'-joined key.
    """
    parts = filename.split('_')
    # Guard the index so short names cannot raise IndexError.
    n_key_parts = 3 if len(parts) > 3 and parts[3] == 'rois' else 2
    return '_'.join(parts[:n_key_parts])

In [None]:
data_main_path = '/content/drive/My Drive/cc400precdata/ABIDE_pcp/cpac/filt_global' #path to time series data
#data_main_path = '/content/drive/My Drive/power264'
flist = os.listdir(data_main_path)
print(len(flist))

# Replace every file name by its subject key, keeping the listing order.
flist = list(map(get_key, flist))

In [None]:
# Group subject keys by site, where the site is the text before the first '_'.
centers_dict = {}
for subject_key in flist:
    site = subject_key.split('_')[0]
    centers_dict.setdefault(site, []).append(subject_key)

# From here on `flist` only holds the subjects of the selected site.
flist = np.array(centers_dict[p_center])

In [None]:
# Diagnosis label of every selected subject, in the same order as `flist`.
ASD_labels = [labels[f] for f in flist]

print(len(ASD_labels))

In [None]:
# Load one time-series matrix per subject of the selected site.
# Reuse the directory that `flist` was built from so both views stay in sync.
folder_path = data_main_path
fMRI_samples = []
# Select files with the same rule that produced `flist` (site = text before the
# first '_'), driven by `p_center` instead of a hard-coded 'NYU'.
selected_files = [f for f in os.listdir(folder_path) if f.split('_')[0] == p_center]
print(selected_files)

# `ASD_labels` follows the order of `flist`; the samples follow the order of
# `selected_files`. Fail loudly if the two orders differ, otherwise labels
# would silently be paired with the wrong subjects.
selected_keys = [get_key(f) for f in selected_files]
if selected_keys != list(flist):
    raise ValueError(
        "Time-series files do not line up with `flist`; "
        "labels and samples would be misaligned."
    )

for file_name in selected_files:
    file_path = os.path.join(folder_path, file_name)
    data = np.loadtxt(file_path)
    print(data.shape)
    fMRI_samples.append(data)

# np.stack raises when the per-subject shapes differ instead of quietly
# producing an object array that later tensor conversions cannot handle.
fMRI_data = np.stack(fMRI_samples)
print(fMRI_data.shape)

### Using an RNN (LSTM) classifier for some sites

In [None]:
# Hold out 20 % of the subjects for testing (fixed random_state).
X_train, X_test, y_train, y_test = train_test_split(
    fMRI_data, ASD_labels, test_size=0.2, random_state=42
)

# Inputs become float32 tensors, class labels become int64 tensors.
X_train, X_test = (torch.tensor(part, dtype=torch.float32) for part in (X_train, X_test))
y_train, y_test = (torch.tensor(part, dtype=torch.long) for part in (y_train, y_test))

# Wrap the tensors in datasets and mini-batch loaders; only the training
# loader reshuffles.
train_dataset, test_dataset = TensorDataset(X_train, y_train), TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=16)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=16)

In [None]:
class DeepLSTM(nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step.

    Parameters
    ----------
    input_size : int
        Number of features per time step.
    hidden_size : int
        Width of every LSTM layer.
    output_size : int
        Number of outputs produced by the final linear layer.
    num_layers : int, default 2
        Number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=2):
        super(DeepLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # Inter-layer dropout only exists between stacked layers; asking for it
        # with a single layer has no effect and makes PyTorch emit a warning.
        dropout = 0.2 if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map a batch-first sequence batch to one output row per sequence.

        `x` is indexed as (batch, time, feature) because the LSTM is built with
        batch_first=True. The result has shape (batch, output_size).
        """
        # nn.LSTM starts from zero hidden/cell states when none are given, and
        # creates them with the dtype and device of the input. The previous
        # hand-built float32 zeros broke non-float32 inputs.
        out, _ = self.lstm(x)
        # Only the representation of the final time step feeds the classifier.
        out = self.fc(out[:, -1, :])
        return out


In [None]:
# Hyperparameters
input_size = X_train.shape[-1]  # features per time step, read from the data (was hard-coded to 392)
hidden_size = 128
output_size = 2  # two classes, see the label coding (1 = ASD, 0 = healthy)
num_layers = 3
num_epochs = 10
learning_rate = 0.001

# Run on the GPU when one is present, otherwise fall back to the CPU instead of
# crashing on a hard-coded 'cuda'.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Fix the PyTorch RNG so weight initialisation and batch shuffling repeat
# between runs (GPU kernels may still introduce small differences).
torch.manual_seed(42)

model = DeepLSTM(input_size, hidden_size, output_size, num_layers).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    samples_seen = 0
    # The batch labels are called `targets` on purpose: naming them `labels`
    # overwrote the global FILE_ID -> diagnosis dict of the same name.
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight each batch by its size so the epoch figure is a true mean.
        running_loss += loss.item() * inputs.size(0)
        samples_seen += inputs.size(0)

    # Report the mean loss over the epoch rather than the last batch only;
    # max(..., 1) keeps an empty loader from dividing by zero.
    epoch_loss = running_loss / max(samples_seen, 1)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')


In [None]:
def evaluate_model(model, test_loader):
    """Print accuracy, precision and recall of `model` over `test_loader`.

    Parameters
    ----------
    model : torch.nn.Module
        Classifier returning one score per class for every input row.
    test_loader : iterable
        Yields `(inputs, labels)` batches.

    The model is switched to eval mode and left there. Inputs are moved to the
    device the model's parameters live on, so the function works on CPU as well
    as on GPU instead of requiring 'cuda'.
    """
    model.eval()
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs.to(device))
            # Predicted class = index of the largest score in each row.
            predicted = outputs.argmax(dim=1)
            all_predictions.extend(predicted.cpu().tolist())
            # Labels are only compared on the host, so they never need to
            # visit the model's device.
            all_labels.extend(labels.tolist())

    acc = accuracy_score(all_labels, all_predictions)
    precision = precision_score(all_labels, all_predictions)
    recall = recall_score(all_labels, all_predictions)

    print(f"Accuracy: {acc:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")

# Evaluate the trained model
evaluate_model(model, test_loader)


### Using reinforcement learning

In [None]:
pip install torch stable-baselines3 gym numpy matplotlib scikit-learn

In [None]:
pip install shimmy

In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import torch

# Read the dimensions from the data instead of hard-coding 296 x 392.
num_samples, num_timepoints, num_rois = fMRI_data.shape
reshaped_data = fMRI_data.reshape(-1, num_rois)

# Split the subject indices BEFORE fitting the scaler, so the normalisation
# statistics come from training subjects only (no leakage from the test set).
# train_test_split derives the partition from the number of samples and the
# random_state alone, so these indices select the same subjects as splitting
# the data arrays directly with random_state=42.
train_idx, test_idx = train_test_split(
    np.arange(num_samples), test_size=0.2, random_state=42
)

scaler = StandardScaler()
scaler.fit(fMRI_data[train_idx].reshape(-1, num_rois))
normalized_data = scaler.transform(reshaped_data)

fMRI_data_normalized = normalized_data.reshape(num_samples, num_timepoints, num_rois)

print(fMRI_data_normalized.shape)

fMRI_data_tensor = torch.tensor(fMRI_data_normalized, dtype=torch.float32)
ASD_labels_tensor = torch.tensor(ASD_labels, dtype=torch.long)

print(fMRI_data_tensor.shape)
print(ASD_labels_tensor.shape)

# Apply the index split to both tensors so samples and labels stay paired.
train_index_tensor = torch.as_tensor(train_idx, dtype=torch.long)
test_index_tensor = torch.as_tensor(test_idx, dtype=torch.long)
fMRI_train, fMRI_test = fMRI_data_tensor[train_index_tensor], fMRI_data_tensor[test_index_tensor]
labels_train, labels_test = ASD_labels_tensor[train_index_tensor], ASD_labels_tensor[test_index_tensor]

print(fMRI_train.shape)
print(labels_train.shape)



In [None]:
import numpy as np
from sklearn.preprocessing import StandardScaler

# Standardise every subject's matrix on its own, column by column.
# NOTE: this overwrites `fMRI_data` in place and rebinds `scaler`, which ends
# up fitted on the last subject only.
scaler = StandardScaler()
for subject_idx, subject_series in enumerate(fMRI_data):
    fMRI_data[subject_idx] = scaler.fit_transform(subject_series)


In [None]:
import torch
import torch.nn as nn

# Define the LSTM network for feature extraction
class LSTMFeatureExtractor(nn.Module):
    """Single-layer LSTM whose final time step is projected by a linear layer.

    Inputs are batch-first sequences; the output has one row of `output_size`
    values per sequence.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        # Per-time-step hidden states; the final (h, c) pair is not needed.
        sequence_states = self.lstm(x)[0]
        # Keep the last time step only and project it to the feature size.
        return self.fc(sequence_states[:, -1, :])

# Input layout expected by the extractor: (batch_size, time_steps, num_features).
# NOTE(review): input_size / hidden_size / output_size reuse the names of the
# hyperparameters set in the DeepLSTM training cell and overwrite them here.
input_size = fMRI_data.shape[2]
hidden_size = 296  # Number of LSTM units
output_size = 296  # Output feature size; FMRIEnv declares observations of shape (296,), keep the two equal

# Initialize the LSTM feature extractor
# NOTE(review): no optimiser over this model is visible in the notebook, so it
# appears to be used with its initial random weights - confirm this is intended.
lstm_model = LSTMFeatureExtractor(input_size, hidden_size, output_size)


In [None]:
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
import gym
import numpy as np
import torch

# Create a custom Gym environment
class FMRIEnv(gym.Env):
    """One-step classification environment over fMRI samples.

    Every episode shows the features of a single, randomly drawn sample.
    The agent answers with a class (action 1 or 0) and receives +1 when the
    action equals the sample's label and -1 otherwise; the episode then ends.

    Parameters
    ----------
    fMRI_data : array-like or torch.Tensor
        Samples indexed along the first axis.
    ASD_labels : sequence
        One label per sample, aligned with `fMRI_data`.
    lstm_model : callable
        Maps a one-sample batch to a feature batch; its output is the
        observation.

    Raises
    ------
    ValueError
        If there are no samples or data and labels differ in length.
    """

    def __init__(self, fMRI_data, ASD_labels, lstm_model):
        super(FMRIEnv, self).__init__()
        # as_tensor accepts arrays and tensors alike and avoids the
        # copy-construct warning torch.tensor() raises for tensor input
        # (it may therefore share memory with the caller's tensor).
        self.fMRI_data = torch.as_tensor(fMRI_data, dtype=torch.float32)
        self.labels = ASD_labels
        self.lstm_model = lstm_model
        self.current_step = 0
        self.num_samples = len(ASD_labels)
        if self.num_samples == 0:
            raise ValueError("FMRIEnv needs at least one sample")
        if len(self.fMRI_data) != self.num_samples:
            raise ValueError("fMRI_data and ASD_labels must have the same length")

        # Action space: classify as ASD (1) or TD (0)
        self.action_space = gym.spaces.Discrete(2)
        # Observation space: feature vector output from the LSTM. Its length is
        # read from the extractor's final linear layer when it has one (`fc`);
        # 296 is kept as the fallback used previously.
        feature_dim = getattr(getattr(lstm_model, 'fc', None), 'out_features', 296)
        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(feature_dim,), dtype=np.float32)

    def _observe(self):
        """Return the feature vector of the current sample as a float32 array."""
        sample = self.fMRI_data[self.current_step:self.current_step + 1]
        # No gradients are needed to produce observations.
        with torch.no_grad():
            features = self.lstm_model(sample)
        # Drop only the batch axis so a length-1 feature vector stays 1-D.
        return features.cpu().numpy().squeeze(0).astype(np.float32, copy=False)

    def reset(self):
        """Draw a random sample and return its observation."""
        self.current_step = np.random.randint(0, self.num_samples)
        return self._observe()

    def step(self, action):
        """Score `action` against the current label and end the episode.

        Returns the 4-tuple `(observation, reward, done, info)`.
        """
        # Compare plain Python ints so tensor- or numpy-valued labels and
        # actions behave the same way.
        label = int(self.labels[self.current_step])
        reward = 1 if int(action) == label else -1  # Reward based on correct classification
        done = True  # Single-step environment
        return self._observe(), reward, done, {}

# Build the vectorised train / test environments from named factory functions.
def _make_train_env():
    return FMRIEnv(fMRI_train, labels_train, lstm_model)


def _make_test_env():
    return FMRIEnv(fMRI_test, labels_test, lstm_model)


env = DummyVecEnv([_make_train_env])
test_env = DummyVecEnv([_make_test_env])

In [None]:
# Create a DQN agent with the default MLP policy on the training environment.
dqn_agent = DQN(policy='MlpPolicy', env=env, verbose=1)

# Run the learning loop; tune the number of timesteps to the size of the data.
dqn_agent.learn(total_timesteps=20000)


In [None]:

def _unwrap_fmri_env(vec_env):
    """Return the innermost environment of `vec_env` that holds the samples.

    Follows the `gym_env` / `env` attributes of compatibility shims and
    wrappers until an object with its own `current_step` attribute is found.
    """
    candidate = vec_env.envs[0]
    for _ in range(32):  # bounded walk, wrappers are never nested this deep
        own_attrs = vars(candidate)
        if 'current_step' in own_attrs:
            return candidate
        inner = own_attrs.get('gym_env')
        if inner is None:
            inner = own_attrs.get('env')
        if inner is None:
            break
        candidate = inner
    raise RuntimeError("Could not locate the fMRI environment inside test_env")


def evaluate_agent(agent, test_env, test_labels):
    """Print accuracy, precision and recall of `agent` on the test samples.

    Parameters
    ----------
    agent
        Trained agent exposing `predict(observation, deterministic=...)`.
    test_env
        Vectorised environment wrapping the test-set environment.
    test_labels : sequence
        True labels, in the same order as the samples held by `test_env`.

    Raises
    ------
    ValueError
        If `test_labels` does not match the number of samples in `test_env`.
    RuntimeError
        If the sample-holding environment cannot be found inside `test_env`.

    Every test sample is scored exactly once, in order. Calling
    `test_env.reset()` instead draws a *random* sample each time, so pairing
    its prediction with `test_labels[i]` compared unrelated samples.
    """
    base_env = _unwrap_fmri_env(test_env)
    if len(test_labels) != base_env.num_samples:
        raise ValueError("test_labels does not match the samples held by test_env")

    predictions = []
    true_labels = []

    for i in range(len(test_labels)):
        # Same feature extraction the environment performs, for sample i;
        # the leading axis of size 1 plays the role of the single vec-env slot.
        with torch.no_grad():
            obs = base_env.lstm_model(base_env.fMRI_data[i:i + 1]).cpu().numpy()
        # deterministic=True switches off the agent's exploration, so the
        # greedy action is the one being scored.
        action, _ = agent.predict(obs, deterministic=True)
        # Store plain ints so the metric functions see flat label lists.
        predictions.append(int(np.asarray(action).reshape(-1)[0]))
        true_labels.append(int(test_labels[i]))

    # Calculate metrics
    acc = accuracy_score(true_labels, predictions)
    precision = precision_score(true_labels, predictions)
    recall = recall_score(true_labels, predictions)

    print(f"Accuracy: {acc:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}")

# Evaluate the trained agent with the test data
evaluate_agent(dqn_agent, test_env, labels_test)
