Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 718 lines (572 sloc) 22.3 KB
#!/usr/bin/python -u
#
# CS3700, Spring 2016
# Team modestmousegood
#
import sys
import time
import socket
import select
import random
import json
import math
# --------------------------------------------------------------------------- #
# STATE CLASSES
# --------------------------------------------------------------------------- #
class Candidate_State:
    """Election bookkeeping for a replica that is currently a candidate.

    Our own vote is counted from the start. The election is lost once
    ``timeout_election`` (300-399ms after creation) passes, and won once
    ``vote_count`` reaches ``quorum``. RequestVoteRPCs become due for a
    re-send every 1000ms.
    """

    @staticmethod
    def _now_ms():
        """Current wall-clock time in milliseconds."""
        return time.time() * 1000

    def __init__(self, quorum):
        self.quorum = quorum
        self.timeout_broadcast = self._now_ms() + 1000
        self.vote_count = 1
        self.votes_from = [MY_ID]
        self.timeout_election = self._now_ms() + random.randrange(300, 400)

    def increment_vote(self, rep_id):
        """Record a granted vote from ``rep_id``; repeated votes are ignored."""
        if rep_id in self.votes_from:
            return
        self.votes_from.append(rep_id)
        self.vote_count += 1

    def evaluate_rebroadcast(self):
        """Return True when RequestVoteRPC should be broadcast again."""
        return self._now_ms() >= self.timeout_broadcast

    def rebroadcasted(self):
        """Note that RequestVoteRPC was just re-sent; the next one is due in 1s."""
        self.timeout_broadcast = self._now_ms() + 1000

    def evaluate_election(self):
        """Report the election outcome so far.

        Returns:
            -1 if the election timed out (checked first),
             1 if we have at least ``quorum`` votes,
             0 if we should keep waiting.
        """
        if self._now_ms() >= self.timeout_election:
            return -1
        return 1 if self.vote_count >= self.quorum else 0
class Follower_State:
    """Timers for a replica that is currently a follower.

    ``leader_timeout`` is the deadline (500-699ms out) for the next leader
    heartbeat. ``start_queueing_redirects_time`` marks when client requests
    start being queued rather than redirected immediately: 500ms after
    creation or a leader change, or a fixed 7000ms when the script is still
    initializing (in that case a leader change does not move it).
    """

    @staticmethod
    def _now_ms():
        """Current wall-clock time in milliseconds."""
        return time.time() * 1000

    def __init__(self, script_initializing=False):
        self.leader_timeout = self._now_ms() + random.randrange(500, 700)
        self.new = True
        self.script_initializing = script_initializing
        grace_ms = 7000 if script_initializing else 500
        self.start_queueing_redirects_time = self._now_ms() + grace_ms

    def heartbeat_from_leader(self):
        """Push the leader-timeout deadline out again (500-699ms)."""
        self.leader_timeout = self._now_ms() + random.randrange(500, 700)

    def evaluate(self):
        """Return -1 once the leader heartbeat deadline has passed, else 0."""
        return -1 if self._now_ms() >= self.leader_timeout else 0

    def leader_changed(self):
        """Restart the 500ms no-queue window, unless still initializing."""
        if self.script_initializing:
            return
        self.start_queueing_redirects_time = self._now_ms() + 500

    def should_we_queue(self):
        """Return True if incoming client requests should be queued now."""
        return self._now_ms() >= self.start_queueing_redirects_time

    def mark_not_new(self):
        """Record that the first follower step has been handled."""
        self.new = False
class Leader_State:
    """Per-leadership bookkeeping for a replica that is currently the leader.

    Holds the heartbeat deadline, replication progress for every entry
    appended while leader (``replica_responses``), what each peer has
    reported back (``replicas``), and ``hot_db``: db values overwritten by
    committed entries whose clients have not been sent "ok" yet.
    """

    def __init__(self, quorum_size):
        self.heartbeat_timeout = 0
        # One dict per entry appended during this leadership, in log order;
        # log index i lives at position i - self.marker. Fields:
        #   index:          log index
        #   success_count:  distinct replicas (us included) known to hold it
        #   timeout:        creation time + 500ms (not read by the code shown)
        #   term:           term the entry was appended in
        #   heard_from:     replica ids counted in success_count
        #   reached_quorum: True once success_count == quorum_size
        #   oked:           True once the client was sent "ok"
        #   MID:            MID of the client's original put
        #   src:            id of the client that sent the put
        self.replica_responses = []
        # replica id -> {'commit': last reported commit index,
        #                'match': last reported top log index,
        #                'beat_timeout': ms timestamp when a beat is due}
        # Entries created by sent_beat() may not have 'match' yet.
        self.replicas = {}
        self.quorum_size = quorum_size
        # Log index of replica_responses[0]; -1 until the first append.
        self.marker = -1
        # Position in replica_responses of the last entry of the leading
        # run of entries that reached quorum.
        self.commitable_index = 0
        # Highest commit index reported back by any replica.
        self.okable_index = -1
        # log index -> {'key', 'value'}: the db value that entry overwrote.
        self.hot_db = {}
        self.new = True

    def sent_heartbeat(self):
        """Record that a heartbeat just went out; the next one is due in 100ms."""
        self.heartbeat_timeout = (time.time() * 1000) + 100

    def evaluate(self):
        """Return -1 if a heartbeat is due now, 0 otherwise."""
        if (time.time() * 1000) >= self.heartbeat_timeout:
            return -1
        return 0

    def initialize_entry_append(self, entry_index, mid, src):
        """Start tracking replication of the log entry at ``entry_index``.

        Assumes entries are initialised in consecutive log order, because an
        entry's position in replica_responses is derived from ``self.marker``.
        """
        if self.marker == -1:
            self.marker = entry_index
        # Count starts at 1: the leader itself holds the entry.
        self.replica_responses.append({
            'index': entry_index,
            'success_count': 1,
            'timeout': time.time() * 1000 + 500,
            'term': term_num,
            'heard_from': [MY_ID],
            'reached_quorum': False,
            'oked': False,
            'MID': mid,
            'src': src
        })

    def increment_response(self, entry_index, source_id):
        """Count ``source_id`` as holding the log entry ``entry_index``.

        Returns True only on the call that brings the entry to exactly
        ``quorum_size`` distinct holders. Returns False otherwise, including
        when the entry was not appended during this leadership or the
        replica was already counted.
        """
        offset = entry_index - self.marker
        # Entries outside replica_responses were never initialised by us
        # (this also covers marker == -1, where the list is empty).
        if not 0 <= offset < len(self.replica_responses):
            return False
        response = self.replica_responses[offset]
        # The same replica can report an entry more than once (e.g. through
        # replica_state_update and again as its top_log_entry). Count it only
        # once, otherwise a minority could appear to reach quorum.
        if source_id in response['heard_from']:
            return False
        response['success_count'] += 1
        response['heard_from'].append(source_id)
        if response['success_count'] == self.quorum_size:
            response['reached_quorum'] = True
            return True
        return False

    # Return the last index of the lowest sequence of entries that have reached quorum.
    # i.e: 12(success), 13(success), 14(success), 15(waiting), 16(success) --> 14
    # NOTE: This could be more efficient given the current commit index.
    def calculate_commitable_index(self):
        """Return the log index ending the leading run of quorum entries.

        Returns -1 if nothing has been appended, or if the first entry of
        this leadership has not reached quorum yet.
        """
        if not self.replica_responses or not self.replica_responses[0]['reached_quorum']:
            return -1
        lowest = self.commitable_index
        for i in range(self.commitable_index, len(self.replica_responses)):
            if not self.replica_responses[i]['reached_quorum']:
                break
            lowest = i
        self.commitable_index = lowest
        return lowest + self.marker

    def needs_ok_sent(self):
        """Return log indices (newest first) of un-oked entries <= okable_index."""
        return [r['index'] for r in reversed(self.replica_responses)
                if not r['oked'] and r['index'] <= self.okable_index]

    def received_append_response_with_commit_index(self, index, source_id):
        """Raise okable_index to a replica's reported commit ``index``."""
        if self.okable_index < index:
            self.okable_index = index

    def replica_with_high_commit(self):
        """Return the replica id with the highest reported commit ('FFFF' if none)."""
        rep_id = "FFFF"
        highest = -1
        for r, v in self.replicas.items():
            if v['commit'] > highest:
                highest = v['commit']
                rep_id = r
        return rep_id

    def oked(self, index):
        """Mark the entry at log ``index`` as acknowledged to its client."""
        self.replica_responses[index - self.marker]['oked'] = True

    def get_mid(self, entry_index):
        """Return the client MID of the entry at log ``entry_index``."""
        return self.replica_responses[entry_index - self.marker]['MID']

    def get_src(self, entry_index):
        """Return the client id of the entry at log ``entry_index``."""
        return self.replica_responses[entry_index - self.marker]['src']

    def is_entry_index_initialized(self, index):
        """Return True if log ``index`` was appended during this leadership."""
        if self.marker == -1:
            return False
        # Strict upper bound: index - marker == len(...) is one past the end.
        return 0 <= index - self.marker < len(self.replica_responses)

    def add_hot(self, index, key, value):
        """Remember ``value``, which the entry at ``index`` overwrites for ``key``."""
        self.hot_db[index] = {'key': key, 'value': value}

    def del_hot(self, index):
        """Forget the hot value for log ``index``, if any."""
        if index in self.hot_db:
            del self.hot_db[index]

    def get_hot(self, key):
        """Return the pre-write value of ``key`` from its oldest pending entry.

        The oldest pending (un-oked) write holds the value from before any
        unacknowledged change. Returns False when ``key`` has no hot value.
        """
        for i in sorted(self.hot_db):
            if self.hot_db[i]['key'] == key:
                return self.hot_db[i]['value']
        return False

    def replica_state_update(self, rep_id, commit_index, entry_index):
        """Record a replica's reported commit index and top log index.

        Entries in [previous match, entry_index) are counted toward quorum for
        ``rep_id``. A replica we have no match for yet is only recorded.
        """
        # Tolerate replicas we have never heard from (previously a KeyError).
        known = self.replicas.get(rep_id, {})
        beat = known.get('beat_timeout', 0)
        if 'match' in known and 'commit' in known:
            previous_match = known['match']
        else:
            previous_match = entry_index
        for i in range(previous_match, entry_index):
            self.increment_response(i, rep_id)
        self.received_append_response_with_commit_index(commit_index, rep_id)
        self.replicas[rep_id] = {'commit': commit_index, 'match': entry_index, 'beat_timeout': beat}

    def get_replica_match_index(self, rep_id):
        """Return the last top log index ``rep_id`` reported, or -1 if unknown."""
        if rep_id not in self.replicas or 'match' not in self.replicas[rep_id]:
            return -1
        return self.replicas[rep_id]['match']

    def sent_beat(self, rep_id):
        """Note a beat was just sent to ``rep_id``; the next is due in 400ms."""
        if rep_id not in self.replicas:
            self.replicas[rep_id] = {'commit': -1}
        self.replicas[rep_id]['beat_timeout'] = time.time() * 1000 + 400

    def needs_beat(self):
        """Return replica ids whose beat is due, marking them as beaten now."""
        now = time.time() * 1000
        result = [ri for ri, r in self.replicas.items()
                  if 'beat_timeout' not in r or r['beat_timeout'] < now]
        for rr in result:
            self.sent_beat(rr)
        return result

    def set_not_new(self):
        """Record that the first leader step has been handled."""
        self.new = False
# --------------------------------------------------------------------------- #
# GLOBAL VARIABLES
# --------------------------------------------------------------------------- #
db = {}  # committed key -> value store (log entries applied up to commit_index)
log = []  # replicated log; entry fields are listed below
leader = 'FFFF'  # id of the replica we believe leads; 'FFFF' until one is known
voted_for = None  # candidate we voted for in the current term, if any
term_num = 0  # current term
commit_index = -1  # highest committed log index: -1 = none, 0 = log[0] is committed
last_applied = -1  # NOTE(review): not read or updated anywhere else in this file
# Log items
#   key:   String put key
#   value: String put value
#   term:  int term when added
#   mid:   message id of put
#   src:   client id of put
MY_ID = sys.argv[1]
# Ids of the other replicas (argv[2:]). Assumed not to include MY_ID, since
# our own vote/ack is always counted separately.
REPLICA_IDS = sys.argv[2:]
# Replicas (ourselves included) needed for a strict majority of the whole
# cluster of len(REPLICA_IDS) + 1 replicas. The previous
# floor(len(REPLICA_IDS)/2 + 1) fell one short whenever the cluster size was even.
quorum = (len(REPLICA_IDS) + 1) // 2 + 1
# Every replica boots as a candidate; when its election timer expires,
# candidate_state() calls start_election() to begin a real election.
state = Candidate_State(quorum)
queued_requests = []  # client get/put requests queued while we had no leader
queue_flush_timeout = 0  # ms timestamp after which main() flushes the queue; 0 = unset
# Connect to the network. All messages to/from other replicas and clients will
# occur over this socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
sock.connect(MY_ID)
# --------------------------------------------------------------------------- #
# SENDING PACKETS
# --------------------------------------------------------------------------- #
# Builds packet objects and returns them in json format
def build_packet(dest, type, message=None):
    """Build a JSON-encoded packet from this replica to ``dest``.

    Every packet carries our id (``src``), the destination (``dst``), the
    message ``type`` and our current idea of the ``leader``. Fields from
    ``message`` are merged on top and override those defaults.

    Args:
        dest: id of the replica or client to send to.
        type: message type string (name kept for caller compatibility).
        message: optional dict of extra fields. ``None`` replaces the former
            shared mutable ``{}`` default.

    Returns:
        The packet serialised with ``json.dumps``.
    """
    packet = {
        'src': MY_ID,
        'dst': dest,
        'type': type,
        'leader': leader
    }
    # dict.update works on Python 2 and 3 (iteritems() is Python 2 only).
    if message:
        packet.update(message)
    return json.dumps(packet)
# Sends the given (ready as is) packet on the given socket
def send_packet(packet):
    """Write one already-serialised packet to the shared replica socket.

    The byte count returned by ``sock.send`` is deliberately ignored.
    """
    outgoing = packet
    sock.send(outgoing)
# RequestVote RPC from a candidate
def send_request_vote_rpc(dest):
    """Ask replica ``dest`` to vote for us in the current term.

    ``last_log_index`` is the index of our newest log entry (-1 for an empty
    log). The receiver compares it against its own last_index_of_log(), so
    it must be an index. It used to be len(log), which is one too high and
    let a candidate missing the voter's last entry still win its vote.
    """
    last_log_term = log[-1]['term'] if log else 0
    message = {
        'term': term_num,
        'candidate_id': MY_ID,
        'last_log_index': last_index_of_log(),
        'last_log_term': last_log_term,
        'commit_index': commit_index
    }
    send_packet(build_packet(dest, 'RequestVoteRPC', message))
# Response from a follower from a RequestVote RPC
def send_request_vote_rpc_response(dest, vote_granted):
    """Answer candidate ``dest``'s RequestVoteRPC with our current term and vote."""
    reply_fields = {'term': term_num, 'vote_granted': vote_granted}
    send_packet(build_packet(dest, 'RequestVoteRPCResponse', reply_fields))
def send_append_entries_rpc(dest, entry_index=None, entry_array=None):
    """Send an AppendEntriesRPC to replica ``dest``.

    Args:
        dest: replica id to send to.
        entry_index: log index of the first entry in ``entry_array``. With
            None the message is an empty heartbeat, and prev_log_entry and
            prev_log_term are both -1.
        entry_array: log entries to ship, starting at ``entry_index``.
            Defaults to no entries (None replaces a shared mutable ``[]``).
    """
    if entry_array is None:
        entry_array = []
    message = {
        'term': term_num,
        'leader_id': MY_ID,
        'leader_commit': commit_index
    }
    if entry_index is None:
        # Pure heartbeat: no entries and no previous-entry reference.
        message['prev_log_entry'] = -1
        message['prev_log_term'] = -1
        message['entries'] = []
    else:
        message['prev_log_entry'] = entry_index - 1
        # Nothing precedes index 0, so its predecessor's term is reported as 0.
        if entry_index <= 0:
            message['prev_log_term'] = 0
        else:
            message['prev_log_term'] = log[entry_index - 1]["term"]
        message['entries'] = entry_array
    send_packet(build_packet(dest, 'AppendEntriesRPC', message))
def send_append_entries_rpc_response(dest, success):
    """Reply to leader ``dest``'s AppendEntriesRPC.

    ``success`` is True/False for appends and -1 for an empty heartbeat.
    The reply also reports our top log index (so the leader knows what we
    still need) and our commit index.
    """
    top = last_index_of_log()
    reply = {'term': term_num, 'success': success}
    reply['top_log_entry'] = top
    reply['commit_index'] = commit_index
    send_packet(build_packet(dest, 'AppendEntriesRPCResponse', reply))
# --------------------------------------------------------------------------- #
# RECEIVING PACKETS
# --------------------------------------------------------------------------- #
# Handles all incoming packets from the socket.
def read_socket():
    """Wait up to 100ms for one message on ``sock``.

    Returns the JSON-decoded message, or None if nothing arrived in time.
    """
    readable, _, _ = select.select([sock], [], [], 0.1)
    if sock not in readable:
        return None
    raw = sock.recv(32768)
    return json.loads(raw)
# --------------------------------------------------------------------------- #
# MAIN
# --------------------------------------------------------------------------- #
def last_index_of_log():
    """Return the index of the newest log entry, or -1 when the log is empty."""
    entry_count = len(log)
    return entry_count - 1
def start_election():
    """Start a new term as a candidate.

    Increments the term, records a vote for ourselves, and sends a
    RequestVoteRPC to every other replica.
    """
    global term_num, voted_for
    term_num = term_num + 1
    voted_for = MY_ID
    # NOTE: We can optimize by keeping track of responses and not FFFFing every time
    for peer in REPLICA_IDS:
        send_request_vote_rpc(peer)
def update_term(new_term, new_leader=None):
    """Adopt ``new_term``, clear our vote, and, if given, adopt ``new_leader``."""
    global leader, term_num, voted_for
    term_num, voted_for = new_term, None
    if new_leader:
        leader = new_leader
def update_commit_index(new_commit_index, leader_state=None):
    """Apply log entries (commit_index, new_commit_index] to db and advance.

    Does nothing unless ``new_commit_index`` is greater than commit_index.
    When the leader calls it (``leader_state`` given), each db value about to
    be overwritten is first saved as "hot". Entries are then sent to the
    replica with the highest reported commit index. No OKs are sent here;
    they wait for confirmations from replicas.
    """
    global commit_index, log, db
    if new_commit_index <= commit_index:
        return
    for idx in range(commit_index + 1, new_commit_index + 1):
        entry = log[idx]
        entry_key = entry['key']
        # Leader-only: remember the value this write replaces.
        if leader_state and entry_key in db:
            leader_state.add_hot(idx, entry_key, db[entry_key])
        db[entry_key] = entry['value']
    commit_index = new_commit_index
    if leader_state:
        target = leader_state.replica_with_high_commit()
        send_entries(leader_state, target)
def send_oks(leader_state):
    """Send "ok" to the clients of every entry that is now safe to acknowledge.

    Each acknowledged entry is marked oked and its hot value is dropped.
    """
    for idx in leader_state.needs_ok_sent():
        entry = log[idx]
        send_packet(build_packet(entry['src'], "ok", {"MID": entry['mid']}))
        leader_state.oked(idx)
        leader_state.del_hot(idx)
def send_entries(leader_state, rep_id):
    """Send ``rep_id`` the log entries after its last reported index.

    At most 51 entries go out per call. If the replica already has
    everything, an empty heartbeat is sent instead. Either way the replica's
    beat timer is reset.
    """
    first = leader_state.get_replica_match_index(rep_id) + 1
    last = last_index_of_log()
    if first > last:
        # Replica is up to date: plain heartbeat.
        leader_state.sent_beat(rep_id)
        send_append_entries_rpc(rep_id)
        return
    last = min(last, first + 50)
    batch = [log[i] for i in range(first, last + 1) if i >= 0]
    leader_state.sent_beat(rep_id)
    send_append_entries_rpc(rep_id, first, batch)
def flush_queue():
    """Redirect every queued client request, then empty the queue and its timer."""
    global queued_requests, queue_flush_timeout
    for req in queued_requests:
        redirect = build_packet(req['src'], 'redirect', {'MID' : req['MID']})
        send_packet(redirect)
    queued_requests = []
    queue_flush_timeout = 0
def attempt_log_append(packet):
    """Append an AppendEntriesRPC's entries after its prev_log_entry.

    If our log extends past prev_log_entry, the extra tail is dropped first.
    Returns False (log untouched) when prev_log_entry is beyond our last
    entry, and True otherwise.
    """
    global log
    prev = packet['prev_log_entry']
    top = last_index_of_log()
    if prev > top:
        return False
    # Drop everything after prev (no-op when prev == top).
    for _ in range(top - prev):
        log.pop()
    log = log + packet['entries']
    return True
def follower_state(last_state, current_packet):
    """Run one event-loop step while this replica is a follower.

    Args:
        last_state: the Follower_State carried over from the previous step.
        current_packet: decoded message from read_socket(), or None.

    Returns:
        A fresh Candidate_State if the leader heartbeat timed out (a new
        election has been started), otherwise ``last_state``.
    """
    global voted_for, leader
    # No heartbeat from the leader in time: start an election.
    if last_state.evaluate() == -1:
        start_election()
        # Require a cluster majority, like every other Candidate_State
        # construction (previously len(REPLICA_IDS), i.e. every peer).
        return Candidate_State(quorum)
    # On our first step as a follower, or once we are no longer queueing,
    # send queued client requests back as redirects.
    if last_state.new or (len(queued_requests) > 0 and not last_state.should_we_queue()):
        flush_queue()
        last_state.mark_not_new()
    if not current_packet:
        return last_state

    kind = current_packet['type']
    if kind in ['RequestVoteRPC', 'AppendEntriesRPC', 'RequestVoteRPCResponse', 'AppendEntriesRPCResponse']:
        leader_id = current_packet['leader_id'] if 'leader_id' in current_packet else leader
        leader_changed = leader != leader_id
        if current_packet['term'] > term_num:
            update_term(current_packet['term'], leader_id)
        elif current_packet['term'] == term_num and leader_changed:
            leader = leader_id
        # term_num is re-read here, so a term bump above makes this hold.
        if current_packet['term'] >= term_num and leader_changed:
            last_state.leader_changed()
            flush_queue()

    if kind == "RequestVoteRPC":
        if current_packet['term'] < term_num:
            send_request_vote_rpc_response(current_packet['src'], False)
        elif (current_packet['commit_index'] >= commit_index and
              current_packet['last_log_index'] >= last_index_of_log() and
              (voted_for is None or voted_for == current_packet['candidate_id'])):
            send_request_vote_rpc_response(current_packet['src'], True)
            voted_for = current_packet['candidate_id']
            # Granting a vote also resets our election timer.
            last_state.heartbeat_from_leader()
    elif kind == "AppendEntriesRPC":
        last_state.heartbeat_from_leader()
        prev_index = current_packet['prev_log_entry']
        top = last_index_of_log()
        # Raft log-matching check: our entry AT prev_index must carry
        # prev_log_term. Comparing against our last entry instead wrongly
        # rejected valid appends whenever we held extra entries (e.g. a new
        # leader resending from prev_log_entry -1), which left stale entries
        # in place.
        prev_term_mismatch = (0 <= prev_index <= top and
                              log[prev_index]['term'] != current_packet['prev_log_term'])
        if len(current_packet['entries']) == 0:
            # Pure heartbeat: only the leader's commit index matters.
            if current_packet['term'] < term_num:
                send_append_entries_rpc_response(current_packet['src'], False)
            else:
                update_commit_index(min(current_packet['leader_commit'], top))
                send_append_entries_rpc_response(current_packet['src'], -1)
        # Reply False for a stale term, a gap before the new entries, or
        # conflicting history at prev_log_entry.
        elif current_packet['term'] < term_num or prev_index > top or prev_term_mismatch:
            send_append_entries_rpc_response(current_packet['src'], False)
        else:
            appended = attempt_log_append(current_packet)
            send_append_entries_rpc_response(current_packet['src'], appended)
            update_commit_index(min(current_packet['leader_commit'], last_index_of_log()))
    elif kind in ['get', 'put']:
        if last_state.should_we_queue():
            queued_requests.append(current_packet)
        else:
            send_packet(build_packet(current_packet['src'], 'redirect', {'MID' : current_packet['MID']}))
    return last_state
def candidate_state(last_state, current_packet):
    """Run one event-loop step while this replica is a candidate.

    Args:
        last_state: the Candidate_State for the election in progress.
        current_packet: decoded message from read_socket(), or None.

    Returns:
        Follower_State if another replica has a newer term or already leads
        this term; Leader_State if we won; a fresh Candidate_State if this
        election timed out (a new one has been started); otherwise
        ``last_state``.
    """
    global leader
    if current_packet:
        kind = current_packet['type']
        if kind in ['RequestVoteRPC', 'AppendEntriesRPC', 'RequestVoteRPCResponse', 'AppendEntriesRPCResponse']:
            leader_id = current_packet['leader_id'] if 'leader_id' in current_packet else leader
            if current_packet['term'] > term_num:
                # Someone is ahead of us: adopt their term and step down.
                update_term(current_packet['term'], leader_id)
                flush_queue()
                return Follower_State(term_num < 2)
            elif current_packet['term'] == term_num:
                leader = leader_id
        if kind == 'RequestVoteRPCResponse':
            # Only votes cast in this election count. A late grant from an
            # earlier term must not help us win the current one.
            if current_packet['vote_granted'] and current_packet['term'] == term_num:
                last_state.increment_vote(current_packet['src'])
        elif kind == 'RequestVoteRPC':
            # A candidate refuses every vote request not from a newer term.
            if current_packet['term'] <= term_num:
                send_request_vote_rpc_response(current_packet['src'], False)
        elif kind == 'AppendEntriesRPC':
            # Someone already leads our term: yield to them.
            if current_packet['term'] == term_num:
                return Follower_State(term_num < 2)
        elif kind in ['get', 'put']:
            queued_requests.append(current_packet)

    outcome = last_state.evaluate_election()
    if outcome == -1:
        # Election timed out: start over in a new term.
        start_election()
        return Candidate_State(quorum)
    if outcome == 1:
        return Leader_State(quorum)
    # Still waiting: periodically re-ask everyone for their vote.
    if last_state.evaluate_rebroadcast():
        for rep_id in REPLICA_IDS:
            send_request_vote_rpc(rep_id)
        last_state.rebroadcasted()
    return last_state
def leader_state(last_state, current_packet):
    """Run one event-loop step while this replica is the leader.

    Args:
        last_state: the Leader_State for this leadership.
        current_packet: decoded message from read_socket(), or None.

    Returns:
        Follower_State if we must step down (a newer term, or a same-term
        leader with a longer log), otherwise ``last_state``.
    """
    global leader
    leader = MY_ID
    if last_state.new:
        # First step as leader: drop every uncommitted entry and redirect its
        # client so the request is retried.
        for _ in range(commit_index, last_index_of_log()):
            send_packet(build_packet(log[-1]['src'], "redirect", {'MID': log[-1]['mid']}))
            del log[-1]
        # Initial heartbeat/catch-up. Sent AFTER the truncation above so that
        # followers are never handed entries we have just discarded.
        for r in REPLICA_IDS:
            send_entries(last_state, r)
        # Probably stuff in queue, flush it
        flush_queue()
        last_state.set_not_new()

    if current_packet:
        kind = current_packet['type']
        if (kind in ['RequestVoteRPC', 'AppendEntriesRPC', 'RequestVoteRPCResponse', 'AppendEntriesRPCResponse']
                and current_packet['term'] > term_num):
            leader_id = current_packet['leader_id'] if 'leader_id' in current_packet else leader
            update_term(current_packet['term'], leader_id)
            return Follower_State()
        if kind == 'AppendEntriesRPC' and current_packet['term'] == term_num:
            print(" We're getting a appendRPC from a someone claiming to also be the leader in this term")
            # AppendEntriesRPC carries prev_log_entry + entries, not a
            # 'last_log_entry' field (reading that raised KeyError). Derive
            # the other leader's top index from what it sent.
            their_top = current_packet['prev_log_entry'] + len(current_packet['entries'])
            if their_top > last_index_of_log():
                return Follower_State()
        elif kind == 'AppendEntriesRPCResponse':
            src = current_packet['src']
            top = current_packet['top_log_entry']
            last_state.replica_state_update(src, current_packet['commit_index'], top)
            if current_packet['success'] == -1:
                # Reply to an empty heartbeat: only the follower's reported
                # commit index matters, which may unlock OKs.
                send_oks(last_state)
            elif current_packet['success']:
                if last_state.is_entry_index_initialized(top):
                    # An entry we appended this term: True means it just made quorum.
                    if last_state.increment_response(top, src):
                        update_commit_index(last_state.calculate_commitable_index(), last_state)
                    # The follower's reported commit index may also unlock OKs.
                    send_oks(last_state)
                elif top < last_index_of_log():
                    # Not a quorum entry: keep catching this follower up.
                    send_entries(last_state, src)
            else:
                # Append failed. A newer term was already handled above, so
                # just resend from what the follower reported.
                send_entries(last_state, src)
        elif kind == 'put':
            log.append({
                'key': current_packet['key'],
                'value': current_packet['value'],
                'term': term_num,
                'mid': current_packet['MID'],
                'src': current_packet['src']
            })
            last_state.initialize_entry_append(last_index_of_log(), current_packet['MID'], current_packet['src'])
            for rep_id in REPLICA_IDS:
                send_entries(last_state, rep_id)
        elif kind == 'get':
            request_key = current_packet['key']
            # Prefer the pre-write value if the key is mid-modification.
            # get_hot returns False when nothing is pending; a pending value
            # may legitimately be "", so compare with False explicitly.
            hot = last_state.get_hot(request_key)
            value = hot if hot is not False else db.get(request_key, "")
            send_packet(build_packet(current_packet['src'], 'ok', {'value': value, 'MID': current_packet['MID']}))

    for r in last_state.needs_beat():
        send_entries(last_state, r)
    return last_state
def main():
    """Drive the replica forever: poll the socket, then step the state machine.

    Each iteration passes at most one packet (or None) to the handler for
    the current role. The handler returns the role object for the next
    iteration.
    """
    global state, queue_flush_timeout
    while True:
        incoming = read_socket()
        # Last-resort failsafe: once requests are queued, arm a 250ms timer
        # and flush the queue when it expires.
        if queued_requests:
            if queue_flush_timeout == 0:
                queue_flush_timeout = time.time() * 1000 + 250
            elif time.time() * 1000 >= queue_flush_timeout:
                flush_queue()
        if isinstance(state, Follower_State):
            step = follower_state
        elif isinstance(state, Candidate_State):
            step = candidate_state
        else:
            step = leader_state
        state = step(state, incoming)
    return 0
# Start the event loop only when executed as a script, not on import.
if __name__ == '__main__':
    main()