<a href="https://colab.research.google.com/github/technologyhamed/Computer-Vision/blob/main/Traffic%20Control%20Systems/Human_Traffic_Control_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Step 1: Setting Up the Environment

In [None]:
# pycocotools is required by torchvision's CocoDetection, which the
# data-preparation step below relies on.
!pip install torch torchvision matplotlib opencv-python pycocotools


In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Recursively list every file below the Kaggle input directory, one full
# path per line.
for dirname, _, filenames in os.walk('/kaggle/input'):
    full_paths = [os.path.join(dirname, name) for name in filenames]
    if full_paths:
        print("\n".join(full_paths))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

Step 2: Data Preparation
Prepare your dataset with annotated images. The dataset should have bounding boxes around humans for the localization task.

In [None]:
import torch
import torchvision
from torchvision.datasets import CocoDetection
from torchvision.transforms import functional as F

# Define the dataset
class COCODataset(CocoDetection):
    """COCO detection dataset reduced to the single "person" category.

    Each item is ``(image, target)`` where ``target`` is a dict with:

    * ``"boxes"``  -- float32 tensor of shape ``(N, 4)`` in
      ``[x1, y1, x2, y2]`` corner format, the layout torchvision detection
      models expect.
    * ``"labels"`` -- int64 tensor of shape ``(N,)`` filled with ``1``.

    Args:
        root: Directory containing the COCO images.
        annFile: Path to the COCO JSON annotation file.
        transform: Optional callable applied once to each image.
    """

    # COCO category id of "person".
    PERSON_CATEGORY_ID = 1

    def __init__(self, root, annFile, transform=None):
        # The base class stores `transform` and applies it to the image
        # inside its own __getitem__, so it must not be applied again here.
        super().__init__(root, annFile, transform)
        self.transform = transform

    def __getitem__(self, idx):
        # `img` has already been passed through `transform` by the base class.
        img, annotations = super().__getitem__(idx)

        boxes = []
        for obj in annotations:
            if obj["category_id"] != self.PERSON_CATEGORY_ID:
                continue
            # COCO stores boxes as [x, y, width, height]; convert to corners.
            x, y, w, h = obj["bbox"]
            if w <= 0 or h <= 0:
                # Degenerate boxes are rejected by the detection model.
                continue
            boxes.append([x, y, x + w, y + h])

        target = {
            # reshape keeps the (0, 4) shape when the image has no persons.
            "boxes": torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4),
            "labels": torch.ones((len(boxes),), dtype=torch.int64),
        }
        return img, target

# Build the COCO person dataset; images are converted with F.to_tensor.
coco_dataset = COCODataset(
    root='path_to_coco_images',
    annFile='path_to_coco_annotations',
    transform=F.to_tensor,
)

# Shuffled batches of four samples. The collate function transposes the list
# of (image, target) pairs into an (images, targets) pair of tuples instead of
# stacking them into single tensors.
dataloader = torch.utils.data.DataLoader(
    coco_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=lambda batch: tuple(zip(*batch)),
)


Step 3: Transfer Learning with a Pre-trained Model
We'll use a pre-trained model like Faster R-CNN with a ResNet-50 backbone for object detection.

Load the Pre-trained Model

In [None]:
import torch
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn

# Two output classes: index 0 is background, index 1 is person.
num_classes = 2

# Faster R-CNN (ResNet-50 + FPN) with pre-trained weights.
# NOTE(review): newer torchvision releases prefer `weights=` over
# `pretrained=` -- confirm against the installed version.
model = fasterrcnn_resnet50_fpn(pretrained=True)

# Replace the box predictor with a fresh head sized for `num_classes`,
# keeping the input width of the existing classification layer.
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = (
    torchvision.models.detection.faster_rcnn.FastRCNNPredictor(
        in_features,
        num_classes,
    )
)


Step 4: Training the Model
Define the Training Loop

In [None]:
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision.datasets import VOCDetection
from torchvision.transforms import functional as F

# Define dataset and dataloader
class CustomDataset(VOCDetection):
    """Pascal VOC detection dataset reduced to the single "person" class.

    Each item is ``(image, target)`` where ``image`` is a tensor produced by
    ``F.to_tensor`` and ``target`` is a dict with:

    * ``"boxes"``  -- float32 tensor of shape ``(N, 4)`` in
      ``[xmin, ymin, xmax, ymax]`` order.
    * ``"labels"`` -- int64 tensor of shape ``(N,)`` filled with ``1``.

    Only objects whose VOC class name is "person" are kept, so that other
    VOC classes are not taught to the detector as people.
    """

    # VOC class name of the objects to keep.
    PERSON_CLASS_NAME = "person"

    # Order in which the VOC `bndbox` fields are laid out in a box row.
    _BOX_KEYS = ("xmin", "ymin", "xmax", "ymax")

    def __getitem__(self, idx):
        img, target = super().__getitem__(idx)
        img = F.to_tensor(img)

        objects = target["annotation"].get("object", [])
        # NOTE(review): some torchvision versions may return a bare dict when
        # an image has exactly one object -- normalise to a list to be safe.
        if isinstance(objects, dict):
            objects = [objects]

        boxes = []
        for obj in objects:
            if obj["name"] != self.PERSON_CLASS_NAME:
                continue
            # `bndbox` is a mapping of coordinate name -> string value parsed
            # from the XML annotation, so convert each field explicitly.
            bndbox = obj["bndbox"]
            boxes.append([float(bndbox[key]) for key in self._BOX_KEYS])

        target = {
            # reshape keeps the (0, 4) shape when the image has no persons.
            "boxes": torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4),
            "labels": torch.ones((len(boxes),), dtype=torch.int64),
        }
        return img, target

# VOC 2012 "train" split stored under `root`, with download=True requested.
dataset = CustomDataset(
    root='path_to_data',
    year='2012',
    image_set='train',
    download=True,
)

# Shuffled batches of four samples; the collate function transposes the list
# of (image, target) pairs into an (images, targets) pair of tuples.
dataloader = DataLoader(
    dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=lambda samples: tuple(zip(*samples)),
)

# Define optimizer and training loop

# Train on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Number of full passes over the dataloader; adjust to your dataset.
num_epochs = 10

# The model must live on the same device as the batches fed to it.
model.to(device)
optimizer = optim.SGD(model.parameters(), lr=0.005, momentum=0.9, weight_decay=0.0005)

model.train()
for epoch in range(num_epochs):
    epoch_loss = 0.0
    num_batches = 0

    for images, targets in dataloader:
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # In training mode the detection model returns a dict of losses;
        # their sum is the quantity to minimise.
        loss_dict = model(images, targets)
        losses = sum(loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        epoch_loss += losses.item()
        num_batches += 1

    # Report the mean loss over the epoch rather than only the last batch;
    # an empty dataloader yields NaN instead of an unbound-variable error.
    mean_loss = epoch_loss / num_batches if num_batches else float("nan")
    print(f"Epoch #{epoch} loss: {mean_loss}")


Step 5: Testing the Model
Define the Testing Loop

In [None]:
from PIL import Image
import matplotlib.pyplot as plt
import cv2

model.eval()
# Run inference on whichever device the model's weights currently live on.
device = next(model.parameters()).device
# Detections scoring below this confidence are not drawn.
score_threshold = 0.5

# Force three channels so grayscale or RGBA files match the model input.
test_image = Image.open("path_to_test_image.jpg").convert("RGB")
test_image_tensor = F.to_tensor(test_image).unsqueeze(0).to(device)

with torch.no_grad():
    prediction = model(test_image_tensor)

# Visualize the prediction
image = cv2.imread("path_to_test_image.jpg")
detections = prediction[0]
confident = detections["scores"] >= score_threshold
# tolist() yields plain Python ints, which cv2 drawing functions accept.
for x1, y1, x2, y2 in detections["boxes"][confident].int().cpu().tolist():
    cv2.rectangle(image, (x1, y1), (x2, y2), (255, 0, 0), 2)

# cv2 images are BGR; matplotlib expects RGB.
plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
plt.show()


Step 6: Real-time Human Traffic Control
Real-time Inference with Webcam

In [None]:
# NOTE(review): cv2.VideoCapture(0) and cv2.imshow need a local camera and
# display -- confirm they are available in your notebook environment.
model.eval()
# Run inference on whichever device the model's weights currently live on.
device = next(model.parameters()).device
# Detections scoring below this confidence are not drawn.
score_threshold = 0.5

cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # OpenCV delivers BGR frames; the detector expects RGB input.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame_tensor = F.to_tensor(rgb_frame).unsqueeze(0).to(device)

        with torch.no_grad():
            prediction = model(frame_tensor)

        detections = prediction[0]
        confident = detections["scores"] >= score_threshold
        # tolist() yields plain Python ints, which cv2.rectangle accepts.
        for x1, y1, x2, y2 in detections["boxes"][confident].int().cpu().tolist():
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)

        cv2.imshow('frame', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the window, even after an error.
    cap.release()
    cv2.destroyAllWindows()


Step 7: Localization
To localize and track humans over time, you can integrate object tracking algorithms like SORT or DeepSORT.

Example Integration with SORT

In [None]:
from sort import Sort

tracker = Sort()

model.eval()
# Run inference on whichever device the model's weights currently live on.
device = next(model.parameters()).device
# Detections scoring below this confidence are not handed to the tracker.
score_threshold = 0.5

cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # OpenCV delivers BGR frames; the detector expects RGB input.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame_tensor = F.to_tensor(rgb_frame).unsqueeze(0).to(device)

        with torch.no_grad():
            prediction = model(frame_tensor)

        detections = prediction[0]
        confident = detections["scores"] >= score_threshold
        boxes = detections["boxes"][confident]
        scores = detections["scores"][confident]

        # One row per detection: [x1, y1, x2, y2, score]. With no detections
        # this is an empty (0, 5) array, so the tracker is still updated
        # once per frame.
        dets = torch.cat([boxes, scores.unsqueeze(1)], dim=1).cpu().numpy()
        tracks = tracker.update(dets)

        for track in tracks:
            # Tracker output is floating point; cv2 drawing needs plain ints.
            x1, y1, x2, y2, track_id = (int(value) for value in track[:5])
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
            cv2.putText(frame, str(track_id), (x1, y1 - 10), 0, 0.5, (255, 0, 0), 2)

        cv2.imshow('frame', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the window, even after an error.
    cap.release()
    cv2.destroyAllWindows()


This provides a comprehensive approach to creating a real-time human traffic control system using transfer learning and machine vision. You can further customize and optimize the system based on your specific requirements and dataset.