<a href="https://colab.research.google.com/github/stas420/machine_learning_ev_ice/blob/main/src/EV_to_ICE_ML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# EV/ICE ML project

The goal of this project is to use an existing ML model to find the yearly ratio of Electric Vehicle (EV) occurrences to all other, non-electric (Internal Combustion Engine, ICE) vehicles. The provided dataset is an archive of street camera recordings.

---
# A few global variables

In [None]:
# Google Drive folder that holds the source video recordings
VIDEOS_PATH = '/content/drive/MyDrive/videos'
# Folder where the frames extracted from the videos are written
IMAGES_PATH = '/content/imgs'
# Google Drive folder with the prepared car images
CAR_IMAGES_PATH = '/content/drive/MyDrive/carImages'
# presumably the labeled data used for training - not referenced in the visible cells, TODO confirm
TRAINING_DATA = '/content/drive/MyDrive/training_data'

---
# Preparations

## Setup and dependencies
Run the cell below; it clones the repository and installs the project dependencies. This may take a few minutes.

***Important:*** This must be the first cell you run in this project; otherwise the later cells won't work.

After `pip` finishes, it may ask you to restart the session - please do so.

In [None]:
# remove possible leftovers and clone a fresh copy of the whole repo
%cd /content
%rm -rf ./machine_learning_ev_ice
!git clone https://github.com/stas420/machine_learning_ev_ice.git

# a git pull pro forma (right after a fresh clone there is nothing to pull)
%cd ./machine_learning_ev_ice
!git pull

# go to our main directory
%cd ./src

# install the project dependencies listed in requirements.txt
%pip install -r requirements.txt

/content
Cloning into 'machine_learning_ev_ice'...
remote: Enumerating objects: 108, done.[K
remote: Counting objects: 100% (108/108), done.[K
remote: Compressing objects: 100% (90/90), done.[K
remote: Total 108 (delta 60), reused 44 (delta 15), pack-reused 0 (from 0)[K
Receiving objects: 100% (108/108), 25.64 MiB | 5.97 MiB/s, done.
Resolving deltas: 100% (60/60), done.
/content/machine_learning_ev_ice
Already up to date.
/content/machine_learning_ev_ice/src


## Mounting Google Drive data

The dataset uploaded to Google Drive has to be mounted for further work; it will be available in the ```/content/drive``` directory.

In [None]:
%cd /content

from google.colab import drive
import os

# Mount Google Drive unless the current directory already has a 'drive' entry
if 'drive' in os.listdir():
  print("drajw dajrektory alredi prezent")
else:
  drive.mount('/content/drive')

/content
Mounted at /content/drive


---
# Preparing images

## Extracting video frames

***Important!*** *The code cells below should be run if - and only if - the frames are not present yet!* This procedure takes quite some time and has already been done. The prepared data should be in ```/content/drive/MyDrive/carImages```.

In [None]:
# clearing leftovers pro forma
%rm -rf /content/imgs
%mkdir /content/imgs

import cv2
import os

# function which takes a video file (e.g. .mp4) and extracts its frames,
# then saves them as images in the provided directory
def extract_frames(video_path, output_dir, fps=1) -> None:
    """Save every frame of a video as a numbered JPEG in ``output_dir``.

    Frames are written in decoding order as ``frame_000.jpg``,
    ``frame_001.jpg``, ...; ``output_dir`` is created when missing.

    Args:
        video_path: Path of the video file to read.
        output_dir: Directory that receives the extracted frames.
        fps: Kept for backward compatibility but not used: no sampling is
            applied and every decoded frame is saved.

    Returns:
        None. A video that cannot be opened or a frame that cannot be
        written is reported with ``print`` instead of raising.
    """
    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            # Previously this case produced an empty directory without a word
            print(f"Could not open video {video_path}, no frames extracted")
            return

        saved = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_path = os.path.join(output_dir, f"frame_{saved:03d}.jpg")
            # imwrite reports failure through its return value, not an exception
            if not cv2.imwrite(frame_path, frame):
                print(f"Could not write {frame_path}")
            # Advance even after a failed write so file names keep matching
            # the frame's position in the video
            saved += 1
    finally:
        # Release the capture even when reading or writing raises
        cap.release()

# image extraction
for x in os.listdir(VIDEOS_PATH):
  extract_frames(VIDEOS_PATH + '/' + x, IMAGES_PATH + '/' + x, fps=1)

!ls -la ./content/imgs

In [None]:
# Extract frames for a time range and pack them into a zip archive
from typing import List
import datetime as dt
import zipfile

# EDIT HERE
START_DATE: dt.date = dt.date.fromisoformat("2024-03-01")
END_DATE: dt.date = dt.date.fromisoformat("2024-03-31")

def get_dates_in_range(start_date: dt.date, end_date: dt.date) -> List[str]:
  """Return the ISO-formatted days from start_date to end_date, both included.

  The result is empty when end_date lies before start_date.
  """
  day_count = (end_date - start_date).days + 1
  iso_days = []
  for offset in range(day_count):
    day = start_date + dt.timedelta(days=offset)
    iso_days.append(day.isoformat())
  return iso_days

## Archives the frame directories of a date range into a single zip file
## and returns the file name of that archive
def zip_frame_dirs(start_date: dt.date, end_date: dt.date, output_dir = IMAGES_PATH) -> str:
  """Zip the extracted-frame directories of every day in a date range.

  For each day between start_date and end_date (inclusive) the directory
  ``<output_dir>/<YYYY-MM-DD>.mp4`` is added to the archive if it exists;
  missing days are silently left out. Entries are stored relative to
  output_dir, e.g. ``2024-03-01.mp4/frame_000.jpg``. The archive itself is
  written into output_dir as ``<start>--<end>.zip``.

  Args:
    start_date: First day of the range.
    end_date: Last day of the range (included).
    output_dir: Directory that holds the per-day frame directories and
      receives the zip file.

  Returns:
    The file name of the archive (not its full path).
  """
  # List the directory once instead of once per candidate day
  present_entries = set(os.listdir(output_dir))
  dir_names = [date + ".mp4" for date in get_dates_in_range(start_date, end_date)]
  existing_dir_paths = [os.path.join(output_dir, name) for name in dir_names if name in present_entries]
  zip_filename = f"{start_date.isoformat()}--{end_date.isoformat()}.zip"
  with zipfile.ZipFile(os.path.join(output_dir, zip_filename), 'w', zipfile.ZIP_DEFLATED) as zipf:
    for dir_path in existing_dir_paths:
      for root, _, files in os.walk(dir_path):
        for file in files:
          full_path = os.path.join(root, file)
          # keep the day directory as the top-level folder inside the archive
          arcname = os.path.relpath(full_path, start=os.path.dirname(dir_path))
          print(f"Writing {full_path} to zip file")
          zipf.write(full_path, arcname)
  return zip_filename

# Extract the frames of every video in [START_DATE, END_DATE] that is present
# in VIDEOS_PATH, then pack the resulting frame directories into one zip file.
if __name__ == "__main__":
  # List the videos folder once instead of once per candidate day
  available_videos = set(os.listdir(VIDEOS_PATH))
  file_names = [date + ".mp4" for date in get_dates_in_range(START_DATE, END_DATE)]
  file_names_in_dir = [name for name in file_names if name in available_videos]
  for name in file_names_in_dir:
    frames_dir = os.path.join(IMAGES_PATH, name)
    # An existing frame directory is treated as "already extracted"
    if os.path.isdir(frames_dir):
      print(f"Directory {name} already exists, skipping")
      continue
    print(f"Extracting from {name}")
    extract_frames(os.path.join(VIDEOS_PATH, name), frames_dir)
    print(f"Done with {name}")
  print(f"All videos in range {START_DATE.isoformat()} - {END_DATE.isoformat()} done extracting")
  zip_filename = zip_frame_dirs(START_DATE, END_DATE)
  print(f"Done zipping files to {zip_filename}")

Extracting from 2024-03-01.mp4
Done with 2024-03-01.mp4
Extracting from 2024-03-02.mp4
Done with 2024-03-02.mp4
Extracting from 2024-03-03.mp4
Done with 2024-03-03.mp4
Extracting from 2024-03-04.mp4
Done with 2024-03-04.mp4
Extracting from 2024-03-05.mp4
Done with 2024-03-05.mp4
Extracting from 2024-03-06.mp4
Done with 2024-03-06.mp4
Extracting from 2024-03-07.mp4
Done with 2024-03-07.mp4
Extracting from 2024-03-08.mp4
Done with 2024-03-08.mp4
Extracting from 2024-03-09.mp4
Done with 2024-03-09.mp4
Extracting from 2024-03-10.mp4
Done with 2024-03-10.mp4
Extracting from 2024-03-11.mp4
Done with 2024-03-11.mp4
Extracting from 2024-03-12.mp4
Done with 2024-03-12.mp4
Extracting from 2024-03-13.mp4
Done with 2024-03-13.mp4
Extracting from 2024-03-14.mp4
Done with 2024-03-14.mp4
Extracting from 2024-03-15.mp4
Done with 2024-03-15.mp4
Extracting from 2024-03-16.mp4
Done with 2024-03-16.mp4
Extracting from 2024-03-17.mp4
Done with 2024-03-17.mp4
Extracting from 2024-03-18.mp4
Done with 2024-03

### Extracting cars from the frames

This is the first use of the YOLO model: we feed it the frames extracted from the videos, the model detects the cars, and each car is then cropped out and saved as a separate image. This simplifies our work.

***Important!*** *As stated above, this cell should only be run if ```/content/drive/MyDrive/carImages``` has no contents.*

In [None]:
from ultralytics import YOLO
import cv2
import os

# Output root for the cropped car images (same location as CAR_IMAGES_PATH in the global-variables cell)
CAR_IMAGES_DIR = "/content/drive/MyDrive/carImages"
# Input root holding one directory of extracted frames per video (same location as IMAGES_PATH)
INPUT_IMAGES_DIR = "/content/imgs"

# Returns the loaded cv2 image and the list of boundaries tagged car in that frame
def find_car_boundaries(image_path: str, model: YOLO, car_cls: int):
  """Detect the cars in a single frame.

  Args:
    image_path: Path of the frame image, loaded with cv2.imread.
    model: Loaded YOLO model; it is called directly on the image.
    car_cls: Class id the model uses for cars.

  Returns:
    A tuple ``(image, boundaries)``: the image as loaded by cv2 and a list
    of ``(x1, y1, x2, y2)`` integer tuples, one per box whose class equals
    car_cls. When the image cannot be read the result is ``(None, [])``.
  """
  img = cv2.imread(image_path)
  if img is None:
    # cv2.imread returns None for an unreadable file instead of raising;
    # do not hand that to the model
    print(f"Could not read image {image_path}, no cars extracted")
    return (img, [])

  results = model(img)
  # Materialise each box as a tuple: a bare map() object can be consumed
  # only once, which makes the returned list fragile for callers
  boundaries = [
    tuple(map(int, box.xyxy[0]))
    for box in results[0].boxes
    if int(box.cls) == car_cls
  ]
  return (img, boundaries)

# Cuts one sub-image per boundary out of a cv2 image
def crop_boundaries(image, boundaries):
  """Return the crops of image described by boundaries.

  Each boundary is an iterable of four values (x1, y1, x2, y2); the crop
  covers rows y1..y2 and columns x1..x2 of the image.
  """
  return [image[top:bottom, left:right] for left, top, right, bottom in boundaries]


def main():
  """Crop every detected car out of the extracted frames and save it.

  Reads ``INPUT_IMAGES_DIR/<day>.mp4/<frame>`` and writes
  ``CAR_IMAGES_DIR/<YYYY-MM>/<day>.mp4/<frame>/car-<n>.jpg``. A frame whose
  output directory already exists is skipped, so the function can be re-run
  to continue an earlier run. Exits early (with a printed message) when
  Google Drive is not mounted, the frames are missing, or the model has no
  "car" class.
  """
  if not os.path.isdir("/content/drive/MyDrive"):
    print("Google Drive not mounted, exiting")
    return

  if not os.path.isdir(INPUT_IMAGES_DIR):
    print("Images not extracted, exiting")
    return

  os.makedirs(CAR_IMAGES_DIR, exist_ok=True)

  model = YOLO('./yolov8s.pt')

  # Look up the class id the model assigns to the "car" label
  car_cls = next(
    (label_cls for label_cls, label_name in model.names.items() if label_name == "car"),
    None,
  )
  if car_cls is None:
    print("Car CLS not found, exiting")
    return
  print(f"Car CLS found: {car_cls}")

  for day_dir in os.listdir(INPUT_IMAGES_DIR):
    if not day_dir.endswith(".mp4"):
      continue
    # Y Y Y Y - M M | - D D .  m  p  4
    # 0 1 2 3 4 5 6 | 7 8 9 10 11 12 13
    month_dir = day_dir[:7]

    in_day_dir = os.path.join(INPUT_IMAGES_DIR, day_dir)
    out_day_dir = os.path.join(CAR_IMAGES_DIR, month_dir, day_dir)

    for frame_name in os.listdir(in_day_dir):
      in_frame_path = os.path.join(in_day_dir, frame_name)
      out_frame_dir = os.path.join(out_day_dir, frame_name)

      if os.path.isdir(out_frame_dir):
        print(f"Directory for {in_frame_path} already exists, skipping")
        continue

      print(f"Finding car boundaries in {in_frame_path}")
      image, boundaries = find_car_boundaries(in_frame_path, model, car_cls)
      print("Car boundaries found, cropping")
      cropped_imgs = crop_boundaries(image, boundaries)

      # Create the output directory only after detection has finished: its
      # existence is the "already processed" marker, so creating it before
      # the slow inference step made an interrupted run skip this frame
      # forever. An interruption during the writes below can still leave a
      # partially filled directory.
      os.makedirs(out_frame_dir, exist_ok=True)

      for index, img in enumerate(cropped_imgs):
        out_car_file = f"{out_frame_dir}/car-{index}.jpg"
        print("Writing to " + out_car_file)
        cv2.imwrite(out_car_file, img)



if __name__ == "__main__":
  main()



[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Writing to /content/drive/MyDrive/carImages/2024-03/2024-03-23.mp4/frame_009.jpg/car-0.jpg
Writing to /content/drive/MyDrive/carImages/2024-03/2024-03-23.mp4/frame_009.jpg/car-1.jpg
Writing to /content/drive/MyDrive/carImages/2024-03/2024-03-23.mp4/frame_009.jpg/car-2.jpg
Writing to /content/drive/MyDrive/carImages/2024-03/2024-03-23.mp4/frame_009.jpg/car-3.jpg
Finding car boundaries in /content/imgs/2024-03-23.mp4/frame_049.jpg

0: 384x640 16 cars, 1 bus, 667.8ms
Speed: 4.1ms preprocess, 667.8ms inference, 2.2ms postprocess per image at shape (1, 3, 384, 640)
Car boundaries found, cropping
Writing to /content/drive/MyDrive/carImages/2024-03/2024-03-23.mp4/frame_049.jpg/car-0.jpg
Writing to /content/drive/MyDrive/carImages/2024-03/2024-03-23.mp4/frame_049.jpg/car-1.jpg
Writing to /content/drive/MyDrive/carImages/2024-03/2024-03-23.mp4/frame_049.jpg/car-2.jpg
Writing to /content/drive/MyDrive/carImages/2024-03/2024-03-23.m

KeyboardInterrupt: 

## XML to YOLO format conversion

Labels made in labelImg are in XML format, which differs from the YOLO label format, so a conversion is needed.

In [None]:
from xml.dom import minidom

# look-up table: class name found in the XML labels -> YOLO class id
# (our own two labels)
LUT = {"EV": 0, "ICE": 1}

def convert_coordinates(size, box):
    """Turn a pixel box into a normalised centre/size box.

    Args:
        size: ``(width, height)`` of the image.
        box: ``(xmin, xmax, ymin, ymax)`` of the box, in the same units as size.

    Returns:
        ``(x, y, w, h)``: box centre and box extent, each scaled by the
        reciprocal of the matching image dimension.
    """
    inv_width = 1.0 / size[0]
    inv_height = 1.0 / size[1]

    left, right, top, bottom = box[0], box[1], box[2], box[3]
    centre_x = (left + right) / 2.0
    centre_y = (top + bottom) / 2.0
    extent_x = right - left
    extent_y = bottom - top

    return (
        centre_x * inv_width,
        centre_y * inv_height,
        extent_x * inv_width,
        extent_y * inv_height,
    )

def convert_xml2yolo(file_path: str):
    """Convert one XML label file into a YOLO label file.

    The output is written next to the input with the extension replaced by
    ``.txt``. Each ``object`` element becomes one line
    ``<class id> <x> <y> <w> <h>``, where the class id comes from ``LUT`` and
    the four values from ``convert_coordinates`` (six decimals each).
    Objects whose name is not in ``LUT`` are reported and left out, because
    a YOLO class id has to be a valid, non-negative class index. The path of
    the written file is printed at the end.

    Args:
        file_path: Path of the XML file to convert.
    """
    import os  # local import: this cell does not import os on its own

    xmldoc = minidom.parse(file_path)
    # splitext copes with any extension length, unlike slicing off 4 chars
    fname_out = os.path.splitext(file_path)[0] + '.txt'

    # Read everything from the XML before opening the output file, so that a
    # malformed annotation does not leave a truncated .txt behind
    size = xmldoc.getElementsByTagName('size')[0]
    width = int(size.getElementsByTagName('width')[0].firstChild.data)
    height = int(size.getElementsByTagName('height')[0].firstChild.data)

    label_lines = []
    for item in xmldoc.getElementsByTagName('object'):
        classid = item.getElementsByTagName('name')[0].firstChild.data
        if classid not in LUT:
            print("warning: label '%s' not in look-up table, object skipped" % classid)
            continue

        bndbox = item.getElementsByTagName('bndbox')[0]
        # convert_coordinates expects (xmin, xmax, ymin, ymax)
        b = tuple(
            float(bndbox.getElementsByTagName(tag)[0].firstChild.data)
            for tag in ('xmin', 'xmax', 'ymin', 'ymax')
        )
        bb = convert_coordinates((width, height), b)
        label_lines.append(str(LUT[classid]) + " " + " ".join("%.6f" % a for a in bb) + '\n')

    with open(fname_out, "w") as f:
        f.writelines(label_lines)

    print("%s" % fname_out)


# Example run: converts a single XML label file
# NOTE(review): the path is hard-coded to a local E: drive, presumably a developer's machine - adjust before running elsewhere
if __name__ == '__main__':
    PATH = "E:/studia/machine_learning_ev_ice/imgs/frame_034.xml"
    convert_xml2yolo(PATH)

---

# Real work here

## Training the model with provided labeled data

The model first needs to learn which cars are EVs and which are ICE vehicles.


### Important check

Before proceeding, make sure there is a directory structured like this:

```
training_data/
├── images/
│   ├── train/
│   └── val/
└── labels/
    ├── train/
    └── val/
```

and that the files in these folders form unique pairs of corresponding names (not overlapping across train/val, either!), i.e.:

images/train/img1.png <-> labels/train/img1.txt; images/val/img2.png <-> labels/val/img2.txt

### Configuration file

The *config.yaml* file configures the model training process; it looks like this:

```
# config.yaml
path: training_data

train: images/train
val: images/val

nc: 2
names:
  0: EV
  1: ICE
```

For full info, check out the [ultralytics docs](https://docs.ultralytics.com/usage/cfg/#predict-settings).

In [None]:
%cd /content/machine_learning_ev_ice/src
from ultralytics import YOLO

# weights file the YOLO model is loaded from
MODEL_PATH = './yolov8s.pt'
# training configuration file, passed to train() as `data`
CONF_FILE = './config.yaml'
# number of training epochs, passed to train() as `epochs`
EPOCHS_NUM = 100

# load the model and train it with the configuration above
model = YOLO(MODEL_PATH)
model.train(data = CONF_FILE, epochs = EPOCHS_NUM)

/content/machine_learning_ev_ice/src
path: /content/drive/MyDrive/carImages/flat

train: images/train
val: images/val

nc: 2
names:
    0: EV
    1: ICE

---
# __archive

In [None]:
%cd /content/machine_learning_ev_ice/src

# for development only
%rm -rf /content/results
from ultralytics import YOLO
import cv2
import os

def is_green_plate(plate_img):
    """Tell whether a BGR plate crop is green on average.

    The crop is converted to HSV and every channel is averaged over all
    pixels; the plate counts as green when the mean hue lies strictly
    between 35 and 85 and the mean saturation is above 40.
    """
    hsv_img = cv2.cvtColor(plate_img, cv2.COLOR_BGR2HSV)
    per_column_means = hsv_img.mean(axis=0)
    mean_hue, mean_saturation, _ = per_column_means.mean(axis=0)
    hue_in_green_band = 35 < mean_hue < 85
    return hue_in_green_band and mean_saturation > 40

# Development-only script: runs the model on the frames of one extracted video
# and saves copies with every detected object outlined and labeled.
model = YOLO('./yolov8s.pt')
input_dir = "/content/imgs/2024-02-08.mp4"
output_dir = "/content/results"
os.makedirs(output_dir, exist_ok=True)

# for each file, check if it's a jpg and pass it to the model
for fname in os.listdir(input_dir):
    if not fname.endswith(".jpg"):
        continue
    path = os.path.join(input_dir, fname)
    img = cv2.imread(path)
    results = model(img)

    # for each 'box' in the obtained result draw a frame around the found object
    for box in results[0].boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        # NOTE(review): crop is not used afterwards in this cell
        crop = img[y1:y2, x1:x2]
        class_id = int(box.cls)
        label = model.names[class_id]
        # every box is drawn in the same colour (0,255,0)
        color = (0,255,0) # if label == "green" else (255,0,0)

        # outline the object and put its class name just above the box
        cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
        cv2.putText(img, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

    # save the annotated frame under the same file name in output_dir
    cv2.imwrite(os.path.join(output_dir, fname), img)


Dżem dobry
:)