/
tcp_connection.rb
177 lines (152 loc) · 4.78 KB
/
tcp_connection.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
require 'socket'
require 'timeout'
require 'protos/voldemort-client.pb'
# Connection implementation that talks to a node over a plain TCP socket.
#
# Requests are protobuf messages serialized to a string and framed with a
# 4-byte big-endian length prefix; responses are read back using the same
# framing and parsed into the matching response message.
class TCPConnection < Connection
  include Voldemort

  # The currently open TCPSocket, or nil when disconnected.
  attr_accessor :socket

  # Seconds allowed for opening the socket plus the protocol handshake.
  SOCKET_TIMEOUT = 3

  # Opens a socket to host:port and performs the protocol handshake.
  #
  # Returns the connected socket on success and false when the connection or
  # the handshake fails. Raises a RuntimeError ("Timeout when connecting to
  # node") when SOCKET_TIMEOUT elapses first.
  #
  # On every failure path the socket opened by this call is closed so that no
  # file descriptor leaks and a half-initialised socket is never left in
  # self.socket.
  def connect_to(host, port)
    opened = nil
    # Timeout.timeout is used explicitly; the bare Kernel-level `timeout`
    # helper is deprecated.
    Timeout.timeout(SOCKET_TIMEOUT) do
      opened = TCPSocket.open(host, port)
      self.socket = opened
      send_protocol_version
      raise "There was an error connecting to the node" unless protocol_handshake_ok?
      socket
    end
  rescue Timeout::Error
    tcp_discard_socket(opened)
    raise "Timeout when connecting to node"
  rescue
    # Includes the handshake failure raised above: callers get false.
    tcp_discard_socket(opened)
    false
  end

  # Sends a GET for key to the store db_name and returns the parsed
  # GetResponse. route sets the request's should_route flag.
  def get_from(db_name, key, route = false)
    request = VoldemortRequest.new
    request.should_route = route
    request.store = db_name
    request.type = RequestType::GET
    request.get = GetRequest.new
    request.get.key = key

    tcp_round_trip(request, GetResponse)
  end

  # Sends a GET_ALL for keys to the store db_name and returns the parsed
  # GetAllResponse. route sets the request's should_route flag.
  def get_all_from(db_name, keys, route = false)
    request = VoldemortRequest.new
    request.should_route = route
    request.store = db_name
    request.type = RequestType::GET_ALL
    request.getAll = GetAllRequest.new
    request.getAll.keys = keys

    tcp_round_trip(request, GetAllResponse)
  end

  # Sends a PUT of value under key to the store db_name.
  #
  # When no version is given, one is obtained through get_version(key). The
  # version is copied into the request, and after the exchange the passed
  # version object itself is updated via add_to_versions and returned.
  def put_from(db_name, key, value, version = nil, route = false)
    version ||= get_version(key)

    request = VoldemortRequest.new
    request.should_route = route
    request.store = db_name
    request.type = RequestType::PUT
    request.put = PutRequest.new
    request.put.key = key
    request.put.versioned = Versioned.new
    request.put.versioned.value = value
    request.put.versioned.version = VectorClock.new
    request.put.versioned.version.merge_from(version)

    tcp_round_trip(request, PutResponse)
    add_to_versions(version) # add version or increment when needed
    version
  end

  # Sends a DELETE for key to the store db_name and returns the response's
  # success field. When no version is given, one is obtained through
  # get_version(key).
  def delete_from(db_name, key, version = nil, route = false)
    version ||= get_version(key)

    request = VoldemortRequest.new
    request.should_route = route
    request.store = db_name
    request.type = RequestType::DELETE
    request.delete = DeleteRequest.new
    request.delete.key = key
    request.delete.version = VectorClock.new
    request.delete.version.merge_from(version)

    tcp_round_trip(request, DeleteResponse).success
  end

  # Bumps the clock entry belonging to the connected node inside version.
  #
  # An existing entry has its version incremented. Otherwise a new entry with
  # version 1 is appended and the clock timestamp is set to the current time
  # in milliseconds (the timestamp is only touched in that second case).
  # Returns the same, mutated, version object.
  def add_to_versions(version)
    node_id = connected_node.id.to_i
    entry = version.entries.find { |candidate| candidate.node_id == node_id }

    if entry
      entry.version += 1
    else
      entry = ClockEntry.new
      entry.node_id = node_id
      entry.version = 1
      version.entries << entry
      version.timestamp = Time.new.to_i * 1000
    end

    version
  end

  # Returns the version of the first value currently stored under key, or a
  # fresh VectorClock stamped with the current time in milliseconds when the
  # lookup yields no value.
  def get_version(key)
    stored = get(key)[1][0]
    return stored.version if stored

    version = VectorClock.new
    version.timestamp = Time.new.to_i * 1000
    version
  end

  # Reads one length-prefixed message from the socket and returns its payload
  # as a string (empty when the announced size is 0).
  #
  # The header is unpacked with 'N': Long, network (big-endian) byte order.
  # from http://ruby-doc.org/doxygen/1.8.4/pack_8c-source.html
  #
  # On any error (including the peer closing the connection mid-message) the
  # connection is re-established and the result of reconnect! is returned in
  # place of a payload.
  # NOTE(review): callers pass that value straight to parse_from_string;
  # confirm this is the intended failure behaviour.
  def receive
    size = tcp_read_exactly(4).unpack('N').first
    tcp_read_exactly(size)
  rescue
    reconnect!
  end

  # Serializes request and writes it to the socket, prefixed with its length
  # in bytes packed as 'N': Long, network (big-endian) byte order.
  # from http://ruby-doc.org/doxygen/1.8.4/pack_8c-source.html
  #
  # Any error while sending drops the connection via disconnect!.
  # Note that this method shadows Object#send for instances of this class.
  def send(request)
    self.reconnect unless socket
    bytes = request.serialize_to_string # helper method thanks to ruby-protobuf
    # bytesize, not size: the prefix must count bytes, not characters.
    socket.write([bytes.bytesize].pack("N") + bytes)
  rescue
    disconnect!
  end

  # Writes the protocol version string to the socket.
  def send_protocol_version
    socket.write(protocol_version)
  end

  # True when the two bytes read back from the node equal STATUS_OK.
  def protocol_handshake_ok?
    socket.recv(2) == STATUS_OK
  end

  # Connects by delegating to connect_to_random_node.
  def connect!
    connect_to_random_node
  end

  # Closes the current socket, if any, and connects again.
  def reconnect!
    disconnect! if socket
    connect!
  end

  # Closes the current socket, if any, and clears it.
  def disconnect!
    socket.close if socket
    self.socket = nil
  end

  private

  # Sends request, parses the reply as response_class, triggers the
  # reconnect-on-error check and returns the parsed response.
  def tcp_round_trip(request, response_class)
    self.send(request) # send the request
    raw_response = receive # read the response
    response = response_class.new.parse_from_string(raw_response)
    reconnect_when_errors_in(response)
    response
  end

  # Reads exactly byte_count bytes from the socket, looping over short reads.
  # Raises EOFError when recv yields no data, so a closed connection cannot
  # turn the loop into a busy spin.
  def tcp_read_exactly(byte_count)
    buffer = String.new
    while buffer.bytesize < byte_count
      chunk = socket.recv(byte_count - buffer.bytesize)
      raise EOFError, "connection closed while reading" if chunk.nil? || chunk.empty?
      buffer << chunk
    end
    buffer
  end

  # Closes sock (when one was opened) and clears self.socket if it still
  # refers to it. Errors raised while closing are ignored because the socket
  # is being thrown away anyway.
  def tcp_discard_socket(sock)
    return unless sock
    self.socket = nil if socket.equal?(sock)
    sock.close unless sock.closed?
  rescue IOError, SystemCallError
    nil
  end
end