Skip to content

Commit

Permalink
fix(clusters): resolve ip by create ts
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Aug 9, 2024
1 parent f7d021c commit d6faf4c
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 39 deletions.
6 changes: 5 additions & 1 deletion infra/default-builds/dockerfiles/test-ds-echo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ async fn echo_tcp_server(port: u16) -> Result<()> {

let listener = TcpListener::bind(&addr).await.context("bind failed")?;
loop {
let (socket, _) = listener.accept().await.context("accept failed")?;
let (socket, addr) = listener.accept().await.context("accept failed")?;
println!("connection: {addr}");

tokio::spawn(async move {
let mut framed = Framed::new(socket, BytesCodec::new());

Expand All @@ -106,6 +108,8 @@ async fn echo_tcp_server(port: u16) -> Result<()> {
.await
.expect("write failed");
}

println!("connection closed: {addr}");
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion infra/default-builds/outputs/test-ds-echo-tag.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
test-ds-echo:1723071093
test-ds-echo:1723150754
2 changes: 1 addition & 1 deletion infra/default-builds/outputs/test-ds-echo.tar
Git LFS file not shown
72 changes: 37 additions & 35 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,41 +1004,43 @@ impl WorkflowCtx {
Ok(signal)
}

/// Checks if the given signal exists in the database.
pub async fn query_signal<T: Listen>(&mut self) -> GlobalResult<Option<T>> {
let event = self.relevant_history().nth(self.location_idx);

// Signal received before
let signal = if let Some(event) = event {
tracing::debug!(name=%self.name, id=%self.workflow_id, "replaying signal");

// Validate history is consistent
let Event::Signal(signal) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event} at {}, found signal",
self.loc(),
)))
.map_err(GlobalError::raw);
};

Some(T::parse(&signal.name, signal.body.clone()).map_err(GlobalError::raw)?)
}
// Listen for new message
else {
let ctx = ListenCtx::new(self);

match T::listen(&ctx).await {
Ok(res) => Some(res),
Err(err) if matches!(err, WorkflowError::NoSignalFound(_)) => None,
Err(err) => return Err(err).map_err(GlobalError::raw),
}
};

// Move to next event
self.location_idx += 1;

Ok(signal)
}
// TODO: Currently implemented wrong, if no signal is received it should still write a signal row to the
// database so that upon replay it again receives no signal
// /// Checks if the given signal exists in the database.
// pub async fn query_signal<T: Listen>(&mut self) -> GlobalResult<Option<T>> {
// let event = self.relevant_history().nth(self.location_idx);

// // Signal received before
// let signal = if let Some(event) = event {
// tracing::debug!(name=%self.name, id=%self.workflow_id, "replaying signal");

// // Validate history is consistent
// let Event::Signal(signal) = event else {
// return Err(WorkflowError::HistoryDiverged(format!(
// "expected {event} at {}, found signal",
// self.loc(),
// )))
// .map_err(GlobalError::raw);
// };

// Some(T::parse(&signal.name, signal.body.clone()).map_err(GlobalError::raw)?)
// }
// // Listen for new message
// else {
// let ctx = ListenCtx::new(self);

// match T::listen(&ctx).await {
// Ok(res) => Some(res),
// Err(err) if matches!(err, WorkflowError::NoSignalFound(_)) => None,
// Err(err) => return Err(err).map_err(GlobalError::raw),
// }
// };

// // Move to next event
// self.location_idx += 1;

// Ok(signal)
// }

pub async fn msg<M>(&mut self, tags: serde_json::Value, body: M) -> GlobalResult<()>
where
Expand Down
3 changes: 3 additions & 0 deletions svc/pkg/cluster/src/ops/server/resolve_for_ip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub async fn cluster_server_resolve_for_ip(
WHERE
($1 OR cloud_destroy_ts IS NULL) AND
public_ip = ANY($2)
-- When more than one record is returned per IP, sort by create_ts so that the first record is the
-- latest server created
ORDER BY create_ts DESC
",
input.include_destroyed,
input.ips
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ User=node_exporter
Group=node_exporter
Type=simple
# Reduce cardinality
ExecStart=/usr/bin/node_exporter --collector.disable-defaults --collector.cpu --collector.netdev --collector.conntrack --collector.meminfo --collector.filesystem --collector.filesystem.mount-points-exclude=^/opt/nomad/
ExecStart=/usr/bin/node_exporter --collector.disable-defaults --collector.cpu --collector.netdev --collector.conntrack --collector.meminfo --collector.filesystem --collector.filesystem.mount-points-exclude=^/opt/nomad/ --collector.netstat --collector.sockstat --collector.tcpstat --collector.network_route --collector.arp --collector.filefd --collector.interrupts --collector.softirqs --collector.processes
Restart=always
RestartSec=2
Expand Down

0 comments on commit d6faf4c

Please sign in to comment.