Skip to content

Commit

Permalink
that last commit broke python2.7, this commit works for both 2.7+3.5 …
Browse files Browse the repository at this point in the history
…but mis-parses standard RSS
  • Loading branch information
bvacaliuc committed Dec 31, 2016
1 parent f6a1a0b commit f47b77b
Showing 1 changed file with 35 additions and 11 deletions.
46 changes: 35 additions & 11 deletions host/rss2rtl-power/rss2rtl-power.py
Expand Up @@ -26,6 +26,20 @@
PKT_SZ = struct.calcsize(PKT_FMT)
PKT_DTYPE = np.dtype( [('timestamp','>f8'),('hz_low','>f4'),('hz_high','>f4'),('hz_step','>f4'),('samples','>u4'),('channels','>u4')] )

# http://stackoverflow.com/questions/16867347/step-by-step-debugging-with-ipython
# https://github.com/ipython/ipython/wiki/Cookbook%3a-Updating-code-for-use-with-IPython-0.11-and-later
def _ipsh():
from IPython.frontend.terminal.embed import InteractiveShellEmbed
from IPython.config.loader import Config
import inspect
ipshell = InteractiveShellEmbed(config=Config(),
banner1='*** STOP\nCTRL-D to exit', exit_msg='Resume...')
frame = inspect.currentframe().f_back
msg = 'Stopped at {0.f_code.co_filename} at line {0.f_lineno}'.format(frame)
# Go back one level!
# This is needed because the call to ipshell is inside the function ipsh()
ipshell(msg,stack_depth=2)

class connection(asyncore.dispatcher):
pktlen = 1024
data = b''
Expand Down Expand Up @@ -57,7 +71,7 @@ def parseConnectionString(self,buf):
@param buf: bytes received in connection string
@return: a dictionary with key/value pairs interpreted from buf
"""
d = buf.decode('utf-8').rpartition('|')
d = buf.decode('ascii', errors='ignore').rpartition('|')
connect = d[0].split('|')
# parse key/value pairs in connection string
conf = {}
Expand All @@ -68,7 +82,6 @@ def parseConnectionString(self,buf):

def connectionReceived(self,buf):
"""Default connection handler that drops the connection."""
logger.debug('%s:%s:connectionReceived().len(buf)=%d'%(self.name,self.state,len(buf)))
self.close()
self.state = 'Disconnected'
return {}
Expand All @@ -81,7 +94,6 @@ def handle_error(self):
self.close()

def handle_connect(self):
logger.debug('%s:%s:handle_connect()'%(self.name,self.state))
self.state = 'Connecting'
self.pktlen = 1024

Expand Down Expand Up @@ -200,7 +212,8 @@ def __init__(self,opts):

def OnPacketReceived(self,buf):
"""Implement the callback when a spectrogram packet is received."""
logger.debug('%s:%s:OnPacketReceived().len(buf)=%d'%(self.name,self.conn.state,len(buf)))
import binascii
logger.debug('%s:%s:OnPacketReceived().len(buf)=%d:\n%s'%(self.name,self.conn.state,len(buf),binascii.hexlify(buf)))
if self.conn.state.startswith('Payload'):
# RSS timestamp @reception of payload
ts = time.time()
Expand All @@ -212,6 +225,7 @@ def OnPacketReceived(self,buf):
# parse payload as RSS would
b = np.frombuffer(buf,dtype=self.dtyp)
psd = np.reshape(b, (-1,self.channels+1))
logger.debug('%s:%s:OnPacketReceived().psd[0][%d]=%d ? %d'%(self.name,self.conn.state,self.channels,psd[0][self.channels],self.eos_marker))
assert(psd[0][self.channels] == self.eos_marker)
scan = psd[0][0:self.channels]
db = ', '.join(format(x,'d') for x in scan)
Expand All @@ -229,10 +243,11 @@ def OnPacketReceived(self,buf):

def OnConnectReceived(self,buf):
"""Implement the callback when a connection to the server is made."""
logger.debug('%s:%s:OnConnectReceived().len(buf)=%d'%(self.name,self.conn.state,len(buf)))
ts = time.time()
conf = self.conn.parseConnectionString(buf)

logger.debug('%s:%s:OnConnectReceived().conf=%s'%(self.name,self.conn.state,str(conf)))

# ensure canonical connection string
assert('F' in conf.keys())
assert('S' in conf.keys())
Expand Down Expand Up @@ -265,19 +280,29 @@ def OnConnectReceived(self,buf):
self.stats = statistics()
sys.stderr.write(self.stats.header()+os.linesep)
# consume (variable) connection buffer leaving payload
d = buf.decode('utf-8').rpartition('|')
self.conn.data = bytes(d[2].strip(),'utf-8')
d = buf.decode('ascii', errors='ignore').rpartition('|')
try:
self.conn.data = bytes(d[2].strip(),'utf-8')
except TypeError as e:
# Python2 throws TypeError
if str(e).startswith('str()'):
self.conn.data = bytes(d[2].strip())
else:
raise
# state to receive payloads
self.conn.pktlen = (self.channels+1)*self.dtyp.itemsize
self.conn.partial = False
self.conn.state = 'Payload'
# process any full payloads contained in the connection
while len(self.conn.data) > self.conn.pktlen:
logger.debug('%s:%s:OnConnectReceived(): len(data)=%d, pktlen=%d - call OnPacketReceived'%(self.name,self.conn.state,len(self.conn.data),self.conn.pktlen))
buf = self.conn.data
logger.debug('%s:%s:OnConnectReceived().loop.len(buf2)=%d'%(self.name,self.conn.state,len(buf)))
self.OnPacketReceived(buf[0:self.conn.pktlen]) # NB: discards self.conn.data
self.conn.data = buf[self.conn.pktlen:]
logger.debug('%s:%s:OnConnectReceived().loop.len(buf3)=%d'%(self.name,self.conn.state,len(self.conn.data)))
# after this, the async system kicks in to give us packets
import binascii
logger.debug('%s:%s:OnConnectReceived().loop.len(buf)=%d:\n%s'%(self.name,self.conn.state,len(self.conn.data),binascii.hexlify(self.conn.data)))

class rssx(object):
name = 'rssx'
Expand Down Expand Up @@ -321,7 +346,8 @@ def __init__(self,opts):

def OnPacketReceived(self,buf):
"""Implement the callback when a spectrogram packet is received."""
logger.debug('%s:%s:OnPacketReceived().len(buf)=%d'%(self.name,self.conn.state,len(buf)))
import binascii
logger.debug('%s:%s:OnPacketReceived().len(buf)=%d:\n%s'%(self.name,self.conn.state,len(buf),binascii.hexlify(buf)))
if self.conn.state.startswith('Header'):
ts, hz_low, hz_high, hz_step, samples, self.channels = struct.unpack(PKT_FMT, buf)
self.dt = datetime.datetime.fromtimestamp(ts)
Expand Down Expand Up @@ -353,7 +379,6 @@ def OnPacketReceived(self,buf):

def OnConnectReceived(self,buf):
"""Implement the callback when a connection to the server is made."""
logger.debug('%s:%s:OnConnectReceived().len(buf)=%d'%(self.name,self.conn.state,len(buf)))
ts = time.time()
conf = self.conn.parseConnectionString(buf)

Expand Down Expand Up @@ -429,7 +454,6 @@ def OnConnectReceived(self,buf):
logger.setLevel(logging.DEBUG if opts.verbose else logging.INFO)
if not opts.quiet:
logger.info('*** STARTED ***')

try:
c = rssx(opts) if opts.extended else rss(opts)
asyncore.loop(timeout=2)
Expand Down

0 comments on commit f47b77b

Please sign in to comment.