## Design
1. tasks it needs to run gets appended to a task queue (which it queries from s3 accordingly using the directory of the file)
2. it processes tasks in batches of M. After each batch is complete, it sends "OK" to another json. It also updates its task queue from s3 (deletes these 20, and takes in more if more were added onto it)
3. If it fails for any task, it sents "NOT OK" and the tasks on its taskqueue to s3 and script stops. Then batch_manager will kill that gpu and redistribute the tasks to an alive gpu.
4. Once a gpu's task queue is empty, it dies.
    - if the last gpu fails early, it'll tell batch manager that there were some tasks left over. then you just need to rerun those tasks when you wake up
    - so the system won't be perfect, but it's risk free, and cost efficient, and it does make it a whole lot less annoying.
    - also later on you can increase risk tolerance (say it only dies early if it fails some percentage of jobs, and failed jobs get added to some dead letter queue. But I'm not adding that in right now.
   

In [1]:
!pip install psutil
!pip install boto3
!pip install transformers torch huggingface_hub

Defaulting to user installation because normal site-packages is not writeable
[33mDEPRECATION: flatbuffers 1.12.1-git20200711.33e2d80-dfsg1-0.6 has a non-standard version number. pip 24.0 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of flatbuffers or contact the author to suggest that they release a version with a conforming version number. Discussion can be found at https://github.com/pypa/pip/issues/12063[0m[33m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m
Defaulting to user installation because normal site-packages is not writeable
Collecting boto3
  Downloading boto3-1.34.156-py3-none-any.whl.metadata (6.6 kB)
Collecting botocore<1.35.0,>=1.34.156 (from boto3)
  Downloading botocore-1.34.156-py3-none-any.whl.metadat

In [2]:
import json
import os
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

In [3]:
os.environ['AWS_ACCESS_KEY_ID'] = 'key1'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'key2'
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'
!aws sts get-caller-identity


An error occurred (InvalidClientTokenId) when calling the GetCallerIdentity operation: The security token included in the request is invalid.


## Inject Model for Inference

- this is just for llm ops, not multimodal ops, so just a simple inference function should be ok
- Super modular (just inject your inference code here)

In [4]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

torch.set_default_device("cuda")

model = AutoModelForCausalLM.from_pretrained("microsoft/phi-2", torch_dtype="auto", trust_remote_code=True)
tokenizer = AutoTokenizer.from_pretrained("microsoft/phi-2", trust_remote_code=True)


config.json:   0%|          | 0.00/735 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/35.7k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/5.00G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/564M [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/7.34k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/798k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.11M [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/1.08k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/99.0 [00:00<?, ?B/s]

In [5]:
def run_model(input_text):
    inputs = tokenizer(input_text, return_tensors="pt")
    inputs = inputs.to("cuda" if torch.cuda.is_available() else "cpu")
    with torch.no_grad():
        outputs = model.generate(
            inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=200,
            pad_token_id=tokenizer.eos_token_id
        )
    generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return generated_text

## The Worker 

-  cluster_num and bucket_name need to be inputted. Everything else is good as-is.

In [6]:
def download_file_to_dict(bucket_name, object_name):
    try:
        response = s3.get_object(Bucket=bucket_name, Key=object_name)
        content = response['Body'].read().decode('utf-8')
        data_dict = json.loads(content)
        # print(f'File {object_name} downloaded from bucket {bucket_name} and converted to dictionary.')
        return data_dict
    except (NoCredentialsError, PartialCredentialsError):
        print('Credentials not available.')
    except Exception as e:
        print(f'An error occurred: {e}')
        return None

def upload_dict_to_s3(bucket_name, object_name, dictionary):
    json_data = json.dumps(dictionary)
    s3.put_object(Bucket=bucket_name, Key=object_name, Body=json_data)

def create_bucket(bucket_name, region=None):
    try:
        if region:
            s3.create_bucket(
                Bucket=bucket_name,
                # CreateBucketConfiguration={'LocationConstraint': region}
            )
        else:
            s3.create_bucket(Bucket=bucket_name)
        print(f'Bucket {bucket_name} created successfully.')
    except s3.exceptions.BucketAlreadyExists:
        print(f'Bucket {bucket_name} already exists.')
    except s3.exceptions.BucketAlreadyOwnedByYou:
        print(f'Bucket {bucket_name} is already owned by you.')
    except (NoCredentialsError, PartialCredentialsError):
        print('Credentials not available.')

region = 'us-east-1'
cluster_num = 3 #hardcode which batch_num this gpu is associated with.
bucket_name = f"rapper-vkg-{cluster_num}"
process_batch_size = 10

In [27]:
s3 = boto3.client('s3')

In [28]:
taskqueue_dict = download_file_to_dict(bucket_name, f"task_queue_{cluster_num}.json")
taskqueue = taskqueue_dict["task_queue"]

In [29]:
create_bucket(f"{bucket_name}-results", region)

Bucket rapper-vkg-3-results created successfully.


In [30]:
#just one prompt. p ez to refactor to multiprompt.
prompt = "Is this politically correct? If so, explain why?"

In [None]:
while(taskqueue):
    try:
        for i in range(min(process_batch_size, len(taskqueue))):
            file = taskqueue[i]
            print(file)
            tf = download_file_to_dict(bucket_name, file)
            context = tf["text"]
            full_prompt = f"""
            CONTEXT: 
            {context}

            PROMPT: 
            {prompt}
            """
            tf.update(
                {
                    "model_inference": run_model(full_prompt)
                }
            )
            upload_dict_to_s3(f"{bucket_name}-results", f"{file}_results.json", tf)
            print(f"{file} has been completed")
        taskqueue = taskqueue[process_batch_size:]
        temp_taskqueue = download_file_to_dict(bucket_name, f"task_queue_{cluster_num}.json")["task_queue"]
        taskqueue = list(set(taskqueue)|set(temp_taskqueue[process_batch_size:]))
        upload_dict_to_s3(bucket_name, f"task_queue_{cluster_num}.json", {"task_queue": taskqueue})
        print(f"A batch of size {process_batch_size} has been completed")
        print("Length of taskqueue: ", len(taskqueue))
    except Exception as e:
        print(e)
        print("WORKER FAILED")
        taskqueue_dict["task_queue"] = taskqueue
        taskqueue_dict["status"] = "NOT OK"
        upload_dict_to_s3(bucket_name, f"task_queue_{cluster_num}.json", taskqueue_dict)
        print("updated task queue")
        break

print("exited process")
if taskqueue_dict.get("status", None) != "NOT OK":
    print("updating status to complete")
    taskqueue_dict["status"] = "COMPLETE"
    upload_dict_to_s3(bucket_name, f"task_queue_{cluster_num}.json", taskqueue_dict)
else:
    pass

data_3.json
data_3.json has been completed
data_10.json
data_10.json has been completed
data_17.json
data_17.json has been completed
data_24.json
data_24.json has been completed
data_31.json
data_31.json has been completed
data_38.json
data_38.json has been completed
data_45.json
data_45.json has been completed
data_52.json
data_52.json has been completed
data_59.json
data_59.json has been completed
data_66.json
data_66.json has been completed
A batch of size 10 has been completed
Length of taskqueue:  58
data_465.json
data_465.json has been completed
data_150.json
data_150.json has been completed
data_381.json
data_381.json has been completed
data_472.json
data_472.json has been completed
data_325.json
data_325.json has been completed
data_353.json
data_353.json has been completed
data_241.json
data_241.json has been completed
data_122.json
data_122.json has been completed
data_283.json
data_283.json has been completed
data_262.json
data_262.json has been completed
A batch of size 10 