In [1]:
import torch 
import torch.nn as nn
import torch.optim as optim
import numpy as np
import os
import matplotlib.pyplot as plt
import pandas as pd
from PIL import Image
import inspect as inspect
import cv2
from sklearn.utils import shuffle

In [126]:
#set path to data sets
positive_examples = r'C:\Users\Anwender\Downloads\RedClasses\RedClasses'
negative_examples = r'C:\Users\Anwender\Downloads\no_traffic_sign_recognition'


In [127]:
def load_images_from_folder(folder_path):
    images = []
    for filename in os.listdir(folder_path):
        img_path = os.path.join(folder_path, filename)
        if img_path.endswith('.jpg'):
            img = Image.open(img_path)
            img = img.resize((64, 64))
        if img is not None:
            images.append(img)
    return images

In [128]:

positive_images = load_images_from_folder(positive_examples)
negative_images = load_images_from_folder(negative_examples)

positive_data = np.array([np.array(img) for img in positive_images])
negative_data = np.array([np.array(img) for img in negative_images])

positive_labels = np.zeros(len(positive_data))
negative_labels = np.ones(len(negative_data))

data = np.concatenate((positive_data, negative_data), axis=0)
labels = np.concatenate((positive_labels, negative_labels), axis=0)

data, labels = shuffle(data, labels, random_state=42)  

print("Shuffled data shape:", data.shape)
print("Shuffled labels shape:", labels.shape)

Shuffled data shape: (1278, 64, 64, 3)
Shuffled labels shape: (1278,)


In [129]:
split_index = 840
x_train = data[:split_index]
y_train = labels[:split_index]
x_test = data[split_index:]
y_test = labels[split_index:]

x_train = np.array(x_train)
y_train = np.array(y_train)
x_test = np.array(x_test)
y_test = np.array(y_test)

print(x_train.shape)
print(y_train.shape)
print(x_test.shape)
print(y_test.shape)

(840, 64, 64, 3)
(840,)
(438, 64, 64, 3)
(438,)


In [130]:

x_train = x_train.astype('float32') / 255  # Normalize pixel values to be between 0 and 1
x_test = x_test.astype('float32') / 255

In [131]:
x_train = torch.tensor(x_train.transpose((0, 3, 1, 2)))  # Transpose to match PyTorch's tensor format (NCHW)
y_train = torch.tensor(y_train)
x_test = torch.tensor(x_test.transpose((0, 3, 1, 2)))  # Transpose to match PyTorch's tensor format (NCHW)
y_test = torch.tensor(y_test)

In [137]:
class TrafficSignSimpleClassifier(nn.Module):
    def __init__(self):
        super(TrafficSignSimpleClassifier, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.dropout1 = nn.Dropout(0.2)
        #self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.pool = nn.AvgPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.dropout2 = nn.Dropout(0.2)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.fc1 = nn.Linear(64 * 16 * 8, 64)
        self.fc2 = nn.Linear(64, 1)
        

    def forward(self, x):
        x = self.pool(torch.relu(self.bn1(self.conv1(x))))
        x = self.dropout1(x)
        x = self.pool(torch.relu(self.bn2(self.conv2(x))))
        x = self.dropout2(x)
        x = self.pool(torch.relu(self.bn3(self.conv3(x))))
        x = x.reshape(-1, 64 * 16 * 8)
        #x = x.flatten(start_dim=1)
        x = torch.relu(self.fc1(x))
        #x = self.fc2(x)
        x = torch.sigmoid(self.fc2(x))
        return x

In [138]:
model = TrafficSignSimpleClassifier()

criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [139]:
num_batches = len(x_train) // 64
num_epochs =  15

for epoch in range(num_epochs):
    total_loss = 0
    for batch in range(num_batches):
        batch_x = x_train[batch * 64 : (batch + 1) * 64]
        batch_y = y_train[batch * 64 : (batch + 1) * 64]
        batch_y = batch_y.unsqueeze(-1)

        optimizer.zero_grad

        outputs = model(batch_x)

        for i in range(len(outputs)):
            print('outs: ' + str(outputs[i].item()))
            print('explect: ' + str(batch_y[i].item()))

        batch_y = batch_y.float()
        loss = criterion(outputs, batch_y)
        total_loss += loss.item()

        loss.backward()
        optimizer.step()

    # Print average loss for the epoch
    average_loss = total_loss / num_batches
    print(f"Epoch [{epoch+1}/{num_epochs}], Average Loss: {average_loss:.4f}")


outs: 0.47053080797195435
explect: 0.0
outs: 0.4936266243457794
explect: 0.0
outs: 0.5094553232192993
explect: 1.0
outs: 0.5447137951850891
explect: 1.0
outs: 0.48422759771347046
explect: 1.0
outs: 0.47593533992767334
explect: 0.0
outs: 0.48473742604255676
explect: 1.0
outs: 0.4976462721824646
explect: 0.0
outs: 0.5071845054626465
explect: 0.0
outs: 0.4854346513748169
explect: 1.0
outs: 0.5038313865661621
explect: 0.0
outs: 0.48704931139945984
explect: 0.0
outs: 0.5074499845504761
explect: 0.0
outs: 0.504231870174408
explect: 0.0
outs: 0.47324293851852417
explect: 0.0
outs: 0.5067514777183533
explect: 0.0
outs: 0.4899543821811676
explect: 1.0
outs: 0.4906647205352783
explect: 1.0
outs: 0.49515682458877563
explect: 0.0
outs: 0.5569310188293457
explect: 0.0
outs: 0.5258522033691406
explect: 1.0
outs: 0.49582305550575256
explect: 0.0
outs: 0.49450233578681946
explect: 0.0
outs: 0.4710926413536072
explect: 1.0
outs: 0.500549852848053
explect: 0.0
outs: 0.5021761655807495
explect: 1.0
outs:

In [140]:
# Save the model
torch.save(model, './cache.pth')

In [141]:
model = torch.load('./cache.pth', map_location=torch.device('cpu'))

model.eval()


num_batches = len(x_train) // 64

ac = 0

for batch in range(num_batches):
    batch_x = x_test[batch * 64 : (batch + 1) * 64]
    batch_y = y_test[batch * 64 : (batch + 1) * 64]
    batch_y = batch_y.unsqueeze(-1)

    outputs = model(batch_x)

    batch_ac = 0

    for i in range(len(outputs)):
        print('outs: ' + str(outputs[i].item()))
        print('explect: ' + str(batch_y[i].item()))
        batch_ac += outputs[i].item() - batch_y[i].item()
    ac += batch_ac/64

print(ac/num_batches)


outs: 0.5861450433731079
explect: 0.0
outs: 0.5040244460105896
explect: 0.0
outs: 1.0
explect: 1.0
outs: 0.619817852973938
explect: 1.0
outs: 0.9999946355819702
explect: 1.0
outs: 0.410912424325943
explect: 0.0
outs: 0.9999128580093384
explect: 1.0
outs: 0.5660961866378784
explect: 0.0
outs: 0.410912424325943
explect: 0.0
outs: 1.0
explect: 1.0
outs: 0.5176371932029724
explect: 0.0
outs: 0.5498618483543396
explect: 0.0
outs: 0.9999357461929321
explect: 1.0
outs: 0.4953380823135376
explect: 1.0
outs: 0.4634113311767578
explect: 0.0
outs: 0.9998466968536377
explect: 1.0
outs: 0.410912424325943
explect: 0.0
outs: 1.0
explect: 1.0
outs: 0.410912424325943
explect: 0.0
outs: 0.44270092248916626
explect: 1.0
outs: 0.410912424325943
explect: 0.0
outs: 0.410912424325943
explect: 0.0
outs: 0.6699977517127991
explect: 1.0
outs: 0.5520218014717102
explect: 0.0
outs: 0.49986258149147034
explect: 0.0
outs: 0.5555294752120972
explect: 1.0
outs: 0.9826527833938599
explect: 1.0
outs: 1.0
explect: 1.0
o

In [2]:
def load_vid(path):
    video = cv2.VideoCapture(path)

    if not video.isOpened():
        print("Fehler beim laden des Videos: " + path)
        return []
    
    frames = []
    frame_count = 0


    while video.isOpened():
        ret, frame = video.read()
        
        frame_count += 1
        # if bool(frame.any()):
        #     print('could not load video1')
        #     break
        # if all(pix.all(pix is None) is not None for pix in frame):
        #     #hier die angabe für Aussortierung 
        #     print(frame)
        print(ret)
        if not ret:
            break
        print(frame)

        # Save frames for processing
        frames.append(frame)
        #frame_count += 1

    print(frame_count)
    video.release()
    return frames

In [3]:
video_path_list = []
frames_lists = []

for name, _, datas in os.walk(r'.\MP4_1\TSR_2010_10_29'):
    for data in datas:
        video_path = os.path.join(name, data)

        if video_path.endswith('.mp4'):
            video_path_list.append(video_path)


# frames_lists.append(load_vid(r'.\2022-09-20_18-19-13.mkv'))
for video_path in video_path_list:
   frames_lists.append(load_vid(video_path))
   
   #print(video_list[len(video_list)-1])'

True
[[[254 215 215]
  [244 205 205]
  [250 249 251]
  ...
  [ 16  43  57]
  [ 23  50  64]
  [ 28  55  69]]

 [[242 203 203]
  [238 199 199]
  [245 244 246]
  ...
  [ 23  50  64]
  [ 29  56  70]
  [ 33  60  74]]

 [[143 153 150]
  [148 158 155]
  [178 198 200]
  ...
  [ 31  58  72]
  [ 38  65  79]
  [ 42  69  83]]

 ...

 [[ 71  57  50]
  [ 71  57  50]
  [ 71  57  50]
  ...
  [ 60  47  43]
  [ 60  47  43]
  [ 60  47  43]]

 [[ 71  57  50]
  [ 71  57  50]
  [ 71  57  50]
  ...
  [ 60  47  43]
  [ 60  47  43]
  [ 60  47  43]]

 [[ 71  57  50]
  [ 71  57  50]
  [ 71  57  50]
  ...
  [ 60  47  43]
  [ 60  47  43]
  [ 60  47  43]]]
True
[[[250 253 251]
  [250 253 251]
  [250 253 251]
  ...
  [ 25  52  66]
  [ 23  52  66]
  [ 21  50  64]]

 [[250 253 251]
  [250 253 251]
  [250 253 251]
  ...
  [ 24  51  65]
  [ 22  51  65]
  [ 20  49  63]]

 [[250 253 251]
  [250 253 251]
  [250 253 251]
  ...
  [ 23  50  64]
  [ 20  49  63]
  [ 17  46  60]]

 ...

 [[ 68  54  47]
  [ 69  55  48]
  [ 69  55

In [4]:
print(len(frames_lists))

13


In [8]:
processed_frames = []

for frames in frames_lists:
    valid_frames = [frame for frame in frames if frame is not None]
    if len(valid_frames) > 0:
        stacked_frames = np.stack(valid_frames)
        resized_frames = [cv2.resize(frame, (1360, 1024)) for frame in stacked_frames]
        resized_frames = np.array(resized_frames)
        print(resized_frames.shape)
        processed_frames.append(resized_frames)

(236, 1024, 1360, 3)
(166, 1024, 1360, 3)
(330, 1024, 1360, 3)
(178, 1024, 1360, 3)
(231, 1024, 1360, 3)
(535, 1024, 1360, 3)
(163, 1024, 1360, 3)
(211, 1024, 1360, 3)
(207, 1024, 1360, 3)
(207, 1024, 1360, 3)
(160, 1024, 1360, 3)
(361, 1024, 1360, 3)
(70, 1024, 1360, 3)


In [10]:
# preprocessing
processed_videos = []
for frames in processed_frames:

    frames = np.array(frames)
    print(frames.shape)
    frames = frames.astype('float32')/255
    frames = torch.tensor(frames.transpose((0, 3, 1, 2)))
    processed_videos.append(frames)

(236, 1024, 1360, 3)
(166, 1024, 1360, 3)
(330, 1024, 1360, 3)
(178, 1024, 1360, 3)
(231, 1024, 1360, 3)
(535, 1024, 1360, 3)
(163, 1024, 1360, 3)
(211, 1024, 1360, 3)
(207, 1024, 1360, 3)
(207, 1024, 1360, 3)
(160, 1024, 1360, 3)
(361, 1024, 1360, 3)
(70, 1024, 1360, 3)


In [11]:
def sliding_window (image, step_size, window_size):   # window_size must be given as a list with 2 elements (h, w)
  h, w = window_size
  image_h, image_w = image.shape[:2]

  for y in range (0, image_h, step_size):
    for x in range (0, image_w, step_size):
      window = image[y:y + h, x:x + w]
      if window.shape[:2] != window_size:
        continue
      yield (x, y, window)

In [None]:
model = torch.load('./cache.pth', map_location=torch.device('cpu'))
lower_threshold = 0.8
upper_threshold = 0.9
frame_count = 0
video_count = 0

for video in processed_videos:

    for batch in range(video):
        batch_frames = video[batch]

        window_size = (64, 64)
        step_size = 50

        for (x, y, window) in sliding_window(batch_frames, step_size, window_size): # after this line should be where each window is evaluated by the model
            outputs = model(window)

            predictions = outputs[0].squeeze()
            traffic_sign_present = predictions >= lower_threshold and predictions <= upper_threshold
    
            if traffic_sign_present.any():
                current_frame = batch_frames[0].numpy()
                frame_path = r"./cache/frame_" + str(frame_count) + ".jpg"
                cv2.imwrite(frame_path, current_frame)
                print("Traffic sign detected in the current frame. Saved as frame_" + str(frame_count) + ".jpg")
                frame_count += 1
    frame_count = 0
    video_count += 1
