In [None]:
!pip install -U pip
!pip install boto3 huggingface_hub

In [None]:
# Import required libraries: standard library first, then third-party packages.
import os
import sys
import threading
from pathlib import Path

try:
    import yaml
    import boto3
    from boto3.s3.transfer import TransferConfig
    from botocore.exceptions import ClientError
    import huggingface_hub as hf
except Exception as e:
    # Report the failure, then re-raise so the cell fails visibly with a
    # traceback. Calling exit() here would shut the notebook kernel down.
    print(f"Exception during library import: {e}")
    raise

In [None]:
# Attribute-style wrapper around a (possibly nested) dictionary of parameters,
# such as the mapping loaded from a YAML file.
class Parameters(object):
    """Expose the keys of a dictionary as instance attributes.

    Nested dictionaries are wrapped recursively, so ``{"a": {"b": 1}}`` can be
    read as ``params.a.b``. The mapping passed in is kept on the ``data``
    attribute (a top-level key that is itself named ``data`` replaces it).

    Args:
        data: Dictionary (or dict subclass) mapping parameter names to values.

    Raises:
        TypeError: If ``data`` is not a dictionary.
    """

    def __init__(self, data: dict):
        # isinstance (rather than an exact type comparison) also accepts dict
        # subclasses such as collections.OrderedDict.
        if not isinstance(data, dict):
            raise TypeError(f"Parameters: expected 'dict', got {type(data)}.")
        self.data = data

        # Iterate over the local mapping so that setting attributes on self
        # cannot disturb the loop.
        for key, value in data.items():
            if isinstance(value, dict):
                setattr(self, key, Parameters(value))
            else:
                setattr(self, key, value)

# Upload progress callback, adapted from the AWS documentation.
class ProgressPercentage(object):
    """Callable that prints cumulative transfer progress for a single file.

    Each call adds ``bytes_amount`` to a running total and rewrites one status
    line on stdout showing the file name, the bytes seen so far, the total
    size and the percentage completed.

    Args:
        filename: Path of the local file being transferred; its size is read
            once, when the callback object is created.
    """

    def __init__(self, filename):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        # Serialises updates in case the callback is invoked from more than
        # one thread.
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        # To simplify, assume this is hooked up to a single filename
        with self._lock:
            self._seen_so_far += bytes_amount
            # An empty file has nothing to transfer: report it as complete
            # instead of dividing by zero.
            if self._size > 0:
                percentage = (self._seen_so_far / self._size) * 100
            else:
                percentage = 100.0
            sys.stdout.write(
                "\r%s  %s / %s  (%.2f%%)" % (
                    self._filename, self._seen_so_far, self._size,
                    percentage))
            sys.stdout.flush()

In [None]:
# Load the parameters file and expose its values through a Parameters object.
# On failure the error is printed and re-raised so the cell stops with a
# traceback; calling exit() would shut the notebook kernel down instead.
try:
    with open("parameters.yaml", encoding="utf-8") as parms:
        config_parms = yaml.safe_load(parms)
    # Parameters raises TypeError when the file does not contain a mapping
    # (for example, when it is empty and safe_load returns None).
    creds = Parameters(config_parms)
except yaml.YAMLError as e:
    print(f"Error loading YAML file: {e}")
    raise
except Exception as e:
    print(f"Caught exception: {e}")
    raise

In [None]:
# Connect to MinIO and make sure every configured bucket exists.
# NOTE(review): the access key is echoed into the cell output; clear the output
# before sharing the notebook if that key is considered sensitive.
print(f"Accessing S3 endpoint {creds.params.url} with ACCESS_KEY {creds.params.accessKey}...")

# Instantiate the S3 client against the configured endpoint.
minio_api = boto3.client(
    "s3",
    endpoint_url=creds.params.url,
    aws_access_key_id=creds.params.accessKey,
    aws_secret_access_key=creds.params.secretKey,
)

# Names of the buckets that already exist on the endpoint.
available_buckets = [bucket["Name"] for bucket in minio_api.list_buckets()["Buckets"]]
for bckname in creds.s3.bucket_list:
    if bckname in available_buckets:
        print(f"--> Bucket ({bckname}) Already Exists. Skipping...")
        continue

    # Announce creation only for buckets that are actually missing.
    print(f"-> Creating bucket {bckname}...")
    try:
        minio_api.create_bucket(Bucket=bckname)
    except Exception as e:
        # Best effort: report the failure and carry on with the next bucket.
        print(f"Failure during bucket creation due to this error: {e}")
    else:
        # Remember the new bucket so a duplicate entry in the list is skipped.
        available_buckets.append(bckname)


In [None]:
# Download the model checkpoint files from the HuggingFace repository.
# NOTE(review): HF_HOME is set here, after huggingface_hub was imported in the
# import cell; the library may already have read it at import time - confirm
# that this assignment takes effect.
os.environ["HF_HOME"] = creds.huggingface.hfHomePath
# Maps the local path of each downloaded file to its object key (the file name).
remote_model_objects = {}
mistral_models_path = "/".join((creds.huggingface.modelsPath, creds.huggingface.modelName))
os.makedirs(mistral_models_path, exist_ok=True)

print(f"Downloading model checkpoint: {creds.huggingface.modelName}")
# A single snapshot_download call receives the whole filename list through
# allow_patterns, so there is no need to repeat the call once per file.
if creds.huggingface.filenames:
    model_path = hf.snapshot_download(repo_id=creds.huggingface.modelName,
                                      allow_patterns=creds.huggingface.filenames,
                                      revision="main",
                                      token=creds.huggingface.apiToken,
                                      local_dir=mistral_models_path)
    print(f"Downloaded model checkpoint {model_path}")
    for n in creds.huggingface.filenames:
        remote_model_objects["/".join((model_path, n))] = n

In [None]:
# Tells whether an object with the given key is present in a remote bucket.
def check_exists(s3api, bucket, filename):
    """Return True when ``filename`` is a key listed in ``bucket``.

    The bucket is listed with ``filename`` as the prefix and the returned keys
    are compared against ``filename`` exactly, so a key that merely starts
    with ``filename`` does not count as a match.

    Args:
        s3api: Client object providing ``list_objects_v2``.
        bucket: Name of the bucket to look in.
        filename: Object key to look for.

    Returns:
        True if the key is in the listing; False if it is not, or if the
        listing response cannot be read (for example, it has no "Contents").
    """
    listing = s3api.list_objects_v2(Bucket=bucket, Prefix=filename)
    try:
        found_keys = [entry.get("Key") for entry in listing.get("Contents")]
        return filename in found_keys
    except Exception:
        # A listing without usable "Contents" means the object is not there.
        return False

In [None]:
# Size threshold that triggers multipart uploads (5 GiB), with threads disabled.
GB = 1024 ** 3
transfer_config = TransferConfig(multipart_threshold=5 * GB, use_threads=False)

# Upload every downloaded model file that is not already in the model bucket.
# The first failure aborts the remaining uploads and is reported below.
try:
    for local_path, object_key in remote_model_objects.items():
        if check_exists(minio_api, creds.huggingface.modelBucket, object_key):
            print(f"File {local_path} already exists in {creds.huggingface.modelBucket}")
            continue

        print(f"Uploading {object_key} to MinIO bucket {creds.huggingface.modelBucket}")
        minio_api.upload_file(local_path, creds.huggingface.modelBucket,
                              object_key,
                              Callback=ProgressPercentage(local_path),
                              Config=transfer_config)
        print("---")
except ClientError as e:
    print(f"S3 Exception: {e.response['Error']['Code']}, trace: {e}")
except Exception as e:
    print(f"Caught exception: {e}")
else:
    # Only claim success when the loop finished without an exception.
    print("Upload Complete.")

In [None]:
# Upload training dataset (data for vectorization): every .pdf and .txt file
# found under training_data, searched recursively.
# NOTE(review): the object key is the bare file name, so files with the same
# name in different sub-directories map to the same key and the later one is
# reported as already existing.
data_path = Path("training_data")
data_types = ["**/*.pdf", "**/*.txt"]

for ftype in data_types:
    for file in data_path.glob(ftype):
        # Plain strings for boto3: older releases reject path-like objects
        # passed as Filename.
        local_path = str(file)
        object_key = os.path.basename(file)
        try:
            if not check_exists(minio_api, creds.training_data.trainingDataBucket, object_key):
                print(f"Uploading {file} to MinIO bucket {creds.training_data.trainingDataBucket}")
                minio_api.upload_file(local_path,
                                      creds.training_data.trainingDataBucket,
                                      object_key,
                                      Callback=ProgressPercentage(local_path),
                                      Config=transfer_config)
                print("---")
            else:
                print(f"File {file} already exists in {creds.training_data.trainingDataBucket}")
        except Exception as e:
            # Best effort: report the failure and continue with the next file.
            print(f"Caught exception: {e}")

print("Upload Complete")