Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - fix: recover from invalid segments #2909

Closed
wants to merge 15 commits into from
Closed
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ jobs:
UNINSTALL: noclean
SERVER_LOG: fluvio=debug
strategy:
fail-fast: false
# fail-fast: true
matrix:
os: [ubuntu-latest]
rust-target: [x86_64-unknown-linux-musl]
Expand Down
9 changes: 3 additions & 6 deletions crates/fluvio-spu/src/replication/follower/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use fluvio_storage::config::ReplicaConfig;
use fluvio_controlplane_metadata::partition::{Replica, ReplicaKey};
use fluvio_protocol::record::RecordSet;
use fluvio_protocol::record::Offset;
use fluvio_storage::{FileReplica, StorageError, ReplicaStorage, ReplicaStorageConfig};
use fluvio_storage::{FileReplica, ReplicaStorage, ReplicaStorageConfig};
use fluvio_types::SpuId;

use crate::replication::leader::ReplicaOffsetRequest;
Expand Down Expand Up @@ -209,7 +209,7 @@ where
&self,
records: &mut RecordSet<R>,
leader_hw: Offset,
) -> Result<bool, StorageError> {
) -> Result<bool> {
let mut changes = false;

if records.total_records() > 0 {
Expand Down Expand Up @@ -245,10 +245,7 @@ where

/// try to write records
/// ensure records has correct baseoffset
async fn write_recordsets<R: BatchRecords>(
&self,
records: &mut RecordSet<R>,
) -> Result<bool, StorageError> {
async fn write_recordsets<R: BatchRecords>(&self, records: &mut RecordSet<R>) -> Result<bool> {
let storage_leo = self.leo();
if records.base_offset() != storage_leo {
// this could happened if records were sent from leader before hw was sync
Expand Down
6 changes: 3 additions & 3 deletions crates/fluvio-spu/src/replication/leader/replica_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use anyhow::Result;
use fluvio_protocol::record::{RecordSet, Offset, ReplicaKey, BatchRecords};
use fluvio_controlplane_metadata::partition::{Replica, ReplicaStatus, PartitionStatus};
use fluvio_controlplane::LrsRequest;
use fluvio_storage::{FileReplica, StorageError, ReplicaStorage, OffsetInfo, ReplicaStorageConfig};
use fluvio_storage::{FileReplica, ReplicaStorage, OffsetInfo, ReplicaStorageConfig};
use fluvio_types::{SpuId};
use fluvio_spu_schema::Isolation;

Expand Down Expand Up @@ -315,7 +315,7 @@ where
&self,
records: &mut RecordSet<R>,
notifiers: &FollowerNotifier,
) -> Result<(Offset, Offset, usize), StorageError> {
) -> Result<(Offset, Offset, usize)> {
let offsets = self
.storage
.write_record_set(records, self.in_sync_replica == 1)
Expand Down Expand Up @@ -742,7 +742,7 @@ mod test_leader {
&mut self,
records: &mut fluvio_protocol::record::RecordSet<R>,
update_highwatermark: bool,
) -> Result<usize, fluvio_storage::StorageError> {
) -> Result<usize> {
self.pos.leo = records.last_offset().unwrap();
if update_highwatermark {
self.pos.hw = self.pos.leo;
Expand Down
18 changes: 10 additions & 8 deletions crates/fluvio-spu/src/services/public/produce_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,16 @@ async fn handle_produce_partition<R: BatchRecords>(

PartitionWriteResult::ok(replica_id, base_offset, leo)
}
Err(err @ StorageError::BatchTooBig(_)) => {
error!(%replica_id, "Batch is too big: {:#?}", err);
PartitionWriteResult::error(replica_id, ErrorCode::MessageTooLarge)
}
Err(err) => {
error!(%replica_id, "Error writing to replica: {:#?}", err);
PartitionWriteResult::error(replica_id, ErrorCode::StorageError)
}
Err(err) => match err.downcast_ref::<StorageError>() {
Some(StorageError::BatchTooBig(_)) => {
error!(%replica_id, "Batch is too big: {:#?}", err);
PartitionWriteResult::error(replica_id, ErrorCode::MessageTooLarge)
}
_ => {
error!(%replica_id, "Error writing to replica: {:#?}", err);
PartitionWriteResult::error(replica_id, ErrorCode::StorageError)
}
},
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-spu/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ where
&self,
records: &mut RecordSet<R>,
hw_update: bool,
) -> Result<(Offset, Offset, usize), StorageError> {
) -> Result<(Offset, Offset, usize)> {
debug!(
replica = %self.id,
leo = self.leo(),
Expand Down
43 changes: 27 additions & 16 deletions crates/fluvio-storage/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::path::Path;

use fluvio_protocol::record::BatchHeader;
use fluvio_protocol::record::Offset;
use tracing::error;
use tracing::instrument;
use tracing::trace;
use tracing::debug;
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use bytes::Bytes;
Expand All @@ -19,20 +21,21 @@ use fluvio_protocol::record::Size;

use crate::file::FileBytesIterator;

#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, Clone)]
/// Outer batch representation
/// It's either successfully decoded into an actual batch, or there are not enough bytes to decode
pub enum BatchHeaderError<R> {
#[error(transparent)]
Io(#[from] IoError),
#[error("Not Enough Header{actual_len} {expected_len}")]
pub enum BatchHeaderError {
#[error("Not Enough Header {pos}, {actual_len} {expected_len}")]
NotEnoughHeader {
pos: u32,
actual_len: usize,
expected_len: usize,
},
#[error("Not Enough Content {actual_len} {expected_len}")]
#[error("Not Enough Content {pos} {base_offset} {actual_len} {expected_len}")]
NotEnoughContent {
header: Batch<R>, // decoded header
header: BatchHeader,
base_offset: Offset,
pos: u32,
actual_len: usize,
expected_len: usize,
},
Expand All @@ -57,7 +60,7 @@ where
#[instrument(skip(file))]
pub(crate) async fn read_from<S: StorageBytesIterator>(
file: &mut S,
) -> Result<Option<FileBatchPos<R>>, BatchHeaderError<R>> {
) -> Result<Option<FileBatchPos<R>>> {
let pos = file.get_pos();
trace!(pos, "reading from pos");
let bytes = match file.read_bytes(BATCH_FILE_HEADER_SIZE as u32).await? {
Expand All @@ -81,7 +84,9 @@ where
return Err(BatchHeaderError::NotEnoughHeader {
actual_len: read_len,
expected_len: BATCH_FILE_HEADER_SIZE,
});
pos,
}
.into());
}

let mut cursor = Cursor::new(bytes);
Expand Down Expand Up @@ -110,10 +115,13 @@ where
Some(bytes) => bytes,
None => {
return Err(BatchHeaderError::NotEnoughContent {
header: batch,
base_offset: batch.get_base_offset(),
header: batch.header,
actual_len: 0,
expected_len: content_len,
})
pos,
}
.into())
}
};

Expand All @@ -128,10 +136,13 @@ where

if read_len < content_len {
return Err(BatchHeaderError::NotEnoughContent {
header: batch,
base_offset: batch.get_base_offset(),
header: batch.header,
actual_len: read_len,
expected_len: content_len,
});
pos,
}
.into());
}

let mut cursor = Cursor::new(bytes);
Expand Down Expand Up @@ -226,9 +237,9 @@ where
match FileBatchPos::read_from(&mut self.byte_iterator).await {
Ok(batch_res) => Ok(batch_res),
Err(err) => {
debug!("error getting batch: {}", err);
error!("error getting batch: {}, invalidating", err);
self.invalid = true;
Err(anyhow!("error decoding batch: {}", err))
Err(err)
}
}
}
Expand Down
107 changes: 75 additions & 32 deletions crates/fluvio-storage/src/bin/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ use std::{path::PathBuf};
use clap::Parser;
use anyhow::{Result, anyhow};

use fluvio_controlplane_metadata::partition::ReplicaKey;
use fluvio_protocol::record::Offset;
use fluvio_future::task::run_block_on;
use fluvio_storage::{
LogIndex, OffsetPosition,
batch_header::BatchHeaderStream,
segment::{MutableSegment},
config::{ReplicaConfig},
FileReplica, ReplicaStorage,
};
use fluvio_storage::records::FileRecords;

Expand All @@ -29,18 +31,23 @@ enum Main {

#[clap(name = "validate")]
ValidateSegment(SegmentValidateOpt),

/// show information about replica
#[clap(name = "replica")]
Replica(ReplicaOpt),
}

fn main() {
fluvio_future::subscriber::init_logger();

let opt = Main::parse();
let main_opt = Main::parse();

let result = run_block_on(async {
match opt {
match main_opt {
Main::Log(opt) => dump_log(opt).await,
Main::Index(opt) => dump_index(opt).await,
Main::ValidateSegment(opt) => validate_segment(opt).await,
Main::Replica(opt) => replica_info(opt).await,
}
});
if let Err(err) = result {
Expand Down Expand Up @@ -86,31 +93,45 @@ async fn dump_log(opt: LogOpt) -> Result<()> {

let mut count: usize = 0;
let time = std::time::Instant::now();
while let Some(batch_pos) = header_stream.try_next().await? {
let pos = batch_pos.get_pos();
let batch = batch_pos.inner();

let base_offset = batch.get_base_offset();

if let Some(min) = opt.min {
if (base_offset as usize) < min {
continue;
let mut last_batch_offset = 0;
loop {
match header_stream.try_next().await {
Ok(Some(batch_pos)) => {
let pos = batch_pos.get_pos();
let batch = batch_pos.inner();

let base_offset = batch.get_base_offset();

if let Some(min) = opt.min {
if (base_offset as usize) < min {
continue;
}
}
if let Some(max) = opt.max {
if (base_offset as usize) > max {
break;
}
}

if opt.print {
println!(
"batch offset: {}, pos: {}, len: {}, ",
base_offset, pos, batch.batch_len,
);
}

count += 1;
last_batch_offset = base_offset;
}
}
if let Some(max) = opt.max {
if (base_offset as usize) > max {
Ok(None) => {
break;
}
Err(err) => {
println!("encountered error: {:#?}", err);
println!("last batch offset: {}", last_batch_offset);
break;
}
}

if opt.print {
println!(
"batch offset: {}, pos: {}, len: {}, ",
base_offset, pos, batch.batch_len,
);
}

count += 1;
}

println!(
Expand Down Expand Up @@ -171,12 +192,6 @@ pub(crate) struct SegmentValidateOpt {

#[clap(long, default_value = "0")]
base_offset: Offset,

#[clap(long)]
skip_errors: bool,

#[clap(long)]
verbose: bool,
}

pub(crate) async fn validate_segment(opt: SegmentValidateOpt) -> Result<()> {
Expand All @@ -196,13 +211,41 @@ pub(crate) async fn validate_segment(opt: SegmentValidateOpt) -> Result<()> {
);

let start = std::time::Instant::now();
let last_offset = active_segment
.validate(opt.skip_errors, opt.verbose)
.await?;
let last_offset = active_segment.validate_and_repair().await?;

let duration = start.elapsed().as_secs_f32();

println!("completed, last offset = {last_offset}, took: {duration} seconds");

Ok(())
}

// CLI arguments for the `replica` subcommand (consumed by `replica_info`):
// identifies a replica on disk by base directory + topic + partition.
// NOTE(review): with clap's derive, `///` doc comments on fields become
// `--help` text, so plain `//` comments are used below to leave the
// existing help output byte-identical.
#[derive(Debug, Parser)]
pub(crate) struct ReplicaOpt {
    /// base data directory
    #[clap(value_parser)]
    replica_dir: PathBuf,

    // topic name of the replica to open (required flag)
    #[clap(long)]
    topic: String,

    // partition index within the topic; defaults to partition 0
    #[clap(long, default_value = "0")]
    partition: u32,
}

/// Open (or create, per `FileReplica::create_or_load`) the replica identified
/// by `opt.topic`/`opt.partition` under `opt.replica_dir`, then print its
/// high watermark (hw) and log end offset (leo).
///
/// # Errors
/// Returns an error if the replica cannot be created or loaded from disk.
pub(crate) async fn replica_info(opt: ReplicaOpt) -> Result<()> {
    let replica_dir = opt.replica_dir;

    println!("opening replica dir: {:#?}", replica_dir);
    let option = ReplicaConfig::builder()
        // `replica_dir` is not used after this point, so move it into the
        // builder instead of allocating a redundant clone
        .base_dir(replica_dir)
        .build();

    let replica = ReplicaKey::new(opt.topic, opt.partition);
    let replica = FileReplica::create_or_load(&replica, option).await?;

    println!("hw: {:#?}", replica.get_hw());
    println!("leo: {:#?}", replica.get_leo());

    Ok(())
}
Loading