Skip to content

Commit

Permalink
treewide: use with_current_subscriber() for spawned tasks
Browse files Browse the repository at this point in the history
Now, when the driver spawns a task to run a new future on it, that
future will use the same subscriber as the code that spawned the task in
the first place.

All tokio::task::spawn invocations inside the driver were adjusted,
examples and tests were not.
  • Loading branch information
piodul committed Mar 22, 2023
1 parent 97239ee commit e557095
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 18 deletions.
5 changes: 3 additions & 2 deletions scylla/src/transport/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tracing::instrument::WithSubscriber;
use tracing::{debug, warn};
use uuid::Uuid;

Expand Down Expand Up @@ -186,7 +187,7 @@ impl Cluster {
};

let (fut, worker_handle) = worker.work().remote_handle();
tokio::spawn(fut);
tokio::spawn(fut.with_current_subscriber());

let result = Cluster {
data: cluster_data,
Expand Down Expand Up @@ -533,7 +534,7 @@ impl ClusterWorker {

let cluster_data = self.cluster_data.load_full();
let use_keyspace_future = Self::handle_use_keyspace_request(cluster_data, request);
tokio::spawn(use_keyspace_future);
tokio::spawn(use_keyspace_future.with_current_subscriber());
},
None => return, // If use_keyspace_channel was closed then cluster was dropped, we can stop working
}
Expand Down
5 changes: 3 additions & 2 deletions scylla/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::io::{split, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWrite
use tokio::net::{TcpSocket, TcpStream};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;
use tracing::instrument::WithSubscriber;
use tracing::{debug, error, trace, warn};
use uuid::Uuid;

Expand Down Expand Up @@ -860,7 +861,7 @@ impl Connection {
orphan_notification_receiver,
)
.remote_handle();
tokio::task::spawn(task);
tokio::task::spawn(task.with_current_subscriber());
return Ok(handle);
}

Expand All @@ -872,7 +873,7 @@ impl Connection {
orphan_notification_receiver,
)
.remote_handle();
tokio::task::spawn(task);
tokio::task::spawn(task.with_current_subscriber());
Ok(handle)
}

Expand Down
22 changes: 13 additions & 9 deletions scylla/src/transport/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::pin::Pin;
use std::sync::{Arc, RwLock, Weak};
use std::time::Duration;
use tokio::sync::{mpsc, Notify};
use tracing::instrument::WithSubscriber;
use tracing::{debug, trace, warn};

/// The target size of a per-node connection pool.
Expand Down Expand Up @@ -212,7 +213,7 @@ impl NodeConnectionPool {

let conns = refiller.get_shared_connections();
let (fut, refiller_handle) = refiller.run(use_keyspace_request_receiver).remote_handle();
tokio::spawn(fut);
tokio::spawn(fut.with_current_subscriber());

let keepaliver_handle = if let Some(interval) = keepalive_interval {
let keepaliver = Keepaliver {
Expand All @@ -222,7 +223,7 @@ impl NodeConnectionPool {
};

let (fut, keepaliver_handle) = keepaliver.work().remote_handle();
tokio::spawn(fut);
tokio::spawn(fut.with_current_subscriber());

Some(keepaliver_handle)
} else {
Expand Down Expand Up @@ -1221,14 +1222,17 @@ impl PoolRefiller {
Err(QueryError::IoError(io_error.unwrap()))
};

tokio::task::spawn(async move {
let res = fut.await;
match &res {
Ok(()) => debug!("[{}] Successfully changed current keyspace", address),
Err(err) => warn!("[{}] Failed to change keyspace: {:?}", address, err),
tokio::task::spawn(
async move {
let res = fut.await;
match &res {
Ok(()) => debug!("[{}] Successfully changed current keyspace", address),
Err(err) => warn!("[{}] Failed to change keyspace: {:?}", address, err),
}
let _ = response_sender.send(res);
}
let _ = response_sender.send(res);
});
.with_current_subscriber(),
);
}

// Requires the keyspace to be set
Expand Down
5 changes: 2 additions & 3 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use scylla_cql::frame::types::SerialConsistency;
use std::result::Result;
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::instrument::WithSubscriber;

use super::errors::QueryError;
use super::execution_profile::ExecutionProfileInner;
Expand Down Expand Up @@ -303,9 +304,7 @@ impl RowIterator {
worker_task: impl Future<Output = PageSendAttemptedProof> + Send + 'static,
mut receiver: mpsc::Receiver<Result<ReceivedPage, QueryError>>,
) -> Result<RowIterator, QueryError> {
tokio::task::spawn(async move {
worker_task.await;
});
tokio::task::spawn(worker_task.with_current_subscriber());

// This unwrap is safe because:
// - The future returned by worker.work sends at least one item
Expand Down
4 changes: 2 additions & 2 deletions scylla/src/transport/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1257,7 +1257,7 @@ mod latency_awareness {
use futures::{future::RemoteHandle, FutureExt};
use itertools::Either;
use scylla_cql::errors::{DbError, QueryError};
use tracing::trace;
use tracing::{instrument::WithSubscriber, trace};
use uuid::Uuid;

use crate::{load_balancing::NodeRef, transport::node::Node};
Expand Down Expand Up @@ -1392,7 +1392,7 @@ mod latency_awareness {
}
}
.remote_handle();
tokio::task::spawn(updater_fut);
tokio::task::spawn(updater_fut.with_current_subscriber());

Self {
exclusion_threshold,
Expand Down

0 comments on commit e557095

Please sign in to comment.