In [9]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

preprocess data

In [16]:
def preprocess_data(file_name, test_size=0.2, random_state=42):
    """Load the sensor CSV and return min-max scaled train/test splits.

    Rows containing any missing value are dropped. The data is split *before*
    scaling and the ``MinMaxScaler`` is fitted on the training features only,
    then applied to both splits, so no statistics from the held-out rows leak
    into training. As a consequence, scaled test values may fall slightly
    outside the [0, 1] range.

    Parameters
    ----------
    file_name : str or path-like
        CSV file containing "temperature", "vibration" and "fault_label"
        columns.
    test_size : float, default 0.2
        Fraction of rows held out for testing (passed to ``train_test_split``).
    random_state : int, default 42
        Seed for the shuffle performed by ``train_test_split``.

    Returns
    -------
    X_train, X_test : pandas.DataFrame
        Scaled feature frames with columns "temperature" and "vibration",
        keeping the row index of the source data.
    y_train, y_test : pandas.Series
        The matching "fault_label" values.
    """
    feature_columns = ["temperature", "vibration"]

    # read the data and remove rows with missing values
    data = pd.read_csv(file_name).dropna()
    X = data[feature_columns]
    y = data["fault_label"]

    # split first so the scaler never sees the test rows
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    # normalize temp and vibration data using training-set min/max only
    scaler = MinMaxScaler()
    X_train = pd.DataFrame(
        scaler.fit_transform(X_train), columns=feature_columns, index=X_train.index
    )
    X_test = pd.DataFrame(
        scaler.transform(X_test), columns=feature_columns, index=X_test.index
    )
    return X_train, X_test, y_train, y_test

logistic regression

In [17]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

def train_model(X_train, y_train):
    """Fit a default-configured ``LogisticRegression`` on the training data.

    Parameters
    ----------
    X_train : array-like
        Training features.
    y_train : array-like
        Training labels.

    Returns
    -------
    LogisticRegression
        The fitted classifier.
    """
    classifier = LogisticRegression()
    # ``fit`` returns the estimator itself, so it can be returned directly.
    return classifier.fit(X_train, y_train)

def evaluate_model(model, X_test, y_test):
    """Print accuracy, confusion matrix and classification report for a model.

    Parameters
    ----------
    model : fitted estimator
        Any object exposing ``predict(X)``.
    X_test : array-like
        Held-out features.
    y_test : array-like
        True labels for ``X_test``.

    Returns
    -------
    None
        The metrics are written to standard output only.
    """
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    confusion = confusion_matrix(y_test, y_pred)
    classification_rep = classification_report(y_test, y_pred)

    print("Accuracy: ", accuracy)
    # The matrix and the report span several lines; printing them after their
    # label (instead of on the same line) keeps rows and column headers aligned.
    print("Confusion Matrix:")
    print(confusion)
    print("Classification Report:")
    print(classification_rep)

# Logistic-regression pipeline: load and split the data, fit the model,
# then print the evaluation metrics for the held-out split.
if __name__ == "__main__":
    # Relative path: the CSV is resolved against the current working directory.
    X_train, X_test, y_train, y_test = preprocess_data("synthetic_data.csv")
    model = train_model(X_train, y_train)
    evaluate_model(model, X_test, y_test)


Accuracy:  0.985
Confusion Matrix:  [[143   0]
 [  3  54]]
Classification Report:                precision    recall  f1-score   support

         0.0       0.98      1.00      0.99       143
         1.0       1.00      0.95      0.97        57

    accuracy                           0.98       200
   macro avg       0.99      0.97      0.98       200
weighted avg       0.99      0.98      0.98       200



Random forest

In [18]:
from sklearn.ensemble import RandomForestClassifier

def train_model_RF(X_train, y_train):
    """Fit a ``RandomForestClassifier`` (seeded with 42) on the training data.

    Parameters
    ----------
    X_train : array-like
        Training features.
    y_train : array-like
        Training labels.

    Returns
    -------
    RandomForestClassifier
        The fitted classifier.
    """
    forest = RandomForestClassifier(random_state=42)
    # ``fit`` returns the estimator itself, so it can be returned directly.
    return forest.fit(X_train, y_train)
# Random-forest pipeline: same data preparation and evaluation as above,
# with the classifier built by train_model_RF.
if __name__ == "__main__":
    # Relative path: the CSV is resolved against the current working directory.
    X_train, X_test, y_train, y_test = preprocess_data("synthetic_data.csv")
    model = train_model_RF(X_train, y_train)
    evaluate_model(model, X_test, y_test)

Accuracy:  1.0
Confusion Matrix:  [[143   0]
 [  0  57]]
Classification Report:                precision    recall  f1-score   support

         0.0       1.00      1.00      1.00       143
         1.0       1.00      1.00      1.00        57

    accuracy                           1.00       200
   macro avg       1.00      1.00      1.00       200
weighted avg       1.00      1.00      1.00       200



neural network

In [57]:
import torch
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch.optim as optim

def preprocessing(file_name, test_size=0.2, random_state=42):
    """Load the sensor CSV and return scaled train/test splits as tensors.

    Rows containing any missing value are dropped. The data is split *before*
    scaling and the ``MinMaxScaler`` is fitted on the training features only,
    then applied to both splits, so no statistics from the held-out rows leak
    into training. Scaled test values may therefore fall slightly outside
    the [0, 1] range.

    Parameters
    ----------
    file_name : str or path-like
        CSV file containing "temperature", "vibration" and "fault_label"
        columns.
    test_size : float, default 0.2
        Fraction of rows held out for testing (passed to ``train_test_split``).
    random_state : int, default 42
        Seed for the shuffle performed by ``train_test_split``.

    Returns
    -------
    X_train_tensor, X_test_tensor : torch.Tensor
        float32 feature tensors with one row per sample and two columns
        (temperature, vibration).
    y_train_tensor, y_test_tensor : torch.Tensor
        float32 one-dimensional label tensors.
    """
    feature_columns = ["temperature", "vibration"]

    data = pd.read_csv(file_name).dropna()
    X = data[feature_columns]
    y = data["fault_label"]

    # split first so the scaler never sees the test rows
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    scaler = MinMaxScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # convert data to pytorch tensors; the dtype is explicit because the model
    # weights and BCELoss targets are float32
    X_train_tensor = torch.as_tensor(X_train_scaled, dtype=torch.float32)
    X_test_tensor = torch.as_tensor(X_test_scaled, dtype=torch.float32)
    y_train_tensor = torch.as_tensor(y_train.to_numpy(), dtype=torch.float32)
    y_test_tensor = torch.as_tensor(y_test.to_numpy(), dtype=torch.float32)
    return X_train_tensor, X_test_tensor, y_train_tensor, y_test_tensor


class FaultDetectionNN(nn.Module):
    """Small fully connected network mapping 2 input features to one probability.

    Architecture: Linear(2 -> 16) -> ReLU -> Linear(16 -> 8) -> ReLU
    -> Linear(8 -> 1) -> Sigmoid.
    """

    def __init__(self):
        super().__init__()
        # input: 2 features (temperature and vibration)
        self.layer1 = nn.Linear(in_features=2, out_features=16)
        # hidden layer
        self.layer2 = nn.Linear(in_features=16, out_features=8)
        # output layer: a single unit
        self.output = nn.Linear(in_features=8, out_features=1)
        # squashes the output unit into (0, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return the sigmoid output for ``x``; the last dimension of ``x`` must be 2."""
        hidden = self.layer1(x).relu()
        hidden = self.layer2(hidden).relu()
        return self.sigmoid(self.output(hidden))
    
   #train the model
def train_model(model, X_train, X_test, y_train, y_test, epochs=15, lr =0.1 ):
    """Train ``model`` with full-batch Adam on a binary cross-entropy loss.

    Parameters
    ----------
    model : torch.nn.Module
        Network whose output for ``X_train`` has one column of probabilities
        in [0, 1] (required by ``nn.BCELoss``).
    X_train : torch.Tensor
        Training features.
    X_test : torch.Tensor
        Unused; kept so existing call sites keep working.
    y_train : torch.Tensor
        One-dimensional float tensor of 0/1 training labels.
    y_test : torch.Tensor
        Unused; kept so existing call sites keep working.
    epochs : int, default 15
        Number of full-batch optimisation steps.
    lr : float, default 0.1
        Adam learning rate.

    Returns
    -------
    list of float
        Training loss recorded at each epoch (empty when ``epochs`` is 0).
    """
    criterion = nn.BCELoss() #binary cross entropy loss
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Make sure the model is in training mode, e.g. when it was previously
    # switched to eval mode for evaluation.
    model.train()

    # Loop-invariant: reshape the labels once to a column matching the model
    # output instead of on every epoch.
    targets = y_train.unsqueeze(1)

    loss_history = []
    for epoch in range(epochs):
        #forward pass
        output = model(X_train)
        loss = criterion(output, targets)

        #backward pass
        optimizer.zero_grad() #clear the gradients
        loss.backward()
        optimizer.step()

        loss_history.append(loss.item())
    return loss_history
# evaluate the model
def evaluate_model(model, X_test, y_test):
    """Print the classification accuracy of ``model`` on the test tensors.

    Model outputs strictly greater than 0.5 are counted as class 1.

    Parameters
    ----------
    model : torch.nn.Module
        Network producing one output column per sample.
    X_test : torch.Tensor
        Test features.
    y_test : torch.Tensor
        One-dimensional tensor of 0/1 test labels.
    """
    # evaluation mode, and no gradient tracking during inference
    model.eval()
    with torch.no_grad():
        probabilities = model(X_test)
        predicted_labels = probabilities.gt(0.5).float()
        # labels become a column so they compare element-wise with the output
        expected_labels = y_test.unsqueeze(1)
        n_correct = predicted_labels.eq(expected_labels).sum().item()
        accuracy = n_correct / y_test.size(0)
        print("Accuracy: ", accuracy)
  


# Neural-network pipeline: load the data as tensors, train the network and
# print its accuracy on the held-out split.
if __name__ == "__main__":
    # Seed torch so weight initialisation (and therefore the reported
    # accuracy) is reproducible across runs.
    torch.manual_seed(42)
    # Relative path, consistent with the other cells, instead of a
    # machine-specific absolute path.
    X_train_tensor, X_test_tensor, y_train_tensor, y_test_tensor = preprocessing("synthetic_data.csv")
    model = FaultDetectionNN()
    train_model(model, X_train_tensor, X_test_tensor, y_train_tensor, y_test_tensor)
    evaluate_model(model, X_test_tensor, y_test_tensor)

    

Accuracy:  1.0
