forked from mateusfreira/nun-db
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.rs
executable file
·159 lines (136 loc) · 5.23 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
mod lib;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::executor::block_on;
use futures::join;
use lib::*;
use lib::configuration::Configuration;
use log;
use signal_hook::{consts::SIGINT, iterator::Signals};
use std::thread;
use std::sync::Arc;
use clap::ArgMatches;
use env_logger::{Builder, Env, Target};
/// Installs the process-wide `env_logger` backend.
///
/// Output goes to stdout, the level column is hidden and timestamps carry
/// nanosecond precision.
///
/// The filter is resolved in this order:
/// 1. the `RUST_LOG` environment variable (the `Env::default()` filter variable), when set;
/// 2. `config.nun_log_level`, when it is not blank;
/// 3. `"info"`.
///
/// NOTE(review): this assumes `nun_log_level` holds a filter directive such as
/// `debug` (the other `nun_*` settings are consumed as plain values by `main`)
/// and not the *name* of an environment variable — confirm against `Configuration`.
fn init_logger(config: &Configuration) {
    // `Env::filter_or(name, default)` expects the NAME of an environment variable
    // as its first argument. Passing the configured level there made env_logger
    // look up a variable called e.g. "debug", miss it, and always use "info".
    // The configured value is therefore supplied as the default filter instead.
    let configured_level = config.nun_log_level.as_str().trim();
    let default_filter = if configured_level.is_empty() {
        "info"
    } else {
        configured_level
    };
    let env = Env::default().default_filter_or(default_filter);
    Builder::from_env(env)
        .format_level(false)
        .target(Target::Stdout)
        .format_timestamp_nanos()
        .init();
}
/// Process entry point.
///
/// Loads the configuration, initialises logging and parses the command line.
/// When the `start` sub-command is present the database is started through
/// `start_db`; every option falls back to the matching configuration value
/// when it was not given on the command line. Any other invocation is handed
/// to `lib::commad_line::commands::exec_command`.
fn main() -> Result<(), String> {
    let config = configuration::get_configuration();
    init_logger(&config);
    log::info!("nundb starting!");

    let matches: ArgMatches<'_> = lib::commad_line::commands::prepare_args();
    match matches.subcommand_matches("start") {
        Some(start_match) => {
            // Credentials are read from the top-level matches, the addresses
            // from the `start` sub-command matches.
            let user = matches.value_of("user").unwrap_or(config.nun_user.as_str());
            let pwd = matches.value_of("pwd").unwrap_or(config.nun_pwd.as_str());
            let ws_address = start_match
                .value_of("ws-address")
                .unwrap_or(config.nun_ws_addr.as_str());
            let http_address = start_match
                .value_of("http-address")
                .unwrap_or(config.nun_http_addr.as_str());
            let tcp_address = start_match
                .value_of("tcp-address")
                .unwrap_or(config.nun_tcp_addr.as_str());
            let replicate_address = start_match
                .value_of("replicate-address")
                .unwrap_or(config.nun_replicate_addr.as_str());

            start_db(
                user,
                pwd,
                ws_address,
                http_address,
                tcp_address,
                replicate_address,
            )
        }
        None => lib::commad_line::commands::exec_command(&matches),
    }
}
/// Boots a database node and blocks until its long-running workers finish.
///
/// Steps, in order:
/// 1. creates the replication and replication-supervisor channels;
/// 2. loads the keys map from disk and checks the op-log metadata, cleaning the
///    metadata files when the op-log is reported invalid;
/// 3. builds the databases handle and loads all databases from disk;
/// 4. registers a SIGINT handler thread that runs `db_ops::safe_shutdown` and
///    exits the process with status 0;
/// 5. spawns the replica-join/election, snapshot, WebSocket, HTTP and TCP threads;
/// 6. drives the two replication futures with `block_on`, then joins the TCP,
///    WebSocket and join threads.
///
/// # Arguments
/// * `user` / `pwd` - credentials handed to `create_init_dbs`.
/// * `ws_address`, `http_address`, `tcp_address` - addresses passed to the
///   WebSocket, HTTP and TCP starters respectively; `tcp_address` is also passed
///   to the replication supervisor and to the replica join request.
/// * `replicate_address` - passed to `ask_to_join_all_replicas`.
///
/// # Errors
/// Returns `Err` when the SIGINT handler cannot be registered.
///
/// # Panics
/// Panics when joining the TCP, WebSocket or join thread reports that the
/// thread itself panicked.
fn start_db(
    user: &str,
    pwd: &str,
    ws_address: &str,
    http_address: &str,
    tcp_address: &str,
    replicate_address: &str,
) -> Result<(), String> {
    let (replication_sender, replication_receiver): (Sender<String>, Receiver<String>) =
        channel(100);
    let (replication_supervisor_sender, replication_supervisor_receiver): (
        Sender<String>,
        Receiver<String>,
    ) = channel(100);

    let keys_map = disk_ops::load_keys_map_from_disk();
    let is_oplog_valid = disk_ops::is_oplog_valid();
    if is_oplog_valid {
        log::debug!("All fine with op-log metadafiles");
    } else {
        log::warn!("Nun-db has restarted with op-log in a invalid state, oplog and keys metadafile will be deleted!");
        disk_ops::clean_op_log_metadata_files();
    }

    let dbs = lib::db_ops::create_init_dbs(
        user.to_string(),
        pwd.to_string(),
        tcp_address.to_string(),
        replication_supervisor_sender,
        replication_sender.clone(),
        keys_map,
        is_oplog_valid,
    );
    disk_ops::load_all_dbs_from_disk(&dbs);

    // Registering a signal handler is an OS-level operation that can fail;
    // report it through the function's `Result` instead of panicking.
    let mut signals = Signals::new(&[SIGINT])
        .map_err(|e| format!("Could not register the SIGINT handler: {}", e))?;
    let dbs_to_signal = dbs.clone();
    thread::spawn(move || {
        // Only the first signal matters: the process exits right after shutdown.
        if let Some(sig) = signals.forever().next() {
            println!("Received signal {:?}", sig);
            db_ops::safe_shutdown(&dbs_to_signal);
            std::process::exit(0);
        }
    });

    let db_replication_start = dbs.clone();
    let tcp_address_to_relication = Arc::new(tcp_address.to_string());
    let replication_thread_creator = async {
        log::debug!("lib::replication_ops::start_replication_supervisor");
        lib::replication_ops::start_replication_supervisor(
            replication_supervisor_receiver,
            db_replication_start,
            tcp_address_to_relication,
        )
        .await
    };

    let db_replication = dbs.clone();
    let replication_thread = async {
        lib::replication_ops::start_replication_thread(replication_receiver, db_replication).await
    };

    let replicate_address_to_thread = Arc::new(replicate_address.to_string());
    let dbs_self_election = dbs.clone();
    let tcp_address_to_election = Arc::new(tcp_address.to_string());
    // Asks the replicas to let this node join, then starts the initial election.
    let join_thread = thread::spawn(move || {
        lib::replication_ops::ask_to_join_all_replicas(
            &replicate_address_to_thread,
            &tcp_address_to_election.to_string(),
            &dbs_self_election.user.to_string(),
            &dbs_self_election.pwd.to_string(),
        );
        lib::election_ops::start_inital_election(dbs_self_election)
    });

    let timer = timer::Timer::new();
    let db_snap = dbs.clone();
    // Disk thread (not joined below).
    let _snapshot_thread =
        thread::spawn(move || lib::disk_ops::start_snap_shot_timer(timer, db_snap));

    let db_socket = dbs.clone();
    let db_http = dbs.clone();
    let http_address = Arc::new(http_address.to_string());
    let ws_address = Arc::new(ws_address.to_string());
    // Network threads (the HTTP thread is not joined below).
    let ws_thread =
        thread::spawn(move || lib::network::ws_ops::start_web_socket_client(db_socket, ws_address));
    let _http_thread =
        thread::spawn(move || lib::network::http_ops::start_http_client(db_http, http_address));

    // Owned copy so the TCP thread's closure can be `'static`.
    let tcp_address = tcp_address.to_string();
    let dbs_tcp = dbs.clone();
    let tcp_thread =
        thread::spawn(move || lib::network::tcp_ops::start_tcp_client(dbs_tcp, &tcp_address));

    // Run both replication futures on the current thread until they complete.
    let join_all_promises = async {
        join!(replication_thread_creator, replication_thread);
    };
    block_on(join_all_promises);

    tcp_thread.join().expect("Tcp thread died");
    ws_thread.join().expect("WS thread died");
    join_thread.join().expect("join_thread thread died");
    Ok(())
}