In [2]:
import cv2
import datetime
import time
import os
from ultralytics import YOLO
import torch

In [3]:
# --- Configuration for screenshots and logs ---
SCREENSHOT_DIR = "screenshots_yolo_generic_detect"
LOG_FILE = "log_yolo_generic_detect.txt"

# --- Configuration class ---
class CFG:
    WEIGHTS = 'yolov8n.pt' # Make sure this is your custom model
    CONFIDENCE = 0.50      # Confidence threshold for detection
    
    # Class IDs for detection and reaction (0, 2, 3, 4, 5)
    # Make sure these IDs correspond to the desired classes in your model!
    # For example, if your model is trained on COCO:
    # 0: 'person'
    # 2: 'car'
    # 3: 'motorcycle'
    # 4: 'airplane' (unlikely in a helmet scenario, but for example)
    # 5: 'bus'
    CLASSES_TO_DETECT = [0] 

In [4]:
def log_event(message): # Renamed from log_alarm for more generality
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}\n"
    print(log_entry.strip()) # Print to console
    log_dir = os.path.dirname(LOG_FILE)
    if log_dir and not os.path.exists(log_dir): # Create log directory if it doesn't exist
        try:
            os.makedirs(log_dir)
        except OSError as e:
            print(f"Failed to create log directory: {e}")
    with open(LOG_FILE, "a", encoding='utf-8') as f:
        f.write(log_entry)

In [5]:
def get_predictions_df_for_videos(run = '', model = model, EXP_NAME = CFG.EXP_NAME, video_info = video_properties, save_df = True):
  df = pd.DataFrame()

  root = f'./runs/detect/predict{run}/labels/'
  pieces = sorted([i for i in os.listdir(root)])

  ### iterate over txt files (there is one for each frame in the video)
  for i, frame_txt in enumerate([i for i in pieces]):

    df = pd.read_csv(root + frame_txt, sep=" ", header=None) # read txt file as dataframe
    df.columns = ["class", "x_center", "y_center", "width", "height", "confidence"] # name columns (detection task)

    ### create column 'frame'
    frame_number = re.findall(r'\d+', frame_txt)[-1] # find frame in each txt filename
    df['frame'] = [int(frame_number) for i in range(len(df))]

    if i == 0:
      df_concat = df
    else:
      df_concat = pd.concat([df_concat, df], axis=0).reset_index(drop=True)

  ### create 4 new columns (coordinates converted into pixels): Calculate bounding box coordinates
  df_concat['x_min'] = (df_concat['x_center'] * video_info['width']) - ((df_concat['width'] * video_info['width'])/2)
  df_concat['x_max'] = (df_concat['x_center'] * video_info['width']) + ((df_concat['width'] * video_info['width'])/2)
  df_concat['y_min'] = (df_concat['y_center'] * video_info['height']) - ((df_concat['height'] * video_info['height'])/2)
  df_concat['y_max'] = (df_concat['y_center'] * video_info['height']) + ((df_concat['height'] * video_info['height']/2))

  ### create 2 new columns: height and width in pixels: will be used to filter out bad predictions (false positives)
  df_concat['width_px'] = (df_concat['width'] * video_info['width']).round(0).astype(int)
  df_concat['height_px'] = (df_concat['height'] * video_info['height']).round(0).astype(int)

  ### sort by frame
  df_concat = df_concat.sort_values(by='frame', ascending=True).reset_index(drop=True)

  ### add column 'class_name' and rearrange order
  df_concat['class_name'] = df_concat['class'].map(model.names)
  other_cols = [col for col in df_concat.columns.to_list() if col not in ['frame', 'class_name', 'class', 'confidence' ]]
  new_order = ['frame', 'class_name', 'class', 'confidence'] + other_cols
  df_concat = df_concat[new_order]

  ### export detections df
  if save_df:
    df_concat.to_csv(CFG.OUTPUT_DIR + f'detections_{CFG.EXP_NAME}_c{CFG.CONFIDENCE_INT}.csv', index=False)

  return df_concat

NameError: name 'model' is not defined

In [6]:
def get_nvr_capture_and_channel():
    # --- NVR Configuration ---
    nvr_ip_address = "147.121.119.88"
    rtsp_port = "554"
    username = "lgmpublic" 
    password = "Avery@1234"  
    channel_id_val = 12 # Channel to access
    stream_type_id = "01" # 01 for main stream, 02 for sub stream

    rtsp_channel_identifier = str(channel_id_val) + stream_type_id
    rtsp_url = f"rtsp://{username}:{password}@{nvr_ip_address}:{rtsp_port}/Streaming/Channels/{rtsp_channel_identifier}"
    log_event(f"Attempting to connect to RTSP stream: {rtsp_url.replace(password, '********')}")
    
    cap = cv2.VideoCapture(rtsp_url)

    if not cap.isOpened():
        log_event(f"Error: Failed to open video stream {rtsp_url.replace(password, '********')}")
        return None, None # Return None in case of error

    log_event(f"Video stream for channel {channel_id_val} connected successfully! Press 'q' to exit.")
    return cap, channel_id_val # Return capture object and channel ID

In [7]:
def setup_model_and_logging():
    if not os.path.exists(SCREENSHOT_DIR): # Create screenshot directory if it doesn't exist
        os.makedirs(SCREENSHOT_DIR)
    
    try:
        model = YOLO(CFG.WEIGHTS)
        log_event(f"YOLO model loaded: {CFG.WEIGHTS}")
        
        # Determining device (GPU or CPU)
        if torch.cuda.is_available():
            target_device_str = 'cuda'
            log_event(f"CUDA available. Model will use GPU ({target_device_str}).")
        else:
            target_device_str = 'cpu'
            log_event("CUDA unavailable. Model will use CPU.")
        
        # Checking model class names
        if hasattr(model, 'names') and model.names:
            log_event(f"Recognizable classes in model ({CFG.WEIGHTS}): {model.names}")
            # Check if all classes from CLASSES_TO_DETECT are in the model
            for class_id_to_check in CFG.CLASSES_TO_DETECT:
                if class_id_to_check not in model.names:
                    log_event(f"WARNING: Class with ID {class_id_to_check} is missing from the model {model.names}. It will be ignored.")
        else:
            log_event(f"WARNING: Failed to get class names from model {CFG.WEIGHTS}. Detection will be based on ID only.")
            
        return model, target_device_str # Return model and device string
        
    except Exception as e:
        log_event(f"Error: Failed to load YOLO model '{CFG.WEIGHTS}'. Info: {e}")
        return None, None

In [None]:
#global variant
detected_target_classes_info = []
def work_1frame(model,class_names_map, frame, frame_count, process_every_n_frames,channel_id_display, device_str,event_log_cooldown):
        
    frame_count[0] += 1
    display_frame = frame.copy() # Copy frame for display and drawing

    if frame_count % process_every_n_frames == 0:
        log_event(f"Processing frame #{frame_count[0]} for channel {channel_id_display}...")
        current_frame_annotations = [] # Annotations for the current frame
         # Collect information about detected target classes

        # 1. YOLOv8 model prediction
        results = model.predict(
            source=frame.copy(), # Pass a copy so the original frame remains clean for screenshot
            # save=True, # YOLO will save its images if needed - uncomment
            classes=CFG.CLASSES_TO_DETECT, # Filter only necessary classes
            conf=CFG.CONFIDENCE,
            # save_txt=True, # Save annotations to txt if needed - uncomment
            # save_conf=True,
            show=False, # Set to False, use cv2.imshow for display
            device=device_str, # Use determined device
            verbose=False # Less console output from YOLO
        )
        
        if results and results[0] and hasattr(results[0], 'boxes') and len(results[0].boxes) > 0:
            for i in range(len(results[0].boxes)):
                cls_id = int(results[0].boxes.cls[i])
                # Since 'classes' arg filters already, all results will be from this list
                conf_score = float(results[0].boxes.conf[i])
                bbox_xyxy = results[0].boxes.xyxy[i].cpu().numpy().astype(int)
                class_name = class_names_map.get(cls_id, f"ID:{cls_id}") # Get class name
                timestamp_file = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
                info_temp = [channel_id_display,class_name, round(conf_score,2), bbox_xyxy,timestamp_file]

                detected_target_classes_info.append(info_temp)
                
                # Prepare information for drawing
                px1, py1, px2, py2 = bbox_xyxy
                text_to_draw = f"{class_name} ({conf_score:.2f})"
                current_frame_annotations.append(
                    (text_to_draw, (px1, py1 - 10), 
                        (px1, py1, px2, py2), 
                        (0, 0,250), 2) # read color for all target objects
                )
        
        # If target objects were detected in the current processed frame
        if detected_target_classes_info:
            current_event_time = time.time()
            if current_event_time - last_event_log_time[0] > event_log_cooldown:
                #timestamp_file = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
                screenshot_filename = os.path.join(SCREENSHOT_DIR, f"{channel_id_display}_{timestamp_file}.jpg")
                
                # To have the screenshot with current detection boxes, draw them on a temp frame before saving
                temp_frame_for_screenshot = frame.copy()
                for text, text_pos, rect_coords, color, thickness in current_frame_annotations:
                        cv2.rectangle(temp_frame_for_screenshot, (rect_coords[0], rect_coords[1]), (rect_coords[2], rect_coords[3]), color, thickness)
                        cv2.putText(temp_frame_for_screenshot, text, text_pos, cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, thickness)
                cv2.imwrite(screenshot_filename, temp_frame_for_screenshot)# save pic
                
                event_summary = "; ".join(detected_target_classes_info)
                log_message = f"EVENT (Channel {channel_id_display}): Detected objects - {event_summary}. Screenshot saved: {screenshot_filename}"
                log_event(log_message)
                last_event_log_time[0] = current_event_time #change in place
                
                
        
        last_drawn_annotations = current_frame_annotations # Update annotations for display
    
    # Draw saved annotations on the display frame
    for text, text_pos, rect_coords, color, thickness in last_drawn_annotations:
        cv2.rectangle(display_frame, (rect_coords[0], rect_coords[1]), (rect_coords[2], rect_coords[3]), color, thickness)
        cv2.putText(display_frame, text, text_pos, cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, thickness)
    
    # Display the frame
    cv2.imshow(f'Object Detection - NVR Channel {channel_id_display}', display_frame)

 



In [None]:
# test main

In [43]:
cap, channel_id_display = get_nvr_capture_and_channel() # Get capture object and channel ID

[2025-05-27 16:12:41] Attempting to connect to RTSP stream: rtsp://lgmpublic:********@147.121.119.88:554/Streaming/Channels/1201
[2025-05-27 16:12:42] Video stream for channel 12 connected successfully! Press 'q' to exit.


In [10]:
model, device_str = setup_model_and_logging() 

[2025-05-27 15:56:09] YOLO model loaded: yolov8n.pt
[2025-05-27 15:56:09] CUDA unavailable. Model will use CPU.
[2025-05-27 15:56:09] Recognizable classes in model (yolov8n.pt): {0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant', 11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat', 16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear', 22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag', 27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard', 32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove', 36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle', 40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl', 46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli', 51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chai

In [11]:
class_names_map = model.names if hasattr(model, 'names') and model.names else {}

In [12]:
last_event_log_time = 0 
event_log_cooldown = 10 # Seconds, cooldown for logging and screenshotting upon detection in a frame

frame_count = [0]
process_every_n_frames = 15 # Process every N-th frame (configurable)

In [13]:
last_drawn_annotations = [] 


In [18]:
ret, frame = cap.read()

In [36]:
CFG.CLASSES_TO_DETECT = [0, 1, 2, 3, 4, 5, 6,7, 8]

In [49]:
results = model.predict(
    source=frame.copy(), # Pass a copy so the original frame remains clean for screenshot
    # save=True, # YOLO will save its images if needed - uncomment
    classes=CFG.CLASSES_TO_DETECT, # Filter only necessary classes
    conf=CFG.CONFIDENCE,
    # save_txt=True, # Save annotations to txt if needed - uncomment
    # save_conf=True,
    show=False, # Set to False, use cv2.imshow for display
    device=device_str, # Use determined device
    verbose=False # Less console output from YOLO
)

In [90]:
detected_target_classes_info =[]
current_frame_annotations = []
if results and results[0] and hasattr(results[0], 'boxes') and len(results[0].boxes) > 0:
    for i in range(len(results[0].boxes)):
        cls_id = int(results[0].boxes.cls[i])
        # Since 'classes' arg filters already, all results will be from this list
        conf_score = float(results[0].boxes.conf[i])
        bbox_xyxy = results[0].boxes.xyxy[i].cpu().numpy().astype(int)
        class_name = class_names_map.get(cls_id, f"ID:{cls_id}") # Get class name
        timestamp_file = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        info_temp = [class_name, round(conf_score,2), bbox_xyxy,timestamp_file]

        detected_target_classes_info.append(info_temp)
        
        # Prepare information for drawing
        px1, py1, px2, py2 = bbox_xyxy
        text_to_draw = f"{class_name} ({conf_score:.2f})"
        current_frame_annotations.append(
            (text_to_draw, (px1, py1 - 10), 
                (px1, py1, px2, py2), 
                (0, 0,250), 2) # read color for all target objects
        )
        

'20250527_163516_078781'

In [71]:
pd.DataFrame(detected_target_classes_info)

Unnamed: 0,0,1,2,3
0,person,0.67,"[898, 341, 949, 579]",20250527_163143_475499
1,person,0.6,"[836, 367, 897, 605]",20250527_163143_475692


In [58]:
detected_target_classes_info

['person (conf: 0.67) at [898 341 949 579]',
 'person (conf: 0.60) at [836 367 897 605]']

In [89]:
current_frame_annotations


[('person (0.67)',
  (np.int64(898), np.int64(331)),
  (np.int64(898), np.int64(341), np.int64(949), np.int64(579)),
  (250, 0, 0),
  2),
 ('person (0.60)',
  (np.int64(836), np.int64(357)),
  (np.int64(836), np.int64(367), np.int64(897), np.int64(605)),
  (250, 0, 0),
  2)]

In [91]:
temp_frame_for_screenshot = frame.copy()
for text, text_pos, rect_coords, color, thickness in current_frame_annotations:
    cv2.rectangle(temp_frame_for_screenshot, (rect_coords[0], rect_coords[1]), (rect_coords[2], rect_coords[3]), color, thickness)
    cv2.putText(temp_frame_for_screenshot, text, text_pos, cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, thickness)
cv2.imwrite('tt2.jpg', temp_frame_for_screenshot)

True

'20250527_162810_601136'

In [None]:
if detected_target_classes_info:
    current_event_time = time.time()
    if current_event_time - last_event_log_time > event_log_cooldown:
        timestamp_file = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        screenshot_filename = os.path.join(SCREENSHOT_DIR, f"detected_objects_ch{channel_id_display}_{timestamp_file}.jpg")
        
        # To have the screenshot with current detection boxes, draw them on a temp frame before saving
        temp_frame_for_screenshot = frame.copy()
        for text, text_pos, rect_coords, color, thickness in current_frame_annotations:
                cv2.rectangle(temp_frame_for_screenshot, (rect_coords[0], rect_coords[1]), (rect_coords[2], rect_coords[3]), color, thickness)
                cv2.putText(temp_frame_for_screenshot, text, text_pos, cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, thickness)
        cv2.imwrite(screenshot_filename, temp_frame_for_screenshot)
        
        event_summary = "; ".join(detected_target_classes_info)
        log_message = f"EVENT (Channel {channel_id_display}): Detected objects - {event_summary}. Screenshot saved: {screenshot_filename}"
        log_event(log_message)
        
        last_event_log_time = current_event_time

In [54]:
results[0].boxes.cls[0]

tensor(0.)

In [46]:
while True:
    ret, frame = cap.read()
    cv2.imshow('1',frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
            break

cap.release()

In [47]:
#ret, frame = cap.read()
cv2.imwrite('test.jpg', frame)

True

In [45]:
ret, frame2 = cap.read()
cv2.imwrite('test2.jpg', frame2)

True

In [48]:
cv2.imshow('1',frame)

In [41]:
cv2.imshow(f'Object Detection - NVR Channel {channel_id_display}', frame)

error: OpenCV(4.11.0) D:\a\opencv-python\opencv-python\opencv\modules\highgui\src\window.cpp:973: error: (-215:Assertion failed) size.width>0 && size.height>0 in function 'cv::imshow'


In [None]:

# --- Main program ---
def main():
    cap, channel_id_display = get_nvr_capture_and_channel() # Get capture object and channel ID
    if not cap: # If failed to connect to NVR
        return

    model, device_str = setup_model_and_logging() # Get model and device string
    if not model: # If failed to load model
        cap.release() # Release NVR resource
        return
    
    class_names_map = model.names if hasattr(model, 'names') and model.names else {}

    last_event_log_time = 0 
    event_log_cooldown = 10 # Seconds, cooldown for logging and screenshotting upon detection in a frame

    frame_count = [0]
    process_every_n_frames = 15 # Process every N-th frame (configurable)
    
    # To store annotation info from the previously processed frame
    # (so that boxes remain visible between processing)
    last_drawn_annotations = [] 

    log_event(f"Starting video stream processing for channel {channel_id_display}...")
    log_event(f"Model will detect classes with ID: {CFG.CLASSES_TO_DETECT}")
    log_event(f"Confidence threshold for detection: {CFG.CONFIDENCE}")

    while True:
        ret, frame = cap.read()
        if not ret:
            log_event("Error: Failed to read frame.")
            break
        
        work_1frame(model=model,
                    class_names_map=class_names_map,
                    frame=frame, 
                    frame_count=frame_count, 
                    process_every_n_frames=process_every_n_frames,
                    channel_id_display=channel_id_display, 
                    device_str=device_str,
                    event_log_cooldown=event_log_cooldown,
                    last_event_log_time=last_event_log_time,
                    last_drawn_annotations=last_drawn_annotations
                    )

        if cv2.waitKey(1) & 0xFF == ord('q'):
            log_event("'q' key pressed, exiting...")
            break

    if cap: # Check if cap was successfully initialized
        cap.release()
    cv2.destroyAllWindows()
    log_event("Program stopped.")



In [None]:
main()