# Distributed Multi-Node, Multi-GPU YOLO Image Inference on the ML Container Runtime

In [None]:
# Import python packages
import streamlit as st
import pandas as pd
import torch
# We can also use Snowpark for our analyses!
from typing import Dict
from pathlib import Path
import numpy as np
import shutil
from snowflake.snowpark.context import get_active_session
from snowflake.ml.ray.datasource import SFStageImageDataSource
from snowflake.ml.runtime_cluster import scale_cluster, get_nodes
from snowflake.ml.ray.datasink import SnowflakeTableDatasink
import ray
import subprocess
import logging
session = get_active_session()

In [None]:
# Start from a clean slate: tear down any Ray connection left over from a
# previous run of this cell before initialising again.
try:
    ray.shutdown()
except Exception as exc:
    # Cleanup is best-effort, but report the failure rather than hiding it.
    # (A bare ``except:`` would also swallow KeyboardInterrupt/SystemExit.)
    print(f"ray.shutdown() failed, continuing: {exc}")
# Per-job runtime environment: asks Ray to pip-install ultralytics.
runtime_env = {"pip": ["ultralytics"]}
ray.init(ignore_reinit_error=False, runtime_env=runtime_env)
# Count only the nodes Ray currently reports as alive.
num_nodes = sum(1 for node in ray.nodes() if node["Alive"])
print(num_nodes)

### Set params

In [None]:
# Batch size passed to map_batches for inference.
batch_size = 32
# Target node count passed to scale_cluster.
# NOTE: this rebinds the num_nodes value printed after ray.init above.
num_nodes = 5

### Scale to 5 nodes

In [None]:
# Request num_nodes nodes for the cluster. The first argument is presumably
# this notebook's name, double-quoted because it contains spaces — confirm
# against the scale_cluster documentation.
scale_cluster('"Yolo object detection - Distributed Inferencing"', num_nodes)

### Control ray logging

In [None]:
def configure_ray_logger() -> None:
    """Silence Ray log output and Ray Data progress reporting.

    Sets the "ray" and "ray.data" loggers, and the root logger, to
    CRITICAL, then turns off verbose progress and per-operator progress
    bars on the current Ray Data context.
    """
    # ``None`` selects the root logger; the order is ray, ray.data, root.
    for logger_name in ("ray", "ray.data", None):
        logging.getLogger(logger_name).setLevel(logging.CRITICAL)

    # Progress reporting is configured on Ray Data's current DataContext.
    data_ctx = ray.data.DataContext.get_current()
    data_ctx.enable_operator_progress_bars = False
    data_ctx.execution_options.verbose_progress = False

configure_ray_logger()

### Print resources in Ray cluster

In [None]:
# Total GPUs Ray reports across the cluster. The "GPU" key can be absent when
# no GPU resource is registered, so fall back to 0 instead of raising KeyError.
print(int(ray.cluster_resources().get('GPU', 0)))

In [None]:
import pprint

def _format_resources(resources):
    """Convert memory fields to GB and filter out internal node tags."""
    formatted = {}
    for k, v in resources.items():
        # Skip internal node identifiers
        if k.startswith("node:"):
            continue
        if k in {"memory", "object_store_memory"}:
            formatted[k] = f"{v / (1024 ** 3):.2f} GB"
        else:
            formatted[k] = v
    return formatted

def show_ray_cluster_resources():
    """Print cluster-wide and per-node Ray resources in a readable form.

    Cluster totals come from ``ray.cluster_resources()`` and per-node
    figures from ``ray.nodes()``; both are passed through
    ``_format_resources`` before pretty-printing. Nodes that Ray reports as
    not alive are skipped, matching how the notebook counts nodes elsewhere.
    """
    print("Cluster Resources:")
    cluster = _format_resources(ray.cluster_resources())
    pprint.pprint(cluster, sort_dicts=True, width=100)

    print("\n Node-Level Resources:")
    for node in ray.nodes():
        # Dead nodes stay in ray.nodes(); listing them would only add noise.
        # A missing "Alive" key is treated as alive so nothing is hidden.
        if not node.get("Alive", True):
            continue
        print(f"\nNode: {node['NodeManagerAddress']}")
        node_resources = _format_resources(node["Resources"])
        pprint.pprint(node_resources, sort_dicts=True, width=100)

In [None]:
show_ray_cluster_resources()

### See the trained YOLO model in snowflake stage

In [None]:
ls @MODELREGISTRYTOSPCSYOLO_INTERNALSTAGE/model/

### See image files in snowflake stage

In [None]:
ls @MODELREGISTRYTOSPCSYOLO_INTERNALSTAGE/data/

### Download the trained YOLO model from the Snowflake stage to local disk

In [None]:
# Fetch best.pt from the stage into /home/app/model/ (loaded later by YOLO(...)).
session.file.get("@modelregistrytospcsyolo_internalstage/model/best.pt", "/home/app/model/")

### Visualize sample test image

In [None]:
# Fetch one sample image from the stage into /home/app/data/ for display.
session.file.get("@modelregistrytospcsyolo_internalstage/data/000001.jpg", "/home/app/data/")

In [None]:
st.image("/home/app/data/000001.jpg", caption="Input")

### Access image files in snowflake stage as ray dataset using Snowflake APIs

In [None]:
# Describe the stage folder that holds the images; the database and schema
# are taken from the active Snowpark session.
image_source = SFStageImageDataSource(
    stage_location = "@MODELREGISTRYTOSPCSYOLO_INTERNALSTAGE/data/",
    database = session.get_current_database(),
    schema = session.get_current_schema()
)

# Build a Ray dataset backed by that datasource.
image_dataset = ray.data.read_datasource(image_source)

In [None]:
image_dataset.schema()

In [None]:
image_dataset.count()

In [None]:
image_dataset.show(1)

### Inferencing with Yolo trained model

In [None]:
! pip install ultralytics

In [None]:
from ultralytics import YOLO

### Load the trained YOLO model from local disk and put it into the Ray object store

In [None]:
# Load the downloaded weights here, then store the model once with ray.put;
# YoloObjectDetection retrieves it with ray.get(model_ref).
model = YOLO("/home/app/model/best.pt")
model_ref = ray.put(model)

### Distributed Object Detection with Ray on Snowflake ML Container Runtime

In [None]:
import pandas as pd


class YoloObjectDetection:
    """Callable class for ``map_batches`` that runs YOLO on image batches.

    The constructor fetches the model and moves it to the target device;
    each call scores one pandas batch.
    """

    def __init__(self, model_ref, device="cuda"):
        """Fetch the model via ``ray.get`` and move it to *device*.

        Args:
            model_ref: Ray object reference to the model (as returned by
                ``ray.put``).
            device: Device string used both for ``model.to`` and for each
                inference call. Defaults to "cuda", which is what was
                previously hard-coded.
        """
        self.device = device
        self.model = ray.get(model_ref)
        self.model.to(device)

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        """Add a 'result' column with one JSON string per row of *batch*.

        Args:
            batch: DataFrame with an 'image' column; any other columns are
                left untouched.

        Returns:
            The same DataFrame object, with the 'result' column added.
        """
        images = batch['image'].tolist()
        if not images:
            # Nothing to score: skip the model call but still add the column
            # so the output schema is the same for every batch.
            batch['result'] = []
            return batch
        # With stream=True the results are consumed one at a time below
        # (per the Ultralytics docs this returns a generator — confirm).
        results = self.model(images, stream=True, device=self.device)
        batch['result'] = [result.to_json() for result in results]
        return batch

In [None]:
# Apply YoloObjectDetection to the dataset in pandas batches of batch_size,
# requesting one GPU per worker, then keep only the file name and the JSON
# detections.
# NOTE(review): concurrency is hard-coded to 5 rather than derived from
# num_nodes or the cluster's GPU count — confirm it matches the cluster.
detections_ds = image_dataset.map_batches(YoloObjectDetection,
        batch_size=batch_size,
        batch_format='pandas',
        concurrency=5,
        num_gpus=1,
        fn_constructor_kwargs={"model_ref": model_ref}
).select_columns(['file_name', 'result'])

In [None]:
drop table if exists YOLO_OBJECT_DETECTION_DEMO_OUTPUT

In [None]:
# Sink targeting YOLO_OBJECT_DETECTION_DEMO_OUTPUT in the session's current
# database and schema; auto_create_table=True presumably creates the table
# when it is missing — confirm against the SnowflakeTableDatasink docs.
datasink = SnowflakeTableDatasink(
    table_name="YOLO_OBJECT_DETECTION_DEMO_OUTPUT",
    database=session.get_current_database(),
    schema=session.get_current_schema(),
    auto_create_table=True
)

In [None]:
# Write the detections through the datasink. Ray Data pipelines are lazy, so
# presumably the distributed inference actually runs here — confirm.
detections_ds.write_datasink(datasink)

In [None]:
results_snowdf = session.table("YOLO_OBJECT_DETECTION_DEMO_OUTPUT")
results_snowdf.count()

In [None]:
st.image("/home/app/data/000001.jpg", caption="Input")

In [None]:
results_snowdf.print_schema()

In [None]:
# Show the detections stored for the sample image displayed above. The inner
# double quotes presumably make Snowflake treat file_name as a case-sensitive
# (lower-case) column identifier — confirm against print_schema() output.
results_snowdf.filter(results_snowdf['"file_name"'] == 'data/000001.jpg').show()