Skip to content

Commit

Permalink
Merge pull request #21 from keyko-io/feature/compute-logs-status
Browse files Browse the repository at this point in the history
Added support for the gateway compute logs and compute status endpoints
  • Loading branch information
r-marques committed Sep 20, 2020
2 parents e42fc15 + 44e5404 commit efffe88
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 1 deletion.
27 changes: 27 additions & 0 deletions nevermined_sdk_py/assets/asset_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,33 @@ def download(service_index, ddo, owner_account, destination,

return asset_folder

@staticmethod
def compute_logs(service_agreement_id, execution_id, account, gateway, config):
"""
Get the logs of a compute workflow.
:param service_agreement_id: The id of the service agreement that ordered the compute job, str
:param execution_id: The id of the compute job, str
:param account: Account instance that ordered the execution of the compute job
:param gateway: Gateway instance
:param config: Sdk configuration instance
:return: list, compute logs
"""
return gateway.compute_logs(service_agreement_id, execution_id, account, config).json()

@staticmethod
def compute_status(service_agreement_id, execution_id, account, gateway, config):
"""
Get the status of a compute workflow.
:param service_agreement_id: The id of the service agreement that ordered the compute job, str
:param execution_id: The id of the compute job, str
:param account: Account instance that ordered the execution of the compute job
:param gateway: Gateway instance
:param config: Sdk configuration instance
:return: dict, compute logs
"""
return gateway.compute_status(service_agreement_id, execution_id, account, config).json()

def create_asset_folder(did, service_index, destination):
"""
Expand Down
86 changes: 86 additions & 0 deletions nevermined_sdk_py/gateway/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,62 @@ def access_service(did, service_agreement_id, service_endpoint, account, destina
Gateway.write_file(response, destination_folder, file_name or f'file-{index}')
return response

@staticmethod
def compute_logs(service_agreement_id, execution_id, account, config):
"""Get the logs of a compute workflow.
Args:
service_agreement_id (str): The id of the service agreement.
execution_id (str): The id of the workflow execution.
account (:py:class:`contracts_lib_py.account.Account`): The account that ordered
the execution of the workflow.
config (:py:class:`nevermined_sdk_py.config.Config`): nevermined-sdk config instance.
Returns:
:py:class:`requests.Response`: HTTP server response
"""
signature = Keeper.get_instance().sign_hash(
add_ethereum_prefix_and_hash_msg(execution_id),
account)
headers = {
'X-Consumer-Address': account.address,
'X-Signature': signature,
}
consume_url = Gateway._create_compute_logs_url(config, service_agreement_id, execution_id)
response = Gateway._http_client.get(consume_url, headers=headers)
if response.status_code != 200:
raise ValueError(response.text)
return response

@staticmethod
def compute_status(service_agreement_id, execution_id, account, config):
"""Get the status of a compute workflow.
Args:
service_agreement_id (str): The id of the service agreement.
execution_id (str): The id of the workflow execution.
account (:py:class:`contracts_lib_py.account.Account`): The account that ordered
the execution of the workflow.
config (:py:class:`nevermined_sdk_py.config.Config`): nevermined-sdk config instance.
Returns:
:py:class:`requests.Response`: HTTP server response
"""
signature = Keeper.get_instance().sign_hash(
add_ethereum_prefix_and_hash_msg(execution_id),
account)
headers = {
'X-Consumer-Address': account.address,
'X-Signature': signature,
}
consume_url = Gateway._create_compute_status_url(config, service_agreement_id, execution_id)
response = Gateway._http_client.get(consume_url, headers=headers)
if response.status_code != 200:
raise ValueError(response.text)
return response

@staticmethod
def download(did, account, destination_folder, index, config):
"""Allows an asset data file if the account is the owner or provider of the asset
Expand Down Expand Up @@ -262,6 +318,26 @@ def get_download_endpoint(config):
"""
return f'{Gateway.get_gateway_url(config)}/services/download'

@staticmethod
def get_compute_logs_endpoint(config):
"""
Return he endpoint to get the logs of a compute workflow.
:param config:Config
:return: Url, str
"""
return f'{Gateway.get_gateway_url(config)}/services/compute/logs'

@staticmethod
def get_compute_status_endpoint(config):
"""
Return he endpoint to get the status of a compute workflow.
:param config:Config
:return: Url, str
"""
return f'{Gateway.get_gateway_url(config)}/services/compute/status'

@staticmethod
def get_consume_endpoint(config):
"""
Expand Down Expand Up @@ -357,6 +433,16 @@ def _create_download_url(config, index=None):
"""Return the url to download an asset."""
return f'{Gateway.get_download_endpoint(config)}/{index}'

@staticmethod
def _create_compute_logs_url(config, service_agreement_id, execution_id):
"""Return the url to get the execution logs of a compute workflow."""
return f'{Gateway.get_compute_logs_endpoint(config)}/{service_agreement_id}/{execution_id}'

@staticmethod
def _create_compute_status_url(config, service_agreement_id, execution_id):
"""Return the url to get the status of a compute workflow."""
return f'{Gateway.get_compute_status_endpoint(config)}/{service_agreement_id}/{execution_id}'

@staticmethod
def _create_consume_url(service_endpoint, service_agreement_id, account, _file=None,
signature=None, index=None):
Expand Down
34 changes: 34 additions & 0 deletions nevermined_sdk_py/nevermined/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,40 @@ def download(self, did, service_index, owner_account, destination, index=None):
index
)

def compute_logs(self, service_agreement_id, execution_id, account):
"""
Get the logs of a compute workflow.
:param service_agreement_id: The id of the service agreement that ordered the compute job, str
:param execution_id: The id of the compute job, str
:param account: Account instance that ordered the execution of the compute job
:return: list, compute logs
"""
return self._asset_consumer.compute_logs(
service_agreement_id,
execution_id,
account,
GatewayProvider.get_gateway(),
self._config,
)

def compute_status(self, service_agreement_id, execution_id, account):
"""
Get the status of a compute workflow.
:param service_agreement_id: The id of the service agreement that ordered the compute job, str
:param execution_id: The id of the compute job, str
:param account: Account instance that ordered the execution of the compute job
:return: str compute status
"""
return self._asset_consumer.compute_status(
service_agreement_id,
execution_id,
account,
GatewayProvider.get_gateway(),
self._config,
)

def validate(self, metadata):
"""
Validate that the metadata is ok to be stored in Metadata.
Expand Down
2 changes: 1 addition & 1 deletion tests/resources/helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def get_ddo_sample():

def get_algorithm_ddo():
return json.loads(urlopen(
"https://raw.githubusercontent.com/keyko-io/nevermined-docs/master/docs/architecture/specs/examples/metadata/v0.1/ddo-example-algorithm.json").read().decode(
"https://raw.githubusercontent.com/keyko-io/nevermined-docs/263678ffe35839e335a7e96f7ba2268fcd860762/docs/architecture/specs/examples/metadata/v0.1/ddo-example-algorithm.json").read().decode(
'utf-8'))


Expand Down

0 comments on commit efffe88

Please sign in to comment.