Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
  • 12 commits
  • 8 files changed
  • 0 commit comments
  • 1 contributor
View
147 examples/SIR_model.py
@@ -1,4 +1,6 @@
import random
+import math
+from gevent import Timeout
import networkx
from traits.trait_types import Enum, Int, Float, Set
@@ -7,32 +9,89 @@
from pynetsym import Activator
from pynetsym import Agent
-from pynetsym.simulation import BaseClock
+from pynetsym.simulation import BaseClock, SyncActivator
from pynetsym.configurators import NXGraphConfigurator
from pynetsym.termination.conditions import always_true
+import pandas as pd
+import numpy as np
+
+def nans(shape, dtype=float):
+ a = np.empty(shape, dtype)
+ a.fill(np.nan)
+ return a
+
class Recorder(Agent):
name = 'recorder'
current_time = Int(0)
def setup(self):
+ self.distributions = pd.DataFrame(
+ data=nans((self.steps +1, 2)),
+ columns=['infected', 'recovered'])
+ self.distributions.ix[self.current_time] = 0
self.send(BaseClock.name,
'register_observer', name=self.name)
def ticked(self):
+# self.send_log('[%d] ticked.' % (
+# self.current_time, ))
self.current_time += 1
+ self.distributions.ix[self.current_time] =\
+ self.distributions.ix[self.current_time - 1]
+
def node_infected(self, node):
self.send_log('[%d] infected %s' % (
self.current_time, node))
+ self.distributions.infected[self.current_time] += 1
def node_recovered(self, node):
self.send_log('[%d] recovered %s' % (
self.current_time, node))
+ self.distributions.infected[self.current_time] -= 1
+ self.distributions.recovered[self.current_time] += 1
+ def _save_infected_ratio(self):
+ df = self.distributions.dropna()
+ full_df = pd.concat(
+ [df, pd.DataFrame(data=-(df.infected + df.recovered) + network_size,
+ columns=['susceptible'])],
+ axis=1)
+ full_df.to_csv('SIR_model.csv', index_label='step')
+ def save_statistic(self):
+ self._save_infected_ratio()
+
+class AdvancedRecorder(Recorder):
+ def setup(self):
+ super(AdvancedRecorder, self).setup()
+ number_of_nodes = 1000
+ self.infection_times = pd.DataFrame(
+ data=nans((number_of_nodes, 2)),
+ columns=['infection', 'recovery'])
-class Activator(Activator):
+ def node_infected(self, node):
+ super(AdvancedRecorder, self).node_infected(node)
+ self.infection_times.infection[node] = self.current_time
+
+ def node_recovered(self, node):
+ super(AdvancedRecorder, self).node_recovered(node)
+ self.infection_times.recovery[node] = self.current_time
+
+ def _save_infection_times(self):
+ df = self.infection_times.dropna()
+ full_df = pd.concat(
+ [df, pd.DataFrame(data=df.recovery - df.infection, columns=['duration']),
+ tmp_solution.dropna()],
+ axis=1)
+ full_df.to_csv('SIR_model_infection_rates.csv', index_label='node')
+
+ def save_statistic(self):
+ super(AdvancedRecorder, self).save_statistic()
+ self._save_infection_times()
+
+class Activator(SyncActivator):
infected_nodes = Set(Int)
def tick(self):
@@ -53,36 +112,50 @@ def nodes_to_activate(self):
return self.infected_nodes
+
+tmp_solution = pd.DataFrame(
+ data=nans((1000, 1)),
+ columns=['tduration'])
+
class Specimen(Node):
- state = Enum('S', 'I', 'R')
- infection_time = Int(-1)
+ state = Enum('S', 'I', 'R')
+ remaining_infection_time = Int(-1)
infection_probability = Float
- infection_length = Int
+ average_infection_length = Int
+
+ std_infection_length = Int(3)
infected_fraction = Float
# DEBUG_SEND = True
- def initialize(self):
- self.state = 'S'
- if random.random() < self.infected_fraction:
- self.infect()
+ def infection_time(self):
+ value = int(random.gauss(self.average_infection_length,
+ self.std_infection_length))
+ tmp_solution.tduration[self.id] = value
+ return value
+
+ def initialize(self, state):
+ self.state = state
+ if state == 'I':
+ self.remaining_infection_time = self.infection_time()
+ self.send(Activator.name, 'infected', node=self.id)
def infect(self):
if self.state == 'S':
self.state = 'I'
- self.infection_time = self.infection_length
+ self.remaining_infection_time = self.infection_time()
self.send(Activator.name, 'infected', node=self.id)
def activate(self):
if (self.state == 'I'
- and self.infection_time > 0):
+ and self.remaining_infection_time > 0):
for node in self.neighbors():
if random.random() < self.infection_probability:
self.send(node, 'infect')
- self.infection_time -= 1
+ self.remaining_infection_time -= 1
elif (self.state == 'I'
- and self.infection_time == 0):
- self.infection_time -= 1
+ and self.remaining_infection_time == 0):
+ self.remaining_infection_time -= 1
self.state = 'R'
self.send(Activator.name, 'not_infected', node=self.id)
elif self.state in ('R', 'S'):
@@ -92,14 +165,12 @@ def activate(self):
class Simulation(Simulation):
- default_infection_probability = 0.1
- default_infection_length = 10
- default_infected_fraction = 0.5
-
- steps = 1000
+ default_infection_probability = 1.
+ default_average_infection_length = 10
+ default_infected_fraction = 0.01
- recorder_type = Recorder
- recorder_options = {}
+ recorder_type = AdvancedRecorder
+ recorder_options = {'steps'}
additional_agents = ('recorder', )
@@ -109,12 +180,12 @@ def require_termination(self, reason):
command_line_options = (
('-p', '--infection-probability',
- dict(default=default_infection_probability, type=float)),
- ('-t', '--infection-length',
- dict(default=default_infection_length, type=int)),
+ dict(default=default_infection_probability, type=float)),
+ ('-t', '--average-infection-length',
+ dict(default=default_average_infection_length, type=int)),
('-f', '--infected-fraction',
- dict(default=default_infected_fraction, type=float)),
- )
+ dict(default=default_infected_fraction, type=float)),
+ )
activator_type = Activator
options = {}
@@ -122,15 +193,27 @@ def require_termination(self, reason):
class configurator_type(NXGraphConfigurator):
node_type = Specimen
node_options = {
- 'infection_probability',
- 'infection_length',
- 'infected_fraction'}
- initialize_nodes = True
+ 'infection_probability',
+ 'average_infection_length',
+ 'infected_fraction'}
+
+ def initialize_nodes(self):
+ infected_fraction = self.full_parameters['infected_fraction']
+ infected_population_size = int(
+ math.ceil(len(self.node_identifiers) * infected_fraction))
+ infected_nodes = set(random.sample(self.node_identifiers, infected_population_size))
+
+ self.sync_send_all(self.node_identifiers, 'initialize',
+ state=lambda rid: 'I' if (rid in infected_nodes) else 'S')
+
if __name__ == '__main__':
graph = networkx.powerlaw_cluster_graph(100, 5, 0.1)
sim = Simulation()
- sim.run(starting_graph=graph)
+ sim.run(starting_graph=graph, force_cli=True)
+
+ print sim.motive
- assert sim.motive == 'No more infected'
+ network_size = sim.graph.number_of_nodes()
+ sim.recorder.save_statistic()
View
32 pynetsym/configurators/basic.py
@@ -1,9 +1,8 @@
-import collections
from traits.has_traits import implements
from traits.trait_types import Dict, Str, Int, Any, Type, Set, false
from .. import core
-from ..util import gather_from_ancestors, extract_subdictionary, SequenceAsyncResult
+from ..util import extract_subdictionary, SequenceAsyncResult
from ..node_manager import NodeManager
from .interface import IConfigurator
@@ -12,17 +11,6 @@ class AbstractConfigurator(core.Agent):
name = 'configurator'
- initialize_nodes = false
- """
- When all the nodes are created, if the initialize_nodes attribute
- is set to true, all the nodes are sent an initialize message.
- Such attribute can be both set as a configurator_option
- or directly in the class like::
-
- class SomeSimulation(simulation.Simulation):
- class configurator(node_manager.BasicConfigurator):
- initialize = True
- """
options = {"full_parameters"}
"""
Here we specify the names of the options for the configurator.
@@ -33,19 +21,10 @@ class configurator(node_manager.BasicConfigurator):
node_identifiers = Any
full_parameters = Dict(key_trait=Str)
-# def __init__(self, **additional_arguments):
-# full_options = gather_from_ancestors(
-# self, 'configurator_options')
-# configurator_arguments = extract_subdictionary(
-# additional_arguments, full_options)
-# self.set(**configurator_arguments)
-# self.set(additional_arguments=additional_arguments)
-
def _start(self):
self.create_nodes()
self.create_edges()
- if self.initialize_nodes:
- self.do_initialization()
+ self.initialize_nodes()
def create_nodes(self):
raise NotImplementedError()
@@ -53,10 +32,15 @@ def create_nodes(self):
def create_edges(self):
raise NotImplementedError()
- def do_initialization(self):
+ def do_initialize(self):
for identifier in self.node_identifiers:
self.send(identifier, 'initialize')
+ def do_not_initialize(self):
+ pass
+
+ initialize_nodes = do_not_initialize
+
class BasicConfigurator(AbstractConfigurator):
"""
View
10 pynetsym/configurators/interface.py
@@ -1,4 +1,5 @@
-from traits.has_traits import Interface
+from traits.api import Interface
+from traits.api import Method
class IConfigurator(Interface):
def create_nodes(self):
@@ -7,9 +8,4 @@ def create_nodes(self):
def create_edges(self):
pass
- def do_initialize_nodes(self):
- pass
-
- @property
- def initialize_node(self):
- pass
+ initialize_nodes = Method
View
83 pynetsym/core.py
@@ -1,3 +1,4 @@
+import copy
from pynetsym import addressing
from pynetsym import agent_db
@@ -199,11 +200,6 @@ def log_received(self, msg):
message = "RECV %s" % (msg,)
self.send_log(message)
- def send_all(self, receivers, message, **additional_parameters):
- return SequenceAsyncResult(
- [self.send(receiver_id, message, **additional_parameters)
- for receiver_id in receivers])
-
def _awaken_agent(self, identifier):
agent = self._node_db.recover(identifier)
agent.start(self._address_book, self._node_db, identifier)
@@ -224,6 +220,20 @@ def _resolve(self, identifier):
return receiver
+ def _handle_message(self, message, receiver):
+ if getattr(self, 'DEBUG_SEND', False):
+ self.log_sent(str(message), receiver)
+ result = event.AsyncResult()
+ receiver.deliver(message, result)
+ return result
+
+ def _handle_no_receiver(self, e):
+ logger = self._get_logger()
+ logger.put_log(self, e.message)
+ result = event.AsyncResult()
+ result.set_exception(e)
+ return result
+
def send(self, receiver_id, message_name, **additional_parameters):
"""
Send a message to the specified agent.
@@ -238,20 +248,57 @@ def send(self, receiver_id, message_name, **additional_parameters):
try:
receiver = self._resolve(receiver_id)
except addressing.AddressingError as e:
- logger = self._get_logger()
- logger.put_log(self, e.message)
- result = event.AsyncResult()
- result.set_exception(e)
+ result = self._handle_no_receiver(e)
return result
else:
# FIXME: no true message passing semantics!
message = Message(self.id, message_name, additional_parameters)
- if getattr(self, 'DEBUG_SEND', False):
- self.log_sent(str(message), receiver)
- result = event.AsyncResult()
- receiver.deliver(message, result)
+ result = self._handle_message(message, receiver)
return result
+ def sync_send(self, receiver_id, message_name, timeout=None, **additional_parameters):
+ return self.send(receiver_id, message_name, **additional_parameters).get(timeout=timeout)
+
+ def _send_all_fixed_params(self, additional_parameters, message_name, receivers):
+ message = Message(self.id, message_name, additional_parameters)
+ result_sequence = []
+ for receiver_id in receivers:
+ try:
+ receiver = self._resolve(receiver_id)
+ except addressing.AddressingError as e:
+ result = self._handle_no_receiver(e)
+ else:
+ result = self._handle_message(message, receiver)
+ result_sequence.append(result)
+ else:
+ return SequenceAsyncResult(result_sequence)
+
+ def _send_all_multi_params(self, additional_parameters, message_name, receivers):
+ callable_stuff = {name: func
+ for name, func in additional_parameters.items()
+ if callable(func)}
+ result_sequence = []
+ for receiver_id in receivers:
+ parameters = copy.copy(additional_parameters)
+ parameters.update({name: func(receiver_id)
+ for name, func in callable_stuff.items()})
+ result = self.send(receiver_id, message_name, **parameters)
+ result_sequence.append(result)
+ return SequenceAsyncResult(result_sequence)
+
+ def send_all(self, receivers, message_name, **additional_parameters):
+ if any(callable(param)
+ for param in additional_parameters.values()):
+ return self._send_all_multi_params(
+ additional_parameters, message_name, receivers)
+ else:
+ return self._send_all_fixed_params(
+ additional_parameters, message_name, receivers)
+
+ def sync_send_all(self, receivers, message_name, **additional_parameters):
+ return self.send_all(receivers, message_name, **additional_parameters).get()
+
+
def process(self, message, result):
"""
Processes the message. This means calling the message payload with
@@ -363,6 +410,16 @@ def put_error(self, sender, text):
dict(sender=sender, text=text))
self.deliver(message, result)
+ def stop_receiving(self):
+ while 1:
+ try:
+ message, result = self.read(timeout=0.01)
+ self.process(message, result)
+ del message, result
+ except NoMessage:
+ break
+ self.kill()
+
def error_message(self, sender, text):
ss = StringIO()
ss.write('=' * 10)
View
2  pynetsym/generation_models/watts_strogatz.py
@@ -50,7 +50,7 @@ class WS(Simulation):
activator_type = Activator
class configurator_type(BasicConfigurator):
- initialize_nodes = True
+ initialize_nodes = BasicConfigurator.do_initialize
node_type = Node
node_options = {
"rewiring_probability",
View
7 pynetsym/simulation.py
@@ -147,6 +147,8 @@ def send_simulation_ended(self):
def simulation_end(self):
self.active = False
self.send_simulation_ended().get()
+ self.send(Logger.name, 'stop_receiving')
+
def ask_to_terminate(self):
return self.send(
@@ -253,6 +255,7 @@ class configurator_type(generation.BasicConfigurator):
('-b', '--bar', dict(default=..., type=...))
"""
+
command_line_options = (
("-s", "--steps", dict(default=100, type=int)),
)
@@ -364,7 +367,7 @@ def create_address_book(self):
def create_logger(self):
logger_builder = ComponentBuilder(self, 'logger')
- logger_builder.build(stream=sys.stderr)\
+ logger_builder.build(stream=sys.stderr, set_=True)\
.start(self.address_book, self.agent_db)
@@ -480,6 +483,8 @@ def run(self, args=None, force_cli=False, **kwargs):
with timing.Timer(self.callback):
self.start_simulation()
self.clock.join()
+
+ self.logger.join()
return self
def exception_hook(self, node):
View
4 pynetsym/util/concurrency.py
@@ -15,6 +15,10 @@ def get(self):
print "xxx", gevent.getcurrent()
raise e
+ def wait(self, timeout=0):
+ for value in self.seq:
+ value.wait(timeout=timeout)
+
def flatten(self):
items = self.get()
return list(itertools.chain.from_iterable(items))
View
1  setup.py
@@ -36,7 +36,6 @@
exclude=["*.tests", "*.tests.*", "tests.*", "tests"]),
# scripts=find_generation_models(),
install_requires=[
- 'decorator',
'pytest',
'gevent',
'networkx',

No commit comments for this range

Something went wrong with that request. Please try again.