Browse files

Merge pull request #5 from buffer/master

Using TCP Keep-Alive for detecting lost connections
  • Loading branch information...
2 parents 5dd4f55 + 07dfbee commit 47f86b575f46a2bb3ba579e6e67b0c4c70474ac1 @rep committed Jul 4, 2012
Showing with 81 additions and 68 deletions.
  1. +0 −67 cli/grabfiles.py
  2. +6 −1 cli/hpfeeds.py
  3. +75 −0 cli/thugfiles.py
View
67 cli/grabfiles.py
@@ -1,67 +0,0 @@
-
-import os
-import sys
-import datetime
-import json
-import logging
-logging.basicConfig(level=logging.CRITICAL)
-
-import hpfeeds
-
-HOST = 'hpfeeds.honeycloud.net'
-PORT = 10000
-CHANNELS = ['thug.files',]
-IDENT = ''
-SECRET = ''
-OUTFILE = './grab.log'
-OUTDIR = './files/'
-
-def main():
- try: outfd = open(OUTFILE, 'a')
- except:
- print >>sys.stderr, 'could not open output file for message log.'
- return 1
-
- if not os.path.exists(OUTDIR): os.mkdir(OUTDIR)
-
- hpc = hpfeeds.new(HOST, PORT, IDENT, SECRET)
- print >>sys.stderr, 'connected to', hpc.brokername
-
- def on_message(identifier, channel, payload):
- try: decoded = json.loads(str(payload))
- except: decoded = {'raw': payload}
-
- if not 'sha1' in decoded or not 'data' in decoded:
- print >>sys.stderr, "Message received does not contain hash or data :( - ignoring it..."
- else:
- print >>sys.stderr, "Got a message with sha1 and data"
-
- csv = ', '.join(['{0}={1}'.format(i,decoded[i]) for i in ['sha1', 'type', 'md5']])
- outmsg = '{0} PUBLISH chan={1}, identifier={2}, {3}'.format(
- datetime.datetime.now().ctime(), chan, ident, csv
- )
-
- print >>outfd, outmsg
-
- # now store the file itself
- filedata = decoded['data'].decode('base64')
- fpath = os.path.join(OUTDIR, decoded['sha1'])
- try:
- open(fpath, 'wb').write(filedata)
- except:
- print >>outfd, '{0} ERROR could not write to {1}'.format(datetime.datetime.now().ctime(), fpath)
- outfd.flush()
-
- def on_error(payload):
- print >>sys.stderr, ' -> errormessage from server: {0}'.format(payload)
- hpc.stop()
-
- hpc.subscribe(CHANNELS)
- hpc.run(on_message, on_error)
- hpc.close()
- return 0
-
-if __name__ == '__main__':
- try: sys.exit(main())
- except KeyboardInterrupt:sys.exit(0)
-
View
7 cli/hpfeeds.py
@@ -1,4 +1,5 @@
+import sys
import struct
import socket
import hashlib
@@ -92,7 +93,11 @@ def connect(self):
else:
raise FeedException('Expected info message at this point.')
- self.s.settimeout(None)
+ self.s.settimeout(None)
+ self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+
+ if sys.platform in ('linux2', ):
+ self.s.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60)
def _run(self, message_callback, error_callback):
while not self.stopped:
View
75 cli/thugfiles.py
@@ -0,0 +1,75 @@
+
+import os
+import sys
+import time
+import datetime
+import json
+import logging
+import hpfeeds
+
+HOST = 'hpfeeds.honeycloud.net'
+PORT = 10000
+CHANNELS = ['thug.files',]
+IDENT = ''
+SECRET = ''
+OUTFILE = './grab.log'
+OUTDIR = './files/'
+
+log = logging.getLogger("thug.files")
+handler = logging.FileHandler(OUTFILE)
+formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s')
+handler.setFormatter(formatter)
+log.addHandler(handler)
+log.setLevel(logging.INFO)
+
+class ThugFiles:
+ def __init__(self):
+ if not os.path.exists(OUTDIR):
+ os.mkdir(OUTDIR)
+
+ def run(self):
+ def on_message(identifier, channel, payload):
+ try:
+ decoded = json.loads(str(payload))
+ except:
+ decoded = {'raw': payload}
+
+ if not 'md5' in decoded or not 'data' in decoded:
+ log.info("Received message does not contain hash or data - Ignoring it")
+ return
+
+ csv = ', '.join(['{0} = {1}'.format(i, decoded[i]) for i in ['md5', 'sha1', 'type']])
+ outmsg = 'PUBLISH channel = %s, identifier = %s, %s' % (channel, identifier, csv)
+ log.info(outmsg)
+
+ filedata = decoded['data'].decode('base64')
+ fpath = os.path.join(OUTDIR, decoded['md5'])
+
+ with open(fpath, 'wb') as fd:
+ fd.write(filedata)
+
+ def on_error(payload):
+ log.critical("Error message from server: %s" % (payload, ))
+ self.hpc.stop()
+
+ while True:
+ try:
+ self.hpc = hpfeeds.new(HOST, PORT, IDENT, SECRET)
+ log.info("Connected to %s" % (self.hpc.brokername, ))
+ self.hpc.subscribe(CHANNELS)
+ except hpfeeds.FeedException:
+ break
+
+ try:
+ self.hpc.run(on_message, on_error)
+ except:
+ self.hpc.close()
+ time.sleep(20)
+
+if __name__ == '__main__':
+ try:
+ f = ThugFiles()
+ f.run()
+ except KeyboardInterrupt:
+ sys.exit(0)
+

0 comments on commit 47f86b5

Please sign in to comment.