# Tasks and Task Dispatchers
In this tutorial we will describe how to define your own Tasks and TaskDispatchers and use them in pipelines.

## Tasks
Tasks are the basic units of pipelines. You create (or import) them and combine them into pipelines; the executor then takes them and executes them in the given order.

If a Pipeline represents the experimenter's intent ("I need to ping 8.8.8.8 and record network traffic"), then Tasks are the low-level implementation of this intent (e.g., start recording traffic, ping Google, stop recording traffic, upload results to some server).

Let's start with the basics.

In [1]:
from netunicorn.base import Task

Task is an abstract Python class with predefined methods that you need to implement. Each time you create a Task, you need to inherit from this class and implement at least one method (most often two) to make it work.

Let's take a look at both methods.

In [2]:
class SimpleTask(Task):
    def __init__(self, some_parameter: int):
        self.some_parameter = some_parameter
        super().__init__()
    
    def run(self):
        return self.some_parameter

To understand both methods, we need to know the task _lifetime_:
1. You instantiate the task on **client side** and provide needed parameters for the task to be executed later.
2. Then, you combine tasks into pipelines, netunicorn-client serializes them and sends them to the system which distributes them to nodes.
3. A node deserializes the task and starts its execution on the **edge node side**.

To execute code correctly on both sides (**client** and **edge node**), a task has two methods: `__init__` and `run`.

### `__init__`  method
This is the class constructor, where you can pass parameters to use later during task execution. This method executes on the **client side** (during task instantiation). If this method raises an exception, you'll receive it immediately, at task instantiation.

Most often you'll use this method to validate and save parameters that you'll need later during task execution. All parameters are serialized together with the task code using the _cloudpickle_ library. Don't forget to call `super().__init__()` somewhere in your constructor.
Here are some examples:

In [3]:
class FileUploadTask(Task):
    """Uploads some file to some endpoint"""
    def __init__(self, endpoint: str, filename: str):
        self.endpoint = endpoint
        self.filename = filename
        super().__init__()
    
    def run(self):
        """Implementation omitted"""
        pass

    
class PingTask(Task):
    """Ping address count times"""
    
    def __init__(self, address: str, count: int):
        if count < 1:
            raise ValueError("Parameter count should be positive")
        self.address = address
        self.count = count
        super().__init__()
    
    def run(self):
        """Implementation omitted"""
        pass


from datetime import datetime
class SleepAndFailTask(Task):
    """Sleep until some date and then raise an exception"""
    def __init__(self, sleep_until: datetime, exception: Exception):
        if sleep_until < datetime.now():
            raise ValueError("Parameter sleep_until should be in future")
        self.sleep_until = sleep_until
        self.exception = exception
        super().__init__()
    
    def run(self):
        """Implementation omitted"""
        pass
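
Because `__init__` runs on the client, a validation error surfaces immediately, before anything is sent to a node. The sketch below illustrates this with a minimal stand-in `Task` base class, which is an assumption used only so the snippet runs without netunicorn installed; with the real library you would import `Task` from `netunicorn.base` instead.

```python
class Task:
    """Hypothetical stand-in for netunicorn.base.Task (illustration only)."""

    def __init__(self):
        pass

    def run(self):
        raise NotImplementedError


class PingTask(Task):
    """Same constructor validation as the PingTask above."""

    def __init__(self, address: str, count: int):
        if count < 1:
            raise ValueError("Parameter count should be positive")
        self.address = address
        self.count = count
        super().__init__()

    def run(self):
        """Implementation omitted"""


# Valid parameters: the instance is created and the parameters are stored.
task = PingTask("8.8.8.8", count=3)

# Invalid parameters: the error is raised right here, at instantiation time.
try:
    PingTask("8.8.8.8", count=0)
    error_message = None
except ValueError as error:
    error_message = str(error)

print(error_message)
```

Checking parameters in the constructor is cheaper than discovering a bad value only after the pipeline has been deployed and started.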

### `run(self)` method
This method is called by the executor on the **edge node side** to execute the task. This is where you put the actual code that implements what you want to do: ping some server, upload files, etc.

This method can use any instance attribute of the current instance (that you saved in `__init__` method). You can write any valid Python code here, including calls to file system, OS, other libraries, programs, etc. 

Here are some examples of `run` method implementations:

In [4]:
import os
import time

class SleepTask(Task):
    def __init__(self, seconds: int):
        assert seconds > 0
        self.seconds = seconds
        super().__init__()
    
    def run(self):
        time.sleep(self.seconds)
        return "Done!"


class OSPleaseRunMySuperCommandTask(Task):
    def __init__(self, super_command: str):
        self.super_command = super_command
        super().__init__()
    
    def run(self):
        print("please")
        os.system(self.super_command)
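
`os.system` only gives you the exit status of the command, so its output is not part of the task's return value. If you want the output back, one option is `subprocess.run` from the standard library. The following is a minimal sketch, not code from netunicorn-library: the `Task` class is a stand-in so the snippet runs on its own, and `RunCommandTask` is a hypothetical name.

```python
import subprocess
import sys


class Task:
    """Hypothetical stand-in for netunicorn.base.Task (illustration only)."""

    def __init__(self):
        pass


class RunCommandTask(Task):
    """Hypothetical task: runs a command and returns its standard output."""

    def __init__(self, command: list):
        self.command = command
        super().__init__()

    def run(self):
        # check=True raises CalledProcessError on a non-zero exit code,
        # capture_output=True collects stdout and stderr instead of printing them.
        completed = subprocess.run(
            self.command, capture_output=True, text=True, check=True
        )
        return completed.stdout.strip()


# Use the current Python interpreter as a portable example command.
task = RunCommandTask([sys.executable, "-c", "print('hello from the node')"])
output = task.run()
print(output)
```

Passing the command as a list avoids shell quoting issues; whether that fits your task depends on the commands you need to run.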

There are some rules regarding the `run(self)` method:
1. You cannot change signature of `run(self)` method (like adding arguments).
2. Code inside `run(self)` method will have access ONLY to the current task instance and any imported Python library. You cannot define a global variable on your host and expect code inside the method to use this global variable.
3. This method can return values. These values are saved and returned to you (on your **client side**) after the whole pipeline has finished. The following rules apply to the value returned from this method:
    1. We use object `Result` from the library `returns` to represent successful or failed result of the task execution. See examples below.
    2. If method `run(self)` returns `Result` object, the object would be returned to you as is.
    3. If method `run(self)` returns ANY other object, the task would be considered successful and you'll receive `Success(your_value)` as a result.
    4. If method `run(self)` raises ANY exception, the task would be considered failed and you'll receive `Failure(str(exception))` as a result.
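
The return-value rules above can be sketched in a few lines. This is a simplified illustration, not netunicorn's actual executor code: `Success` and `Failure` are stand-in dataclasses (so the snippet runs without the `returns` library), and `execute` and the three small classes are hypothetical names.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Success:
    """Hypothetical stand-in for returns.result.Success."""
    value: Any


@dataclass
class Failure:
    """Hypothetical stand-in for returns.result.Failure."""
    error: Any


def execute(task) -> Any:
    """Hypothetical helper that applies the return-value rules to one task."""
    try:
        value = task.run()
    except Exception as exception:
        # Any exception becomes Failure(str(exception)).
        return Failure(str(exception))
    if isinstance(value, (Success, Failure)):
        # A Result object is passed through unchanged.
        return value
    # Any other value is wrapped in Success.
    return Success(value)


class ReturnsValue:
    def run(self):
        return 42


class ReturnsFailure:
    def run(self):
        return Failure("upload rejected")


class Raises:
    def run(self):
        raise RuntimeError("no route to host")


results = [execute(task) for task in (ReturnsValue(), ReturnsFailure(), Raises())]
print(results)
```

Returning `Failure(...)` explicitly lets a task report an error it detected itself, while an unexpected exception is still turned into a `Failure` for you.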

Here are some simple examples of how you can use the `Result` class from the `returns` library.

In [5]:
from returns.result import Result, Success, Failure
from returns.pipeline import is_successful

# the same as:
from netunicorn.base import Result, Success, Failure, is_successful

successful_object: Success[str] = Success("some value")
another_successful_object: Success[dict] = Success({'my_data': 'data'})
failed_object: Failure[str] = Failure('meh')

for obj in [successful_object, another_successful_object, failed_object]:
    print(f"{obj} is successful: {is_successful(obj)}")
    if is_successful(obj):
        print(f"{obj} stores some value inside: {obj.unwrap()}")
    else:
        print(f"{obj} stores some error inside: {obj.failure()}")


<Success: some value> is successful: True
<Success: some value> stores some value inside: some value
<Success: {'my_data': 'data'}> is successful: True
<Success: {'my_data': 'data'}> stores some value inside: {'my_data': 'data'}
<Failure: meh> is successful: False
<Failure: meh> stores some error inside: meh


Typical result of execution looks like this:

In [6]:
from netunicorn.base import Success, Failure

results = [
    Success("Started PCAP capture"),
    [
        Success("Ping to 8.8.8.8 finished"),
        Success("Pentagon is hacked"),
    ],
    Success("Finished PCAP capture"),
    Failure("Failed to upload files to the server due to incorrect Moon phase. Error: ...")
]

print(results)

[<Success: Started PCAP capture>, [<Success: Ping to 8.8.8.8 finished>, <Success: Pentagon is hacked>], <Success: Finished PCAP capture>, <Failure: Failed to upload files to the server due to incorrect Moon phase. Error: ...>]


### requirements
Sometimes you need libraries to be installed or other prerequisites to be met before task execution starts. You can achieve this with the **class-level** `requirements` attribute, which contains a list of commands to be executed in the OS shell before your pipeline starts. For example, if your task needs the 'numpy' library, it could be installed like this:

In [7]:
class MyNumpyTask(Task):
    requirements = ['pip install numpy']
    
    def __init__(self):
        super().__init__()
    
    def run(self):
        import numpy as np
        return np.zeros((3, 3))

The system will collect **all** requirements from **all** tasks in the pipeline and execute them **before** starting the pipeline. This is called _deployment_ phase (and executed when you call `client.deploy()`).
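
Conceptually, this collection step amounts to concatenating the `requirements` lists of every task in the pipeline. The sketch below shows the idea with stand-in classes; `collect_requirements`, `CapturePackets`, and the `apt-get` command are hypothetical, and the ordering shown is an assumption of this sketch rather than a documented guarantee.

```python
class Task:
    """Hypothetical stand-in for netunicorn.base.Task (illustration only)."""
    requirements: list = []


class CapturePackets(Task):
    requirements = ["apt-get install -y tcpdump"]


class MyNumpyTask(Task):
    requirements = ["pip install numpy"]


class SleepTask(Task):
    pass  # this task needs nothing installed


def collect_requirements(tasks: list) -> list:
    """Hypothetical helper: concatenate the requirements of all tasks, in order."""
    commands = []
    for task in tasks:
        commands.extend(task.requirements)
    return commands


pipeline_tasks = [CapturePackets(), SleepTask(), MyNumpyTask()]
commands = collect_requirements(pipeline_tasks)
print(commands)
```

Note that the commands of a task in the middle or at the end of the pipeline still end up in the same list, which is why they all run before the first task starts.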

### Important!
Please note that ALL `requirements` are executed before the pipeline starts. This means that if the pipeline contains tasks with contradicting requirements, we do not guarantee that it will work correctly.

For example, a pipeline consisting of the following tasks would most likely fail, because the requirements of the second task remove the file used by the first task:

In [8]:
class WriteToFileTask(Task):
    requirements = [
        'mkdir /tmp/my_folder',
        'touch /tmp/my_folder/somefile'
    ]
    ...

class WriteToAnotherFileTask(Task):
    requirements = [
        'rm -rf /tmp/*',
        'mkdir /tmp/data',
    ]
    ...

### Important!
Please also note that if you add several task instances of the same class, **all** their requirements are combined and executed. Most often this is not a problem (if you just install something), but sometimes side effects matter:

In [9]:
from netunicorn.base import Pipeline, Experiment
from netunicorn.base.nodes import Node

class WriteToTmp(Task):
    requirements = [
        'mkdir /tmp/mydata'
    ]
    
    def run(self):
        return

# creating dummy pipeline, node, and experiment to show the consequences
pipeline = Pipeline().then(WriteToTmp()).then(WriteToTmp()).then(WriteToTmp())
dummy_node = Node(name="dummy", properties={})
experiment = Experiment().append(dummy_node, pipeline)

# let's print what the system would execute on deployment stage in the node
# hint: second command would fail with `mkdir: cannot create directory ‘/tmp/mydata’: File exists`
print(experiment.deployment_map[0].environment_definition.commands)

['mkdir /tmp/mydata', 'mkdir /tmp/mydata', 'mkdir /tmp/mydata']


What if you want to execute some command only once?

If you know Python well: remove the class attribute and add an instance attribute `requirements` to a single task instance.  
If you don't know Python that well yet: create a Task class with empty requirements and call the `add_requirement()` method on a single instance of this class. See the example:

In [10]:
class WriteToTmp(Task):  
    def run(self):
        return

# creating dummy pipeline, node, and experiment to show the consequences
pipeline = (
    Pipeline()
    .then(WriteToTmp().add_requirement('mkdir /tmp/mydata'))
    .then(WriteToTmp())
    .then(WriteToTmp())
)
dummy_node = Node(name="dummy", properties={})
experiment = Experiment().append(dummy_node, pipeline)

# let's print what the system would execute on deployment stage in the node
# It will work correctly during deployment
print(experiment.deployment_map[0].environment_definition.commands)

['mkdir /tmp/mydata']
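
The first option mentioned above (an instance attribute instead of a class attribute) relies on ordinary Python attribute lookup: an attribute set on one object shadows the class attribute for that object only. A minimal sketch with a stand-in `Task` class follows; the empty class-level default is an assumption of this sketch, not a statement about how netunicorn stores requirements internally.

```python
class Task:
    """Hypothetical stand-in for netunicorn.base.Task (illustration only)."""
    requirements: list = []


class WriteToTmp(Task):
    def run(self):
        return


first = WriteToTmp()
# Assigning on the instance creates a new attribute for this object only;
# the class attribute seen by other instances stays empty.
first.requirements = ["mkdir /tmp/mydata"]

second = WriteToTmp()
third = WriteToTmp()

# Collect the commands the same way as for class-level requirements.
commands = []
for task in (first, second, third):
    commands.extend(task.requirements)

print(commands)
```

In this stand-in, assigning a new list is deliberate: calling `first.requirements.append(...)` instead would modify the list shared by the whole class.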


### Imports and Serialization
Very often you will find yourself importing tasks or functions from other modules and libraries. Every imported library must be installed in the environment (e.g., via requirements).

One common case is when you write tasks in a different module than your experiment. In this situation, you either need to have this module installed in the environment (if it's a public module, like netunicorn-library), or you can tell cloudpickle to serialize your module by value. This is done with cloudpickle's `register_pickle_by_value` function:
```python
import cloudpickle
import my_module

cloudpickle.register_pickle_by_value(my_module)
```

More information about this could be found in the [cloudpickle documentation](https://github.com/cloudpipe/cloudpickle#overriding-pickles-serialization-mechanism-for-importable-constructs).

### Outro
Now you know everything about creating your own Tasks. You can always look at the existing implementations in netunicorn-library, or consult the class documentation if you forget something:

In [11]:
help(Task)

Help on class Task in module netunicorn.base.task:

class Task(abc.ABC)
 |  Task() -> 'None'
 |  
 |  This is a base class for all tasks. All new task classes should inherit from this class.
 |  The task instance should encapsulate all the logic and data needed to execute the task.
 |  Task entrypoint is the run() method.
 |  Task class can have requirements - commands to be executed to change environment to support this task.
 |  These requirements would be executed with OS shell during environment setup.
 |  Each task is to be implemented for a specific architecture, platform, or combination (like Linux + arm64).
 |  TaskDispatcher can be used for selecting a specific task for the given architecture, platform, or combination.
 |  
 |  Task always returns a Result object.
 |  - If the task's `run` method returns Result object by itself, you'll receive this Result object
 |  - If the task's `run` method returns any other object, you'll receive a Success with returned_value encapsulated

## Task Dispatchers

TaskDispatcher is an additional mechanism for dispatching your tasks according to node properties.  
You need it in the following cases:
- You want to use different Task implementations for different nodes
- You want to initialize the task with different parameters for different nodes

TaskDispatcher **always** works only on the **client side** and just selects the proper implementation of the task or the parameters for its initialization. You simply put a TaskDispatcher instance in a pipeline instead of a Task instance.

Your custom dispatcher should inherit from TaskDispatcher and implement `dispatch(self, node: Node)`, plus `__init__` if the dispatcher needs parameters of its own.

In [12]:
from netunicorn.base.task import Task, TaskDispatcher


class MyCoolTask(TaskDispatcher):
    def __init__(self, param1: str, param2: list):
        # init saves parameters that can be used in dispatch method
        self.param1 = param1
        self.param2 = param2
        super().__init__()

    def dispatch(self, node: Node) -> Task:
        # dispatch method receives Node and should return Task instance for the given node
        # if some_condition:
        #    return SomeTaskImplementation(...)
        pass
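
To make the idea concrete, the sketch below shows what resolving a dispatcher for a particular node could look like. It is only an illustration: `Node`, `Task`, and `TaskDispatcher` are stand-in classes so the snippet runs without netunicorn, and `resolve`, `Ping`, `LinuxPing`, `WindowsPing`, and `Sleep` are hypothetical names, not the library's internals.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical stand-in for netunicorn.base.nodes.Node."""
    name: str
    properties: dict = field(default_factory=dict)


class Task:
    """Hypothetical stand-in for netunicorn.base.task.Task."""


class TaskDispatcher:
    """Hypothetical stand-in for netunicorn.base.task.TaskDispatcher."""

    def dispatch(self, node: Node) -> Task:
        raise NotImplementedError


class LinuxPing(Task):
    pass


class WindowsPing(Task):
    pass


class Sleep(Task):
    pass


class Ping(TaskDispatcher):
    def dispatch(self, node: Node) -> Task:
        # Select an implementation based on a node property.
        if node.properties.get("os_family", "").lower() == "linux":
            return LinuxPing()
        return WindowsPing()


def resolve(element, node: Node) -> Task:
    """Hypothetical helper: replace a dispatcher with the task it selects."""
    if isinstance(element, TaskDispatcher):
        return element.dispatch(node)
    return element  # plain tasks are used as they are


pipeline = [Sleep(), Ping()]
linux_node = Node(name="linux", properties={"os_family": "Linux"})
windows_node = Node(name="windows", properties={"os_family": "Windows"})

linux_tasks = [resolve(element, linux_node) for element in pipeline]
windows_tasks = [resolve(element, windows_node) for element in pipeline]

print([type(task).__name__ for task in linux_tasks])
print([type(task).__name__ for task in windows_tasks])
```

The same pipeline definition yields different concrete tasks per node, while ordinary tasks pass through untouched.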

Let's look at both of the cases mentioned above.

### Different Task Implementations
Most often you will use a dispatcher when your nodes run different operating systems and need completely different implementations due to execution differences. Consider the following example, where we use different commands depending on the node's OS family:

In [13]:
import os
from netunicorn.base.task import Task, TaskDispatcher
from netunicorn.base.nodes import Node
from netunicorn.base import Pipeline, Experiment
import cloudpickle

class RemoveFileLinuxImplementation(Task):
    def __init__(self, filename: str):
        self.filename = filename
        super().__init__()
    
    def run(self):
        os.system(f'rm {self.filename}')
    
    
class RemoveFileWindowsImplementation(Task):
    def __init__(self, filename: str):
        self.filename = filename
        super().__init__()
    
    def run(self):
        os.system(f'Remove-Item {self.filename}')

# yes yes, 'rm' is also alias for Remove-Item in Powershell, so in this particular case it would work
# but this is just an example :)

class RemoveFile(TaskDispatcher):
    def __init__(self, filename: str):
        self.filename = filename
        super().__init__()
    
    def dispatch(self, node: Node) -> Task:
        if node.properties.get("os_family", "").lower() == "linux":
            return RemoveFileLinuxImplementation(self.filename)
        else:
            return RemoveFileWindowsImplementation(self.filename)

# let's look at this
pipeline = Pipeline().then(RemoveFile('somefile'))

win_node = Node(name='dummy windows node', properties={'os_family': 'Windows'})
lin_node = Node(name='dummy linux node', properties={'os_family': 'Linux'})

experiment = (
    Experiment()
    .append(win_node, pipeline)
    .append(lin_node, pipeline)
)

# pipelines are already serialized and ready to be executed, so let's manually deserialize them to check
# you usually don't need to do it, it's just for demonstration
win_node_pipeline = cloudpickle.loads(experiment.deployment_map[0].pipeline)
lin_node_pipeline = cloudpickle.loads(experiment.deployment_map[1].pipeline)

# you can see that though pipeline was the same, tasks are different
print(win_node_pipeline)
print(lin_node_pipeline)

Pipeline(5a2ab1ac-a4c0-49fd-9d75-2421c5749aa8): [[<__main__.RemoveFileWindowsImplementation object at 0x7fe64c2ef520>]]
Pipeline(5a2ab1ac-a4c0-49fd-9d75-2421c5749aa8): [[<__main__.RemoveFileLinuxImplementation object at 0x7fe64c2ef5b0>]]


### Different parameters for different nodes
Sometimes you want to initialize a task with parameters that depend on a certain node property. Let's consider an example where each node presents its IP address and you want to use this information during task instantiation.

In [14]:
import os
from netunicorn.base.task import Task, TaskDispatcher
from netunicorn.base.nodes import Node
from netunicorn.base import Pipeline, Experiment
import cloudpickle

class SayIPImplementation(Task):
    def __init__(self, ip: str):
        self.ip = ip
        super().__init__()
    
    def run(self):
        print(f'My IP is: {self.ip}')


class SayIP(TaskDispatcher):
    def dispatch(self, node: Node) -> Task:
        ip = node.properties['ipv4']
        return SayIPImplementation(ip)


pipeline = Pipeline().then(SayIP())

node1 = Node(name='Dummy1', properties={'ipv4': '192.168.0.1'})
node2 = Node(name='Dummy2', properties={'ipv4': '192.168.0.2'})

experiment = (
    Experiment()
    .append(node1, pipeline)
    .append(node2, pipeline)
)

# pipelines are already serialized and ready to be executed, so let's manually deserialize them to check
# you usually don't need to do it, it's just for demonstration
node1_pipeline = cloudpickle.loads(experiment.deployment_map[0].pipeline)
node2_pipeline = cloudpickle.loads(experiment.deployment_map[1].pipeline)

# you can see that though pipeline was the same, tasks are initialized differently
node1_pipeline.tasks[0][0].run()
node2_pipeline.tasks[0][0].run()

My IP is: 192.168.0.1
My IP is: 192.168.0.2
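
A dispatcher that reads a node property with `node.properties['ipv4']` raises a bare `KeyError` when a node does not expose that property. Since dispatching happens on the client side, one option is to check for the property and raise a more descriptive error. The following is a minimal sketch under that assumption; `Node` is a stand-in dataclass, the classes omit the netunicorn base classes so the snippet runs on its own, and `run` returns the text instead of printing it purely for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical stand-in for netunicorn.base.nodes.Node."""
    name: str
    properties: dict = field(default_factory=dict)


class SayIPImplementation:
    """Stand-in for the task above (base class omitted for brevity)."""

    def __init__(self, ip: str):
        self.ip = ip

    def run(self):
        return f"My IP is: {self.ip}"


class SayIP:
    """Stand-in for the dispatcher above, with an explicit property check."""

    def dispatch(self, node: Node) -> SayIPImplementation:
        ip = node.properties.get("ipv4")
        if ip is None:
            # Fail early with a message that names the offending node.
            raise ValueError(f"Node {node.name!r} does not provide an 'ipv4' property")
        return SayIPImplementation(ip)


good_node = Node(name="Dummy1", properties={"ipv4": "192.168.0.1"})
bad_node = Node(name="Dummy3", properties={})

message = SayIP().dispatch(good_node).run()
print(message)

try:
    SayIP().dispatch(bad_node)
    error_message = None
except ValueError as error:
    error_message = str(error)

print(error_message)
```

Whether to fail or to fall back to a default value is a design choice that depends on the task; the point is only that a missing property can be handled before the experiment is submitted.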


TaskDispatcher is an advanced mechanism that simplifies sharing tasks and implementing them for different platforms and conditions. It makes things a bit harder for *developers* of tasks, but a bit easier for *users* of tasks, who just want to import them and have everything work. As usual, you can always consult the class documentation if you forget something.

In [15]:
help(TaskDispatcher)

Help on class TaskDispatcher in module netunicorn.base.task:

class TaskDispatcher(abc.ABC)
 |  This class is a wrapper for several tasks that are designed to implement the same functionality
 |  but depend on node attributes. Most often you either want to use a specific
 |  implementation for a specific architecture (e.g., different Tasks for Windows and Linux),
 |  or instantiate a task with some specific parameters for a specific node (e.g., node-specific IP address).
 |  You should implement your own TaskDispatcher class and override the dispatch method.
 |  
 |  Dispatching is done by calling the dispatch method that you should implement.
 |  
 |  Method resolution order:
 |      TaskDispatcher
 |      abc.ABC
 |      builtins.object
 |  
 |  Methods defined here:
 |  
 |  dispatch(self, node: 'Node') -> 'Task'
 |      This method takes a node and should return and instance of the task that is designed to be executed on this node.
 |      The instance could depend on the node info