
Change code to fit within 80-character lines.

1 parent 29c7842 commit 7a9195d9dd039ff40bb38501646fdf158178fbc7 @llpamies committed Jul 13, 2012
3 clusterdfs/__init__.py
@@ -1,3 +1,4 @@
"""This is the main module."""
-__all__ = ['bufferedio','datablock','processname','networking','namenode','datanode','dfs','coding']
+__all__ = ['bufferedio','datablock','processname','networking','namenode',
+ 'datanode','dfs','coding']
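
The commit applies two standard continuation styles throughout: implicit joining inside brackets where one is already open, and backslash continuation otherwise. A minimal sketch of both, with illustrative values:

    # Style 1: implicit continuation inside brackets (preferred by PEP 8).
    __all__ = ['bufferedio', 'datablock', 'processname', 'networking',
               'namenode', 'datanode', 'dfs', 'coding']

    # Style 2: explicit backslash continuation, where no bracket is open.
    message = 'Parameter size must be a positive integer, ' \
              'got %s.' % str(-1)
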
31 clusterdfs/bufferedio.py
@@ -1,15 +1,12 @@
import io
import sys
-import gc
import os.path
import numpy
import gevent.queue
import gevent.event
from clusterdfs.common import ClassLogger
-from galoisbuffer import GaloisBuffer
-
@ClassLogger
class IOBuffer(object):
defsize = 4*io.DEFAULT_BUFFER_SIZE
@@ -40,8 +37,6 @@ def __init__(self, factory=None, size=None):
self.buff = bytearray(size)
self.mem = memoryview(self.buff)
self.reset(factory=factory, size=size)
-
- self.galois = GaloisBuffer(size, bitfield=16, buffer=self.buff)
def reset(self, factory=None, size=None):
if size==None:
@@ -97,10 +92,11 @@ def create(self, *args, **kwargs):
@ClassLogger
class InputStreamReader(object):
- def __init__(self, input_stream, debug_name=None, num_buffers=2, size=None, async=False):
+ def __init__(self, input_stream, debug_name=None, num_buffers=2, size=None,
+ async=False):
'''
- If the 'size' is larger than the 'available()' bytes in the input stream,
- then garbage is read to achieve 'size'.
+ If 'size' is larger than the 'available()' bytes in the input
+ stream, garbage is read to pad the data out to 'size'.
'''
if not isinstance(input_stream, InputStream):
raise TypeError('input_stream must be an InputStream instance.')
@@ -121,19 +117,23 @@ def __init__(self, input_stream, debug_name=None, num_buffers=2, size=None, asyn
else:
self.get = self._get_sync
- if __debug__: self.logger.debug("Starting new %d bytes reader (async=%s).", self.size, unicode(self.async))
+ if __debug__: self.logger.debug("Starting new %d bytes reader "
+ "(async=%s).", self.size,
+ unicode(self.async))
def _run(self):
assert self.async
- if __debug__: self.logger.debug("Starting %s internal async process.", self.debug_name or hex(id(self)))
+ if __debug__: self.logger.debug("Starting %s internal async process.",
+ self.debug_name or hex(id(self)))
try:
while self.bytes_left>0:
self.queue.put(self._get_sync())
if __debug__: self.logger.debug("Reader has successfully finished.")
return False
except Exception, e:
- self.logger.error('Reader subprocesses got a %s exception.', e.__class__.__name__)
+ self.logger.error('Reader subprocess got a %s exception.',
+ e.__class__.__name__)
self.logger.error(unicode(e))
self.exc_info = sys.exc_info()
return True
@@ -143,7 +143,8 @@ def _run(self):
self.finalized = True
def _get_async(self):
- if __debug__: self.logger.debug("Calling async get in %s.", self.debug_name or hex(id(self)))
+ if __debug__: self.logger.debug("Calling async get in %s.",
+ self.debug_name or hex(id(self)))
try:
iobuffer = self.queue.get()
if self.exc_info:
@@ -153,7 +154,8 @@ def _get_async(self):
raise IOError("Reader ended before it was expected!")
def _get_sync(self):
- if __debug__: self.logger.debug("Calling sync get in %s.", self.debug_name or hex(id(self)))
+ if __debug__: self.logger.debug("Calling sync get in %s.",
+ self.debug_name or hex(id(self)))
assert self.bytes_left>0
if __debug__: self.logger.debug("%s get iteration %d/%d.",
self.debug_name or hex(id(self)),
@@ -267,7 +269,8 @@ def join(self):
class InputStream(object):
def __init__(self, size):
if (type(size)!=int and type(size)!=long) or size<=0:
- raise TypeError("Parameter size must be a positive integer, got %s."%str(size))
+ raise TypeError("Parameter size must be a positive integer, "
+ "got %s."%str(size))
self.size = size
self.bytes_left = size
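
For context on the reader rewrapped above: with async=True, InputStreamReader spawns a greenlet whose _run() loop keeps calling _get_sync() and parks each filled buffer on a gevent queue, while _get_async() simply drains that queue. A minimal standalone sketch of that producer/consumer shape (toy payloads, not the clusterdfs API):

    import gevent
    import gevent.queue

    def producer(queue, chunks):
        # stands in for the _run() loop calling _get_sync() repeatedly
        for chunk in chunks:
            queue.put(chunk)
        queue.put(None)  # sentinel: reader finished

    queue = gevent.queue.Queue(2)  # bounded, like num_buffers=2
    gevent.spawn(producer, queue, ['buf0', 'buf1', 'buf2'])

    while True:
        item = queue.get()  # what _get_async() boils down to
        if item is None:
            break
        print item
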
78 clusterdfs/coding.py
@@ -8,10 +8,6 @@
from networking import Client
from galoisbuffer import GaloisBuffer
-#coding = sys.modules[__name__]
-
-bitfield_op = 16
-
class CodingException(Exception):
pass
@@ -32,13 +28,17 @@ def delete_temp(name):
@ClassLogger
class RemoteNetCodingReader(InputStreamReader):
- def __init__(self, node_addr, block_id, coding_id, stream_id, nodes, **kwargs):
+ def __init__(self, node_addr, block_id, coding_id, stream_id,
+ nodes, **kwargs):
nodes = ';'.join(map(str, nodes))
self.client = Client(*node_addr)
- self.header = DataNodeHeader.generate(DataNodeHeader.OP_CODING, block_id, coding_id, stream_id, nodes)
+ self.header = DataNodeHeader.generate(DataNodeHeader.OP_CODING,
+ block_id, coding_id, stream_id,
+ nodes)
self.client.send(self.header)
- super(RemoteNetCodingReader, self).__init__(self.client.recv_stream(), async=True, **kwargs)
+ super(RemoteNetCodingReader, self).__init__(self.client.recv_stream(),
+ async=True, **kwargs)
def finalize(self, kill=False):
if not kill:
@@ -69,26 +69,31 @@ class NetCodingExecutor(object):
sizes = {}
numreg = collections.defaultdict(int)
- def __init__(self, operations, resolver, stream_id):
+ def __init__(self, operations, resolver, stream_id, bf=16):
self.stream_id = stream_id
self.operations = operations
self.resolver = resolver
self.finalized = False
+ self.bitfield_op = bf
# Create dictionaries
self.buffers = {}
self.readers = {}
self.writers = {}
- if __debug__: self.logger.debug('New executor for %d %s.', os.getpid(), self.stream_id)
+ if __debug__: self.logger.debug('New executor for %d %s.', os.getpid(),
+ self.stream_id)
if __debug__: self.logger.debug('Processing streams...')
for stream_name, stream_type in self.operations.streams:
- if __debug__: self.logger.debug('Processing stream: %s.',unicode(stream_name))
+ if __debug__: self.logger.debug('Processing stream: %s.',
+ unicode(stream_name))
if stream_type=='r':
- self.readers[stream_name] = self.resolver.get_reader(stream_name)
+ self.readers[stream_name] = self.resolver.\
+ get_reader(stream_name)
elif stream_type=='w':
- self.writers[stream_name] = self.resolver.get_writer(stream_name)
+ self.writers[stream_name] = self.resolver.\
+ get_writer(stream_name)
else:
raise TypeError('Invalid operation stream.')
@@ -100,7 +105,8 @@ def __init__(self, operations, resolver, stream_id):
if self.size==None:
self.size = s
elif self.size!=s:
- raise CodingException('Reader sizes are not aligned: %d, %d.'%(self.size, s))
+ raise CodingException('Reader sizes are not aligned: %d, %d.'\
+ %(self.size, s))
# If there is no input stream... (no fixed size) we
# try to get the size from a previous set executor in the same stream.
@@ -110,7 +116,8 @@ def __init__(self, operations, resolver, stream_id):
assert self.size!=None
if self.stream_id not in NetCodingExecutor.queues:
- NetCodingExecutor.queues[self.stream_id] = collections.defaultdict(gevent.queue.Queue)
+ NetCodingExecutor.queues[self.stream_id] = collections\
+ .defaultdict(gevent.queue.Queue)
# Keep track of which iobuffers we have to free here.
self.disposable_buffers = set()
@@ -151,7 +158,8 @@ def finalize(self):
'''
def execute_instruction(self, instruction):
- if __debug__: self.logger.debug('NetCodingInputStream %s is processing instruction %s',
+ if __debug__: self.logger.debug('NetCodingInputStream %s is '
+ 'processing instruction %s',
self.stream_id, str(instruction))
bytes_processed = None
@@ -197,10 +205,10 @@ def execute_instruction(self, instruction):
elif instruction[0]=='IADD':
src_buffer = self.buffers[instruction[2]]
dst_buffer = self.buffers[instruction[1]]
- #src = GaloisBuffer(src_buffer.size, bitfield=bitfield_op, buffer=src_buffer.buff)
- #dst = GaloisBuffer(dst_buffer.size, bitfield=bitfield_op, buffer=dst_buffer.buff)
- src = src_buffer.galois
- dst = dst_buffer.galois
+ src = GaloisBuffer(src_buffer.size, bitfield=self.bitfield_op,
+ buffer=src_buffer.buff)
+ dst = GaloisBuffer(dst_buffer.size, bitfield=self.bitfield_op,
+ buffer=dst_buffer.buff)
dst += src
dst_buffer.length = src_buffer.length
bytes_processed = dst_buffer.length
@@ -216,10 +224,10 @@ def execute_instruction(self, instruction):
self.logger.error('Buffer sizes are not aligned.')
raise CodingException('Buffer sizes are not aligned.')
- #src = GaloisBuffer(src_buffer.size, bitfield=bitfield_op, buffer=src_buffer.buff)
- #dst = GaloisBuffer(dst_buffer.size, bitfield=bitfield_op, buffer=dst_buffer.buff)
- src = src_buffer.galois
- dst = dst_buffer.galois
+ src = GaloisBuffer(src_buffer.size, bitfield=self.bitfield_op,
+ buffer=src_buffer.buff)
+ dst = GaloisBuffer(dst_buffer.size, bitfield=self.bitfield_op,
+ buffer=dst_buffer.buff)
src.multadd(literal_value, dest=dst, add=True)
dst_buffer.length = src_buffer.length
bytes_processed = dst_buffer.length
@@ -235,19 +243,21 @@ def execute_instruction(self, instruction):
self.logger.error('Buffer sizes are not aligned.')
raise CodingException('Buffer sizes are not aligned.')
- #src = GaloisBuffer(src_buffer.size, bitfield=bitfield_op, buffer=src_buffer.buff)
- #dst = GaloisBuffer(dst_buffer.size, bitfield=bitfield_op, buffer=dst_buffer.buff)
- src = src_buffer.galois
- dst = dst_buffer.galois
+ src = GaloisBuffer(src_buffer.size, bitfield=self.bitfield_op,
+ buffer=src_buffer.buff)
+ dst = GaloisBuffer(dst_buffer.size, bitfield=self.bitfield_op,
+ buffer=dst_buffer.buff)
src.multadd(literal_value, dest=dst, add=False)
dst_buffer.length = src_buffer.length
bytes_processed = dst_buffer.length
else:
- raise CodingException('Invalid coding instruction: %s'%(str(instruction)))
+ raise CodingException('Invalid coding instruction: %s'\
+ %(str(instruction)))
if bytes_processed==None:
- raise CodingException('The number of processed bytes was not set for instruction: %s'%(str(instruction)))
+ raise CodingException('The number of processed bytes was not set '
+ 'for instruction: %s'%(str(instruction)))
return bytes_processed
@@ -271,13 +281,15 @@ def execute_step(self, output=None):
iobuffer.free()
self.disposable_buffers.clear()
- if __debug__: self.logger.debug('Coding processed %d bytes.', bytes_processed)
+ if __debug__: self.logger.debug('Coding processed %d bytes.',
+ bytes_processed)
return bytes_processed
def execute(self):
read = 0
while read<self.size:
- if __debug__: self.logger.debug('execute iter %d/%d', read, self.size)
+ if __debug__: self.logger.debug('execute iter %d/%d',
+ read, self.size)
read += self.execute_step()
yield read
@@ -298,7 +310,9 @@ def read(self, iobuffer, nbytes=None):
else:
nbytes = min(iobuffer.size, nbytes)
- if __debug__: self.logger.debug('Reading %d bytes from NetCodingInputStream %s.', nbytes, hex(id(self)))
+ if __debug__: self.logger.debug('Reading %d bytes from '
+ 'NetCodingInputStream %s.', nbytes,
+ hex(id(self)))
num_read = self.executor.execute_step(iobuffer)
if __debug__: self.logger.debug('Coding processed %d bytes.', num_read)
assert num_read==iobuffer.length, (num_read,iobuffer.length)
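
This file also drops the per-IOBuffer cached .galois view in favor of GaloisBuffer objects built on demand, with the word size now an executor parameter (bf). How the three arithmetic instructions map onto galoisbuffer calls, using only operations visible above (sizes and coefficients are illustrative):

    from galoisbuffer import GaloisBuffer

    size = 1024
    a, b = bytearray(size), bytearray(size)
    src = GaloisBuffer(size, bitfield=16, buffer=a)
    dst = GaloisBuffer(size, bitfield=16, buffer=b)

    dst += src                               # IADD:   dst = dst + src in GF(2^16)
    src.multadd(12345, dest=dst, add=True)   # MULADD: dst = dst + 12345*src
    src.multadd(12345, dest=dst, add=False)  # MULT:   dst = 12345*src
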
11 clusterdfs/datablock.py
@@ -5,7 +5,8 @@
import cStringIO
class DataBlockHeader(object):
- '''Each stored block has a 256 byte header. This is its binary structure (all little-endian):
+ '''Each stored block has a 256 byte header. This is its binary structure
+ (all little-endian):
+ 32 bytes ( 0 -- 31) --> file uuid (hex formatted string)
+ 32 bytes ( 32 -- 71) --> block uuid (hex formatted string)
+ 8 bytes ( 72 -- 83) --> offset in the file (unsigned int)
@@ -98,14 +99,18 @@ def create(base_dir='', file_uuid=None, block_uuid=None):
if block_uuid==None: block_uuid = '%032x'%uuid.uuid4().int
header = DataBlockHeader(file_uuid=file_uuid, block_uuid=block_uuid)
- f = DataBlock(os.path.join(base_dir, DataBlock.filename(file_uuid, block_uuid)), 'w')
+ f = DataBlock(os.path.join(base_dir,
+ DataBlock.filename(file_uuid,
+ block_uuid)), 'w')
f.init(header)
f.seek(DataBlockHeader.SIZE)
return f
@staticmethod
def open(file_uuid, block_uuid, base_dir=''):
- f = DataBlock(os.path.join(base_dir, DataBlock.filename(file_uuid, block_uuid)), 'r')
+ f = DataBlock(os.path.join(base_dir,
+ DataBlock.filename(file_uuid,
+ block_uuid)), 'r')
f.init(DataBlockHeader.parse(f))
return f
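
A short usage sketch of the two factories rewrapped above (base_dir is a placeholder; the uuid is generated the same way create() does it):

    import uuid
    from clusterdfs.datablock import DataBlock

    file_uuid = '%032x' % uuid.uuid4().int

    # create() writes the 256-byte header and seeks just past it,
    # so subsequent writes land at offset DataBlockHeader.SIZE.
    f = DataBlock.create(base_dir='/tmp', file_uuid=file_uuid)
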
88 clusterdfs/datanode.py
@@ -5,13 +5,14 @@
import logging
import tempfile
import importlib
-
+
from common import Config, ClassLogger
from coding import NetCodingExecutor, NetCodingInputStream
from headers import DataNodeHeader, NameNodeHeader
from networking import Client, Server, ServerHandle
-from bufferedio import FileInputStream, FileOutputStream, InputStreamReader, OutputStreamWriter
-
+from bufferedio import FileInputStream, FileOutputStream, InputStreamReader,\
+ OutputStreamWriter
+
class DataNodeConfig(Config):
port = 13100
bind_addr = '0.0.0.0'
@@ -22,7 +23,7 @@ class DataNodeConfig(Config):
fakeout = False
isolated = False
coding_mod_name = 'clusterdfs.rapidraid'
-
+
def check(self):
if self.datadir==None:
self.datadir = tempfile.mkdtemp()
@@ -69,13 +70,15 @@ def get_reader(self, block_id, debug_name=None):
'''
Returns a FileInputStream for the block with block_id.
'''
- return InputStreamReader(self.get_input_stream(block_id), debug_name=debug_name)
+ return InputStreamReader(self.get_input_stream(block_id),
+ debug_name=debug_name)
def get_writer(self, block_id, debug_name=None):
'''
Returns a FileOutputStream for the block with block_id.
'''
- return OutputStreamWriter(self.get_output_stream(block_id), debug_name=debug_name)
+ return OutputStreamWriter(self.get_output_stream(block_id),
+ debug_name=debug_name)
@ClassLogger
class DataNodeQuery(ServerHandle):
@@ -103,31 +106,9 @@ def store_block(self, block_id=None, **kwargs):
reader = self.recv_reader()
- '''
- if 'fwdlist' in self.header:
- # Get the forward list and the next forward node
- forward_list = self.header['fwdlist']
- logging.info("Forwarding '%s' to %s.", block_id, repr(forward_list[0]))
- logging.info("Remaining forwards: %d.", len(forward_list)-1)
- next_node = Client(*forward_list[0])
- next_forward_list = forward_list[1:]
-
- # Send header to next node
- header = self.header.copy()
- header['fwdlist'] = next_forward_list
- next_node.send(header)
-
- # processing
- writer = OutputStreamWriter(self.server.block_store.get_output_stream(block_id), next_node.output_stream)
- reader.flush(writer)
- writer.finalize()
- writer.join()
- next_node.assert_ack()
-
- else:
- '''
# processing
- writer = OutputStreamWriter(self.server.block_store.get_output_stream(block_id))
+ writer = OutputStreamWriter(self.server.block_store\
+ .get_output_stream(block_id))
reader.flush(writer)
writer.finalize()
writer.join()
@@ -151,46 +132,59 @@ def insert_data(self, block_id=None, **kwargs):
real_size = instream.size
block_size = int(math.ceil(float(real_size)/coding.k))
store_size = block_size*coding.k
- self.logger.info("Inserting a file of size %d, block size %d.", store_size, block_size)
+ self.logger.info("Inserting a file of size %d, block size %d.",
+ store_size, block_size)
for i in xrange(coding.k):
self.logger.info("Inserting part %d of %d.", i, coding.k)
reader = InputStreamReader(instream, size=block_size)
- writer = OutputStreamWriter(self.server.block_store.get_output_stream(block_id+"_part%d"%i))
+ writer = OutputStreamWriter(self.server.block_store\
+ .get_output_stream(block_id+"_part%d"%i))
reader.flush(writer)
writer.finalize()
writer.join()
logging.info("Object '%s' inserted successfully.", block_id)
- def node_coding(self, block_id='', coding_id=None, stream_id='', nodes='', **kwargs):
+ def node_coding(self, block_id='', coding_id=None, stream_id='',
+ nodes='', **kwargs):
if not block_id:
raise ValueError("'block_id' is not provided.")
if not coding_id:
raise ValueError("'coding_id' is not provided.")
if not nodes:
raise ValueError("'nodes' is not provided.")
- # Generate an ID that will be the ID for all the coding stream (pipeline).
+ '''
+ Generate an ID shared by the entire coding stream (pipeline).
+ '''
if stream_id=='':
stream_id = uuid.uuid4().hex
reader = None
writer = None
coding_executor = None
try:
- self.logger.info("Starting coding operation %s - %s", coding_id, stream_id)
+ self.logger.info("Starting coding operation %s - %s", coding_id,
+ stream_id)
coding = self.server.config.coding_mod
nodes = map(eval, nodes.split(';'))
coding_operations = coding.operations[coding_id]
- resolver = coding.RapidRaidResolver(block_id, stream_id, self.server.block_store, nodes, config=self.server.config)
- coding_executor = NetCodingExecutor(coding_operations, resolver, stream_id)
+ resolver = coding.RapidRaidResolver(block_id, stream_id,
+ self.server.block_store,
+ nodes,
+ config=self.server.config)
+ coding_executor = NetCodingExecutor(coding_operations, resolver,
+ stream_id, bf=coding.bf)
if coding_operations.is_stream():
if __debug__: self.logger.debug("Forwarding coding stream.")
input_stream = NetCodingInputStream(coding_executor)
- reader = InputStreamReader(input_stream, debug_name='coding_result', async=False)
+ reader = InputStreamReader(input_stream,
+ debug_name='coding_result',
+ async=False)
self.send(input_stream)
writer = self.new_writer(async=False)
reader.flush(writer)
@@ -206,7 +200,8 @@ def node_coding(self, block_id='', coding_id=None, stream_id='', nodes='', **kwa
self.logger.info('Coding ended successfully.')
finally:
- if __debug__: self.logger.debug("executing datanode finally statement.")
+ if __debug__: self.logger.debug("executing datanode "
+ "finally statement.")
if coding_executor: coding_executor.finalize()
if reader: reader.finalize(True)
if writer: writer.finalize()
@@ -218,7 +213,8 @@ def __init__(self, config, server):
self.config = config
self.server = server
self.process = gevent.spawn(self.timeout)
- self.ping = {'op':NameNodeHeader.OP_PING, 'datanode_port':self.config.port}
+ self.ping = {'op':NameNodeHeader.OP_PING,
+ 'datanode_port':self.config.port}
def stop(self):
self.process.kill()
@@ -230,14 +226,16 @@ def timeout(self):
try:
logging.debug('Sending ping.')
'''
- ne = NetworkEndpoint(gevent.socket.create_connection((self.config.namenode_addr, self.config.namenode_port)))
+ ne = NetworkEndpoint(gevent.socket.create_connection(
+ (self.config.namenode_addr, self.config.namenode_port)))
ne.send(self.ping)
ne.send([])
response = ne.recv()
'''
except Exception as e:
- logging.error("Cannot deliver ping to nameserver: %s."%unicode(e))
+ logging.error("Cannot deliver ping to nameserver: %s."\
+ %unicode(e))
# sleep timeout
gevent.sleep(self.config.ping_timeout)
@@ -246,7 +244,8 @@ def timeout(self):
class DataNode(Server):
def __init__(self, config):
self.config = config
- self.logger.info("Configuring DataNode to listen on localhost:%d", self.config.port)
+ self.logger.info("Configuring DataNode to listen on localhost:%d",
+ self.config.port)
self.logger.info("DataNode data dir: %s", config.datadir)
self.logger.info("Using a fake out? %s", unicode(config.fakeout))
Server.__init__(self, DataNodeQuery, port=self.config.port)
@@ -296,7 +295,8 @@ def retrieve(self, block_id, local_path):
if __debug__: self.logger.debug("END.")
def coding(self, block_id, coding_id, nodes):
- self.send(DataNodeHeader.generate(DataNodeHeader.OP_CODING, block_id, coding_id, nodes=nodes))
+ self.send(DataNodeHeader.generate(DataNodeHeader.OP_CODING,
+ block_id, coding_id, nodes=nodes))
if __debug__: self.logger.debug("Wating for ACK.")
size = self.recv()
assert isinstance(size, int) or isinstance(size, long)
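
For reference, the client side of node_coding mirrors RemoteNetCodingReader in coding.py: build an OP_CODING header, send it, and read back the result stream. A hedged sketch with placeholder addresses (node_coding later splits the nodes string on ';' and eval's each entry into an address tuple):

    from clusterdfs.networking import Client
    from clusterdfs.headers import DataNodeHeader

    client = Client('127.0.0.1', 13100)    # DataNodeConfig.port default
    nodes = "('127.0.0.1', 13100)"         # ';'-separated address tuples
    header = DataNodeHeader.generate(DataNodeHeader.OP_CODING,
                                     'block42', 'enc_node0', '', nodes)
    client.send(header)
    stream = client.recv_stream()          # stream of coded output
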
3 clusterdfs/dfs.py
@@ -8,7 +8,8 @@
class DFS(Client):
def store(self, path):
length = os.path.getsize(path)
- header = {'op':DataNodeHeader.OP_STORE, 'length':length, 'id':path.split('/')[-1]}
+ header = {'op':DataNodeHeader.OP_STORE, 'length':length,
+ 'id':path.split('/')[-1]}
header['fwdlist'] = [('172.21.48.151',7777)]
print header
self.send(header)
6 clusterdfs/headers.py
@@ -29,11 +29,13 @@ def parse(s):
raise TypeError("must be a string")
reader = StringIO(s)
decoder = avro.io.BinaryDecoder(reader)
- datum_reader = avro.io.DatumReader(writers_schema=DataNodeHeader.schema, readers_schema=DataNodeHeader.schema)
+ datum_reader = avro.io.DatumReader(writers_schema=DataNodeHeader.schema,
+ readers_schema=DataNodeHeader.schema)
return datum_reader.read(decoder)
@staticmethod
- def generate(operation, block_id=None, coding_id='', stream_id='', nodes=''):
+ def generate(operation, block_id=None, coding_id='', stream_id='',
+ nodes=''):
writer = StringIO()
encoder = avro.io.BinaryEncoder(writer)
datum_writer = avro.io.DatumWriter(writers_schema=DataNodeHeader.schema)
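
A minimal round-trip sketch of the encode/decode pattern generate() and parse() use, assuming a toy schema rather than the real DataNodeHeader.schema:

    import avro.schema
    import avro.io
    from StringIO import StringIO

    schema = avro.schema.parse('{"type": "record", "name": "H",'
                               ' "fields": [{"name": "op", "type": "int"}]}')

    out = StringIO()
    encoder = avro.io.BinaryEncoder(out)
    avro.io.DatumWriter(writers_schema=schema).write({'op': 3}, encoder)

    decoder = avro.io.BinaryDecoder(StringIO(out.getvalue()))
    datum = avro.io.DatumReader(writers_schema=schema,
                                readers_schema=schema).read(decoder)
    assert datum == {'op': 3}
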
21 clusterdfs/namenode.py
@@ -20,7 +20,9 @@ class NameNodeConfig(Config):
def check(self):
pass
-NameNodeTreeNode = collections.namedtuple('NameNodeTreeNode', ['id', 'name', 'children', 'parent', 'dir'])
+NameNodeTreeNode = collections.namedtuple('NameNodeTreeNode',
+ ['id', 'name', 'children', 'parent',
+ 'dir'])
class NameNodeTreeException(Exception):
pass
@@ -31,7 +33,9 @@ class NameNodeTree(object):
def __init__(self, config):
self.dict = shelve.open(config.tree_file)
if self.rootname not in self.dict:
- self.store(NameNodeTreeNode(id=self.rootname, name=None, children={None:self.rootname}, parent=None, dir=True))
+ self.store(NameNodeTreeNode(id=self.rootname, name=None,
+ children={None:self.rootname},
+ parent=None, dir=True))
self.root = self.dict[self.rootname]
def store(self, node):
@@ -60,7 +64,8 @@ def create(self, path, directory=False):
raise NameNodeTreeException('Parent path should be a directory.')
if name in parent.children:
raise NameNodeTreeException('The file exists.')
- new = NameNodeTreeNode(id=uuid.uuid4().hex, name=name, children={}, parent=parent.id, dir=directory)
+ new = NameNodeTreeNode(id=uuid.uuid4().hex, name=name, children={},
+ parent=parent.id, dir=directory)
parent.children[name] = new.id
self.store(parent)
self.store(new)
@@ -90,7 +95,9 @@ def move(self, src, dst):
parent_dst, name_dst = self.parse(dst)
parent_dst.children[name_dst] = node_src.id
- new = NameNodeTreeNode(id=node_src.id, name=name_dst, children=node_src.children, parent=parent_dst.id, dir=node_src.dir)
+ new = NameNodeTreeNode(id=node_src.id, name=name_dst,
+ children=node_src.children, parent=parent_dst.id,
+ dir=node_src.dir)
self.store(parent_dst)
self.store(new)
self.dict.sync()
@@ -146,7 +153,8 @@ def ping(self):
stored_blocks = self.recv()
for file_uuid, block_uuid in stored_blocks:
self.server.db_direct_lookup[file_uuid][block_uuid] = datanode_addr
- self.server.db_reverse_lookup[datanode_addr].add((file_uuid, block_uuid))
+ self.server.db_reverse_lookup[datanode_addr].add((file_uuid,
+ block_uuid))
return ServerResponse.ok(msg='Blocks processed.')
@@ -159,7 +167,8 @@ def getnodes(self):
class NameNode(Server):
def __init__(self, config):
self.config = config
- logging.info("Configuring NameNode to listen on localhost:%d"%(self.config.port))
+ logging.info("Configuring NameNode to listen on localhost:%d"\
+ %(self.config.port))
Server.__init__(self, NameNodeQuery, port=self.config.port)
self.db_nodes = []
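
The namedtuple rewrapped at the top of this file forms an id-keyed tree: children maps a child's name to its id, parent holds the parent's id, and the shelve dict stores nodes by id. An in-memory sketch of that linkage, with a plain dict standing in for the shelve file:

    import collections

    NameNodeTreeNode = collections.namedtuple('NameNodeTreeNode',
                                              ['id', 'name', 'children',
                                               'parent', 'dir'])
    store = {}
    root = NameNodeTreeNode(id='root', name=None, children={},
                            parent=None, dir=True)
    leaf = NameNodeTreeNode(id='abc123', name='data', children={},
                            parent=root.id, dir=False)
    root.children[leaf.name] = leaf.id   # children: name -> id
    store[root.id] = root
    store[leaf.id] = leaf

    # Resolving a path component walks children by name:
    assert store[store['root'].children['data']].name == 'data'
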
63 clusterdfs/networking.py
@@ -7,7 +7,8 @@
import socket
from common import ClassLogger
-from bufferedio import NetworkOutputStream, OutputStreamWriter, InputStream, InputStreamReader, NetworkInputStream, IOBuffer
+from bufferedio import NetworkOutputStream, OutputStreamWriter, InputStream,\
+ InputStreamReader, NetworkInputStream, IOBuffer
class NetworkHeader(object):
ERROR = 1
@@ -24,7 +25,8 @@ def __init__(self, message='', trace=''):
self.trace = trace
if not trace:
exc_info = sys.exc_info()
- self.trace = ''.join(traceback.format_list(traceback.extract_tb(exc_info[2])))
+ err_list = traceback.extract_tb(exc_info[2])
+ self.trace = ''.join(traceback.format_list(err_list))
self.trace += ' '+message
def log_forward(self, node):
@@ -34,8 +36,8 @@ def serialize(self):
return self.trace
@classmethod
- def unserialize(clazz, s):
- return clazz(trace=s)
+ def unserialize(klass, s):
+ return klass(trace=s)
def __str__(self):
return '\n'+self.trace
@@ -48,13 +50,16 @@ def __init__(self, socket):
self.output_stream = NetworkOutputStream(self)
'''
- The sendall socket mehtod can only be used in blocking (timeout==None) sockets.
+ The sendall socket method can only be used in blocking
+ (timeout==None) sockets.
'''
if self.socket.gettimeout()==None:
- if __debug__: self.logger.debug("Using '_send_bytes_sendall' function.")
+ if __debug__: self.logger.debug("Using '_send_bytes_sendall' "
+ "function.")
self._send_bytes = self._send_bytes_sendall
else:
- if __debug__: self.logger.debug("Using '_send_bytes_iter' function.")
+ if __debug__: self.logger.debug("Using '_send_bytes_iter' "
+ "function.")
self._send_bytes = self._send_bytes_iter
self._streamed = 0
@@ -124,36 +129,43 @@ def _recv_bytes(self, num_bytes):
def recv(self):
try:
if self._partial_buffer>0:
- raise IOError("Cannot read new data if the previous buffer wasn't completely read.")
+ raise IOError("Cannot read new data if the previous buffer "
+ "wasn't completely read.")
if self.reading_stream!=None:
if not self.reading_stream.is_processed():
- raise Exception("Cannot receive data from the socket until the stream is processed.")
+ raise Exception("Cannot receive data from the socket until"
+ " the stream is processed.")
self.reading_stream = None
packet_type, data_len = self._recv_header()
- if __debug__: self.logger.debug("Received header: %d %d.", packet_type, data_len)
+ if __debug__: self.logger.debug("Received header: %d %d.",
+ packet_type, data_len)
if packet_type==NetworkHeader.ERROR:
- if __debug__: self.logger.debug("Received error (%d bytes).", data_len)
+ if __debug__: self.logger.debug("Received error (%d bytes).",
+ data_len)
self._recv_error(data_len)
elif packet_type==NetworkHeader.INTEGER:
if __debug__: self.logger.debug("Received integer.")
return self._recv_integer()
elif packet_type==NetworkHeader.STRING:
- if __debug__: self.logger.debug("Received string (%d bytes).", data_len)
+ if __debug__: self.logger.debug("Received string (%d bytes).",
+ data_len)
return self._recv_bytes(data_len)
elif packet_type==NetworkHeader.STREAM_HEADER:
- if __debug__: self.logger.debug("Received stream (%d bytes).", data_len)
+ if __debug__: self.logger.debug("Received stream (%d bytes).",
+ data_len)
self._streamed = 0
self._to_stream = data_len
return NetworkInputStream(self, data_len)
else:
- raise TypeError("Incompatible NetworkHeader value %d."%(packet_type))
+ raise TypeError("Incompatible NetworkHeader value %d."\
+ %(packet_type))
except:
#self.kill()
@@ -197,8 +209,14 @@ def recv_into(self, memview):
return self._fill(memview)
else:
- if __debug__: self.logger.debug("Invalid header after receiving %d streamed bytes out of %d."%(self._streamed, self._to_stream))
- raise TypeError("Incompatible NetworkHeader value %d."%(packet_type))
+ if __debug__: self.logger.debug("Invalid header after "
+ "receiving %d streamed "
+ "bytes out of %d."\
+ %(self._streamed,
+ self._to_stream))
+
+ raise TypeError("Incompatible NetworkHeader value %d."\
+ %(packet_type))
else:
return self._fill(memview)
@@ -237,7 +255,8 @@ def send(self, obj):
raise TypeError('Invalid type.')
def local_address(self):
- return commands.getoutput("/sbin/ifconfig").split("\n")[1].split()[1][5:]
+ ifconfig = commands.getoutput("/sbin/ifconfig")
+ return ifconfig.split("\n")[1].split()[1][5:]
def kill(self):
if __debug__: self.logger.debug("Killing socket.")
@@ -251,6 +270,9 @@ def __init__(self, server, socket, address):
self.address = address
self.server = server
+ def process_query(self):
+ raise NotImplementedError()
+
def handle(self):
send_ack = True
response = False
@@ -259,7 +281,9 @@ def handle(self):
response = True
except socket.error as e:
- self.logger.error("Failed connection from %s (child of %s): %s."%(repr(self.address), repr(self.server.address), unicode(e)))
+ self.logger.error("Failed connection from %s (child of %s): %s."\
+ %(repr(self.address), repr(self.server.address),
+ unicode(e)))
except NetworkException as e:
self.logger.error("RaisedNetworkException:\n"+unicode(e))
@@ -289,7 +313,8 @@ def handle(self):
class Server():
def __init__(self, handle_class=ServerHandle, addr='', port=7777):
self.address = (addr,port)
- self.server = gevent.server.StreamServer(self.address, self.netser_handle)
+ self.server = gevent.server.StreamServer(self.address,
+ self.netser_handle)
self.handle_class = handle_class
'''
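
The diff also adds an explicit process_query() stub to ServerHandle, which makes the subclass contract visible: handle() drives the connection and delegates the real work. A bare-bones sketch of that contract (dispatch details are assumed):

    from clusterdfs.networking import Server, ServerHandle

    class PingHandle(ServerHandle):
        def process_query(self):
            # a real handle would inspect the received header here and
            # dispatch to an operation, as DataNodeQuery does
            return None

    server = Server(handle_class=PingHandle, port=7777)
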
152 clusterdfs/rapidraid.py
@@ -1,10 +1,11 @@
from clusterdfs.common import ClassLogger
-from clusterdfs.coding import NetCodingOperations, NetCodingResolver, RemoteNetCodingReader
+from clusterdfs.coding import NetCodingOperations, NetCodingResolver,\
+ RemoteNetCodingReader
import re
import galoisbuffer
-
+'''
bf = 16
xis = [48386, 55077, 40589, 63304, 49062, 47871, 17507, 49390,
54054, 45897, 55796, 27611, 50294, 30336, 882, 60087,
@@ -18,7 +19,7 @@
149, 237, 31, 198, 15, 45, 138, 246, 100, 101, 16]
psis = [129, 207, 158, 56, 171, 247, 145, 139, 137, 211, 190,
230, 140, 177, 75, 47, 94, 60, 244, 69, 190, 167]
-'''
+
xisi = map(lambda x: galoisbuffer.inverse_val(x, bitfield=bf), xis)
@@ -52,14 +53,16 @@
# Decoding Operations:
-dec_node0 = NetCodingOperations('dec_node0', [('coded0', 'r'), ('orig0', 'w')], output='stream')
+dec_node0 = NetCodingOperations('dec_node0', [('coded0', 'r'), ('orig0', 'w')],
+ output='stream')
dec_node0.add(('LOAD','temp','coded0'))
dec_node0.add(('MULT', 'temp', xisi[0], 'temp'))
dec_node0.add(('MULT', 'stream', psis[0], 'temp'))
dec_node0.add(('WRITE', 'temp', 'orig0'))
dec_node0.add(('PUSH', 'queue', 'temp'))
-dec_node1 = NetCodingOperations('dec_node1', [('coded1', 'r'), ('orig1', 'w'), ('dec_node0', 'r')], output='stream')
+dec_node1 = NetCodingOperations('dec_node1', [('coded1', 'r'), ('orig1', 'w'),
+ ('dec_node0', 'r')], output='stream')
dec_node1.add(('LOAD','temp','coded1'))
dec_node1.add(('LOAD','prev','dec_node0'))
dec_node1.add(('IADD', 'temp', 'prev'))
@@ -69,7 +72,8 @@
dec_node1.add(('COPY', 'stream', 'prev'))
dec_node1.add(('MULADD', 'stream', psis[1],'temp'))
-dec_node2 = NetCodingOperations('dec_node2', [('coded2', 'r'), ('orig2', 'w'), ('dec_node1', 'r')], output='stream')
+dec_node2 = NetCodingOperations('dec_node2', [('coded2', 'r'), ('orig2', 'w'),
+ ('dec_node1', 'r')], output='stream')
dec_node2.add(('LOAD','temp','coded2'))
dec_node2.add(('LOAD','prev','dec_node1'))
dec_node2.add(('IADD', 'temp', 'prev'))
@@ -79,7 +83,8 @@
dec_node2.add(('COPY', 'stream', 'prev'))
dec_node2.add(('MULADD', 'stream', psis[2],'temp'))
-dec_node3 = NetCodingOperations('dec_node3', [('coded3', 'r'), ('orig3', 'w'), ('dec_node2', 'r')], output='stream')
+dec_node3 = NetCodingOperations('dec_node3', [('coded3', 'r'), ('orig3', 'w'),
+ ('dec_node2', 'r')], output='stream')
dec_node3.add(('LOAD','temp','coded3'))
dec_node3.add(('LOAD','prev','dec_node2'))
dec_node3.add(('IADD', 'temp', 'prev'))
@@ -89,7 +94,8 @@
dec_node3.add(('COPY', 'stream', 'prev'))
dec_node3.add(('MULADD', 'stream', psis[3],'temp'))
-dec_node4 = NetCodingOperations('dec_node4', [('coded4', 'r'), ('orig4', 'w'), ('dec_node3', 'r')], output='stream')
+dec_node4 = NetCodingOperations('dec_node4', [('coded4', 'r'), ('orig4', 'w'),
+ ('dec_node3', 'r')], output='stream')
dec_node4.add(('LOAD','temp','coded4'))
dec_node4.add(('LOAD','prev','dec_node3'))
dec_node4.add(('IADD', 'temp', 'prev'))
@@ -99,7 +105,10 @@
dec_node4.add(('COPY', 'stream', 'prev'))
dec_node4.add(('MULADD', 'stream', psis[4],'temp'))
-dec_node5 = NetCodingOperations('dec_node5', [('coded5', 'r'), ('orig5', 'w'), ('dec_node4', 'r'), ('dec_node0_aux', 'r')], output='stream')
+dec_node5 = NetCodingOperations('dec_node5', [('coded5', 'r'), ('orig5', 'w'),
+ ('dec_node4', 'r'),
+ ('dec_node0_aux', 'r')],
+ output='stream')
dec_node5.add(('LOAD','temp','coded5'))
dec_node5.add(('LOAD','prev','dec_node4'))
dec_node5.add(('LOAD','prevaux','dec_node0_aux'))
@@ -112,7 +121,10 @@
dec_node5.add(('MULADD', 'stream', psis[5],'prevaux'))
dec_node5.add(('MULADD', 'stream', psis[6],'temp'))
-dec_node6 = NetCodingOperations('dec_node6', [('coded6', 'r'), ('orig6', 'w'), ('dec_node5', 'r'), ('dec_node1_aux', 'r')], output='stream')
+dec_node6 = NetCodingOperations('dec_node6', [('coded6', 'r'), ('orig6', 'w'),
+ ('dec_node5', 'r'),
+ ('dec_node1_aux', 'r')],
+ output='stream')
dec_node6.add(('LOAD','temp','coded6'))
dec_node6.add(('LOAD','prev','dec_node5'))
dec_node6.add(('LOAD','prevaux','dec_node1_aux'))
@@ -124,7 +136,10 @@
dec_node6.add(('MULADD', 'stream', psis[7],'prevaux'))
dec_node6.add(('MULADD', 'stream', psis[8],'temp'))
-dec_node7 = NetCodingOperations('dec_node7', [('coded7', 'r'), ('orig7', 'w'), ('dec_node6', 'r'), ('dec_node2_aux', 'r')], output='stream')
+dec_node7 = NetCodingOperations('dec_node7', [('coded7', 'r'), ('orig7', 'w'),
+ ('dec_node6', 'r'),
+ ('dec_node2_aux', 'r')],
+ output='stream')
dec_node7.add(('LOAD','temp','coded7'))
dec_node7.add(('LOAD','prev','dec_node6'))
dec_node7.add(('LOAD','prevaux','dec_node2_aux'))
@@ -136,7 +151,10 @@
dec_node7.add(('MULADD', 'stream', psis[9],'prevaux'))
dec_node7.add(('MULADD', 'stream', psis[10],'temp'))
-dec_node8 = NetCodingOperations('dec_node8', [('coded8', 'r'), ('orig8', 'w'), ('dec_node7', 'r'), ('dec_node3_aux', 'r')], output='stream')
+dec_node8 = NetCodingOperations('dec_node8', [('coded8', 'r'), ('orig8', 'w'),
+ ('dec_node7', 'r'),
+ ('dec_node3_aux', 'r')],
+ output='stream')
dec_node8.add(('LOAD','temp','coded8'))
dec_node8.add(('LOAD','prev','dec_node7'))
dec_node8.add(('LOAD','prevaux','dec_node3_aux'))
@@ -148,7 +166,10 @@
dec_node8.add(('MULADD', 'stream', psis[11],'prevaux'))
dec_node8.add(('MULADD', 'stream', psis[12],'temp'))
-dec_node9 = NetCodingOperations('dec_node9', [('coded9', 'r'), ('orig9', 'w'), ('dec_node8', 'r'), ('dec_node4_aux', 'r')], output='stream')
+dec_node9 = NetCodingOperations('dec_node9', [('coded9', 'r'), ('orig9', 'w'),
+ ('dec_node8', 'r'),
+ ('dec_node4_aux', 'r')],
+ output='stream')
dec_node9.add(('LOAD','temp','coded9'))
dec_node9.add(('LOAD','prev','dec_node8'))
dec_node9.add(('LOAD','prevaux','dec_node4_aux'))
@@ -160,7 +181,10 @@
dec_node9.add(('MULADD', 'stream', psis[13],'prevaux'))
dec_node9.add(('MULADD', 'stream', psis[14],'temp'))
-dec_node10 = NetCodingOperations('dec_node10', [('coded10', 'r'), ('orig10', 'w'), ('dec_node9', 'r'), ('dec_node5_aux', 'r')])
+dec_node10 = NetCodingOperations('dec_node10', [('coded10', 'r'),
+ ('orig10', 'w'),
+ ('dec_node9', 'r'),
+ ('dec_node5_aux', 'r')])
dec_node10.add(('LOAD','temp','coded10'))
dec_node10.add(('LOAD','prev','dec_node9'))
dec_node10.add(('LOAD','prevaux','dec_node5_aux'))
@@ -171,45 +195,57 @@
# Encoding operations:
-enc_node0 = NetCodingOperations('enc_node0', [('part0', 'r'), ('coded0', 'w')], output='stream')
+enc_node0 = NetCodingOperations('enc_node0', [('part0', 'r'), ('coded0', 'w')],
+ output='stream')
enc_node0.add(('LOAD', 'temp', 'part0'))
enc_node0.add(('MULT', 'stream', psis[0], 'temp'))
#enc_node0.add(('MULT', 'temp', xis[0], 'temp'))
enc_node0.add(('WRITE', 'temp', 'coded0'))
-enc_node1 = NetCodingOperations('enc_node1', [('part1', 'r'), ('coded1', 'w'), ('enc_node0','r')], output='stream')
+enc_node1 = NetCodingOperations('enc_node1', [('part1', 'r'), ('coded1', 'w'),
+ ('enc_node0','r')],
+ output='stream')
enc_node1.add(('LOAD', 'local', 'part1'))
enc_node1.add(('LOAD', 'prev', 'enc_node0'))
enc_node1.add(('COPY', 'stream', 'prev'))
enc_node1.add(('MULADD', 'stream', psis[1], 'local'))
enc_node1.add(('MULADD', 'prev', xis[1], 'local'))
enc_node1.add(('WRITE', 'prev', 'coded1'))
-enc_node2 = NetCodingOperations('enc_node2', [('part2', 'r'), ('coded2', 'w'), ('enc_node1','r')], output='stream')
+enc_node2 = NetCodingOperations('enc_node2', [('part2', 'r'), ('coded2', 'w'),
+ ('enc_node1','r')],
+ output='stream')
enc_node2.add(('LOAD', 'local', 'part2'))
enc_node2.add(('LOAD', 'prev', 'enc_node1'))
enc_node2.add(('COPY', 'stream', 'prev'))
enc_node2.add(('MULADD', 'stream', psis[2], 'local'))
enc_node2.add(('MULADD', 'prev', xis[2], 'local'))
enc_node2.add(('WRITE', 'prev', 'coded2'))
-enc_node3 = NetCodingOperations('enc_node3', [('part3', 'r'), ('coded3', 'w'), ('enc_node2','r')], output='stream')
+enc_node3 = NetCodingOperations('enc_node3', [('part3', 'r'), ('coded3', 'w'),
+ ('enc_node2','r')],
+ output='stream')
enc_node3.add(('LOAD', 'local', 'part3'))
enc_node3.add(('LOAD', 'prev', 'enc_node2'))
enc_node3.add(('COPY', 'stream', 'prev'))
enc_node3.add(('MULADD', 'stream', psis[3], 'local'))
enc_node3.add(('MULADD', 'prev', xis[3], 'local'))
enc_node3.add(('WRITE', 'prev', 'coded3'))
-enc_node4 = NetCodingOperations('enc_node4', [('part4', 'r'), ('coded4', 'w'), ('enc_node3','r')], output='stream')
+enc_node4 = NetCodingOperations('enc_node4', [('part4', 'r'), ('coded4', 'w'),
+ ('enc_node3','r')],
+ output='stream')
enc_node4.add(('LOAD', 'local', 'part4'))
enc_node4.add(('LOAD', 'prev', 'enc_node3'))
enc_node4.add(('COPY', 'stream', 'prev'))
enc_node4.add(('MULADD', 'stream', psis[4], 'local'))
enc_node4.add(('MULADD', 'prev', xis[4], 'local'))
enc_node4.add(('WRITE', 'prev', 'coded4'))
-enc_node5 = NetCodingOperations('enc_node5', [('part5', 'r'), ('part0', 'r'), ('coded5', 'w'), ('enc_node4','r')], output='stream')
+enc_node5 = NetCodingOperations('enc_node5', [('part5', 'r'), ('part0', 'r'),
+ ('coded5', 'w'),
+ ('enc_node4','r')],
+ output='stream')
enc_node5.add(('LOAD', 'local0', 'part0'))
enc_node5.add(('LOAD', 'local5', 'part5'))
enc_node5.add(('LOAD', 'prev', 'enc_node4'))
@@ -220,7 +256,10 @@
enc_node5.add(('MULADD', 'prev', xis[6], 'local5'))
enc_node5.add(('WRITE', 'prev', 'coded5'))
-enc_node6 = NetCodingOperations('enc_node6', [('part6', 'r'), ('part1', 'r'), ('coded6', 'w'), ('enc_node5','r')], output='stream')
+enc_node6 = NetCodingOperations('enc_node6', [('part6', 'r'), ('part1', 'r'),
+ ('coded6', 'w'),
+ ('enc_node5','r')],
+ output='stream')
enc_node6.add(('LOAD', 'local1', 'part1'))
enc_node6.add(('LOAD', 'local6', 'part6'))
enc_node6.add(('LOAD', 'prev', 'enc_node5'))
@@ -231,7 +270,10 @@
enc_node6.add(('MULADD', 'prev', xis[8], 'local6'))
enc_node6.add(('WRITE', 'prev', 'coded6'))
-enc_node7 = NetCodingOperations('enc_node7', [('part7', 'r'), ('part2', 'r'), ('coded7', 'w'), ('enc_node6','r')], output='stream')
+enc_node7 = NetCodingOperations('enc_node7', [('part7', 'r'), ('part2', 'r'),
+ ('coded7', 'w'),
+ ('enc_node6','r')],
+ output='stream')
enc_node7.add(('LOAD', 'local2', 'part2'))
enc_node7.add(('LOAD', 'local7', 'part7'))
enc_node7.add(('LOAD', 'prev', 'enc_node6'))
@@ -242,7 +284,10 @@
enc_node7.add(('MULADD', 'prev', xis[10], 'local7'))
enc_node7.add(('WRITE', 'prev', 'coded7'))
-enc_node8 = NetCodingOperations('enc_node8', [('part8', 'r'), ('part3', 'r'), ('coded8', 'w'), ('enc_node7','r')], output='stream')
+enc_node8 = NetCodingOperations('enc_node8', [('part8', 'r'), ('part3', 'r'),
+ ('coded8', 'w'),
+ ('enc_node7','r')],
+ output='stream')
enc_node8.add(('LOAD', 'local3', 'part3'))
enc_node8.add(('LOAD', 'local8', 'part8'))
enc_node8.add(('LOAD', 'prev', 'enc_node7'))
@@ -253,7 +298,10 @@
enc_node8.add(('MULADD', 'prev', xis[12], 'local8'))
enc_node8.add(('WRITE', 'prev', 'coded8'))
-enc_node9 = NetCodingOperations('enc_node9', [('part9', 'r'), ('part4', 'r'), ('coded9', 'w'), ('enc_node8','r')], output='stream')
+enc_node9 = NetCodingOperations('enc_node9', [('part9', 'r'), ('part4', 'r'),
+ ('coded9', 'w'),
+ ('enc_node8','r')],
+ output='stream')
enc_node9.add(('LOAD', 'local4', 'part4'))
enc_node9.add(('LOAD', 'local9', 'part9'))
enc_node9.add(('LOAD', 'prev', 'enc_node8'))
@@ -264,7 +312,10 @@
enc_node9.add(('MULADD', 'prev', xis[14], 'local9'))
enc_node9.add(('WRITE', 'prev', 'coded9'))
-enc_node10 = NetCodingOperations('enc_node10', [('part10', 'r'), ('part5', 'r'), ('coded10', 'w'), ('enc_node9','r')], output='stream')
+enc_node10 = NetCodingOperations('enc_node10', [('part10', 'r'), ('part5', 'r'),
+ ('coded10', 'w'),
+ ('enc_node9','r')],
+ output='stream')
enc_node10.add(('LOAD', 'local5', 'part5'))
enc_node10.add(('LOAD', 'local10', 'part10'))
enc_node10.add(('LOAD', 'prev', 'enc_node9'))
@@ -275,39 +326,53 @@
enc_node10.add(('MULADD', 'prev', xis[16], 'local10'))
enc_node10.add(('WRITE', 'prev', 'coded10'))
-enc_node11 = NetCodingOperations('enc_node11', [('part6', 'r'), ('coded11', 'w'), ('enc_node10','r')], output='stream')
+enc_node11 = NetCodingOperations('enc_node11', [('part6', 'r'),
+ ('coded11', 'w'),
+ ('enc_node10','r')],
+ output='stream')
enc_node11.add(('LOAD', 'local', 'part6'))
enc_node11.add(('LOAD', 'prev', 'enc_node10'))
enc_node11.add(('COPY', 'stream', 'prev'))
enc_node11.add(('MULADD', 'stream', psis[17], 'local'))
enc_node11.add(('MULADD', 'prev', xis[17], 'local'))
enc_node11.add(('WRITE', 'prev', 'coded11'))
-enc_node12 = NetCodingOperations('enc_node12', [('part7', 'r'), ('coded12', 'w'), ('enc_node11','r')], output='stream')
+enc_node12 = NetCodingOperations('enc_node12', [('part7', 'r'),
+ ('coded12', 'w'),
+ ('enc_node11','r')],
+ output='stream')
enc_node12.add(('LOAD', 'local', 'part7'))
enc_node12.add(('LOAD', 'prev', 'enc_node11'))
enc_node12.add(('COPY', 'stream', 'prev'))
enc_node12.add(('MULADD', 'stream', psis[18], 'local'))
enc_node12.add(('MULADD', 'prev', xis[18], 'local'))
enc_node12.add(('WRITE', 'prev', 'coded12'))
-enc_node13 = NetCodingOperations('enc_node13', [('part8', 'r'), ('coded13', 'w'), ('enc_node12','r')], output='stream')
+enc_node13 = NetCodingOperations('enc_node13', [('part8', 'r'),
+ ('coded13', 'w'),
+ ('enc_node12','r')],
+ output='stream')
enc_node13.add(('LOAD', 'local', 'part8'))
enc_node13.add(('LOAD', 'prev', 'enc_node12'))
enc_node13.add(('COPY', 'stream', 'prev'))
enc_node13.add(('MULADD', 'stream', psis[19], 'local'))
enc_node13.add(('MULADD', 'prev', xis[19], 'local'))
enc_node13.add(('WRITE', 'prev', 'coded13'))
-enc_node14 = NetCodingOperations('enc_node14', [('part9', 'r'), ('coded14', 'w'), ('enc_node13','r')], output='stream')
+enc_node14 = NetCodingOperations('enc_node14', [('part9', 'r'),
+ ('coded14', 'w'),
+ ('enc_node13','r')],
+ output='stream')
enc_node14.add(('LOAD', 'local', 'part9'))
enc_node14.add(('LOAD', 'prev', 'enc_node13'))
enc_node14.add(('COPY', 'stream', 'prev'))
enc_node14.add(('MULADD', 'stream', psis[20], 'local'))
enc_node14.add(('MULADD', 'prev', xis[20], 'local'))
enc_node14.add(('WRITE', 'prev', 'coded14'))
-enc_node15 = NetCodingOperations('enc_node15', [('part10', 'r'), ('coded15', 'w'), ('enc_node14','r')])
+enc_node15 = NetCodingOperations('enc_node15', [('part10', 'r'),
+ ('coded15', 'w'),
+ ('enc_node14','r')])
enc_node15.add(('LOAD', 'local', 'part10'))
enc_node15.add(('LOAD', 'prev', 'enc_node14'))
enc_node15.add(('MULADD', 'prev', xis[21], 'local'))
@@ -351,7 +416,8 @@
operations['dec_node4_aux'] = dec_node4_aux
operations['dec_node5_aux'] = dec_node5_aux
-test = NetCodingOperations('test', [('part10', 'r'), ('part5', 'r'), ('coded10', 'w'), ('part0','r')])
+test = NetCodingOperations('test', [('part10', 'r'), ('part5', 'r'),
+ ('coded10', 'w'), ('part0','r')])
test.add(('LOAD', 'local5', 'part5'))
test.add(('LOAD', 'local10', 'part10'))
test.add(('LOAD', 'prev', 'part0'))
@@ -375,34 +441,40 @@ def get_reader(self, key):
if key.startswith('enc_node'):
coding_id = key[8:]
coding_id_int = int(re.search("(\d*)",coding_id).group(0))
- return RemoteNetCodingReader(self.get_enc_node(coding_id_int), self.block_id,
- key, self.stream_id, self.nodes, debug_name=key)
+ return RemoteNetCodingReader(self.get_enc_node(coding_id_int),
+ self.block_id, key, self.stream_id,
+ self.nodes, debug_name=key)
elif key.startswith('dec_node'):
coding_id = key[8:]
coding_id_int = int(re.search("(\d*)",coding_id).group(0))
- return RemoteNetCodingReader(self.get_enc_node(coding_id_int), self.block_id,
- key, self.stream_id, self.nodes, debug_name=key)
+ return RemoteNetCodingReader(self.get_enc_node(coding_id_int),
+ self.block_id, key, self.stream_id,
+ self.nodes, debug_name=key)
elif key.startswith('part'):
coding_id = int(key[4:])
- return self.block_store.get_reader(self.get_part(coding_id), debug_name=key)
+ return self.block_store.get_reader(self.get_part(coding_id),
+ debug_name=key)
elif key.startswith('coded'):
coding_id = int(key[5:])
- return self.block_store.get_reader(self.get_coded(coding_id), debug_name=key)
+ return self.block_store.get_reader(self.get_coded(coding_id),
+ debug_name=key)
else:
assert False
def get_writer(self, key):
if key.startswith('coded'):
coding_id = int(key[5:])
- return self.block_store.get_writer(self.get_coded(coding_id), debug_name=key)
+ return self.block_store.get_writer(self.get_coded(coding_id),
+ debug_name=key)
elif key.startswith('orig'):
coding_id = int(key[4:])
- return self.block_store.get_writer(self.get_orig(coding_id), debug_name=key)
+ return self.block_store.get_writer(self.get_orig(coding_id),
+ debug_name=key)
else:
assert False
@@ -420,4 +492,4 @@ def get_orig(self, coding_id):
return self.block_id+'.orig%d'%coding_id
k = 11
-__all__ = [operations, RapidRaidResolver, k]
+__all__ = [operations, RapidRaidResolver, k, bf]
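
For orientation, every enc_node stage above follows one shape: LOAD the local part and the predecessor's stream, COPY the stream forward, MULADD the local part into the forwarded stream (psi coefficient) and into the stored block (xi coefficient), then WRITE the coded block. A two-stage sketch with illustrative coefficients:

    from clusterdfs.coding import NetCodingOperations

    demo0 = NetCodingOperations('demo0', [('part0', 'r'), ('coded0', 'w')],
                                output='stream')
    demo0.add(('LOAD', 'temp', 'part0'))
    demo0.add(('MULT', 'stream', 7, 'temp'))
    demo0.add(('WRITE', 'temp', 'coded0'))

    demo1 = NetCodingOperations('demo1', [('part1', 'r'), ('coded1', 'w'),
                                          ('demo0', 'r')], output='stream')
    demo1.add(('LOAD', 'local', 'part1'))
    demo1.add(('LOAD', 'prev', 'demo0'))          # predecessor's stream
    demo1.add(('COPY', 'stream', 'prev'))
    demo1.add(('MULADD', 'stream', 11, 'local'))  # psi-like coefficient
    demo1.add(('MULADD', 'prev', 13, 'local'))    # xi-like coefficient
    demo1.add(('WRITE', 'prev', 'coded1'))
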
48 experiment_infocom/infocom_test
@@ -39,15 +39,15 @@ class Congester(object):
stdout.channel.recv_exit_status()
stdin, stdout, stderr = ssh.exec_command(conge)
stdout.channel.recv_exit_status()
- print node, 'congested'
+ #print node, 'congested'
def _free(self, node):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(node, username='root')
stdin, stdout, stderr = ssh.exec_command(self.clean)
stdout.channel.recv_exit_status()
- print node, 'free'
+ #print node, 'free'
def congest(self, proportion, Mbps):
p = []
@@ -151,33 +151,41 @@ if __name__=='__main__':
"""
Experiment measuring individual coding times.
"""
- '''
while True:
- ps = RunnerPipe(nodes, name='results_pipe.dat')
+ ps = RunnerPipe(nodes, name='times_pipe.dat')
ps.run(1)
- pp = RunnerPipe(nodes, name='results_pipe_par.dat')
+ pp = RunnerPipe(nodes, name='times_pipe_par.dat')
pp.runp(1)
- cs = RunnerCauchy(nodes, name='results_cauchy.dat')
- cs.run(1)
+ #cs = RunnerCauchy(nodes, name='times_cauchy.dat')
+ #cs.run(1)
- cp = RunnerCauchy(nodes, name='results_cauchy_par.dat')
- cp.runp(1)
- '''
+ #cp = RunnerCauchy(nodes, name='times_cauchy_par.dat')
+ #cp.runp(1)
+
"""
Experiment to measure the effects of congestion.
"""
+ '''
c = Congester(nodes)
+ bw = 500
while True:
- for n,p in [('33',.3333), ('66',.6666), ('100',1.0)]:
- for bw in [500,1000]:
- c.congest(p, bw)
- print '\n', bw, p
-
- ps = RunnerPipe(nodes, name='times_pipe_%s_%s.dat'%(n, unicode(bw)))
- ps.run(1)
-
- cs = RunnerCauchy(nodes, name='times_cauchy_%s_%s.dat'%(n,unicode(bw)))
- cs.run(1)
+ for n,p in [('25',.25), ('50',.5), ('75',.75), ('100',1.0)]:
+ c.congest(p, bw)
+ print '\n', bw, p
+
+ ps = RunnerPipe(nodes, name='times_pipe_%s.dat'%(n))
+ ps.run(1)
+
+ pp = RunnerPipe(nodes, name='times_pipe_par_%s.dat'%(n))
+ pp.runp(1)
+
+ cs = RunnerCauchy(nodes, name='times_cauchy_%s.dat'%(n))
+ cs.run(1)
+
+ cp = RunnerCauchy(nodes, name='times_cauchy_par_%s.dat'%(n))
+ cp.runp(1)
+ '''
+
2 experiment_infocom/obtain_ec2_addr.sh
@@ -1,5 +1,5 @@
#instances="i-baa9c4ee i-a4a9c4f0 i-a6a9c4f2 i-a0a9c4f4 i-a2a9c4f6 i-aca9c4f8 i-aea9c4fa i-a8a9c4fc i-aaa9c4fe i-54a8c500 i-56a8c502 i-50a8c504 i-52a8c506 i-5ca8c508 i-5ea8c50a i-58a8c50c"
-instances="i-26bbd572 i-20bbd574 i-22bbd576 i-2cbbd578 i-2ebbd57a i-28bbd57c i-2abbd57e i-d4bbd580 i-d6bbd582 i-d0bbd584 i-d2bbd586 i-dcbbd588 i-debbd58a i-d8bbd58c i-dabbd58e i-c4bbd590"
+instances=" i-2a6f077e i-286f077c i-2e6f077a i-2c6f0778 i-226f0776 i-206f0774 i-266f0772 i-246f0770 i-3a6f076e i-386f076c i-3e6f076a i-3c6f0768 i-326f0766 i-306f0764 i-366f0762 i-346f0760"
rm amazon.pssh.ext
rm amazon.pssh.int
rm amazon.pssh.ips
25 tests/cpucost.py
@@ -8,7 +8,7 @@
def test_ec(k=11, m=5, w=4, iters=1000):
code = CauchyEC(k, m, bitfield=w)
- buffers = [IOBuffer().as_numpy_byte_array()for i in xrange(k+m)]
+ buffers = [IOBuffer().as_numpy_byte_array() for i in xrange(k+m)]
t = time.time()
for it in xrange(iters):
@@ -19,22 +19,25 @@ def test_ec(k=11, m=5, w=4, iters=1000):
return time.time()-t
-def test_pipe(iters=1000):
- rr = numpy.random.randint(0, 65000, size=IOBuffer.defsize)
- buffers = [IOBuffer() for i in xrange(16)]
- for b in buffers:
+def test_pipe(iters=1000, bits=8):
+ mx = (2**bits)-1
+ rr = numpy.random.randint(0, mx, size=IOBuffer.defsize)
+ iobuffers = [IOBuffer() for i in xrange(16)]
+ buffers = [GaloisBuffer(iob.size, bitfield=bits, buffer=iob.buff)
+            for iob in iobuffers]
+
+ for b in iobuffers:
array = b.as_numpy_byte_array()
- array += rr*random.randrange(0,65000)
+ array += rr*random.randrange(0,mx)
t = time.time()
for it in xrange(iters):
random.shuffle(buffers)
- buffers[-1].copy_to(buffers[-2])
+ iobuffers[-1].copy_to(iobuffers[-2])
for i in xrange(2+(it&1)):
- buffers[i].galois.multadd(random.randrange(0,65000),\
- dest=buffers[i+1].galois, add=True)
+ buffers[i].multadd(random.randrange(0,mx), dest=buffers[i+1],
+                    add=True)
return (time.time()-t)*16
iters = 500
-print test_ec(iters=iters)
-print test_pipe(iters=iters)
+print 'cauchy', test_ec(iters=iters)
+print 'pipe8', test_pipe(iters=iters, bits=8)
+print 'pipe16', test_pipe(iters=iters, bits=16)
