cnosdb#458 fix: recover from wal will miss data
bartliu827 authored and roseboy-liu committed Aug 22, 2022
1 parent b28a269 commit a007aed
Showing 3 changed files with 78 additions and 87 deletions.
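
In outline: the headline fix is in the TsKv constructor, which called core.recover_wal() without awaiting it — recover_wal is async, so the returned future was apparently dropped unpolled and WAL replay silently did nothing, which is the missed data the commit title refers to. Around that, the write path is reshaped: the old write_points(write_batch, seq), which both parsed the request and appended to the WAL only when seq == 0, is split into build_mem_points (flatbuffer parsing plus series/field-id resolution, with the database name now read from the flatbuffer payload itself) and an explicit WAL step, so Engine::write persists a batch and learns its sequence number before touching the cache, while write_from_wal reuses the sequence recovered from the log. Finally, the per-point/per-field insert loop moves from TsKv::insert_cache into TseriesFamily::put_points, and the debugging println!s are removed.

As a minimal standalone illustration (not repo code; it assumes only the tokio runtime the crate already uses) of why a missing .await makes an async recovery a no-op:

async fn recover_wal() {
    println!("replaying WAL...");
}

#[tokio::main]
async fn main() {
    let _ = recover_wal(); // future created and dropped: never polled, prints nothing
    recover_wal().await;   // polled to completion: prints "replaying WAL..."
}
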
93 changes: 35 additions & 58 deletions tskv/src/kvcore.rs
@@ -1,6 +1,7 @@
use std::{collections::HashMap, io::Result as IoResultExt, sync, sync::Arc, thread::JoinHandle};

use futures::stream::SelectNextSome;
use libc::printf;
use parking_lot::{Mutex, RwLock};
use snafu::ResultExt;
use tokio::{
@@ -91,7 +92,7 @@ impl TsKv {
summary_task_sender: summary_task_sender.clone(),
};

core.recover_wal();
core.recover_wal().await;
core.run_wal_job(wal_receiver);
core.run_flush_job(
flush_task_receiver,
@@ -142,7 +143,6 @@ impl TsKv {
wal_manager
.recover(
self,
self.version_set.clone(),
self.global_ctx.clone(),
self.flush_task_sender.clone(),
)
@@ -226,25 +226,8 @@
),
};

for p in points.iter() {
let sid = p.series_id();

for f in p.fields().iter() {
tsf.put_mutcache(
&mut MemRaw {
seq,
ts: p.timestamp as i64,
field_id: f.field_id(),
field_type: f.value_type,
val: &f.value,
},
self.flush_task_sender.clone(),
)
.await;

println!("==== insert_cache sid {:02X} fid {:02X}", sid, f.field_id());
}
}
tsf.put_points(seq, points, self.flush_task_sender.clone())
.await;
}

fn run_wal_job(&self, mut receiver: UnboundedReceiver<WalTask>) {
@@ -376,24 +359,18 @@ impl TsKv {
warn!("job 'main' started.");
}

async fn write_points(
&self,
write_batch: WritePointsRpcRequest,
mut seq: u64,
) -> Result<WritePointsRpcResponse> {
let shared_write_batch = Arc::new(write_batch.points);
let fb_points = flatbuffers::root::<fb_models::Points>(&shared_write_batch)
async fn build_mem_points(&self, points: Arc<Vec<u8>>) -> Result<(String, Vec<InMemPoint>)> {
let fb_points = flatbuffers::root::<fb_models::Points>(&points)
.context(error::InvalidFlatbufferSnafu)?;

let mut db_name = write_batch.database;
let db_name = String::from_utf8(fb_points.database().unwrap().to_vec())
.map_err(|err| Error::ErrCharacterSet)?;

let mut mem_points = Vec::<_>::with_capacity(fb_points.points().unwrap().len());
// get or create forward index
for point in fb_points.points().unwrap() {
let mut info =
SeriesInfo::from_flatbuffers(&point).context(error::InvalidModelSnafu)?;
if db_name.is_empty() {
db_name = info.db().clone();
}
let sid = self
.index_set
.write()
@@ -408,35 +385,12 @@

for i in 0..fields.len() {
point.fields[i].field_id = fields[i].field_id();
println!(
"==== write sid {:02X} fid {:02X}",
sid,
fields[i].field_id()
);
}

println!("==================");
mem_points.push(point);
}

// write wal
if seq == 0 {
let (cb, rx) = oneshot::channel();
self.wal_sender
.send(WalTask::Write {
points: shared_write_batch.clone(),
cb,
})
.map_err(|err| Error::Send)?;
let (tmp, _) = rx.await.context(error::ReceiveSnafu)??;
seq = tmp;
}

self.insert_cache(&db_name, seq, &mem_points).await;
Ok(WritePointsRpcResponse {
version: 1,
points: vec![],
})
return Ok((db_name, mem_points));
}

// pub async fn query(&self, _opt: QueryOption) -> Result<Option<Entry>> {
@@ -446,14 +400,37 @@
#[async_trait::async_trait]
impl Engine for TsKv {
async fn write(&self, write_batch: WritePointsRpcRequest) -> Result<WritePointsRpcResponse> {
self.write_points(write_batch, 0).await
let points = Arc::new(write_batch.points);
let (db_name, mem_points) = self.build_mem_points(points.clone()).await?;

let (cb, rx) = oneshot::channel();
self.wal_sender
.send(WalTask::Write { cb, points })
.map_err(|err| Error::Send)?;
let (seq, _) = rx.await.context(error::ReceiveSnafu)??;

self.insert_cache(&db_name, seq, &mem_points).await;

Ok(WritePointsRpcResponse {
version: 1,
points: vec![],
})
}

async fn write_from_wal(
&self,
write_batch: WritePointsRpcRequest,
seq: u64,
) -> Result<WritePointsRpcResponse> {
self.write_points(write_batch, seq).await
let points = Arc::new(write_batch.points);
let (db_name, mem_points) = self.build_mem_points(points.clone()).await?;

self.insert_cache(&db_name, seq, &mem_points).await;

Ok(WritePointsRpcResponse {
version: 1,
points: vec![],
})
}

fn read(
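
The reworked Engine::write above establishes a strict ordering: parse the batch, append it to the WAL via wal_sender and wait on a oneshot channel for the sequence number the log assigned, and only then insert into the memory cache under that sequence. A self-contained sketch of that handshake (the WalTask and oneshot names follow the diff; the WAL-job loop body and the plain Vec<u8> payload are simplifications for illustration):

use tokio::sync::{mpsc, oneshot};

enum WalTask {
    Write { points: Vec<u8>, cb: oneshot::Sender<u64> },
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<WalTask>();

    // Stand-in for the WAL job: append each batch to the log, then ack
    // with the sequence number it was assigned.
    tokio::spawn(async move {
        let mut seq = 0u64;
        while let Some(WalTask::Write { points: _, cb }) = rx.recv().await {
            seq += 1;
            let _ = cb.send(seq);
        }
    });

    let (cb, ack) = oneshot::channel();
    tx.send(WalTask::Write { points: b"batch".to_vec(), cb }).unwrap();
    let seq = ack.await.unwrap();
    // Only after the WAL ack does the cache see the batch, tagged with seq.
    println!("cached batch at seq {seq}");
}

write_from_wal takes the other branch: it skips the WAL send entirely and passes the recovered sequence straight to insert_cache, so replayed batches keep their original sequence numbers and are never re-appended to the log.
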
4 changes: 2 additions & 2 deletions tskv/src/memcache.rs
@@ -57,7 +57,7 @@ impl Default for MemEntry {
ts_min: i64::MAX,
ts_max: i64::MIN,
field_type: ValueType::Unknown,
cells: Vec::new(),
cells: Vec::with_capacity(256),
}
}
}
@@ -174,7 +174,7 @@ impl MemCache {
item.ts_max = ts;
}
if item.ts_min > ts {
item.ts_min = ts
item.ts_min = ts;
}
item.field_type = raw.field_type;
self.cache_size += size_of_val(&data) as u64;
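
The memcache.rs hunks are small: MemEntry::default now pre-allocates room for 256 cells instead of starting from an empty Vec — presumably trading a little memory per entry for fewer reallocations on the insert path — and item.ts_min = ts gains a trailing semicolon, a style-only change (the assignment already evaluates to (), so both forms compile).
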
68 changes: 41 additions & 27 deletions tskv/src/tseries_family.rs
@@ -14,7 +14,7 @@ use std::{
use config::get_config;
use crossbeam::channel::internal::SelectHandle;
use lazy_static::lazy_static;
use models::{FieldId, Timestamp, ValueType};
use models::{FieldId, InMemPoint, Timestamp, ValueType};
use parking_lot::{Mutex, RwLock};
use tokio::sync::mpsc::UnboundedSender;
use trace::{debug, error, info, warn};
@@ -684,25 +684,24 @@ impl TseriesFamily {
.expect("error send flush req to kvcore");
}

// todo(Subsegment): (&mut self) will cause a performance regression. We must get a write lock to get
// version_set when we insert each point.
pub async fn put_mutcache(
pub async fn put_points(
&mut self,
raw: &mut MemRaw<'_>,
seq: u64,
points: &Vec<InMemPoint>,
sender: UnboundedSender<Arc<Mutex<Vec<FlushReq>>>>,
) {
if self.immut_ts_min == i64::MIN {
self.immut_ts_min = raw.ts;
}
if raw.ts >= self.immut_ts_min {
if raw.ts > self.mut_ts_max {
self.mut_ts_max = raw.ts;
for p in points.iter() {
let sid = p.series_id();
for f in p.fields().iter() {
self.put_mutcache(&mut MemRaw {
seq,
ts: p.timestamp,
field_id: f.field_id(),
field_type: f.value_type,
val: &f.value,
})
.await;
}
let mut mem = self.super_version.caches.mut_cache.write();
let _ = mem.insert_raw(raw);
} else {
let mut delta_mem = self.super_version.caches.delta_mut_cache.write();
let _ = delta_mem.insert_raw(raw);
}

if self.super_version.caches.mut_cache.read().is_full() {
@@ -718,6 +717,24 @@
}
}

// todo(Subsegment): (&mut self) will cause a performance regression. We must get a write lock to get
// version_set when we insert each point.
pub async fn put_mutcache(&mut self, raw: &mut MemRaw<'_>) {
if self.immut_ts_min == i64::MIN {
self.immut_ts_min = raw.ts;
}
if raw.ts >= self.immut_ts_min {
if raw.ts > self.mut_ts_max {
self.mut_ts_max = raw.ts;
}
let mut mem = self.super_version.caches.mut_cache.write();
let _ = mem.insert_raw(raw);
} else {
let mut delta_mem = self.super_version.caches.delta_mut_cache.write();
let _ = delta_mem.insert_raw(raw);
}
}

pub async fn delete_cache(&self, time_range: &TimeRange) {
for i in self.mut_cache.write().data_cache.iter_mut() {
if i.1.overlap(time_range) {
@@ -999,17 +1016,14 @@ mod test {
)),
tcfg.clone(),
);
let (flush_task_sender, flush_task_receiver) = mpsc::unbounded_channel();
tsf.put_mutcache(
&mut MemRaw {
seq: 0,
ts: 0,
field_id: 0,
field_type: ValueType::Integer,
val: 10_i32.to_be_bytes().as_slice(),
},
flush_task_sender,
)

tsf.put_mutcache(&mut MemRaw {
seq: 0,
ts: 0,
field_id: 0,
field_type: ValueType::Integer,
val: 10_i32.to_be_bytes().as_slice(),
})
.await;
assert_eq!(
tsf.mut_cache.read().data_cache.get(&0).unwrap().cells.len(),
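
The tseries_family.rs change inverts the old call pattern: rather than TsKv::insert_cache looping over points and fields and invoking put_mutcache once per field with the flush sender threaded through every call, put_points now owns that loop, put_mutcache shrinks to the raw cache insert, and the is_full checks (and any resulting flush) run once per batch after the loop. A toy model of that insert-batch-then-check-once shape (invented types, not repo code):

struct Cache {
    items: Vec<i64>,
    cap: usize,
}

impl Cache {
    fn insert_one(&mut self, v: i64) {
        self.items.push(v);
    }

    fn is_full(&self) -> bool {
        self.items.len() >= self.cap
    }

    // Like put_points: insert the whole batch, then check fullness once,
    // instead of checking (and potentially flushing) after every field.
    fn insert_batch(&mut self, vs: &[i64]) -> bool {
        for &v in vs {
            self.insert_one(v);
        }
        self.is_full()
    }
}

fn main() {
    let mut c = Cache { items: Vec::new(), cap: 4 };
    assert!(!c.insert_batch(&[1, 2, 3])); // 3 of 4: not full yet
    assert!(c.insert_batch(&[4, 5]));     // 5 of 4: full after the batch
}
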
