In [None]:
import signal
import sys
from ultralytics import YOLO
import cv2
import numpy as np
from sort.sort import *
from util import get_car, read_license_plate, write_csv
import tensorflow as tf
import torch

# Module-level accumulator: process_video() fills it per frame number and
# save_results_and_exit() dumps it to CSV when the run is interrupted.
results = dict()

# Run on the GPU when torch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Two detectors: a general YOLOv8 model (used for vehicles) and a dedicated
# licence-plate detector. Both are moved to the selected device up front.
coco_model = YOLO("/home/annone/ai-camera/backend/apis/anpr/yolov8n.pt")
license_plate_detector = YOLO(
    "/home/annone/ai-camera/backend/apis/anpr/indian_license_plate_detector_0.2.pt"
)
coco_model.to(device)
license_plate_detector.to(device)

def save_results_and_exit(signum, frame):
    """Signal handler: dump the collected results to ``test.csv``, then exit.

    Args:
        signum: Signal number supplied by the ``signal`` machinery (unused).
        frame: Interrupted stack frame supplied by the ``signal`` machinery (unused).

    Raises:
        SystemExit: Always, with exit status 0, once the CSV has been written.
    """
    print("Ctrl+C pressed. Saving results to CSV and exiting...")
    # Persist whatever has been accumulated in the module-level `results` so far.
    write_csv(results, "test.csv")
    # Same effect as sys.exit(0): terminate with a success status.
    raise SystemExit(0)

# Install save_results_and_exit as the SIGINT (Ctrl+C) handler so an
# interrupted run still writes its results to CSV before exiting.
# NOTE(review): this replaces whatever SIGINT handler was installed before;
# inside a Jupyter kernel that may change how "Interrupt Kernel" behaves — confirm.
signal.signal(signal.SIGINT, save_results_and_exit)

def process_video(coco_model, license_plate_detector, video_path, camera_id):
    """Detect vehicles and licence plates in one image and record the plate text.

    Despite the name, the input is loaded with ``cv2.imread`` and handled as a
    single frame (frame number 0); no video stream is decoded.

    For the frame, vehicle detections are fed to a SORT tracker, every detected
    plate is matched to a tracked vehicle via ``get_car``, the plate region is
    cropped, grayscaled, inverse-thresholded and passed to
    ``read_license_plate``. Recognised plates are stored in the module-level
    ``results`` dict under ``results[0][car_id]`` and the whole dict is written
    to ``/home/annone/ai-camera/backend/apis/anpr/test.csv`` at the end.

    Args:
        coco_model: Detector called on the frame; only detections whose class id
            is in ``vehicles`` are kept.
        license_plate_detector: Detector called on the same frame to find plates.
        video_path: Path of the image file to read.
        camera_id: Forwarded unchanged to ``read_license_plate``.

    Returns:
        None. Note that if ``cv2.waitKey`` reports the ``q`` key the function
        returns early, before the CSV is written.

    Raises:
        OSError: If ``cv2.imread`` cannot load ``video_path``.
    """
    global results
    mot_tracker = Sort()

    vehicles = [2, 3, 5, 7]  # Vehicle classes (car, motorcycle, bus, truck)
    frame_nmr = 0

    # cv2.imread signals failure by returning None instead of raising. Check it
    # explicitly so the detectors are never invoked with a None source.
    frame = cv2.imread(video_path)
    if frame is None:
        raise OSError(f"Could not read image: {video_path!r}")

    # NOTE(review): the YOLO models are driven by the torch `device`; this
    # TensorFlow device scope presumably only matters if read_license_plate
    # uses TensorFlow internally — confirm before removing.
    with tf.device("gpu:0"):
        results[frame_nmr] = {}
        print(device)

        # Keep only vehicle detections, as [x1, y1, x2, y2, score, class_id] rows.
        detections = coco_model(frame, device=device)[0]
        detections_ = [
            [x1, y1, x2, y2, score, int(class_id)]
            for x1, y1, x2, y2, score, class_id in detections.boxes.data.tolist()
            if int(class_id) in vehicles
        ]

        # The tracker must be updated even when nothing was detected.
        if detections_:
            track_ids = mot_tracker.update(np.asarray(detections_))
        else:
            track_ids = mot_tracker.update(np.empty((0, 5)))

        license_plates = license_plate_detector(frame, device=device)[0]
        for license_plate in license_plates.boxes.data.tolist():
            x1, y1, x2, y2, score, _class_id = license_plate

            xcar1, ycar1, xcar2, ycar2, car_id = get_car(license_plate, track_ids)
            if car_id == -1:
                # Plate could not be assigned to any tracked vehicle.
                continue

            license_plate_crop = frame[int(y1):int(y2), int(x1):int(x2), :]
            if license_plate_crop.size == 0:
                # Degenerate box: cvtColor would fail on an empty image.
                continue

            license_plate_crop_gray = cv2.cvtColor(license_plate_crop, cv2.COLOR_BGR2GRAY)
            _, license_plate_crop_thresh = cv2.threshold(
                license_plate_crop_gray, 64, 255, cv2.THRESH_BINARY_INV
            )

            # Annotate the frame only AFTER the OCR input has been derived: the
            # crop is a view into `frame`, so drawing first would put the box
            # outline into the pixels handed to the plate reader.
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (250, 0, 0), 2)

            license_plate_text, license_plate_text_score = read_license_plate(
                license_plate_crop_thresh, video_path, camera_id, frame_nmr
            )
            if license_plate_text is None:
                continue

            print(f'Recognized License Plate: {license_plate_text}, Confidence Score: {license_plate_text_score}')
            results[frame_nmr][car_id] = {
                "car": {"bbox": [xcar1, ycar1, xcar2, ycar2]},
                "license_plate": {
                    "bbox": [x1, y1, x2, y2],
                    "text": license_plate_text,
                    "bbox_score": score,
                    "text_score": license_plate_text_score,
                },
            }

        # Kept from the original flow: 'q' aborts before the CSV is written.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            return

    cv2.destroyAllWindows()

    write_csv(results, "/home/annone/ai-camera/backend/apis/anpr/test.csv")

# Run the pipeline once on a single test image (process_video reads its
# `video_path` argument with cv2.imread).
video_path = "/home/annone/ai-camera/backend/apis/anpr/test.jpg"
camera_id = "1"
process_video(
    coco_model=coco_model,
    license_plate_detector=license_plate_detector,
    video_path=video_path,
    camera_id=camera_id,
)

In [36]:
from  PIL import Image
import PIL.Image
import redis
import PIL
import ast
from io import BytesIO
import cv2
from module import k_mean_color_detection
# Shared Redis client used by every helper in this cell:
# local server, default port, database 0.
r = redis.Redis(
    host="localhost",
    port=6379,
    db=0,
)

def clear_redis_database():
    """Call ``flushdb`` on the shared Redis client ``r`` and report it on stdout."""
    # Destructive: issued against whichever database `r` is connected to.
    r.flushdb()
    print("Redis database cleared.")

# Store a value under a "<serial_number>:<constant_string>" namespaced key.
def store_string_with_namespace(serial_number, constant_string, value):
    """Write ``value`` to Redis under the key ``"<serial_number>:<constant_string>"``.

    Args:
        serial_number: Prefix part of the key.
        constant_string: Suffix part of the key; the retrieval helpers match on
            this suffix.
        value: Payload handed to ``r.set``.
    """
    namespaced_key = f"{serial_number}:{constant_string}"
    r.set(namespaced_key, value)
    print(f"Stored {namespaced_key} -> {value}")

# Retrieve, then delete, every entry whose key ends with ":<constant_string>".
def retrieve_and_delete_matching_keys(constant_string):
    """Pop all Redis entries whose key matches ``*:<constant_string>``.

    Each matching key is read with ``r.get``, decoded, recorded and then
    deleted. Entries holding an empty string are returned and deleted like any
    other; only keys for which ``r.get`` returns ``None`` are skipped.

    Args:
        constant_string: Key suffix (the part after the colon) to match. It is
            interpolated into the key pattern as-is.

    Returns:
        dict: Decoded key -> decoded value for every entry that was retrieved
        and deleted.
    """
    retrieved = {}
    for key in r.keys(f"*:{constant_string}"):
        value = r.get(key)
        if value is None:
            # No value came back for this key (e.g. it disappeared between the
            # key listing and the read); there is nothing to return or delete.
            continue

        decoded_key = key.decode()
        decoded_value = value.decode()
        retrieved[decoded_key] = decoded_value
        r.delete(key)  # Delete the key after retrieving the value
        print(f"Retrieved and deleted {decoded_key} -> {decoded_value}")
    return retrieved

def show(constant_string, image_dir="/home/annone/ai-camera/backend/stream/temp"):
    """Run colour detection on the image referenced by the first usable entry.

    Looks up keys matching ``*:<constant_string>``, takes the 4th
    comma-separated field of the stored value as an image file name, prints it
    and passes ``<image_dir>/<file name>`` to ``k_mean_color_detection``.
    Processing stops after the first entry that could be handled; entries
    without a value or with fewer than four fields are skipped instead of
    raising.

    Args:
        constant_string: Key suffix (the part after the colon) to match.
        image_dir: Directory the image file name is resolved against. Defaults
            to the path this notebook has always used.

    Returns:
        None.
    """
    for key in r.keys(f"*:{constant_string}"):
        value = r.get(key)
        if value is None:
            # No value came back for this key; nothing to inspect.
            continue

        fields = value.decode().split(",")
        if len(fields) < 4:
            # The image name is read from index 3; shorter values cannot carry one.
            print(f"Skipping {key.decode()}: expected at least 4 comma-separated fields")
            continue

        image_name = fields[3]
        print(image_name)
        k_mean_color_detection(f"{image_dir}/{image_name}")
        # Only one image is processed per call.
        return
# Example usage
# store_string_with_namespace("1", "activity", "login")
# store_string_with_namespace("2", "activity", "upload photo")
# store_string_with_namespace("3", "activity", "logout")
# store_string_with_namespace("1", "status", "active")
# 

ModuleNotFoundError: No module named 'module'

In [35]:
# Uncomment to wipe the Redis database before inspecting it:
# clear_redis_database()
# Look up keys ending in ":d_log" and run show() on them.
show("d_log")

310_X0GE1yLA_3_1723209889_127_0_0_1_572.png


/usr/bin/eog: symbol lookup error: /snap/core20/current/lib/x86_64-linux-gnu/libpthread.so.0: undefined symbol: __libc_pthread_init, version GLIBC_PRIVATE


In [None]:
# Retrieve and delete all data matching the constant string "activity".
# This is a destructive read: every returned key is removed from Redis.
retrieved_data = retrieve_and_delete_matching_keys("activity")
print(f"Retrieved data: {retrieved_data}")

# Verify that the data is deleted: a second call should find nothing left
# to return for the same suffix.
remaining_data = retrieve_and_delete_matching_keys("activity")
print(f"Remaining data after deletion: {remaining_data}")

In [None]:
# Run show() for keys ending in ":activity". show() takes the image file name
# from the 4th comma-separated field of the stored value.
# NOTE(review): the example "activity" values in this notebook ("login",
# "upload photo", ...) have no such field — confirm this call is intended.
show("activity")
