In [None]:
import numpy as np
import cv2
import pickle
import serial
import time
import pandas as pd
import matplotlib.pyplot as plt

#############################################

frameWidth = 640         # CAMERA RESOLUTION
frameHeight = 480
brightness = 180
threshold = 0.90         # PROBABILITY THRESHOLD
strict_threshold = 0.95  # Higher threshold for critical signs
font = cv2.FONT_HERSHEY_SIMPLEX
skip_frames = 1
detection_duration = 3
##############################################

# SETUP THE VIDEO CAMERA
cap = cv2.VideoCapture(1)  # Change to 0 if camera index 1 doesn't work
cap.set(3, frameWidth)
cap.set(4, frameHeight)
cap.set(10, brightness)

# Serial communication setup
try:
    ser = serial.Serial('/dev/cu.usbmodem14201', 115200, timeout=0)
    ser.flush()
except serial.SerialException:
    print("Serial port not found. Please check the port and connection.")
    exit()

# IMPORT THE TRAINED MODEL
try:
    pickle_in = open(".../Downloads/model_trained.p", "rb")  # rb = READ BYTE
    model = pickle.load(pickle_in)
except FileNotFoundError:
    print("Model file not found. Please check the path.")
    exit()

# Data logging
log_data = []

def log_to_dataframe(sign, pwm_value, command, timestamp):
    log_data.append({'Sign': sign, 'PWM': pwm_value, 'Command': command, 'Timestamp': timestamp})

def grayscale(img):
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return img

def equalize(img):
    img = cv2.equalizeHist(img)
    return img

def preprocessing(img):
    img = grayscale(img)
    img = equalize(img)
    img = img / 255
    return img

def getCalssName(classNo):
    if   classNo == 0: return 'Speed Limit 20 km/h'
    elif classNo == 1: return 'Speed Limit 30 km/h'
    elif classNo == 2: return 'Speed Limit 50 km/h'
    elif classNo == 3: return 'Speed Limit 60 km/h'
    elif classNo == 4: return 'Speed Limit 70 km/h'
    elif classNo == 5: return 'Speed Limit 80 km/h'
    elif classNo == 6: return 'End of Speed Limit 80 km/h'
    elif classNo == 7: return 'Speed Limit 100 km/h'
    elif classNo == 8: return 'Speed Limit 120 km/h'
    elif classNo == 9: return 'No passing'
    elif classNo == 10: return 'No passing for vechiles over 3.5 metric tons'
    elif classNo == 11: return 'Right-of-way at the next intersection'
    elif classNo == 12: return 'Priority road'
    elif classNo == 13: return 'Yield'
    elif classNo == 14: return 'Stop'
    elif classNo == 15: return 'No vechiles'
    elif classNo == 16: return 'Vechiles over 3.5 metric tons prohibited'
    elif classNo == 17: return 'No entry'
    elif classNo == 18: return 'General caution'
    elif classNo == 19: return 'Dangerous curve to the left'
    elif classNo == 20: return 'Dangerous curve to the right'
    elif classNo == 21: return 'Double curve'
    elif classNo == 22: return 'Bumpy road'
    elif classNo == 23: return 'Slippery road'
    elif classNo == 24: return 'Road narrows on the right'
    elif classNo == 25: return 'Road work'
    elif classNo == 26: return 'Traffic signals'
    elif classNo == 27: return 'Pedestrians'
    elif classNo == 28: return 'Children crossing'
    elif classNo == 29: return 'Bicycles crossing'
    elif classNo == 30: return 'Beware of ice/snow'
    elif classNo == 31: return 'Wild animals crossing'
    elif classNo == 32: return 'End of all speed and passing limits'
    elif classNo == 33: return 'Turn right ahead'
    elif classNo == 34: return 'Turn left ahead'
    elif classNo == 35: return 'Ahead only'
    elif classNo == 36: return 'Go straight or right'
    elif classNo == 37: return 'Go straight or left'
    elif classNo == 38: return 'Keep right'
    elif classNo == 39: return 'Keep left'
    elif classNo == 40: return 'Roundabout mandatory'
    elif classNo == 41: return 'End of no passing'
    elif classNo == 42: return 'End of no passing by vechiles over 3.5 metric tons'

# MAIN LOOP
frame_counter = 0
last_detection_time = 0
current_sign = None
pwm_value = 120
normal_speed_pwm = 120
start_time = time.time()

log_to_dataframe('Normal', normal_speed_pwm, 'C', 0)

while True:
    success, imgOriginal = cap.read()
    if not success:
        print("Camera read failed. Stopping...")
        ser.write(b'P')
        log_to_dataframe('Stop', 0, 'P', time.time() - start_time)
        break

    frame_counter += 1
    if frame_counter % skip_frames == 0:
        img = np.asarray(imgOriginal)
        img = cv2.resize(img, (32, 32))
        img = preprocessing(img)
        cv2.imshow("Processed Image", img)
        img = img.reshape(1, 32, 32, 1)

        # Overlay text
        cv2.putText(imgOriginal, "CLASS: ", (20, 35), font, 0.75, (0, 0, 255), 2, cv2.LINE_AA)
        cv2.putText(imgOriginal, "PROBABILITY: ", (20, 75), font, 0.75, (0, 0, 255), 2, cv2.LINE_AA)

        # PREDICT IMAGE
        predictions = model.predict(img)
        classIndex = np.argmax(predictions, axis=1)[0]
        probabilityValue = np.amax(predictions)

        # Apply threshold based on class
        effective_threshold = strict_threshold if classIndex == 9 else threshold

        if probabilityValue > effective_threshold:
            if current_sign == classIndex:
                last_detection_time = time.time()
            else:
                current_sign = classIndex
                last_detection_time = time.time()

            className = getCalssName(classIndex)
            cv2.putText(imgOriginal, str(classIndex) + " " + str(className), (120, 35), font, 0.75, (0, 0, 255), 2, cv2.LINE_AA)
            cv2.putText(imgOriginal, str(round(probabilityValue * 100, 2)) + "%", (180, 75), font, 0.75, (0, 0, 255), 2, cv2.LINE_AA)

            command = None
            sub_command = None
            # Define Arduino control logic
            if classIndex in [9, 17]:  # No passing, No entry
                command = b'A'
                pwm_value = 0
            elif classIndex == 14:  # Stop
                command = b'P'
                pwm_value = 0
                ser.write(command)
                ser.flush()
                log_to_dataframe(className, pwm_value, 'P', time.time() - start_time)
                time.sleep(5)
            elif classIndex == 28:  # Children crossing
                command = b'L'
                pwm_value = 60
            elif classIndex in [0, 1]:  # Speed Limit 20 or 30
                command = b'D'
                sub_command = b'2' if classIndex == 0 else b'3'
                pwm_value = 70 if classIndex == 0 else 80
            elif classIndex in [3, 4, 5, 7, 8]:  # Speed Limit 60, 70, 80, 100, 120
                command = b'I'
                if classIndex == 3:
                    sub_command = b'6'
                    pwm_value = 120
                elif classIndex == 4:
                    sub_command = b'7'
                    pwm_value = 140
                elif classIndex == 5:
                    sub_command = b'8'
                    pwm_value = 160
                elif classIndex == 7:
                    sub_command = b'1'
                    pwm_value = 200
                elif classIndex == 8:
                    sub_command = b'2'
                    pwm_value = 255
            else:
                pwm_value = normal_speed_pwm

            if command:
                if classIndex != 14:  # Stop command handled separately
                    ser.write(command)
                    if sub_command:
                        ser.write(sub_command)
                    ser.flush()
                    print(f"Detected: {className} - Command: {command.decode()}{sub_command.decode() if sub_command else ''} - PWM: {pwm_value}")
                log_to_dataframe(className, pwm_value, f"{command.decode()}{sub_command.decode() if sub_command else ''}", time.time() - start_time)

        else:
            current_sign = None

        # Reset after timeout
        current_time = time.time()
        if current_sign:
            if classIndex == 14 and (current_time - last_detection_time > 5):
                ser.write(b'C')
                ser.flush()
                print("Stop timeout - Command: C")
                log_to_dataframe('Normal', normal_speed_pwm, 'C', time.time() - start_time)
                current_sign = None
            elif (current_time - last_detection_time > detection_duration):
                ser.write(b'C')
                ser.flush()
                print("General timeout - Command: C")
                log_to_dataframe('Normal', normal_speed_pwm, 'C', time.time() - start_time)
                current_sign = None

    # Show result
    cv2.imshow("Result", imgOriginal)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        ser.write(b'P')
        ser.flush()
        log_to_dataframe('Stop', 0, 'P', time.time() - start_time)
        break

cap.release()
cv2.destroyAllWindows()
ser.close()

# Save log
df = pd.DataFrame(log_data)
df.to_csv('car_data_log.csv', index=False)

# Plotting
def generate_square_wave(pwm_values, timestamps, fs=10):
    square_wave = np.zeros(int(fs * max(timestamps)))
    for i in range(len(pwm_values)):
        start_time = int(timestamps[i] * fs)
        end_time = start_time + int(fs)
        square_wave[start_time:end_time] = pwm_values[i]
    return np.arange(len(square_wave)) / fs, square_wave

pwm_values = [entry['PWM'] for entry in log_data]
timestamps = [entry['Timestamp'] for entry in log_data]
time_values, square_wave = generate_square_wave(pwm_values, timestamps)

plt.figure(figsize=(12, 6))
plt.subplot(2, 1, 1)
plt.plot(df['Timestamp'], df['PWM'], marker='o', linestyle='-')
plt.title('PWM Changes over Time')
plt.xlabel('Time (seconds)')
plt.ylabel('PWM Value')
plt.grid(True)

plt.subplot(2, 1, 2)
plt.plot(time_values, square_wave, linestyle='-', color='r')
plt.title('PWM Square Wave Signal')
plt.xlabel('Time (seconds)')
plt.ylabel('PWM Value')
plt.grid(True)

plt.tight_layout()
plt.show()