This repository has been archived by the owner on Oct 22, 2018. It is now read-only.

Commit

Added tests for coord module
eriol committed Jun 15, 2009
1 parent c8fc657 commit 1ee4e51
Showing 3 changed files with 222 additions and 84 deletions.
155 changes: 79 additions & 76 deletions pyntk/ntk/core/coord.py
@@ -3,7 +3,7 @@
# (c) Copyright 2007 Andrea Lo Pumo aka AlpT <alpt@freaknet.org>
#
# This source code is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License,
# or (at your option) any later version.
#
@@ -16,26 +16,26 @@
# this source code; if not, write to:
# Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
##
#
# Coordinator Node
#
# TODO: tmp_deleted_purge() isn't called, moreover a fixed timeout isn't a
# good thing, you have to consider the rtt from the requester node to
# the coordinator node.

from ntk.lib.micro import microfunc
from ntk.lib.event import Event
from ntk.wrap.xtime import time
from ntk.core.p2p import P2P
from ntk.core.map import Map
from random import choice

from ntk.core.map import Map
from ntk.core.p2p import P2P
from ntk.lib.event import Event
from ntk.lib.micro import microfunc
from ntk.wrap.xtime import time

class Node:
def __init__(self,
class Node(object):
def __init__(self,
lvl=None, id=None # these are mandatory for Map.__init__(),
):

self.alive = False

def is_free(self):
@@ -44,11 +44,6 @@ def is_free(self):
def _pack(self):
return (self.alive,)

def _unpack(self, (p,)):
ret=Node()
ret.alive=p
return ret

class MapCache(Map):
def __init__(self, maproute):
Map.__init__(self, maproute.levels, maproute.gsize, Node, maproute.me)
@@ -60,19 +55,19 @@ def __init__(self, maproute):

def copy_from_maproute(self, maproute):
for lvl in xrange(self.levels):
for id in xrange(self.gsize):
if not maproute.node_get(lvl, id).is_empty():
self.node_add(lvl, id)

def node_add(self, lvl, id):
if self.node_get(lvl, id).alive:
Map.node_add(self, lvl, id)
self.node_get(lvl, id).alive = True
def node_add(self, lvl, id, silent=0):
if not self.node_get(lvl, id).alive:
Map.node_add(self, lvl, id, silent)
self.node_get(lvl, id).alive = True

def node_del(self, lvl, id):
def node_del(self, lvl, id, silent=False):
if self.node_get(lvl, id).alive:
Map.node_del(self, lvl, id)
self.node_get(lvl, id).alive = False
Map.node_del(self, lvl, id, silent)
self.node_get(lvl, id).alive = False

def tmp_deleted_add(self, lvl, id):
self.tmp_deleted[lvl, id] = time()
@@ -81,32 +76,32 @@ def tmp_deleted_add(self, lvl, id):
def tmp_deleted_del(self, lvl, id):
"""Removes an entry from the self.tmp_deleted cache"""
if (lvl, id) in self.tmp_deleted:
del self.tmp_deleted[lvl, id]

@microfunc()
def tmp_deleted_purge(self, timeout=32):
"""After a `timeout' seconds, restore a node added in the tmp_deleted
cache"""

new_tmp_deleted = {}

curt = time()
for lvl, id in self.tmp_deleted:
t = self.tmp_deleted[lvl, id]
if curt-t >= timeout:
self.node_add(lvl, id, silent=1)
else:
new_tmp_deleted[lvl, id] = t

self.tmp_deleted = new_tmp_deleted


class Coord(P2P):

pid = 1

def __init__(self, radar, maproute, p2pall):

P2P.__init__(self, radar, maproute, Coord.pid)

# let's register ourself in p2pall
@@ -117,64 +112,71 @@ def __init__(self, radar, maproute, p2pall):

self.maproute.events.listen('NODE_NEW', self.mapcache.node_add)
self.maproute.events.listen('NODE_DELETED', self.mapcache.node_del)
self.mapp2p.events.listen('NODE_NEW', self.new_participant_joined)

self.mapp2p.events.listen('NODE_NEW', self.new_partecipant_joined)

self.coordnode = [None]*self.maproute.levels
self.coordnode = [None] * self.maproute.levels
self.coornodes_set()

self.remotable_funcs = [self.going_out, self.going_out_ok, self.going_in]
self.remotable_funcs = [self.going_out,
self.going_out_ok,
self.going_in]

def h(self, (lvl, ip)):
"""h:KEY-->IP"""
def h(self, key):
"""h:KEY-->IP
:type key: a tuple (lvl, ip)
"""
lvl, ip = key
IP = list(ip)
for l in reversed(xrange(lvl)): IP[l] = 0
for l in reversed(xrange(lvl)):
IP[l] = 0
return IP

def coornodes_set(self):
"""Sets the coordinator nodes of each level, using the current map"""
for lvl in xrange(self.maproute.levels):
self.coordnode[lvl] = self.H(self.h((lvl, self.maproute.me)))

@microfunc()
def new_partecipant_joined(self, lvl, id):
"""Shall the new partecipant succeed us as a coordinator node?"""
def new_participant_joined(self, lvl, id):
"""Shall the new participant succeed us as a coordinator node?"""

# the node joined in level `lvl', thus it may be a coordinator of the
# level `lvl+1'
level = lvl+1
level = lvl + 1

pIP = self.maproute.me
pIP = list(self.maproute.me)
pIP[lvl] = id
for l in reversed(xrange(lvl)): pIP[l]=None

newcor = self.H(self.h(level, self.maproute.me))
for l in reversed(xrange(lvl)):
pIP[l] = None

newcor = self.H(self.h((level, self.maproute.me)))
if newcor != pIP:
# the new partecipant isn't a coordinator node
return
# the new partecipant isn't a coordinator node
return None

oldcor = self.coordnode[level]
self.coordnode[level] = newcor

if oldcor == self.maproute.me and pIP != self.maproute.me:
#if we were a coordinator node, and it is different from us:
# let's pass it our cache
peer = self.peer(hIP=newcor)
peer.mapp2p.map_data_merge(self.mapp2p.map_data_pack())

def going_out(self, lvl, id, gnumb=None):
"""The node of level `lvl' and ID `id', wants to go out from its gnode
G of level lvl+1. We are the coordinator of this gnode G.
We'll give an affermative answer if `gnumb' < |G| or if
`gnumb'=None"""
G of level lvl+1. We are the coordinator of this gnode G.
We'll give an affermative answer if `gnumb' < |G| or if `gnumb'=None
"""



if (gnumb < self.mapp2p.nodes_nb[lvl]-1 or gnumb == None) \
and self.mapp2p.node_get(lvl, id).alive:
if (gnumb < self.mapp2p.nodes_nb[lvl]-1 or gnumb is None) \
and self.mapp2p.node_get(lvl, id).alive:
self.mapp2p.node_del(lvl, id)
return self.mapp2p.nodes_nb[lvl]
else:
return None
return None

def going_out_ok(self, lvl, id):
"""The node, which was going out, is now acknowledging the correct
@@ -183,19 +185,20 @@ def going_out_ok(self, lvl, id):

def going_in(self, lvl, gnumb=None):
"""A node wants to become a member of our gnode G of level `lvl+1'.
We are the coordinator of this gnode G (so we are also a member of G).
We'll give an affermative answer if `gnumb' > |G| or if
`gnumb'=None"""

if gnumb > self.mapp2p.nodes_nb[lvl]+1:
fnl = self.mapp2p.free_nodes_list(lvl)
if fnl == []:
return None

newip = self.mapp2p.me
newip[lvl] = choice(fnl)
for l in reversed(xrange(lvl)): newnip[l]=randint(0, self.mapp2p.gsize)
self.node_add(lvl, newip[lvl])
return newip
We are the coordinator of this gnode G (so we are also a member of G).
We'll give an affermative answer if `gnumb' > |G| or if `gnumb'=None
"""

if gnumb > self.mapp2p.node_nb[lvl]+1:
fnl = self.mapp2p.free_nodes_list(lvl)
if fnl == []:
return None

newip = self.mapp2p.me
newip[lvl] = choice(fnl)
for l in reversed(xrange(lvl)):
newnip[l]=randint(0, self.mapp2p.gsize)
self.node_add(lvl, newip[lvl])
return newip

return None
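
The tmp_deleted_purge() logic in MapCache above keeps a dict of (lvl, id) -> deletion timestamp and, after `timeout' seconds, restores any entry old enough while keeping the rest for the next pass. A self-contained sketch of that rule (illustrative names only, not the project's API; the real method calls node_add(lvl, id, silent=1) on the restored entries):

from time import time

def purge(tmp_deleted, timeout=32, now=None):
    """tmp_deleted maps (lvl, id) -> deletion timestamp; returns the entries
    to keep and the keys old enough to be restored."""
    now = time() if now is None else now
    kept, restored = {}, []
    for key, t in tmp_deleted.items():
        if now - t >= timeout:
            restored.append(key)   # Coord's MapCache would re-add this node silently
        else:
            kept[key] = t
    return kept, restored

# One stale entry (deleted 40s ago) and one fresh entry (deleted 15s ago).
print(purge({(1, 5): 100.0, (2, 9): 125.0}, timeout=32, now=140.0))
# -> ({(2, 9): 125.0}, [(1, 5)])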
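
The h() service key in Coord above maps a (lvl, ip) key to the IP whose components below level `lvl' are zeroed, so every node of the same gnode of level `lvl' derives the same key, which H() then resolves to the actual coordinator (as in coornodes_set()). A minimal standalone sketch of that mapping, assuming list-of-ints addresses with the least significant level first, as in the code above:

def h(key):
    """h: KEY --> IP, where key is a tuple (lvl, ip)."""
    lvl, ip = key
    IP = list(ip)
    for l in reversed(range(lvl)):
        IP[l] = 0
    return IP

# Hypothetical 4-level address; levels 0 and 1 are zeroed for a level-2 key.
me = [3, 7, 42, 9]
print(h((2, me)))   # -> [0, 0, 42, 9]
print(h((0, me)))   # -> [3, 7, 42, 9]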
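
going_out() and going_in() above are the coordinator's admission rules for leaving and joining its gnode: per the docstrings, a leave is accepted when the requester's `gnumb' is None or smaller than the group it would leave behind (and the node is still alive in the map), a join when `gnumb' is larger than the destination group would become. Reduced to pure decision functions for illustration (hypothetical names; the real methods also update the map and pick a free IP for the joiner):

def accept_going_out(gnumb, nodes_nb, alive):
    # Accept if no count was given, or the requester's gnode is smaller
    # than ours minus the leaving node, and the node is still alive here.
    return (gnumb is None or gnumb < nodes_nb - 1) and alive

def accept_going_in(gnumb, nodes_nb):
    # Accept if no count was given, or the requester's gnode is larger
    # than ours would be after the join.
    return gnumb is None or gnumb > nodes_nb + 1

print(accept_going_out(None, 10, True))   # True: no count given
print(accept_going_out(12, 10, True))     # False: requester's gnode is not smaller
print(accept_going_in(14, 10))            # True: requester's gnode is larger
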
17 changes: 9 additions & 8 deletions pyntk/ntk/lib/micro.py
@@ -30,21 +30,21 @@ def micro(function, args=(), **kwargs):
return stackless.tasklet(function)(*args, **kwargs)

def microatomic(function, args=(), **kwargs):
'''Factory function that returns atomic tasklets,
'''Factory function that returns atomic tasklets,
usable only with preemptive schedulers.
@param function: A callable
@return: A tasklet
'''
t = stackless.tasklet()

def callable():
flag = t.set_atomic(True)
try:
function(*args, **kwargs)
finally:
t.set_atomic(flag)

t.bind(callable)
return t()

@@ -123,10 +123,11 @@ def microfunc(is_micro=False, is_atomic=False):
Note: This is a decorator! (see test/test_micro.py for examples)
If is_micro != True and is_micro != True (default), each call will be queued.
If is_micro != True and is_atomic != True (default), each call will be
queued.
A dispatcher microthread will automatically pop and execute each call.
If is_micro == True, each call of the function will be executed in a new
microthread.
microthread.
If is_atomic == True, each call will be executed inside a new atomic
microthread. WARNING: this means that the microthread won't be interrupted
by the stackless scheduler until it has finished running.
@@ -136,7 +137,7 @@ def decorate(func):
ch = Channel(True)

@functools.wraps(func)
def fsend(*data):
def fsend(*data, **kwargs):
ch.sendq(data)

@functools.wraps(func)
@@ -146,7 +147,7 @@ def fmicro(*data, **kwargs):
@functools.wraps(func)
def fatom(*data, **kwargs):
microatomic(func, data, **kwargs)

if is_atomic:
return fatom
elif is_micro:

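The default behaviour documented for @microfunc() above — each call is pushed onto a channel and a dispatcher microthread pops and runs the calls in order — can be illustrated without Stackless. A rough thread-and-queue analogue (Python 3 standard library only; this shows the queued-dispatch idea, it is not the ntk.lib.micro implementation):

import functools
import queue
import threading
import time

def queued(func):
    calls = queue.Queue()

    def dispatcher():
        # Pop and execute calls in arrival order, like the microfunc dispatcher.
        while True:
            args, kwargs = calls.get()
            func(*args, **kwargs)

    threading.Thread(target=dispatcher, daemon=True).start()

    @functools.wraps(func)
    def send(*args, **kwargs):
        calls.put((args, kwargs))   # enqueue and return at once, like ch.sendq(data)
    return send

@queued
def log(msg):
    print("handled:", msg)

log("first call")
log("second call")
time.sleep(0.2)   # give the background dispatcher time to drain the queue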