FireSight Guardian App

FSG_App v1.7

- Flame location tracking
- No redundant detection frames
- 30 s delay before a new capture if the fire is in the same location
- Preview window live video crash fixed
- Detection confidence threshold changed to > 40 %
- Double detection: if a fire is in one location and another fire starts elsewhere during the delay, the 30 s condition is ignored
- Combine overlapping frames; avoid multiple saves for the same detection
- Start/stop buttons fixed; preview window closes cleanly
- Kernel crashes fixed with thread joining
- App GUI created
- Detection and app launch/close responsiveness fixed
- Pushover notification ALERT added
- Notification toggle switch added
- Last saved image preview in a scrollable window
- Fixed the app flickering


In [None]:
# Import Libs
from tkinter import Tk, Label, Entry, Button, Canvas, Text, NW, Frame, Scrollbar, VERTICAL, RIGHT, Y, LEFT, BOTH
from tkinter import messagebox
import tkinter as tk
from PIL import Image, ImageTk, ImageFilter
import os
import cv2
import threading
import time
import pathlib
import torch
import requests  # for notification
from concurrent.futures import ThreadPoolExecutor  # for asynchronous execution

# Temporary change for pathlib to work with Windows paths: every later use of
# pathlib.PosixPath in this process resolves to pathlib.WindowsPath instead.
# NOTE(review): presumably needed because the checkpoint loaded below was saved on a
# POSIX system and cannot be unpickled on Windows otherwise - confirm.
# `temp` keeps the original class; nothing visible in this file restores it afterwards.
temp = pathlib.PosixPath
pathlib.PosixPath = pathlib.WindowsPath


# Pushover configuration (keys required for sending notifications).
# Both values are read by VideoStreamApp.send_pushover_notification; the '-----'
# strings are placeholders that must be replaced with real credentials.
pushover_user_key = '-----'
pushover_api_token = '-----'


# Load the trained FD_model.pt model from the local machine.
# The resulting `train_5` object is the callable used for inference by
# VideoStreamApp.perform_inference.
repo_or_dir = "D:/ML/VEn_torch_cu118/Object_Detection/yolov5"  # Path to the Yolov5 GitHub repo cloned on local machine
model = 'custom'
path = "D:/ML/VEn_torch_cu118/Object_Detection/Yolov5_Projects/FireSight_Guardian_ML_FireDetection_System/FD_model.pt"  # FD_model.pt trained model path
train_5 = torch.hub.load(repo_or_dir, model, source='local', path=path, force_reload=True)
bg_image_path = "D:/ML/VEn_torch_cu118/Object_Detection/Yolov5_Projects/FireSight_Guardian_ML_FireDetection_System/bg/flame-1363003_1920.jpg"  # GUI bg image path
# 'snapsave_dir' (a parameter of VideoStreamApp.__init__) is the path to save captured detection frames

class VideoStreamApp:
    """Read a video stream, run the fire-detection model on it and record new fires.

    ``start_video_stream`` launches two daemon threads: ``read_frames`` fills a
    small frame buffer and ``process_frames`` drains it, runs inference, saves a
    snapshot when a new fire is seen and (optionally) sends a Pushover alert.
    The most recent frame for display is published in ``self.frame`` and the
    path of the most recent snapshot in ``self.last_saved_image_path``.
    """

    CONFIDENCE_THRESHOLD = 0.40   # Detections at or below this confidence are ignored
    PUSHOVER_URL = 'https://api.pushover.net/1/messages.json'
    PUSHOVER_TIMEOUT = 10         # Seconds before a notification request is abandoned

    def __init__(self, video_url, snapsave_dir='D:/ML/VEn_torch_cu118/Object_Detection/Yolov5_Projects/FireSight_Guardian_ML_FireDetection_System/imgs'):
        """Store the stream settings and create the snapshot directory.

        Args:
            video_url: URL (or any source accepted by ``cv2.VideoCapture``) of the stream.
            snapsave_dir: Directory in which detection snapshots are written.
        """
        self.video_url = video_url            # URL of the video stream
        self.snapsave_dir = snapsave_dir      # Directory to save snapshots
        self.cap = None                       # Video capture object
        self.frame_buffer = []                # Buffer to store video frames
        self.buffer_lock = threading.Lock()   # Lock to synchronize access to the frame buffer
        self.last_saved_time = time.time()    # Timestamp of the last saved detection frame
        self.last_detection = None            # Most recently recorded fire: (x1, y1, x2, y2, time)
        self.recent_detections = []           # Every fire recorded within the detection interval
        self.detection_interval = 30          # Interval between detections in seconds
        self.running = False                  # Flag to indicate if the video stream is running
        self.read_thread = None               # Thread for reading frames
        self.process_thread = None            # Thread for processing frames
        self.frame = None                     # Current frame being processed
        self.last_saved_image_path = None     # Path of the last saved image
        self.pushover_enabled = False         # Toggle for Pushover notifications
        os.makedirs(snapsave_dir, exist_ok=True)           # Create directory if it doesn't exist
        self.executor = ThreadPoolExecutor(max_workers=2)  # Thread pool for sending notifications

    def start_video_stream(self):
        """Open the stream and start the reader and processor threads.

        Returns:
            True when the stream was opened, False otherwise (no threads are started).
        """
        self.cap = cv2.VideoCapture(self.video_url)
        if not self.cap.isOpened():
            print(f"Error: Unable to open video stream from URL {self.video_url}")
            return False
        print("Video stream opened successfully!")
        self.running = True
        # Start threads for reading and processing frames
        self.read_thread = threading.Thread(target=self.read_frames, daemon=True)
        self.read_thread.start()
        self.process_thread = threading.Thread(target=self.process_frames, daemon=True)
        self.process_thread.start()
        return True

    # PushOver Alert part
    def send_pushover_notification(self, message, image_path=None):
        """Queue a Pushover notification on the thread pool and return immediately.

        Args:
            message: Text of the notification.
            image_path: Optional path of an image to send as attachment.
        """

        def send_notification():  # Runs on an executor thread
            data = {
                'token': pushover_api_token,
                'user': pushover_user_key,
                'message': message,
            }
            try:
                if image_path:
                    # The context manager closes the attachment once the upload is done
                    with open(image_path, 'rb') as attachment:
                        response = requests.post(
                            self.PUSHOVER_URL,
                            data=data,
                            files={'attachment': attachment},
                            timeout=self.PUSHOVER_TIMEOUT,
                        )
                else:
                    response = requests.post(self.PUSHOVER_URL, data=data, timeout=self.PUSHOVER_TIMEOUT)
                print(f'Notification sent: {response.status_code}')
            except Exception as e:
                # Best effort: a failed alert must never stop detection
                print(f'Failed to send notification: {e}')

        # Submit the notification task to the thread pool executor
        self.executor.submit(send_notification)

    def stop_video_stream(self):
        """Stop both worker threads, wait for them and release the capture."""
        self.running = False
        # Join before releasing so no thread is still using the capture object
        if self.read_thread is not None:
            self.read_thread.join()
        if self.process_thread is not None:
            self.process_thread.join()
        if self.cap:
            self.cap.release()
        cv2.destroyAllWindows()
        print("Video stream closed.")

    def read_frames(self):
        """Read frames from the stream into the buffer until stopped or a read fails."""
        try:
            while self.running:
                ret, frame = self.cap.read()   # Read a frame from the video stream
                if not ret:
                    print(f"Error: Unable to read frame from URL {self.video_url}")
                    break
                with self.buffer_lock:   # Lock the buffer before modifying it
                    # The buffer holds at most two frames; extra frames are dropped
                    if len(self.frame_buffer) < 2:
                        self.frame_buffer.append(frame)
                time.sleep(0.02)  # Small sleep to reduce CPU usage
        except Exception as e:
            print(f"Exception in read_frames: {e}")
            self.running = False

    # Perform inference on the frame using the trained model
    def perform_inference(self, frame):
        """Run the model on ``frame``, render the results and return the results object."""
        results = train_5(frame)
        results.render()
        return results

    # Check if the current detection is a new fire detection
    def is_new_fire_detection(self, detection):
        """Return True when ``detection`` is not covered by a recently recorded fire.

        A detection is "covered" when its box (first four values: x1, y1, x2, y2)
        overlaps the box of a fire recorded no longer than
        ``detection_interval`` seconds ago.  All recent fires are checked, not
        only the latest one, so two fires burning in different places do not
        keep re-triggering each other.
        """
        x1, y1, x2, y2 = (float(v) for v in detection[:4])
        now = time.time()
        for lx1, ly1, lx2, ly2, ltime in self.recent_detections:
            if (now - ltime) > self.detection_interval:
                continue   # Expired record: no longer suppresses captures
            # Simple axis-aligned overlap check
            if x1 < lx2 and x2 > lx1 and y1 < ly2 and y2 > ly1:
                return False
        return True

    def _remember_detections(self, detections, saved_time):
        """Record the boxes of ``detections`` as saved at ``saved_time`` and drop expired records."""
        kept = [
            record for record in self.recent_detections
            if (saved_time - record[4]) <= self.detection_interval
        ]
        added = [(*(float(v) for v in d[:4]), saved_time) for d in detections]
        kept.extend(added)
        self.recent_detections = kept
        if added:
            self.last_detection = added[-1]   # Kept for code that reads the latest record

    def process_frames(self):
        """Run detection on buffered frames, saving one snapshot per frame with new fires."""
        try:
            while self.running:
                with self.buffer_lock:
                    frame = self.frame_buffer.pop(0) if self.frame_buffer else None
                if frame is None:
                    # Wait outside the lock so the reader thread can add frames meanwhile
                    time.sleep(0.01)
                    continue

                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Convert the frame to RGB
                results = self.perform_inference(frame_rgb)         # Perform inference on the frame
                detections = results.xyxy[0]                        # Get the detections from the results
                # Filter detections by confidence threshold (fifth value of each detection)
                high_conf_detections = [d for d in detections if d[4] > self.CONFIDENCE_THRESHOLD]

                # Initialize frame_with_results to the current frame for default display
                frame_with_results = frame_rgb.copy()

                new_detections = [d for d in high_conf_detections if self.is_new_fire_detection(d)]
                if new_detections:
                    frame_with_results = results.ims[0]
                    current_time = time.time()
                    timestamp = int(current_time)
                    save_path = os.path.join(self.snapsave_dir, f'image_{timestamp}.jpg')
                    cv2.imwrite(save_path, cv2.cvtColor(frame_with_results, cv2.COLOR_RGB2BGR))   # Save the frame with detection
                    print(f"New fire detected! Frame saved to {save_path}")

                    # Send Pushover notification if enabled
                    if self.pushover_enabled:
                        self.send_pushover_notification('High confidence detection made!', save_path)
                        print('Alert notification sent!')

                    self.last_saved_time = current_time
                    # One image is saved per frame, so every new box in this frame is
                    # recorded together and none of them triggers another save next frame
                    self._remember_detections(new_detections, current_time)
                    self.last_saved_image_path = save_path

                self.frame = frame_with_results   # Update the frame to be displayed
                time.sleep(0.05)    # Small delay to control the processing speed
        except Exception as e:
            print(f"Exception in process_frames: {e}")
            self.running = False
        finally:
            cv2.destroyAllWindows()


# Configure GUI
class AppGUI:
    """Tkinter front end: stream controls, live preview, last snapshot and a log pane.

    The GUI owns at most one ``VideoStreamApp`` at a time (``self.video_stream_app``)
    and polls it every 30 ms from ``update_video``.
    """

    def __init__(self, root, bg_image_path=None):
        """Build all widgets inside ``root`` and start the periodic preview refresh.

        Args:
            root: The Tk root window.
            bg_image_path: Optional path of a background image; a plain stippled
                rectangle is drawn when it is missing or does not exist.
        """
        self.root = root
        self.root.title("FireSight Guardian")

        # Initialize video_stream_app to None
        self.video_stream_app = None

        # The toggle state lives in the GUI so it survives stream restarts
        self.notifications_enabled = False

        # What the preview widgets currently show; used to skip redundant redraws
        self._displayed_frame = None
        self._no_signal_shown = False
        self._shown_snapshot_path = None

        # Configure the grid layout
        for column in range(3):
            self.root.columnconfigure(column, weight=1)
        self.root.rowconfigure(0, weight=0)
        self.root.rowconfigure(1, weight=0)
        self.root.rowconfigure(2, weight=1)
        self.root.rowconfigure(3, weight=0)

        # Set background image if provided
        if bg_image_path and os.path.exists(bg_image_path):
            screen_width = self.root.winfo_screenwidth()
            screen_height = self.root.winfo_screenheight()
            self.bg_image = Image.open(bg_image_path)
            self.bg_image = self.bg_image.resize((screen_width, screen_height), Image.LANCZOS)
            self.bg_image = self.bg_image.filter(ImageFilter.GaussianBlur(radius=5))
            self.bg_photo = ImageTk.PhotoImage(self.bg_image)   # Keep a reference so Tk can draw it
            self.bg_canvas = Canvas(root, width=screen_width, height=screen_height)
            self.bg_canvas.grid(row=0, column=0, rowspan=4, columnspan=3)
            self.bg_canvas.create_image(0, 0, anchor=NW, image=self.bg_photo)
        else:
            self.bg_canvas = Canvas(root, width=800, height=600)
            self.bg_canvas.grid(row=0, column=0, rowspan=4, columnspan=3)
            self.bg_canvas.create_rectangle(0, 0, 800, 600, fill='lightblue', stipple='gray12')

        # Adding a label
        Label(root, text="CAM Stream URL:", font=('Arial', 14), bg='#2a9df4').grid(row=0, column=0, padx=10, pady=10, sticky='e')
        self.url_entry = Entry(root, width=50, font=('Arial', 14))
        self.url_entry.grid(row=0, column=1, padx=10, pady=10, sticky='w')
        self.url_entry.insert(0, "http://192.168.85.6/") # http://192.168.158.70/   # 192.168.158.6

        # Add start button
        self.start_button = Button(root, text="Start Stream", font=('Arial', 14), command=self.start_stream)
        self.start_button.grid(row=1, column=0, padx=10, pady=10, sticky='e')

        # Add stop button
        self.stop_button = Button(root, text="Stop Stream", font=('Arial', 14), command=self.stop_stream)
        self.stop_button.grid(row=1, column=1, padx=10, pady=10, sticky='w')

        # Notification toggle switch ON/OFF
        self.toggle_button = Button(root, text="Enable Notifications", font=('Arial', 14), command=self.toggle_notifications)
        self.toggle_button.grid(row=1, column=2, padx=10, pady=10, sticky='w')

        # Preview window size for live video feed
        self.preview_width = 580
        self.preview_height = 360

        # Create a canvas to display the live video feed
        self.canvas = Canvas(root, width=self.preview_width, height=self.preview_height)
        self.canvas.grid(row=2, column=0, columnspan=2, padx=10, pady=10, sticky='n')
        self.canvas.create_rectangle(0, 0, self.preview_width, self.preview_height, stipple='gray12')
        self.no_signal_text = self.canvas.create_text(self.preview_width // 2, self.preview_height // 2, text="No signal", fill="white", font=('Arial', 24))

        # Frame to hold snapshots
        self.snapshot_frame = Frame(root, width=180, height=380)
        self.snapshot_frame.grid(row=2, column=2, padx=10, pady=10, sticky='n')

        # Create a canvas within the frame to display snapshots of detected events
        self.snapshot_canvas = Canvas(self.snapshot_frame, width=160, height=360, bg='black')
        self.snapshot_canvas.pack(side=LEFT, fill=BOTH, expand=True)

        # Add a scrollbar to navigate through snapshots
        self.scrollbar = Scrollbar(self.snapshot_frame, orient=VERTICAL, command=self.snapshot_canvas.yview)
        self.scrollbar.pack(side=RIGHT, fill=Y)

        # Configure the canvas to work with the scrollbar
        self.snapshot_canvas.configure(yscrollcommand=self.scrollbar.set)
        self.snapshot_canvas.bind('<Configure>', lambda e: self.snapshot_canvas.configure(scrollregion=self.snapshot_canvas.bbox('all')))

        # Create a frame inside the canvas to hold the individual snapshot images
        self.image_frame = Frame(self.snapshot_canvas, width=160, height=360)
        self.snapshot_canvas.create_window((0, 0), window=self.image_frame, anchor=NW)

        # Create a text widget to display terminal-like output for logs and status updates
        self.terminal_text = Text(root, width=80, height=10, bg='black', fg='green', font=('Consolas', 12))
        self.terminal_text.grid(row=3, column=0, columnspan=3, padx=10, pady=10, sticky='n')
        self.terminal_text.insert(tk.END, "Terminal Output:\n")

        # Start the periodic refresh of the preview canvas
        self.update_video()

    def _log(self, message):
        """Append ``message`` to the log pane and scroll so it is visible."""
        self.terminal_text.insert(tk.END, message)
        self.terminal_text.see(tk.END)

    def start_stream(self):
        """Start streaming from the URL in the entry field, replacing any running stream."""
        # Release a stream that is still running so its threads and capture are not leaked
        if self.video_stream_app:
            self.video_stream_app.stop_video_stream()
            self.video_stream_app = None

        # Get video URL from the input field
        video_url = self.url_entry.get()
        stream = VideoStreamApp(video_url)
        # Carry the current toggle state over to the new stream object
        stream.pushover_enabled = self.notifications_enabled

        # Attempt to start the video stream, show an error if it fails
        if not stream.start_video_stream():
            messagebox.showerror("Error", "Unable to open video stream. Check the URL.")
            self._log("Failed to start video stream.\n")
            return

        self.video_stream_app = stream

        # Configure start and stop button colors
        self.start_button.configure(bg='green')
        self.stop_button.configure(bg='white')

        # Update the terminal output to indicate the stream has started
        self._log("Started video stream...\n")

    def stop_stream(self):
        """Stop the running stream (if any) and update the button colours and log."""
        self.start_button.configure(bg='white')
        self.stop_button.configure(bg='red')

        # Stop the video stream if it is running and clean up resources
        if self.video_stream_app:
            self.video_stream_app.stop_video_stream()
            self.video_stream_app = None

        # Update the terminal output
        self._log("Stopped video stream...\n")

    def toggle_notifications(self):
        """Flip the Pushover toggle; it applies to the current and to future streams."""
        self.notifications_enabled = not self.notifications_enabled
        if self.video_stream_app:
            self.video_stream_app.pushover_enabled = self.notifications_enabled

        if self.notifications_enabled:
            self.toggle_button.config(text="Disable Notifications", bg='red')
            self._log("Pushover notifications enabled.\n")
        else:
            self.toggle_button.config(text="Enable Notifications", bg='white')
            self._log("Pushover notifications disabled.\n")

    def update_video(self):
        """Refresh the preview and snapshot panes, then reschedule itself in 30 ms."""
        try:
            stream = self.video_stream_app
            frame_rgb = stream.frame if stream else None

            if frame_rgb is not None:
                # The processing thread publishes a new array object per frame, so an
                # identity check is enough to skip resizing a frame already on screen
                if frame_rgb is not self._displayed_frame:
                    frame_pil = Image.fromarray(frame_rgb)
                    frame_pil = frame_pil.resize((self.preview_width, self.preview_height), Image.LANCZOS)
                    frame_tk = ImageTk.PhotoImage(frame_pil)
                    self.canvas.delete("all")
                    self.canvas.create_image(0, 0, anchor=NW, image=frame_tk)
                    self.canvas.image = frame_tk   # Keep a reference so Tk can draw it
                    self._displayed_frame = frame_rgb
                    self._no_signal_shown = False
            elif not self._no_signal_shown:
                # If no video frame is available, display "No signal" (drawn once)
                self.canvas.delete("all")
                self.canvas.create_rectangle(0, 0, self.preview_width, self.preview_height, fill='black', stipple='gray12')
                self.canvas.create_text(self.preview_width // 2, self.preview_height // 2, text="No signal", fill="white", font=('Arial', 24))
                self._displayed_frame = None
                self._no_signal_shown = True

            # Update the snapshot preview if there is a new snapshot
            if stream:
                self.update_snapshots()
        finally:
            # Always reschedule so one failed refresh does not freeze the preview for good
            self.root.after(30, self.update_video)

    def update_snapshots(self):
        """Show the latest saved snapshot; does nothing when it is already displayed."""
        stream = self.video_stream_app
        snapshot_path = stream.last_saved_image_path if stream else None
        if not snapshot_path or snapshot_path == self._shown_snapshot_path:
            return

        # Marked as shown before loading so a broken file is reported once, not every tick
        self._shown_snapshot_path = snapshot_path
        try:
            # Load and resize the latest snapshot image
            with Image.open(snapshot_path) as snapshot_image:
                thumbnail = snapshot_image.resize((160, 120), Image.LANCZOS)
        except OSError as exc:
            self._log(f"Could not load snapshot {snapshot_path}: {exc}\n")
            return
        snapshot_tk = ImageTk.PhotoImage(thumbnail)

        for widget in self.image_frame.winfo_children():
            widget.destroy()

        # Display the snapshot image in the frame
        label = Label(self.image_frame, image=snapshot_tk)
        label.image = snapshot_tk   # Keep a reference so Tk can draw it
        label.pack()

    def clean_up(self):
        """Stop the video stream and close the application window when exiting."""
        if self.video_stream_app:
            self.video_stream_app.stop_video_stream()
        self.root.destroy()

# Main entry point for the application: create the Tk root window, build the GUI
# on it and run the Tk event loop.
if __name__ == "__main__":
    root = Tk()
    app = AppGUI(root, bg_image_path)
    # Closing the window goes through clean_up, which stops the stream before destroying the window
    root.protocol("WM_DELETE_WINDOW", app.clean_up)
    root.mainloop()


YOLOv5  v7.0-297-gd07d0cf6 Python-3.9.13 torch-2.2.2+cu118 CUDA:0 (NVIDIA GeForce RTX 3060 Laptop GPU, 6144MiB)

Fusing layers... 
Model summary: 206 layers, 12312052 parameters, 0 gradients, 16.1 GFLOPs
Adding AutoShape... 


Video stream opened successfully!
New fire detected! Frame saved to D:/ML/VEn_torch_cu118/Object_Detection/Yolov5_Projects/FireSight_Guardian_ML_FireDetection_System/imgs\image_1734489695.jpg
New fire detected! Frame saved to D:/ML/VEn_torch_cu118/Object_Detection/Yolov5_Projects/FireSight_Guardian_ML_FireDetection_System/imgs\image_1734489706.jpg
New fire detected! Frame saved to D:/ML/VEn_torch_cu118/Object_Detection/Yolov5_Projects/FireSight_Guardian_ML_FireDetection_System/imgs\image_1734489707.jpg
New fire detected! Frame saved to D:/ML/VEn_torch_cu118/Object_Detection/Yolov5_Projects/FireSight_Guardian_ML_FireDetection_System/imgs\image_1734489708.jpg
New fire detected! Frame saved to D:/ML/VEn_torch_cu118/Object_Detection/Yolov5_Projects/FireSight_Guardian_ML_FireDetection_System/imgs\image_1734489710.jpg
New fire detected! Frame saved to D:/ML/VEn_torch_cu118/Object_Detection/Yolov5_Projects/FireSight_Guardian_ML_FireDetection_System/imgs\image_1734489712.jpg
New fire detected!

****

IP check ESP32-CAM

    - Ensure the CAM module is connected to the same local network as your PC
    - Run the code below to find the IP address of the camera
    - Replace the IP address in the code above with the one that was found
  
    --> self.url_entry.insert(0, "http://192.168.251.70/")

In [1]:
import os
import platform
import socket
import struct
import threading
import time

def get_ip():
    """Return the IPv4 address of this machine on its local network.

    The address is taken from the interface the operating system would use for
    outbound traffic.  Resolving the host name alone can yield a loopback
    address or the address of an unrelated adapter, which would make the scan
    cover the wrong subnet; that lookup is kept only as a fallback.

    Raises:
        OSError: If the fallback host-name lookup fails as well.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            # Connecting a UDP socket only selects a route; no packet is sent
            probe.connect(('8.8.8.8', 80))
            return probe.getsockname()[0]
    except OSError:
        # No usable route (e.g. isolated network): fall back to the host-name lookup
        hostname = socket.gethostname()
        return socket.gethostbyname(hostname)

def ping_ip(ip):
    """Send one ping to ``ip`` and print a line when a reply is received.

    Args:
        ip: IPv4 address to probe, as a string.

    Returns:
        True when the ping output contains a TTL field (a reply arrived),
        False otherwise, including when ``ping`` cannot be run or times out.
    """
    import subprocess  # Local import: only this function needs it

    # Windows uses -n for the packet count, other platforms use -c
    param = '-n' if platform.system().lower() == 'windows' else '-c'
    command = ['ping', param, '1', ip]
    try:
        # Argument list (no shell) plus a timeout so one silent host cannot stall a worker
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            errors='replace',
            timeout=10,
        )
    except (OSError, subprocess.TimeoutExpired):
        return False

    # Windows prints "TTL=", Linux/macOS print "ttl=": compare case-insensitively
    if "ttl" in completed.stdout.lower():
        print(f"Device found: {ip}")
        return True
    return False

def scan_network():
    """Ping hosts .1 to .254 of this machine's /24 network, one thread per host.

    The network prefix is the first three octets of the address returned by
    ``get_ip``.  The function returns once every ping thread has finished.
    """
    subnet_prefix = '.'.join(get_ip().split('.')[:3]) + '.'

    workers = []
    for host in range(1, 255):
        worker = threading.Thread(target=ping_ip, args=(f"{subnet_prefix}{host}",))
        worker.start()
        workers.append(worker)
        time.sleep(0.01)  # Small delay to prevent overwhelming the network

    # Wait for every probe to finish before returning
    for worker in workers:
        worker.join()

# Run the subnet scan; ping_ip prints one "Device found" line per host that replies.
if __name__ == "__main__":
    scan_network()


Device found: 192.168.85.6
Device found: 192.168.85.57
Device found: 192.168.85.139
