<a href="https://colab.research.google.com/github/santiagovazquezff/circuit_fault_detection/blob/main/circuit_fault.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Circuit Fault ML Project**

## **Load data**

### Mount Google Drive


In [1]:
# Mount Google Drive so the dataset archive stored under MyDrive can be read
# from /content/drive. NOTE(review): google.colab is presumably only
# importable inside a Colab runtime — confirm before running elsewhere.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


### Extract ZIP

In [2]:
import zipfile, os

# Unpack the dataset archive from the mounted Drive into the Colab working
# directory; the context manager closes the archive once extraction is done.
zip_path = "/content/drive/MyDrive/data.zip"
with zipfile.ZipFile(file=zip_path, mode="r") as zip_ref:
    zip_ref.extractall(path="/content")

### Configurations and more imports

In [3]:
# Root folder of the extracted dataset; build_dataset reads
# <DATA_ROOT>/<class>/<f0>.csv.
DATA_ROOT = "/content/data"
# Class labels; each one is also the name of a sub-folder of DATA_ROOT.
CLASSES   = ["healthy", "short_r1", "open_r1"]
# Fundamental frequencies passed to lockin_features; each one is also the file
# stem of a CSV in every class folder. Presumably in Hz — TODO confirm.
FREQS     = [200, 500, 800, 1000, 1500, 2000]

# Length of the window kept by standardise, measured from the first sample.
# Presumably seconds (the time column is named "s") — TODO confirm.
TIME_WINDOW = 0.1
# Number of evenly spaced samples each run is interpolated onto.
N_POINTS    = 16384
# Divisor that turns amplitudes into relative gains in build_dataset.
# Presumably the peak input voltage of the circuit — TODO confirm.
VIN_PK      = 0.283
# When True, f0 is appended as the last feature column.
INCLUDE_F0  = True

import numpy as np, pandas as pd
from os.path import join
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, classification_report


## **Pipeline Functions**

### Read data from CSV

In [5]:
def read_multirun_vout_csv(path: str, n_keep=50):
    """Load the time axis and the first ``n_keep`` runs from a multi-run CSV.

    Parameters
    ----------
    path : str
        CSV file with a time column named ``"s"`` followed by one column per run.
    n_keep : int
        Maximum number of run columns to keep, counted from the column at
        position 1. Fewer are returned if the file has fewer.

    Returns
    -------
    (t, V) : tuple of float arrays
        ``t`` is the 1-D time column; ``V`` is 2-D with one column per kept run.
    """
    table = pd.read_csv(path)
    time_axis = table["s"].to_numpy(float)
    # Position-based slice: columns 1 .. n_keep are taken as the runs.
    run_columns = slice(1, 1 + n_keep)
    voltages = table.iloc[:, run_columns].to_numpy(float)
    return time_axis, voltages

# Smoke-test the reader on one file. The path is built from DATA_ROOT (same
# layout build_dataset uses) so the demo follows the config cell instead of
# duplicating an absolute path.
dummy_path = join(DATA_ROOT, "healthy", "200.csv")
t_dummy, V_dummy = read_multirun_vout_csv(dummy_path, n_keep=50)

print("t shape:", t_dummy.shape)
print("V shape:", V_dummy.shape)

print("First few time samples:", t_dummy[:5])
print("First few voltages of run 1:", V_dummy[:5, 0])
# Label the last column with its real index: it is only "run 50" when the
# file actually holds 50 runs.
print(f"First few voltages of run {V_dummy.shape[1]}:", V_dummy[:5, -1])



t shape: (626003,)
V shape: (626003, 50)
First few time samples: [0.00000000e+00 8.00000000e-07 8.66008492e-07 8.66012506e-07
 8.66163737e-07]
First few voltages of run 1: [0.00262348 0.00073607 0.00056864 0.00056863 0.00056825]
First few voltages of run 50: [0.00264795 0.00074945 0.00058105 0.00058104 0.00058066]


### Standardisation of t and V samples

In [13]:
def standardise(t, v, time_window=TIME_WINDOW, n_points=N_POINTS):
    """Resample one run onto a uniform time grid that starts at zero.

    Parameters
    ----------
    t : 1-D array
        Sample times of the run.
    v : 1-D array
        Values sampled at ``t`` (a single run, i.e. one column of ``V``).
    time_window : float
        Length of the window kept, measured from ``t[0]``.
    n_points : int
        Number of evenly spaced output samples.

    Returns
    -------
    (t_fit, v_fit) : tuple of 1-D arrays
        The uniform grid on ``[0, time_window)`` and ``v`` linearly
        interpolated onto that grid.
    """
    elapsed = t - t[0]                    # time measured from the first sample
    in_window = elapsed < time_window     # keep samples strictly inside the window
    t_fit = np.linspace(0.0, time_window, n_points, endpoint=False)
    # np.interp works on 1-D data only, which is why callers pass one run at a
    # time rather than the whole (samples x runs) matrix.
    v_fit = np.interp(t_fit, elapsed[in_window], v[in_window])
    return t_fit, v_fit

# Standardise the first run only (standardise works on 1-D arrays).
# The result overwrites t_dummy / V_dummy, so guard on the array rank: without
# the guard, re-running this cell would index the already 1-D V_dummy with
# [:, 0] and raise an IndexError.
if V_dummy.ndim == 2:
    t_dummy, V_dummy = standardise(t_dummy, V_dummy[:, 0])

print("First few standardised time samples:", t_dummy[:5])
print("First few standardised voltages:", V_dummy[:5])



First few standardised time samples: [0.00000000e+00 6.10351563e-06 1.22070313e-05 1.83105469e-05
 2.44140625e-05]
First few standardised voltages: [ 0.00262348 -0.01890937 -0.04226448 -0.06559856 -0.08892529]


### Waveform feature extraction

In [14]:
def lockin_features(v_fit, t_fit, f0):
    """Extract lock-in style features of a waveform at ``f0`` and its harmonics.

    The mean is removed, then the signal is projected onto cosine and sine
    references at ``f0``, ``2*f0`` and ``3*f0``.

    Parameters
    ----------
    v_fit : 1-D array
        Waveform samples.
    t_fit : 1-D array
        Sample times matching ``v_fit``.
    f0 : number
        Fundamental frequency of the reference.

    Returns
    -------
    tuple
        ``(A1, ph, thd, noise_rms, A2, A3)``: amplitude and phase at ``f0``,
        the ratio ``sqrt(A2^2 + A3^2) / A1`` (0.0 when ``A1`` is not positive),
        the RMS left after removing the mean and the three tones, and the
        amplitudes at ``2*f0`` and ``3*f0``.
    """
    signal = v_fit.astype(float)
    times = t_fit.astype(float)
    n_samples = signal.size

    dc = float(np.mean(signal))
    ac = signal - dc

    def _demodulate(freq):
        # Project the mean-free signal onto quadrature references at `freq`.
        omega = 2*np.pi*freq
        in_phase = (2.0/n_samples)*np.dot(ac, np.cos(omega*times))
        quadrature = (2.0/n_samples)*np.dot(ac, np.sin(omega*times))
        amplitude = float(np.hypot(in_phase, quadrature))
        # Sign chosen so that A*cos(w*t + phi) reports phase phi.
        phase = float(np.arctan2(-quadrature, in_phase))
        return amplitude, phase

    (A1, ph), (A2, _), (A3, _) = (_demodulate(k*f0) for k in (1, 2, 3))

    if A1 > 0:
        thd = np.sqrt(A2**2 + A3**2) / A1
    else:
        thd = 0.0

    # Mean-square left over once the DC term and the three tones are removed;
    # clamped at zero so rounding error cannot produce a negative value.
    residual_ms = float(np.mean(signal**2)) - dc**2 - (A1**2 + A2**2 + A3**2) / 2
    noise_rms = float(np.sqrt(max(residual_ms, 0.0)))

    return A1, ph, thd, noise_rms, A2, A3


# Sanity-check the feature extractor on the standardised dummy run.
dummy_features = lockin_features(V_dummy, t_dummy, f0=200)
A1_dummy, ph_dummy, thd_dummy, noise_rms_dummy, A2_dummy, A3_dummy = dummy_features

for caption, value in [
    ("Fundamental amplitude (A1):", A1_dummy),
    ("Phase at f0 (ph):", ph_dummy),
    ("Total Harmonic Distortion (THD):", thd_dummy),
    ("Noise RMS:", noise_rms_dummy),
    ("2nd harmonic amplitude (A2):", A2_dummy),
    ("3rd harmonic amplitude (A3):", A3_dummy),
]:
    print(caption, value)



Fundamental amplitude (A1): 3.0401981530013815
Phase at f0 (ph): 1.57018404814481
Total Harmonic Distortion (THD): 3.5599910150589134e-05
Noise RMS: 0.0022464605827324283
2nd harmonic amplitude (A2): 7.654438031982989e-05
3rd harmonic amplitude (A3): 7.651705572040724e-05


### Building the dataset

In [15]:
def build_dataset(data_root=DATA_ROOT, classes=CLASSES, freqs=FREQS,
                  vin_pk=VIN_PK, include_f0=INCLUDE_F0):
    """Assemble the feature matrix and label vector from every run on disk.

    For each class and each frequency, ``<data_root>/<class>/<f0>.csv`` is
    read, every run in it is standardised and its lock-in features are
    extracted. Amplitude-type features are divided by ``vin_pk``.

    Parameters
    ----------
    data_root : str
        Folder containing one sub-folder per class.
    classes : sequence of str
        Class labels (also the sub-folder names).
    freqs : sequence of numbers
        Fundamental frequencies (also the CSV file stems).
    vin_pk : float
        Divisor applied to A1, A2, A3 and the noise RMS.
    include_f0 : bool
        Append ``f0`` as a final feature column when true.

    Returns
    -------
    (X, y, feat_names)
        ``X`` is a float array with one row per run, ``y`` an object array of
        class labels aligned with ``X``, and ``feat_names`` the column names.
    """
    feature_rows = []
    targets = []

    for label in classes:
        for freq in freqs:
            csv_file = join(data_root, label, f"{freq}.csv")
            t, runs = read_multirun_vout_csv(csv_file)

            # runs is (samples, n_runs); its transpose yields one run per step.
            for run in runs.T:
                t_fit, v_fit = standardise(t, run)
                A1, ph, thd, noise_rms, A2, A3 = lockin_features(v_fit, t_fit, freq)

                row = [A1/vin_pk, ph, thd, noise_rms/vin_pk, A2/vin_pk, A3/vin_pk]
                if include_f0:
                    row.append(float(freq))

                feature_rows.append(row)
                targets.append(label)

    feat_names = ["A1_gain","phase","THD","noise_rel","A2_gain","A3_gain"]
    if include_f0:
        feat_names.append("f0")

    X = np.array(feature_rows, float)
    y = np.array(targets, object)
    return X, y, feat_names


## **Model building**

### Data splitting

In [16]:
# Build the full feature table and report its shape and class balance.
X, y, feat_names = build_dataset()
print("X shape:", X.shape, " y shape:", y.shape)
print("Features:", feat_names)
class_names, class_counts = np.unique(y, return_counts=True)
print("Class balance:", dict(zip(class_names, map(int, class_counts))))

# Stratified 80/20 split with a fixed seed so the split is repeatable.
Xtr, Xte, ytr, yte = train_test_split(
    X, y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)

X shape: (900, 7)  y shape: (900,)
Features: ['A1_gain', 'phase', 'THD', 'noise_rel', 'A2_gain', 'A3_gain', 'f0']
Class balance: {'healthy': 300, 'open_r1': 300, 'short_r1': 300}


### Training the model

In [18]:
# Random forest trained on the unscaled features of the training split.
rf = RandomForestClassifier(
    n_estimators=300,
    class_weight="balanced_subsample",
    n_jobs=-1,
    random_state=42,
)
rf.fit(Xtr, ytr)


### Applying the model to make predictions

In [19]:
# Predict on both splits so train and test metrics can be compared.
y_rf_train_pred, y_rf_test_pred = (rf.predict(split) for split in (Xtr, Xte))

## **Model results**

### Confusion matrix, classification report, and CV accuracy score

In [21]:
from sklearn.metrics import accuracy_score
from sklearn.model_selection import cross_val_score

# Metrics on the training split.
train_cm = confusion_matrix(ytr, y_rf_train_pred)
train_report = classification_report(ytr, y_rf_train_pred)
train_acc = accuracy_score(ytr, y_rf_train_pred)
print("Confusion matrix (train):\n", train_cm)
print("Classification report (train):\n", train_report)
print("Train accuracy:", train_acc)

# Metrics on the held-out test split.
test_cm = confusion_matrix(yte, y_rf_test_pred)
test_report = classification_report(yte, y_rf_test_pred)
test_acc = accuracy_score(yte, y_rf_test_pred)
print("\nConfusion matrix (test): \n", test_cm)
print("Classification report (test):\n", test_report)
print("Test accuracy:", test_acc)

# 5-fold cross-validated accuracy over the whole dataset.
cv_scores = cross_val_score(rf, X, y, cv=5, scoring="accuracy", n_jobs=-1)
cv_mean, cv_std = cv_scores.mean(), cv_scores.std()
print("CV accuracy: %.4f ± %.4f" % (cv_mean, cv_std))




Confusion matrix (train):
 [[240   0   0]
 [  0 240   0]
 [  0   0 240]]
Classification report (train):
               precision    recall  f1-score   support

     healthy       1.00      1.00      1.00       240
     open_r1       1.00      1.00      1.00       240
    short_r1       1.00      1.00      1.00       240

    accuracy                           1.00       720
   macro avg       1.00      1.00      1.00       720
weighted avg       1.00      1.00      1.00       720

Train accuracy: 1.0

Confusion matrix (test): 
 [[60  0  0]
 [ 0 60  0]
 [ 0  0 60]]
Classification report (test):
               precision    recall  f1-score   support

     healthy       1.00      1.00      1.00        60
     open_r1       1.00      1.00      1.00        60
    short_r1       1.00      1.00      1.00        60

    accuracy                           1.00       180
   macro avg       1.00      1.00      1.00       180
weighted avg       1.00      1.00      1.00       180

Test accuracy: 1.0

### Frequency hold-out check

In [22]:
from sklearn.model_selection import GroupKFold, cross_val_score
# Look the f0 column up by name instead of assuming it is the last one: with
# INCLUDE_F0 = False the last column is A3_gain, and grouping on it would
# silently turn this into a meaningless check.
if "f0" not in feat_names:
    raise ValueError("Frequency hold-out check needs the 'f0' feature; "
                     "rebuild the dataset with include_f0=True.")
f0_col = feat_names.index("f0")
groups = X[:, f0_col].astype(int)
X_no_f0 = np.delete(X, f0_col, axis=1)  # drop f0 so model can't “cheat” with it
# One fold per distinct frequency, so each fold holds out one whole frequency.
cv = GroupKFold(n_splits=len(np.unique(groups)))
scores = cross_val_score(rf, X_no_f0, y, cv=cv, groups=groups, scoring="accuracy", n_jobs=-1)
print("Group (by f0) CV accuracy: %.4f ± %.4f" % (scores.mean(), scores.std()))


Group (by f0) CV accuracy: 0.9444 ± 0.1242


### Label shuffle check

In [23]:
import numpy as np
from sklearn.metrics import accuracy_score
# Sanity check: a forest trained on permuted labels should score near chance
# on the untouched test labels (about 1/3 for three balanced classes).
rf_shuf = RandomForestClassifier(n_estimators=300, random_state=0, n_jobs=-1)
# Seeded generator so the permutation, and therefore the printed accuracy,
# is the same on every run.
shuffle_rng = np.random.default_rng(0)
ytr_shuf = shuffle_rng.permutation(ytr)
rf_shuf.fit(Xtr, ytr_shuf)
print("Label-shuffle test accuracy:", accuracy_score(yte, rf_shuf.predict(Xte)))


Label-shuffle test accuracy: 0.3055555555555556


### Testing for duplicates

In [24]:
# Rows are rounded to 12 decimals and turned into tuples so they can be
# compared as set members; any overlap means a sample sits in both splits.
tr = set(map(tuple, np.round(Xtr, 12)))
te = set(map(tuple, np.round(Xte, 12)))
print("Exact duplicates across train/test:", len(tr.intersection(te)))


Exact duplicates across train/test: 0
