# Motor Fault Detection using Eulerian Video Magnification

This notebook implements a complete pipeline for detecting motor faults using Eulerian Video Magnification (EVM). The pipeline processes video data to extract and magnify subtle motions, then classifies the motor state as Normal or Abnormal using machine learning.

## Pipeline Overview

1. Frame Extraction - Extract and resample video frames
2. Video Stabilization - Remove camera shake
3. ROI Detection - Locate the region of interest
4. Motion Magnification - Amplify subtle motions using EVM
5. Feature Extraction - Compute motion and frequency features
6. Classification - Train SVM model to detect faults

## Dataset

The dataset consists of:
- Normal.mov - 24-minute video of normal motor operation
- Abnormal.mov - 24-minute video of motor with fault
- Test videos for validation

## 1. Install Required Packages

First, we'll install all the necessary Python packages:

In [None]:
!pip install opencv-python-headless numpy scipy scikit-image scikit-learn pyrtools tqdm joblib gdown

In [None]:
import cv2
import numpy as np
from scipy.signal import butter, filtfilt
from sklearn.svm import SVC
from sklearn.decomposition import PCA
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from typing import List, Tuple
import os
from joblib import dump, load

## 2. Download Dataset from Google Drive

Mount Google Drive and verify access to the dataset:

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Dataset folder path
DATASET_PATH = "/content/drive/MyDrive/1hOu0PExCWjvA5H5cXz7dBS0JNYW5sM7Z"
NORMAL_VIDEO = os.path.join(DATASET_PATH, "Normal.mov")
ABNORMAL_VIDEO = os.path.join(DATASET_PATH, "Abnormal.mov")

print("Checking dataset files...")
for file_path in [NORMAL_VIDEO, ABNORMAL_VIDEO]:
    if os.path.exists(file_path):
        print(f"Found {os.path.basename(file_path)}")
    else:
        print(f"Missing {os.path.basename(file_path)}")

# Get list of test videos
test_videos = [f for f in os.listdir(DATASET_PATH) if f not in ["Normal.mov", "Abnormal.mov"]]
print(f"\nFound {len(test_videos)} test videos:")

## 3. Frame Extraction

Define and test the function to extract frames from videos:

In [None]:
def extract_frames(video_path: str, fps: int = 30, max_frames: int = None) -> List[np.ndarray]:
    """Extract frames from video at specified FPS.
    
    Parameters
    ----------
    video_path : str
        Path to video file
    fps : int
        Target frames per second
    max_frames : int, optional
        Maximum number of frames to extract
    
    Returns
    -------
    List[np.ndarray]
        List of RGB frames
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise IOError(f"Cannot open {video_path}")
    
    orig_fps = cap.get(cv2.CAP_PROP_FPS)
    frames = []
    frame_count = 0
    
    with tqdm(desc="Extracting frames") as pbar:
        success, frame = cap.read()
        while success:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame_rgb)
            frame_count += 1
            pbar.update(1)
            
            if max_frames and frame_count >= max_frames:
                break
            success, frame = cap.read()
    
    cap.release()
    
    if not frames:
        return frames
    
    # Resample to target FPS if needed
    if abs(orig_fps - fps) > 1e-2:
        n_frames_out = max(1, int(round(len(frames) * fps / orig_fps)))
        indices = np.linspace(0, len(frames) - 1, n_frames_out).astype(int)
        frames = [frames[i] for i in indices]
    
    return frames

# Test frame extraction (using first 100 frames)
test_frames = extract_frames(NORMAL_VIDEO, fps=30, max_frames=100)
print(f"Extracted {len(test_frames)} frames of shape {test_frames[0].shape}")

# Display first frame
plt.figure(figsize=(10, 6))
plt.imshow(test_frames[0])
plt.axis('off')
plt.title('First Frame from Normal Video')
plt.show()

## 4. Frame Stabilization

Implement video stabilization using ORB features and homography:

In [None]:
def stabilise_frames(frames: List[np.ndarray]) -> List[np.ndarray]:
    """Stabilize video using feature matching and homography.
    
    Parameters
    ----------
    frames : List[np.ndarray]
        Input RGB frames
    
    Returns
    -------
    List[np.ndarray]
        Stabilized frames
    """
    if len(frames) < 2:
        return frames
    
    height, width = frames[0].shape[:2]
    orb = cv2.ORB_create(500)
    matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
    
    transforms = [np.eye(3, dtype=np.float32)]
    prev_gray = cv2.cvtColor(frames[0], cv2.COLOR_RGB2GRAY)
    
    with tqdm(total=len(frames)-1, desc="Stabilizing frames") as pbar:
        for idx in range(1, len(frames)):
            curr_gray = cv2.cvtColor(frames[idx], cv2.COLOR_RGB2GRAY)
            k1, d1 = orb.detectAndCompute(prev_gray, None)
            k2, d2 = orb.detectAndCompute(curr_gray, None)
            
            if d1 is None or d2 is None:
                transforms.append(transforms[-1])
                prev_gray = curr_gray
                pbar.update(1)
                continue
                
            matches = matcher.match(d1, d2)
            if len(matches) < 4:
                transforms.append(transforms[-1])
                prev_gray = curr_gray
                pbar.update(1)
                continue
                
            pts1 = np.float32([k1[m.queryIdx].pt for m in matches])
            pts2 = np.float32([k2[m.trainIdx].pt for m in matches])
            H, mask = cv2.findHomography(pts2, pts1, cv2.RANSAC, 5.0)
            
            if H is None:
                H = transforms[-1]
            else:
                H = H.astype(np.float32)
                
            transforms.append(H @ transforms[-1])
            prev_gray = curr_gray
            pbar.update(1)
    
    stabilised = []
    for frame, H in zip(frames, transforms):
        warped = cv2.warpPerspective(frame, H, (width, height),
                                   flags=cv2.INTER_LINEAR,
                                   borderMode=cv2.BORDER_REFLECT)
        stabilised.append(warped)
    
    return stabilised

# Test stabilization
stable_frames = stabilise_frames(test_frames)

# Show before/after comparison
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
ax1.imshow(test_frames[0])
ax1.set_title('Before Stabilization')
ax1.axis('off')
ax2.imshow(stable_frames[0])
ax2.set_title('After Stabilization')
ax2.axis('off')
plt.show()

## 5. ROI Detection and Cropping

Implement ROI detection using the red marker:

In [None]:
def crop_roi(frames: List[np.ndarray], marker_hsv: Tuple[int, int, int] = (0, 255, 255), 
           pad: int = 10) -> List[np.ndarray]:
    """Detect and crop region around the red marker.
    
    Parameters
    ----------
    frames : List[np.ndarray]
        Input RGB frames
    marker_hsv : Tuple[int, int, int]
        HSV color of the marker
    pad : int
        Padding around detected ROI
        
    Returns
    -------
    List[np.ndarray]
        Cropped frames
    """
    if not frames:
        return frames
    
    height, width = frames[0].shape[:2]
    boxes = []
    tol = np.array([10, 100, 100])
    hsv_target = np.array(marker_hsv)
    
    with tqdm(total=len(frames), desc="Detecting ROI") as pbar:
        for frame in frames:
            hsv = cv2.cvtColor(frame, cv2.COLOR_RGB2HSV)
            lower = np.maximum(hsv_target - tol, (0, 0, 0))
            upper = np.minimum(hsv_target + tol, (179, 255, 255))
            mask = cv2.inRange(hsv, lower, upper)
            contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            
            if contours:
                cnt = max(contours, key=cv2.contourArea)
                if cv2.contourArea(cnt) > 0:
                    x, y, w, h = cv2.boundingRect(cnt)
                    boxes.append((x, y, x + w, y + h))
            pbar.update(1)
    
    if not boxes:
        print("[WARN] Marker not found; returning full frames")
        return frames
    
    box = np.array(boxes)
    x0, y0 = box[:, :2].min(axis=0)
    x1, y1 = box[:, 2:].max(axis=0)
    
    x0 = max(0, x0 - pad)
    y0 = max(0, y0 - pad)
    x1 = min(width, x1 + pad)
    y1 = min(height, y1 + pad)
    
    cropped = [f[y0:y1, x0:x1].copy() for f in frames]
    return cropped

# Test ROI detection
cropped_frames = crop_roi(stable_frames)
print(f"Original shape: {stable_frames[0].shape}")
print(f"Cropped shape: {cropped_frames[0].shape}")

# Show before/after comparison
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
ax1.imshow(stable_frames[0])
ax1.set_title('Before ROI Cropping')
ax1.axis('off')
ax2.imshow(cropped_frames[0])
ax2.set_title('After ROI Cropping')
ax2.axis('off')
plt.show()

## 6. Laplacian Pyramid

Implement pyramid construction and reconstruction for motion magnification:

In [None]:
def build_laplacian_pyramid(img: np.ndarray, levels: int = 5) -> List[np.ndarray]:
    """Build Laplacian pyramid.
    
    Parameters
    ----------
    img : np.ndarray
        Input RGB image
    levels : int
        Number of pyramid levels
        
    Returns
    -------
    List[np.ndarray]
        Pyramid levels
    """
    pyr = []
    current = img.astype(np.float32)
    
    for _ in range(levels):
        down = cv2.pyrDown(current)
        up = cv2.pyrUp(down, dstsize=(current.shape[1], current.shape[0]))
        lap = current - up
        pyr.append(lap)
        current = down
    
    pyr.append(current)
    return pyr

def reconstruct_from_laplacian(pyr: List[np.ndarray]) -> np.ndarray:
    """Reconstruct image from Laplacian pyramid.
    
    Parameters
    ----------
    pyr : List[np.ndarray]
        Pyramid levels
        
    Returns
    -------
    np.ndarray
        Reconstructed image
    """
    current = pyr[-1]
    for lap in reversed(pyr[:-1]):
        up = cv2.pyrUp(current, dstsize=(lap.shape[1], lap.shape[0]))
        current = up + lap
    
    current = np.clip(current, 0, 255)
    return current.astype(np.uint8)

# Test pyramid construction and reconstruction
test_frame = cropped_frames[0]
pyr = build_laplacian_pyramid(test_frame, levels=4)
recon = reconstruct_from_laplacian(pyr)

# Compute PSNR
mse = np.mean((test_frame.astype(np.float32) - recon.astype(np.float32)) ** 2)
psnr = 10 * np.log10(255**2 / (mse + 1e-8))
print(f"Reconstruction PSNR: {psnr:.2f} dB")

# Display pyramid levels and reconstruction
fig, axs = plt.subplots(2, 3, figsize=(15, 10))
fig.suptitle('Laplacian Pyramid and Reconstruction')

for i, level in enumerate(pyr[:-1]):
    ax = axs.flat[i]
    ax.imshow(np.abs(level).astype(np.uint8))
    ax.set_title(f'Level {i+1}')
    ax.axis('off')

axs.flat[-2].imshow(pyr[-1].astype(np.uint8))
axs.flat[-2].set_title('Gaussian Residual')
axs.flat[-2].axis('off')

axs.flat[-1].imshow(recon)
axs.flat[-1].set_title('Reconstruction')
axs.flat[-1].axis('off')

plt.tight_layout()
plt.show()

## 7. Temporal Filtering

Implement temporal bandpass filter for motion isolation:

In [None]:
def temporal_bandpass(stack: np.ndarray, fs: int, f_low: float, f_high: float, 
                      order: int = 3) -> np.ndarray:
    """Apply temporal bandpass filter.
    
    Parameters
    ----------
    stack : np.ndarray
        Input array with time as first dimension
    fs : int
        Sampling frequency in Hz
    f_low : float
        Low cutoff frequency
    f_high : float
        High cutoff frequency
    order : int
        Filter order
        
    Returns
    -------
    np.ndarray
        Filtered array
    """
    nyq = 0.5 * fs
    low = f_low / nyq
    high = f_high / nyq
    b, a = butter(order, [low, high], btype="band")
    
    T = stack.shape[0]
    flat = stack.reshape(T, -1)
    filtered = filtfilt(b, a, flat, axis=0)
    return filtered.reshape(stack.shape)

def amplify(stack: np.ndarray, alpha: float) -> np.ndarray:
    """Amplify signal by constant factor."""
    return stack * alpha

# Test temporal filtering
fs = 30  # Hz
t = np.arange(0, 1, 1/fs)
freq = 3.0  # target frequency

# Create synthetic motion signal
signal = np.sin(2 * np.pi * freq * t) + 0.5 * np.random.randn(len(t))
stack = signal[:, None, None].astype(np.float32)

# Apply filter
filtered = temporal_bandpass(stack, fs, freq-0.5, freq+0.5).squeeze()

# Plot frequency domain comparison
fft_freqs = np.fft.rfftfreq(len(t), 1/fs)
orig_fft = np.abs(np.fft.rfft(signal))
filt_fft = np.abs(np.fft.rfft(filtered))

plt.figure(figsize=(12, 4))
plt.plot(fft_freqs, orig_fft, label='Original')
plt.plot(fft_freqs, filt_fft, label='Filtered')
plt.axvline(freq, color='r', linestyle='--', label='Target frequency')
plt.xlabel('Frequency (Hz)')
plt.ylabel('Magnitude')
plt.title('Frequency Response of Bandpass Filter')
plt.legend()
plt.grid(True)
plt.show()

# Print amplitude at target frequency
target_idx = np.argmin(np.abs(fft_freqs - freq))
print(f"Amplitude at {freq} Hz before filtering: {orig_fft[target_idx]:.3f}")
print(f"Amplitude at {freq} Hz after filtering: {filt_fft[target_idx]:.3f}")
print(f"Peak frequency in filtered signal: {fft_freqs[np.argmax(filt_fft)]:.2f} Hz")

## 8. Motion Magnification

Combine pyramid decomposition and temporal filtering to magnify motion:

In [None]:
def magnify_motion(frames: List[np.ndarray], fs: int, f_low: float, f_high: float,
                 alpha: float, levels: int = 5) -> List[np.ndarray]:
    """Magnify motion in video.
    
    Parameters
    ----------
    frames : List[np.ndarray]
        Input RGB frames
    fs : int
        Frame rate in Hz
    f_low : float
        Low cutoff frequency
    f_high : float
        High cutoff frequency
    alpha : float
        Amplification factor
    levels : int
        Number of pyramid levels
        
    Returns
    -------
    List[np.ndarray]
        Motion magnified frames
    """
    if not frames:
        return []
    
    # Build Laplacian pyramids
    print("Building Laplacian pyramids...")
    pyramids = [build_laplacian_pyramid(f, levels) for f in frames]
    n_levels = len(pyramids[0])
    T = len(frames)
    
    # Process each level
    print("Processing pyramid levels...")
    for lvl in tqdm(range(n_levels), desc="Magnifying motion"):
        stack = np.stack([pyr[lvl] for pyr in pyramids], axis=0)
        filtered = temporal_bandpass(stack, fs, f_low, f_high)
        amplified = amplify(filtered, alpha)
        
        for t in range(T):
            pyramids[t][lvl] = pyramids[t][lvl] + amplified[t]
    
    # Reconstruct frames
    print("Reconstructing frames...")
    out_frames = [reconstruct_from_laplacian(pyr) for pyr in pyramids]
    return out_frames

# Test motion magnification on a short sequence
magnified = magnify_motion(cropped_frames[:100], fs=30, 
                         f_low=0.5, f_high=3.0, alpha=10)

# Display original vs magnified frames
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
ax1.imshow(cropped_frames[0])
ax1.set_title('Original Frame')
ax1.axis('off')
ax2.imshow(magnified[0])
ax2.set_title('Motion Magnified Frame')
ax2.axis('off')
plt.show()

# Save a few frames as GIF for visualization
from IPython.display import Image
import imageio

print("Creating preview GIF...")
frames_for_gif = [frame[::2, ::2] for frame in magnified[:30]]  # Downsample for size
imageio.mimsave('magnified_preview.gif', frames_for_gif, fps=10)
display(Image('magnified_preview.gif'))

## 9. Feature Extraction

Extract and combine energy, optical flow, and FFT features:

In [None]:
def energy_features(frames: List[np.ndarray]) -> np.ndarray:
    """Compute energy features from frame differences."""
    if len(frames) < 2:
        return np.zeros(3, dtype=np.float32)
    
    diffs = []
    prev_gray = cv2.cvtColor(frames[0], cv2.COLOR_RGB2GRAY).astype(np.float32)
    
    for f in frames[1:]:
        gray = cv2.cvtColor(f, cv2.COLOR_RGB2GRAY).astype(np.float32)
        diff = np.abs(gray - prev_gray)
        diffs.append(diff)
        prev_gray = gray
    
    diffs = np.stack(diffs, axis=0)
    feat = np.array([
        diffs.mean(),
        diffs.std(),
        diffs.max()
    ], dtype=np.float32)
    
    return feat

def flow_hist_features(frames: List[np.ndarray], bins: int = 16) -> np.ndarray:
    """Compute optical flow histogram features."""
    if len(frames) < 2:
        return np.zeros(bins, dtype=np.float32)
    
    # Resize frames for faster processing
    target_h = 160
    resized = [cv2.resize(f, (int(f.shape[1] * target_h / f.shape[0]), target_h))
               for f in frames]
    
    mags = []
    max_mag = 0.0
    for i in range(len(resized) - 1):
        g1 = cv2.cvtColor(resized[i], cv2.COLOR_RGB2GRAY)
        g2 = cv2.cvtColor(resized[i + 1], cv2.COLOR_RGB2GRAY)
        flow = cv2.calcOpticalFlowFarneback(
            g1, g2, None,
            pyr_scale=0.5, levels=3, winsize=15,
            iterations=3, poly_n=5, poly_sigma=1.2, flags=0,
        )
        mag = np.linalg.norm(flow, axis=2)
        mags.append(mag)
        max_mag = max(max_mag, float(mag.max()))
    
    max_mag = max(max_mag, 1e-3)
    hist_accum = np.zeros(bins, dtype=np.float32)
    for mag in mags:
        hist, _ = np.histogram(mag, bins=bins, range=(0, max_mag))
        hist_accum += hist.astype(np.float32) / mag.size
        
    return hist_accum / len(mags)

def fft_sideband_features(frames: List[np.ndarray], fs: int, f1: float) -> np.ndarray:
    """Compute FFT sideband features."""
    if not frames:
        return np.zeros(3, dtype=np.float32)
    
    # Extract mean intensity signal
    signal = [cv2.cvtColor(f, cv2.COLOR_RGB2GRAY).mean() for f in frames]
    signal = np.array(signal, dtype=np.float32)
    
    # Compute FFT
    freqs = np.fft.rfftfreq(len(signal), d=1.0/fs)
    spectrum = np.abs(np.fft.rfft(signal))
    
    # Find amplitudes at fundamental and second harmonic
    def amp_at(freq):
        idx = np.argmin(np.abs(freqs - freq))
        return float(spectrum[idx])
    
    A1 = amp_at(f1)
    A2 = amp_at(2 * f1)
    ratio = A2 / A1 if A1 > 1e-8 else 0.0
    
    return np.array([A1, A2, ratio], dtype=np.float32)

def build_feature_vector(frames: List[np.ndarray], fs: int, f1: float) -> np.ndarray:
    """Combine all features into one vector."""
    energy = energy_features(frames)
    flow = flow_hist_features(frames)
    fft = fft_sideband_features(frames, fs, f1)
    return np.concatenate([energy, flow, fft])

# Test feature extraction on our sample frames
feature_vector = build_feature_vector(magnified, fs=30, f1=1.0)
print("Feature vector shape:", feature_vector.shape)
print("\nFeature breakdown:")
print("- Energy features:", feature_vector[:3])
print("- Flow histogram:", feature_vector[3:-3])
print("- FFT features:", feature_vector[-3:])

## 10. Process Training Data

Extract features from normal and abnormal videos:

In [None]:
def process_video(video_path: str, segment_frames: int = 150, step_frames: int = 75,
                fs: int = 30, f1: float = 1.0) -> List[np.ndarray]:
    """Process video and extract features from segments.
    
    Parameters
    ----------
    video_path : str
        Path to video file
    segment_frames : int
        Number of frames per segment
    step_frames : int
        Number of frames to shift between segments
    fs : int
        Frame rate in Hz
    f1 : float
        Fundamental frequency for FFT features
        
    Returns
    -------
    List[np.ndarray]
        List of feature vectors
    """
    print(f"Processing {os.path.basename(video_path)}...")
    
    # Extract frames
    frames = extract_frames(video_path)
    print(f"Extracted {len(frames)} frames")
    
    # Stabilize
    frames = stabilise_frames(frames)
    print("Stabilization complete")
    
    # Crop ROI
    frames = crop_roi(frames)
    print(f"ROI cropping complete, shape: {frames[0].shape}")
    
    # Process segments
    features = []
    for start in tqdm(range(0, len(frames) - segment_frames, step_frames),
                     desc="Processing segments"):
        segment = frames[start:start + segment_frames]
        
        # Magnify motion
        magnified = magnify_motion(segment, fs, f1-0.5, f1+0.5, alpha=10)
        
        # Extract features
        feat = build_feature_vector(magnified, fs, f1)
        features.append(feat)
    
    return features

# Process normal and abnormal videos
X_normal = process_video(NORMAL_VIDEO)
y_normal = np.zeros(len(X_normal))

X_abnormal = process_video(ABNORMAL_VIDEO)
y_abnormal = np.ones(len(X_abnormal))

# Combine datasets
X = np.vstack([X_normal, X_abnormal])
y = np.concatenate([y_normal, y_abnormal])

print("\nDataset summary:")
print(f"Normal samples: {len(X_normal)}")
print(f"Abnormal samples: {len(X_abnormal)}")
print(f"Feature matrix shape: {X.shape}")

# Save processed data
np.save('features.npy', X)
np.save('labels.npy', y)

## 11. Train and Evaluate Classifier

Train SVM classifier with hyperparameter tuning:

In [None]:
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
from joblib import dump, load

# Split data for validation
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Optionally apply PCA (recommended for high-dimensional features)
pca = PCA(n_components=0.95)
X_train_pca = pca.fit_transform(X_train)
X_val_pca = pca.transform(X_val)

# SVM with grid search
param_grid = {
    'C': [1, 10, 100],
    'gamma': ['scale', 'auto'],
    'kernel': ['rbf']
}
svc = SVC(probability=True)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
grid = GridSearchCV(svc, param_grid, cv=cv, n_jobs=-1)
grid.fit(X_train_pca, y_train)

best_model = grid.best_estimator_
dump(best_model, 'svm_model.joblib')

# Evaluate
y_pred = best_model.predict(X_val_pca)
acc = accuracy_score(y_val, y_pred)
prec = precision_score(y_val, y_pred)
rec = recall_score(y_val, y_pred)
f1 = f1_score(y_val, y_pred)
print(f'Validation Accuracy: {acc:.3f}')
print(f'Precision: {prec:.3f}, Recall: {rec:.3f}, F1: {f1:.3f}')
print('Confusion Matrix:')
print(confusion_matrix(y_val, y_pred))
print('Classification Report:')
print(classification_report(y_val, y_pred))