Skip to content

Commit

Permalink
add log
Browse files Browse the repository at this point in the history
  • Loading branch information
roseboy-liu committed Aug 17, 2022
1 parent 8ce24ef commit 643c7ba
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 14 deletions.
2 changes: 1 addition & 1 deletion query/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl TableProvider for ClusterTable {
.create_physical_plan(projection, filter.clone(), self.schema())
.await;
}
fn supports_filter_pushdown(&self, filter: &Expr) -> Result<TableProviderFilterPushDown> {
fn supports_filter_pushdown(&self, _: &Expr) -> Result<TableProviderFilterPushDown> {
Ok(TableProviderFilterPushDown::Inexact)
}
}
25 changes: 12 additions & 13 deletions tskv/src/kvcore.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
use std::{collections::HashMap, io::Result as IoResultExt, sync, sync::Arc, thread::JoinHandle};

use ::models::{FieldInfo, InMemPoint, SeriesInfo, Tag, ValueType};
use futures::stream::SelectNextSome;
use models::{FieldId, SeriesId, SeriesKey, Timestamp};
use parking_lot::{Mutex, RwLock};
use protos::models::Points;
use protos::{
kv_service::{WritePointsRpcRequest, WritePointsRpcResponse, WriteRowsRpcRequest},
models as fb_models,
};
use snafu::ResultExt;
use tokio::{
runtime::Builder,
Expand All @@ -17,6 +10,14 @@ use tokio::{
oneshot,
},
};

use ::models::{FieldInfo, InMemPoint, SeriesInfo, Tag, ValueType};
use models::{FieldId, SeriesId, SeriesKey, Timestamp};
use protos::models::Points;
use protos::{
kv_service::{WritePointsRpcRequest, WritePointsRpcResponse, WriteRowsRpcRequest},
models as fb_models,
};
use trace::{debug, error, info, trace, warn};

use crate::engine::Engine;
Expand Down Expand Up @@ -337,16 +338,17 @@ impl TsKv {
while let Some(command) = req_rx.recv().await {
match command {
Task::WritePoints { req, tx } => {
warn!("writing points.");
debug!("writing points.");
match tskv.write(req).await {
Ok(resp) => {
let _ret = tx.send(Ok(resp));
}
Err(err) => {
info!("write points error {:?}", err);
let _ret = tx.send(Err(err));
}
}
warn!("write points completed.");
debug!("write points completed.");
}
_ => panic!("unimplemented."),
}
Expand All @@ -356,11 +358,8 @@ impl TsKv {
tokio::spawn(f);
warn!("job 'main' started.");
}

// pub async fn query(&self, _opt: QueryOption) -> Result<Option<Entry>> {
// Ok(None)
// }
}

#[async_trait::async_trait]
impl Engine for TsKv {
async fn write(&self, write_batch: WritePointsRpcRequest) -> Result<WritePointsRpcResponse> {
Expand Down

0 comments on commit 643c7ba

Please sign in to comment.