Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

add handler and flusher

  • Loading branch information...
commit 02b300caca122d0f95720e7c3a97f4439313f7e9 1 parent d9e7e9b
@lemanyk authored
View
1  .gitignore
@@ -33,3 +33,4 @@ nosetests.xml
.mr.developer.cfg
.project
.pydevproject
+logs
View
2  __init__.py
@@ -1,3 +1,3 @@
# coding: utf-8
-from server import Server
+from api import Server, Client
View
102 netlog.py
@@ -0,0 +1,102 @@
+import os
+from gevent import server, socket, sleep, queue, spawn
+
+
+class Server(object):
+ """
+ """
+ BUF_SIZE = 1024 * 4
+ MAX_COUNT = 3
+
+ def __init__(self, path, port, host='0.0.0.0'):
+ self.path = path
+ self.port = port
+ self.host = host
+ self.queue = queue.Queue()
+
+ def start(self):
+ print 'start on %s %s' % (self.host, self.port)
+ spawn(self.flusher)
+ self.server = server.StreamServer((self.host, self.port), self.handle)
+ self.server.serve_forever()
+
+ def flusher(self):
+ logfiles = {}
+ count = 0
+
+ while True:
+ # wait and get new message
+ logfile, action, string = self.queue.get()
+ print 'queue', repr(logfile), repr(action), repr(string)
+
+ #
+ log = logfiles.setdefault(logfile, {})
+ if action == 'm':
+ count += 1
+ log.setdefault('strings', []).append(string)
+ elif action == 't':
+ log['terminator'] = string
+
+ # flush to files
+ if count >= self.MAX_COUNT:
+ print 'flush', logfiles
+ for logfile, logitem in logfiles.items():
+ path = os.path.join(self.path, logfile)
+ try:
+ os.makedirs(self.path)
+ except OSError:
+ pass
+ terminator = logitem.get('terminator', '\n')
+ with open(path, 'a') as f:
+ f.write(terminator)
+ f.write(terminator.join(logitem['strings']))
+
+ logfiles = {}
+ count = 0
+
+ def handle(self, sock, addr):
+ print 'connect with %s %s' % addr
+ buf = ''
+ logfile = None
+ while True:
+ # recv data
+ buf += sock.recv(self.BUF_SIZE)
+ parts = buf.split(' ', 2)
+
+ # parse data
+ if len(parts) == 3:
+ print 'buffer', addr, parts
+ count, action, buf = parts
+ count = int(count) # danger
+ string = buf[:count]
+ buf = buf[count:]
+ if action in ['m', 't']:
+ self.queue.put((logfile, action, string))
+ elif action == 'f':
+ logfile = string
+ else:
+ sleep(2)
+
+ def stop(self):
+ print 'seeya!'
+
+
+class Client(object):
+ """
+ """
+ def __init__(self, port, host, logfile):
+ self.sock = socket.socket()
+ self.sock.connect((port, host))
+ self.send('f %s' % logfile, True)
+
+ def send(self, string, control=False):
+ if not control:
+ string = 'm %s' % string
+ string = '%d %s' % (len(string)-2, string)
+ self.sock.send(string)
+
+ def close(self):
+ self.sock.close()
+
+ def __del__(self):
+ self.close()
View
25 server.py
@@ -1,25 +0,0 @@
-from gevent.server import StreamServer
-
-
-class Server(StreamServer):
- """
- """
- BUF_SIZE = 1024 * 4
-
- def __init__(self, port, host='0.0.0.0'):
- self.port = port
- self.host = host
-
- def start(self):
- print 'start on %s %s' % (self.host, self.port)
- self.server = StreamServer((self.host, self.port), self.handle)
- self.server.serve_forever()
-
- def handle(self, sock, addr):
- buf = sock.read(self.BUF_SIZE)
- file_name, buf = buf.split('\n', 1)
- while True:
- buf = sock.read(self.BUF_SIZE)
-
- def stop(self):
- print 'seeya!'
View
10 setup.py
@@ -0,0 +1,10 @@
+# codinf: utf-8
+
+from distutils.core import setup
+
+
+setup(
+ name='netlog',
+ version='0.1',
+ packages=['netlog'],
+)
View
11 tests.py
@@ -1,4 +1,11 @@
-from server import Server
+import gevent
+from netlog import Server, Client
-Server(port=5000).start()
+gevent.spawn(lambda: Server('./logs', 5000).start())
+gevent.sleep(1)
+
+client = Client('127.0.0.1', 5000, 'test0')
+client.send('asd xvb')
+client.send('asd\nxvb')
+client.close()
Please sign in to comment.
Something went wrong with that request. Please try again.