Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow secondaries to store oplog operations avoiding full sync #45

Merged
merged 3 commits into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/lib/disk_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,10 @@ pub fn start_snap_shot_timer(timer: timer::Timer, dbs: Arc<Databases>) {
let dbs = dbs.clone();
let dbs_map = dbs.map.read().unwrap();
let db_opt = dbs_map.get(&database_name);
match db_opt {
Some(db) => {
storage_data_disk(db, database_name.clone());
}
_ => log::warn!("Database not found {}", database_name),
if let Some(db) = db_opt {
storage_data_disk(db, database_name.clone());
} else {
log::warn!("Database not found {}", database_name)
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/lib/network/ws_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,25 @@ impl Handler for Server {
let read_promise = async {
loop {
let message = receiver.next().await;
match message {
Some(message) => match message.as_ref() {
if let Some(message) = message {
match message.as_ref() {
TO_CLOSE => {
log::debug!("Closing server connection");
break;
}
message => {
log::debug!("ws_ops::_read_thread::message {}", message);
match ws_sender.send(message) {
Ok(_) => {}
Err(e) => {
log::warn!("ws_ops::_read_thread::send::Error {}", e)
}
Ok(_) => (),
};
}
},
None => {
log::warn!("ws_ops::_read_thread::error::None");
}
} else {
log::warn!("ws_ops::_read_thread::error::None Will close the connection");
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was causing loops ont he ws

ne
nun-db-0 nun-db 2021-07-13T07:59:33.240530486-03:00 [2021-07-13T10:59:33.033308685Z nun_db::lib::network::ws_ops] ws_ops::_read_thread::error::None
nun-db-0 nun-db 2021-07-13T07:59:33.240534036-03:00 [2021-07-13T10:59:33.033310065Z nun_db::lib::network::ws_ops] ws_ops::_read_thread::error::None
nun-db-0 nun-db 2021-07-13T07:59:33.240537586-03:00 [2021-07-13T10:59:33.033311425Z nun_db::lib::network::ws_ops] ws_ops::_read_thread::error::None
nun-db-0 nun-db 2021-07-13T07:59:33.240541166-03:00 [2021-07-13T10:59:33.033312795Z nun_db::lib::network::ws_ops] ws_ops::_read_thread::error::None
nun-db-0 nun-db 2021-07-13T07:59:33.240545126-03:00 [2021-07-13T10:59:33.033314166Z nun_db::lib::network::ws_ops] ws_ops::_read_thread::error::None

break;
}
}
};
Expand Down
2 changes: 0 additions & 2 deletions src/lib/process_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub fn process_request(input: &str, dbs: &Arc<Databases>, client: &mut Client) -
input_to_log
);
let db_name_state = client.selected_db_name();
let is_primary = dbs.is_primary();
let start = Instant::now();
let request = match Request::parse(String::from(input).trim_matches('\n')) {
Ok(req) => req,
Expand Down Expand Up @@ -370,7 +369,6 @@ pub fn process_request(input: &str, dbs: &Arc<Databases>, client: &mut Client) -
&db_name_state,
result,
&dbs.replication_sender.clone(),
is_primary,
);
replication_result
}
Expand Down
178 changes: 93 additions & 85 deletions src/lib/replication_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,92 +37,98 @@ pub fn replicate_request(
db_name: &String,
response: Response,
replication_sender: &Sender<String>,
is_primary: bool,
) -> Response {
if is_primary {
match response {
Response::Error { msg: _ } => response,
_ => match input {
Request::CreateDb { name, token } => {
log::debug!("Will replicate command a created database name {}", name);
replicate_web(replication_sender, format!("create-db {} {}", name, token));
Response::Ok {}
}
Request::Snapshot {} => {
log::debug!("Will replicate a snapshot to the database {}", db_name);
replicate_web(
replication_sender,
format!("replicate-snapshot {}", db_name),
);
Response::Ok {}
}
match response {
Response::Error { msg: _ } => response,
_ => match input {
Request::CreateDb { name, token } => {
log::debug!("Will replicate command a created database name {}", name);
replicate_web(replication_sender, format!("create-db {} {}", name, token));
Response::Ok {}
}
Request::Snapshot {} => {
log::debug!("Will replicate a snapshot to the database {}", db_name);
replicate_web(
replication_sender,
format!("replicate-snapshot {}", db_name),
);
Response::Ok {}
}

Request::Set { value, key } => {
log::debug!("Will replicate the set of the key {} to {} ", key, value);
replicate_web(
replication_sender,
get_replicate_message(db_name.to_string(), key, value),
);
Response::Ok {}
}
Request::ReplicateSnapshot { db } => {
log::debug!("Will replicate a snapshot to the database {}", db);
replicate_web(replication_sender, format!("replicate-snapshot {}", db));
Response::Ok {}
}

Request::ReplicateSet { db, value, key } => {
if is_primary {
log::debug!("Will replicate the set of the key {} to {} ", key, value);
replicate_web(
replication_sender,
get_replicate_message(db.to_string(), key, value),
);
}
Response::Ok {}
}
Request::Set { value, key } => {
log::debug!("Will replicate the set of the key {} to {} ", key, value);
replicate_web(
replication_sender,
get_replicate_message(db_name.to_string(), key, value),
);
Response::Ok {}
}

Request::Remove { key } => {
log::debug!("Will replicate the remove of the key {} ", key);
replicate_web(
replication_sender,
get_replicate_remove_message(db_name.to_string(), key),
);
Response::Ok {}
}
Request::ReplicateSet { db, value, key } => {
log::debug!("Will replicate the set of the key {} to {} ", key, value);
replicate_web(
replication_sender,
get_replicate_message(db.to_string(), key, value),
);
Response::Ok {}
}

Request::Election { id } => {
replicate_web(replication_sender, format!("election cadidate {}", id));
Response::Ok {}
}
Request::Remove { key } => {
log::debug!("Will replicate the remove of the key {} ", key);
replicate_web(
replication_sender,
get_replicate_remove_message(db_name.to_string(), key),
);
Response::Ok {}
}

Request::ElectionActive {} => {
replicate_web(replication_sender, format!("election active"));
Response::Ok {}
}
Request::ReplicateRemove { db, key } => {
log::debug!("Will replicate the remove of the key {} ", key);
replicate_web(
replication_sender,
get_replicate_remove_message(db.to_string(), key),
);
Response::Ok {}
}

Request::Leave { name } => {
replicate_web(replication_sender, format!("replicate-leave {}", name));
Response::Ok {}
}
Request::Election { id } => {
replicate_web(replication_sender, format!("election cadidate {}", id));
Response::Ok {}
}

Request::ReplicateIncrement { db, inc, key } => {
if is_primary {
log::debug!("Will replicate the inc of the key {} to {} ", key, inc);
replicate_web(
replication_sender,
get_replicate_increment_message(db.to_string(), key, inc.to_string()),
);
}
Response::Ok {}
}
Request::Increment { key, inc } => {
replicate_web(
replication_sender,
get_replicate_increment_message(db_name.to_string(), key, inc.to_string()),
);
Response::Ok {}
}
_ => response,
},
}
} else {
response
Request::ElectionActive {} => {
replicate_web(replication_sender, format!("election active"));
Response::Ok {}
}

Request::Leave { name } => {
replicate_web(replication_sender, format!("replicate-leave {}", name));
Response::Ok {}
}

Request::ReplicateIncrement { db, inc, key } => {
log::debug!("Will replicate the inc of the key {} to {} ", key, inc);
replicate_web(
replication_sender,
get_replicate_increment_message(db.to_string(), key, inc.to_string()),
);
Response::Ok {}
}
Request::Increment { key, inc } => {
replicate_web(
replication_sender,
get_replicate_increment_message(db_name.to_string(), key, inc.to_string()),
);
Response::Ok {}
}
_ => response,
},
}
}

Expand Down Expand Up @@ -206,7 +212,11 @@ pub async fn start_replication_thread(
log::info!("replication_ops::start_replication_thread will exist!");
break;
}
replicate_message_to_secoundary(message.to_string(), &dbs);
if dbs.is_primary() || dbs .is_eligible() {// starting nodes neeeds to replicate election messages
replicate_message_to_secoundary(message.to_string(), &dbs);
} else {
log::debug!("Won't replicate message from secoundary");
}
let request = Request::parse(&message.to_string()).unwrap();
match request {
Request::CreateDb { name, token: _ } => {
Expand Down Expand Up @@ -1057,7 +1067,6 @@ mod tests {
&db_name,
resp_error,
&sender,
false,
) {
Response::Error { msg: _ } => true,
_ => false,
Expand All @@ -1078,7 +1087,7 @@ mod tests {
value: "any_value".to_string(),
};
let db_name = "some".to_string();
let result = match replicate_request(req_set, &db_name, resp_set, &sender, true) {
let result = match replicate_request(req_set, &db_name, resp_set, &sender) {
Response::Ok {} => true,
_ => false,
};
Expand All @@ -1100,7 +1109,7 @@ mod tests {
value: "any_value".to_string(),
};
let db_name = "some".to_string();
let _ = match replicate_request(req_set, &db_name, resp_set, &sender, false) {
let _ = match replicate_request(req_set, &db_name, resp_set, &sender) {
Response::Set {
key: _key,
value: _value,
Expand All @@ -1125,7 +1134,6 @@ mod tests {
&db_name,
resp_get,
&sender,
false,
) {
Response::Value { key: _, value: _ } => true,
_ => false,
Expand All @@ -1143,7 +1151,7 @@ mod tests {
let resp_get = Response::Ok {};

let db_name = "some_db_name".to_string();
let result = match replicate_request(request, &db_name, resp_get, &sender, true) {
let result = match replicate_request(request, &db_name, resp_get, &sender) {
Response::Ok {} => true,
_ => false,
};
Expand All @@ -1162,7 +1170,7 @@ mod tests {
let resp_get = Response::Ok {};

let db_name = "some".to_string();
let result = match replicate_request(request, &db_name, resp_get, &sender, true) {
let result = match replicate_request(request, &db_name, resp_get, &sender) {
Response::Ok {} => true,
_ => false,
};
Expand Down
15 changes: 7 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ fn init_logger() {

fn main() -> Result<(), String> {
init_logger();
log::info!("nundb starting!");
let matches: ArgMatches<'_> = lib::commad_line::commands::prepare_args();
if let Some(start_match) = matches.subcommand_matches("start") {
return start_db(
Expand Down Expand Up @@ -58,7 +59,6 @@ fn start_db(
Receiver<String>,
) = channel(100);
let keys_map = disk_ops::load_keys_map_from_disk();
log::debug!("Keys {}", keys_map.len());

let dbs = lib::db_ops::create_init_dbs(
user.to_string(),
Expand Down Expand Up @@ -92,17 +92,16 @@ fn start_db(

let dbs_self_election = dbs.clone();
let tcp_address_to_election = Arc::new(tcp_address.to_string());
let joi_thread = thread::spawn(move || {
let join_thread = thread::spawn(move || {
lib::replication_ops::ask_to_join_all_replicas(
&replicate_address_to_thread,
&tcp_address_to_election.to_string(),
&dbs_self_election.user.to_string(),
&dbs_self_election.pwd.to_string(),
)
);
lib::election_ops::start_inital_election(dbs_self_election)
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving this process to here saved one extra thread in the startup and made the election process faster

});

let db_election = dbs.clone();
let election_thread = thread::spawn(|| lib::election_ops::start_inital_election(db_election));

let timer = timer::Timer::new();
let db_snap = dbs.clone();
Expand All @@ -123,16 +122,16 @@ fn start_db(
thread::spawn(|| lib::network::http_ops::start_http_client(db_http, http_address));

let tcp_address = String::from(tcp_address.clone());
let dbs_tcp = dbs.clone();
let tcp_thread =
thread::spawn(move || lib::network::tcp_ops::start_tcp_client(dbs.clone(), &tcp_address));
thread::spawn(move || lib::network::tcp_ops::start_tcp_client(dbs_tcp, &tcp_address));
let join_all_promises = async {
join!(replication_thread_creator, replication_thread);
};
block_on(join_all_promises);
tcp_thread.join().expect("Tcp thread died");
ws_thread.join().expect("WS thread died");

election_thread.join().expect("election_thread thread died");
joi_thread.join().expect("joi_thread thread died");
join_thread.join().expect("join_thread thread died");
Ok(())
}