Merge branch 'master' into error_type
tonyxuqqi committed May 24, 2023
2 parents 272ea32 + 4f8d259 commit fe96997
Showing 12 changed files with 351 additions and 154 deletions.
18 changes: 13 additions & 5 deletions components/encryption/src/file_dict_file.rs
@@ -10,7 +10,7 @@ use file_system::{rename, File, OpenOptions};
use kvproto::encryptionpb::{EncryptedContent, FileDictionary, FileInfo};
use protobuf::Message;
use rand::{thread_rng, RngCore};
use tikv_util::{box_err, set_panic_mark, warn};
use tikv_util::{box_err, info, set_panic_mark, warn};

use crate::{
encrypted_file::{EncryptedFile, Header, Version, TMP_FILE_SUFFIX},
@@ -134,11 +134,19 @@ impl FileDictionaryFile {
.open(&tmp_path)
.unwrap();

let header = Header::new(&file_dict_bytes, Version::V2);
tmp_file.write_all(&header.to_bytes())?;
let header = Header::new(&file_dict_bytes, Version::V2).to_bytes();
tmp_file.write_all(&header)?;
tmp_file.write_all(&file_dict_bytes)?;
tmp_file.sync_all()?;

let new_size = header.len() + file_dict_bytes.len();
info!(
"installing new dictionary file";
"name" => tmp_path.display(),
"old_size" => self.file_size,
"new_size" => new_size,
);
self.file_size = new_size;
// Replace the old file with the tmp file atomically.
rename(&tmp_path, &origin_path)?;
let base_dir = File::open(&self.base)?;
@@ -148,9 +156,8 @@ impl FileDictionaryFile {
} else {
let file = EncryptedFile::new(&self.base, &self.name);
file.write(&file_dict_bytes, &PlaintextBackend::default())?;
self.file_size = file_dict_bytes.len();
}
// rough size, excluding EncryptedFile meta.
self.file_size = file_dict_bytes.len();
Ok(())
}

@@ -197,6 +204,7 @@ impl FileDictionaryFile {
}
}
Err(e @ Error::TailRecordParseIncomplete) => {
// We will call `rewrite` later to trim the corruption.
warn!(
"{:?} occurred and the last complete filename is {}",
e, last_record_name
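Note: the hunk above relies on the classic write-to-temp-then-rename pattern for crash-safe replacement of the dictionary file. A minimal standalone sketch of that pattern, assuming a Unix-like filesystem where rename within one directory is atomic (hypothetical helper, not the actual TiKV code):

use std::{fs, fs::File, io::Write, path::Path};

fn replace_atomically(dir: &Path, name: &str, bytes: &[u8]) -> std::io::Result<()> {
    let tmp_path = dir.join(format!("{}.tmp", name));
    let mut tmp = File::create(&tmp_path)?;
    tmp.write_all(bytes)?;
    tmp.sync_all()?; // flush the contents before the rename
    fs::rename(&tmp_path, dir.join(name))?;
    // fsync the directory so the rename itself is durable, mirroring the
    // `File::open(&self.base)` call in the hunk above.
    File::open(dir)?.sync_all()?;
    Ok(())
}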
3 changes: 2 additions & 1 deletion components/raft_log_engine/src/engine.rs
@@ -190,10 +190,11 @@ impl FileSystem for ManagedFileSystem {
}

fn delete<P: AsRef<Path>>(&self, path: P) -> IoResult<()> {
self.base_file_system.delete(path.as_ref())?;
if let Some(ref manager) = self.key_manager {
manager.delete_file(path.as_ref().to_str().unwrap())?;
}
self.base_file_system.delete(path)
Ok(())
}

fn rename<P: AsRef<Path>>(&self, src_path: P, dst_path: P) -> IoResult<()> {
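Note: the reordering above deletes the data file before asking the key manager to drop its encryption key, so a crash between the two steps leaves an unused key rather than an encrypted file whose key is already gone. A sketch of the new control flow, with hypothetical stand-in traits for the real file system and key manager:

use std::{io::Result as IoResult, path::Path};

trait BaseFs {
    fn delete(&self, path: &Path) -> IoResult<()>;
}
trait KeyStore {
    fn delete_file(&self, path: &str) -> IoResult<()>;
}

fn delete(fs: &dyn BaseFs, keys: Option<&dyn KeyStore>, path: &Path) -> IoResult<()> {
    // Remove the file first, then its key.
    fs.delete(path)?;
    if let Some(k) = keys {
        k.delete_file(path.to_str().unwrap())?;
    }
    Ok(())
}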
7 changes: 7 additions & 0 deletions components/raftstore-v2/src/operation/ready/snapshot.rs
@@ -34,6 +34,7 @@ use engine_traits::{
EncryptionKeyManager, KvEngine, RaftEngine, RaftLogBatch, TabletContext, TabletRegistry,
ALL_CFS,
};
use fail::fail_point;
use kvproto::raft_serverpb::{PeerState, RaftSnapshotData};
use protobuf::Message;
use raft::{eraftpb::Snapshot, StateRole};
@@ -287,11 +288,17 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
!s.scheduled || snapshot_index != RAFT_INIT_LOG_INDEX
}) {
info!(self.logger, "apply tablet snapshot completely");
// Tablets sent from the region leader should have already been trimmed.
self.storage_mut().set_has_dirty_data(false);
SNAP_COUNTER.apply.success.inc();

fail_point!("apply_snapshot_complete");
}
if let Some(init) = split {
info!(self.logger, "init split with snapshot finished");
self.post_split_init(ctx, init);

fail_point!("post_split_init_complete");
}
self.schedule_apply_fsm(ctx);
if self.remove_tombstone_tablets(snapshot_index) {
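Note: `apply_snapshot_complete` and `post_split_init_complete` are fail points from the fail crate, active only in failpoint-enabled builds. A sketch of how a test might drive them, assuming the standard fail-rs API (the test body is illustrative):

#[test]
fn pause_after_snapshot_apply() {
    let scenario = fail::FailScenario::setup();
    // Pause the peer once it finishes applying the tablet snapshot.
    fail::cfg("apply_snapshot_complete", "pause").unwrap();
    // ... exercise the peer under test here ...
    fail::remove("apply_snapshot_complete");
    scenario.teardown();
}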
2 changes: 2 additions & 0 deletions components/raftstore-v2/src/raft/apply.rs
@@ -211,6 +211,8 @@ impl<EK: KvEngine, R> Apply<EK, R> {
self.region().get_id()
}

#[allow(unused)]
#[inline]
pub fn peer_id(&self) -> u64 {
self.peer.get_id()
}
2 changes: 2 additions & 0 deletions components/raftstore-v2/src/worker/tablet.rs
@@ -9,6 +9,7 @@ use std::{

use collections::HashMap;
use engine_traits::{DeleteStrategy, KvEngine, Range, TabletContext, TabletRegistry, DATA_CFS};
use fail::fail_point;
use kvproto::{import_sstpb::SstMeta, metapb::Region};
use slog::{debug, error, info, warn, Logger};
use sst_importer::SstImporter;
@@ -255,6 +256,7 @@ impl<EK: KvEngine> Runner<EK> {
}
// drop before callback.
drop(tablet);
fail_point!("tablet_trimmed_finished");
cb();
})
.unwrap();
21 changes: 16 additions & 5 deletions components/resolved_ts/src/advance.rs
@@ -15,7 +15,9 @@ use concurrency_manager::ConcurrencyManager;
use engine_traits::KvEngine;
use fail::fail_point;
use futures::{compat::Future01CompatExt, future::select_all, FutureExt, TryFutureExt};
use grpcio::{ChannelBuilder, Environment, Error as GrpcError, RpcStatusCode};
use grpcio::{
ChannelBuilder, CompressionAlgorithms, Environment, Error as GrpcError, RpcStatusCode,
};
use kvproto::{
kvrpcpb::{CheckLeaderRequest, CheckLeaderResponse},
metapb::{Peer, PeerRole},
@@ -44,6 +46,8 @@ use txn_types::TimeStamp;
use crate::{endpoint::Task, metrics::*};

const DEFAULT_CHECK_LEADER_TIMEOUT_DURATION: Duration = Duration::from_secs(5); // 5s
const DEFAULT_GRPC_GZIP_COMPRESSION_LEVEL: usize = 2;
const DEFAULT_GRPC_MIN_MESSAGE_SIZE_TO_COMPRESS: usize = 4096;

pub struct AdvanceTsWorker {
pd_client: Arc<dyn PdClient>,
@@ -520,10 +524,17 @@ async fn get_tikv_client(
let mut clients = tikv_clients.lock().await;
let start = Instant::now_coarse();
// hack: with different args, grpc will always create a new connection.
let cb = ChannelBuilder::new(env.clone()).raw_cfg_int(
CString::new("random id").unwrap(),
CONN_ID.fetch_add(1, Ordering::SeqCst),
);
// check leader requests may be large but infrequent; compress them to
// reduce the traffic.
let cb = ChannelBuilder::new(env.clone())
.raw_cfg_int(
CString::new("random id").unwrap(),
CONN_ID.fetch_add(1, Ordering::SeqCst),
)
.default_compression_algorithm(CompressionAlgorithms::GRPC_COMPRESS_GZIP)
.default_gzip_compression_level(DEFAULT_GRPC_GZIP_COMPRESSION_LEVEL)
.default_grpc_min_message_size_to_compress(DEFAULT_GRPC_MIN_MESSAGE_SIZE_TO_COMPRESS);

let channel = security_mgr.connect(cb, &store.peer_address);
let cli = TikvClient::new(channel);
clients.insert(store_id, cli.clone());
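Note: per the comment in the hunk, check-leader requests are large but infrequent, so the channel trades some CPU for bandwidth with gzip. A sketch of the resulting channel construction, reusing the builder methods and constants from the hunk above (the address is a placeholder):

use std::sync::Arc;

use grpcio::{ChannelBuilder, CompressionAlgorithms, EnvBuilder};

fn build_compressed_channel() -> grpcio::Channel {
    let env = Arc::new(EnvBuilder::new().build());
    ChannelBuilder::new(env)
        // gzip at level 2; messages under 4096 bytes stay uncompressed,
        // so small RPCs do not pay the compression CPU cost.
        .default_compression_algorithm(CompressionAlgorithms::GRPC_COMPRESS_GZIP)
        .default_gzip_compression_level(2)
        .default_grpc_min_message_size_to_compress(4096)
        .connect("127.0.0.1:20160") // placeholder address
}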
17 changes: 5 additions & 12 deletions components/security/src/lib.rs
@@ -87,7 +87,7 @@ pub struct ClientSuite

impl SecurityConfig {
/// Validates ca, cert and private key.
pub fn validate(&self, raftstore_v2: bool) -> Result<(), Box<dyn Error>> {
pub fn validate(&self) -> Result<(), Box<dyn Error>> {
check_key_file("ca key", &self.ca_path)?;
check_key_file("cert key", &self.cert_path)?;
check_key_file("private key", &self.key_path)?;
@@ -97,13 +97,6 @@ impl SecurityConfig {
{
return Err("ca, cert and private key should be all configured.".into());
}
if raftstore_v2
&& self.encryption.data_encryption_method
!= kvproto::encryptionpb::EncryptionMethod::Plaintext
{
return Err("encryption is not supported for partitioned-raft-kv".into());
}

Ok(())
}

@@ -304,7 +297,7 @@ mod tests {
fn test_security() {
let cfg = SecurityConfig::default();
// default is to disable secure connections.
cfg.validate(false).unwrap();
cfg.validate().unwrap();
let mgr = SecurityManager::new(&cfg).unwrap();
assert!(mgr.cfg.ca_path.is_empty());
assert!(mgr.cfg.cert_path.is_empty());
@@ -313,7 +306,7 @@
let assert_cfg = |c: fn(&mut SecurityConfig), valid: bool| {
let mut invalid_cfg = cfg.clone();
c(&mut invalid_cfg);
assert_eq!(invalid_cfg.validate(false).is_ok(), valid);
assert_eq!(invalid_cfg.validate().is_ok(), valid);
};

// invalid path should be rejected.
@@ -341,11 +334,11 @@
c.cert_path = format!("{}", example_cert.display());
c.key_path = format!("{}", example_key.display());
// incomplete configuration.
c.validate(false).unwrap_err();
c.validate().unwrap_err();

// data should be loaded from file after validating.
c.ca_path = format!("{}", example_ca.display());
c.validate(false).unwrap();
c.validate().unwrap();

let (ca, cert, key) = c.load_certs().unwrap_or_default();
assert_eq!(ca, vec![0]);
42 changes: 24 additions & 18 deletions components/tikv_util/src/config.rs
@@ -131,28 +131,35 @@ impl Mul<u64> for ReadableSize {
}
}

impl Serialize for ReadableSize {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
impl fmt::Display for ReadableSize {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let size = self.0;
let mut buffer = String::new();
if size == 0 {
write!(buffer, "{}KiB", size).unwrap();
write!(f, "{}KiB", size)
} else if size % PIB == 0 {
write!(buffer, "{}PiB", size / PIB).unwrap();
write!(f, "{}PiB", size / PIB)
} else if size % TIB == 0 {
write!(buffer, "{}TiB", size / TIB).unwrap();
write!(f, "{}TiB", size / TIB)
} else if size % GIB == 0 {
write!(buffer, "{}GiB", size / GIB).unwrap();
write!(f, "{}GiB", size / GIB)
} else if size % MIB == 0 {
write!(buffer, "{}MiB", size / MIB).unwrap();
write!(f, "{}MiB", size / MIB)
} else if size % KIB == 0 {
write!(buffer, "{}KiB", size / KIB).unwrap();
write!(f, "{}KiB", size / KIB)
} else {
write!(buffer, "{}B", size).unwrap();
write!(f, "{}B", size)
}
}
}

impl Serialize for ReadableSize {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut buffer = String::new();
write!(buffer, "{}", self).unwrap();
serializer.serialize_str(&buffer)
}
}
@@ -164,11 +171,11 @@ impl FromStr for ReadableSize {
fn from_str(s: &str) -> Result<ReadableSize, String> {
let size_str = s.trim();
if size_str.is_empty() {
return Err(format!("{:?} is not a valid size.", s));
return Err(format!("{s:?} is not a valid size."));
}

if !size_str.is_ascii() {
return Err(format!("ASCII string is expected, but got {:?}", s));
return Err(format!("ASCII string is expected, but got {s:?}"));
}

// size: digits and '.' as decimal separator
@@ -198,15 +205,14 @@ impl FromStr for ReadableSize {
}
_ => {
return Err(format!(
"only B, KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, and PiB are supported: {:?}",
s
"only B, KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, and PiB are supported: {s:?}"
));
}
};

match size.parse::<f64>() {
Ok(n) => Ok(ReadableSize((n * unit as f64) as u64)),
Err(_) => Err(format!("invalid size string: {:?}", s)),
Err(_) => Err(format!("invalid size string: {s:?}")),
}
}
}
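Note: with the Display impl above, ReadableSize values now format and parse symmetrically. A small usage sketch, assuming tikv_util::config::ReadableSize is in scope:

use tikv_util::config::ReadableSize;

fn readable_size_round_trip() {
    // Display picks the largest binary unit that divides the byte count.
    let size = ReadableSize(3 * 1024 * 1024);
    assert_eq!(format!("{}", size), "3MiB");
    // FromStr accepts the same notation back.
    assert_eq!("3MiB".parse::<ReadableSize>().unwrap().0, size.0);
    // Non-exact multiples fall through to plain bytes.
    assert_eq!(format!("{}", ReadableSize(1025)), "1025B");
}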
13 changes: 13 additions & 0 deletions etc/config-template.toml
@@ -230,6 +230,14 @@
## The path to RocksDB directory.
# data-dir = "./"

## Specifies the engine type. This configuration can only be specified when creating a new cluster
## and cannot be modified once specified.
##
## Available types are:
## "raft-kv": The default engine type in versions earlier than TiDB v6.6.0.
## "partitioned-raft-kv": The new storage engine type introduced in TiDB v6.6.0.
# engine = "raft-kv"

## The number of slots in Scheduler latches, which controls write concurrency.
## In most cases you can use the default value. When importing data, you can set it to a larger
## value.
@@ -630,7 +638,12 @@

## Memory usage limit for Raft Engine. Undersized write buffers will be flushed to satisfy the
## requirement.
##
## No limit when not specified.
##
## When storage.engine is "raft-kv", default is no limit.
## When storage.engine is "partitioned-raft-kv", default value is 25% of available system memory or
## 15GiB, whichever is smaller.
# write-buffer-limit = "1GB"

## Options for `Titan`.
