Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend, meta): support RECOVER command to trigger recovery #16259

Merged
merged 10 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ message ApplyThrottleResponse {
common.Status status = 1;
}

message RecoverRequest {}

message RecoverResponse {}

service StreamManagerService {
rpc Flush(FlushRequest) returns (FlushResponse);
rpc Pause(PauseRequest) returns (PauseResponse);
Expand All @@ -277,6 +281,7 @@ service StreamManagerService {
rpc ListActorStates(ListActorStatesRequest) returns (ListActorStatesResponse);
rpc ListObjectDependencies(ListObjectDependenciesRequest) returns (ListObjectDependenciesResponse);
rpc ApplyThrottle(ApplyThrottleRequest) returns (ApplyThrottleResponse);
rpc Recover(RecoverRequest) returns (RecoverResponse);
}

// Below for cluster service.
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ pub mod handle_privilege;
mod kill_process;
pub mod privilege;
pub mod query;
mod recover;
pub mod show;
mod transaction;
pub mod util;
Expand Down Expand Up @@ -509,6 +510,7 @@ pub async fn handle(
}
Statement::Flush => flush::handle_flush(handler_args).await,
Statement::Wait => wait::handle_wait(handler_args).await,
Statement::Recover => recover::handle_recover(handler_args).await,
Statement::SetVariable {
local: _,
variable,
Expand Down
38 changes: 38 additions & 0 deletions src/frontend/src/handler/recover.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};

use super::RwPgResponse;
use crate::error::{ErrorCode, Result};
use crate::handler::HandlerArgs;
use crate::session::SessionImpl;

pub(super) async fn handle_recover(handler_args: HandlerArgs) -> Result<RwPgResponse> {
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
// Only permit recovery for super users.
if !handler_args.session.is_super_user() {
return Err(ErrorCode::PermissionDenied(
"only superusers can trigger adhoc recovery".to_string(),
)
.into());
}
do_recover(&handler_args.session).await?;
Ok(PgResponse::empty_result(StatementType::RECOVER))
}

pub(crate) async fn do_recover(session: &SessionImpl) -> Result<()> {
let client = session.env().meta_client();
client.recover().await?;
Ok(())
}
6 changes: 6 additions & 0 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub trait FrontendMetaClient: Send + Sync {

async fn wait(&self) -> Result<()>;

async fn recover(&self) -> Result<()>;

async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;

async fn list_table_fragments(
Expand Down Expand Up @@ -137,6 +139,10 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.wait().await
}

async fn recover(&self) -> Result<()> {
self.0.recover().await
}

async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
self.0.cancel_creating_jobs(infos).await
}
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,10 @@ impl FrontendMetaClient for MockFrontendMetaClient {
async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
unimplemented!()
}

async fn recover(&self) -> RpcResult<()> {
unimplemented!()
}
}

#[cfg(test)]
Expand Down
14 changes: 13 additions & 1 deletion src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::manager::{LocalNotification, MetadataManager};
use risingwave_meta::model;
use risingwave_meta::model::ActorId;
use risingwave_meta::stream::ThrottleConfig;
Expand Down Expand Up @@ -411,4 +411,16 @@ impl StreamManagerService for StreamServiceImpl {
dependencies,
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn recover(
&self,
_request: Request<RecoverRequest>,
) -> Result<Response<RecoverResponse>, Status> {
self.env
.notification_manager()
.notify_local_subscribers(LocalNotification::AdhocRecovery)
.await;
Ok(Response::new(RecoverResponse {}))
}
}
51 changes: 44 additions & 7 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use crate::barrier::notifier::BarrierInfo;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::state::BarrierManagerState;
use crate::error::MetaErrorInner;
use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo};
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
Expand Down Expand Up @@ -118,6 +119,8 @@ enum RecoveryReason {
Bootstrap,
/// After failure.
Failover(MetaError),
/// Manually triggered
Adhoc,
}

/// Status of barrier manager.
Expand Down Expand Up @@ -651,14 +654,20 @@ impl GlobalBarrierManager {
}
}

// Checkpoint frequency changes.
notification = local_notification_rx.recv() => {
let notification = notification.unwrap();
// Handle barrier interval and checkpoint frequency changes
if let LocalNotification::SystemParamsChange(p) = &notification {
self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
self.scheduled_barriers
.set_checkpoint_frequency(p.checkpoint_frequency() as usize)
match notification {
// Handle barrier interval and checkpoint frequency changes.
LocalNotification::SystemParamsChange(p) => {
self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
self.scheduled_barriers
.set_checkpoint_frequency(p.checkpoint_frequency() as usize)
},
// Handle adhoc recovery triggered by user.
LocalNotification::AdhocRecovery => {
self.adhoc_recovery().await;
}
_ => {}
}
}
resp_result = self.control_stream_manager.next_response() => {
Expand Down Expand Up @@ -788,7 +797,7 @@ impl GlobalBarrierManager {
err.clone(),
)));
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recovery from the committed epoch
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recover from the committed epoch
let span = tracing::info_span!(
"failure_recovery",
error = %err.as_report(),
Expand All @@ -803,6 +812,31 @@ impl GlobalBarrierManager {
panic!("failed to execute barrier: {}", err.as_report());
}
}

async fn adhoc_recovery(&mut self) {
let err = MetaErrorInner::AdhocRecovery.into();
self.context.tracker.lock().await.abort_all(&err);
self.checkpoint_control.clear_on_err(&err).await;

if self.enable_recovery {
self.context
.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Adhoc));
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recover from the committed epoch
let span = tracing::info_span!(
"adhoc_recovery",
error = %err.as_report(),
prev_epoch = prev_epoch.value().0
);

// No need to clean dirty tables for barrier recovery,
// The foreground stream job should cleanup their own tables.
self.recovery(None).instrument(span).await;
self.context.set_status(BarrierManagerStatus::Running);
} else {
panic!("failed to execute barrier: {}", err.as_report());
}
}
}

impl GlobalBarrierManagerContext {
Expand Down Expand Up @@ -1031,6 +1065,9 @@ impl GlobalBarrierManagerContext {
BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
}
BarrierManagerStatus::Recovering(RecoveryReason::Adhoc) => {
bail!("The cluster is recovering-adhoc")
}
BarrierManagerStatus::Running => Ok(()),
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ pub enum MetaErrorInner {
#[backtrace]
anyhow::Error,
),

// Indicates that recovery was triggered manually.
#[error("adhoc recovery triggered")]
AdhocRecovery,
}

impl MetaError {
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/manager/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub enum LocalNotification {
SystemParamsChange(SystemParamsReader),
FragmentMappingsUpsert(Vec<FragmentId>),
FragmentMappingsDelete(Vec<FragmentId>),
AdhocRecovery,
}

#[derive(Debug)]
Expand Down
7 changes: 7 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,12 @@ impl MetaClient {
Ok(())
}

pub async fn recover(&self) -> Result<()> {
let request = RecoverRequest {};
self.inner.recover(request).await?;
Ok(())
}

pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
let resp = self.inner.cancel_creating_jobs(request).await?;
Expand Down Expand Up @@ -1903,6 +1909,7 @@ macro_rules! for_all_meta_rpc {
,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
,{ stream_client, recover, RecoverRequest, RecoverResponse }
,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
Expand Down
6 changes: 6 additions & 0 deletions src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1519,6 +1519,8 @@ pub enum Statement {
/// WAIT for ALL running stream jobs to finish.
/// It will block the current session the condition is met.
Wait,
/// Trigger stream job recover
Recover,
}

impl fmt::Display for Statement {
Expand Down Expand Up @@ -2108,6 +2110,10 @@ impl fmt::Display for Statement {
write!(f, "KILL {}", process_id)?;
Ok(())
}
Statement::Recover => {
write!(f, "RECOVER")?;
Ok(())
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/sqlparser/src/keywords.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ define_keywords!(
READ,
READS,
REAL,
RECOVER,
RECURSIVE,
REF,
REFERENCES,
Expand Down
1 change: 1 addition & 0 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ impl Parser {
Keyword::CLOSE => Ok(self.parse_close_cursor()?),
Keyword::FLUSH => Ok(Statement::Flush),
Keyword::WAIT => Ok(Statement::Wait),
Keyword::RECOVER => Ok(Statement::Recover),
_ => self.expected(
"an SQL statement",
Token::Word(w).with_location(token.location),
Expand Down
25 changes: 25 additions & 0 deletions src/tests/simulation/tests/integration_tests/backfill_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,28 @@ async fn test_enable_arrangement_backfill() -> Result<()> {
assert!(!result.contains("ArrangementBackfill"));
Ok(())
}

#[tokio::test]
async fn test_recovery_cancels_foreground_ddl() -> Result<()> {
let mut cluster = Cluster::start(Configuration::enable_arrangement_backfill()).await?;
let mut session = cluster.start_session();
session.run("SET STREAMING_RATE_LIMIT=1").await?;
session.run("CREATE TABLE t(v1 int);").await?;
session
.run("INSERT INTO t select * from generate_series(1, 100000);")
.await?;
let handle = tokio::spawn(async move {
session
.run("CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;")
.await
});
sleep(Duration::from_secs(2)).await;
cluster.run("RECOVER").await?;
match handle.await? {
Ok(_) => panic!("create m1 should fail"),
Err(e) => {
assert!(e.to_string().contains("adhoc recovery triggered"));
}
}
Ok(())
}
1 change: 1 addition & 0 deletions src/utils/pgwire/src/pg_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ pub enum StatementType {
CLOSE_CURSOR,
WAIT,
KILL,
RECOVER,
}

impl std::fmt::Display for StatementType {
Expand Down
Loading