
refactor: Remove unused code (#8974)
liurenjie1024 committed Apr 4, 2023
1 parent 9b53293 commit da1822c
Showing 7 changed files with 2 additions and 107 deletions.

src/frontend/src/lib.rs: 1 change (0 additions & 1 deletion)

@@ -47,7 +47,6 @@ mod optimizer;
 pub use optimizer::{Explain, OptimizerContext, OptimizerContextRef, PlanRef};
 mod planner;
 pub use planner::Planner;
-#[expect(dead_code)]
 mod scheduler;
 pub mod session;
 mod stream_fragmenter;

src/frontend/src/scheduler/distributed/query.rs: 7 changes (0 additions & 7 deletions)

@@ -64,9 +64,6 @@ enum QueryState
 
     /// Failed
     Failed,
-
-    /// Completed
-    Completed,
 }
 
 pub struct QueryExecution {
@@ -87,8 +84,6 @@ struct QueryRunner {
     /// Will be set to `None` after all stage scheduled.
     root_stage_sender: Option<oneshot::Sender<SchedulerResult<QueryResultFetcher>>>,
 
-    compute_client_pool: ComputeClientPoolRef,
-
     // Used for cleaning up `QueryExecution` after execution.
     query_execution_info: QueryExecutionInfoRef,
 
@@ -155,7 +150,6 @@ impl QueryExecution {
            msg_receiver,
            root_stage_sender: Some(root_stage_sender),
            scheduled_stages_count: 0,
-           compute_client_pool,
            query_execution_info,
            query_metrics,
        };
@@ -377,7 +371,6 @@ impl QueryRunner {
            HostAddress {
                ..Default::default()
            },
-           self.compute_client_pool.clone(),
            chunk_rx,
            self.query.query_id.clone(),
            self.query_execution_info.clone(),
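
The `root_stage_sender: Option<oneshot::Sender<...>>` field kept above relies on a take-once pattern: the runner takes the sender out of the `Option` once the root stage is scheduled, so the result can be reported at most once. A minimal sketch of that pattern, with a simplified payload type rather than the crate's actual `SchedulerResult<QueryResultFetcher>`:

use tokio::sync::oneshot;

struct Runner {
    // Mirrors the take-once `Option<oneshot::Sender<...>>` field above.
    root_stage_sender: Option<oneshot::Sender<String>>,
}

impl Runner {
    fn send_root_stage(&mut self, msg: String) {
        // `take()` consumes the sender and leaves `None` behind, so a second
        // call is a quiet no-op instead of a double-send or a panic.
        if let Some(sender) = self.root_stage_sender.take() {
            let _ = sender.send(msg); // receiver may already be dropped
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let mut runner = Runner { root_stage_sender: Some(tx) };
    runner.send_root_stage("root stage scheduled".into());
    runner.send_root_stage("ignored, sender already taken".into());
    assert_eq!(rx.await.unwrap(), "root stage scheduled");
}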

src/frontend/src/scheduler/distributed/query_manager.rs: 38 changes (2 additions & 36 deletions)

@@ -18,26 +18,20 @@ use std::pin::Pin;
 use std::sync::{Arc, RwLock};
 use std::task::{Context, Poll};
 
-use futures::{Stream, StreamExt};
-use futures_async_stream::try_stream;
+use futures::Stream;
 use pgwire::pg_server::{BoxedError, Session, SessionId};
-use risingwave_batch::executor::BoxedDataChunkStream;
 use risingwave_common::array::DataChunk;
-use risingwave_common::error::RwError;
 use risingwave_common::session_config::QueryMode;
 use risingwave_pb::batch_plan::TaskOutputId;
 use risingwave_pb::common::HostAddress;
 use risingwave_rpc_client::ComputeClientPoolRef;
-use tracing::debug;
 
 use super::stats::DistributedQueryMetrics;
 use super::QueryExecution;
 use crate::catalog::catalog_service::CatalogReader;
 use crate::scheduler::plan_fragmenter::{Query, QueryId};
 use crate::scheduler::worker_node_manager::WorkerNodeManagerRef;
-use crate::scheduler::{
-    ExecutionContextRef, HummockSnapshotManagerRef, PinnedHummockSnapshot, SchedulerResult,
-};
+use crate::scheduler::{ExecutionContextRef, PinnedHummockSnapshot, SchedulerResult};
 
 pub struct DistributedQueryStream {
     chunk_rx: tokio::sync::mpsc::Receiver<SchedulerResult<DataChunk>>,
@@ -80,7 +74,6 @@ impl Drop for DistributedQueryStream {
 pub struct QueryResultFetcher {
     task_output_id: TaskOutputId,
     task_host: HostAddress,
-    compute_client_pool: ComputeClientPoolRef,
 
     chunk_rx: tokio::sync::mpsc::Receiver<SchedulerResult<DataChunk>>,
 
@@ -134,28 +127,23 @@ impl QueryExecutionInfo {
 #[derive(Clone)]
 pub struct QueryManager {
     worker_node_manager: WorkerNodeManagerRef,
-    hummock_snapshot_manager: HummockSnapshotManagerRef,
     compute_client_pool: ComputeClientPoolRef,
     catalog_reader: CatalogReader,
     query_execution_info: QueryExecutionInfoRef,
     pub query_metrics: Arc<DistributedQueryMetrics>,
     disrtibuted_query_limit: Option<u64>,
 }
 
-type QueryManagerRef = Arc<QueryManager>;
-
 impl QueryManager {
     pub fn new(
         worker_node_manager: WorkerNodeManagerRef,
-        hummock_snapshot_manager: HummockSnapshotManagerRef,
         compute_client_pool: ComputeClientPoolRef,
         catalog_reader: CatalogReader,
         query_metrics: Arc<DistributedQueryMetrics>,
         disrtibuted_query_limit: Option<u64>,
     ) -> Self {
         Self {
             worker_node_manager,
-            hummock_snapshot_manager,
             compute_client_pool,
             catalog_reader,
             query_execution_info: Arc::new(RwLock::new(QueryExecutionInfo::default())),
@@ -231,41 +219,19 @@ impl QueryResultFetcher {
     pub fn new(
         task_output_id: TaskOutputId,
         task_host: HostAddress,
-        compute_client_pool: ComputeClientPoolRef,
         chunk_rx: tokio::sync::mpsc::Receiver<SchedulerResult<DataChunk>>,
         query_id: QueryId,
         query_execution_info: QueryExecutionInfoRef,
     ) -> Self {
         Self {
             task_output_id,
             task_host,
-            compute_client_pool,
             chunk_rx,
             query_id,
             query_execution_info,
         }
     }
 
-    #[try_stream(ok = DataChunk, error = RwError)]
-    async fn run_inner(self) {
-        debug!(
-            "Starting to run query result fetcher, task output id: {:?}, task_host: {:?}",
-            self.task_output_id, self.task_host
-        );
-        let compute_client = self
-            .compute_client_pool
-            .get_by_addr((&self.task_host).into())
-            .await?;
-        let mut stream = compute_client.get_data(self.task_output_id.clone()).await?;
-        while let Some(response) = stream.next().await {
-            yield DataChunk::from_protobuf(response?.get_record_batch()?)?;
-        }
-    }
-
-    fn run(self) -> BoxedDataChunkStream {
-        Box::pin(self.run_inner())
-    }
-
     fn stream_from_channel(self) -> DistributedQueryStream {
         DistributedQueryStream {
             chunk_rx: self.chunk_rx,
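
The deletions above retire the pull-over-RPC result path (`run_inner`/`run`): results now reach the client only through the `chunk_rx` channel that `stream_from_channel` wraps in a `DistributedQueryStream`. A channel-backed `Stream` of that shape can be sketched in a few lines; this is a simplified stand-in, not RisingWave's actual implementation:

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;

// `T` stands in for the `SchedulerResult<DataChunk>` items flowing through `chunk_rx`.
struct ChannelStream<T> {
    rx: tokio::sync::mpsc::Receiver<T>,
}

impl<T> Stream for ChannelStream<T> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Yields `None` once every sender is dropped, which is how such a
        // stream learns that the query has finished or been cleaned up.
        self.rx.poll_recv(cx)
    }
}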

src/frontend/src/scheduler/distributed/stage.rs: 10 changes (0 additions & 10 deletions)

@@ -268,10 +268,6 @@ impl StageExecution {
         }
     }
 
-    pub fn get_task_status_unchecked(&self, task_id: TaskId) -> Arc<TaskStatus> {
-        self.tasks[&task_id].get_status()
-    }
-
     /// Returns all exchange sources for `output_id`. Each `ExchangeSource` is identified by
     /// producer's `TaskId` and `output_id` (consumer's `TaskId`), since each task may produce
     /// output to several channels.
@@ -963,9 +959,3 @@ impl StageRunner {
         self.stage.id == 0
     }
 }
-
-impl TaskStatus {
-    pub fn task_host_unchecked(&self) -> HostAddress {
-        self.location.clone().unwrap()
-    }
-}
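
The doc comment retained above explains the addressing scheme: each `ExchangeSource` pairs the producer's `TaskId` with an `output_id` equal to the consumer's `TaskId`, because one task fans out to several downstream channels. A hypothetical, simplified illustration of that pairing (the real proto types carry more fields):

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct TaskId(u32);

#[derive(Clone, Copy, Debug)]
struct ExchangeSource {
    producer: TaskId,  // upstream task that produced the data
    output_id: TaskId, // downstream task (channel) the output is destined for
}

// One source per producer task: the consumer pulls its own channel from each producer.
fn exchange_sources_for(producers: &[TaskId], consumer: TaskId) -> Vec<ExchangeSource> {
    producers
        .iter()
        .map(|&producer| ExchangeSource { producer, output_id: consumer })
        .collect()
}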

src/frontend/src/scheduler/hummock_snapshot_manager.rs: 28 changes (0 additions & 28 deletions)

@@ -248,34 +248,6 @@ impl HummockSnapshotManagerCore {
         }
     }
 
-    /// Retrieve max committed epoch from meta with an rpc. This method provides
-    /// better epoch freshness.
-    async fn get_epoch_for_query_from_rpc(
-        &mut self,
-        batches: &mut Vec<(QueryId, Callback<SchedulerResult<HummockSnapshot>>)>,
-    ) -> HummockSnapshot {
-        let ret = self.meta_client.get_epoch().await;
-        match ret {
-            Ok(snapshot) => {
-                self.notify_epoch_assigned_for_queries(&snapshot, batches);
-                snapshot
-            }
-            Err(e) => {
-                for (id, cb) in batches.drain(..) {
-                    let _ = cb.send(Err(SchedulerError::Internal(anyhow!(
-                        "Failed to get epoch for query: {:?} because of RPC Error: {:?}",
-                        id,
-                        e
-                    ))));
-                }
-                HummockSnapshot {
-                    committed_epoch: INVALID_EPOCH,
-                    current_epoch: INVALID_EPOCH,
-                }
-            }
-        }
-    }
-
     /// Retrieve max committed epoch and max current epoch from locally cached value, which is
     /// maintained by meta's notification service.
     fn get_epoch_for_query_from_push(
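
With the RPC fallback gone, epoch acquisition goes only through the push-based path kept above: meta's notification service keeps a local snapshot fresh, and queries read it without a round trip. A hedged sketch of that shape, with invented names and a trimmed-down snapshot type:

#[derive(Clone, Copy, Debug)]
struct HummockSnapshot {
    committed_epoch: u64,
    current_epoch: u64,
}

struct SnapshotCache {
    latest: HummockSnapshot, // overwritten by every push from the meta service
}

impl SnapshotCache {
    /// Invoked by the notification handler whenever meta pushes a new snapshot.
    fn on_push(&mut self, snapshot: HummockSnapshot) {
        self.latest = snapshot;
    }

    /// Queries read the cached value; no RPC, hence no per-query failure path.
    fn epoch_for_query(&self) -> HummockSnapshot {
        self.latest
    }
}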

src/frontend/src/scheduler/plan_fragmenter.rs: 23 changes (0 additions & 23 deletions)

@@ -960,14 +960,6 @@ impl BatchPlanFragmenter {
            .transpose()
        }
    }
-
-    pub fn worker_node_manager(&self) -> &WorkerNodeManagerRef {
-        &self.worker_node_manager
-    }
-
-    pub fn catalog_reader(&self) -> &CatalogReader {
-        &self.catalog_reader
-    }
 }
 
 /// Try to derive the partition to read from the scan range.
@@ -1047,9 +1039,7 @@ fn derive_partitions(
 mod tests {
     use std::collections::{HashMap, HashSet};
 
-    use risingwave_common::hash::ParallelUnitId;
     use risingwave_pb::batch_plan::plan_node::NodeBody;
-    use risingwave_pb::common::ParallelUnit;
 
     use crate::optimizer::plan_node::PlanNodeType;
     use crate::scheduler::plan_fragmenter::StageId;
@@ -1134,17 +1124,4 @@ mod tests {
        assert_eq!(1, scan_node2.root.children.len());
        assert!(scan_node2.has_table_scan());
    }
-
-    fn generate_parallel_units(
-        start_id: ParallelUnitId,
-        node_id: ParallelUnitId,
-    ) -> Vec<ParallelUnit> {
-        let parallel_degree = 8;
-        (start_id..start_id + parallel_degree)
-            .map(|id| ParallelUnit {
-                id,
-                worker_node_id: node_id,
-            })
-            .collect()
-    }
 }

src/frontend/src/session.rs: 2 changes (0 additions & 2 deletions)

@@ -134,7 +134,6 @@ impl FrontendEnv {
        let compute_client_pool = Arc::new(ComputeClientPool::default());
        let query_manager = QueryManager::new(
            worker_node_manager.clone(),
-           hummock_snapshot_manager.clone(),
            compute_client_pool,
            catalog_reader.clone(),
            Arc::new(DistributedQueryMetrics::for_test()),
@@ -223,7 +222,6 @@ impl FrontendEnv {
            Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
        let query_manager = QueryManager::new(
            worker_node_manager.clone(),
-           hummock_snapshot_manager.clone(),
            compute_client_pool,
            catalog_reader.clone(),
            Arc::new(DistributedQueryMetrics::new(registry.clone())),
