# Defect detection using YOLO and model deployment into SPCS using the Model Registry

## Import packages

In [None]:
# Import python packages
import streamlit as st
import pandas as pd
import shutil
import gc
from snowflake.ml.model.custom_model import ModelContext, CustomModel, inference_api
from typing import List, Dict
from snowflake.ml.model import model_signature
from snowflake.ml.registry import Registry
import requests
# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
# Install Ultralytics (provides the `yolo` CLI and the YOLO class used below) and pin snowflake-ml-python.
# NOTE(review): snowflake.ml is already imported in an earlier cell, so the pinned
# 1.7.5 may not take effect until the kernel is restarted — confirm.
! pip install ultralytics snowflake-ml-python==1.7.5

In [None]:
from ultralytics import YOLO

In [None]:
# Resolve the active namespace; double quotes are stripped so the names can be
# embedded directly in the fully qualified identifiers built below.
current_database = session.get_current_database().replace('"', '')
current_schema = session.get_current_schema().replace('"', '')
namespace_prefix = f"{current_database}.{current_schema}"

# Model Registry entry and SPCS deployment settings.
model_name = "YOLO_TRAINED_MODEL"
cp_name = "YOLO_DEFECT_DETECTION_INFERENCE_CP"
spcs_instance_family = 'GPU_NV_S'
num_spcs_nodes = '3'  # kept as a string; converted with int() where a number is required
image_repo_name = f"{namespace_prefix}.IMAGE_REPO_1210"
service_name_without_namespace = 'YOLO_DEFECT_DETECTION_INFERENCE_SERVICE'
service_name = f"{namespace_prefix}.{service_name_without_namespace}"

print(f"Database: {current_database}, schema: {current_schema}")
print(f"Service: {service_name}")
print(f"Model name: {model_name}")

In [None]:
# Remove output from any previous run: later cells read weights from
# /home/app/yoloruns/detect/train, so stale results must not linger.
try:
    shutil.rmtree('/home/app/yoloruns')
except FileNotFoundError:
    # Nothing to clean up on a first run. Any other failure (e.g. a permission
    # error) is surfaced instead of silently leaving old files in place.
    pass

In [None]:
# Recreate the run directory and point the Ultralytics `runs_dir` setting at it.
!mkdir -p /home/app/yoloruns
!yolo settings runs_dir=/home/app/yoloruns

## Start training

In [None]:
# Train starting from the yolo11n.pt checkpoint on the VOC.yaml dataset config
# for 2 epochs (image size 640, batch size 32) on device 0.
!yolo train model=yolo11n.pt data=VOC.yaml epochs=2 imgsz=640 batch=32 device=0

## Validation
After training our model, we can validate it on the validation set of our dataset.

In [None]:
# Validate the best checkpoint from the training run against the VOC.yaml dataset config.
# NOTE(review): the path assumes the run was written to detect/train, i.e. the run
# directory was empty before training — confirm when training is re-run.
!yolo val model=/home/app/yoloruns/detect/train/weights/best.pt data=VOC.yaml

## Inference on files in S3

In [None]:
-- Refresh the stage metadata so the directory listing used below is current,
-- then list the staged files.
alter stage modelregistrytospcsyolo_s3stage refresh;
ls @modelregistrytospcsyolo_s3stage/

In [None]:
-- Snapshot the stage's directory listing with one presigned URL per file.
-- The third argument to get_presigned_url (3600) is the URL lifetime in seconds,
-- so this table has to be rebuilt once the URLs have expired.
create or replace table modelregistrytospcsyolo_presignedurls as
SELECT
   relative_path,
   last_modified,
   get_presigned_url(@modelregistrytospcsyolo_s3stage, relative_path, 3600) AS presigned_url
  FROM
    directory(@modelregistrytospcsyolo_s3stage);

In [None]:
# Load the presigned-URL table and pull it into pandas for local testing.
presigned_urls_snowdf = session.table("modelregistrytospcsyolo_presignedurls")
presigned_urls_pdf = presigned_urls_snowdf.to_pandas()

In [None]:
presigned_urls_pdf.head()

In [None]:
# Preview the first image referenced by the presigned-URL table.
st.image(presigned_urls_pdf["PRESIGNED_URL"].iloc[0], caption="Input")

## Define custom model to log into model registry

In [None]:
from pathlib import Path
import multiprocessing as mp
class DownloadError(Exception):
    """Raised when an image cannot be downloaded or written to disk."""

class YoloModel(CustomModel):
    """Model Registry wrapper that runs a YOLO detector on images fetched by URL.

    ``predict`` takes a DataFrame with a ``PRESIGNED_URL`` column, downloads
    each image, runs the detector, and returns a copy of the frame with an
    added ``RESULT`` column holding the detector's ``to_json()`` output for
    the image of each row.
    """

    # (connect, read) timeout in seconds, so a stalled transfer cannot block a
    # worker thread forever.
    DOWNLOAD_TIMEOUT = (10, 60)

    def __init__(self, context: ModelContext) -> None:
        """Load ``best.pt`` from the ``model_dir`` artifact of *context*."""
        super().__init__(context)
        context_path = self.context.path("model_dir")
        self.model = YOLO(f"{context_path}/best.pt")
        self.output_dir = "/tmp/images/"
        self.max_workers = mp.cpu_count() * 2

    def download_file(self, url, output_dir, filename_prefix=""):
        """Download *url* into *output_dir* and return ``(url, local_path)``.

        The file name is the last path segment of the URL with the query
        string removed, prefixed with *filename_prefix*.

        Raises:
            DownloadError: on an HTTP error (403 is reported explicitly), a
                network failure or timeout, or a problem writing the file.
        """
        try:
            filename = url.split("/")[-1].split("?")[0]
            local_path = Path(output_dir) / f"{filename_prefix}{filename}"
            response = requests.get(url, timeout=self.DOWNLOAD_TIMEOUT)
            if response.status_code == 403:
                raise DownloadError(f"Access forbidden (403) for URL: {url}")
            response.raise_for_status()
            with open(local_path, mode="wb") as file:
                file.write(response.content)
            return url, local_path
        except DownloadError:
            # Already carries the intended message; do not wrap it a second time.
            raise
        except requests.exceptions.RequestException as e:
            raise DownloadError(f"Failed to download {url}: {str(e)}") from e
        except Exception as e:
            raise DownloadError(f"Error processing {url}: {str(e)}") from e

    def download_files_in_parallel(self, urls, max_workers=5, output_dir=None):
        """Download all *urls* concurrently and return the local paths in input order.

        Files are written to *output_dir* (``self.output_dir`` when omitted).
        Each file name is prefixed with the zero-padded position of its URL so
        two URLs ending in the same file name cannot overwrite each other.

        Raises:
            DownloadError: once every download has finished, if any of them
                failed; the message lists all failures, not just the first.
        """
        from concurrent.futures import ThreadPoolExecutor

        output_path = Path(self.output_dir if output_dir is None else output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        urls = list(urls)
        if not urls:
            return []

        # Leaving the `with` block waits for every submitted download.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self.download_file, url, output_path, f"{position:06d}_")
                for position, url in enumerate(urls)
            ]

        local_paths = []
        failures = []
        for future in futures:
            try:
                _, local_path = future.result()
            except DownloadError as e:
                failures.append(str(e))
            else:
                local_paths.append(local_path)
        if failures:
            raise DownloadError(
                f"{len(failures)} of {len(urls)} downloads failed: " + "; ".join(failures)
            )
        return local_paths

    def perform_object_detection(self, source=None) -> List[str]:
        """Run the detector on *source* and return ``to_json()`` of every result.

        *source* defaults to ``self.output_dir``. Results are returned in the
        order the model yields them, which is not necessarily the order in
        which the files were downloaded.
        """
        target = self.output_dir if source is None else str(source)
        return [result.to_json() for result in self.model(target, stream=True)]

    @inference_api
    def predict(self, input_pdf: pd.DataFrame) -> pd.DataFrame:
        """Detect objects in the image behind each ``PRESIGNED_URL`` row.

        Every call works in its own temporary directory below
        ``self.output_dir``. Leftovers from earlier calls, or files from a
        concurrent call, therefore never reach the detector, and each result
        is matched back to its input row by file name.

        Returns:
            A copy of *input_pdf* with a ``RESULT`` column. A row whose file
            produced no detector result gets ``None``.

        Raises:
            DownloadError: if any image could not be downloaded.
        """
        import shutil
        import tempfile

        urls = input_pdf["PRESIGNED_URL"].tolist()
        # Work on a copy so the caller's frame is left untouched.
        output_pdf = input_pdf.copy()
        if not urls:
            output_pdf["RESULT"] = []
            return output_pdf

        Path(self.output_dir).mkdir(parents=True, exist_ok=True)
        batch_dir = tempfile.mkdtemp(dir=self.output_dir)
        try:
            local_paths = self.download_files_in_parallel(
                urls=urls,
                max_workers=self.max_workers,
                output_dir=batch_dir,
            )
            # File names are unique within batch_dir, so they identify the row.
            detections = {
                Path(result.path).name: result.to_json()
                for result in self.model(batch_dir, stream=True)
            }
        finally:
            shutil.rmtree(batch_dir, ignore_errors=True)

        output_pdf["RESULT"] = [detections.get(Path(path).name) for path in local_paths]
        return output_pdf

In [None]:
# `models` is reserved for model types the Model Registry supports natively;
# anything else, such as the trained weights directory here, is passed as an artifact.
yolo_mc = ModelContext(
    models={},
    artifacts={"model_dir": "/home/app/yoloruns/detect/train/weights"},
)
yolo_model = YoloModel(yolo_mc)

In [None]:
# Run the model locally on the staged URLs and derive the registry signature
# from that real input/output pair.
url_column = presigned_urls_pdf['PRESIGNED_URL']
input_pdf = url_column.to_frame()
# Pass a copy so input_pdf is guaranteed to hold only the input column when
# the signature is inferred.
output_pdf = yolo_model.predict(input_pdf.copy())
predict_sign = model_signature.infer_signature(
    input_data=input_pdf,
    output_data=output_pdf,
)
output_pdf.head()

In [None]:
predict_sign

In [None]:
!nvidia-smi 

In [None]:
native_registry = Registry(session=session)

In [None]:
# Register the custom model together with its conda dependencies and the
# `predict` signature inferred earlier in the notebook.
conda_packages = [
    "conda-forge::ultralytics",
    "conda-forge::pytorch",
    "conda-forge::torchvision",
]
mv = native_registry.log_model(
    yolo_model,
    model_name=model_name,
    conda_dependencies=conda_packages,
    signatures={"predict": predict_sign},
    options={"cuda_version": "12.4"},
)

### See all model versions in model registry

In [None]:
# Look the model up by name and display up to 100 of its registered versions.
m = native_registry.get_model(model_name)
version_df = m.show_versions()
version_df.head(100)

In [None]:
# Take the version named in the last row of the listing and fetch its handle.
# NOTE(review): this assumes show_versions() returns the newest version last —
# confirm the ordering, or select explicitly (e.g. by creation time).
last_version_name = version_df['name'].iloc[-1]
print(last_version_name)
lv = m.version(last_version_name)

### Deploy the model on SPCS for inference

In [None]:
# Stop everything running in the compute pool (if the pool exists) before it is dropped.
session.sql(f"alter compute pool if exists {cp_name} stop all").collect()

In [None]:
# Recreate the compute pool from scratch with a fixed node count.
session.sql(f"drop compute pool if exists {cp_name}").collect()
create_pool_sql = (
    f"create compute pool {cp_name} "
    f"min_nodes={num_spcs_nodes} max_nodes={num_spcs_nodes} "
    f"instance_family={spcs_instance_family} "
    f"auto_resume=True auto_suspend_secs=300"
)
session.sql(create_pool_sql).collect()

In [None]:
session.sql(f"show compute pools like '{cp_name}'").show()

In [None]:
# Create the image repository that the service image is pushed to.
# NOTE(review): `create or replace` presumably discards an existing repository and
# its images on every run; `create image repository if not exists` would keep
# them — confirm which behaviour is intended.
session.sql(f"create or replace image repository {image_repo_name}").collect()

In [None]:
session.sql(f"drop service if exists {service_name}").collect()

In [None]:
# Deploy the selected model version as a service on the compute pool, with one
# GPU requested and ingress enabled.
lv.create_service(
    service_name=service_name,
    service_compute_pool=cp_name,
    image_repo=image_repo_name,
    max_instances=int(num_spcs_nodes),
    gpu_requests="1",
    ingress_enabled=True,
)

### Set up external network access for inferencing

In [None]:
lv.list_services()

In [None]:
describe network rule NR_ALLOW_S3_ACCESS_TO_DOWNLOAD_IMAGES

In [None]:
describe external access integration EAI_ALLOW_S3_ACCESS_TO_DOWNLOAD_IMAGES

In [None]:
-- Attach the external access integration to the inference service; judging by
-- its name, this is what lets the service download the images from S3.
-- NOTE(review): the service name is hard-coded here; keep it in sync with
-- service_name_without_namespace in the configuration cell.
alter service YOLO_DEFECT_DETECTION_INFERENCE_SERVICE set EXTERNAL_ACCESS_INTEGRATIONS = (EAI_ALLOW_S3_ACCESS_TO_DOWNLOAD_IMAGES);

### Setup test data for inferencing

In [None]:
-- Rebuild the presigned-URL table so the URLs used for inference are fresh:
-- each URL is generated with a lifetime of 3600 seconds.
create or replace table modelregistrytospcsyolo_presignedurls as
SELECT
   relative_path,
   last_modified,
   get_presigned_url(@modelregistrytospcsyolo_s3stage, relative_path, 3600) AS presigned_url
  FROM
    directory(@modelregistrytospcsyolo_s3stage);

In [None]:
# Keep only the PRESIGNED_URL column, the single input column the model's predict() reads.
input_data_for_inferencing_snowdf = session.table("modelregistrytospcsyolo_presignedurls").select("PRESIGNED_URL")
input_data_for_inferencing_snowdf.show()

### Batch inferencing

In [None]:
# Score the presigned URLs through the deployed service. The service name is
# taken from the shared configuration variable instead of a repeated literal,
# so renaming the service only needs one change.
defect_predictions_snowdf = lv.run(
    input_data_for_inferencing_snowdf,
    function_name="PREDICT",
    service_name=service_name_without_namespace,
)
defect_predictions_snowdf.show()

### REST API inferencing

In [None]:
# Build a one-row request body: each entry of `data` is [row_index, presigned_url].
first_row_pdf = input_data_for_inferencing_snowdf.limit(1).to_pandas()
test_url = first_row_pdf['PRESIGNED_URL'].iloc[0]
data = {'data': [[0, test_url]]}
print(data)

### Next, call the REST API from outside Snowflake using JWT authentication