# 01. Getting Started with OCI Vision

Python SDK を用いて、OCI Vision の基本機能を確認するデモです。


### Prepare

In [None]:
%matplotlib inline

import os
import glob
import base64
import matplotlib.pyplot as plt
import matplotlib.image as mpimg


In [None]:
# Preview every sample PNG under ../images/ in a two-column grid,
# using each file name as the subplot title.
dir_path = '../images/'
# glob returns entries in arbitrary order; sort so the preview is reproducible.
image_list = sorted(glob.glob(os.path.join(dir_path, "*.png")))

# Keep the original 3x2 layout for up to six images and add rows beyond that,
# so plt.subplot never receives an index outside the grid.
n_cols = 2
n_rows = max(3, (len(image_list) + n_cols - 1) // n_cols)

plt.figure(figsize=(10, 8)).subplots_adjust(hspace=0.5)
for i, d in enumerate(image_list):
    plt.subplot(n_rows, n_cols, i+1)
    plt.title(os.path.basename(d))
    plt.imshow(mpimg.imread(d),cmap="gray")


### Upload image to Object Storage

In [None]:
# Create the bucket that the sample images will be uploaded to.

from config import *
from oci.config import from_file
from oci.object_storage import ObjectStorageClient
from oci.object_storage.models import CreateBucketDetails
from oci.exceptions import ServiceError

# Load the OCI SDK configuration with from_file() defaults.
config = from_file()

os_client = ObjectStorageClient(config=config)

try:
    create_bucket_resp = os_client.create_bucket(
        namespace_name = NAMESPACE,
        create_bucket_details = CreateBucketDetails(
            name = BUCKET_NAME,
            compartment_id = COMPARTMENT_ID,
        )
    )
except ServiceError as e:
    # Only a 409 conflict means the bucket is already there. Any other
    # service error (authentication, authorization, wrong namespace, ...)
    # must surface instead of being reported as "already exists".
    if e.status != 409:
        raise
    print(f"bucket: {BUCKET_NAME} is already exists.")
else:
    print(f"bucket: {create_bucket_resp.data}")


In [None]:
# Upload the sample images to the bucket.

from concurrent.futures import ThreadPoolExecutor
from oci.object_storage import UploadManager

um = UploadManager(object_storage_client = os_client)

def put_object(file):
    """Upload one local file to BUCKET_NAME, using its base name as the object name.

    Args:
        file: Path of the local file to upload.
    """
    um.upload_file(
        namespace_name=NAMESPACE,
        bucket_name=BUCKET_NAME,
        # The later cells address objects by bare file name, so strip the
        # directory part regardless of the path separator glob returned.
        object_name=os.path.basename(file),
        file_path=file
    )

print(f"{PRETRAINED_MODEL_TEST_IMAGE_PATH}/*.png")
files = glob.glob(f"{PRETRAINED_MODEL_TEST_IMAGE_PATH}/*.png")
print(f"Uploading {len(files)} to the bucket {BUCKET_NAME}, it will take some times...")
with ThreadPoolExecutor(NO_OF_PROCESSORS) as executor:
    futures = [executor.submit(put_object, file)
                for file in files]

# Leaving the `with` block waits for all uploads, but an exception raised in
# a worker stays hidden inside its future. Calling result() re-raises it here
# so a failed upload is not silently ignored.
for future in futures:
    future.result()


### Image Classification (PreTrained Model)

In [None]:

from oci.ai_vision import AIServiceVisionClient
from oci.ai_vision.models import (
    AnalyzeImageDetails, 
    ImageClassificationFeature,
    InlineImageDetails,
    ObjectStorageImageDetails,
    ObjectLocation, 
    CreateImageJobDetails, 
    ObjectListInlineInputLocation, 
    OutputLocation, 
    ImageObjectDetectionFeature, 
    FaceDetectionFeature
)

# Create the Vision service client from the loaded `config`; the analysis
# and batch-job cells below issue their requests through this client.
vision_client = AIServiceVisionClient(config = config)


In [None]:
# INLINE: the image bytes are embedded directly in the request payload.
with open("../images/ImageClassification1.png", "rb") as file:
    image_file = file.read()

classification_feature = ImageClassificationFeature(
    feature_type = "IMAGE_CLASSIFICATION",
    # model_id = "..." if you use custom model, set the model_ocid.
)

# The API takes the image as base64 text, so encode the raw bytes first.
inline_image = InlineImageDetails(
    source = "INLINE",
    data = base64.b64encode(image_file).decode('utf-8'),
)

image_classification_analyze_details = AnalyzeImageDetails(
    features = [classification_feature],
    image = inline_image,
    compartment_id = COMPARTMENT_ID,
)

image_classification_response = vision_client.analyze_image(
    analyze_image_details = image_classification_analyze_details
)

print(image_classification_response.data)


In [None]:
# Classify an image that was stored in Object Storage beforehand.
OBJECT_NAME = "ImageClassification2.png"

classification_feature = ImageClassificationFeature(
    feature_type = "IMAGE_CLASSIFICATION",
    # model_id = "..." if you use custom model, set the model_ocid.
)

# Reference the stored object instead of sending the image bytes.
stored_image = ObjectStorageImageDetails(
    source = "OBJECT_STORAGE",
    namespace_name = NAMESPACE,
    bucket_name = BUCKET_NAME,
    object_name = OBJECT_NAME,
)

image_classification_analyze_details = AnalyzeImageDetails(
    features = [classification_feature],
    image = stored_image,
    compartment_id = COMPARTMENT_ID,
)

image_classification_response = vision_client.analyze_image(
    analyze_image_details = image_classification_analyze_details
)

print(image_classification_response.data)


In [None]:
import time

dir_path = '../images/'
image_list = glob.glob(os.path.join(dir_path, "ImageClassification*.png"))

# Timestamp embedded in the output prefix and the job display name.
now = time.time()

# One ObjectLocation per local sample image, addressed by bare file name
# (assumes the images were uploaded to the bucket under those names).
object_locations = [
    ObjectLocation(
        namespace_name = NAMESPACE,
        bucket_name = BUCKET_NAME,
        object_name = os.path.basename(path),
    )
    for path in image_list
]

# Run image classification as a batch job over images already stored
# in Object Storage.
batch_input = ObjectListInlineInputLocation(
    source_type = "OBJECT_LIST_INLINE_INPUT_LOCATION",
    object_locations = object_locations,
)
batch_output = OutputLocation(
    namespace_name = NAMESPACE,
    bucket_name = BUCKET_NAME,
    prefix = f"batch_job_result_{now}",
)
classification_feature = ImageClassificationFeature(
    feature_type = "IMAGE_CLASSIFICATION",
    # model_id = "..." if you use custom model, set the model_ocid.
)

create_image_job_details = CreateImageJobDetails(
    features = [classification_feature],
    input_location = batch_input,
    output_location = batch_output,
    compartment_id = COMPARTMENT_ID,
    display_name = f"ImageClassificationBatchJob-{now}",
)

image_classification_batch_job_response = vision_client.create_image_job(
    create_image_job_details = create_image_job_details
)

print(image_classification_batch_job_response.data)


In [None]:
# Download the per-image JSON results of the image classification batch job.
import oci
from oci.object_storage import ObjectStorageClient

os_client = ObjectStorageClient(config = config)

job = image_classification_batch_job_response.data

# The batch job runs asynchronously, so its result objects do not exist yet
# right after create_image_job returns. Poll the job until it reaches a
# terminal state before reading anything from the bucket.
terminal_states = ("SUCCEEDED", "FAILED", "CANCELED")
final_job = oci.wait_until(
    vision_client,
    vision_client.get_image_job(image_job_id = job.id),
    evaluate_response = lambda r: r.data.lifecycle_state in terminal_states,
).data
if final_job.lifecycle_state != "SUCCEEDED":
    raise RuntimeError(f"image job {job.id} ended in state {final_job.lifecycle_state}")

# open(..., 'wb') does not create missing directories.
os.makedirs('../results/image_classification', exist_ok=True)

for d in image_list:
    image_name = os.path.split(d)[1]
    # Result object path: <prefix>/<job id>/<namespace>_<bucket>_<image name>.json
    object_name = f"{job.output_location.prefix}/{job.id}/{NAMESPACE}_{BUCKET_NAME}_{image_name}.json"
    get_object_response = os_client.get_object(
        namespace_name = NAMESPACE,
        bucket_name = BUCKET_NAME,
        object_name = object_name
    )
    with open(f'../results/image_classification/{image_name}.json', 'wb') as f:
        f.write(get_object_response.data.content)


### Object Detection (PreTrained Model)

In [None]:
# INLINE: the image bytes are embedded directly in the request payload.
with open("../images/ObjectDetection1.png", "rb") as file:
    image_file = file.read()

object_detection_analyze_details = AnalyzeImageDetails(
    features = [
        ImageObjectDetectionFeature(
            feature_type = "OBJECT_DETECTION",
            # model_id = "..." if you use custom model, set the model_ocid.
        )
    ],
    image = InlineImageDetails(
        source = "INLINE",
        # The API takes the image as base64 text, so encode the raw bytes.
        data = base64.b64encode(image_file).decode('utf-8')
    ),
    compartment_id = COMPARTMENT_ID,
)

# Keep the response in its own variable: the original reused
# `object_detection_analyze_details`, overwriting the request object and
# diverging from the `*_response` naming of the Object Storage cell.
object_detection_analyze_response = vision_client.analyze_image(analyze_image_details = object_detection_analyze_details)

print(object_detection_analyze_response.data)


In [None]:
# Detect objects in an image that was stored in Object Storage beforehand.
OBJECT_NAME = "ObjectDetection2.png"

detection_feature = ImageObjectDetectionFeature(
    feature_type = "OBJECT_DETECTION",
    # model_id = "..." if you use custom model, set the model_ocid.
)

# Reference the stored object instead of sending the image bytes.
stored_image = ObjectStorageImageDetails(
    source = "OBJECT_STORAGE",
    namespace_name = NAMESPACE,
    bucket_name = BUCKET_NAME,
    object_name = OBJECT_NAME,
)

object_detection_analyze_details = AnalyzeImageDetails(
    features = [detection_feature],
    image = stored_image,
    compartment_id = COMPARTMENT_ID,
)

object_detection_analyze_response = vision_client.analyze_image(
    analyze_image_details = object_detection_analyze_details
)

print(object_detection_analyze_response.data)


In [None]:
import time

dir_path = '../images/'
image_list = glob.glob(os.path.join(dir_path, "ObjectDetection*.png"))

# Timestamp embedded in the output prefix and the job display name.
now = time.time()

# One ObjectLocation per local sample image, addressed by bare file name
# (assumes the images were uploaded to the bucket under those names).
object_locations = [
    ObjectLocation(
        namespace_name = NAMESPACE,
        bucket_name = BUCKET_NAME,
        object_name = os.path.basename(path),
    )
    for path in image_list
]

# Run object detection as a batch job over images already stored
# in Object Storage.
batch_input = ObjectListInlineInputLocation(
    source_type = "OBJECT_LIST_INLINE_INPUT_LOCATION",
    object_locations = object_locations,
)
batch_output = OutputLocation(
    namespace_name = NAMESPACE,
    bucket_name = BUCKET_NAME,
    prefix = f"batch_job_result_{now}",
)
detection_feature = ImageObjectDetectionFeature(
    feature_type = "OBJECT_DETECTION",
    # model_id = "..." if you use custom model, set the model_ocid.
)

create_image_job_details = CreateImageJobDetails(
    features = [detection_feature],
    input_location = batch_input,
    output_location = batch_output,
    compartment_id = COMPARTMENT_ID,
    display_name = f"ObjectDetectionBatchJob-{now}",
)

object_detection_batch_job_response = vision_client.create_image_job(
    create_image_job_details = create_image_job_details
)

print(object_detection_batch_job_response.data)


In [None]:
# Download the per-image JSON results of the object detection batch job.
import oci
from oci.object_storage import ObjectStorageClient

os_client = ObjectStorageClient(config = config)

job = object_detection_batch_job_response.data

# The batch job runs asynchronously, so its result objects do not exist yet
# right after create_image_job returns. Poll the job until it reaches a
# terminal state before reading anything from the bucket.
terminal_states = ("SUCCEEDED", "FAILED", "CANCELED")
final_job = oci.wait_until(
    vision_client,
    vision_client.get_image_job(image_job_id = job.id),
    evaluate_response = lambda r: r.data.lifecycle_state in terminal_states,
).data
if final_job.lifecycle_state != "SUCCEEDED":
    raise RuntimeError(f"image job {job.id} ended in state {final_job.lifecycle_state}")

# open(..., 'wb') does not create missing directories.
os.makedirs('../results/object_detection', exist_ok=True)

for d in image_list:
    image_name = os.path.split(d)[1]
    # Result object path: <prefix>/<job id>/<namespace>_<bucket>_<image name>.json
    object_name = f"{job.output_location.prefix}/{job.id}/{NAMESPACE}_{BUCKET_NAME}_{image_name}.json"
    get_object_response = os_client.get_object(
        namespace_name = NAMESPACE,
        bucket_name = BUCKET_NAME,
        object_name = object_name
    )
    with open(f'../results/object_detection/{image_name}.json', 'wb') as f:
        f.write(get_object_response.data.content)


### Face Detection (PreTrained Model)

In [None]:
# INLINE: the image bytes are embedded directly in the request payload.
with open("../images/FaceDetection1.png", "rb") as file:
    image_file = file.read()

face_feature = FaceDetectionFeature(
    feature_type = "FACE_DETECTION",
    should_return_landmarks = True,
    # model_id = "..." if you use custom model, set the model_ocid.
)

# The API takes the image as base64 text, so encode the raw bytes first.
inline_image = InlineImageDetails(
    source = "INLINE",
    data = base64.b64encode(image_file).decode('utf-8'),
)

face_detection_analyze_details = AnalyzeImageDetails(
    features = [face_feature],
    image = inline_image,
    compartment_id = COMPARTMENT_ID,
)

face_detection_response = vision_client.analyze_image(
    analyze_image_details = face_detection_analyze_details
)

print(face_detection_response.data)


In [None]:
# Detect faces in an image that was stored in Object Storage beforehand.
OBJECT_NAME = "FaceDetection1.png"

face_feature = FaceDetectionFeature(
    feature_type = "FACE_DETECTION",
    should_return_landmarks = True,
    # model_id = "..." if you use custom model, set the model_ocid.
)

# Reference the stored object instead of sending the image bytes.
stored_image = ObjectStorageImageDetails(
    source = "OBJECT_STORAGE",
    namespace_name = NAMESPACE,
    bucket_name = BUCKET_NAME,
    object_name = OBJECT_NAME,
)

face_detection_analyze_details = AnalyzeImageDetails(
    features = [face_feature],
    image = stored_image,
    compartment_id = COMPARTMENT_ID,
)

face_detection_analyze_response = vision_client.analyze_image(
    analyze_image_details = face_detection_analyze_details
)

print(face_detection_analyze_response.data)


In [None]:
# Detect faces in an image that was stored in Object Storage beforehand.
# NOTE(review): this cell repeats the preceding Object Storage face-detection
# request verbatim (same OBJECT_NAME) — confirm whether a different image
# was intended here.
OBJECT_NAME = "FaceDetection1.png"

landmark_face_feature = FaceDetectionFeature(
    feature_type = "FACE_DETECTION",
    should_return_landmarks = True,
    # model_id = "..." if you use custom model, set the model_ocid.
)

face_image_in_bucket = ObjectStorageImageDetails(
    source = "OBJECT_STORAGE",
    namespace_name = NAMESPACE,
    bucket_name = BUCKET_NAME,
    object_name = OBJECT_NAME,
)

face_detection_analyze_details = AnalyzeImageDetails(
    features = [landmark_face_feature],
    image = face_image_in_bucket,
    compartment_id = COMPARTMENT_ID,
)

face_detection_analyze_response = vision_client.analyze_image(
    analyze_image_details = face_detection_analyze_details
)

print(face_detection_analyze_response.data)


In [None]:
import time

dir_path = '../images/'
image_list = glob.glob(os.path.join(dir_path, "FaceDetection*.png"))

# Timestamp embedded in the output prefix and the job display name.
now = time.time()

# One ObjectLocation per local sample image, addressed by bare file name
# (assumes the images were uploaded to the bucket under those names).
object_locations = [
    ObjectLocation(
        namespace_name = NAMESPACE,
        bucket_name = BUCKET_NAME,
        object_name = os.path.split(d)[1]
    )
    for d in image_list
]

# Run face detection as a batch job over images already stored
# in Object Storage.
create_image_job_details = CreateImageJobDetails(
    features = [
        # Must be FaceDetectionFeature: the original built an
        # ImageClassificationFeature here, which does not describe a
        # face-detection request.
        FaceDetectionFeature(
            feature_type = "FACE_DETECTION",
            # model_id = "..." if you use custom model, set the model_ocid.
        )
    ],
    input_location = ObjectListInlineInputLocation(
        source_type = "OBJECT_LIST_INLINE_INPUT_LOCATION",
        object_locations = object_locations
    ),
    output_location = OutputLocation(
        namespace_name = NAMESPACE,
        bucket_name = BUCKET_NAME,
        prefix = f"batch_job_result_{now}"
    ),
    compartment_id = COMPARTMENT_ID,
    display_name = f"FaceDetectionBatchJob-{now}"
)

face_detection_batch_job_response = vision_client.create_image_job(create_image_job_details = create_image_job_details)

# Print the job just created (the original printed the object detection
# job's response by mistake).
print(face_detection_batch_job_response.data)


In [None]:
# Download the per-image JSON results of the face detection batch job.
import oci
from oci.object_storage import ObjectStorageClient

os_client = ObjectStorageClient(config = config)

job = face_detection_batch_job_response.data

# The batch job runs asynchronously, so its result objects do not exist yet
# right after create_image_job returns. Poll the job until it reaches a
# terminal state before reading anything from the bucket.
terminal_states = ("SUCCEEDED", "FAILED", "CANCELED")
final_job = oci.wait_until(
    vision_client,
    vision_client.get_image_job(image_job_id = job.id),
    evaluate_response = lambda r: r.data.lifecycle_state in terminal_states,
).data
if final_job.lifecycle_state != "SUCCEEDED":
    raise RuntimeError(f"image job {job.id} ended in state {final_job.lifecycle_state}")

# open(..., 'wb') does not create missing directories.
os.makedirs('../results/face_detection', exist_ok=True)

for d in image_list:
    image_name = os.path.split(d)[1]
    # Result object path: <prefix>/<job id>/<namespace>_<bucket>_<image name>.json
    object_name = f"{job.output_location.prefix}/{job.id}/{NAMESPACE}_{BUCKET_NAME}_{image_name}.json"
    get_object_response = os_client.get_object(
        namespace_name = NAMESPACE,
        bucket_name = BUCKET_NAME,
        object_name = object_name
    )
    with open(f'../results/face_detection/{image_name}.json', 'wb') as f:
        f.write(get_object_response.data.content)
