# Writing a module

Computation modules are most often subclasses of the *progressivis.table.module.TableModule* class. A lower-level class (*progressivis.core.Module*) also exists, but it is not meant to be used directly.

As a general rule, to define a new module you must:

1. Create a subclass of TableModule
2. Define the input slots on this class via a class attribute called `inputs`
3. Define additional output slots on this class, only if necessary, via a class attribute called `outputs`. The default output named `"table"` is already defined in TableModule and inherited by your class.
4. Define the `run_step()` method on this class. This method implements your algorithm.

**NB:** For the sake of brevity, the examples that follow omit a number of validity checks that would be essential in reliable, real-life software.

In [None]:
from progressivis.table.module import TableModule, Table
from progressivis import SlotDescriptor

class MyModule(TableModule):
    inputs = [SlotDescriptor('first', type=Table, required=True), 
              SlotDescriptor('second', type=Table, required=True)]
    # defining the "outputs" attribute is not necessary when 
    # only one output is requested. Just use the already defined "table" output
    def run_step(self, run_number, step_size, quantum):
        """
        * run_number: identifies the current run step
        * step_size: number of rows to be processed in this step
        * quantum: assigned time for this step
        """ 
        pass # your algorithm 

## Writing a `run_step()` method 

Regardless of the algorithm implemented, we can identify eight parts in the method which will be more or less well separated:

1. Analysis of input slots
2. Special actions if the state of the slots requires it
3. Choice of the size of the data that will be consumed on the slots
4. Effective reading on slots
5. Processing itself
6. Creation of output objects (only once, before the first output write)
7. Writing output data
8. Return of the next module state and number of steps taken

**NB:** In the following examples, the lines under discussion are marked with the comment `# /!\`

### Analyze input slots

Analysis is possible via several methods available on an input slot object.

Input slots can be accessed by name:

In [None]:
class MyModule(TableModule):
    inputs = [SlotDescriptor('first', type=Table, required=True), 
              SlotDescriptor('second', type=Table, required=True)]
    def run_step(self, run_number, step_size, quantum):
        slot = self.get_input_slot('first') # /!\
        # ...

We can also process slots in a loop:

In [None]:
class MyModule(TableModule):
    inputs = [SlotDescriptor('first', type=Table, required=True), 
              SlotDescriptor('second', type=Table, required=True)]
    def run_step(self, run_number, step_size, quantum):
        for slot_name, slot in self._input_slots.items(): # /!\
            if slot is None: # e.g. _params slot not provided
                continue
            # ...

Available methods:

* `slot.has_buffered()`: tells whether data has been created, modified, or deleted since the last run, without distinguishing between the three
* `slot.created.any()`: data has been created since the last run
* `slot.updated.any()`: data has been modified since the last run
* `slot.deleted.any()`: data has been deleted since the last run
* `slot.created.length()`: number of rows created since the last run
* `slot.updated.length()`: number of rows modified since the last run
* `slot.deleted.length()`: number of rows deleted since the last run

All of these methods are idempotent.

### Special actions if the state of the slots requires it

After analysis, depending on the state of the inputs and the semantics of your module, several situations may arise:

* If one or more slots have not changed since the last execution (according to your processing needs), execution stops and the module switches to the "blocked" state (waiting for new data):

In [None]:
class MyModule(TableModule):
    inputs = [SlotDescriptor('first', type=Table, required=True), 
              SlotDescriptor('second', type=Table, required=True)]
    def run_step(self, run_number, step_size, quantum):
        slot = self.get_input_slot('first')
        if not slot.has_buffered():
            return self._return_run_step(self.state_blocked, steps_run=0) # /!\
        # ...

* Most of the time, the modification or deletion of data already processed invalidates the result already produced. When this happens, your `run_step()` method should:
     * repair the results already produced when possible
     * or reinitialize the concerned slot (and sometimes other slots involved in the operation) as well as the output and start the processing from scratch.

Repair, when possible, is closely related to the semantics of your algorithm so this is not covered here, but the reset operation is algorithm-agnostic:

In [None]:
class Max(TableModule):
    inputs = [SlotDescriptor('table', type=Table, required=True)]
    def run_step(self, run_number, step_size, quantum):
        slot = self.get_input_slot('table')
        if slot.updated.any() or slot.deleted.any():
            slot.reset()
            if self._table is not None:
                self._table.resize(0)
            slot.update(run_number)
        # ...
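
To see why a modification can invalidate a result such as a maximum, here is a minimal plain-Python sketch. It does not use ProgressiVis: a list stands in for the input table, and the names `rows`, `running_max`, `stale` and `recompute` are hypothetical, chosen only for illustration.

```python
# Toy illustration only: a plain Python list stands in for a table.
rows = [3, 9, 4]

# Progressive result once all three rows have been consumed.
running_max = max(rows)  # 9

# A row that was already processed is modified afterwards.
rows[1] = 1

# The cached value cannot be repaired from the new value alone:
# the old maximum (9) no longer exists anywhere in the data.
stale = running_max != max(rows)

def recompute(values):
    """Reset strategy: discard the result and replay every row."""
    result = None
    for v in values:
        result = v if result is None else max(result, v)
    return result

running_max = recompute(rows)
print(stale, running_max)
```

The reset shown in the `Max` module above plays the same role as `recompute` here: the output is emptied and the input is replayed from the start.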

**NB:** The most common cases requiring a reset can be expressed in a simplified form based on decorators. For more details consult the [notebook](./RunStepDecorators.ipynb)

**NB:** sometimes resetting only the impacted slot is not sufficient:

In [None]:
class Hadamard(TableModule):
    inputs = [SlotDescriptor('x1', type=Table, required=True), 
              SlotDescriptor('x2', type=Table, required=True)]
    def run_step(self, run_number, step_size, quantum):
        x1 = self.get_input_slot('x1')
        x2 = self.get_input_slot('x2')
        if x1.updated.any() or x1.deleted.any() or x2.updated.any() or x2.deleted.any():
            x1.reset()
            x2.reset()
            if self._table is not None:
                self._table.resize(0)
            x1.update(run_number)        
            x2.update(run_number)
        # ... process element-wise x1 * x2

**NB:** Always call `slot.update()` after a `slot.reset()`

### Choice of the size of the data that will be consumed on the slots

When there is only one input slot, the data size is given by the `step_size` argument, but when there are several slots you have to choose how much data you can process in one step:

In [None]:
class Hadamard(TableModule):
    inputs = [SlotDescriptor('x1', type=Table, required=True), 
              SlotDescriptor('x2', type=Table, required=True)]
    def run_step(self, run_number, step_size, quantum):
        x1 = self.get_input_slot('x1')
        x2 = self.get_input_slot('x2')
        if x1.updated.any() or x1.deleted.any() or x2.updated.any() or x2.deleted.any():
            x1.reset()
            x2.reset()
            if self._table is not None:
                self._table.resize(0)
            x1.update(run_number)        
            x2.update(run_number)
        step_size = min(x1.created.length(), x2.created.length(), step_size) # /!\
        # ... process element-wise x1 * x2
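
The effect of taking the minimum can be sketched in plain Python, without ProgressiVis. In this illustration, lists of pending values stand in for `x1.created` and `x2.created`, and the names `pending_x1`, `pending_x2` and `n` are hypothetical.

```python
# Toy illustration only: lists of pending rows stand in for `slot.created`.
pending_x1 = [1.0, 2.0, 3.0, 4.0, 5.0]
pending_x2 = [10.0, 20.0, 30.0]
step_size = 4  # budget proposed for this step

# Never consume more than the scarcest input can provide.
n = min(len(pending_x1), len(pending_x2), step_size)

# Consume the same number of rows on both sides, then multiply element-wise.
chunk1, pending_x1 = pending_x1[:n], pending_x1[n:]
chunk2, pending_x2 = pending_x2[:n], pending_x2[n:]
product = [a * b for a, b in zip(chunk1, chunk2)]
print(n, product, pending_x1)
```

The rows left over on the longer input stay pending and can be paired with new rows in a later step.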

### Effective reading on the slots

The key method in this step is `next()`, called with the size chosen in the previous step. 
A call to `slot.created.next(step_size)` returns at most `step_size` rows: fewer when `slot.created.length() < step_size`, but never more than requested:

In [None]:
class Max(TableModule):
    inputs = [SlotDescriptor('table', type=Table, required=True)]
    def run_step(self, run_number, step_size, quantum):
        slot = self.get_input_slot('table')
        if slot.updated.any() or slot.deleted.any():
            slot.reset()
            if self._table is not None:
                self._table.resize(0)
            slot.update(run_number)
        indices = slot.created.next(step_size) # /!\ 
        steps = indices_len(indices)
        if steps==0:
            return self._return_run_step(self.state_blocked, steps_run=0)
        # ...

The same applies when there are several slots:

In [None]:
class Hadamard(TableModule):
    inputs = [SlotDescriptor('x1', type=Table, required=True), 
              SlotDescriptor('x2', type=Table, required=True)]
    def run_step(self, run_number, step_size, quantum):
        x1 = self.get_input_slot('x1')
        x2 = self.get_input_slot('x2')
        if x1.updated.any() or x1.deleted.any() or x2.updated.any() or x2.deleted.any():
            x1.reset()
            x2.reset()
            if self._table is not None:
                self._table.resize(0)
            x1.update(run_number)        
            x2.update(run_number)
        step_size = min(x1.created.length(), x2.created.length(), step_size) 
        x1_indices = x1.created.next(step_size) # /!\ 
        x2_indices = x2.created.next(step_size) # /!\ 
        # ... process element-wise x1 * x2

**NB:** the `next()` method is NOT idempotent! Call it only after checking that all the conditions needed to complete the step are met.
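
The difference between a pure query and a consuming read can be illustrated with a small stand-in class. This is a sketch in plain Python, not the ProgressiVis implementation: `ToyBuffer` is a hypothetical class that only mimics the `length()` and `next()` behavior described above.

```python
class ToyBuffer:
    """Hypothetical stand-in for a change buffer such as `slot.created`."""

    def __init__(self, indices):
        self._pending = list(indices)

    def length(self):
        # Pure query: does not modify the buffer.
        return len(self._pending)

    def next(self, n):
        # Consuming read: the returned indices leave the buffer.
        taken, self._pending = self._pending[:n], self._pending[n:]
        return taken

buf = ToyBuffer(range(5))
lengths = (buf.length(), buf.length())  # same answer twice
first = buf.next(3)
second = buf.next(3)                    # only two indices remain
third = buf.next(3)                     # nothing left
print(lengths, first, second, third)
```

Calling `next()` and then abandoning the step would lose the returned indices, which is why the checks come first.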

### Processing itself

This part is related to your algorithm. You can access the data of a slot via the `data()` method. The data associated with a slot are most of the time `Table` objects and sometimes `PsDict` objects. `Table` objects have a **pandas-like** interface (a subset, actually) while `PsDict` objects are Python dictionaries (`PsDict` inherits from `dict`).

Since the `next()` method may return either a slice (NumPy-like) when the indices are contiguous, or a set of indices (actually a Roaring bitmap) when they are not, two other functions are important for this step:

* `indices_len(indices)`: get the number of indices regardless of the nature of `indices` (slice or bitmap)
* `fix_loc(indices)`: get a valid pandas-like `loc` index regardless of the nature of `indices` (slice or bitmap)

For example:

In [None]:
class Max(TableModule):
    inputs = [SlotDescriptor('table', type=Table, required=True)]
    def run_step(self, run_number, step_size, quantum):
        slot = self.get_input_slot('table')
        if slot.updated.any() or slot.deleted.any():
            slot.reset()
            if self._table is not None:
                self._table.resize(0)
            slot.update(run_number)
        indices = slot.created.next(step_size) # /!\ 
        steps = indices_len(indices) # /!\
        if steps==0:
            return self._return_run_step(self.state_blocked, steps_run=0)
        data = slot.data() # /!\
        op = data.loc[fix_loc(indices)].max(keepdims=False) # /!\
        # ...
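
To make the slice-or-bitmap distinction concrete, here is a plain-Python sketch of what such helpers might look like. These are NOT the library's implementations: `toy_indices_len` and `toy_fix_loc` are hypothetical functions, a Python `set` stands in for the bitmap, and the sketch assumes a `loc` whose slices include their stop value, as in pandas.

```python
def toy_indices_len(indices):
    """Count indices whether they come as a slice or as a collection."""
    if isinstance(indices, slice):
        # Assumes a fully specified slice (explicit start and stop).
        step = 1 if indices.step is None else indices.step
        return len(range(indices.start, indices.stop, step))
    return len(indices)

def toy_fix_loc(indices):
    """Turn a half-open slice into an inclusive one (pandas `loc` style)."""
    if isinstance(indices, slice):
        return slice(indices.start, indices.stop - 1, indices.step)
    return sorted(indices)

contiguous = slice(10, 15)  # rows 10..14
scattered = {2, 7, 11}      # a set stands in for a bitmap
print(toy_indices_len(contiguous), toy_indices_len(scattered))
print(toy_fix_loc(contiguous), toy_fix_loc(scattered))
```

With helpers of this kind, the rest of `run_step()` does not need to know which of the two forms `next()` returned.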

### Creation of output objects

The output object is created only once, just before the first write. 
On later steps the existing object is updated, or emptied when a reset occurs, but it must never be re-created, as this would mislead the change manager.
The default output slot is called `"table"` and is inherited from the `TableModule` superclass. It is associated with the `_table` attribute, which is initially `None`.
The output can be a `PsDict` or a `Table`:


In [None]:
    def run_step(self, run_number, step_size, quantum):
        slot = self.get_input_slot('table')
        if slot.updated.any() or slot.deleted.any():
            slot.reset()
            if self._table is not None:
                self._table.resize(0)
            slot.update(run_number)
        indices = slot.created.next(step_size) 
        steps = indices_len(indices)
        if steps==0:
            return self._return_run_step(self.state_blocked, steps_run=0)
        data = slot.data()
        op = data.loc[fix_loc(indices)].max(keepdims=False)
        if self._table is None:
            self._table = PsDict(op) # /!\

### Writing output data

This step cannot be completely dissociated from the previous one (see the previous example). When the output object is a `PsDict` things are pretty simple:

In [None]:
    def run_step(self, run_number, step_size, quantum):
        slot = self.get_input_slot('table')
        if slot.updated.any() or slot.deleted.any():
            slot.reset()
            if self._table is not None:
                self._table.resize(0)
            slot.update(run_number)
        indices = slot.created.next(step_size) 
        steps = indices_len(indices)
        if steps==0:
            return self._return_run_step(self.state_blocked, steps_run=0)
        data = slot.data()
        op = data.loc[fix_loc(indices)].max(keepdims=False)
        if self._table is None:
            self._table = PsDict(op)
        else: # /!\
            for k, v in self._table.items():
                self._table[k] = np.maximum(op[k], v)
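
The create-once, update-afterwards pattern can be followed step by step in a plain-Python sketch. ProgressiVis is not used here: dictionaries of lists stand in for the chunks read at each step, `result` plays the role of `self._table`, and both names (and the sample values) are hypothetical.

```python
# Toy illustration only: dicts of column -> list stand in for table chunks.
chunks = [
    {"a": [1, 5, 2], "b": [7, 0, 3]},
    {"a": [4, 9], "b": [6, 2]},
    {"a": [8], "b": [10]},
]

result = None  # plays the role of `self._table`, initially None
for chunk in chunks:
    # Per-column maximum of the rows consumed in this step.
    op = {col: max(values) for col, values in chunk.items()}
    if result is None:
        result = dict(op)       # created once, on the first write
    else:
        for col in result:      # updated in place afterwards
            result[col] = max(op[col], result[col])
print(result)
```

After the first chunk the same dictionary object is kept and only its values change, which mirrors the rule that the output must not be re-created.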

### Return of the module next state and number of steps performed

We saw previously that when the conditions are not met to produce a result, the `run_step()` method calls the `_return_run_step()` method. This same method is used to produce the return value after finishing the job:

In [None]:
    def run_step(self, run_number, step_size, quantum):
        slot = self.get_input_slot('table')
        if slot.updated.any() or slot.deleted.any():
            slot.reset()
            if self._table is not None:
                self._table.resize(0)
            slot.update(run_number)
        indices = slot.created.next(step_size) 
        steps = indices_len(indices)
        if steps==0:
            return self._return_run_step(self.state_blocked, steps_run=0)
        data = slot.data()
        op = data.loc[fix_loc(indices)].max(keepdims=False)
        if self._table is None:
            self._table = PsDict(op)
        else:
            for k, v in self._table.items():
                self._table[k] = np.maximum(op[k], v)
        return self._return_run_step(self.next_state(slot), steps_run=steps) # /!\

The case where the output is a table is illustrated by the following module:

In [None]:
class Hadamard(TableModule):
    inputs = [SlotDescriptor('x1', type=Table, required=True), 
              SlotDescriptor('x2', type=Table, required=True)]

    def reset(self):
        if self._table is not None:
            self._table.resize(0)
    
    def run_step(self, run_number, step_size, quantum):
        x1 = self.get_input_slot('x1')
        x2 = self.get_input_slot('x2')
        if x1.updated.any() or x1.deleted.any() or x2.updated.any() or x2.deleted.any():
            x1.reset()
            x2.reset()
            if self._table is not None:
                self._table.resize(0)
            x1.update(run_number)        
            x2.update(run_number)
        step_size = min(x1.created.length(), x2.created.length(), step_size)
        x1_indices = x1.created.next(step_size)
        x2_indices = x2.created.next(step_size) 
        res = {}
        data1 = x1.data().loc[fix_loc(x1_indices)]
        data2 = x2.data().loc[fix_loc(x2_indices)]
        assert data1.columns == data2.columns
        for col in data1.columns:
            res[col] = np.multiply(data1[col], data2[col])
        if self._table is None:
            self._table = Table(name='simple_hadamard', data=res, create=True)
        else:
            self._table.append(res)
        return self._return_run_step(self.next_state(x1), steps_run=step_size)

## Running a module

In order to run the previous module we will use the `RandomTable` module to provide inputs as well as the `Print` module. 
These modules are part of **Progressivis**.
Since the execution of **Progressivis** is asynchronous, the `start()` method of the scheduler is a coroutine and is executed with **`await`**.

**NB:** A recent version of **Jupyter** is required, because the cell below uses `await` at the top level

In [None]:
from progressivis.core import Scheduler
from progressivis.core import aio
from progressivis import Print
from progressivis.stats import RandomTable
from progressivis.core.utils import indices_len, fix_loc
import numpy as np

s = Scheduler()
random1 = RandomTable(3, rows=100000, scheduler=s)
random2 = RandomTable(3, rows=100000, scheduler=s)
module = Hadamard(scheduler=s)
module.input.x1 = random1.output.table
module.input.x2 = random2.output.table        
pr=Print(scheduler=s)
pr.input.df = module.output.table
await s.start()
res1 = np.multiply(random1.table().to_array(),
                random2.table().to_array())
res2 = module.table().to_array()
print("Check:", np.allclose(res1, res2, equal_nan=True))
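
Outside a notebook, a regular Python script cannot use `await` at the top level. The sketch below shows the general standard-library pattern for running a coroutine from a script with `asyncio.run()`. It does not use ProgressiVis: `toy_start` is a hypothetical coroutine standing in for something like `s.start()`, and whether ProgressiVis recommends this entry point or its own `aio` wrapper is not covered here.

```python
import asyncio

async def toy_start(steps):
    """Hypothetical stand-in for a coroutine such as `s.start()`."""
    done = 0
    for _ in range(steps):
        await asyncio.sleep(0)  # yield control to the event loop
        done += 1
    return done

async def main():
    # Inside a coroutine, `await` can be used directly.
    return await toy_start(3)

# In a regular script no event loop is running yet, so one is started here.
completed = asyncio.run(main())
print(completed)
```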