# Azure Custom Vision を使った物体検知モデルの作成

## 前準備

### 各種設定を行う

In [None]:
# 作成する Azure Custom Vision のプロジェクト名を指定する
acv_project_name = "Sample Project"

# アノテーションファイルが格納されているフォルダパスを指定する
annotations_dir_path = "annotations"

# Custom Vision のアクセス情報ファイルのパスを指定する
acv_config_path = "acv_config.json"

# 作成した分析モデル(ONNX)や動画の分析結果を格納するフォルダパスを指定する
outputs_dir_path = "outputs"

# 分析に使用するビデオファイルのパス
video_file_path = "sample_video.mp4"

### VoTTから出力されたアノテーションファイル(.json)を読み込む

In [None]:
import json
import glob

json_file_paths = glob.glob(f"{annotations_dir_path}/*.json")
vott_images = []
for json_file_path in json_file_paths:
    with open(json_file_path, "r") as f:
        vott_image = json.load(f)
        vott_images.append(vott_image)

## Custom Vision プロジェクトの使用

### 作成した Custom Vision のエンドポイントとキーを読み込む

In [None]:
with open(acv_config_path, "r") as f:
    acv_config = json.load(f)

acv_endpoint = acv_config["endpoint"]
acv_key = acv_config["key"]

### Custom Vision へのアクセスクライアントを生成する

In [None]:
from azure.cognitiveservices.vision.customvision.training import CustomVisionTrainingClient
from msrest.authentication import ApiKeyCredentials

credentials = ApiKeyCredentials(in_headers={"Training-key": acv_key})
client = CustomVisionTrainingClient(acv_endpoint, credentials)

### プロジェクトを作成する

In [None]:
# 作成するプロジェクトと同一の名前のプロジェクトが既に存在している場合は削除する
for exist_project in list(filter(lambda x: x.name == acv_project_name, client.get_projects())):
    client.delete_project(exist_project.id)

# ドメイン (プロジェクトタイプ) を取得する
domain_type = "ObjectDetection"
domain_name = "General (compact)"

# プロジェクトを作成する
domain = next(domain for domain in client.get_domains() if domain.type == domain_type and domain.name == domain_name)
project = client.create_project(acv_project_name, domain_id=domain.id)

### 後に使用する関数を定義
Custom Vision プロジェクトには1API呼び出しにつき最大64枚の画像がアップロード可能であるため、64枚ずつ画像のアップロード処理を行うために使用する、配列(画像のファイルパス)を64個ごとに分割するための関数を用意します。

In [None]:
# 配列を指定したサイズの配列の配列に変換する関数を定義
# in: ([1,2,3,4,5], 2) -> out: [[1,2],[3,4],[5]]
def generate_chunks(arr, size):
    chunks = []
    index = 0
    while True:
        chunk = arr[index : index + size]
        if len(chunk) == 0:
            break
        chunks.append(chunk)
        index += size
    return chunks

### プロジェクトにタグと画像を追加する
Custom Vision プロジェクトにタグを追加して、そのタグに対応する画像をアップロードします。

In [None]:
import os
from azure.cognitiveservices.vision.customvision.training.models import ImageFileCreateBatch, ImageFileCreateEntry, Region

# 一度にアップロードできる画像枚数が最大64枚であるため、配列を64画像ごとの配列に変換する
chunks = generate_chunks(vott_images, 64)

# 作成したタグ一覧
tags = {} # Key: tag name, Value: tag instance

# 画像64枚ごとに処理を行う
for chunk in chunks:
    upload_images = []
    for vott_image in chunk:
        image_file_path = vott_image['asset']['path'].replace("file:", "")
        image_width = vott_image['asset']['size']['width']
        image_height = vott_image['asset']['size']['height']
        vott_regions = vott_image['regions']

        # タグを作成する
        for region in vott_regions:
            for tag_name in region['tags']:
                if tag_name not in tags:
                    print(f"add tag: {tag_name}")
                    tag = client.create_tag(project.id, tag_name)
                    tags[tag_name] = tag
        
        # リージョン指定を行う
        regions = []
        for vott_region in vott_regions:
            bounding_box = vott_region['boundingBox']
            for tag_name in vott_region['tags']:
                region = Region(tag_id=tags[tag_name].id, left=bounding_box['left'] / image_width, top=bounding_box['top'] / image_height, width=bounding_box['width'] / image_width, height=bounding_box['height'] / image_height)
                regions.append(region)

        # 画像ファイルを指定する
        with open(image_file_path, "rb") as image_contents:
            upload_image = ImageFileCreateEntry(name=os.path.basename(image_file_path), contents=image_contents.read(), regions=regions)
            upload_images.append(upload_image)
                
    # 画像をまとめてアップロードする
    print(f'upload {len(upload_images)} images')
    client.create_images_from_files(project.id, ImageFileCreateBatch(images=upload_images))

### トレーニングを開始する
Quick Training でトレーニングを開始します。

In [None]:
iteration = client.train_project(project.id)
print(f"training got started: {iteration.name}")

### トレーニングが終了するまで待機する
トレーニングは約3分で完了します。

In [None]:
import time

while iteration.status != "Completed":
    iteration = client.get_iteration(project.id, iteration.id)
    # print("Training status: " + iteration.status)
    time.sleep(10)

### 完了したトレーニング結果のエクスポートを開始する
トレーニングにて作成された分析モデルをダウンロードするため、まずエクスポートリクエストを行います。リクエストをすると、Azure Custom Vision 側でユーザがダウンロード可能なファイルを用意してくれます。

In [None]:
from azure.cognitiveservices.vision.customvision.training.models import CustomVisionErrorException

platform = "ONNX"
try:
    exported = client.export_iteration(project.id, iteration.id, platform=platform)
except CustomVisionErrorException as e:
    # 既にエクスポートが実行されている場合は例外をスローしない
    if e.message != f"{iteration.id} is already queued for export":
        raise (e)

### エクスポートが完了するまで待機する
エクスポート準備には数秒の時間がかかるため、エクスポート準備が完了するまで待機を行います。

In [None]:
while True:
    exports = client.get_exports(project.id, iteration.id)
    export = list(filter(lambda x: x.platform == platform, exports))[0]
    if export.download_uri is not None:
        break
    time.sleep(1)

### 結果を出力するフォルダを再生成する

In [None]:
import shutil

if os.path.exists(outputs_dir_path):
    shutil.rmtree(outputs_dir_path)
os.makedirs(outputs_dir_path, exist_ok=True)

### エクスポートされたモデルをダウンロードしてローカルに保存する
Custom Vision で作成された分析モデルをダウンロードして、作業フォルダに```model.onnx```と```labels.txt```を格納します。

In [None]:
import requests

# ZIPファイルをダウンロードする
downloaded = requests.get(export.download_uri).content
model_zip_path = f"{outputs_dir_path}/exported_model.zip"
with open(model_zip_path, "wb") as f:
    f.write(downloaded)

model_dir_path = f"{outputs_dir_path}/exported_model"
model_file_path = f"{outputs_dir_path}/model.onnx"
labels_file_path = f"{outputs_dir_path}/labels.txt"

# ZIPファイルを解凍する
shutil.unpack_archive(model_zip_path, model_dir_path)

# 必要なファイルのみを作業フォルダにコピーする
shutil.move(f"{model_dir_path}/model.onnx", model_file_path)
shutil.move(f"{model_dir_path}/labels.txt", labels_file_path)

# ZIPファイル解凍で作成されたフォルダを削除する
shutil.rmtree(model_dir_path)

# ZIPファイルを削除する
os.remove(model_zip_path)

## 分析モデルを使った動画の分析

### 分析モデルを読み込む

In [None]:
from libs.onnx_obj_detection_model import OnnxObjectDetectionModel

# 物体検知モデルのラベルを読み込む
with open(labels_file_path, "r") as f:
    labels = [l.strip() for l in f.readlines()]

# 物体検知モデルを読み込む
model = OnnxObjectDetectionModel(model_file_path, labels)

### 指定したビデオの各フレームに物体検知分析を行い、物体検知結果を追加したビデオを生成する

In [None]:
import cv2
from PIL import Image

# 物体検知結果を追加したビデオを出力するファイルパス
output_video_path = f"{outputs_dir_path}/analyzed_video.mp4"

# 画像分析に使用する一時ファイルのパス
tmp_file_path = "tmp.jpg"  

# 出力先の動画を用意する
fourcc = cv2.VideoWriter_fourcc("m", "p", "4", "v")
video = cv2.VideoWriter(output_video_path, fourcc, 20.0, (1920, 1080))

# 動画からフレームを取得する
cap = cv2.VideoCapture(video_file_path)

# 動画のフレームごとに処理を行う
frame = 0
while True:
    print(f"process frame {frame}")
    ret, image = cap.read()
    if not ret:
        break

    # 指定された画像に対して物体検知の推論を行う
    cv2.imwrite(tmp_file_path, image)
    results = model.predict_image(Image.open(tmp_file_path))
    print(results)

    # 各タグでも最も確度(Probability)が高い結果のみを使用する
    predictions = {}
    for result in results:
        if result["tagName"] not in predictions or predictions[result["tagName"]]["probability"] < result["probability"]:
            predictions[result["tagName"]] = {
                "probability": result["probability"],
                "left": result["boundingBox"]["left"],
                "top": result["boundingBox"]["top"],
                "width": result["boundingBox"]["width"],
                "height": result["boundingBox"]["height"],
            }


    # 検知した各物体(タグ)を赤枠で囲って確度を表示する
    image_height = image.shape[0]
    image_width = image.shape[1]
    for tag in predictions.keys():
        prediction = predictions[tag]
        left = max(int(prediction["left"] * image_width), 0)
        top = max(int(prediction["top"] * image_height), 0)
        width = int(prediction["width"] * image_width)
        height = int(prediction["height"] * image_height)
        cv2.rectangle(
            image,
            (left, top),
            (left + width, top + height),
            (0, 0, 255),
            thickness=2,
        )
        cv2.putText(
            image,
            f"{tag} ({round(prediction['probability'], 2)})",
            (left, max(top - 10, 0)),
            cv2.FONT_HERSHEY_SIMPLEX,
            1.0,
            (0, 0, 255),
            thickness=2,
        )

    # 出力ビデオにフレームを追加する
    video.write(image)

    frame += 1

# 動画を出力する
video.release()

# 一時画像ファイルを削除する
if os.path.isfile(tmp_file_path):
    os.remove(tmp_file_path)