In [None]:
!wget https://lodmedia.hb.bizmrg.com/case_files/1112984/test_dataset_test_data_Minprirodi.zip

--2024-07-07 05:33:50--  https://lodmedia.hb.bizmrg.com/case_files/1112984/test_dataset_test_data_Minprirodi.zip
Resolving lodmedia.hb.bizmrg.com (lodmedia.hb.bizmrg.com)... 95.163.53.117
Connecting to lodmedia.hb.bizmrg.com (lodmedia.hb.bizmrg.com)|95.163.53.117|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4442528628 (4.1G) [application/zip]
Saving to: ‘test_dataset_test_data_Minprirodi.zip’


2024-07-07 05:39:20 (12.9 MB/s) - ‘test_dataset_test_data_Minprirodi.zip’ saved [4442528628/4442528628]



In [None]:
from IPython.display import clear_output

In [None]:
!unzip -P FnmZz4qRHDfxE9GV25rXU8 test_dataset_test_data_Minprirodi.zip
clear_output()

In [None]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.2.50-py3-none-any.whl (799 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/799.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━[0m [32m409.6/799.4 kB[0m [31m12.5 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m799.4/799.4 kB[0m [31m15.9 MB/s[0m eta [36m0:00:00[0m
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.0-py3-none-any.whl (25 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch>=1.8.0->ultralytics)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch>=1.8.0->ultralytics)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch>=1.8.0->ultralytics)
  Using cache

In [None]:
import cv2
import numpy as np
import torch
import pickle

from torch import nn
from torchvision import models
from sklearn.preprocessing import LabelEncoder
from ultralytics import YOLO
from torchvision.transforms.functional import normalize
from itertools import repeat
from pathlib import Path


class Prediction(object):
    def __init__(self, name, cls, prob, bbox):
        super().__init__()
        self.name = name
        self.cls = cls
        self.prob = prob
        self.bbox = bbox

    def __repr__(self):
        return f"Prediction({self.name}, {self.cls})"


class Model(object):
    CLASSES = ["Badger", "Bear", "Bison", "Cat", "Dog",
               "Empty", "Fox", "Goral", "Hare", "Lynx",
               "Marten", "Moose", "Mountain_Goat",
               "Musk_Deer", "Racoon_Dog", "Red_Deer",
               "Roe_Deer", "Snow_Leopard", "Squirrel",
               "Tiger", "Wolf", "Wolverine"]

    def __init__(self, detector_path: str, classifier_path: str, le_path: str):
        self.detector = YOLO(detector_path)
        self.classifier = models.vgg19(pretrained=False)
        # for param in self.classifier.parameters():
        #     param.requires_grad = False
        self.classifier.classifier = nn.Sequential(
            nn.Linear(25088, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, 19)
        )
        self.classifier.load_state_dict(torch.load(classifier_path, map_location=torch.device('cpu')))
        self.classifier.cuda()
        self.classifier.eval()

        with open(le_path, "rb") as file:
            self.le: LabelEncoder = pickle.load(file)

    def predict(self, frame):
        if not isinstance(frame, (np.ndarray, str)):
            frame = frame.filename
        # print(frame)
        detections = self.detector.predict(frame, verbose=False)
        classes = []
        croped_frames = self.extract_crops(detections)
        for (img_name, batch_images_cls) in croped_frames.items():
            logits = self.classifier(batch_images_cls.to("cuda"))
            probabilities = torch.nn.functional.softmax(logits, dim=1)
            top_p, top_class_idx = probabilities.topk(1, dim=1)

            top_p = top_p.cpu().detach().numpy().ravel()
            top_class_idx = top_class_idx.cpu().numpy().ravel()
            # print(top_class_idx)
            class_names = self.le.inverse_transform(top_class_idx)

            classes.extend(
                [
                    Prediction(name, cls, prob, bbox.xyxy)
                    for name, cls, prob, bbox in
                    zip(repeat(img_name, len(class_names)), class_names, top_p, detections[0].boxes)
                ]
            )

        return classes

    @staticmethod
    def extract_crops(results: list) -> dict[str, torch.Tensor]:
        dict_crops = {}
        for res_per_img in results:
            if len(res_per_img) > 0:
                crops_per_img = []
                for box in res_per_img.boxes:
                    x0, y0, x1, y1 = box.xyxy.cpu().numpy().ravel().astype(np.int32)
                    crop = res_per_img.orig_img[y0: y1, x0: x1]

                    # Do squared crop
                    # crop = letterbox(img=crop, new_shape=config.imgsz, color=(0, 0, 0))
                    crop = cv2.resize(crop, (360, 360), interpolation=cv2.INTER_LINEAR)
                    crop = cv2.cvtColor(crop, cv2.COLOR_RGB2BGR)
                    # cv2.imshow('crop', crop)
                    # cv2.waitKey(111111110)
                    # Convert Array crop to Torch tensor with [batch, channels, height, width] dimensions
                    crop = torch.from_numpy(crop.transpose(2, 0, 1)) / 255
                    crop = crop.unsqueeze(0)
                    crop = normalize(crop.float(), mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
                    crops_per_img.append(crop)

                dict_crops[Path(res_per_img.path).name] = torch.cat(crops_per_img)  # if len(crops_per_img) else None
        return dict_crops




In [None]:
from collections import defaultdict
from copy import deepcopy
from datetime import datetime
from PIL import Image
from PIL.ExifTags import TAGS
from tqdm.auto import tqdm


class AnimalImage(object):
    def __init__(self, filename: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.filename = filename
        self.datetime = ""

        image = Image.open(self.filename)
        exif_data = image._getexif()
        if exif_data:
            for tag_id, value in exif_data.items():
                tag = TAGS.get(tag_id, tag_id)
                if tag == "DateTime":
                    self.datetime = value
                    break
        if self.datetime != "":
            self.datetime = datetime.strptime(self.datetime, "%Y:%m:%d %H:%M:%S")
        self.objects = []

    def __repr__(self):
        return f"AnimalImage({self.datetime})"


class FramesTracker(Model):
    def __init__(self, path_to_folder, detector_path: str, classifier_path: str, le_path: str, files):
        super().__init__(detector_path, classifier_path, le_path)
        self.path_to_folder = path_to_folder
        self.files = list(map(AnimalImage, files))
        self.current_animals = {}
        self.all_animals = []

    def sort(self, *, reverse=False):
        self.files.sort(key=lambda x: x.datetime, reverse=reverse)


def predict(folder_path, files):
    tracker = FramesTracker(
        folder_path + "/*.JPG",
        "./drive/MyDrive/weights/yolov8n-2.pt",
        "./drive/MyDrive/weights/last_animal_model-6.pt",
        "./drive/MyDrive/weights/le.bf",
        files
    )
    tracker.sort()

    for i in tqdm(tracker.files):
        i.objects = tracker.predict(i)

    for i in range(len(tracker.files)):
        time_has_passed = defaultdict(lambda: 0)
        if i != 0:
            for j in tracker.files[i].objects:
                if j.cls in tracker.current_animals:
                    time_has_passed[j.cls] = (
                            tracker.files[i].datetime - tracker.current_animals[j.cls]["last_seen"]
                    ).seconds
        for j in tracker.files[i].objects:
            if time_has_passed[j.cls] > 30 * 60:
                tracker.all_animals.append(deepcopy(tracker.current_animals[j.cls]))
                del tracker.current_animals[j.cls]
            if j.cls != "Empty":
                if j.cls not in tracker.current_animals:
                    tracker.current_animals[j.cls] = {
                        "first_seen": tracker.files[i].datetime,
                        "last_seen": tracker.files[i].datetime,
                        "count": 0,
                        "cls": j.cls,
                        "filename": tracker.files[i].filename
                    }
                tracker.current_animals[j.cls]["count"] = max(
                    tracker.current_animals[j.cls]["count"],
                    len([1 for animal in tracker.files[i].objects if animal.cls == j.cls])
                )
                tracker.current_animals[j.cls]["last_seen"] = tracker.files[i].datetime

    for i in tracker.current_animals:
        tracker.all_animals.append(deepcopy(tracker.current_animals[i]))
    # print(tracker.all_animals)
    return tracker.all_animals


In [None]:
import pandas as pd
from glob import glob

In [None]:
full_dataset = pd.DataFrame(columns=["name_folder", "class", "date_registration_start", "date_registration_end", "count"])

In [None]:
glob("test_data_Minprirodi/traps/*")

['test_data_Minprirodi/traps/51',
 'test_data_Minprirodi/traps/52',
 'test_data_Minprirodi/traps/57',
 'test_data_Minprirodi/traps/55',
 'test_data_Minprirodi/traps/59',
 'test_data_Minprirodi/traps/58',
 'test_data_Minprirodi/traps/60',
 'test_data_Minprirodi/traps/53',
 'test_data_Minprirodi/traps/54',
 'test_data_Minprirodi/traps/56']

In [None]:
for folder in tqdm(glob("test_data_Minprirodi/traps/*")):
    all_animals = predict("", glob(folder + "/*"))
    name_folders = []
    classes = []
    date_registration_starts = []
    date_registration_ends = []
    counts = []

    for row in all_animals:
        name_folders.append(int(row['filename'].split('/')[-2]))
        classes.append(row['cls'])
        date_registration_starts.append(str(row['first_seen']))
        date_registration_ends.append(str(row['last_seen']))
        counts.append(row['count'])

    dataframe_data = {
        'name_folder': name_folders,
        'class': classes,
        'date_registration_start': date_registration_starts,
        'date_registration_end': date_registration_ends,
        'count': counts
    }
    df = pd.DataFrame(dataframe_data)
    full_dataset = pd.concat([full_dataset, df], ignore_index=True)

  0%|          | 0/10 [00:00<?, ?it/s]



  0%|          | 0/423 [00:00<?, ?it/s]



  0%|          | 0/564 [00:00<?, ?it/s]



  0%|          | 0/399 [00:00<?, ?it/s]



  0%|          | 0/1246 [00:00<?, ?it/s]



  0%|          | 0/507 [00:00<?, ?it/s]



  0%|          | 0/645 [00:00<?, ?it/s]



  0%|          | 0/1355 [00:00<?, ?it/s]



  0%|          | 0/231 [00:00<?, ?it/s]



  0%|          | 0/832 [00:00<?, ?it/s]



  0%|          | 0/140 [00:00<?, ?it/s]

In [26]:
full_dataset.head()

Unnamed: 0,name_folder,class,date_registration_start,date_registration_end,count
0,51,Bear,2018-11-19 19:34:30,2018-11-19 19:34:34,2
1,51,Bear,2018-11-29 12:03:56,2018-11-29 12:04:00,1
2,51,Musk_Deer,2019-01-04 10:09:45,2019-01-04 10:09:46,1
3,51,Roe_Deer,2019-01-04 10:09:40,2019-01-04 10:10:26,1
4,51,Red_Deer,2019-01-04 10:09:41,2019-01-04 10:09:41,1


In [27]:
full_dataset.to_csv("./drive/MyDrive/submit.csv", index=False)