In [8]:
import os, sys, inspect, io

cmd_folder = os.path.realpath(
    os.path.dirname(
        os.path.abspath(os.path.split(inspect.getfile( inspect.currentframe() ))[0])))

if cmd_folder not in sys.path:
    sys.path.insert(0, cmd_folder)
    
from transitions import *
from collections import Queue
class Optimizer(object):
    q_in = Queue()
    q_out = Queue()
    
    def do_work(self, item):
        return item + 5
    
    def on_enter_training(self):
        item = self.q_in.get()
        self.q_out.put(self.do_work(item))
        self.q_in.task_done()
        self.trained()
        
    def on_exit_training(self):
        item = self.q_out.get()
        print("result of training {}".format(item))
        
    def on_enter_listening(self):
        self.q_in.put(1)
        self.heard()
        
    def is_valid(self):
        return True
    
    def is_not_valid(self):
        return False
    
    def is_also_valid(self):
        return True
    
    # graph object is created by the machine
    def show_graph(self, **kwargs):
        stream = io.BytesIO()
        self.get_graph(**kwargs).draw(stream, prog='dot', format='png')
        display(Image(stream.getvalue()))

ImportError: cannot import name 'Queue'

In [5]:
transitions = [
    { 'trigger': 'work_done', 'source': ['training', 'averaging'], 'dest': 'broadcasting' },
    { 'trigger': 'sent', 'source': 'broadcasting', 'dest': 'listening', 'conditions':'is_valid' },
    { 'trigger': 'heard', 'source': 'listening', 'dest': 'training', 'unless':'is_not_valid' }
#     { 'trigger': 'averaged', 'source': 'averaging', 'dest': 'broadcasting',
#       'conditions':['is_valid','is_also_valid'] }
]
states=['training', 'broadcasting', 'listening', 'averaging']

model = Optimizer()
machine = Machine(model=model, 
                       states=states, 
                       transitions=transitions,
                       initial='broadcasting')
model.state

NameError: name 'Optimizer' is not defined

In [6]:
class FederatedAveragingOptimizer(Machine):
    q_in = Queue()
    q_out = Queue()
    
    def do_work(self, item):
        return item + 5
    
    def on_enter_training(self):
        item = self.q_in.get()
        self.q_out.put(self.do_work(item))
        self.q_in.task_done()
        self.work_done()
        
    def on_exit_training(self):
        item = self.q_out.get()
        print("result of training {}".format(item))
        
    def on_enter_listening(self):
        self.q_in.put(1)
        self.heard()
        
    def is_valid(self):
        return True
    
    def is_not_valid(self):
        return False
    
    def is_also_valid(self):
        return True
    
    def __init__(self):
        transitions = [
        { 'trigger': 'work_done', 'source': ['training', 'averaging'], 'dest': 'broadcasting' },
        { 'trigger': 'sent', 'source': 'broadcasting', 'dest': 'listening', 'conditions':'is_valid' },
        { 'trigger': 'heard', 'source': 'listening', 'dest': 'training', 'unless':'is_not_valid' }
        #     { 'trigger': 'averaged', 'source': 'averaging', 'dest': 'broadcasting',
        #       'conditions':['is_valid','is_also_valid'] }
        ]
        states=['training', 'broadcasting', 'listening', 'averaging']
        Machine.__init__(self, states=states, transitions=transitions, initial='broadcasting')
        

NameError: name 'Queue' is not defined

In [7]:
fedavg = FederatedAveragingOptimizer()
fedavg.state
fedavg.sent()

NameError: name 'FederatedAveragingOptimizer' is not defined

In [25]:
class FullyConnectedLayer(Machine):
    
    def do_averaging(self, event):
        self.neural_network += 'trained'
        print(self.neural_network)
        
    def get_model_with_addr(self, model_addr):
        return "hello im a model"
    
    def on_enter_training(self, event):
        model_addr = event.kwargs.get('model_addr')
        self.neural_network = self.get_model_with_addr(model_addr)
        print(self.model)
        
    def on_exit_training(self, event):
        print("result of training {}".format(self.neural_network))
        
    def is_valid(self, event):
        return True
    
    def time_elapsed(self, event):
        return self.time >= self.time_threshold
    
    def is_also_valid(self):
        return True
    
    
    def __init__(self):
        self.time = 0
        self.time_threshold = 2
        self.neural_network = False
        transitions = [
        { 'trigger': 'trained', 'source': 'training', 'dest': 'broadcasting'},
        { 'trigger': 'averaged', 'source': 'averaging', 'dest': 'training' },
        { 'trigger': 'sent', 'source': 'broadcasting', 'dest': 'listening', 'conditions':'is_valid' },
        { 'trigger': 'heard', 'source': 'listening', 'dest': 'averaging', 'conditions':'time_elapsed', 'after':'do_averaging' }
        #     { 'trigger': 'averaged', 'source': 'averaging', 'dest': 'broadcasting',
        #       'conditions':['is_valid','is_also_valid'] }
        ]
        states=['listening', 'averaging', 'training', 'broadcasting']
        Machine.__init__(self, states=states, transitions=transitions, send_event=True, initial='broadcasting')

In [26]:
fclyr = P2POptimizer()
fclyr.sent()
fclyr.heard(model_addr='0xabc123')

False

In [77]:
from enum import Enum

class EventTypes(Enum):
    TRAIN = "TRAIN"
    VALIDATE = "VALIDATE"
    AVERAGE = "AVERAGE"
    UNDEFINED = "UNDEFINED"

In [68]:
event = {"EventType" : EventTypes.TRAIN.value}

In [63]:
# EventTypes(event["EventType"])

In [64]:
if(event['EventType'] )

{'EventType': 'TRAIN'}

In [1]:
def do_nothing(event):
    print("NOTHING")

def train(event):
    print("HELLO IT WORKS")
    
def validate(event):
    print("HOW DID I GET HERE")

def average(event):
    print("AVERAGE")

EVENT_TYPE_CALLBACKS = {
    EventTypes.TRAIN.value: train, 
    EventTypes.VALIDATE.value: validate,
    EventTypes.AVERAGE.value: average,
    EventTypes.UNDEFINED.value: do_nothing,
}

def parse(event):
    """
    Parses an event dictionary into a callback.
    If the callback is not defined, it does nothing.
    """
    event_type = event.get('EventType', EventTypes.UNDEFINED.value)
    callback = EVENT_TYPE_CALLBACKS[EventTypes.UNDEFINED.value]
    if event_type in EVENT_TYPE_CALLBACKS:
        callback = EVENT_TYPE_CALLBACKS[event_type]
    return callback(event)

event = {"EventType": EventTypes.TRAIN.value}
parse(event)

NameError: name 'EventTypes' is not defined

In [97]:
event = {"EventType": EventTypes.TRAIN.value}
parse(event)

HELLO IT WORKS


In [98]:
event = {"EventType": EventTypes.VALIDATE.value}
parse(event)

HOW DID I GET HERE


In [99]:
event = {"EventType": EventTypes.AVERAGE.value}
parse(event)

AVERAGE


In [100]:
event = {"EventType": "BAMBOOZLED"}
parse(event)

NOTHING


In [73]:
class FederatedAveragingLayer(object):

    # Define some states.
    states = ['splitting', 'training', 'communicating', 'averaging']
    transitions = [
        ['failure', ['splitting', 'training', 'communicating', 'averaging'], '='],
        ['done_training', 'training', 'communicating'],
        ['done_listening', 'communicating', 'averaging'],
        ['done_averaging', 'averaging', 'training'],
        ['done_splitting', 'splitting', 'training'],
    ]
    def __init__(self, session_id, train_bound, listen_bound):

        # Need to name our Optimizer
        self.id = session_id

        # What have we accomplished today?
        self.training_iterations = 0
        self.listen_iterations = 0
        self.train_bound = train_bound
        self.listen_bound = listen_bound
        self.training_history = []

        # Initialize the state machine
        self.machine = Machine(model=self, 
                               states=FederatedAveragingLayer.states, 
                               transitions=FederatedAveragingLayer.transitions, 
                               initial='training')
        self.machine.add_transition('train_iter', 'training', None, after='increment_train_iter')
        self.machine.add_transition('listen_iter', '*', None, after='increment_listen_iter')
        
    def increment_train_iter(self):
        self.training_iterations += 1
        if self.training_iterations > self.train_bound:
            self.done_training()
        
    def update_journal(self):
        """ Dear Diary, today I saved Mr. Whiskers. Again. """
        self.kittens_rescued += 1

    def is_exhausted(self):
        """ Basically a coin toss. """
        return random.random() < 0.5

    def change_into_super_secret_costume(self):
        print("Beauty, eh?")

def call(layer):
    layer.train_iter()
    return layer.state

In [79]:
layer = FederatedAveragingLayer(1, 2, 3)

In [82]:
call(layer)

'communicating'

In [44]:
layer.train_iter()

True

In [45]:
layer.state

'training'

In [46]:
layer.train_iter()

True

In [47]:
layer.state

'communicating'

In [49]:
from enum import Enum

class CommunicationManagerEventTypes(Enum):
    '''
    Dictionary of events that the Communication Manager might receive from the Optimizer.
    The event indicates what state the Optimizer has transitioned to.
    This is essentially a 'command' for the Comm. Mgr. so these are all imperatives.
    This dictionary has to contain all possible states that an Optimizer can be in.
    '''
    TRAIN = "TRAIN"
    COMM = "COMMUNICATE"
    AVERAGE = "AVERAGE"
    SPLIT = "SPLIT"
    WEIGHT = "WEIGHT"
    # INTERNAL = "INTERNAL"
    # EXTERNAL = "EXTERNAL"
    UNDEFINED = "UNDEFINED"

In [53]:
CommunicationManagerEventTypes.TRAIN.name

'TRAIN'

In [61]:
def do_nothing():
    pass

EVENT_TYPE_CALLBACKS = {
	    CommunicationManagerEventTypes.UNDEFINED.name: do_nothing,
	}

In [62]:
EVENT_TYPE_CALLBACKS["UNDEFINED"]

<function __main__.do_nothing>