Skip to content
This repository
branch: master
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

executable file 149 lines (131 sloc) 4.676 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
#Copyright (c) 2011 Fabula Solutions. All rights reserved.
#Use of this source code is governed by a BSD-style license that can be
#found in the license.txt file.

# leveldb server
import threading
import zmq
import leveldb
import json
import optparse
from time import sleep

class workerThread(threading.Thread):
    """workerThread"""
    def __init__(self, context, db):
        threading.Thread.__init__ (self)
        self.context = context
        self.db = db
        self.running = True
        self.processing = False
        self.socket = self.context.socket(zmq.XREQ)

    def run(self):
        self.socket.connect('inproc://backend')
        while self.running:
            try:
                msg = self.socket.recv_multipart()
            except zmq.ZMQError:
                self.running = False
                continue

            self.processing = True
            if len(msg) != 3:
                value = 'None'
                reply = [msg[0], value]
                self.socket.send_multipart(reply)
                continue
            id = msg[0]
            op = msg[1]
            data = json.loads(msg[2])
            reply = [id]
            if op == 'get':
                try:
                    value = self.db.Get(data)
                except:
                    value = ""
                reply.append(value)
                
            elif op == 'put':
                try:
                    self.db.Put(data[0], data[1])
                    value = "True"
                except:
                    value = ""
                reply.append(value)
                
            elif op == 'delete':
                self.db.Delete(data)
                value = ""
                reply.append(value)
                
            elif op == 'range':
                start = data[0]
                end = data[1]
                if start and end:
                    try:
                        arr = []
                        for value in self.db.RangeIter(start, end):
                            arr.append({value[0]: value[1]})
                        reply.append(json.dumps(arr))
                    except:
                        value = ""
                        reply.append(value)
                else:
                    try:
                        arr = []
                        for value in self.db.RangeIter():
                            arr.append({value[0]: value[1]})
                        reply.append(json.dumps(arr))
                    except:
                        value = ""
                        reply.append(value)
            else:
                value = ""
                reply.append(value)
            self.socket.send_multipart(reply)
            self.processing = False

    def close(self):
        self.running = False
        while self.processing:
            sleep(1)
        self.socket.close()

if __name__ == "__main__":
    optparser = optparse.OptionParser(
        prog='leveldb-server.py',
        version='0.1.1',
        description='leveldb-server',
        usage='%prog \n\t-p [port and host settings] Default: tcp://127.0.0.1:5147\n' + \
                    '\t-f [database file name] Default: level.db')
    optparser.add_option('--host', '-p', dest='host',
        default='tcp://127.0.0.1:5147')
    optparser.add_option('--dbfile', '-d', dest='dbfile',
        default='level.db')
    options, arguments = optparser.parse_args()
    
    if not (options.host and options.dbfile):
        optparser.print_help()

    print "Starting leveldb-server %s" % options.host
    context = zmq.Context()
    frontend = context.socket(zmq.XREP)
    frontend.bind(options.host)
    backend = context.socket(zmq.XREQ)
    backend.bind('inproc://backend')

    poll = zmq.Poller()
    poll.register(frontend, zmq.POLLIN)
    poll.register(backend, zmq.POLLIN)

    db = leveldb.LevelDB(options.dbfile)

    workers = []
    for i in xrange(3):
        worker = workerThread(context, db)
        worker.start()
        workers.append(worker)
            
    try:
        while True:
            sockets = dict(poll.poll())
            if frontend in sockets:
                if sockets[frontend] == zmq.POLLIN:
                    msg = frontend.recv_multipart()
                    backend.send_multipart(msg)
                    
            if backend in sockets:
                if sockets[backend] == zmq.POLLIN:
                    msg = backend.recv_multipart()
                    frontend.send_multipart(msg)
    except KeyboardInterrupt:
        for worker in workers:
            worker.close()
        frontend.close()
        backend.close()
        context.term()

Something went wrong with that request. Please try again.