-
Notifications
You must be signed in to change notification settings - Fork 1
/
UnitTests.py
242 lines (218 loc) · 9.82 KB
/
UnitTests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import time
import subprocess
import sys
import unittest
import os
import signal
import warnings
import requests
from Address import Address
from Raft import RaftNode
from Server import start_serving
from app import KVStore
from structs.ColorLog import ColorLog
from structs.NodeType import NodeType
from utils.RPCHandler import RPCHandler
# Silence every ResourceWarning for the whole test run.
# NOTE(review): presumably needed because the tests below spawn server
# subprocesses whose handles may outlive a test -- confirm.
warnings.simplefilter("ignore", ResourceWarning)
class TestKVStore(unittest.TestCase):
    """Unit tests that drive ``KVStore.executing_log`` with single log entries.

    Every test builds a fresh ``KVStore`` and feeds it log dictionaries of the
    form ``{'term', 'command', 'value'}``; the store is expected to write the
    command's result back into the entry's ``'value'`` field.
    """

    @staticmethod
    def _run(store, term, command):
        """Execute *command* on *store* as a log entry and return its result.

        The entry starts with an empty ``'value'``; whatever the store leaves
        in that field after ``executing_log`` is returned.
        """
        entry = {'term': term, 'command': command, 'value': ''}
        store.executing_log(entry)
        return entry['value']

    def test_ping(self):
        """``ping`` answers ``PONG``."""
        store = KVStore()
        self.assertEqual(self._run(store, 1, 'ping'), "PONG")
        print("✅ Unit test ping passed")

    def test_set_and_get(self):
        """A value stored with ``set`` is returned by ``get``."""
        store = KVStore()
        self.assertEqual(self._run(store, 1, 'set kunci value'), "OK")
        self.assertEqual(self._run(store, 2, 'get kunci'), "value")
        print("✅ Unit test set and get passed")

    def test_append(self):
        """``append`` concatenates onto the existing value."""
        store = KVStore()
        self.assertEqual(self._run(store, 1, 'set kunci value'), "OK")
        self.assertEqual(self._run(store, 2, 'append kunci value'), "OK")
        self.assertEqual(self._run(store, 3, 'get kunci'), "valuevalue")
        print("✅ Unit test append passed")

    def test_delete(self):
        """``del`` returns the removed value; a later ``get`` yields ``""``."""
        store = KVStore()
        self.assertEqual(self._run(store, 1, 'set kunci value'), "OK")
        self.assertEqual(self._run(store, 2, 'del kunci'), "value")
        self.assertEqual(self._run(store, 3, 'get kunci'), "")
        print("✅ Unit test del passed")

    def test_strlen(self):
        """``strln`` reports the length of the stored value as an int."""
        store = KVStore()
        self.assertEqual(self._run(store, 1, 'set kunci value'), "OK")
        self.assertEqual(self._run(store, 2, 'strln kunci'), 5)
        print("✅ Unit test strln passed")

    def test_transaction(self):
        """A ``;``-separated command list yields the last command's result."""
        store = KVStore()
        outcome = self._run(
            store, 1, 'set kunci value; append kunci 123; get kunci'
        )
        self.assertEqual(outcome, "value123")
        print("✅ Unit test transaction passed")
class TestMembership(unittest.TestCase):
    """Integration tests that launch real ``Server.py`` processes.

    NOTE(review): the command lines look like
    ``Server.py <ip> <port> [<contact ip> <contact port>]`` -- confirm against
    ``Server.py``.
    """

    @staticmethod
    def _spawn(script, *args):
        """Start *script* with *args* under the interpreter running the tests.

        ``sys.executable`` is used instead of a bare ``"python"`` so the child
        runs in the same environment as the test run. Output is sent to
        ``DEVNULL``: the tests never read it, and an undrained pipe can block
        the child once the OS pipe buffer fills up.
        """
        return subprocess.Popen(
            [sys.executable, script, *args],
            stdout=subprocess.DEVNULL,
        )

    @staticmethod
    def _stop(process):
        """Kill *process* if it was started and wait until it has exited."""
        if process is None:
            return
        process.kill()
        # Reap the child so no zombie is left behind and so it can no longer
        # touch its storage file while the test cleans up.
        process.wait()

    @staticmethod
    def _remove_storage(*paths):
        """Delete each existing file in *paths*; missing files are skipped.

        A missing file must not raise here, otherwise the cleanup error would
        replace the assertion failure that is actually of interest.
        """
        for path in paths:
            if os.path.exists(path):
                os.remove(path)

    def test_fail_to_apply_membership(self):
        """A follower whose contact node (port 4000) is never started keeps running."""
        print(ColorLog.colorize("Running test fail to apply membership for 45 seconds", ColorLog._HEADER))
        follower = None
        try:
            follower = self._spawn("Server.py", "localhost", "4001", "localhost", "4000")
            time.sleep(45)
            # The follower process must still be alive after the waiting period.
            self.assertIsNone(follower.poll())
        finally:
            # Always stop the child, even when the assertion fails; otherwise
            # the test run would be left waiting on a live server process.
            self._stop(follower)
            self._remove_storage("storage/localhost_4001.json")
        print("✅ Unit test fail to apply membership passed")

    def test_success_to_apply_membership(self):
        """A follower started next to a running leader is still alive after 15 s."""
        print(ColorLog.colorize("Running test success to apply membership for 15 seconds", ColorLog._HEADER))
        leader = None
        follower = None
        try:
            leader = self._spawn("Server.py", "localhost", "6000")
            follower = self._spawn("Server.py", "localhost", "6001", "localhost", "6000")
            time.sleep(15)
            self.assertIsNone(follower.poll())
        finally:
            self._stop(follower)
            self._stop(leader)
            self._remove_storage(
                "storage/localhost_6000.json",
                "storage/localhost_6001.json",
            )
        print("✅ Unit test success to apply membership passed")
class TestLogReplication(unittest.TestCase):
    """Integration test comparing the logs of two running server nodes."""

    @staticmethod
    def _spawn(script, *args):
        """Start *script* with *args* under the interpreter running the tests.

        ``sys.executable`` is used instead of a bare ``"python"`` so the child
        runs in the same environment as the test run. Output is sent to
        ``DEVNULL``: the test never reads it, and an undrained pipe can block
        the child once the OS pipe buffer fills up.
        """
        return subprocess.Popen(
            [sys.executable, script, *args],
            stdout=subprocess.DEVNULL,
        )

    @staticmethod
    def _stop(process):
        """Kill *process* if it was started and wait until it has exited."""
        if process is None:
            return
        process.kill()
        # Reap the child so no zombie is left behind and so it can no longer
        # touch its storage file while the test cleans up.
        process.wait()

    @staticmethod
    def _remove_storage(*paths):
        """Delete each existing file in *paths*; missing files are skipped.

        A missing file must not raise here, otherwise the cleanup error would
        replace the assertion failure that is actually of interest.
        """
        for path in paths:
            if os.path.exists(path):
                os.remove(path)

    def test_success_commit_log(self):
        """After a ``ping`` sent via the client, both nodes report the same log data."""
        print(ColorLog.colorize("Running test success to commit log for 15 seconds", ColorLog._HEADER))
        leader = None
        follower = None
        client = None
        url = "http://localhost:8000/execute_command"

        def send(port, command):
            """POST *command* for the node at ``localhost:port`` to the client endpoint."""
            return requests.post(url, json={
                "address": {
                    "ip": "localhost",
                    "port": port
                },
                "command": command,
            })

        try:
            leader = self._spawn("Server.py", "localhost", "8001")
            follower = self._spawn("Server.py", "localhost", "8002", "localhost", "8001")
            client = self._spawn("Client.py", "localhost", "8000")
            time.sleep(15)
            # Issue one command first; its response body is not inspected.
            # NOTE(review): presumably this creates the log entry that should
            # be replicated -- confirm.
            send(8001, "ping")
            response_log_1 = send(8001, "request_log")
            response_log_2 = send(8002, "request_log")
            self.assertEqual(response_log_1.json()["data"], response_log_2.json()["data"])
        finally:
            self._stop(follower)
            self._stop(leader)
            self._stop(client)
            # delete storage
            self._remove_storage(
                "storage/localhost_8001.json",
                "storage/localhost_8002.json",
            )
        print("✅ Unit test success to replicate log passed")
class TestHeartbeat(unittest.TestCase):
    """Integration test sending a ``heartbeat`` RPC to a running server."""

    @staticmethod
    def _spawn(script, *args):
        """Start *script* with *args* under the interpreter running the tests.

        ``sys.executable`` is used instead of a bare ``"python"`` so the child
        runs in the same environment as the test run. Output is sent to
        ``DEVNULL``: the test never reads it, and an undrained pipe can block
        the child once the OS pipe buffer fills up.
        """
        return subprocess.Popen(
            [sys.executable, script, *args],
            stdout=subprocess.DEVNULL,
        )

    @staticmethod
    def _stop(process):
        """Kill *process* if it was started and wait until it has exited."""
        if process is None:
            return
        process.kill()
        # Reap the child so no zombie is left behind and so it can no longer
        # touch its storage file while the test cleans up.
        process.wait()

    @staticmethod
    def _remove_storage(*paths):
        """Delete each existing file in *paths*; missing files are skipped.

        A missing file must not raise here, otherwise the cleanup error would
        replace the assertion failure that is actually of interest.
        """
        for path in paths:
            if os.path.exists(path):
                os.remove(path)

    def test_heartbeat_request(self):
        """A ``heartbeat`` request to the node on port 3100 yields a non-``None`` response."""
        print(ColorLog.colorize("Running test to send heartbeat request", ColorLog._HEADER))
        # Bind both handles before entering ``try`` so the ``finally`` clause
        # never hits an unbound name when start-up itself fails.
        leader = None
        follower = None
        try:
            leader_address = Address("localhost", 3100)
            leader = self._spawn("Server.py", "localhost", "3100")
            follower = self._spawn("Server.py", "localhost", "3101", "localhost", "3100")
            request_dummy = {
                "leader_addr": leader_address,
                "election_term": 0,
                "prev_last_term": 0,
                "prev_last_index": 0,
                "entries": [],
                "leader_commit": 0,
            }
            rpc = RPCHandler()
            # NOTE(review): the request is sent right after spawning, with no
            # start-up wait; this relies on RPCHandler.request coping with a
            # server that is not listening yet -- confirm.
            response = rpc.request(leader_address, "heartbeat", request_dummy)
            self.assertIsNotNone(response)
        finally:
            self._stop(follower)
            self._stop(leader)
            # delete storage
            self._remove_storage(
                "storage/localhost_3100.json",
                "storage/localhost_3101.json",
            )
        print("✅ Unit test success to send heartbeat message")
class TestVoting(unittest.TestCase):
    """Integration test sending a ``vote`` RPC to a running server."""

    @staticmethod
    def _spawn(script, *args):
        """Start *script* with *args* under the interpreter running the tests.

        ``sys.executable`` is used instead of a bare ``"python"`` so the child
        runs in the same environment as the test run. Output is sent to
        ``DEVNULL``: the test never reads it, and an undrained pipe can block
        the child once the OS pipe buffer fills up.
        """
        return subprocess.Popen(
            [sys.executable, script, *args],
            stdout=subprocess.DEVNULL,
        )

    @staticmethod
    def _stop(process):
        """Kill *process* if it was started and wait until it has exited."""
        if process is None:
            return
        process.kill()
        # Reap the child so no zombie is left behind and so it can no longer
        # touch its storage file while the test cleans up.
        process.wait()

    @staticmethod
    def _remove_storage(*paths):
        """Delete each existing file in *paths*; missing files are skipped.

        A missing file must not raise here, otherwise the cleanup error would
        replace the assertion failure that is actually of interest.
        """
        for path in paths:
            if os.path.exists(path):
                os.remove(path)

    def test_vote_request(self):
        """A ``vote`` request to the node on port 4100 yields a non-``None`` response."""
        print(ColorLog.colorize("Running test to send vote request", ColorLog._HEADER))
        # Bind both handles before entering ``try`` so the ``finally`` clause
        # never hits an unbound name when start-up itself fails.
        leader = None
        follower = None
        try:
            leader_address = Address("localhost", 4100)
            follower1_address = Address("localhost", 4101)
            leader = self._spawn("Server.py", "localhost", "4100")
            follower = self._spawn("Server.py", "localhost", "4101", "localhost", "4100")
            request_dummy = {
                "candidate_addr": follower1_address,
                "election_term": 0,
                "last_term": 0,
                "last_index": 0,
            }
            rpc = RPCHandler()
            # NOTE(review): the request is sent right after spawning, with no
            # start-up wait; this relies on RPCHandler.request coping with a
            # server that is not listening yet -- confirm.
            response = rpc.request(leader_address, "vote", request_dummy)
            self.assertIsNotNone(response)
        finally:
            self._stop(follower)
            self._stop(leader)
            # delete storage
            self._remove_storage(
                "storage/localhost_4100.json",
                "storage/localhost_4101.json",
            )
        print("✅ Unit test success to send vote message")
if __name__ == "__main__":
    # Script entry point: run every TestCase in this module with the
    # quietest runner output level.
    unittest.main(verbosity=0)