<a href="https://colab.research.google.com/github/mohkmh/my-new-rep/blob/main/Python_Code_for_ECG_Beat_Classification_(MIT_BIH).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
# -*- coding: utf-8 -*-
"""
Script for ECG beat classification (Normal vs. VEB) using MIT-BIH data.
"""

import wfdb # للتعامل مع بيانات PhysioNet
import numpy as np
import matplotlib.pyplot as plt
import scipy.signal as signal
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
# سنستخدم Random Forest كمصنف (قوي وجيد كبداية)
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
import seaborn as sns
import os # للتحقق من وجود مجلد البيانات

# -------------------------------------------
# 0. Settings and Data Loading
# -------------------------------------------
# MIT-BIH record to analyse and the index of the signal channel to read
# (the first channel is usually the MLII lead).
# NOTE(review): confirm that this record really contains both Normal and VEB
# beats; with a single class present the classifier cannot learn the other.
record_name, channel = '101', 0
# Local folder that holds the PhysioNet files.
data_dir = 'mit_bih_data'

# Create the data folder on first use; an already existing path is left alone.
data_dir_exists = os.path.exists(data_dir)
if not data_dir_exists:
    os.makedirs(data_dir)
    print(f"Created directory: {data_dir}")

# Load the record (signal + header) and its beat annotations.
#
# wfdb.rdrecord / wfdb.rdann have no `data_dir` parameter, and `pb_dir` is the
# deprecated former name of `pn_dir`. Passing them raises a TypeError that the
# handler below would misreport as a download problem. Instead, the files are
# fetched into `data_dir` with wfdb.dl_database when they are missing and are
# then read from that local copy.
local_record_path = os.path.join(data_dir, record_name)
required_files = [f"{local_record_path}.{ext}" for ext in ('hea', 'dat', 'atr')]
try:
    print(f"Attempting to load record '{record_name}' from PhysioNet MIT-BIH database...")
    if not all(os.path.exists(path) for path in required_files):
        # 'mitdb' is the PhysioNet directory of the MIT-BIH Arrhythmia Database.
        wfdb.dl_database('mitdb', dl_dir=data_dir,
                         records=[record_name], annotators=['atr'])
    # sampto=None reads to the end of the record; set e.g. sampto=30000 to
    # load only the first part.
    record = wfdb.rdrecord(local_record_path, sampfrom=0, sampto=None,
                           channels=[channel])
    # 'atr' annotations carry the beat types and their sample positions.
    annotation = wfdb.rdann(local_record_path, 'atr', sampfrom=0, sampto=None)
    print("Record and annotations loaded successfully.")
except Exception as e:
    # Top-level boundary of the script: report the problem and stop with a
    # non-zero exit status instead of continuing without data.
    print(f"Error loading record {record_name}: {e}")
    print("Please ensure you have an internet connection for the first download,")
    print(f"or that the data exists in the specified directory: {data_dir}")
    raise SystemExit(1) from e

# Pull the sampling frequency and the ECG samples out of the loaded record.
# The record was read with channels=[channel], so column 0 is that channel.
fs = record.fs
signal_matrix = record.p_signal
ecg_signal = signal_matrix[:, 0]
print(f"Loaded record: {record_name}, Sampling rate: {fs} Hz, Signal length: {len(ecg_signal)} samples")

# Annotation symbols and their sample positions (index-aligned sequences).
beat_symbols = annotation.symbol
beat_indices = annotation.sample
print(f"Number of annotated beats: {len(beat_indices)}")
print(f"Unique beat symbols found: {np.unique(beat_symbols)}")

# -------------------------------------------
# 1. Simple Signal Preprocessing (optional but recommended)
# -------------------------------------------
# Remove baseline wander with a Butterworth high-pass filter.
cutoff_hp = 0.5  # cutoff frequency in Hz
order_hp = 4
b_hp, a_hp = signal.butter(N=order_hp, Wn=cutoff_hp, btype='highpass', fs=fs)
# filtfilt applies the filter forwards and backwards over the signal.
ecg_filtered = signal.filtfilt(b_hp, a_hp, ecg_signal)
print("Applied high-pass filter for baseline wander removal.")

# -------------------------------------------
# 2. R-peak Detection (a detector or the annotations can be used)
# -------------------------------------------
# This example does not run an R-peak detector: the annotated positions are
# used directly, so beat locations and beat types come from the same source.
# (A real application would need a robust R-peak detection algorithm.)
rpeak_indices_annotated = beat_indices
n_annotated_peaks = len(rpeak_indices_annotated)
print(f"Using {n_annotated_peaks} annotated R-peak locations.")

# -------------------------------------------
# 3. Beat Segmentation and Class Definition
# -------------------------------------------
# Map MIT-BIH annotation symbols onto the two target classes:
#   class 0 (Normal): 'N', 'L', 'R', 'e', 'j'
#   class 1 (VEB):    'V', 'E'
# Beats carrying any other symbol are not used as samples.
target_symbols = {'N': 0, 'L': 0, 'R': 0, 'e': 0, 'j': 0,
                  'V': 1, 'E': 1}
# class_names[i] is the display name of class label i.
class_names = ['Normal (N)', 'VEB (V)']

# WFDB symbols that mark an actual heartbeat. Annotation files also contain
# non-beat markers (rhythm changes '+', noise '~', ...); those must not be
# treated as neighbouring beats when measuring R-R intervals.
beat_annotation_symbols = frozenset('NLRBAaJSVrFejnE/fQ?')

# Window around the R peak used to cut out one beat segment.
window_before = int(0.1 * fs)   # 100 ms before the peak
window_after = int(0.15 * fs)   # 150 ms after the peak
segment_length = window_before + window_after
print(f"Beat segment window: {window_before} samples before R, {window_after} samples after R. Total length: {segment_length} samples.")

segmented_beats = []
labels = []
rr_intervals_prev = []
rr_intervals_next = []
beat_rms = []

# Keep only true beat annotations, as (sample position, symbol) pairs.
beats = [(int(position), symbol)
         for position, symbol in zip(rpeak_indices_annotated, beat_symbols)
         if symbol in beat_annotation_symbols]
signal_length = len(ecg_filtered)

# Walk over every beat together with its previous and next beat. The first and
# the last beat are never the centre element, so both R-R intervals always exist.
for (prev_position, _), (r_peak, symbol), (next_position, _) in zip(beats, beats[1:], beats[2:]):
    if symbol not in target_symbols:
        continue

    # Skip beats whose window would leave the signal. A window ending exactly
    # at the last sample (end == signal_length) is still a full-length slice.
    start = r_peak - window_before
    end = r_peak + window_after
    if start < 0 or end > signal_length:
        continue

    beat_segment = ecg_filtered[start:end]
    segmented_beats.append(beat_segment)
    labels.append(target_symbols[symbol])

    # Previous and next R-R interval, in seconds.
    rr_intervals_prev.append((r_peak - prev_position) / fs)
    rr_intervals_next.append((next_position - r_peak) / fs)

    # Simple shape feature: RMS amplitude of the segment.
    beat_rms.append(np.sqrt(np.mean(beat_segment ** 2)))

print(f"Extracted {len(segmented_beats)} beats belonging to target classes.")
# Nothing to train on: stop with a non-zero exit status.
if not segmented_beats:
    print("Error: No beats were extracted. Check annotation symbols or windowing logic.")
    raise SystemExit(1)

unique_labels, counts = np.unique(labels, return_counts=True)
# Plain ints keep the printed dict readable regardless of the NumPy version.
class_distribution = {int(label): int(count) for label, count in zip(unique_labels, counts)}
print(f"Class distribution: {class_distribution}")
if len(unique_labels) < len(class_names):
    print("Warning: not every target class is present in this record; "
          "the classifier cannot learn the missing class.")

# -------------------------------------------
# 4. Create Feature Matrix
# -------------------------------------------
# One row per beat, one column per feature: RR_prev, RR_next, RMS.
# Note: morphological features from `segmented_beats` could be added as well.
feature_columns = (rr_intervals_prev, rr_intervals_next, beat_rms)
features = np.column_stack(feature_columns)
y = np.asarray(labels)

# Shape is (number of beats, 3 features).
print(f"Feature matrix shape: {features.shape}")

# -------------------------------------------
# 5. Data Splitting and Feature Scaling
# -------------------------------------------
# 70/30 split; stratify=y keeps the class proportions equal in both parts.
split_parts = train_test_split(features, y, test_size=0.3, random_state=42, stratify=y)
X_train, X_test, y_train, y_test = split_parts
n_train, n_test = X_train.shape[0], X_test.shape[0]
print(f"Training set size: {n_train}, Test set size: {n_test}")

# Fit the scaler on the training part only, then apply it to both parts.
scaler = StandardScaler()
scaler.fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)
print("Features scaled using StandardScaler.")

# -------------------------------------------
# 6. Train Random Forest Classifier
# -------------------------------------------
rf_settings = {
    'n_estimators': 100,         # number of trees in the forest
    'max_depth': 10,             # caps tree depth (helps prevent overfitting)
    'random_state': 42,          # fixed seed for repeatable runs
    'class_weight': 'balanced',  # useful for imbalanced classes (common in ECG)
}
rf_classifier = RandomForestClassifier(**rf_settings)

print("\nTraining Random Forest classifier...")
rf_classifier.fit(X_train_scaled, y_train)
print("Training complete.")

# -------------------------------------------
# 7. Prediction and Evaluation
# -------------------------------------------
print("\nEvaluating Random Forest classifier on the test set...")
y_pred_rf = rf_classifier.predict(X_test_scaled)

accuracy_rf = accuracy_score(y_test, y_pred_rf)
print(f"Accuracy: {accuracy_rf:.4f}")

# Class label i is named class_names[i]. Passing the full label list keeps the
# matrix and the report aligned with `class_names` even when a class does not
# occur in the test data. Without it, scikit-learn infers the classes from the
# data, and classification_report raises on the target_names length mismatch.
class_ids = list(range(len(class_names)))

print("\nConfusion Matrix:")
cm_rf = confusion_matrix(y_test, y_pred_rf, labels=class_ids)
plt.figure(figsize=(6, 5))
sns.heatmap(cm_rf, annot=True, fmt='d', cmap='Purples',
            xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title(f'Random Forest Confusion Matrix (Record {record_name})')
plt.tight_layout()
plt.show()  # Display the plot

print("\nClassification Report:")
# zero_division=0 reports 0 (instead of warning) for a class with no samples.
report_rf = classification_report(y_test, y_pred_rf, labels=class_ids,
                                  target_names=class_names, zero_division=0)
print(report_rf)

# -------------------------------------------
# 8. (Optional) Feature Importances
# -------------------------------------------
importances = rf_classifier.feature_importances_
feature_names_list = ['RR_prev', 'RR_next', 'Beat_RMS']
# Feature positions ordered from the most to the least important.
indices = np.argsort(importances)[::-1]
n_features = features.shape[1]

print("\nFeature Importances:")
for rank, feature_idx in enumerate(indices[:n_features], start=1):
    print(f"{rank}. Feature: {feature_names_list[feature_idx]} ({importances[feature_idx]:.4f})")

# Bar chart of the importances, most important feature first.
bar_positions = range(n_features)
ranked_names = [feature_names_list[i] for i in indices]
plt.figure(figsize=(8, 4))
plt.title("Feature Importances (Random Forest)")
plt.bar(bar_positions, importances[indices], align='center')
plt.xticks(bar_positions, ranked_names, rotation=45, ha='right')
plt.xlim([-1, n_features])
plt.ylabel("Importance")
plt.tight_layout()
plt.show()  # Display the plot

print("\nScript finished.")

ModuleNotFoundError: No module named 'wfdb'

Add `%load_ext cudf.pandas` before importing pandas to speed up operations using GPU

In [6]:
%load_ext cudf.pandas
import pandas as pd
import numpy as np

# Randomly generated dataset of parking violations.
# Number of rows to generate.
num_rows = 1000000

# Value pools the categorical columns are sampled from.
states = ["NY", "NJ", "CA", "TX"]
violations = ["Double Parking", "Expired Meter", "No Parking",
              "Fire Hydrant", "Bus Stop"]
vehicle_types = ["SUBN", "SDN"]

# Every calendar day of 2022 is a candidate issue date.
start_date = "2022-01-01"
end_date = "2022-12-31"
dates = pd.date_range(start=start_date, end=end_date, freq='D')

# Generate random data: each column is sampled independently.
data = {
    "Registration State": np.random.choice(states, size=num_rows),
    "Violation Description": np.random.choice(violations, size=num_rows),
    "Vehicle Body Type": np.random.choice(vehicle_types, size=num_rows),
    "Issue Date": np.random.choice(dates, size=num_rows),
    # 10-digit ticket numbers do not fit into 32 bits, so int64 is requested
    # explicitly (NumPy's default integer is 32-bit on some platforms). The
    # upper bound is exclusive, hence 10_000_000_000 to allow 9_999_999_999.
    "Ticket Number": np.random.randint(1_000_000_000, 10_000_000_000,
                                       size=num_rows, dtype=np.int64),
}

# Create a DataFrame
df = pd.DataFrame(data)

# Which parking violation is most commonly committed by vehicles from various U.S states?

# Count every (state, violation) pair; value_counts orders the pairs from the
# most to the least frequent.
pair_counts = df[["Registration State", "Violation Description"]].value_counts()
# The first row of each state group is therefore that state's top violation.
top_per_state = pair_counts.groupby("Registration State").head(1)
# Order by state name and turn the index levels back into columns; as the last
# expression of the cell, the resulting frame is what the notebook displays.
top_per_state.sort_index().reset_index()


stdout:



stderr:

Traceback (most recent call last):
  File "<string>", line 4, in <module>
  File "/usr/local/lib/python3.11/dist-packages/numba_cuda/numba/cuda/cudadrv/driver.py", line 314, in __getattr__
    raise CudaSupportError("Error at driver init: \n%s:" %
numba.cuda.cudadrv.error.CudaSupportError: Error at driver init: 

CUDA driver library cannot be found.
If you are sure that a CUDA driver is installed,
try setting environment variable NUMBA_CUDA_DRIVER
with the file path of the CUDA driver shared library.
:


Not patching Numba


RuntimeError: Function "cuInit" not found