In [1]:
import simpy


def my_callback(event):
    print("Called back from", event)


env = simpy.Environment()
event = env.event()
event.callbacks.append(my_callback)
event.callbacks


[<function __main__.my_callback(event)>]

In [2]:
class School:
    def __init__(self, env):
        self.env = env
        self.class_ends = env.event()
        self.pupil_procs = [env.process(self.pupil()) for i in range(3)]
        self.bell_proc = env.process(self.bell())

    def bell(self):
        for i in range(2):
            yield self.env.timeout(45)
            self.class_ends.succeed()
            self.class_ends = self.env.event()

    def pupil(self):
        for i in range(2):
            print(r" \o/", end="")
            yield self.class_ends


school = School(env)
env.run()


 \o/ \o/ \o/ \o/ \o/ \o/

In [3]:
def sub(env):
    yield env.timeout(1)
    return 23


def parent(env):
    ret = yield env.process(sub(env))
    return ret


env.run(env.process(parent(env)))


23

In [4]:
from simpy.util import start_delayed


def sub(env):
    yield env.timeout(1)
    return 23


def parent(env):
    sub_proc = yield start_delayed(env, sub(env), delay=3)
    ret = yield sub_proc
    return ret


env.run(env.process(parent(env)))


23

In [5]:
from simpy.events import AnyOf, AllOf, Event

events = [Event(env) for i in range(3)]
a = AnyOf(env, events)  # Triggers if at least one of "events" is triggered.
b = AllOf(env, events)  # Triggers if all each of "events" is triggered.


In [6]:
def test_condition(env):
    t1, t2 = env.timeout(1, value="spam"), env.timeout(2, value="eggs")
    ret = yield t1 | t2
    assert ret == {t1: "spam"}

    t1, t2 = env.timeout(1, value="spam"), env.timeout(2, value="eggs")
    ret = yield t1 & t2
    assert ret == {t1: "spam", t2: "eggs"}

    # You can also concatenate & and |
    e1, e2, e3 = [env.timeout(i) for i in range(3)]
    yield (e1 | e2) & e3
    assert all(e.processed for e in [e1, e2, e3])


proc = env.process(test_condition(env))
env.run()


In [7]:
def fetch_values_of_multiple_events(env):
    t1, t2 = env.timeout(1, value="spam"), env.timeout(2, value="eggs")
    r1, r2 = (yield t1 & t2).values()
    assert r1 == "spam" and r2 == "eggs"


proc = env.process(fetch_values_of_multiple_events(env))
env.run()
