In [12]:
# Real-time Surface Defect Detection
import tensorflow as tf
import numpy as np
import cv2
from PIL import Image
import time
import os

# ===== STEP 1: TRAIN A DUMMY MODEL AND EXPORT TO TFLITE =====
def create_dummy_tflite_model():
    if os.path.exists("defect_detection.tflite"):
        print("✅ TFLite model already exists.")
        return

    print("🔧 Creating dummy TFLite model...")
    x_train = np.random.rand(100, 224, 224, 3).astype(np.float32)
    y_train = tf.keras.utils.to_categorical(np.random.randint(0, 6, 100), num_classes=6)

    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(224, 224, 3)),
        tf.keras.layers.Conv2D(8, (3, 3), activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(32, activation='relu'),
        tf.keras.layers.Dense(6, activation='softmax')  # 6 defect classes
    ])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    model.fit(x_train, y_train, epochs=1, batch_size=8, verbose=0)

    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    tflite_model = converter.convert()

    with open("defect_detection.tflite", "wb") as f:
        f.write(tflite_model)
    print("✅ Dummy TFLite model saved as 'defect_detection.tflite'")

# ===== STEP 2: CREATE A DUMMY TEST IMAGE IF NEEDED =====
def create_test_image():
    if not os.path.exists("test_surface.jpg"):
        dummy_image = (np.random.rand(224, 224, 3) * 255).astype(np.uint8)
        cv2.imwrite("test_surface.jpg", dummy_image)
        print("🖼️  Dummy test image saved as 'test_surface.jpg'")

# ===== CLASS DEFINITION =====
class DefectDetector:
    def __init__(self, model_path):
        self.interpreter = tf.lite.Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        self.classes = ['Scratch', 'Crack', 'Rust', 'Dent', 'Bubble', 'Normal']
        self.materials = ['Metal', 'Glass', 'Plastic', 'Fabric']
        self.inference_times = []

    def preprocess_image(self, image):
        image = cv2.resize(image, (224, 224))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = image.astype(np.float32) / 255.0
        return np.expand_dims(image, axis=0)

    def detect_defects(self, image):
        start_time = time.time()
        input_data = self.preprocess_image(image)

        self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
        self.interpreter.invoke()

        predictions = self.interpreter.get_tensor(self.output_details[0]['index'])[0]
        inference_time = (time.time() - start_time) * 1000
        self.inference_times.append(inference_time)

        predicted_class_idx = np.argmax(predictions)
        confidence = predictions[predicted_class_idx]
        predicted_class = self.classes[predicted_class_idx]
        severity = self.get_severity(predicted_class, confidence)

        return {
            'defect_type': predicted_class,
            'confidence': float(confidence),
            'severity': severity,
            'all_predictions': {
                self.classes[i]: float(predictions[i])
                for i in range(len(self.classes))
            },
            'inference_time_ms': inference_time,
            'is_defective': predicted_class != 'Normal'
        }

    def get_severity(self, defect_type, confidence):
        if defect_type == 'Normal':
            return 'None'
        elif defect_type in ['Crack', 'Rust'] and confidence > 0.8:
            return 'High'
        elif defect_type in ['Scratch', 'Dent'] and confidence > 0.7:
            return 'Medium'
        else:
            return 'Low'

    def get_performance_stats(self):
        if not self.inference_times:
            return None
        return {
            'avg_inference_time': np.mean(self.inference_times),
            'min_inference_time': np.min(self.inference_times),
            'max_inference_time': np.max(self.inference_times),
            'total_inferences': len(self.inference_times)
        }

# ===== MAIN FUNCTION =====
def main():
    create_dummy_tflite_model()
    create_test_image()

    detector = DefectDetector('defect_detection.tflite')
    image = cv2.imread('test_surface.jpg')
    result = detector.detect_defects(image)

    print(f"\n🔍 Defect Detection Results:")
    print(f"Type: {result['defect_type']}")
    print(f"Confidence: {result['confidence']:.3f}")
    print(f"Severity: {result['severity']}")
    print(f"Inference Time: {result['inference_time_ms']:.1f}ms")

    if result['is_defective']:
        print("⚠️  DEFECT DETECTED - Requires attention")
    else:
        print("✅ Surface quality approved")

# ===== RUN MAIN =====
if __name__ == "__main__":
    main()


✅ TFLite model already exists.

🔍 Defect Detection Results:
Type: Normal
Confidence: 0.183
Severity: None
Inference Time: 6.5ms
✅ Surface quality approved


In [13]:
# Real-time Surface Defect Detection
import tensorflow as tf
import numpy as np
import cv2
from PIL import Image
import time
import os

# ===== STEP 1: Create Dummy TFLite Model if Missing =====
def create_dummy_model():
    if os.path.exists("defect_detection.tflite"):
        return
    print("📦 Creating dummy defect detection model...")

    x_train = np.random.rand(100, 224, 224, 3).astype(np.float32)
    y_train = tf.keras.utils.to_categorical(np.random.randint(0, 6, 100), num_classes=6)

    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(224, 224, 3)),
        tf.keras.layers.Conv2D(8, (3, 3), activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(32, activation='relu'),
        tf.keras.layers.Dense(6, activation='softmax')
    ])

    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    model.fit(x_train, y_train, epochs=1, batch_size=8, verbose=0)

    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    tflite_model = converter.convert()

    with open("defect_detection.tflite", "wb") as f:
        f.write(tflite_model)
    print("✅ Model saved as 'defect_detection.tflite'")


# ===== STEP 2: Create Dummy Test Image if Missing =====
def create_dummy_image():
    if not os.path.exists("test_surface.jpg"):
        dummy_img = (np.random.rand(224, 224, 3) * 255).astype(np.uint8)
        cv2.imwrite("test_surface.jpg", dummy_img)
        print("🖼️  Dummy test image saved as 'test_surface.jpg'")


# ===== DEFECT DETECTOR CLASS =====
class DefectDetector:
    def __init__(self, model_path):
        # Load TensorFlow Lite model
        self.interpreter = tf.lite.Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()

        # Get input and output details
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        # Define class labels
        self.classes = ['Scratch', 'Crack', 'Rust', 'Dent', 'Bubble', 'Normal']
        self.materials = ['Metal', 'Glass', 'Plastic', 'Fabric']

        # Performance tracking
        self.inference_times = []

    def preprocess_image(self, image):
        """Preprocess image for model input"""
        image = cv2.resize(image, (224, 224))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = image.astype(np.float32) / 255.0
        return np.expand_dims(image, axis=0)

    def detect_defects(self, image):
        """Run defect detection on input image"""
        start_time = time.time()
        input_data = self.preprocess_image(image)

        self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
        self.interpreter.invoke()

        predictions = self.interpreter.get_tensor(self.output_details[0]['index'])[0]

        inference_time = (time.time() - start_time) * 1000
        self.inference_times.append(inference_time)

        predicted_class_idx = np.argmax(predictions)
        confidence = predictions[predicted_class_idx]
        predicted_class = self.classes[predicted_class_idx]

        severity = self.get_severity(predicted_class, confidence)

        return {
            'defect_type': predicted_class,
            'confidence': float(confidence),
            'severity': severity,
            'all_predictions': {
                self.classes[i]: float(predictions[i])
                for i in range(len(self.classes))
            },
            'inference_time_ms': inference_time,
            'is_defective': predicted_class != 'Normal'
        }

    def get_severity(self, defect_type, confidence):
        """Determine defect severity based on type and confidence"""
        if defect_type == 'Normal':
            return 'None'
        elif defect_type in ['Crack', 'Rust'] and confidence > 0.8:
            return 'High'
        elif defect_type in ['Scratch', 'Dent'] and confidence > 0.7:
            return 'Medium'
        else:
            return 'Low'

    def get_performance_stats(self):
        """Get performance statistics"""
        if not self.inference_times:
            return None
        return {
            'avg_inference_time': np.mean(self.inference_times),
            'min_inference_time': np.min(self.inference_times),
            'max_inference_time': np.max(self.inference_times),
            'total_inferences': len(self.inference_times)
        }


# ===== MAIN EXECUTION =====
def main():
    create_dummy_model()
    create_dummy_image()

    detector = DefectDetector('defect_detection.tflite')
    image = cv2.imread('test_surface.jpg')
    result = detector.detect_defects(image)

    print(f"\n🔍 Defect Detection Results:")
    print(f"Type: {result['defect_type']}")
    print(f"Confidence: {result['confidence']:.3f}")
    print(f"Severity: {result['severity']}")
    print(f"Inference Time: {result['inference_time_ms']:.1f}ms")

    if result['is_defective']:
        print("⚠️  DEFECT DETECTED - Requires attention")
    else:
        print("✅ Surface quality approved")


if __name__ == "__main__":
    main()



🔍 Defect Detection Results:
Type: Normal
Confidence: 0.183
Severity: None
Inference Time: 4.9ms
✅ Surface quality approved


In [14]:
# Production Line Integration
import cv2
import numpy as np
import threading
import queue
import time
from datetime import datetime
import json
import os
import tensorflow as tf

# --------------------------
# Step 1: Ensure Dummy Model
# --------------------------
def create_dummy_model():
    if os.path.exists("defect_detection.tflite"):
        return
    print("📦 Creating dummy defect detection model...")

    x_train = np.random.rand(100, 224, 224, 3).astype(np.float32)
    y_train = tf.keras.utils.to_categorical(np.random.randint(0, 6, 100), num_classes=6)

    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(224, 224, 3)),
        tf.keras.layers.Conv2D(8, (3, 3), activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(32, activation='relu'),
        tf.keras.layers.Dense(6, activation='softmax')
    ])

    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    model.fit(x_train, y_train, epochs=1, batch_size=8, verbose=0)

    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    tflite_model = converter.convert()

    with open("defect_detection.tflite", "wb") as f:
        f.write(tflite_model)
    print("✅ Dummy TFLite model saved.")

# --------------------------
# Step 2: DefectDetector Class
# --------------------------
class DefectDetector:
    def __init__(self, model_path):
        self.interpreter = tf.lite.Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()
        self.classes = ['Scratch', 'Crack', 'Rust', 'Dent', 'Bubble', 'Normal']
        self.inference_times = []

    def preprocess_image(self, image):
        image = cv2.resize(image, (224, 224))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = image.astype(np.float32) / 255.0
        return np.expand_dims(image, axis=0)

    def detect_defects(self, image):
        start_time = time.time()
        input_data = self.preprocess_image(image)
        self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
        self.interpreter.invoke()
        predictions = self.interpreter.get_tensor(self.output_details[0]['index'])[0]
        inference_time = (time.time() - start_time) * 1000
        self.inference_times.append(inference_time)

        predicted_class_idx = np.argmax(predictions)
        confidence = predictions[predicted_class_idx]
        predicted_class = self.classes[predicted_class_idx]
        severity = self.get_severity(predicted_class, confidence)

        return {
            'defect_type': predicted_class,
            'confidence': float(confidence),
            'severity': severity,
            'all_predictions': {self.classes[i]: float(predictions[i]) for i in range(len(self.classes))},
            'inference_time_ms': inference_time,
            'is_defective': predicted_class != 'Normal'
        }

    def get_severity(self, defect_type, confidence):
        if defect_type == 'Normal':
            return 'None'
        elif defect_type in ['Crack', 'Rust'] and confidence > 0.8:
            return 'High'
        elif defect_type in ['Scratch', 'Dent'] and confidence > 0.7:
            return 'Medium'
        else:
            return 'Low'

    def get_performance_stats(self):
        if not self.inference_times:
            return None
        return {
            'avg_inference_time': np.mean(self.inference_times),
            'min_inference_time': np.min(self.inference_times),
            'max_inference_time': np.max(self.inference_times),
            'total_inferences': len(self.inference_times)
        }

# --------------------------
# Step 3: Production Line Class
# --------------------------
class ProductionLineDetector:
    def __init__(self, model_path, camera_id=0):
        self.detector = DefectDetector(model_path)
        self.camera_id = camera_id
        self.frame_queue = queue.Queue(maxsize=10)
        self.result_queue = queue.Queue()
        self.total_inspected = 0
        self.defects_found = 0
        self.production_log = []
        self.running = False
        self.capture_thread = None
        self.process_thread = None

    def start_inspection(self):
        self.running = True
        self.capture_thread = threading.Thread(target=self._capture_frames)
        self.process_thread = threading.Thread(target=self._process_frames)
        self.capture_thread.start()
        self.process_thread.start()
        print("🏭 Production line inspection started")

    def stop_inspection(self):
        self.running = False
        if self.capture_thread:
            self.capture_thread.join()
        if self.process_thread:
            self.process_thread.join()
        print("🛑 Production line inspection stopped")

    def _capture_frames(self):
        cap = cv2.VideoCapture(self.camera_id)
        if not cap.isOpened():
            print("⚠️ WARNING: Could not open camera. Using static dummy frame.")
            dummy_frame = (np.random.rand(720, 1280, 3) * 255).astype(np.uint8)
            while self.running:
                try:
                    self.frame_queue.put(dummy_frame.copy(), timeout=0.01)
                    time.sleep(0.033)
                except queue.Full:
                    pass
            return

        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        cap.set(cv2.CAP_PROP_FPS, 30)
        while self.running:
            ret, frame = cap.read()
            if ret:
                try:
                    self.frame_queue.put(frame, timeout=0.01)
                except queue.Full:
                    pass
            time.sleep(0.033)
        cap.release()

    def _process_frames(self):
        while self.running:
            try:
                frame = self.frame_queue.get(timeout=1.0)
                result = self.detector.detect_defects(frame)
                self.total_inspected += 1
                if result['is_defective']:
                    self.defects_found += 1
                log_entry = {
                    'timestamp': datetime.now().isoformat(),
                    'item_id': self.total_inspected,
                    'defect_type': result['defect_type'],
                    'confidence': result['confidence'],
                    'severity': result['severity'],
                    'inference_time': result['inference_time_ms']
                }
                self.production_log.append(log_entry)
                self.result_queue.put({'frame': frame, 'result': result, 'log_entry': log_entry})

                if self.total_inspected % 10 == 0:
                    rate = (self.defects_found / self.total_inspected) * 100
                    print(f"📊 Inspected: {self.total_inspected}, Defects: {self.defects_found} ({rate:.1f}%)")
            except queue.Empty:
                continue

    def get_production_stats(self):
        if self.total_inspected == 0:
            return None
        defect_rate = (self.defects_found / self.total_inspected) * 100
        stats = self.detector.get_performance_stats()
        return {
            'total_inspected': self.total_inspected,
            'defects_found': self.defects_found,
            'defect_rate_percent': defect_rate,
            'avg_processing_time': stats['avg_inference_time'],
            'throughput_per_minute': 60000 / stats['avg_inference_time']
        }

    def export_production_log(self, filename):
        with open(filename, 'w') as f:
            json.dump(self.production_log, f, indent=2)
        print(f"📄 Production log exported to {filename}")

# --------------------------
# Step 4: Run the Inspection
# --------------------------
def run_production_line():
    create_dummy_model()
    inspector = ProductionLineDetector("defect_detection.tflite")

    try:
        inspector.start_inspection()
        time.sleep(20)  # Run for 20 seconds demo
        stats = inspector.get_production_stats()
        print("\n📈 Final Production Statistics:")
        print(f"Total Inspected: {stats['total_inspected']}")
        print(f"Defects Found: {stats['defects_found']}")
        print(f"Defect Rate: {stats['defect_rate_percent']:.2f}%")
        print(f"Avg Time: {stats['avg_processing_time']:.2f} ms")
        print(f"Throughput: {stats['throughput_per_minute']:.0f} items/minute")
        inspector.export_production_log("production_log.json")
    finally:
        inspector.stop_inspection()

# Run the full process
if __name__ == "__main__":
    run_production_line()


🏭 Production line inspection started
📊 Inspected: 10, Defects: 0 (0.0%)
📊 Inspected: 20, Defects: 0 (0.0%)
📊 Inspected: 30, Defects: 0 (0.0%)
📊 Inspected: 40, Defects: 0 (0.0%)
📊 Inspected: 50, Defects: 0 (0.0%)
📊 Inspected: 60, Defects: 0 (0.0%)
📊 Inspected: 70, Defects: 0 (0.0%)
📊 Inspected: 80, Defects: 0 (0.0%)
📊 Inspected: 90, Defects: 0 (0.0%)
📊 Inspected: 100, Defects: 0 (0.0%)
📊 Inspected: 110, Defects: 0 (0.0%)
📊 Inspected: 120, Defects: 0 (0.0%)
📊 Inspected: 130, Defects: 0 (0.0%)
📊 Inspected: 140, Defects: 0 (0.0%)
📊 Inspected: 150, Defects: 0 (0.0%)
📊 Inspected: 160, Defects: 0 (0.0%)
📊 Inspected: 170, Defects: 0 (0.0%)
📊 Inspected: 180, Defects: 0 (0.0%)
📊 Inspected: 190, Defects: 0 (0.0%)
📊 Inspected: 200, Defects: 0 (0.0%)
📊 Inspected: 210, Defects: 0 (0.0%)
📊 Inspected: 220, Defects: 0 (0.0%)
📊 Inspected: 230, Defects: 0 (0.0%)
📊 Inspected: 240, Defects: 0 (0.0%)
📊 Inspected: 250, Defects: 0 (0.0%)
📊 Inspected: 260, Defects: 0 (0.0%)
📊 Inspected: 270, Defects: 0 (0.0%)


In [None]:
# Raspberry Pi Real-time Surface Defect Detection System
# Run on Raspberry Pi with: OpenCV, TensorFlow Lite Runtime, NumPy, Pillow, RPi.GPIO
# Install (on Pi):
# sudo apt update && sudo apt install python3-opencv python3-pip
# pip3 install tflite-runtime numpy pillow

import os
import sys
import time
import json
import cv2
import numpy as np
import tensorflow as tf
from PIL import Image

# Handle GPIO simulation if not on Pi
if "google.colab" in sys.modules:
    class FakeGPIO:
        BCM = BOARD = OUT = IN = HIGH = LOW = None
        def setmode(self, mode): pass
        def setup(self, pin, mode): pass
        def output(self, pin, state): pass
        def cleanup(self): pass
    GPIO = FakeGPIO()
else:
    import RPi.GPIO as GPIO

class RaspberryPiDefectDetector:
    def __init__(self, model_path, led_pin=18, buzzer_pin=19):
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"❌ Model file not found: {model_path}")

        # Load model
        self.interpreter = tf.lite.Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()
        self.classes = ['Scratch', 'Crack', 'Rust', 'Dent', 'Bubble', 'Normal']

        # GPIO setup
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(led_pin, GPIO.OUT)
        GPIO.setup(buzzer_pin, GPIO.OUT)
        self.led_pin = led_pin
        self.buzzer_pin = buzzer_pin

        # Runtime stats
        self.stats = {
            'total_processed': 0,
            'defects_detected': 0,
            'avg_inference_time': 0.0,
            'start_time': time.time()
        }

        self.log_path = "detection_log.json"
        with open(self.log_path, "w") as f:
            json.dump([], f)

    def preprocess_image(self, image):
        image = cv2.resize(image, (224, 224))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = image.astype(np.float32) / 255.0
        return np.expand_dims(image, axis=0)

    def detect_defect(self, image):
        start_time = time.time()

        input_data = self.preprocess_image(image)
        self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
        self.interpreter.invoke()
        predictions = self.interpreter.get_tensor(self.output_details[0]['index'])[0]

        inference_time = (time.time() - start_time) * 1000
        self.stats['total_processed'] += 1
        self.stats['avg_inference_time'] = (
            (self.stats['avg_inference_time'] * (self.stats['total_processed'] - 1) + inference_time)
            / self.stats['total_processed']
        )

        predicted_idx = np.argmax(predictions)
        confidence = predictions[predicted_idx]
        defect_type = self.classes[predicted_idx]
        is_defective = defect_type != 'Normal'
        severity = self.get_severity(defect_type, confidence)

        if is_defective:
            self.stats['defects_detected'] += 1
            self.trigger_alert(severity)

        result = {
            'timestamp': time.strftime("%Y-%m-%d %H:%M:%S"),
            'defect_type': defect_type,
            'confidence': float(confidence),
            'is_defective': is_defective,
            'severity': severity,
            'inference_time_ms': inference_time
        }

        self.log_defect(result)
        return result

    def get_severity(self, defect_type, confidence):
        if defect_type == 'Normal':
            return 'None'
        elif defect_type in ['Crack', 'Rust'] and confidence > 0.8:
            return 'High'
        elif confidence > 0.7:
            return 'Medium'
        else:
            return 'Low'

    def trigger_alert(self, severity):
        GPIO.output(self.led_pin, GPIO.HIGH)
        if severity == 'High':
            for _ in range(3):
                GPIO.output(self.buzzer_pin, GPIO.HIGH)
                time.sleep(0.1)
                GPIO.output(self.buzzer_pin, GPIO.LOW)
                time.sleep(0.1)
        elif severity == 'Medium':
            for _ in range(2):
                GPIO.output(self.buzzer_pin, GPIO.HIGH)
                time.sleep(0.2)
                GPIO.output(self.buzzer_pin, GPIO.LOW)
                time.sleep(0.2)
        else:
            GPIO.output(self.buzzer_pin, GPIO.HIGH)
            time.sleep(0.3)
            GPIO.output(self.buzzer_pin, GPIO.LOW)
        time.sleep(1)
        GPIO.output(self.led_pin, GPIO.LOW)

    def log_defect(self, data):
        with open(self.log_path, "r+") as f:
            log = json.load(f)
            log.append(data)
            f.seek(0)
            json.dump(log, f, indent=2)

    def run_continuous_monitoring(self, camera_id=0):
        cap = cv2.VideoCapture(camera_id)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        cap.set(cv2.CAP_PROP_FPS, 15)

        print("🔍 Monitoring started. Press Ctrl+C to stop.")

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    continue

                if self.stats['total_processed'] % 3 == 0:
                    result = self.detect_defect(frame)
                    status = "🚨 DEFECT" if result['is_defective'] else "✅ OK"
                    print(f"{status} | {result['defect_type']} | "
                          f"{result['confidence']:.2f} | {result['inference_time_ms']:.1f} ms")

                    if self.stats['total_processed'] % 50 == 0:
                        self.print_statistics()

                time.sleep(0.1)

        except KeyboardInterrupt:
            print("\n🛑 Monitoring stopped by user.")
        finally:
            cap.release()
            self.shutdown()

    def print_statistics(self):
        uptime = time.time() - self.stats['start_time']
        total = self.stats['total_processed']
        defects = self.stats['defects_detected']
        avg_time = self.stats['avg_inference_time']

        defect_rate = (defects / max(1, total)) * 100
        print(f"\n📊 Performance:")
        print(f"Uptime: {uptime / 60:.1f} min")
        print(f"Processed: {total}")
        print(f"Defects: {defects} ({defect_rate:.1f}%)")
        print(f"Avg Inference Time: {avg_time:.2f} ms")

        if avg_time > 0:
            throughput = 60000 / avg_time
            print(f"Throughput: {throughput:.0f} items/min\n")
        else:
            print(f"Throughput: N/A (no items processed)\n")

    def shutdown(self):
        GPIO.output(self.led_pin, GPIO.LOW)
        GPIO.output(self.buzzer_pin, GPIO.LOW)
        GPIO.cleanup()
        self.print_statistics()
        print("💡 GPIO cleaned and system shut down safely.")

# Run program
def main():
    model_path = 'defect_detection.tflite'
    if not os.path.exists(model_path):
        print(f"❌ ERROR: Model file '{model_path}' not found.")
        return
    detector = RaspberryPiDefectDetector(model_path)
    detector.run_continuous_monitoring()

if __name__ == "__main__":
    main()


🔍 Monitoring started. Press Ctrl+C to stop.
