# 1. Подготовка

## Подключение к Google Drive

In [None]:
# Mount Google Drive at /content/drive; later cells read the dataset from that path.
from google.colab import drive
drive.mount('/content/drive')

## Библиотеки

In [None]:
import cv2
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from keras.callbacks import EarlyStopping

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import  Model
from tensorflow.keras.layers import Dense, Conv2D, MaxPool2D, Flatten, Dropout, Input

from sklearn.model_selection import train_test_split

## Датасет

In [None]:
!wget https://gitlab.com/ISU-group/computer-vision/download_dataset/-/blob/main/dataset_urls.txt
!git clone https://gitlab.com/ISU-group/computer-vision/download_dataset.git
!python3 download_dataset/main.py

## Сохранение датасета в Google Drive
> Для быстрого использования датасета в будущем

In [None]:
!cp -r ./download_dataset/dataset/ ./drive/MyDrive/dataset/

# 2. Сторонние готовые функции для ИИ

## Resizing images and bounding boxes
[Источник](https://jovian.ai/19521242/road-signs-bounding-box-prediction)

In [None]:
def create_mask(bb, x):
    """Build a binary mask covering a bounding box.

    Args:
        bb: box as (row_start, row_end, col_start, col_end); each value is
            truncated to int before being used as a slice bound.
        x: image array; only its first two dimensions (rows, cols) are used.

    Returns:
        A float array of shape (rows, cols) that is 1 inside the box and 0
        everywhere else.
    """
    n_rows, n_cols, *_ = x.shape
    bounds = [int(value) for value in bb]
    mask = np.zeros((n_rows, n_cols))
    row_span = slice(bounds[0], bounds[1])
    col_span = slice(bounds[2], bounds[3])
    mask[row_span, col_span] = 1.
    return mask

def mask_to_bb(Y):
    """Convert a 2-D mask to its tight bounding box.

    Zero entries are background; any non-zero entry belongs to the object.

    Args:
        Y: 2-D array (rows, cols).

    Returns:
        A float32 array ``[top_row, bottom_row, left_col, right_col]`` with
        exclusive bottom/right bounds, or four zeros when the mask is empty.
        Both branches return the same type, so callers never have to handle
        a list in one case and an ndarray in the other.
    """
    rows, cols = np.nonzero(Y)
    if len(cols) == 0:
        return np.zeros(4, dtype=np.float32)
    return np.array(
        [rows.min(), rows.max() + 1, cols.min(), cols.max() + 1],
        dtype=np.float32,
    )

## IOU (TensorFlow)

In [None]:
def iou(bbox1, bbox2):
    """IoU loss between two batches of boxes: ``1 - IoU`` per row.

    Args:
        bbox1: tensor of shape (batch, 4) laid out as (y1, y2, x1, x2).
        bbox2: tensor of the same shape and layout.

    Returns:
        Tensor of shape (batch,) holding ``1 - IoU``; rows whose boxes do not
        overlap get the maximum loss of 1.
    """
    y1, y2, x1, x2 = bbox1[:, 0], bbox1[:, 1], bbox1[:, 2], bbox1[:, 3]
    y3, y4, x3, x4 = bbox2[:, 0], bbox2[:, 1], bbox2[:, 2], bbox2[:, 3]

    # Signed extent of the overlap along each axis; <= 0 means no overlap.
    inter_w = tf.minimum(x2, x4) - tf.maximum(x1, x3)
    inter_h = tf.minimum(y2, y4) - tf.maximum(y1, y3)
    no_overlap = tf.logical_or(inter_w <= 0, inter_h <= 0)

    inter_area = inter_w * inter_h
    union_area = (y2 - y1) * (x2 - x1) + (y4 - y3) * (x4 - x3) - inter_area

    # divide_no_nan yields 0 where union_area == 0. A plain division would
    # produce NaN there, and NaN in the branch tf.where discards still turns
    # the gradient into NaN.
    ratio = tf.math.divide_no_nan(inter_area, union_area)
    result = tf.where(no_overlap, tf.zeros_like(ratio), ratio)
    return 1 - result

# 3. ИИ (Нахождение Bounding Box)

## Сборка датасета

In [None]:
# Target size in pixels that every image is resized to; also the model's input size.
# height / width = 200 / 150 = 4/3, the aspect ratio the dataset-building cells filter on.
width = 150
height = 200

In [None]:
# Build the "ball" dataset: resized images plus their bounding boxes
# (ymin, ymax, xmin, xmax) in resized-pixel coordinates, with flip augmentation.
# NOTE(review): the flipped copies are added before the train/test split done in
# a later cell, so flips of one photo can end up in both sets - consider
# splitting first and augmenting only the training part.
balls = []
balls_roi = []

df = pd.read_csv("/content/drive/MyDrive/dataset/balls/_annotations.csv")

for index, row in df.iterrows():

    # Keep only images whose height:width ratio is 4:3 (same as the target
    # size), so resizing does not distort them. Checked before decoding the
    # file to avoid reading images that would be discarded anyway.
    if np.round(row["height"] / row["width"], 3) != np.round(4/3, 3):
        continue

    img = cv2.imread(f'/content/drive/MyDrive/dataset/balls/{row["filename"]}')
    if img is None:
        # cv2.imread returns None (it does not raise) for a missing or
        # unreadable file; skip it instead of crashing in cv2.resize.
        print(f'Skipping unreadable image: {row["filename"]}')
        continue

    resized = cv2.resize(img, (width, height), interpolation=cv2.INTER_AREA)

    # Scale the annotated box from original pixels to resized pixels.
    roi = [
        row["ymin"] / (row["height"] / height),
        row["ymax"] / (row["height"] / height),
        row["xmin"] / (row["width"] / width),
        row["xmax"] / (row["width"] / width)
    ]

    balls.append(resized)
    balls_roi.append(roi)

    # Augmentation: flip the image and a mask of the box with the same flip
    # code, then read the flipped box back from the mask.
    mask = create_mask(roi, resized)

    for flip_code in range(-1, 2):  # cv2.flip codes -1, 0, 1
        balls.append(cv2.flip(resized, flip_code))
        balls_roi.append(mask_to_bb(cv2.flip(mask, flip_code)))


print(len(balls))


In [None]:
# Stack the Python lists into arrays for training.
X = np.asarray(balls)
y = np.asarray(balls_roi)
# One class label per sample; every sample collected so far is labelled 1.
yl = np.full(len(balls_roi), 1.0)

In [None]:
# Hold out 20% of the samples; the fixed seed keeps the split repeatable.
(X_train, X_test,
 y_train, y_test,
 yl_train, yl_test) = train_test_split(
    X, y, yl,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Normalise box coordinates to [0, 1]: the two row values (columns 0-1) by the
# image height, the column values (columns 2 onward) by the image width.
# NOTE: this rebinds y_test / y_train in place, so running the cell twice
# divides twice - re-run the split cell first.
y_test = np.hstack((y_test[:, :2] / height, y_test[:, 2:] / width))
y_train = np.hstack((y_train[:, :2] / height, y_train[:, 2:] / width))

## Модель

In [None]:
# Two-headed CNN: a shared convolutional base ("base_*" layers) feeds
#   - a bounding-box regression head ("bbox_*", 4 sigmoid outputs in [0, 1]),
#   - a binary classification head (1 sigmoid output, "class_output").
# The "base_" / "bbox_" name prefixes are what the training cells use to
# freeze or unfreeze groups of layers.
inp = Input(shape=(height, width, 3),  name='base_0')

# Newer Keras exposes Rescaling directly under `layers`; the
# `layers.experimental.preprocessing` namespace is only used as a fallback
# for older versions that lack it.
rescaling_cls = getattr(layers, "Rescaling", None)
if rescaling_cls is None:
    rescaling_cls = layers.experimental.preprocessing.Rescaling

# Map pixel values from [0, 255] to [0, 1] inside the model.
scale = rescaling_cls(1./255, name='base_1')(inp)

cov_ = Conv2D(filters=16, kernel_size=(5, 5),  activation="relu", name='base_2')(scale)
cov_ = MaxPool2D(pool_size=(2, 2), name='base_3')(cov_)

cov_ = Conv2D(filters=32, kernel_size=(5, 5),  activation="relu", name='base_4')(cov_)
cov_ = MaxPool2D(pool_size=(2, 2), name='base_5')(cov_)

cov_ = Conv2D(filters=64, kernel_size=(5, 5), activation="relu", name='base_6')(cov_)
cov_ = MaxPool2D(pool_size=(2, 2), name='base_7')(cov_)

flatten = Flatten(name='base_8')(cov_)

# Bounding-box head.
bbox = Dense(256, activation="relu", name='bbox_1')(flatten)
bbox = Dropout(0.3)(bbox)
bbox = Dense(128, activation="relu", name='bbox_2')(bbox)
bbox = Dropout(0.3)(bbox)
bbox = Dense(64, activation="relu", name='bbox_3')(bbox)
bbox = Dropout(0.3)(bbox)
bbox = Dense(4, activation="sigmoid", name="bbox_output")(bbox)

# Classification head.
class_ = Dense(64, activation='relu')(flatten)
class_ = Dropout(0.5)(class_)
class_ = Dense(32, activation='relu')(class_)
class_ = Dropout(0.3)(class_)
class_ = Dense(1, activation='sigmoid', name='class_output')(class_)

# Output order matters to the prediction cells: index 0 = class, index 1 = bbox.
model = Model(inp, [class_, bbox])

In [None]:
# Render the model graph with tensor shapes on every edge.
plot_model(model, show_shapes=True)

## Обучение

In [None]:
# Stage 1: train the shared base and the bounding-box head on ball images.
tf.keras.backend.clear_session()

# Explicitly unfreeze the base and bbox layers so this cell also works when it
# is re-run after the later stage has frozen them.
for layer in model.layers:
    if layer.name.startswith(('base_', 'bbox_')):
        layer.trainable = True

model.compile(
    "adam",
    loss={
        'bbox_output': iou,
        "class_output": 'binary_crossentropy',
    },
    # Accuracy is only meaningful for the classifier output; for the 4-value
    # box regression it says nothing about localisation quality.
    metrics={"class_output": "accuracy"}
)

# Stop when the validation box loss (1 - IoU) stops improving, and keep the
# weights of the best epoch rather than those of the last one.
early_stopping_monitor = EarlyStopping(
    monitor='val_bbox_output_loss',
    mode='min',
    patience=10,
    restore_best_weights=True,
)

model.fit(
    X_train, {"class_output": yl_train, 'bbox_output': y_train},
    epochs=100, batch_size=8, verbose=2, shuffle=True,
    validation_data=(X_test, {"class_output": yl_test, 'bbox_output': y_test}),
    callbacks=[early_stopping_monitor]
)

In [None]:
# The model has two outputs, so this is a list: [class predictions, bbox predictions].
predictions = model.predict(X_test)

In [None]:
# Show the first few test images with the predicted bounding box drawn on top.
classes = predictions[0][:]

# De-normalise the predicted boxes back to pixels (rows by height, columns by width).
bboxes = np.concatenate((
    np.round(predictions[1][:,:2] * height),
    np.round(predictions[1][:,2:] * width)
), 1)

for i in range(min(5, len(X_test))):
  # OpenCV loads images as BGR; matplotlib expects RGB.
  img = cv2.cvtColor(X_test[i], cv2.COLOR_BGR2RGB)
  img = img.astype('uint8')

  # cv2.rectangle needs integer pixel coordinates; np.round returns floats.
  top, bottom, left, right = (int(v) for v in bboxes[i])
  img = cv2.rectangle(img.copy(), (left, top), (right, bottom), (255,0,0), 1)

  plt.imshow(img)
  plt.show()

# 3. ИИ (Есть шарик или нет)

## Сборка датасета

In [None]:
# Build the "no ball" dataset: resized images with an all-zero bounding box,
# augmented with the same three flips as the ball images.
no_balls = []
no_balls_roi = []

# sorted(): glob returns files in arbitrary order, which would make the later
# seeded train/test split differ between runs.
for path in sorted(glob.glob("/content/drive/MyDrive/dataset/no_balls/*.jpg")):
    img = cv2.imread(path)
    if img is None:
        # cv2.imread returns None (it does not raise) for an unreadable file.
        print(f'Skipping unreadable image: {path}')
        continue

    # Keep only images whose height:width ratio is 4:3, as for the ball images.
    if np.round(img.shape[0] / img.shape[1], 3) != np.round(4/3, 3):
        continue

    resized = cv2.resize(img, (width, height), interpolation=cv2.INTER_AREA)

    no_balls.append(resized)
    no_balls_roi.append([0, 0, 0, 0])

    for flip_code in range(-1, 2):  # cv2.flip codes -1, 0, 1
        no_balls.append(cv2.flip(resized, flip_code))
        no_balls_roi.append([0, 0, 0, 0])

print(len(no_balls))

In [None]:
balls = balls + no_balls
roi = balls_roi + no_balls_roi

In [None]:
X = np.array(balls)
y = np.array(roi)
yl = np.concatenate((np.ones(len(balls) - len(no_balls)), np.zeros(len(no_balls))))

In [None]:
# Hold out 20% of the combined samples; the fixed seed keeps the split repeatable.
(X_train, X_test,
 y_train, y_test,
 yl_train, yl_test) = train_test_split(
    X, y, yl,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Normalise box coordinates to [0, 1]: row values by height, column values by width.
# NOTE: rebinds y_test / y_train, so re-running this cell alone divides twice.
y_test = np.hstack((
    y_test[:, :2] / height,
    y_test[:, 2:] / width,
))
y_train = np.hstack((
    y_train[:, :2] / height,
    y_train[:, 2:] / width,
))

## Обучение

In [None]:
# Stage 2 trains only the classification head: freeze every layer whose name
# starts with "base_" or "bbox".
frozen_prefixes = ('base_', 'bbox')
for layer in model.layers:
    if layer.name.startswith(frozen_prefixes):
        layer.trainable = False

In [None]:
# Sanity check: list the layers that are still trainable after freezing.
for layer in model.layers:
  if not layer.trainable:
    continue
  print(layer.name)

In [None]:
# Stage 2: train on balls + no-balls. Recompiling is what makes the
# `trainable` flags set in the earlier cell take effect.
tf.keras.backend.clear_session()

model.compile(
    optimizer="adam",
    loss={
        "class_output": 'binary_crossentropy',
        'bbox_output': "mse",
    },
    metrics=["accuracy"],
)

# Stop once validation accuracy of the classifier output stops improving.
early_stopping_monitor = EarlyStopping(
    patience=20,
    monitor='val_class_output_accuracy',
)

model.fit(
    x=X_train,
    y={"class_output": yl_train, 'bbox_output': y_train},
    validation_data=(
        X_test,
        {"class_output": yl_test, 'bbox_output': y_test},
    ),
    epochs=100,
    batch_size=8,
    shuffle=True,
    verbose=2,
    callbacks=[early_stopping_monitor],
)

## Проверка

In [None]:
# Two-output model, so this is a list: [class predictions, bbox predictions].
predictions = model.predict(X_test)
# Show the element type of a single pixel value in the test images.
first_pixel_value = X_test[0][0][0][0]
print(type(first_pixel_value))

In [None]:
# Compare predictions with ground truth on the first test images and draw the
# predicted box whenever the classifier says a ball is present.
classes = predictions[0][:]
# De-normalise the predicted boxes back to pixels (rows by height, columns by width).
bboxes = np.concatenate((np.round(predictions[1][:,:2] * height), np.round(predictions[1][:,2:] * width)),1)


for i in range(min(50, len(X_test))):
  print(bboxes[i])  # predicted box, pixels
  print(np.concatenate((np.round(y_test[i][:2] * height), np.round(y_test[i][2:] * width))))  # true box, pixels
  print(classes[i])  # predicted class score

  # OpenCV loads images as BGR; matplotlib expects RGB.
  img = cv2.cvtColor(X_test[i], cv2.COLOR_BGR2RGB)
  img = img.astype('uint8')

  if classes[i][0] > 0.5:
    # cv2.rectangle needs integer pixel coordinates; np.round returns floats.
    top, bottom, left, right = (int(v) for v in bboxes[i])
    img = cv2.rectangle(img.copy(), (left, top), (right, bottom), (255,0,0), 1)

  plt.imshow(img)
  plt.show()

# 4. Сохранение обученной модели на Google Drive

In [None]:
# Save the trained model locally; the next cell copies this path to Google Drive.
# NOTE(review): newer Keras versions may require a ".keras" or ".h5" file
# extension in the save path - confirm against the installed version.
model.save('saved_model/ball_bbox_classifier')

In [None]:
!rm -r ./drive/MyDrive/ball_bbox_classifier
!mkdir ./drive/MyDrive/ball_bbox_classifier
!cp -r ./saved_model/ball_bbox_classifier ./drive/MyDrive/ball_bbox_classifier