# Parallel processing with Pastastore

This notebook demonstrates the parallel processing capabilities of `PastaStore`.


<div class="alert alert-warning">

<strong>Note</strong> 

Parallel processing is platform dependent and may not always work. The current
implementation works well on Linux, though this will likely change with Python
3.14 and higher, where the default process start method on Linux changes from
`fork` to `forkserver`. On Windows, parallel solving does not work when called
directly from a Jupyter Notebook or IPython. To use parallel solving on Windows,
run code like the following from a Python file.

</div>

```python
from multiprocessing import freeze_support

if __name__ == "__main__":
    freeze_support()
    pstore.apply("models", some_func, parallel=True)
```
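
The platform dependence largely comes down to how Python starts worker processes. As a rough check, the start method can be inspected with the standard library alone. This sketch is not part of the pastastore API, and `start_method_report` is a hypothetical helper name. With `spawn` (the default on Windows and macOS) the main module is re-imported in every worker, which is why the `__main__` guard above is needed.

```python
import multiprocessing as mp
import sys


def start_method_report() -> str:
    """Describe the start method multiprocessing would use on this platform."""
    # allow_none=True avoids fixing the start method as a side effect
    method = mp.get_start_method(allow_none=True)
    available = mp.get_all_start_methods()  # the first entry is the default
    if method is None:
        method = available[0]
    return f"{sys.platform}: using '{method}' (available: {', '.join(available)})"


print(start_method_report())
```

If the reported method is `spawn` or `forkserver`, functions passed to a parallel call generally need to be importable from a module or script, which is hard to guarantee in an interactive session.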

In [34]:
import pastas as ps

import pastastore as pst
from pastastore.datasets import example_pastastore

ps.set_log_level("ERROR")  # silence Pastas logger for this notebook
pst.get_color_logger("INFO", "pastastore")
pst.show_versions()

Pastastore version : 1.12.0

Python version     : 3.13.11
Pandas version     : 2.3.3
Matplotlib version : 3.10.8
Pastas version     : 1.12.0
PyYAML version     : 6.0.3



## Example pastastore

Load some example data, create models and solve them to showcase parallel processing.

In [35]:
# get the example pastastore
conn = pst.PasConnector("my_connector", "./temp")
# conn = pst.ArcticDBConnector("my_connector", "lmdb://./temp")
pstore = example_pastastore(conn)
pstore.create_models_bulk();

PasConnector: library 'oseries' created in '/home/david/github/pastastore/examples/notebooks/temp/my_connector/oseries'
PasConnector: library 'stresses' created in '/home/david/github/pastastore/examples/notebooks/temp/my_connector/stresses'
PasConnector: library 'models' created in '/home/david/github/pastastore/examples/notebooks/temp/my_connector/models'
PasConnector: library 'oseries_models' created in '/home/david/github/pastastore/examples/notebooks/temp/my_connector/oseries_models'
PasConnector: library 'stresses_models' created in '/home/david/github/pastastore/examples/notebooks/temp/my_connector/stresses_models'



## Solving models

The `PastaStore.solve_models()` method supports parallel processing.

In [36]:
pstore.solve_models(parallel=True)


## Parallel processing using `.apply()`

Define a function that takes a model name as input and returns a result. In this case,
the function returns the $R^2$ value of the model.

In [37]:
def rsq(model_name: str) -> float:
    """Compute the R-squared value of a Pastas model."""
    ml = pstore.get_models(model_name)
    return ml.stats.rsq()

We can apply this function to all models in the pastastore using `pstore.apply()`.
By default, the function is applied sequentially.

In [38]:
pstore.apply("models", rsq, progressbar=True)


head_nb5    0.438129
oseries2    0.931883
head_mw     0.159318
oseries1    0.904487
oseries3    0.030468
dtype: float64

To run this function in parallel, pass `parallel=True` as a keyword argument.

In [39]:
pstore.apply("models", rsq, progressbar=True, parallel=True)


head_nb5    0.438129
oseries2    0.931883
head_mw     0.159318
oseries1    0.904487
oseries3    0.030468
dtype: float64
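
To make the difference between the two calls concrete, here is a minimal standard-library sketch of a sequential versus a parallel map over names. It is not pastastore's implementation: `apply_sketch` and `name_length` are hypothetical stand-ins, and a plain dictionary replaces the pandas Series shown above.

```python
from concurrent.futures import ProcessPoolExecutor


def name_length(name: str) -> int:
    """Toy stand-in for a per-model computation such as rsq()."""
    return len(name)


def apply_sketch(names, func, parallel=False, max_workers=None):
    """Map func over names and return {name: result} (hypothetical helper)."""
    if parallel:
        # func must be defined at module level so it can be sent to the workers
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(func, names))  # map preserves input order
    else:
        results = [func(name) for name in names]
    return dict(zip(names, results))


if __name__ == "__main__":
    names = ["head_nb5", "oseries2", "head_mw"]
    sequential = apply_sketch(names, name_length)
    parallel = apply_sketch(names, name_length, parallel=True, max_workers=2)
    print(sequential == parallel)
```

Both paths return results in the order of the input names, which matches the identical ordering of the two outputs above.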

## Get model statistics

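The method `pstore.get_statistics` also supports parallel processing through the
`parallel` keyword argument. The first call below runs sequentially, the second in
parallel.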

In [40]:
pstore.get_statistics(["rsq", "mae"])

| | rsq | mae |
|---|---|---|
| head_nb5 | 0.438129 | 0.318361 |
| oseries2 | 0.931883 | 0.087067 |
| head_mw | 0.159318 | 0.631517 |
| oseries1 | 0.904487 | 0.091329 |
| oseries3 | 0.030468 | 0.106254 |


In [41]:
pstore.get_statistics(["rsq", "mae"], parallel=True)

| _get_statistics | rsq | mae |
|---|---|---|
| head_nb5 | 0.438129 | 0.318361 |
| oseries2 | 0.931883 | 0.087067 |
| head_mw | 0.159318 | 0.631517 |
| oseries1 | 0.904487 | 0.091329 |
| oseries3 | 0.030468 | 0.106254 |


## Compute prediction intervals

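A more complex function can also be passed to `apply` and run in parallel. In this
case we compute the prediction interval for each model and pass the $\alpha$ value
along via the `kwargs` argument. The interval is estimated from random samples, so
the values differ slightly between the sequential and the parallel run.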

In [42]:
def prediction_interval(model_name, **kwargs):
    """Compute the prediction interval for a Pastas model."""
    ml = pstore.get_models(model_name)
    return ml.solver.prediction_interval(**kwargs)

In [43]:
pstore.apply("models", prediction_interval, kwargs={"alpha": 0.05})


| | head_nb5 (0.025) | head_nb5 (0.975) | oseries2 (0.025) | oseries2 (0.975) | head_mw (0.025) | head_mw (0.975) | oseries1 (0.025) | oseries1 (0.975) | oseries3 (0.025) | oseries3 (0.975) |
|---|---|---|---|---|---|---|---|---|---|---|
| 1960-04-29 | | | | | 6.300204 | 9.406311 | | | | |
| 1960-04-30 | | | | | 6.340113 | 9.291056 | | | | |
| 1960-05-01 | | | | | 6.197963 | 9.459711 | | | | |
| 1960-05-02 | | | | | 6.256496 | 9.436107 | | | | |
| 1960-05-03 | | | | | 6.161201 | 9.336737 | | | | |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 2020-01-17 | 7.961349 | 9.682071 | | | | | | | | |
| 2020-01-18 | 7.975718 | 9.576222 | | | | | | | | |
| 2020-01-19 | 7.914376 | 9.647849 | | | | | | | | |
| 2020-01-20 | 7.914663 | 9.710748 | | | | | | | | |


In [44]:
pstore.apply("models", prediction_interval, kwargs={"alpha": 0.05}, parallel=True)


| | head_nb5 (0.025) | head_nb5 (0.975) | oseries2 (0.025) | oseries2 (0.975) | head_mw (0.025) | head_mw (0.975) | oseries1 (0.025) | oseries1 (0.975) | oseries3 (0.025) | oseries3 (0.975) |
|---|---|---|---|---|---|---|---|---|---|---|
| 1960-04-29 | | | | | 6.318763 | 9.625481 | | | | |
| 1960-04-30 | | | | | 6.157354 | 9.441837 | | | | |
| 1960-05-01 | | | | | 6.178571 | 9.421986 | | | | |
| 1960-05-02 | | | | | 6.243335 | 9.478925 | | | | |
| 1960-05-03 | | | | | 6.237774 | 9.313179 | | | | |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 2020-01-17 | 7.860393 | 9.630678 | | | | | | | | |
| 2020-01-18 | 7.874968 | 9.671061 | | | | | | | | |
| 2020-01-19 | 7.913059 | 9.650184 | | | | | | | | |
| 2020-01-20 | 7.956231 | 9.667919 | | | | | | | | |
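
One way a dictionary of keyword arguments can be forwarded to every call is to bind it to the function first, for example with `functools.partial`. The sketch below illustrates that idea only. Whether pastastore forwards `kwargs` this way internally is an assumption, and `apply_with_kwargs` and `interval_width` are hypothetical names.

```python
from functools import partial


def interval_width(name: str, alpha: float = 0.05) -> float:
    """Toy stand-in for a function that takes a name plus keyword arguments."""
    return round(1.0 - alpha, 3)


def apply_with_kwargs(names, func, kwargs=None):
    """Call func(name, **kwargs) for every name (hypothetical helper)."""
    # Binding the keyword arguments gives a one-argument callable, which is
    # the shape that map-style executors expect.
    bound = partial(func, **(kwargs or {}))
    return {name: bound(name) for name in names}


print(apply_with_kwargs(["head_nb5", "oseries1"], interval_width, kwargs={"alpha": 0.1}))
```

A `partial` object can be pickled as long as the wrapped function and the bound values can be, so the same pattern also works when the calls are distributed over worker processes.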


## Get signatures

The method `pstore.get_signatures` does not explicitly support parallel processing, but it can be used in combination with `pstore.apply`.

In [45]:
signatures = [
    "cv_period_mean",
    "cv_date_min",
    "cv_date_max",
    "cv_fall_rate",
    "cv_rise_rate",
]

In [46]:
pstore.get_signatures(signatures=signatures)

| | head_nb5 | oseries2 | head_mw | oseries1 | oseries3 |
|---|---|---|---|---|---|
| cv_period_mean | 0.061879 | 0.015199 | 0.145062 | 0.013066 | 0.029168 |
| cv_date_min | 0.246021 | 0.128636 | 0.254627 | 0.145884 | 1.394852 |
| cv_date_max | 1.262425 | 0.722945 | 1.083929 | 0.300328 | 0.444442 |
| cv_fall_rate | -1.13645 | -0.722718 | -1.4302 | -0.744797 | -1.032837 |
| cv_rise_rate | 1.25945 | 0.836678 | 1.097257 | 0.862981 | 0.931181 |


In [47]:
pstore.apply(
    "oseries", pstore.get_signatures, kwargs={"signatures": signatures}, parallel=True
)


| get_signatures | head_nb5 | oseries2 | head_mw | oseries1 | oseries3 |
|---|---|---|---|---|---|
| cv_period_mean | 0.061879 | 0.015199 | 0.145062 | 0.013066 | 0.029168 |
| cv_date_min | 0.246021 | 0.128636 | 0.254627 | 0.145884 | 1.394852 |
| cv_date_max | 1.262425 | 0.722945 | 1.083929 | 0.300328 | 0.444442 |
| cv_fall_rate | -1.13645 | -0.722718 | -1.4302 | -0.744797 | -1.032837 |
| cv_rise_rate | 1.25945 | 0.836678 | 1.097257 | 0.862981 | 0.931181 |


## Load models

Models can also be loaded with `pstore.apply`. The first call below runs sequentially.

In [48]:
pstore.apply("models", pstore.get_models, fancy_output=True)


{'head_nb5': Model(oseries=head_nb5, name=head_nb5, constant=True, noisemodel=False),
 'oseries2': Model(oseries=oseries2, name=oseries2, constant=True, noisemodel=False),
 'head_mw': Model(oseries=head_mw, name=head_mw, constant=True, noisemodel=False),
 'oseries1': Model(oseries=oseries1, name=oseries1, constant=True, noisemodel=False),
 'oseries3': Model(oseries=oseries3, name=oseries3, constant=True, noisemodel=False)}

The `max_workers` keyword argument sets the number of workers that are spawned. The default value is often fine, but it can be set explicitly.

The following works for `PasConnector`. See alternative code below for `ArcticDBConnector`.  

In [49]:
pstore.apply(
    "models", pstore.get_models, fancy_output=True, parallel=True, max_workers=5
)


{'head_nb5': Model(oseries=head_nb5, name=head_nb5, constant=True, noisemodel=False),
 'oseries2': Model(oseries=oseries2, name=oseries2, constant=True, noisemodel=False),
 'head_mw': Model(oseries=head_mw, name=head_mw, constant=True, noisemodel=False),
 'oseries1': Model(oseries=oseries1, name=oseries1, constant=True, noisemodel=False),
 'oseries3': Model(oseries=oseries3, name=oseries3, constant=True, noisemodel=False)}
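
When setting `max_workers` explicitly, a common heuristic is to use no more workers than there are tasks or CPU cores. The helper below is a hypothetical sketch of that heuristic and says nothing about the default that pastastore itself picks.

```python
import os


def choose_workers(n_tasks: int, requested=None) -> int:
    """Pick a worker count: at most one per task, and at least one."""
    available = os.cpu_count() or 1  # cpu_count() can return None
    limit = requested if requested is not None else available
    return max(1, min(limit, n_tasks))


print(choose_workers(n_tasks=5, requested=2))
print(choose_workers(n_tasks=5, requested=16))
```

For the five models in this example, asking for more than five workers would leave the extra processes idle while still paying their start-up cost.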

## Storing models in parallel
<div class="alert alert-info">
<strong>Note</strong>

This section is mostly for developers, as a reminder of why and how
delayed updating of the model links was implemented.
</div>

We want to build and solve our time series models in 2 steps, first without a noise
model and then with a noise model, and then store the result. We empty the models
library to start from scratch.

In [50]:
pstore.empty_library("models", prompt=False, progressbar=False)

Emptied library models in my_connector: <class 'pastastore.connectors.PasConnector'>
Emptied library oseries_models in my_connector: <class 'pastastore.connectors.PasConnector'>
Emptied library stresses_models in my_connector: <class 'pastastore.connectors.PasConnector'>


In [51]:
def two_step_solve(name):
    """Solve a Pastas model in two steps: first without noise model, then with."""
    ml = pstore.create_model(name)
    ml.solve(report=False)
    ml.add_noisemodel(ps.ArNoiseModel())
    ml.solve(initial=False, report=False)
    pstore.add_model(ml, overwrite=True)

In the first example we apply the function in parallel. A separate recomputation is
performed after the parallel apply to update the links between the time series names
and the models.

The `conn._added_models` attribute keeps track of added models so that the model links
can be updated after all models have been added. In parallel mode, each child process
works on its own copy of this attribute, so the list in the main process is not updated.

In [52]:
pstore.conn._added_models

[]

In [53]:
pstore.apply("oseries", two_step_solve, parallel=True, max_workers=2)


As expected, the list remains empty:

In [54]:
pstore.conn._added_models

[]
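
The empty list follows from how Python processes work in general, not from anything specific to pastastore. The sketch below reproduces the effect with the standard library only; `added_models` and `add_model` are toy stand-ins for the connector attribute and method of the same names.

```python
from concurrent.futures import ProcessPoolExecutor

added_models = []  # lives in the parent process; each worker has its own copy


def add_model(name: str) -> str:
    """Record the name in the list of whichever process runs this call."""
    added_models.append(name)
    return name


if __name__ == "__main__":
    names = ["head_nb5", "oseries2", "head_mw"]
    with ProcessPoolExecutor(max_workers=2) as executor:
        returned = list(executor.map(add_model, names))
    print(returned)      # return values do travel back to the parent
    print(added_models)  # the parent's list was not modified by the workers
```

Only return values are sent back to the parent, so any bookkeeping that has to survive a parallel run needs to be derived from those results or from the database afterwards.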

The parallel apply automatically updates the model links libraries, so the update flags
should be equal to False.

In [55]:
# check if update flags were reset after adding models links after parallel apply
print(f"{pstore.conn._oseries_links_need_update.value = }")
print(f"{pstore.conn._stresses_links_need_update.value = }")

pstore.conn._oseries_links_need_update.value = False
pstore.conn._stresses_links_need_update.value = False


Let's check the `oseries_models` result:

In [56]:
pstore.oseries_models

{'head_nb5': ['head_nb5'],
 'oseries2': ['oseries2'],
 'head_mw': ['head_mw'],
 'oseries1': ['oseries1'],
 'oseries3': ['oseries3']}

Next we repeat the process with `parallel=False`. The `_added_models` attribute can now
be updated properly, since there is only the main instance of `PastaStore`.

Once again, we empty the models library to start fresh.

In [57]:
pstore.empty_library("models", prompt=False, progressbar=False)

Emptied library models in my_connector: <class 'pastastore.connectors.PasConnector'>
Emptied library oseries_models in my_connector: <class 'pastastore.connectors.PasConnector'>
Emptied library stresses_models in my_connector: <class 'pastastore.connectors.PasConnector'>


In [58]:
pstore.apply("oseries", two_step_solve, parallel=False)


The `_added_models` attribute should now contain the names of all 5 models.

In [59]:
pstore.conn._added_models

['head_nb5', 'oseries2', 'head_mw', 'oseries1', 'oseries3']

The update flags should be set to True, which should trigger an update once we try to
access the out-of-date data.

In [60]:
# check that the update flags are set after the sequential apply
print(f"{pstore.conn._oseries_links_need_update.value = }")
print(f"{pstore.conn._stresses_links_need_update.value = }")

pstore.conn._oseries_links_need_update.value = True
pstore.conn._stresses_links_need_update.value = True


Now let's trigger the update by looking at `oseries_models`. This will update the
database, empty the `_added_models` attribute and set the update flags to False.

In [61]:
pstore.oseries_models

{'head_nb5': ['head_nb5'],
 'oseries2': ['oseries2'],
 'head_mw': ['head_mw'],
 'oseries1': ['oseries1'],
 'oseries3': ['oseries3']}

In [62]:
pstore.conn._added_models

[]

In [63]:
# check that the update flags were reset after accessing oseries_models
print(f"{pstore.conn._oseries_links_need_update.value = }")
print(f"{pstore.conn._stresses_links_need_update.value = }")

pstore.conn._oseries_links_need_update.value = False
pstore.conn._stresses_links_need_update.value = False
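
The sequence above (record additions, mark the links as stale, rebuild on first access) can be summarized in a small toy class. This is a simplified sketch of the delayed-update idea with hypothetical names (`LinkStore`, `_links_need_update`), not the actual connector code.

```python
class LinkStore:
    """Toy store that defers rebuilding an oseries -> models lookup."""

    def __init__(self):
        self._added_models = []          # additions since the last rebuild
        self._links_need_update = False  # True while the lookup is stale
        self._links = {}

    def add_model(self, oseries_name: str, model_name: str) -> None:
        """Register a model cheaply by only marking the lookup as stale."""
        self._added_models.append((oseries_name, model_name))
        self._links_need_update = True

    @property
    def oseries_models(self) -> dict:
        """Return the lookup, rebuilding it first if it is out of date."""
        if self._links_need_update:
            for oseries_name, model_name in self._added_models:
                self._links.setdefault(oseries_name, []).append(model_name)
            self._added_models.clear()
            self._links_need_update = False
        return self._links


store = LinkStore()
store.add_model("head_nb5", "head_nb5")
store.add_model("oseries1", "oseries1")
print(store._links_need_update)
print(store.oseries_models)
print(store._added_models, store._links_need_update)
```

Deferring the rebuild keeps each `add_model` call cheap, and the cost of updating the links is paid once, at the moment the lookup is actually needed.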


## ArcticDBConnector workaround

For `ArcticDBConnector`, the underlying database connection objects cannot be pickled, which is required for Python's multiprocessing. Therefore, passing methods directly from the `PastaStore` or `ArcticDBConnector` classes will not work in parallel mode.
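
The pickling limitation can be reproduced with a toy object. In the sketch below a `threading.Lock` plays the role of the unpicklable database handle. `Connection` and `can_pickle` are hypothetical names, and the real reason ArcticDB connections cannot be pickled may differ.

```python
import pickle
import threading


class Connection:
    """Toy stand-in for a database handle holding an unpicklable resource."""

    def __init__(self):
        self._lock = threading.Lock()  # lock objects cannot be pickled

    def get(self, name: str) -> str:
        return f"data for {name}"


def can_pickle(obj) -> bool:
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False


conn_demo = Connection()
print(can_pickle("head_nb5"))     # plain arguments such as names are fine
print(can_pickle(conn_demo.get))  # a bound method drags its instance along
```

Pickling a bound method also pickles the object it belongs to, which is why passing a method of an unpicklable connector to a process pool fails even though the function itself looks harmless.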

**The workaround:** The `_parallel()` method uses an initializer that creates a new connector instance in each worker process and stores it in a global `conn` variable. Your custom functions can then access this connector to retrieve data from the database.

This is the standard Python pattern for using unpicklable objects with multiprocessing. See the [Python documentation](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) for more details.

**Note:** If you need access to methods from the `PastaStore` class, just create a new one by passing it the global connector: `PastaStore(conn)`.

**Example:** Write a simple function that uses the global `conn` variable to access the database:

In [64]:
# Simple function to get models from database
def get_model(model_name):
    """Get model using global connector (ArcticDBConnector workaround).

    The global 'conn' variable is set by the _parallel() initializer
    in each worker process, providing access to an ArcticDBConnector instance.
    """
    pstore = pst.PastaStore(conn)
    return pstore.get_model(model_name)

In [65]:
pstore.apply("models", get_model, fancy_output=True, parallel=True, max_workers=5)


{'head_nb5': Model(oseries=head_nb5, name=head_nb5, constant=True, noisemodel=True),
 'oseries2': Model(oseries=oseries2, name=oseries2, constant=True, noisemodel=True),
 'head_mw': Model(oseries=head_mw, name=head_mw, constant=True, noisemodel=True),
 'oseries1': Model(oseries=oseries1, name=oseries1, constant=True, noisemodel=True),
 'oseries3': Model(oseries=oseries3, name=oseries3, constant=True, noisemodel=True)}

## Clean up

Clean up temporary pastastore.

In [66]:
pst.util.delete_pastastore(pstore)

Deleting PasConnector database: 'my_connector' ... 
Done!
