-
Notifications
You must be signed in to change notification settings - Fork 27
/
tap.py
129 lines (104 loc) · 4.14 KB
/
tap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python
"""
TAP protocol client library.
Copyright (c) 2010 Dustin Sallings <dustin@spy.net>
"""
import socket
import string
import random
import struct
import asyncore
import mc_bin_server
import mc_bin_client
from memcacheConstants import REQ_MAGIC_BYTE, RES_MAGIC_BYTE
from memcacheConstants import REQ_PKT_FMT, RES_PKT_FMT, MIN_RECV_PACKET
from memcacheConstants import SET_PKT_FMT, DEL_PKT_FMT, INCRDECR_RES_FMT
import memcacheConstants
class TapConnection(mc_bin_server.MemcachedBinaryChannel):
def __init__(self, server, port, callback, clientId=None, opts={}, user=None, pswd=None):
mc_bin_server.MemcachedBinaryChannel.__init__(self, None, None,
self._createTapCall(clientId,
opts))
self.server = server
self.port = port
self.callback = callback
self.identifier = (server, port)
self.user = user
self.pswd = pswd
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect((server, port))
def create_socket(self, family, type):
if not self.user:
mc_bin_server.MemcachedBinaryChannel.create_socket(self, family, type)
return
self.family_and_type = family, type
self.mc = mc_bin_client.MemcachedClient(self.server, self.port)
self.mc.sasl_auth_plain(self.user, self.pswd or "")
sock = self.mc.s
sock.setblocking(0)
self.set_socket(sock)
def _createTapCall(self, key=None, opts={}):
# Client identifier
if not key:
key = "".join(random.sample(string.letters, 16))
dtype=0
opaque=0
cas=0
extraHeader, val = self._encodeOpts(opts)
msg=struct.pack(REQ_PKT_FMT, REQ_MAGIC_BYTE,
memcacheConstants.CMD_TAP_CONNECT,
len(key), len(extraHeader), dtype, 0,
len(key) + len(extraHeader) + len(val),
opaque, cas)
return msg + extraHeader + key + val
def _encodeOpts(self, opts):
header = 0
val = []
for op in sorted(opts.keys()):
header |= op
if op in memcacheConstants.TAP_FLAG_TYPES:
val.append(struct.pack(memcacheConstants.TAP_FLAG_TYPES[op],
opts[op]))
elif op == memcacheConstants.TAP_FLAG_LIST_VBUCKETS:
val.append(self._encodeVBucketList(opts[op]))
else:
val.append(opts[op])
return struct.pack(">I", header), ''.join(val)
def _encodeVBucketList(self, vbl):
l = list(vbl) # in case it's a generator
vals = [struct.pack("!H", len(l))]
for v in vbl:
vals.append(struct.pack("!H", v))
return ''.join(vals)
def processCommand(self, cmd, klen, vb, extralen, cas, data):
extra = data[0:extralen]
if len(extra) == 8:
es_length, _flags, _ttl = \
struct.unpack(memcacheConstants.TAP_GENERAL_PKT_FMT, extra)
else:
es_length, _flags, _ttl, _flg, _exp = \
struct.unpack(memcacheConstants.TAP_MUTATION_PKT_FMT, extra)
key_start = extralen + es_length
key = data[key_start:(key_start+klen)]
val = data[(key_start+klen):]
return self.callback(self.identifier, cmd, extra, key, vb, val, cas)
def handle_connect(self):
pass
def handle_close(self):
self.close()
class TapClient(object):
def __init__(self, servers, callback, opts={}, user=None, pswd=None):
for t in servers:
tc = TapConnection(t.host, t.port, callback, t.id, opts, user, pswd)
class TapDescriptor(object):
port = 11211
id = None
def __init__(self, s):
self.host = s
if ':' in s:
self.host, self.port = s.split(':', 1)
self.port = int(self.port)
if '@' in self.host:
self.id, self.host = self.host.split('@', 1)
def __repr__(self):
return "<TapDescriptor %s@%s:%d>" % (self.id or "(anon)", self.host, self.port)