In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

# data file path and names
data_file_path = '/content/gdrive/MyDrive/data/'

print("Google Drive Mounted")

In [2]:
data_file_path = 'data/'

In [3]:
import numpy as np
import pandas as pd
import os

In [4]:
rows = []
colNames = ['BirthYear','Gender','Parkinsons','Tremors','DiagnosisYear','Sided','UPDRS','Impact','Levadopa','DA','MAOB','Other']
users = []

userFolder = data_file_path + 'Users'

for userFilename in os.listdir(userFolder):
    users.append(userFilename[5:-4])

    f = open(os.path.join(userFolder, userFilename))
    lines = f.readlines()

    row = []
    col = []

    for line in lines:
        line = line[:-1]
        lineSplit = line.split(': ')
        
        row.append(lineSplit[1])
        col.append(lineSplit[0])
            
    if col == colNames:
        rows.append(row)
    else:
        print(userFilename+'read wrong.')

    f.close()

userDF = pd.DataFrame(rows, columns=colNames, index=users)
userDF.head()

Unnamed: 0,BirthYear,Gender,Parkinsons,Tremors,DiagnosisYear,Sided,UPDRS,Impact,Levadopa,DA,MAOB,Other
0EA27ICBLF,1952,Female,True,True,2000,Left,Don't know,Severe,True,True,False,False
0QAZFRHQHW,1959,Female,False,False,------,,Don't know,------,False,False,False,False
0WTDIGPSBZ,1946,Female,False,False,------,,Don't know,------,False,False,False,False
1HOEBIGASW,1944,Male,False,False,------,,Don't know,------,False,False,False,False
1WMVCCU4RH,1953,Male,True,True,2017,Left,Don't know,Medium,False,False,False,False


In [6]:
rows = []
colNames = ['UserID','Date','Timestamp','Hand','HoldTime','Direction','LatencyTime','FlightTime']

dataFolder = data_file_path + 'TappyData'
invalid = [0,0,0,0,0,0,0,0,0]
for dataFilename in os.listdir(dataFolder):
    infoArr = dataFilename[:-4].split('_')
    userID = infoArr[0]
    yearMonth = infoArr[1]

    f = open(os.path.join(dataFolder, dataFilename))
    lines = f.readlines()

    row = []
    
    for idx, line in enumerate(lines):
        line = line[:-1]
        lineSplit = line.split('\t')

        if len(line) != 57:
            invalid[0] = invalid[0] + 1
        elif len(lineSplit[0]) != 10 or lineSplit[0] != userID:
            invalid[1] = invalid[1] + 1
        elif len(lineSplit[1]) != 6 or lineSplit[1][0:4] != yearMonth:
            invalid[2] = invalid[2] + 1
        elif len(lineSplit[2]) != 12: # Timestamp
            invalid[3] = invalid[3] + 1
        elif len(lineSplit[3]) != 1: # Hand
            invalid[4] = invalid[4] + 1
        elif len(lineSplit[4]) != 6: # HoldTime
            invalid[5] = invalid[5] + 1
        elif len(lineSplit[5]) != 2: # Direction
            invalid[6] = invalid[6] + 1
        elif len(lineSplit[6]) != 6: # LatencyTime
            invalid[7] = invalid[7] + 1
        elif len(lineSplit[7]) != 6: # FlightTime
            invalid[8] = invalid[8] + 1
        else:
            rows.append(lineSplit[:-1])
    
    f.close()
print(invalid)
dataDF = pd.DataFrame(rows, columns=colNames)
dataDF.head()

[1354, 0, 0, 7, 0, 0, 0, 0, 0]


Unnamed: 0,UserID,Date,Timestamp,Hand,HoldTime,Direction,LatencyTime,FlightTime
0,0EA27ICBLF,160722,18:41:04.336,L,101.6,LL,234.4,156.3
1,0EA27ICBLF,160722,18:42:14.070,L,85.9,LL,437.5,359.4
2,0EA27ICBLF,160722,18:42:14.273,L,78.1,LL,210.9,125.0
3,0EA27ICBLF,160722,18:42:14.617,L,62.5,LL,359.4,281.3
4,0EA27ICBLF,160722,18:42:15.586,S,125.0,LS,187.5,93.8


In [7]:
df = pd.merge(dataDF,userDF[['Parkinsons']], left_on='UserID', right_index=True, how='left')
df['Parkinsons'] = df['Parkinsons'].map({'False':0, 'True':1})
df['Hand'] = df['Hand'].map({'L':1, 'R':2, 'S':0})
df['Direction'] = df['Direction'].map({'LL':0, 'RL':1, 'LR':2, 'RR':3, 'LS':4, 'SL':5, 'RS':6, 'SR':7, 'SS':8})
df['HoldTime'] = df['HoldTime'].astype(float)
df['LatencyTime'] = df['LatencyTime'].astype(float)
df['FlightTime'] = df['FlightTime'].astype(float)
df = df.dropna()

In [8]:
positiveSamples = df[df['Parkinsons']==1]
negativeSamples = df[df['Parkinsons']==0]
positiveGrouped = positiveSamples.groupby('UserID')
negativeGrouped = negativeSamples.groupby('UserID')
positiveUserIDs = np.array(positiveSamples['UserID'].unique())
negativeUserIDs = np.array(negativeSamples['UserID'].unique())
np.random.shuffle(positiveUserIDs)
np.random.shuffle(negativeUserIDs)

train_ratio = 0.7
valid_ratio = 0.2

total_positive_rows = len(positiveSamples)
train_positive_rows = int(total_positive_rows * train_ratio)
valid_positive_rows = int(total_positive_rows * valid_ratio)
total_negative_rows = len(negativeSamples)
train_negative_rows = int(total_negative_rows * train_ratio)
valid_negative_rows = int(total_negative_rows * valid_ratio)

# Initialize empty DataFrames for training, validation, and testing sets
train_df = pd.DataFrame(columns=df.columns)
valid_df = pd.DataFrame(columns=df.columns)
test_df = pd.DataFrame(columns=df.columns)

train_positive_count, valid_positive_count = 0, 0
train_negative_count, valid_negative_count = 0, 0
train_data = []
valid_data = []
test_data = []

# Iterate over the shuffled user_ids and assign rows to the corresponding set
for userID in positiveUserIDs:
    user_data = positiveGrouped.get_group(userID)
    user_rows = len(user_data)
    
    if train_positive_count + user_rows <= train_positive_rows:
        train_data.append(user_data)
        train_positive_count += user_rows
    elif valid_positive_count + user_rows <= valid_positive_rows:
        valid_data.append(user_data)
        valid_positive_count += user_rows
    else:
        test_data.append(user_data)

for userID in negativeUserIDs:
    user_data = negativeGrouped.get_group(userID)
    user_rows = len(user_data)
    
    if train_negative_count + user_rows <= train_negative_rows:
        train_data.append(user_data)
        train_negative_count += user_rows
    elif valid_negative_count + user_rows <= valid_negative_rows:
        valid_data.append(user_data)
        valid_negative_count += user_rows
    else:
        test_data.append(user_data)

train_df = pd.concat(train_data)
valid_df = pd.concat(valid_data)
test_df = pd.concat(test_data)
valid_test_df = pd.concat([valid_df, test_df]).reset_index(drop=True)

In [13]:
from torch import nn
import torch
device = "cuda"

In [11]:
class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(4, 1024)
        self.fc2 = nn.Linear(1024, 128)
        self.fc3 = nn.Linear(128, 32)
        self.fc4 = nn.Linear(32, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.fc1(x)
        x = nn.functional.relu(x)
        x = self.fc2(x)
        x = nn.functional.relu(x)
        x = self.fc3(x)
        x = nn.functional.relu(x)
        x = self.fc4(x)
        x = self.sigmoid(x)
        return x

In [14]:
feature_columns = ['Hand','HoldTime','Direction','LatencyTime']
label_column = 'Parkinsons'

X_train = torch.tensor(train_df[feature_columns].values, dtype=torch.float32).to(device)
y_train = torch.tensor(train_df[label_column].values, dtype=torch.float32).unsqueeze(1).to(device)

X_valid = torch.tensor(valid_df[feature_columns].values, dtype=torch.float32).to(device)
y_valid = torch.tensor(valid_df[label_column].values, dtype=torch.float32).unsqueeze(1).to(device)

X_test = torch.tensor(test_df[feature_columns].values, dtype=torch.float32).to(device)
y_test = torch.tensor(test_df[label_column].values, dtype=torch.float32).unsqueeze(1).to(device)

In [15]:
from tqdm import tqdm
from torch.utils.data import TensorDataset, DataLoader

train_dataset = TensorDataset(X_train, y_train)
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataset = TensorDataset(X_valid, y_valid)
val_dataloader = DataLoader(val_dataset, batch_size=32)
test_dataset = TensorDataset(X_test, y_test)
test_dataloader = DataLoader(test_dataset, batch_size=32)

model = MLP().to(device)

# Define the loss function and optimizer
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters())

# Train the MLP model on the training set and evaluate on the validation set
best_loss = float('inf')
for epoch in range(10):
    running_loss = 0.0
    for inputs, labels in tqdm(train_dataloader):
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    # Evaluate the model on the validation set
    with torch.no_grad():
        total_val_loss = 0
        for inputs, labels in tqdm(val_dataloader):
            val_outputs = model(inputs)
            val_loss = criterion(val_outputs, labels)
            total_val_loss = total_val_loss + val_loss
        if total_val_loss < best_loss:
            best_loss = total_val_loss
            best_model = model.state_dict()
    print('Epoch %d train loss: %.3f val loss: %.3f' % (epoch + 1, running_loss / len(X_train), total_val_loss))

# Load the best model and evaluate it on the testing set
model.load_state_dict(best_model)
with torch.no_grad():
    test_tp = test_tn = test_fp = test_fn = 0
    for inputs, labels in tqdm(test_dataloader):
        test_outputs = model(inputs)
        test_predictions = (test_outputs > 0.5).float()
        test_tp = test_tp + ((test_predictions == 1) & (labels == 1)).sum().item()
        test_tn = test_tn + ((test_predictions == 0) & (labels == 0)).sum().item()
        test_fp = test_fp + ((test_predictions == 1) & (labels == 0)).sum().item()
        test_fn = test_fn + ((test_predictions == 0) & (labels == 1)).sum().item()
    
print('True positives:', test_tp)
print('True negatives:', test_tn)
print('False positives:', test_fp)
print('False negatives:', test_fn)

100%|██████████| 196612/196612 [06:33<00:00, 500.05it/s]
100%|██████████| 44254/44254 [00:29<00:00, 1492.24it/s]


Epoch 1 train loss: 0.018 val loss: 1.401


100%|██████████| 196612/196612 [06:26<00:00, 508.14it/s]
100%|██████████| 44254/44254 [00:29<00:00, 1486.38it/s]


Epoch 2 train loss: 0.018 val loss: 1.671


100%|██████████| 196612/196612 [06:26<00:00, 508.86it/s]
100%|██████████| 44254/44254 [00:29<00:00, 1487.28it/s]


Epoch 3 train loss: 0.018 val loss: 1.397


100%|██████████| 196612/196612 [06:38<00:00, 493.39it/s]
100%|██████████| 44254/44254 [00:31<00:00, 1417.44it/s]


Epoch 4 train loss: 0.018 val loss: 1.576


100%|██████████| 196612/196612 [06:37<00:00, 495.24it/s]
100%|██████████| 44254/44254 [00:31<00:00, 1427.04it/s]


Epoch 5 train loss: 0.018 val loss: 1.501


100%|██████████| 196612/196612 [06:34<00:00, 498.23it/s]
100%|██████████| 44254/44254 [00:31<00:00, 1424.88it/s]


Epoch 6 train loss: 0.018 val loss: 1.543


100%|██████████| 196612/196612 [06:35<00:00, 496.94it/s]
100%|██████████| 44254/44254 [00:30<00:00, 1431.19it/s]


Epoch 7 train loss: 0.018 val loss: 1.554


100%|██████████| 196612/196612 [06:36<00:00, 495.43it/s]
100%|██████████| 44254/44254 [00:31<00:00, 1383.20it/s]


Epoch 8 train loss: 0.018 val loss: 1.627


100%|██████████| 196612/196612 [06:26<00:00, 508.53it/s]
100%|██████████| 44254/44254 [00:30<00:00, 1449.53it/s]


Epoch 9 train loss: 0.018 val loss: 1.575


100%|██████████| 196612/196612 [06:21<00:00, 515.97it/s]
100%|██████████| 44254/44254 [00:30<00:00, 1449.67it/s]


Epoch 10 train loss: 0.018 val loss: 1.451


100%|██████████| 40812/40812 [00:47<00:00, 864.75it/s]

True positives: 633086
True negatives: 22503
False positives: 636308
False negatives: 14077





In [None]:
torch.save(model.state_dict(), '/content/gdrive/MyDrive/model.pth')

0.03415698887844921

In [None]:
from sklearn.svm import LinearSVC
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score

feature_columns = ['HoldTime'] # ['Hand','HoldTime','Direction','LatencyTime']
label_column = 'Parkinsons'
X_train = train_df[feature_columns]
y_train = train_df[label_column]

X_test = valid_test_df[feature_columns]
y_test = valid_test_df[label_column]

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

svm = LinearSVC(verbose=2,max_iter=1000,C=0.01)
svm.fit(X_train_scaled, y_train)

y_test_pred = svm.predict(X_test_scaled)
test_accuracy = accuracy_score(y_test, y_test_pred)
print(f'Test accuracy: {test_accuracy:.4f}')

In [None]:
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(y_test, y_test_pred)
tn, fp, fn, tp = cm.ravel()