use socketserver instead of gevent

1 parent 90f7f19 commit 64e30ae78d14f7859062652671ddaed731719a93 @lemanyk committed Jan 2, 2013
Showing with 206 additions and 67 deletions.
  1. +1 −0 .gitignore
  2. +155 −60 netlog.py
  3. +50 −7 tests.py
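
The commit drops the gevent dependency and rebuilds the server on the standard-library SocketServer module (Python 2). For orientation, here is a minimal sketch of the ThreadingTCPServer / StreamRequestHandler pattern that the new netlog.py builds on; the handler name and port below are illustrative, not taken from the repository:

    # minimal threaded echo server using the same stdlib pattern (Python 2)
    from SocketServer import ThreadingTCPServer, StreamRequestHandler

    class EchoHandler(StreamRequestHandler):
        def handle(self):
            # self.request is the accepted socket; ThreadingTCPServer runs
            # each connection's handle() in its own thread
            data = self.request.recv(4096)
            while data:
                self.request.sendall(data)
                data = self.request.recv(4096)

    if __name__ == '__main__':
        ThreadingTCPServer(('0.0.0.0', 5010), EchoHandler).serve_forever()

netlog's Server follows the same shape: a nested Handler delegates each accepted socket back to the Server instance, while flushing and rotation run in plain threading.Thread workers instead of gevent greenlets.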
1 .gitignore
@@ -34,3 +34,4 @@ nosetests.xml
.project
.pydevproject
logs
+*.tar.gz
215 netlog.py
@@ -1,109 +1,204 @@
+from SocketServer import ThreadingTCPServer, StreamRequestHandler
+import threading
+import socket
+import time
+from Queue import Queue
import os
-from gevent import server, socket, sleep, queue, spawn
+import datetime
+import gzip
+import signal
+
+
+__all__ = ['Server', 'Client']
class Server(object):
"""
server process
- while started listen specified port for connections
- after every MAX_COUNT requests flushe to files
+ listen specified port for connections
usage:
Server('/logs', 5010).start()
:param path: directory to save logs
:param port: tcp port to listen
+ :param host='0.0.0.0': host for listen
+ :param binary=True: put into .tar.gz file or into plain text file
+ :param max_count=MAX_COUNT: maximum strings in memory before flush to files
+ :param life_time=LIFE_TIME: maximum days before delete log file
+ :param terminator=TERMINATOR: terminator between log strings
"""
+
    BUF_SIZE = 1024 * 4
    MAX_COUNT = 1000  # max lines before flush
    LIFE_TIME = 30  # number of days to keep files
    TERMINATOR = '\n'
+    debug = False
+
+    class Handler(StreamRequestHandler):
+        def handle(self):
+            # delegate to the Server instance attached in start()
+            self.server._server.handle(self.request)
-    def __init__(self, path, port, host='0.0.0.0'):
+    def __init__(self, path, port, host='0.0.0.0', binary=True,
+                 max_count=MAX_COUNT, life_time=LIFE_TIME, terminator=TERMINATOR):
        self.path = path
        self.port = port
        self.host = host
-        self.queue = queue.Queue()
+        self.binary = binary
+        self.max_count = max_count
+        self.life_time = life_time
+        self.terminator = terminator
-    def start(self, blocking=True):
+        self.dirs_file = os.path.join(self.path, 'netlog_dirs')
+        self.open = gzip.open if self.binary else open
+
+    def start(self, debug=False):
        """
        start serving
-        :param blocking: if True then code blocked in Server(..).start() line
+        :param debug: if True then write the server's own debug info into the 'netlog' log
        """
-        print 'start on %s %s' % (self.host, self.port)
-        spawn(self.flusher)
-        spawn(self.rotator)
-        self._start() if blocking else spawn(self._start)
+        if not os.path.exists(self.path):
+            os.makedirs(self.path)
+        if not os.path.exists(self.dirs_file):
+            open(self.dirs_file, 'w').close()
+
+        self.debug = debug
+        self.debug_log('start on %s %s' % (self.host, self.port))
+
+        self.queue = Queue()
+        self.stopping = threading.Event()
-    def _start(self):
-        self.server = server.StreamServer((self.host, self.port), self.handle)
+        signal.signal(signal.SIGTERM, self.stop)  # signal 15 (kill)
+        signal.signal(signal.SIGINT, self.stop)  # signal 2 (ctrl+c)
+
+        threading.Thread(target=self.flusher).start()
+        threading.Thread(target=self.rotator).start()
+
+        self.server = ThreadingTCPServer((self.host, self.port), self.Handler)
+        self.server._server = self
        self.server.serve_forever()
+    def handle(self, connection):
+        """handle a connection; runs in a dedicated thread"""
+        self.debug_log('connect with %s' % connection)
+        buf = ''
+        logfile = None
+        buf_partial = False
+        while not self.stopping.is_set():
+            # recv data
+            parts = buf.split(' ', 2)
+            if len(parts) < 3 or buf_partial:
+                data = connection.recv(self.BUF_SIZE)  # blocking
+                if not data:
+                    break  # client closed the connection
+                buf += data
+                buf_partial = False
+                continue
+
+            # parse data: each frame is '<count> <action> <payload>'
+            self.debug_log('buffer', connection, parts)
+            count, action, buf = parts
+            try:
+                count = int(count)
+            except ValueError:
+                # malformed frame: log it, drop the buffer and keep serving
+                self.debug_log('bad frame', connection, parts)
+                buf = ''
+                continue
+            if len(buf) < count:
+                # payload not fully received yet
+                buf_partial = True
+                continue
+
+            string = buf[:count]
+            buf = buf[count:]
+            if action in ['m', 't']:
+                self.queue.put((logfile, action, string))
+            elif action == 'f':
+                logfile = string
+
    def flusher(self):
-        """flusher process, flushe memory data to logs """
-        logfiles = {}
+        """flusher process, flush memory data to logs"""
+        logs = {}
        count = 0
        while True:
            # wait and get new message
-            logfile, action, string = self.queue.get()
-            print 'queue', repr(logfile), repr(action), repr(string)
+            log_dir, action, string = self.queue.get()
+            self.debug_log('queue', repr(log_dir), repr(action), repr(string))
            # parse message
-            log = logfiles.setdefault(logfile, {})
+            log = logs.setdefault(log_dir, {})
            if action == 'm':
                count += 1
                log.setdefault('strings', []).append(string)
            elif action == 't':
                log['terminator'] = string
            # flush to files
-            if count >= self.MAX_COUNT:
-                print 'flush', logfiles
-                for logfile, logitem in logfiles.items():
-                    path = os.path.join(self.path, logfile)
-                    try:
-                        os.makedirs(self.path)
-                    except OSError:
-                        pass
-                    terminator = logitem.get('terminator', self.TERMINATOR)
-                    with open(path, 'a') as f:
-                        f.write(terminator.join(logitem['strings']))
+            if count >= self.max_count or self.stopping.is_set():
+                self.debug_log('flush', logs.keys())
+                date_str = str(datetime.datetime.now().date())
+                for log_dir, log in logs.items():
+                    path = os.path.join(self.path, log_dir)
+                    if not os.path.exists(path):
+                        os.makedirs(path)
+                    terminator = log.get('terminator', self.terminator)
+                    with self.open(os.path.join(path, date_str), 'a') as f:
+                        f.write(terminator.join(log['strings']))
                        f.write(terminator)
-
-                logfiles = {}
+                        f.flush()
+
+                # register flushed log dirs for the rotator
+                reg_logs = open(self.dirs_file).read().split('\n')
+                reg_logs_new = [log for log in reg_logs if os.path.exists(log)]
+                for log_dir in logs:
+                    log_path = os.path.join(self.path, log_dir)
+                    if log_path not in reg_logs_new:
+                        reg_logs_new.append(log_path)
+                if reg_logs_new != reg_logs:
+                    open(self.dirs_file, 'w').write('\n'.join(reg_logs_new))
+
+                logs = {}
                count = 0
+                if self.stopping.is_set():
+                    break
+
+    def rotator(self):
        """rotation process"""
-        while True:
-            sleep(600)
-
-    def handle(self, sock, addr):
-        print 'connect with %s %s' % addr
-        buf = ''
-        logfile = None
-        while True:
-            # recv data
-            buf += sock.recv(self.BUF_SIZE)
-            parts = buf.split(' ', 2)
-
-            # parse data
-            if len(parts) == 3:
-                print 'buffer', addr, parts
-                count, action, buf = parts
-                count = int(count)  # danger
-                string = buf[:count]
-                buf = buf[count:]
-                if action in ['m', 't']:
-                    self.queue.put((logfile, action, string))
-                elif action == 'f':
-                    logfile = string
-            else:
-                sleep(2)
-
-    def stop(self):
-        print 'seeya!'
+        date = datetime.datetime.now().date()
+        while not self.stopping.is_set():
+            time.sleep(0.1)
+            cur_date = datetime.datetime.now().date()
+
+            if date != cur_date:
+                logs = open(self.dirs_file).read().split('\n')
+                self.debug_log('rotate:', logs)
+
+                date_str = str(date)
+                cur_date_str = str(cur_date)
+                del_date_str = str(cur_date - datetime.timedelta(
+                    days=self.life_time))
+
+                for log in logs:
+                    logfile_src = os.path.join(log, date_str)
+                    logfile_dst = os.path.join(log, cur_date_str)
+                    logfile_del = os.path.join(log, del_date_str)
+                    if os.path.exists(logfile_src):
+                        os.rename(logfile_src, logfile_dst)
+                    if os.path.exists(logfile_del):
+                        os.remove(logfile_del)
+
+                date = cur_date
+
+    def stop(self, signum, frame):
+        self.debug_log('stopping')
+        self.stopping.set()
+        del self.server
+        self.debug_log('stopped, seeya!')
+
+    def debug_log(self, *strings):
+        """write the server's own debug strings into the 'netlog' log"""
+        if self.debug:
+            string = ' '.join(map(str, strings))
+            self.open(os.path.join(self.path, 'netlog'), 'a')\
+                .write('%s\n' % string)
class Client(object):
@@ -119,7 +214,7 @@ class Client(object):
    :param port: port to connect
    :param host: host to connect
-    "param filename: file to write
+    :param filename: file to write
    """
    def __init__(self, port, host, filename):
        self.sock = socket.socket()
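
Server.handle() above parses a small framing protocol: each frame is '<payload length> <action> <payload>', where action 'f' selects the log directory for the connection, 'm' queues a log string and 't' overrides the terminator. A client-side sketch inferred from that parsing code (the shipped Client class presumably builds the same frames, but its body is outside this hunk):

    # hypothetical helper: build one frame of the wire protocol
    def make_frame(action, payload):
        # 'f' = choose log dir, 'm' = log message, 't' = set terminator
        return '%d %s %s' % (len(payload), action, payload)

    import socket
    sock = socket.socket()
    sock.connect(('127.0.0.1', 5010))
    sock.sendall(make_frame('f', 'test0'))   # write into the 'test0' log dir
    sock.sendall(make_frame('m', 'hello'))
    sock.sendall(make_frame('m', 'world'))
    sock.close()

Note that messages are buffered in memory and only written out once max_count strings have arrived (or the server is stopping), so a short session like this may not produce a file until the threshold is reached.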
57 tests.py 100644 → 100755
@@ -1,11 +1,54 @@
-import gevent
+#!/usr/bin/env python2
+
+import unittest, multiprocessing, time, os, shutil, datetime, gzip, random
from netlog import Server, Client
+import pdb
+d = pdb.set_trace
+
+class AppTest(unittest.TestCase):
+    def setUp(self):
+        self.path = 'logs'
+        self.port = random.randrange(5000, 6000)
+        self.date = datetime.datetime.now().date()
+        self.date_str = str(self.date)
+        shutil.rmtree(self.path, True)
+
+        # start server
+        def start_server():
+            try:
+                Server(self.path, self.port, max_count=3).start(True)
+            except Exception:
+                pass
+        self.server = multiprocessing.Process(target=start_server)
+        self.server.start()
+
+    def test_main(self):
+        time.sleep(0.1)
+        self.assertEqual(sorted(os.listdir(self.path)), ['netlog', 'netlog_dirs'])
+
+        # send messages from clients
+        client = Client('127.0.0.1', self.port, 'test0')
+        client.send('qwe')
+        client.send('asd')
+        time.sleep(0.1)
+        self.assertEqual(sorted(os.listdir(self.path)), ['netlog', 'netlog_dirs'])
+
+        client.send('zxc')
+        time.sleep(0.1)
+        self.assertEqual(sorted(os.listdir(self.path)),
+                         ['netlog', 'netlog_dirs', 'test0'])
+
+        self.assertEqual(os.listdir(os.path.join(self.path, 'test0')),
+                         [self.date_str])
+        content = gzip.open(os.path.join(self.path, 'test0',
+                                         str(self.date))).read()
+        self.assertEqual(content, 'qwe\nasd\nzxc\n')
+    def tearDown(self):
+        self.server.terminate()
+        self.server.join()
+        #del self.server
-gevent.spawn(lambda: Server('./logs', 5010).start())
-gevent.sleep(1)
-client = Client('127.0.0.1', 5010, 'test0')
-client.send('asd xvb')
-client.send('asd\nxvb')
-client.close()
+if __name__ == '__main__':
+    unittest.main()
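
With the shebang and the new executable bit the suite can be run directly as ./tests.py or via python2 tests.py. For manual experiments against a running Server, a rough sketch mirroring the calls in test_main above (send() and close() appear in the old and new tests; the argument order follows the tests' own Client(...) call):

    from netlog import Client

    client = Client('127.0.0.1', 5010, 'test0')  # same argument order as in the tests
    client.send('qwe')
    client.send('asd')
    client.close()  # close() is used by the pre-commit test removed above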
