Skip to content

Commit

Permalink
add a loop in bin/src/ctl::upgrade_main()
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 8512878 commit af29ec6
Showing 1 changed file with 80 additions and 74 deletions.
154 changes: 80 additions & 74 deletions bin/src/ctl/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,94 +301,100 @@ pub fn upgrade_main(
None,
));

match channel.read_message() {
None => {
bail!("Error: the proxy didn't list workers");
}
Some(message) => {
if id != message.id {
bail!("Error: received unexpected message: {:?}", message);
loop {
match channel.read_message() {
None => {
bail!("Error: the proxy didn't list workers");
}
match message.status {
CommandStatus::Processing => {
bail!("Error: the proxy didn't return list of workers immediately");
}
CommandStatus::Error => {
bail!(
"Error: failed to get the list of worker: {}",
message.message
);
Some(message) => {
if id != message.id {
bail!("Error: received unexpected message: {:?}", message);
}
CommandStatus::Ok => {
if let Some(CommandResponseData::Workers(ref workers)) = message.data {
let mut table = Table::new();
table.add_row(row!["Worker", "pid", "run state"]);
for ref worker in workers.iter() {
let run_state = format!("{:?}", worker.run_state);
table.add_row(row![worker.id, worker.pid, run_state]);
}
println!("");
table.printstd();
println!("");

let id = generate_tagged_id("UPGRADE-MAIN");
channel.write_message(&CommandRequest::new(
id.clone(),
CommandRequestData::UpgradeMain,
None,
));
println!("Upgrading main process");

loop {
match channel.read_message() {
None => {
bail!("Error: the proxy didn't start main upgrade");
}
Some(message) => {
if &id != &message.id {
bail!("Error: received unexpected message: {:?}", message);
match message.status {
CommandStatus::Processing => {
error!("Error: the proxy didn't return list of workers immediately");
}
CommandStatus::Error => {
bail!(
"Error: failed to get the list of worker: {}",
message.message
);
}
CommandStatus::Ok => {
if let Some(CommandResponseData::Workers(ref workers)) = message.data {
let mut table = Table::new();
table.add_row(row!["Worker", "pid", "run state"]);
for ref worker in workers.iter() {
let run_state = format!("{:?}", worker.run_state);
table.add_row(row![worker.id, worker.pid, run_state]);
}
println!("");
table.printstd();
println!("");

let id = generate_tagged_id("UPGRADE-MAIN");
channel.write_message(&CommandRequest::new(
id.clone(),
CommandRequestData::UpgradeMain,
None,
));
println!("Upgrading main process");

loop {
match channel.read_message() {
None => {
bail!("Error: the proxy didn't start main upgrade");
}
match message.status {
CommandStatus::Processing => {}
CommandStatus::Error => {
Some(message) => {
if &id != &message.id {
bail!(
"Error: failed to upgrade the main: {}",
message.message
"Error: received unexpected message: {:?}",
message
);
}
CommandStatus::Ok => {
println!(
"Main process upgrade succeeded: {}",
message.message
);
break;
match message.status {
CommandStatus::Processing => {}
CommandStatus::Error => {
bail!(
"Error: failed to upgrade the main: {}",
message.message
);
}
CommandStatus::Ok => {
println!(
"Main process upgrade succeeded: {}",
message.message
);
break;
}
}
}
}
}
}

// Reconnect to the new main
println!("Reconnecting to new main process...");
let mut channel = create_channel(&config)
.with_context(|| "could not reconnect to the command unix socket")?;
// Reconnect to the new main
println!("Reconnecting to new main process...");
let mut channel = create_channel(&config).with_context(|| {
"could not reconnect to the command unix socket"
})?;

// Do a rolling restart of the workers
let running_workers = workers
.iter()
.filter(|worker| worker.run_state == RunState::Running)
.collect::<Vec<_>>();
let running_count = running_workers.len();
for (i, ref worker) in running_workers.iter().enumerate() {
println!("Upgrading worker {} (of {})", i + 1, running_count);
// Do a rolling restart of the workers
let running_workers = workers
.iter()
.filter(|worker| worker.run_state == RunState::Running)
.collect::<Vec<_>>();
let running_count = running_workers.len();
for (i, ref worker) in running_workers.iter().enumerate() {
println!("Upgrading worker {} (of {})", i + 1, running_count);

channel = upgrade_worker(channel, 0, worker.id)?;
//thread::sleep(Duration::from_millis(1000));
}

channel = upgrade_worker(channel, 0, worker.id)?;
//thread::sleep(Duration::from_millis(1000));
println!("Proxy successfully upgraded!");
}

println!("Proxy successfully upgraded!");
break Ok(())
}
Ok(())
}
}
}
Expand Down Expand Up @@ -419,7 +425,7 @@ pub fn upgrade_worker(
None => bail!("the proxy didn't answer"),
Some(message) => match message.status {
CommandStatus::Processing => {
bail!("Worker {} is processing: {}", worker_id, message.message)
info!("Worker {} is processing: {}", worker_id, message.message)
}
CommandStatus::Error => bail!(
"could not stop the worker {}: {}",
Expand Down

0 comments on commit af29ec6

Please sign in to comment.