In [48]:
import json
import ntpath
import os
import shutil
import tarfile
import tempfile
import time

from libcloud.storage.providers import get_driver
from libcloud.storage.types import Provider

In [68]:
def upload_resources(model_yaml_path,container,resources_key,
                     storage_key,storage_secret,storage_provider,additional_filepaths=None):
    """ Creates resources archive expected by model converter, uploads to storage provider.

    The archive is a gzip-compressed tarball. The model YAML is always stored under the
    name "model.yaml" regardless of its local filename; every additional file is stored
    under its base filename (directory components are dropped).

    Args:
        model_yaml_path (str): Path to model.yaml file to be included.
        container (str): Storage provider container name (e.g. Bucket name in S3).
        resources_key (str): Desired key for resource archive once uploaded to storage provider.
        storage_key (str): Storage provider access key.
        storage_secret (str): Storage provider secret key.
        storage_provider (str): Storage provider name (must be one of "S3", "AZURE_BLOBS", or "GOOGLE_STORAGE").
        additional_filepaths (list): Optional list of filepaths of additional files to be included.
            ``None`` (the default) or an empty list means only the model YAML is archived.

    Raises:
        ValueError: If ``storage_provider`` is not one of the supported names.
    """
    # Validate before touching libcloud so a typo fails fast with a clear message.
    supported_providers = ("S3", "AZURE_BLOBS", "GOOGLE_STORAGE")
    if storage_provider not in supported_providers:
        raise ValueError('Only "S3", "AZURE_BLOBS", and "GOOGLE_STORAGE" are supported storage providers.')

    # Init libcloud driver. The supported names are exactly the attribute names on Provider.
    driver_cls = get_driver(getattr(Provider, storage_provider))
    driver = driver_cls(storage_key, storage_secret)
    bucket = driver.get_container(container_name=container)

    # TODO: Probably set these outside of this helper function
    RESOURCES_TAR_NAME = "resources.tar.gz"
    MODEL_YAML_NAME = "model.yaml"

    # A uniquely named scratch directory that is removed even if archiving or the upload fails.
    with tempfile.TemporaryDirectory(prefix=".tmp_") as tmp_dir_path:
        # Move the local resources that you have prepared for your model into an archive
        resources_tar_path = os.path.join(tmp_dir_path, RESOURCES_TAR_NAME)
        with tarfile.open(resources_tar_path, "w:gz") as tar:
            tar.add(model_yaml_path, arcname=MODEL_YAML_NAME)
            # The parameter defaults to None, so fall back to "no extra files".
            for filepath in additional_filepaths or ():
                # ntpath treats both "/" and "\\" as separators when extracting the filename.
                tar.add(filepath, arcname=ntpath.basename(filepath))

        extra = {'content_type': 'application/octet-stream'}

        # Upload archive to storage provider
        with open(resources_tar_path, 'rb') as iterator:
            driver.upload_object_via_stream(iterator=iterator,
                                            container=bucket,
                                            object_name=resources_key,
                                            extra=extra)

In [93]:
def upload_mlflow_model(mlflow_model_dir,container,model_key,
                     storage_key,storage_secret,storage_provider):
    """ Creates model archive from a saved MLFlow model directory, uploads to storage provider.

    Every top-level entry of ``mlflow_model_dir`` is added to a gzip-compressed tarball
    under its own name, so the archive contents are not wrapped in an enclosing folder.
    Sub-directories are added recursively (the ``tarfile`` default).

    Args:
        mlflow_model_dir (str): Path to saved MLFlow model directory (e.g. using mlflow.sklearn.save_model())
        container (str): Storage provider container name (e.g. Bucket name in S3).
        model_key (str): Desired key for model archive once uploaded to storage provider.
        storage_key (str): Storage provider access key.
        storage_secret (str): Storage provider secret key.
        storage_provider (str): Storage provider name (must be one of "S3", "AZURE_BLOBS", or "GOOGLE_STORAGE").

    Raises:
        ValueError: If ``storage_provider`` is not one of the supported names.
    """
    # Validate before touching libcloud so a typo fails fast with a clear message.
    supported_providers = ("S3", "AZURE_BLOBS", "GOOGLE_STORAGE")
    if storage_provider not in supported_providers:
        raise ValueError('Only "S3", "AZURE_BLOBS", and "GOOGLE_STORAGE" are supported storage providers.')

    # Init libcloud driver. The supported names are exactly the attribute names on Provider.
    driver_cls = get_driver(getattr(Provider, storage_provider))
    driver = driver_cls(storage_key, storage_secret)
    bucket = driver.get_container(container_name=container)

    # TODO: Probably set this outside of this helper function
    MODEL_TAR_NAME = "weights.tar.gz"

    # A uniquely named scratch directory that is removed even if archiving or the upload fails.
    with tempfile.TemporaryDirectory(prefix=".tmp_") as tmp_dir_path:
        # Move the local mlflow model artifacts that were saved out by MLFlow into an archive
        model_tar_path = os.path.join(tmp_dir_path, MODEL_TAR_NAME)
        with tarfile.open(model_tar_path, "w:gz") as tar:
            # Sorted so the member order does not depend on filesystem listing order.
            for filename in sorted(os.listdir(mlflow_model_dir)):
                tar.add(os.path.join(mlflow_model_dir, filename), arcname=filename)

        extra = {'content_type': 'application/octet-stream'}

        # Upload archive to storage provider
        with open(model_tar_path, 'rb') as iterator:
            driver.upload_object_via_stream(iterator=iterator,
                                            container=bucket,
                                            object_name=model_key,
                                            extra=extra)

In [94]:
# General parameters shared by every upload in this notebook.
BUCKET_NAME = "sagemaker-testing-ds"
STORAGE_PROVIDER = "S3"

# Credentials are read from the environment so real keys never have to be saved in the
# notebook. The placeholder values are used only when the variables are not set.
storage_key = os.environ.get("STORAGE_ACCESS_KEY", "ACCESS_KEY_HERE")
storage_secret = os.environ.get("STORAGE_SECRET_KEY", "SECRET_KEY_HERE")

In [95]:
# Resources archive settings: the local files to bundle, then the destination object key.
model_yaml_path = "/path/to/model.yaml"
additional_filepaths = [
    "/path/to/labels.json",
]
RESOURCES_KEY = "helper-testing/resources.tar.gz"

In [96]:
# Bundle the model YAML and extra files, then push the archive to the configured bucket.
upload_resources(
    model_yaml_path=model_yaml_path,
    container=BUCKET_NAME,
    resources_key=RESOURCES_KEY,
    storage_key=storage_key,
    storage_secret=storage_secret,
    storage_provider=STORAGE_PROVIDER,
    additional_filepaths=additional_filepaths,
)

In [97]:
# MLFlow model settings: the saved-model directory to archive, then the destination object key.
mlflow_dir = "/path/to/mlflow_saved_model_dir/"
MODEL_KEY = "helper-testing/weights.tar.gz"

In [98]:
# Archive the saved MLFlow model directory and push it to the configured bucket.
upload_mlflow_model(
    mlflow_model_dir=mlflow_dir,
    container=BUCKET_NAME,
    model_key=MODEL_KEY,
    storage_key=storage_key,
    storage_secret=storage_secret,
    storage_provider=STORAGE_PROVIDER,
)