In [92]:
import json
from minio import Minio
from oscar_python.client import Client
import oscar_python._utils as utils
import tarfile
import requests
import time
from IPython.display import Javascript

In [None]:
endpoint = "https://oscar-grnet.intertwin.fedcloud.eu"
user = ""
password = ""
service = "hydroform"
service_config = "oscar_services/hydroform_oscar_svc.yaml"
output = "output"

# https://aai.egi.eu/token
print("Paste your EGI token here:")


def window_open(url):
    time.sleep(2)
    display(Javascript('window.open("{url}");'.format(url=url)))


window_open("https://aai.egi.eu/token")

token = input()

In [None]:
def check_oscar_connection():
    # Check the service or create it
    print("Checking OSCAR connection status")
    if user and password:
        options_basic_auth = {'cluster_id': 'cluster-id',
                              'endpoint': endpoint,
                              'user': user,
                              'password': password,
                              'ssl': 'True'}
        print("Using credentials user/password")
    elif token:
        options_basic_auth = {'cluster_id': 'cluster-id',
                              'endpoint': endpoint,
                              'oidc_token': token,
                              'ssl': 'True'}
        print("Using credentials token")
    else:
        print("Introduce the credentials user/password or token")
        exit(2)

    client = Client(options=options_basic_auth)
    try:
        client.get_cluster_info()
    except Exception as err:
        print(err)
        print("OSCAR cluster not Found")
        exit(1)
    return client


client = check_oscar_connection()

In [None]:
def check_service(client, service, service_config):
    print("Checking OSCAR service status")
    try:
        service_info = client.get_service(service)
        service_data = json.loads(service_info.text)
        minio_info = service_data["storage_providers"]["minio"]["default"]
        input_info = service_data["input"][0]
        output_info = service_data["output"][0]
        if service_info.status_code == 200:
            print("OSCAR Service " + service + " already exists")
            return minio_info, input_info, output_info
    except Exception:
        print("OSCAR Service " + service + " not found. Creating it...")
        try:
            creation = client.create_service(service_config)
            print(creation)
        except Exception as err:
            print(err)
            print("OSCAR Service " + service + " not created")
            exit(1)
    try:
        service_info = client.get_service(service)
        minio_info = json.loads(service_info.text)["storage_providers"]["minio"]["default"]
        input_info = json.loads(service_info.text)["input"][0]
        output_info = json.loads(service_info.text)["output"][0]
        print("OSCAR Service " + service + " created")
        return minio_info, input_info, output_info
    except Exception as err:
        print(err)
        print("OSCAR Service " + service + " not created")
        exit(1)


minio_info, input_info, output_info = check_service(client, service, service_config)

In [None]:
def run_service(client, service, token, output):
    print(f'Running {service} service')
    response = None
    try:
        data = {
            "Records": [
                {
                    "requestParameters": {
                        "principalId": "uid",
                        "sourceIPAddress": "ip"
                    },
                }
            ]
        }
        json_data = json.dumps(data).encode('utf-8')
        if token:
            headers = utils.get_headers_with_token(token)
            response = requests.request("post", endpoint + "/job/" + service, headers=headers, verify=client.ssl, data=json_data, timeout=1500)
        elif user and password:
            response = requests.request("post", endpoint + "/job/" + service, auth=(user, password), verify=client.ssl, data=json_data, timeout=1500)
        else:
            raise ValueError("Either token or user/password must be provided")
    except Exception as err:
        print("Failed with: ", err)
    return response


def connect_minio(minio_info):
    # Create client with access and secret key.
    print("Creating connection with MinIO")
    client = Minio(minio_info["endpoint"].split("//")[1],
                   minio_info["access_key"],
                   minio_info["secret_key"])
    return client


def wait_output_and_download(client, output_info, output):
    # Wait the output
    print("Waiting the output")
    with client.listen_bucket_notification(
        output_info["path"].split("/")[0],
        prefix='/'.join(output_info["path"].split("/")[1:]),
        events=["s3:ObjectCreated:*", "s3:ObjectRemoved:*"],
    ) as events:
        for event in events:
            outputfile = event["Records"][0]["s3"]["object"]["key"]
            print(event["Records"][0]["s3"]["object"]["key"])
            break
    # Download the file
    print("Downloading the file")
    client.fget_object(output_info["path"].split("/")[0], 
                       outputfile,
                       output + "/" + outputfile.split("/")[-1])
    return output + "/" + outputfile.split("/")[-1]


response = run_service(client, service, token, output)
output_file = wait_output_and_download(connect_minio(minio_info), output_info, output)

In [None]:
def extract(output_file):
    print(f"Decompressing output: {output_file}")
    with tarfile.open(output_file, 'r') as tar:
        for member in tar.getmembers():
            tar.extract(member, path=output)


extract(output_file)