Skip to content

Commit

Permalink
importer: Update log and key format for src/import (#4145)
Browse files Browse the repository at this point in the history
* unified log format

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* minor fix

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/import.rs

Signed-off-by: Huachao Huang <huachao.huang@gmail.com>

Co-Authored-By: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/prepare.rs

Signed-off-by: Huachao Huang <huachao.huang@gmail.com>

Co-Authored-By: DorianZheng <xingzhengde72@gmail.com>

* hex format keys

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* minor fix

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/import.rs

Make huachao happy

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/sst_service.rs

Make huachao happy

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/sst_importer.rs

Make huachao happy

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/sst_importer.rs

Make huachao happy

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/prepare.rs

Make huachao happy

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/sst_importer.rs

Make huachao happy

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/prepare.rs

Make huachao happy

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/kv_service.rs

Make huachao happy

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/kv_service.rs

Make huachao happy

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/import.rs

Make huachao happy

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/import.rs

Make huachao happy

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/import.rs

Make huachao happy

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/import.rs

Signed-off-by: Huachao Huang <huachao.huang@gmail.com>

Co-Authored-By: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/prepare.rs

Signed-off-by: Huachao Huang <huachao.huang@gmail.com>

Co-Authored-By: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/import.rs

Signed-off-by: Huachao Huang <huachao.huang@gmail.com>

Co-Authored-By: DorianZheng <xingzhengde72@gmail.com>

* Update src/import/prepare.rs

Make huachao happy

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>

* minor fix

Signed-off-by: DorianZheng <xingzhengde72@gmail.com>
  • Loading branch information
DorianZheng committed Mar 1, 2019
1 parent 7d16e0b commit 371b664
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 84 deletions.
8 changes: 4 additions & 4 deletions src/import/client.rs
Expand Up @@ -112,15 +112,15 @@ impl Client {
let ch = match self.resolve(store.get_id()) {
Ok(v) => v,
Err(e) => {
error!("switch store {:?}: {:?}", store, e);
error!("get store channel failed"; "store" => ?store, "err" => %e);
continue;
}
};
let client = ImportSstClient::new(ch);
let future = match client.switch_mode_async(req) {
Ok(v) => v,
Err(e) => {
error!("switch store {:?}: {:?}", store, e);
error!("switch mode failed"; "store" => ?store, "err" => %e);
continue;
}
};
Expand All @@ -140,15 +140,15 @@ impl Client {
let ch = match self.resolve(store.get_id()) {
Ok(v) => v,
Err(e) => {
error!("compact store {:?}: {:?}", store, e);
error!("get store channel failed"; "store" => ?store, "err" => %e);
continue;
}
};
let client = ImportSstClient::new(ch);
let future = match client.compact_async(req) {
Ok(v) => v,
Err(e) => {
error!("compact store {:?}: {:?}", store, e);
error!("compact failed"; "store" => ?store, "err" => %e);
continue;
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/import/common.rs
Expand Up @@ -98,7 +98,7 @@ impl<Client: ImportClient> RangeContext<Client> {
self.region = match self.client.get_region(key) {
Ok(region) => Some(region),
Err(e) => {
error!("get region: {:?}", e);
error!("get region failed"; "err" => %e);
None
}
}
Expand Down
39 changes: 18 additions & 21 deletions src/import/import.rs
Expand Up @@ -54,7 +54,7 @@ impl<Client: ImportClient> ImportJob<Client> {

pub fn run(&self) -> Result<()> {
let start = Instant::now();
info!("{} start", self.tag);
info!("start"; "tag" => %self.tag);

// Before importing data, we need to help to balance data in the cluster.
let job = PrepareJob::new(
Expand All @@ -75,11 +75,11 @@ impl<Client: ImportClient> ImportJob<Client> {

match res {
Ok(_) => {
info!("{} takes {:?}", self.tag, start.elapsed());
info!("import engine"; "tag" => %self.tag, "takes" => ?start.elapsed());
Ok(())
}
Err(e) => {
error!("{}: {:?}", self.tag, e);
error!("import engine failed"; "tag" => %self.tag, "err" => %e);
Err(e)
}
}
Expand Down Expand Up @@ -148,7 +148,7 @@ impl<Client: ImportClient> SubImportJob<Client> {

fn run(&self) -> Result<()> {
let start = Instant::now();
info!("{} start {:?}", self.tag, self.range);
info!("start"; "tag" => %self.tag, "range" => ?self.range);

for i in 0..MAX_RETRY_TIMES {
if i != 0 {
Expand All @@ -158,7 +158,7 @@ impl<Client: ImportClient> SubImportJob<Client> {
let (tx, rx) = mpsc::sync_channel(self.cfg.num_import_sst_jobs);
let handles = self.run_import_threads(rx);
if let Err(e) = self.run_import_stream(tx) {
error!("{} import stream: {:?}", self.tag, e);
error!("import stream"; "tag" => %self.tag, "err" => %e);
continue;
}
for h in handles {
Expand All @@ -175,16 +175,13 @@ impl<Client: ImportClient> SubImportJob<Client> {

let range_count = self.finished_ranges.lock().unwrap().len();
info!(
"{} import {} ranges takes {:?}",
self.tag,
range_count,
start.elapsed(),
"import"; "tag" => %self.tag, "range_count" => %range_count, "takes" => ?start.elapsed(),
);

return Ok(());
}

error!("{} run out of time", self.tag);
error!("run out of time"; "tag" => %self.tag);
Err(Error::ImportJobFailed(self.tag.clone()))
}

Expand Down Expand Up @@ -258,7 +255,7 @@ impl<Client: ImportClient> ImportSSTJob<Client> {

fn run(&mut self) -> Result<()> {
let start = Instant::now();
info!("{} start {:?}", self.tag, self.sst);
info!("start"; "tag" => %self.tag, "sst" => ?self.sst);

for i in 0..MAX_RETRY_TIMES {
if i != 0 {
Expand All @@ -271,20 +268,20 @@ impl<Client: ImportClient> ImportSSTJob<Client> {
if self.sst.inside_region(&region) {
region
} else {
warn!("{} outside of {:?}", self.tag, region);
warn!("sst out of region range"; "tag" => %self.tag, "region" => ?region);
return Err(Error::ImportSSTJobFailed(self.tag.clone()));
}
}
Err(e) => {
warn!("{}: {:?}", self.tag, e);
warn!("get region failed"; "tag" => %self.tag, "err" => %e);
continue;
}
};

for _ in 0..MAX_RETRY_TIMES {
match self.import(region) {
Ok(_) => {
info!("{} takes {:?}", self.tag, start.elapsed());
info!("import sst"; "tag" => %self.tag, "takes" => ?start.elapsed());
return Ok(());
}
Err(Error::UpdateRegion(new_region)) => {
Expand All @@ -296,12 +293,12 @@ impl<Client: ImportClient> ImportSSTJob<Client> {
}
}

error!("{} run out of time", self.tag);
error!("run out of time"; "tag" => %self.tag);
Err(Error::ImportSSTJobFailed(self.tag.clone()))
}

fn import(&mut self, mut region: RegionInfo) -> Result<()> {
info!("{} import to {:?}", self.tag, region);
info!("start import sst"; "tag" => %self.tag, "region" => ?region);

// Update SST meta for this region.
{
Expand Down Expand Up @@ -336,7 +333,7 @@ impl<Client: ImportClient> ImportSSTJob<Client> {
)))
}
None => {
warn!("{} epoch not match {:?}", self.tag, current_region);
warn!("epoch not match"; "tag" => %self.tag, "new_regions" => ?current_regions);
Err(Error::EpochNotMatch(current_regions))
}
}
Expand All @@ -351,10 +348,10 @@ impl<Client: ImportClient> ImportSSTJob<Client> {
let store_id = peer.get_store_id();
match self.client.upload_sst(store_id, upload) {
Ok(_) => {
info!("{} upload to store {}", self.tag, store_id);
info!("upload"; "tag" => %self.tag, "store" => %store_id);
}
Err(e) => {
warn!("{} upload to store {}: {:?}", self.tag, store_id, e);
warn!("upload failed"; "tag" => %self.tag, "store" => %store_id, "err" => %e);
return Err(e);
}
}
Expand Down Expand Up @@ -386,11 +383,11 @@ impl<Client: ImportClient> ImportSSTJob<Client> {

match res {
Ok(_) => {
info!("{} ingest to store {}", self.tag, store_id);
info!("ingest"; "tag" => %self.tag, "store" => %store_id);
Ok(())
}
Err(e) => {
warn!("{} ingest to store {}: {:?}", self.tag, store_id, e);
warn!("ingest failed"; "tag" => %self.tag, "store" => %store_id, "err" => %e);
Err(e)
}
}
Expand Down
25 changes: 13 additions & 12 deletions src/import/kv_importer.rs
Expand Up @@ -63,19 +63,20 @@ impl KVImporter {

// Restrict max open engines
if inner.engines.len() >= self.cfg.max_open_engines {
let errmsg = format!("Too many open engines {}: {}", uuid, inner.engines.len());
error!("{}", errmsg);
return Err(Error::ResourceTemporarilyUnavailable(errmsg));
error!("Too many open engines "; "uuid" => %uuid, "opened_engine_count" => %inner.engines.len());
return Err(Error::ResourceTemporarilyUnavailable(
"Too many open engines".to_string(),
));
}

match self.dir.open(uuid) {
Ok(engine) => {
info!("open {:?}", engine);
info!("open engine"; "engine" => ?engine);
inner.engines.insert(uuid, Arc::new(engine));
Ok(())
}
Err(e) => {
error!("open {}: {:?}", uuid, e);
error!("open engine failed"; "uuid" => %uuid, "err" => %e);
Err(e)
}
}
Expand Down Expand Up @@ -110,11 +111,11 @@ impl KVImporter {

match engine.close() {
Ok(_) => {
info!("close {:?}", engine);
info!("close engine"; "engine" => ?engine);
Ok(())
}
Err(e) => {
error!("close {:?}: {:?}", engine, e);
error!("close engine failed"; "engine" => ?engine, "err" => %e);
Err(e)
}
}
Expand All @@ -141,11 +142,11 @@ impl KVImporter {

match res {
Ok(_) => {
info!("import {}", uuid);
info!("import"; "uuid" => %uuid);
Ok(())
}
Err(e) => {
error!("import {}: {:?}", uuid, e);
error!("import failed"; "uuid" => %uuid, "err" => %e);
Err(e)
}
}
Expand Down Expand Up @@ -175,11 +176,11 @@ impl KVImporter {

match self.dir.cleanup(uuid) {
Ok(_) => {
info!("cleanup {}", uuid);
info!("cleanup"; "uuid" => %uuid);
Ok(())
}
Err(e) => {
error!("cleanup {}: {:?}", uuid, e);
error!("cleanup failed"; "uuid" => %uuid, "err" => %e);
Err(e)
}
}
Expand Down Expand Up @@ -331,7 +332,7 @@ impl EngineFile {
impl Drop for EngineFile {
fn drop(&mut self) {
if let Err(e) = self.cleanup() {
warn!("cleanup {:?}: {:?}", self, e);
warn!("cleanup"; "engine file" => ?self, "err" => %e);
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/import/kv_service.rs
Expand Up @@ -89,11 +89,11 @@ impl ImportKv for ImportKVService {
let client = Client::new(req.get_pd_addr(), 1)?;
match client.switch_cluster(req.get_request()) {
Ok(_) => {
info!("switch cluster {:?}", req.get_request());
info!("switch cluster"; "req" => ?req.get_request());
Ok(())
}
Err(e) => {
error!("switch cluster {:?}: {:?}", req.get_request(), e);
error!("switch cluster failed"; "req" => ?req.get_request(), "err" => %e);
Err(e)
}
}
Expand Down Expand Up @@ -284,11 +284,11 @@ impl ImportKv for ImportKVService {
let client = Client::new(req.get_pd_addr(), 1)?;
match client.compact_cluster(&compact) {
Ok(_) => {
info!("compact cluster {:?}", compact);
info!("compact cluster"; "req" => ?compact);
Ok(())
}
Err(e) => {
error!("compact cluster {:?}: {:?}", compact, e);
error!("compact cluster failed"; "req" => ?compact, "err" => %e);
Err(e)
}
}
Expand Down

0 comments on commit 371b664

Please sign in to comment.