In [1]:
import numpy as np
import torch
from google.colab import drive
from tqdm.auto import tqdm
drive.mount('/content/drive/')

!pip install datasets
!pip install accelerate
!pip install ultralytics
!pip install evaluate

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [None]:
from datasets import load_from_disk

# Load the dataset previously saved to Drive; later cells read its 'train'/'test'
# splits and its 'image' and 'labels' columns.
ds_detected = load_from_disk("/content/drive/MyDrive/ml2/ML_project_2_course/datasets/frames_cutted_yolo/EGOR")

In [None]:
# Encode the 'labels' column as a class-label feature so that the class names can
# be read back later through .features['labels'].names.
ds_detected = ds_detected.class_encode_column("labels")

In [None]:
ds_detected

In [None]:
from transformers import DeiTForImageClassification, DeiTImageProcessor

# Image processor that belongs to the pretrained DeiT checkpoint. The cells below
# call it with return_tensors='pt' to turn images into the model's input tensors.
processor = DeiTImageProcessor.from_pretrained('facebook/deit-base-distilled-patch16-224')


In [None]:
def process_example(example):
    """Run the image processor on a single example and attach its label.

    Args:
        example: Mapping with an 'image' entry (handed to ``processor``) and a
            'labels' entry.

    Returns:
        The processor output (PyTorch tensors requested) with the example's
        'labels' value stored under the 'labels' key.
    """
    encoded = processor(example['image'], return_tensors='pt')
    encoded['labels'] = example['labels']
    return encoded

In [None]:
def transform(example_batch):
    """Batch transform: convert a batch of images to model inputs and keep the labels.

    Args:
        example_batch: Mapping with an 'image' sequence and a 'labels' entry.

    Returns:
        The processor output (PyTorch tensors requested) with the batch's
        'labels' stored under the 'labels' key.
    """
    # Hand the images to the processor as a plain list.
    images = list(example_batch['image'])
    batch_inputs = processor(images, return_tensors='pt')

    # The labels are carried over unchanged.
    batch_inputs['labels'] = example_batch['labels']
    return batch_inputs

In [None]:
# Attach `transform` as an on-the-fly transform: it is applied when rows are
# accessed instead of preprocessing the whole dataset up front.
prepared_ds = ds_detected.with_transform(transform)

In [None]:
from torch import nn

# Class names stored on the encoded 'labels' column of the training split.
labels = ds_detected['train'].features['labels'].names

from transformers import DeiTForImageClassification, ViTForImageClassification

# The weights are kept in float32 on purpose: the training cell optimises these
# parameters directly with AdamW, and doing that on float16 parameters is
# numerically unstable (the optimizer's epsilon underflows to zero in half
# precision, which can produce inf/NaN updates and a NaN loss).
model = DeiTForImageClassification.from_pretrained(
    "facebook/deit-base-distilled-patch16-224",
    attn_implementation="sdpa",
    torch_dtype=torch.float32,
    num_labels=len(labels),
    # The model config expects integer class ids: id -> name and name -> id.
    id2label={i: c for i, c in enumerate(labels)},
    label2id={c: i for i, c in enumerate(labels)},
)


In [None]:
# model.classifier[1].state_dict()

In [None]:
# for param in model.classifier[1].parameters():
#     param.data =  torch.randn_like(param)

In [None]:
# model.classifier[1].state_dict()

In [None]:
from torch.utils.data import DataLoader

# Mini-batch size shared by the training and the evaluation loader.
batch_size = 16

# Only the training split is shuffled; the test split keeps its order.
train_dataloader = DataLoader(dataset=prepared_ds["train"], batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(dataset=prepared_ds["test"], batch_size=batch_size, shuffle=False)

In [None]:
import evaluate
from torch.optim import AdamW, SGD, Adamax
from transformers import get_scheduler

# Optimise every model parameter with AdamW. The alternatives that were tried are
# kept below for reference.
optimizer = AdamW(model.parameters(), lr=2e-4)
#optimizer = SGD(model.parameters(), lr=2e-4, momentum=0.9, nesterov=True)
#optimizer = Adamax(model.parameters(), lr=2e-4)

num_epochs = 5
# The scheduler is stepped once per batch, so its horizon is batches * epochs.
num_training_steps = num_epochs * len(train_dataloader)

lr_scheduler = get_scheduler(
    name='linear',
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=num_training_steps,
)

# Use the GPU when one is attached; otherwise fall back to the CPU instead of
# failing inside model.to().
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)
metric = evaluate.load('f1')

for epoch in tqdm(range(num_epochs)):
    # ---- training pass ----
    model.train()
    for batch in tqdm(train_dataloader, leave=False):
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        loss = outputs.loss
        # Stop immediately on NaN/inf instead of silently writing it into the weights.
        if not torch.isfinite(loss):
            raise RuntimeError(f"Non-finite training loss ({loss.item()}) in epoch {epoch}")
        loss.backward()
        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()

    # ---- evaluation pass on the test split ----
    model.eval()
    for batch in tqdm(test_dataloader, leave=False):
        batch = {k: v.to(device) for k, v in batch.items()}
        with torch.no_grad():
            logits = model(**batch).logits
        predictions = torch.argmax(logits, dim=-1)
        metric.add_batch(predictions=predictions, references=batch['labels'])

    # compute() reports the F1 score over everything added during this epoch.
    print(f'epoch {epoch} -', metric.compute())

# Persist the fine-tuned weights to Drive.
model.save_pretrained("/content/drive/MyDrive/ml2/ML_project_2_course/weights3")

In [None]:
ex = ds_detected['test'][100]
ex['image']

In [None]:
ex['labels']

In [None]:
# Preprocess the example picked above. `inputs` has to be created here: the
# functions earlier in the notebook only use that name as a local variable.
inputs = processor(ex['image'], return_tensors='pt')

# Inference only, so no autograd graph is needed.
with torch.no_grad():
    logits = model(inputs['pixel_values'].to(device)).logits

logits

In [None]:
 logits.argmax(-1).item()

In [None]:
# Softmax over the class axis (the last axis of the logits). The axis is given
# explicitly because relying on the implicit choice is deprecated.
softmax = torch.nn.Softmax(dim=-1)

def image_to_prob(img):
    """Return the classifier's probability of class index 1 for a single image.

    Args:
        img: Image accepted by ``processor``.

    Returns:
        float: Softmax probability of class index 1 for the first (only) item
        of the batch.
    """
    inputs = processor(img, return_tensors='pt')
    # Pure inference: skip autograd so no graph is kept alive per call.
    with torch.no_grad():
        logits = model(inputs['pixel_values'].to(device)).logits
    return float(softmax(logits)[0][1])

In [None]:
logits.cpu().data.numpy()

In [None]:
def get_logits(img):
    """Return the classifier's raw logits for a single image as a NumPy array.

    Args:
        img: Image accepted by ``processor``.

    Returns:
        numpy.ndarray: The model's logits, moved to the CPU.
    """
    inputs = processor(img, return_tensors='pt')
    # Pure inference: skip autograd so no graph is kept alive per call.
    with torch.no_grad():
        logits = model(inputs['pixel_values'].to(device)).logits
    return logits.detach().cpu().numpy()

In [None]:
torch.cuda.empty_cache()

In [None]:
# logits_test = []
# for i in tqdm(range(0, ds_detected['test'].num_rows)):
#   logits_test.append(get_logits(ds_detected['test'][i]['image']))
#   #logits_test.append(image_to_prob(ds_detected['test'][i]['image']))

In [None]:
# preds = np.array(logits_test).argmax(-1).flatten()

In [None]:
# Class-1 probability for every test example, in dataset order.
probs = [
    image_to_prob(ds_detected['test'][row_idx]['image'])
    for row_idx in tqdm(range(ds_detected['test'].num_rows))
]

In [None]:
from sklearn.metrics import roc_auc_score

# ROC AUC of the class-1 probabilities against the test labels.
roc_auc_score(ds_detected['test']['labels'], probs)

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import PrecisionRecallDisplay

# Precision-recall curve of the class-1 probabilities on the test split.
PrecisionRecallDisplay.from_predictions(ds_detected['test']['labels'], probs)
plt.show()

In [None]:
from sklearn.metrics import RocCurveDisplay

# ROC curve of the class-1 probabilities on the test split.
RocCurveDisplay.from_predictions(ds_detected['test']['labels'], probs)
plt.show()

In [None]:
from sklearn.metrics import precision_recall_curve

# Precision and recall over the decision thresholds of the class-1 probabilities.
precisions, recalls, thresholds = precision_recall_curve(ds_detected['test']['labels'], probs)

def plot_precision_recall_vs_thresholds(precisions, recalls, thresholds):
    """Plot precision and recall as functions of the decision threshold.

    The last element of ``precisions`` and of ``recalls`` is dropped before
    plotting so both series are plotted against ``thresholds``.

    Args:
        precisions: Precision values.
        recalls: Recall values.
        thresholds: Thresholds used as x-axis values.
    """
    curves = (
        (precisions, "b--", "Precision"),
        (recalls, "g--", "Recall"),
    )
    for values, line_style, legend_name in curves:
        plt.plot(thresholds, values[:-1], line_style, label=legend_name)

    plt.xlabel("Threshold")
    # The legend is anchored outside the axes, to the upper right of the plot.
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)
    tick_stop = 1.1
    plt.xticks(np.arange(0, tick_stop, 0.1))
    plt.yticks(np.arange(0, tick_stop, 0.05))
    plt.grid(True)

plot_precision_recall_vs_thresholds(precisions, recalls, thresholds)
plt.show()

In [None]:
def image_to_prob(img):
    """Return the classifier's probability of class index 1 for a single image.

    NOTE: this cell redefines the ``image_to_prob`` from an earlier cell; the
    later definition is the one in effect once this cell has run.

    Args:
        img: Image accepted by ``processor``.

    Returns:
        float: Softmax probability of class index 1 for the first (only) item
        of the batch.
    """
    inputs = processor(img, return_tensors='pt')
    # Pure inference: skip autograd so no graph is kept alive per call.
    with torch.no_grad():
        logits = model(inputs['pixel_values'].to(device)).logits
    # Softmax over the class axis, given explicitly.
    return float(torch.softmax(logits, dim=-1)[0][1])

In [None]:
def crop_image(image, box):
    """Crop ``box`` out of ``image`` after padding the box on every side.

    The box is grown by one tenth of its width on the left and right and by one
    tenth of its height on the top and bottom, then clamped to the image bounds.

    Args:
        image: Object exposing ``size`` as (width, height) and a ``crop`` method.
        box: (x_min, y_min, x_max, y_max) coordinates.

    Returns:
        Whatever ``image.crop`` returns for the padded, clamped box.
    """
    margin_ratio = 1/10
    left, top, right, bottom = box
    img_w, img_h = image.size

    pad_x = (right - left) * margin_ratio
    pad_y = (bottom - top) * margin_ratio

    # Grow the box by the margin, but never past the image borders.
    expanded = (
        max(0, left - pad_x),
        max(0, top - pad_y),
        min(img_w, right + pad_x),
        min(img_h, bottom + pad_y),
    )
    return image.crop(expanded)

device = 'cuda' if torch.cuda.is_available() else 'cpu'

def recieve_box(image):
    """Detect class-0 objects in ``image`` and return one padded crop per detection.

    NOTE(review): ``model_detect`` is not defined in the visible part of this
    notebook -- presumably an Ultralytics YOLO detector; confirm it is created
    before this function is called.

    Args:
        image: Image passed to the detector and to ``crop_image``.

    Returns:
        list: One ``crop_image`` result per detected box, in detector order.
    """
    detections = model_detect(source=image, classes=0, show=False, imgsz=640, conf=0.2, iou=0.4, save=False, verbose=False)
    # Integer (x_min, y_min, x_max, y_max) boxes of the first result.
    xyxy_boxes = detections[0].boxes.xyxy.cpu().numpy().astype(int)
    return [crop_image(image, xyxy) for xyxy in xyxy_boxes]

In [None]:
from google.colab.patches import cv2_imshow

import cv2
from ultralytics import YOLO
import random
from PIL import Image

def draw_bounding_boxes_without_id(frame, results):
    """Draw a labelled rectangle on ``frame`` for every detection whose class is not 0.

    Args:
        frame: Image array to draw on; it is modified in place.
        results: Detector results; only ``results[0]`` is used (its ``boxes``
            and its ``names`` mapping from class id to class name).

    Returns:
        The same ``frame`` object, with the boxes and class names drawn on it.
    """
    detection = results[0]
    boxes = detection.boxes.xyxy.cpu().numpy().astype(int)
    classes = detection.boxes.cls.cpu().numpy().astype(int)

    for box, clss in zip(boxes, classes):
        if clss == 0:
            continue

        # Deterministic colour per class, drawn from a private generator so the
        # global `random` state is left untouched.
        rng = random.Random(int(clss) + 8)
        color = (rng.randint(0, 255), rng.randint(0, 255), rng.randint(0, 255))

        top_left = (int(box[0]), int(box[1]))
        bottom_right = (int(box[2]), int(box[3]))
        cv2.rectangle(frame, top_left, bottom_right, color, 2)
        cv2.putText(
            frame,
            # Class names come from the detection result itself rather than from
            # the notebook-level `model`, which is the image classifier.
            f"{detection.names[int(clss)]}",
            top_left,
            cv2.FONT_HERSHEY_SIMPLEX,
            0.6,
            (50, 255, 50),
            2,
        )
    return frame


def process_video_with_tracking(model, input_video_path, show_video=True, save_video=False, output_video_path="output_video.mp4"):
    """Track class-0 objects through a video and label each track with a classifier probability.

    For every frame the tracker is run, each tracked box is cropped (with the
    same padding as ``crop_image``) from an untouched RGB copy of the frame, the
    crop is scored with ``image_to_prob`` and the box plus the score are drawn
    on the frame.

    Args:
        model: Tracker exposing ``track(frame, ...)``. This parameter shadows the
            notebook-level classifier ``model``; the classifier is only reached
            through ``image_to_prob``.
        input_video_path: Path of the video to read.
        show_video: If true, display each annotated frame (scaled to 75%).
        save_video: If true, write the annotated frames to ``output_video_path``.
        output_video_path: Destination of the annotated video.

    Returns:
        The tracker results of the last processed frame, or ``None`` when no
        frame could be read.

    Raises:
        Exception: If the input video, or the output video when ``save_video``
            is set, cannot be opened.
    """
    cap = cv2.VideoCapture(input_video_path)

    if not cap.isOpened():
        raise Exception("Error: Could not open video file.")

    out = None
    results = None  # stays None when the video yields no frames
    try:
        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 changes
        # the duration of the written video.
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        if save_video:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))
            # Fail loudly instead of silently producing no output file.
            if not out.isOpened():
                raise Exception(f"Error: Could not open output video file: {output_video_path}")

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            results = model.track(frame, iou=0.4, conf=0.25, persist=True, imgsz=608, verbose=False, tracker="bytetrack.yaml", classes=0)

            # boxes.id is None when the tracker has no active tracks for this frame.
            if results[0].boxes.id is not None:
                boxes = results[0].boxes.xyxy.cpu().numpy().astype(int)
                ids = results[0].boxes.id.cpu().numpy().astype(int)

                # OpenCV frames are BGR; the classifier is fed RGB images. The
                # conversion also yields a copy, so rectangles drawn for earlier
                # boxes never end up inside later crops.
                clean_frame = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

                for box, track_id in zip(boxes, ids):
                    # Deterministic colour per track id, from a private generator
                    # so the global `random` state is left untouched.
                    rng = random.Random(int(track_id))
                    color = (rng.randint(0, 255), rng.randint(0, 255), rng.randint(0, 255))

                    prob = image_to_prob(crop_image(clean_frame, box))

                    top_left = (int(box[0]), int(box[1]))
                    bottom_right = (int(box[2]), int(box[3]))
                    cv2.rectangle(frame, top_left, bottom_right, color, 2)
                    cv2.putText(
                        frame,
                        f"{prob:.2f}",
                        top_left,
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.70,
                        (0, 255, 255),
                        2,
                    )

            if out is not None:
                out.write(frame)

            if show_video:
                frame = cv2.resize(frame, (0, 0), fx=0.75, fy=0.75)
                cv2_imshow(frame)

            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        # Always release the capture/writer, even if tracking or scoring failed.
        cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()
    return results

# Example usage:
model1 = YOLO('yolov8n.pt')
model1.fuse()
results = process_video_with_tracking(model1, "/content/drive/MyDrive/ml2/ML_project_2_course/other/videos/test2.mp4", show_video=False, save_video=True, output_video_path="/content/drive/MyDrive/ml2/ML_project_2_course/other/videos_tracked/output_video_for_vit_final.mp4")