In [1]:
from minio import Minio
from minio.error import S3Error
import math
import requests
from requests.auth import HTTPBasicAuth
import json
import time

# Definition of necessary functions

def get_token(text):
    browser=['token":"','","file_stage_in']
    pos=[]
    for n in browser:
        k =text.find(n)
        if k!=-1:
            pos.append(k)
        else:
            break
    if k==-1:
        print('Error in connection')
        return None
    else:
        return text[pos[0]+8:pos[1]]

def get_cpuService(text):
    browser=['cpu":"','","total_memory']
    pos=[]
    for n in browser:
        k=text.find(n)
        if k!=-1:
            pos.append(k)
        else:
            break
    if k==-1:
        print('Error in connection')
        return None
    else:
        return 1000*float(text[pos[0]+6:pos[1]])

def get_memoryService(text):
    browser=['memory":"','Gi']
    pos=[]
    for n in browser:
        k=text.find(n)
        if k!=-1:
            pos.append(k)
        else:
            break
    if k==-1:
        print('Error in connection')
        return None
    else:
        return (float(text[pos[0]+9:pos[1]]))

def connect_to_minio(config):
    MinIO_url = config['url']
    MinIO_access_key = config['access_key']
    MinIO_secret_key = config['secret_key']
    #print(f"Connecting to MinIO at {url_minio} with access key {access_key}")
    return MinIO_url,MinIO_access_key,MinIO_secret_key 

def use_bucket(config):
    bucket_name = config['name']
    folder_prefix = config['folder_prefix']
    #print(f"Using bucket {bucket_name} with folder prefix {folder_prefix}")
    return bucket_name, folder_prefix

def setup_output(config):
    output_file = config['file']
    return output_file

def use_service(config):
    service_name = config['name']
    return service_name

def connect_to_oscar_cluster(config):
    token_cluster=''
    oscar_cluster= config['url']
    if 'username' in config_data.get('oscar_cluster', {}).get('auth_basic', {}):
        username = config['auth_basic']['username']
    if 'password' in config_data.get('oscar_cluster', {}).get('auth_basic', {}):
        password = config['auth_basic']['password']
    if username !="" and password != "":
        basic= True 
    else:
        if 'token' in config_data.get('oscar_cluster', {}).get('auth_token', {}):
            token_cluster = config['auth_token']['token']
            if token_cluster !='':
                basic=False
             
    return oscar_cluster,username,password,token_cluster,basic

In [2]:
# Read the JSON configuration file
with open('config-walton.json', 'r') as config_file:
    config_data = json.load(config_file)

# Take configuration values
MinIO_url,MinIO_access_key,MinIO_secret_key = connect_to_minio(config_data['MinIO'])
bucket_name, folder_prefix = use_bucket(config_data['bucket'])
output_file=setup_output(config_data['output'])
service_name=use_service(config_data['service'])
oscar_cluster, username, password,token_cluster, basic = connect_to_oscar_cluster(config_data['oscar_cluster'])

# Configure the MinIO client
client = Minio(
    MinIO_url,  # MinIO server
    access_key=MinIO_access_key,  
    secret_key=MinIO_secret_key,  
    secure=True  
)

output_path = folder_prefix + output_file

try:
    # List objects in the bucket
    objects = client.list_objects(bucket_name,  prefix=folder_prefix)
    
    # Count the number of objects
    object_list = []
    for obj in objects:
         if obj.object_name.endswith('.jpg'):
            #print(obj.object_name)
            object_list.append(obj.object_name)

    num_imag = len(object_list)
    print(num_imag)
  
    # Open the file in write mode
    with open(output_file, 'w') as file:
        for obj in object_list:
            #print(f"{obj}\n")
            file.write(f"{obj}\n")

    # Upload the text file to the bucket
    client.fput_object(
        bucket_name, 
        output_path,
        output_file,
        content_type="text/plain"
    )

    print(f"File {output_file} uploaded to {bucket_name}")

except S3Error as exc:
    print("Error occurred: ", exc)

60
File index.txt uploaded to fish-detector


In [3]:
service_info = "https://" + oscar_cluster + "/system/services/" + service_name

# GET request via basic authentication or token
if basic:
    response = requests.get(service_info, auth=HTTPBasicAuth(username, password),verify=True)
else:
    headers = {
    'Authorization': "Bearer " + token_cluster
    }
    response = requests.get(service_info, headers=headers,verify=True)

# Check the status of the response
if response.status_code == 200:
    resp = response.text
    # Calculate CPU, Memory and token of the service
    cpu_service = get_cpuService(resp)
    memory_service = get_memoryService(resp)
    print(cpu_service)
    print(memory_service)
    token_service = get_token(resp)
else:
    print(f"Request error: {response.status_code}")
    print("Error message:")
    print(response.text)

1000.0
3.0


In [4]:
# URL to which you want to make the request

cpu_Alloc=0
memory_Alloc=0
cpu_invoke=0
memory_invoke=0

url_status = "https://" + oscar_cluster + "/system/status"

# Make the GET request with basic authentication or token authentication
if basic:
    response = requests.get(url_status, auth=HTTPBasicAuth(username, password),verify=False)
else:
    headers = {
    'Authorization': "Bearer " + token_cluster
    }
    response = requests.get(url_status, headers=headers,verify=True)

# Check the status of the response
if response.status_code == 200:
    # Convert the response to JSON
    try:
        data = response.json()
        print(data)
        
        """
        # Verify that the response is an array of objects
        if isinstance(data, list):
            nodos = len(data)
            # Iterate over each object in the array
            if nodos > 1:
                # Iterate over each object in the array, except the front node
                for obj in data[1:]:
                    # Calculate the available CPU and memory (sum of each node)
                    cpu_Alloc += int(obj['cpuCapacity']) - int(obj['cpuUsage'])
                    memory_Alloc += int(obj['memoryCapacity']) - int(obj['memoryUsage'])
        else:
            print("The response is not a JSON array of objects.")
        """
        if isinstance(data, dict):
            nodos = len(data['detail'])
            data=data['detail']
             # Iterate over each object in the array
            if nodos >= 1:
                # Iterate over each object in the array, except the front node
                for obj in data:
                    # Calculate the available CPU and memory (sum of each node)
                    cpu_Alloc=(int(obj['cpuCapacity']))*0.8 - int(obj['cpuUsage'])
                    #print(int((cpu_Alloc/cpu_service)))
                    cpu_invoke += int((cpu_Alloc/cpu_service))
                    memory_Alloc=(int(obj['memoryCapacity'])*0.8) - int(obj['memoryUsage'])
                    
                    #print(int(memory_Alloc/(1000000000*memory_service)))
                          
                    memory_invoke += int((memory_Alloc/(1000000000*memory_service)))
                    
        else:
            print("The response is not a JSON array of objects.")
       
    
    except ValueError as e:
        print("Error converting the response to JSON:", e)
else:
    print(f"Request error: {response.status_code}")
    print("Error message:")
    print(response.text)
print(cpu_invoke)
print(memory_invoke)




{'numberNodes': 6, 'cpuFreeTotal': 35057, 'cpuMaxFree': 7773, 'memoryFreeTotal': 125177708544, 'memoryMaxFree': 18261676032, 'detail': [{'nodeName': 'vnode-1.localdomain', 'cpuCapacity': '8000', 'cpuUsage': '3256', 'cpuPercentage': '40.70', 'memoryCapacity': '20862906368', 'memoryUsage': '6019203072', 'memoryPercentage': '28.85'}, {'nodeName': 'vnode-2.localdomain', 'cpuCapacity': '8000', 'cpuUsage': '3226', 'cpuPercentage': '40.33', 'memoryCapacity': '20862910464', 'memoryUsage': '5931450368', 'memoryPercentage': '28.43'}, {'nodeName': 'vnode-3.localdomain', 'cpuCapacity': '8000', 'cpuUsage': '3432', 'cpuPercentage': '42.90', 'memoryCapacity': '20862914560', 'memoryUsage': '5358682112', 'memoryPercentage': '25.69'}, {'nodeName': 'vnode-4.localdomain', 'cpuCapacity': '8000', 'cpuUsage': '2554', 'cpuPercentage': '31.92', 'memoryCapacity': '20862910464', 'memoryUsage': '5281415168', 'memoryPercentage': '25.31'}, {'nodeName': 'vnode-5.localdomain', 'cpuCapacity': '8000', 'cpuUsage': '227'

In [5]:

"""
# Search for the CPU needed to run the service defined in its creation (FDL)
service_info = "https://" + oscar_cluster + "/system/services/" + service_name

# GET request via basic authentication or token
if basic:
    response = requests.get(service_info, auth=HTTPBasicAuth(username, password),verify=True)
else:
    response = requests.get(service_info, headers=headers,verify=True)

# Check the status of the response
if response.status_code == 200:
    resp = response.text
    # Calculate CPU, Memory and token of the service
    cpu_service = get_cpuService(resp)
    memory_service = get_memoryService(resp)  
    token_service = get_token(resp)
else:
    print(f"Request error: {response.status_code}")
    print("Error message:")
    print(response.text)

# Calculate the number of service invocations according to the available CPU in the cluster and the service's CPU
cpu_invoke = math.floor(cpu_Alloc / cpu_service)
memory_invoke = math.floor(memory_Alloc / memory_service)
"""
print(cpu_invoke)
print(memory_invoke)

# Min value between invocations by CPU and by memory
cant_invoke = min(cpu_invoke, memory_invoke)
print(cant_invoke)
print(num_imag)
# Calculate the number of images per invocation
resto = (num_imag) % cant_invoke
img_invoke = int(num_imag / cant_invoke)
print(f"Images per invocation: {img_invoke}")


23
20
20
60
Images per invocation: 3


In [41]:
"""
headers_invoke = {    
    'Authorization': "Bearer " + token_cluster,
    'Content-Type': 'application/json',
}
"""

url_invoke = "https://" + oscar_cluster + "/job/" + service_name

if basic:
    headers = {    
    'Authorization': "Bearer " + token_service,
    'Content-Type': 'application/json',
}
   # response = requests.get(url_invoke, headers=headers,verify=False)
else:
    headers = {
    'Authorization': "Bearer " + token_cluster,
    'Content-Type': 'application/json',
    }
   # response = requests.get(url_invoke, headers=headers,verify=True)


end=0


# Range of images to process (start-end)
for i in range(cant_invoke):
    # Range of images
    start = end+1
    end = end+ img_invoke
    if i < resto:
        end = end+1
        
    
    data = {
        "start": start,
        "end": end
    }
   
    try:
        response = requests.post(url_invoke, headers=headers, json=data, verify=True)
        print(response.text)
        if response.status_code == 200 or response.status_code== 201:
            print("Services OK")
        else:
            print(response.status_code)
    except Exception as ex:
        print("Error running service: ", ex)
        print(response.text)
        
    time.sleep(1)    
    print(f"Start value: {start}")
    print(f"End value: {end}")
    print(f"Invocation {i + 1} to the service")


Services OK
Start value: 1
End value: 3
Invocation 1 to the service

Services OK
Start value: 4
End value: 6
Invocation 2 to the service

Services OK
Start value: 7
End value: 9
Invocation 3 to the service

Services OK
Start value: 10
End value: 12
Invocation 4 to the service

Services OK
Start value: 13
End value: 15
Invocation 5 to the service

Services OK
Start value: 16
End value: 18
Invocation 6 to the service

Services OK
Start value: 19
End value: 21
Invocation 7 to the service

Services OK
Start value: 22
End value: 24
Invocation 8 to the service

Services OK
Start value: 25
End value: 27
Invocation 9 to the service

Services OK
Start value: 28
End value: 30
Invocation 10 to the service

Services OK
Start value: 31
End value: 33
Invocation 11 to the service

Services OK
Start value: 34
End value: 36
Invocation 12 to the service

Services OK
Start value: 37
End value: 39
Invocation 13 to the service

Services OK
Start value: 40
End value: 42
Invocation 14 to the service

Servic