In [None]:
# default_exp workflows

# workflows
> Execute workflows on Server

In [None]:
#hide
from yx_motor.tests.utils.unit_test_helpers import (
    pickle_object,
    unpickle_object,
    workflow_test_pickles
)

In [None]:
#hide
# just removing the insecure warning for now
# TODO: Secure requests and remove this code
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

In [None]:
# export
from typing import List
import requests

from yx_motor.api import API
from yx_motor.files import Files
from yx_motor.jobs import Jobs


class Workflows:
    "Class for workflow-related API actions"

    def __init__(self, api: API):
        self.api = api
        self.base_endpoint = "workflows/"
        self.files = Files(self.api)
        self.jobs = Jobs(self.api)

    @staticmethod
    def build_schedule_payload(schedule_name: str, asset_id: str):
        return {"schedule": {"name": schedule_name, "assetId": asset_id,}}

    def get_questions(self, asset_id: str):
        """Return question payload for an asset of type analytic app."""
        response = self.api.post(url=f"workflows/{asset_id}/questions")
        return response

    def get_vfs_inputs(self, asset_id: str):
        """Return the list of vfs inputs for a given workflow asset."""
        response = self.api.get(url=f"workflows/{asset_id}/vfsInputs")
        return response

    def update_vfs_inputs(self, asset_id: str, tool_id: str):
        pass

    def get_workflow_dependencies(self, asset_id: str):
        """Return the list of asset dependencies for a given workflow asset"""
        response = self.api.get(url=f"workflows/{asset_id}/dependencies")
        return response

    def update_workflow_dependencies(self, asset_id: str):
        pass

    def add_workflow_dependency(self, asset_id: str):
        pass

    def add_workflow_dependency_connection(self, asset_id: str, tool_id: str):
        pass

    def run_workflow(
        self, asset_id: str, schedule_name: str, vfs_inputs: List[str] = None
    ):
        """Schedule a workflow to be executed instantaneously.
           Returns a schedule object."""
        response = self.api.post(
            url="workflows/run",
            json=self.build_schedule_payload(schedule_name, asset_id),
        )
        return response

    def download_workflow_results(self, schedule_id: str, download_path: str):
        # TODO: Input validation
        workflow_job = self.jobs.get_job(params={"scheduleId": schedule_id})
        # TODO: Need to check the status of the job, for completion
        # and/or error state before trying to get outputs.

        # TODO: Add validation, error handling here
        output_asset_id = self.get_wf_job_output(workflow_job)[0]

        response = self.files.download_file(
            file_uuid=output_asset_id, download_path=download_path
        )

        return response

    @staticmethod
    def get_wf_job_output(wf_job: object):
        jobs_object = wf_job.json()["jobs"]
        job = jobs_object[0]
        outputs = job["outputs"]
        output_asset_id_list = [file["assetId"] for file in outputs]
        return output_asset_id_list

In [None]:
from nbdev.showdoc import *
show_doc(Workflows.get_vfs_inputs)

<h4 id="Workflows.get_vfs_inputs" class="doc_header"><code>Workflows.get_vfs_inputs</code><a href="__main__.py#L28" class="source_link" style="float:right">[source]</a></h4>

> <code>Workflows.get_vfs_inputs</code>(**`asset_id`**:`str`)

Return the list of vfs inputs for a given workflow asset.

**Arguments**:

- asset_id: Unique VFS identifier for the workflow you want to execute.

In [None]:
#hide

#Unit test code for get_vfs_inputs
from unittest.mock import Mock

#TODO: write code to use the pickle for a mock response from api
mock_response_get_vfs_inputs = unpickle_object(
    workflow_test_pickles.workflow_get_vfs_inputs_response_pickle_path
)

api = Mock()
api.get = Mock()
api.get.return_value = Mock()
api.get().json.return_value = mock_response_get_vfs_inputs

In [None]:
workflows = Workflows(api)
response = workflows.get_vfs_inputs(asset_id='test_id')
response.json()

[{'connectionHash': 'ecde0973a04241b152e0d9b8f120c45fa265ff9130f47759dc321d9e8522298e',
  'externalPath': 'C:/projects/covid19/county_location_data.csv',
  'isInput': True,
  'vfsInputMembers': {'effective': [{'permissions': ['CREATE',
      'DOWNLOAD',
      'EDIT',
      'VIEW'],
     'partyType': 0,
     'partyId': 'a1100000-0000-4000-9000-000000000000',
     'createdBy': None,
     'fromEntryPath': '/Workspaces/Public',
     'fromEntryFolderType': 'GENERIC',
     'isOwner': False,
     'identity': {'groupId': 'a1100000-0000-4000-9000-000000000000',
      'userId': 'a1100000-0000-4000-9000-000000000000',
      'id': 'a1100000-0000-4000-9000-000000000000',
      'avatar': None,
      'email': None,
      'name': 'All Users',
      'firstName': 'All',
      'lastName': 'Users'}},
    {'permissions': ['DOWNLOAD', 'VIEW'],
     'partyType': 0,
     'partyId': 'a1100000-0000-4000-9000-000000000000',
     'createdBy': None,
     'fromEntryPath': '/Workspaces',
     'fromEntryFolderType': 

In [None]:
from nbdev.showdoc import *
show_doc(Workflows.get_workflow_dependencies)

<h4 id="Workflows.get_workflow_dependencies" class="doc_header"><code>Workflows.get_workflow_dependencies</code><a href="__main__.py#L36" class="source_link" style="float:right">[source]</a></h4>

> <code>Workflows.get_workflow_dependencies</code>(**`asset_id`**:`str`)

Return the list of asset dependencies for a given workflow asset

**Arguments**:

- asset_id: Unique VFS identifier for the workflow you want to execute.

In [None]:
#hide

# Unit test code for get_workflow_dependencies
# NOTE:  get_vfs_inputs SHOULD be returning the vfs input 
# along with others, but endpoint isn't complete until API is out of beta.
# for now, mocking with get_vfs_inputs

api = Mock()
api.get = Mock()
api.get.return_value = Mock()
api.get().json.return_value = mock_response_get_vfs_inputs

In [None]:
workflows = Workflows(api)
response = workflows.get_workflow_dependencies(asset_id='test_id')
response.json()

[{'connectionHash': 'ecde0973a04241b152e0d9b8f120c45fa265ff9130f47759dc321d9e8522298e',
  'externalPath': 'C:/projects/covid19/county_location_data.csv',
  'isInput': True,
  'vfsInputMembers': {'effective': [{'permissions': ['CREATE',
      'DOWNLOAD',
      'EDIT',
      'VIEW'],
     'partyType': 0,
     'partyId': 'a1100000-0000-4000-9000-000000000000',
     'createdBy': None,
     'fromEntryPath': '/Workspaces/Public',
     'fromEntryFolderType': 'GENERIC',
     'isOwner': False,
     'identity': {'groupId': 'a1100000-0000-4000-9000-000000000000',
      'userId': 'a1100000-0000-4000-9000-000000000000',
      'id': 'a1100000-0000-4000-9000-000000000000',
      'avatar': None,
      'email': None,
      'name': 'All Users',
      'firstName': 'All',
      'lastName': 'Users'}},
    {'permissions': ['DOWNLOAD', 'VIEW'],
     'partyType': 0,
     'partyId': 'a1100000-0000-4000-9000-000000000000',
     'createdBy': None,
     'fromEntryPath': '/Workspaces',
     'fromEntryFolderType': 

In [None]:
from nbdev.showdoc import *
show_doc(Workflows.run_workflow)

<h4 id="Workflows.run_workflow" class="doc_header"><code>Workflows.run_workflow</code><a href="__main__.py#L50" class="source_link" style="float:right">[source]</a></h4>

> <code>Workflows.run_workflow</code>(**`asset_id`**:`str`, **`schedule_name`**:`str`, **`vfs_inputs`**:`List`\[`str`\]=*`None`*)

Schedule a workflow to be executed instantaneously.
Returns a schedule object.

**Arguments**:

- asset_id: Unique VFS identifier for the workflow you want to execute.
- schedule_name: Optional: Name of the schedule that will be created when workflow execution is triggered.
- vfs_inputs: Optional: List of vfs asset ids, if desired

In [None]:
#hide
from unittest.mock import Mock

run_workflow_response_mock = unpickle_object(workflow_test_pickles.workflow_run_response_pickle)

api = Mock()
api.post = Mock()
api.post.return_value = Mock()
api.post().json.return_value = run_workflow_response_mock

In [None]:
workflows = Workflows(api)
workflows.run_workflow(asset_id="test_id",
                       schedule_name="jp_client_test").json()

{'scheduleId': 'f35045d4-5298-43d2-918b-3281159e05dd',
 'name': 'schedule_name_test',
 'userId': 'd57f3054-8eeb-4993-b021-cc7c6764d27c',
 'assetId': 'cd68ff45-fc21-4a32-b2e9-8d5b85881b7e',
 'assetVersion': None,
 'runAsId': None,
 'toolConnections': None,
 'status': 'active',
 'startDate': None,
 'endDate': None,
 'lastRun': None,
 'nextRun': None,
 'frequencyInterval': 'manual',
 'frequencyRule': None,
 'type': 'immediate',
 'isDisabled': False,
 'isDeleted': False,
 'comment': None,
 'siteId': '2d05e824-7421-4c7a-8e4e-a22d984953c2',
 'parameters': {'runMode': 0},
 'analyticAppAnswers': None,
 'inputFiles': None,
 'creationDate': '2020-07-27T21:09:25.035Z',
 'lastUpdate': '2020-07-27T21:09:25.035Z',
 'outputLocation': None,
 'timezone': 'UTC',
 'utcOffset': 0,
 'priority': 50}

In [None]:
from nbdev.showdoc import *
show_doc(Workflows.download_workflow_results)

<h4 id="Workflows.download_workflow_results" class="doc_header"><code>Workflows.download_workflow_results</code><a href="__main__.py#L61" class="source_link" style="float:right">[source]</a></h4>

> <code>Workflows.download_workflow_results</code>(**`schedule_id`**:`str`, **`download_path`**:`str`)



**Arguments**:

- schedule_id: Unique identifier for the completed schedule that orchestrated the workflow execution
- download_path: Local path to download the scheduled workflow results file to

In [None]:
#hide

#Unit Test Mocks for download_workflow_results

workflow_job_response_mock = unpickle_object(
    workflow_test_pickles.workflow_job_response_pickle_path
)
workflow_download_files_response_mock = unpickle_object(
    workflow_test_pickles.workflow_download_files_response_pickle_path
)

In [None]:
#hide

#Unit test code

workflows = Mock()
workflows.download_workflow_results = Mock()
workflows.download_workflow_results.return_value = workflow_download_files_response_mock


In [None]:
workflows.download_workflow_results(schedule_id='test_schedule_id', 
                                    download_path='wf_out_test.csv')

<Response [200]>