Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tree: e7713198bf
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 157 lines (128 sloc) 4.213 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156

import struct
import socket
import hashlib
import logging
from time import sleep

# Module-wide logger; every log record from this client goes through it.
logger = logging.getLogger('pyhpfeeds')

# Message opcodes: the single byte that follows the 4-byte length in every
# frame built by msghdr().
OP_ERROR = 0      # received payload is handed to the error callback
OP_INFO = 1       # first message expected from the broker after connecting
OP_AUTH = 2       # sent by the client in answer to OP_INFO (see msgauth)
OP_PUBLISH = 3    # sent to publish, and received for subscribed channels
OP_SUBSCRIBE = 4  # sent by the client to subscribe to a channel
# Maximum number of bytes requested from the socket per recv() call.
BUFSIZ = 16384

# Names exported by ``from <module> import *``; HPC itself is not exported
# this way and is normally obtained through new().
__all__ = ["new", "FeedException"]

def msghdr(op, data):
    """Frame *data* as a single wire message carrying opcode *op*.

    The frame starts with a 5-byte header: the total message length
    (header included) as a big-endian signed 32-bit integer, followed by
    the opcode as one unsigned byte.  *data* is appended unchanged.
    """
    total_length = 5 + len(data)
    header = struct.pack('!iB', total_length, op)
    return header + data
def msgpublish(ident, chan, data):
    """Build an OP_PUBLISH message for channel *chan*.

    Payload layout: one length byte followed by *ident*, one length byte
    followed by *chan*, then *data* filling the rest of the message.
    *data* is appended as-is; no encoding is applied here.
    """
    ident_len = struct.pack('!B', len(ident))
    chan_len = struct.pack('!B', len(chan))
    payload = ident_len + ident + chan_len + chan + data
    return msghdr(OP_PUBLISH, payload)
def msgsubscribe(ident, chan):
    """Build an OP_SUBSCRIBE message for channel *chan*.

    Payload layout: one length byte followed by *ident*, then the channel
    name filling the rest of the message (it carries no length prefix).
    """
    ident_len = struct.pack('!B', len(ident))
    payload = ident_len + ident + chan
    return msghdr(OP_SUBSCRIBE, payload)
def msgauth(rand, ident, secret):
    """Build the OP_AUTH reply to the broker's info message.

    The proof of identity is the raw SHA-1 digest of *rand* (the value
    taken from the info message) concatenated with *secret*.  Payload
    layout: one length byte followed by *ident*, then the 20-byte digest.
    """
    digest = hashlib.sha1(rand + secret).digest()
    ident_len = struct.pack('!B', len(ident))
    payload = ident_len + ident + digest
    return msghdr(OP_AUTH, payload)

class FeedUnpack(object):
    """Incremental decoder that splits a byte stream into framed messages.

    Raw bytes received from the socket are appended with feed(); iterating
    over the instance (or calling unpack()) yields ``(opcode, payload)``
    tuples for every complete message buffered so far.  Iteration simply
    stops when only a partial message is left, and can be resumed after
    more data has been fed.

    Works under both the Python 2 (``next``) and Python 3 (``__next__``)
    iterator protocols.
    """

    # Size of the frame header: 4-byte length + 1-byte opcode.
    _HEADER_LEN = 5

    def __init__(self):
        # Bytes received but not yet consumed as complete messages.
        self.buf = bytearray()

    def __iter__(self):
        return self

    def next(self):
        """Return the next complete message (Python 2 iterator protocol)."""
        return self.unpack()

    # Python 3 looks for __next__; without it ``for ... in unpacker`` fails.
    __next__ = next

    def feed(self, data):
        """Append freshly received bytes to the internal buffer."""
        self.buf.extend(data)

    def unpack(self):
        """Pop one complete message off the front of the buffer.

        Returns:
            ``(opcode, payload)`` where *opcode* is an int and *payload* is
            a ``bytearray`` holding the message body without the header.

        Raises:
            StopIteration: fewer bytes than one complete message are
                buffered; the buffer is left untouched.
            ValueError: the header declares a length smaller than the
                header itself, i.e. the stream is corrupt.  The buffer is
                left untouched.
        """
        header_len = self._HEADER_LEN
        if len(self.buf) < header_len:
            raise StopIteration('No message.')

        # bytes() gives struct a plain byte string on every Python version.
        ml, opcode = struct.unpack('!iB', bytes(self.buf[:header_len]))
        if ml < header_len:
            # A length this small can never be valid; consuming it would
            # either loop forever (ml == 0) or mis-frame the rest of the
            # stream, so fail loudly instead.
            raise ValueError('Invalid message length: {0}'.format(ml))
        if len(self.buf) < ml:
            raise StopIteration('No message.')

        data = self.buf[header_len:ml]  # slicing a bytearray copies it
        del self.buf[:ml]
        return opcode, data

class FeedException(Exception):
    """Raised when connecting to, or handshaking with, the broker fails."""

class HPC(object):
    """Blocking client for an hpfeeds-style broker.

    Creating an instance connects and authenticates immediately.  Use
    subscribe() / publish() to send messages and run() to receive them;
    run() blocks and dispatches incoming messages to callbacks until
    stop() has been called.

    Note: connect() only performs the authentication handshake, so channel
    subscriptions are not re-sent automatically after a reconnect.
    """

    def __init__(self, host, port, ident, secret, timeout=3, reconnect=False, sleepwait=20):
        """Store the settings and connect to the broker.

        Args:
            host, port: address of the broker.
            ident, secret: credentials used to build the auth message.
            timeout: socket timeout in seconds applied while connecting and
                waiting for the broker's info message; it is removed once
                the handshake has completed.
            reconnect: when true, run() keeps reconnecting after failures.
            sleepwait: seconds to sleep between reconnection attempts.

        Raises:
            FeedException: the initial connection or handshake failed.
        """
        self.host, self.port = host, port
        self.ident, self.secret = ident, secret
        self.timeout = timeout
        self.reconnect = reconnect
        self.sleepwait = sleepwait
        self.brokername = 'unknown'
        self.connected = False
        self.stopped = False
        self.s = None
        self.unpacker = FeedUnpack()

        self.connect()

    @staticmethod
    def _split_field(data):
        """Split one length-prefixed field off the front of *data*.

        *data* is a bytearray whose first byte holds the field length.
        Returns ``(field, rest)``: the field as a byte string and the
        remaining bytes as a bytearray.
        """
        end = 1 + data[0]
        return bytes(data[1:end]), data[end:]

    @staticmethod
    def _as_channel_list(chaninfo):
        """Return *chaninfo* as an iterable of channel names.

        A single name (byte or text string) is wrapped in a list so that
        it is not iterated character by character.
        """
        if isinstance(chaninfo, (bytes, type(u''))):
            return [chaninfo]
        return chaninfo

    def connect(self):
        """Open a fresh connection to the broker and authenticate.

        Any previous socket is closed first and the receive buffer is
        reset, so bytes left over from a dead connection cannot corrupt
        the new stream.  ``connected`` becomes true only after the auth
        message has been sent.

        Raises:
            FeedException: the TCP connection failed, the broker did not
                answer within ``timeout`` seconds, or its first data did
                not contain a complete info message.
        """
        logger.info('connecting to {0}:{1}'.format(self.host, self.port))

        if getattr(self, 's', None) is not None:
            self.close()
        self.connected = False
        self.unpacker = FeedUnpack()

        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.settimeout(self.timeout)
        try:
            self.s.connect((self.host, self.port))
        except socket.error as e:
            # socket.error also covers timeouts and name-resolution errors.
            logger.debug('connect failed: %s', e)
            self.close()
            raise FeedException('Could not connect to broker.')

        try:
            d = self.s.recv(BUFSIZ)
        except socket.timeout:
            self.close()
            raise FeedException('Connection receive timeout.')

        self.unpacker.feed(d)
        for opcode, data in self.unpacker:
            if opcode == OP_INFO:
                # Info payload: length-prefixed broker name, then the
                # random value the auth digest is computed over.
                name, rest = self._split_field(data)
                rand = bytes(rest)

                logger.debug('info message name: {0}, rand: {1}'.format(name, repr(rand)))
                self.brokername = name

                # sendall(): a plain send() may transmit only part of it.
                self.s.sendall(msgauth(rand, self.ident, self.secret))
                break
        else:
            self.close()
            raise FeedException('Expected info message at this point.')

        # From here on reads block indefinitely.
        self.s.settimeout(None)
        self.connected = True

    def _run(self, message_callback, error_callback):
        """Receive and dispatch messages until stop() is called.

        Publish messages are passed to ``message_callback(ident, chan,
        content)`` with all three arguments as byte strings; error
        messages are passed to ``error_callback(payload)``.  When the
        broker closes the connection, one immediate reconnect is attempted
        through connect(), whose exceptions propagate to the caller.
        """
        while not self.stopped:
            while self.connected:
                d = self.s.recv(BUFSIZ)
                if not d:
                    # Empty read: the broker closed the connection.
                    self.connected = False
                    break

                self.unpacker.feed(d)
                for opcode, data in self.unpacker:
                    if opcode == OP_PUBLISH:
                        ident, rest = self._split_field(data)
                        chan, content = self._split_field(rest)
                        message_callback(ident, chan, bytes(content))
                    elif opcode == OP_ERROR:
                        error_callback(data)

                if self.stopped:
                    break

            if self.stopped:
                break
            self.connect()

    def run(self, message_callback, error_callback):
        """Block, dispatching incoming messages to the given callbacks.

        Without ``reconnect`` this is a single _run() call and connection
        errors propagate.  With ``reconnect`` enabled, connection errors
        are logged and the client sleeps ``sleepwait`` seconds between
        reconnection attempts.  In both modes the method returns once
        stop() has been called.
        """
        if not self.reconnect:
            self._run(message_callback, error_callback)
            return

        while not self.stopped:
            try:
                self._run(message_callback, error_callback)
            except (FeedException, socket.error) as e:
                logger.warning('connection to broker lost: %s', e)
                self.connected = False

            if self.stopped:
                break

            # reconnect now we've failed
            sleep(self.sleepwait)
            while not self.stopped:
                try:
                    self.connect()
                    break
                except (FeedException, socket.error):
                    sleep(self.sleepwait)

    def subscribe(self, chaninfo):
        """Subscribe to one channel name or to every name in an iterable."""
        for c in self._as_channel_list(chaninfo):
            self.s.sendall(msgsubscribe(self.ident, c))

    def publish(self, chaninfo, data):
        """Publish *data* to one channel name or to every name in an iterable."""
        for c in self._as_channel_list(chaninfo):
            self.s.sendall(msgpublish(self.ident, c, data))

    def stop(self):
        """Ask run() to return; it is checked after each received chunk."""
        self.stopped = True

    def close(self):
        """Close the socket, logging (not raising) any error while doing so."""
        try:
            self.s.close()
        except Exception:
            logger.warning('Socket exception when closing.')

def new(host=None, port=10000, ident=None, secret=None, reconnect=True, sleepwait=20):
    """Create a connected HPC client.

    Args:
        host, port: address of the broker.
        ident, secret: credentials used for authentication.
        reconnect: whether HPC.run() should keep reconnecting on failure.
        sleepwait: seconds to wait between reconnection attempts.

    Returns:
        A new HPC instance; its constructor has already connected.

    Raises:
        FeedException: the connection or handshake failed.
    """
    # Pass by keyword: HPC's fifth positional parameter is ``timeout``, so
    # passing ``reconnect`` positionally would set the socket timeout
    # instead and silently leave reconnect and sleepwait at their defaults.
    return HPC(host, port, ident, secret, reconnect=reconnect, sleepwait=sleepwait)

Something went wrong with that request. Please try again.