## OPS and Celery

Install a Redis server and start it with

```
redis-server
```

Then start the workers:

```
celery worker -A {projectname} --loglevel=INFO
```

The current project name is `tasks`.

To monitor the workers with Celery Flower (task progress, performance, etc.), install it and run

```
flower -A {projectname} --port=5555
```


### imports

In [1]:
import openpathsampling as p
import openpathsampling.engines.celery.tasks as t
from openpathsampling.engines.openmm import create_simple_openmm_from_pdb
import simtk.openmm as mm
import simtk.unit as u

### system setup

Create a simple OpenMM engine to use later. To keep the code short, we use a utility function that applies
default parameters and sets up a Langevin integrator at 300 K.

Helpers like this might eventually be implemented in a MoveScheme to give easier access to commonly needed objects.

In [2]:
class PathSamplingSystem(p.netcdfplus.StorableObject):
    """
    A package that groups all path sampling aspects

    - Engine / System
    - CVs
    - Volumes
    - Network / Transition / Ensembles
    - MoveScheme(MoveStrategies) / PathMover
    - (initial) SampleSet
    """

    def __init__(self, objs):
        super(PathSamplingSystem, self).__init__()
        # Always store the objects as a list in the same attribute
        if isinstance(objs, (list, tuple)):
            self._objs = list(objs)
        else:
            self._objs = [objs]

    @classmethod
    def from_scheme(cls, scheme):
        # Placeholder: wraps the scheme itself; extracting its parts is not implemented yet
        return cls(scheme)

    @property
    def states(self):
        # Placeholder return value
        return {'name': p.Volume()}

    @property
    def cvs(self):
        # Placeholder return value
        return {'name': p.Volume()}

    def transitions(self, from_state=None, to_state=None):
        # Not implemented yet; the missing body previously caused an IndentationError
        raise NotImplementedError

    def interfaces(self, transition):
        """
        get an ordered list of interfaces starting from inner per transition
        """
        # A property cannot take arguments, so this is a plain method; placeholder return value
        return []

In [2]:
engine, template = create_simple_openmm_from_pdb('../../examples/resources/AD_initial_frame.pdb')

### local test

A quick check that our worker function runs in the master process, i.e. in the kernel this notebook runs in.
If it fails here, that does not necessarily mean it will also fail on the worker, but it is very likely.

In [3]:
traj = t.generate(engine, template, p.LengthEnsemble(100), init_args=['CPU'])

### run on worker

The next call submits the same task to a worker, i.e. with the same engine, the same initial snapshot and the same ensemble. `delay` returns a handle to the pending result instead of the trajectory itself.

In [4]:
delayed_trajectory = t.generate.delay(engine, template, p.LengthEnsemble(100), init_args=['CPU'])
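
Because `delay` returns immediately, the notebook can queue many jobs and collect the results later. As a rough local analogy that needs neither Redis nor a worker, the same submit-then-collect pattern can be sketched with the standard library's `concurrent.futures`. This is a stand-in and not Celery: `fake_generate` and `submit_all` are hypothetical names, and a `Future` only plays the role of Celery's result handle.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_generate(n_frames):
    """Hypothetical stand-in for t.generate: returns a list of n_frames dummy frames."""
    return list(range(n_frames))


def submit_all(lengths, max_workers=4):
    """Submit one job per requested length and return the results in submission order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # submit() returns at once with a Future, much like .delay() returns a handle
        futures = [pool.submit(fake_generate, n) for n in lengths]
        # result() blocks until that job has finished
        return [f.result() for f in futures]


lengths = [x * 10 + 5 for x in range(20)]
results = submit_all(lengths)
print([len(r) for r in results][:3])
```

Unlike this thread pool, Celery has to serialize the arguments and send them through the broker, which is why the serialization cost measured further down matters.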

In [7]:
trajs = [t.generate.delay(engine, template, p.LengthEnsemble(x*10 + 5), init_args=['CPU']) for x in range(20)]

In [20]:
for traj in trajs:
    print(traj.result)

Trajectory[5]
Trajectory[15]
Trajectory[25]
Trajectory[35]
Trajectory[45]
Trajectory[55]
Trajectory[65]
Trajectory[75]
Trajectory[85]
Trajectory[95]
Trajectory[105]
Trajectory[115]
Trajectory[125]
Trajectory[135]
Trajectory[145]
Trajectory[155]
Trajectory[165]
Trajectory[175]
Trajectory[185]
Trajectory[195]


In [21]:
ff = [traj.result for traj in trajs]

In [22]:
ff[0][0] == ff[10][0]

True

In [23]:
%%time
t.generate(engine, template, p.LengthEnsemble(1000), init_args=['CPU'])

CPU times: user 49.6 s, sys: 9.72 s, total: 59.3 s
Wall time: 27.4 s


Trajectory[1000]

In [24]:
%%time
trajs = [t.generate.delay(engine, template, p.LengthEnsemble(50), init_args=['CPU']) for x in range(20)]

running = True
while running:
    running = False
    for traj in trajs:
        if not traj.successful():
            running = True

CPU times: user 8.89 s, sys: 1.06 s, total: 9.95 s
Wall time: 16.8 s
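
The polling loop in this cell spins without pausing and only stops once every task reports `successful()`, so a single failed task would keep it running forever. A minimal sketch of a gentler wait follows; it relies only on the `ready()` and `failed()` methods that Celery's `AsyncResult` provides. The helper `wait_for_all`, its `poll_interval` and `timeout` parameters, and the `FakeResult` class used to exercise it without a broker are hypothetical names introduced for illustration.

```python
import time


def wait_for_all(handles, poll_interval=0.1, timeout=None):
    """Block until every handle is ready; return the handles that failed.

    `handles` are assumed to expose ready() and failed(), as Celery's
    AsyncResult does. Raises RuntimeError if `timeout` seconds pass first.
    """
    start = time.time()
    while not all(h.ready() for h in handles):
        if timeout is not None and time.time() - start > timeout:
            raise RuntimeError("tasks still pending after %.1f s" % timeout)
        time.sleep(poll_interval)  # yield the CPU between polls
    return [h for h in handles if h.failed()]


class FakeResult(object):
    """Hypothetical stand-in for an AsyncResult, to try the helper without a broker."""

    def __init__(self, polls_until_ready, fails=False):
        self._polls_left = polls_until_ready
        self._fails = fails

    def ready(self):
        # Become ready after being polled a fixed number of times
        if self._polls_left > 0:
            self._polls_left -= 1
            return False
        return True

    def failed(self):
        return self._fails


handles = [FakeResult(2), FakeResult(0, fails=True), FakeResult(3)]
failed = wait_for_all(handles, poll_interval=0.01, timeout=5.0)
print(len(failed))
```

With real tasks the call would be `failed = wait_for_all(trajs)`. The sleep lowers the CPU time spent in the notebook process while waiting.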


In [62]:
qqq = simp.to_json(sum(ff, p.Trajectory([])))

In [61]:
%%time
len(simp.to_json(sum(ff, p.Trajectory([]))))

CPU times: user 2.01 s, sys: 399 ms, total: 2.41 s
Wall time: 2.42 s


222483248

In [63]:
%%time
len(simp.from_json(qqq))

CPU times: user 881 ms, sys: 154 ms, total: 1.03 s
Wall time: 1.03 s


2000

So it takes
```
~ 2.4 seconds to serialize to JSON
~ 2000 frames (plus engine).
Size of the JSON string ~ 222 MB

~ 1.2 ms per frame
~ 110 kB per frame
```
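
Dividing the recorded totals by the 2000 frames gives the per-frame cost. The sketch below redoes that arithmetic from the wall times and the JSON length shown in the cells above. The helper name `per_frame_cost` is hypothetical, and treating the string length as a byte count assumes the JSON is plain ASCII.

```python
def per_frame_cost(total_seconds, total_bytes, n_frames):
    """Return (milliseconds per frame, kilobytes per frame)."""
    ms_per_frame = 1000.0 * total_seconds / n_frames
    kb_per_frame = total_bytes / 1000.0 / n_frames
    return ms_per_frame, kb_per_frame


# Values recorded in the cells above
n_frames = 2000
json_bytes = 222483248      # len() of the JSON string (assumes one byte per character)
to_json_seconds = 2.42      # wall time of to_json
from_json_seconds = 1.03    # wall time of from_json

ms_out, kb = per_frame_cost(to_json_seconds, json_bytes, n_frames)
ms_in, _ = per_frame_cost(from_json_seconds, json_bytes, n_frames)
print("serialize:   %.2f ms per frame" % ms_out)
print("deserialize: %.2f ms per frame" % ms_in)
print("size:        %.0f kB per frame" % kb)
```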

Gigabit network can transmit ~ 