
Commit

1. support arbitrary db for rewrite
2. switch to rewrite2
3. support partition from rewrite
wangxiaoying committed Jun 19, 2023
1 parent bed71fb commit 0fca401
Showing 5 changed files with 86 additions and 27 deletions.
49 changes: 33 additions & 16 deletions benchmarks/tpch-fed.py
```diff
@@ -1,36 +1,53 @@
 """
 Usage:
-  tpch-fed.py --file=<file>
+  tpch-fed.py [--file=<file>] [--dir=<dir>] [--runs=<runs>]
 
 Options:
-  --file=<file>          Query file.
+  --file=<file>          Query file.
+  --dir=<dir>            Query path.
+  --runs=<runs>          # runs [default: 1].
   -h --help     Show this screen.
   --version     Show version.
 """
-import os
 
+import os
 import time
 import connectorx as cx
 from contexttimer import Timer
 from docopt import docopt
 import pandas as pd
+from pathlib import Path
+
+def run_query_from_file(query_file):
+    with open(query_file, "r") as f:
+        sql = f.read()
+    print(f"file: {query_file}")
+
+    with Timer() as timer:
+        df = cx.read_sql(db_map, sql, return_type="arrow")
+    print(f"time in total: {timer.elapsed}, {len(df)} rows, {len(df.columns)} cols")
+    print(df)
+
+
 if __name__ == "__main__":
     args = docopt(__doc__, version="Naval Fate 2.0")
-    query_file = args["--file"]
 
-    db_map = {
-        "db1": os.environ["DB1"],
-        "db2": os.environ["DB2"],
-    }
+    db_map = {}
+    db_conns = os.environ["FED_CONN"]
+    for conn in db_conns.split(','):
+        db_map[conn.split('=', 1)[0]] = conn.split('=', 1)[1]
     print(f"dbs: {db_map}")
 
-    with open(query_file, "r") as f:
-        sql = f.read()
-    print(f"file: {query_file}")
-
-    with Timer() as timer:
-        df = cx.read_sql(db_map, sql, return_type="pandas")
-    print("time in total:", timer.elapsed)
-
-    print(df)
+    for i in range(int(args["--runs"])):
+        print(f"=============== run {i} ================")
+        print()
+        if args["--file"]:
+            filename = args["--file"]
+            run_query_from_file(filename)
+        elif args["--dir"]:
+            for filename in sorted(Path(args["--dir"]).glob("q*.sql")):
+                run_query_from_file(filename)
+                time.sleep(2)
```
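The benchmark no longer hard-codes two databases via DB1/DB2: it reads a single FED_CONN environment variable holding comma-separated name=connection pairs, can sweep every q*.sql file under --dir, repeats the workload --runs times, and fetches per-query results as Arrow. A hypothetical invocation (connection strings invented for illustration) could look like:

```
FED_CONN="db1=postgresql://user:pass@host1:5432/tpch,db2=mysql://user:pass@host2:3306/tpch" \
python tpch-fed.py --dir=queries --runs=3
```

Because each pair is split on the first '=' only, later '=' characters inside a connection string survive intact; commas, however, cannot appear in a connection string under this scheme.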
12 changes: 12 additions & 0 deletions connectorx-cpp/src/lib.rs
```diff
@@ -43,6 +43,8 @@ pub struct CXConnectionInfo {
     conn: *const c_char,
     schema: CXSlice<CXTable>,
     is_local: bool,
+    jdbc_url: *const c_char,
+    jdbc_driver: *const c_char,
 }
 
 #[repr(C)]
@@ -94,10 +96,20 @@ pub unsafe extern "C" fn connectorx_rewrite(
             db_map.insert(name.to_string(), source_info);
         } else {
             let conn = unsafe { CStr::from_ptr(p.conn) }.to_str().unwrap();
+            let jdbc_url = match p.jdbc_url.is_null() {
+                true => "",
+                false => unsafe { CStr::from_ptr(p.jdbc_url) }.to_str().unwrap(),
+            };
+            let jdbc_driver = match p.jdbc_driver.is_null() {
+                true => "",
+                false => unsafe { CStr::from_ptr(p.jdbc_driver) }.to_str().unwrap(),
+            };
             // println!("name: {:?}, conn: {:?}", name, conn);
             let source_info = FederatedDataSourceInfo::new_from_conn_str(
                 SourceConn::try_from(conn).unwrap(),
                 p.is_local,
+                jdbc_url,
+                jdbc_driver,
             );
             db_map.insert(name.to_string(), source_info);
         }
```
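`CXConnectionInfo` gains two optional JDBC fields so a C caller can hand the rewriter a JDBC URL and driver class for databases connectorx itself does not recognize; null pointers degrade to empty strings instead of being dereferenced. A standalone sketch of that null-handling pattern (the helper name is hypothetical, not part of the crate):

```rust
use std::ffi::CStr;
use std::os::raw::c_char;

/// Null-safe C-string conversion: a null pointer from the C caller
/// becomes "" rather than undefined behavior.
unsafe fn c_str_or_empty<'a>(p: *const c_char) -> &'a str {
    if p.is_null() {
        ""
    } else {
        // Caller must guarantee `p` points to a valid, NUL-terminated UTF-8 string.
        CStr::from_ptr(p).to_str().unwrap()
    }
}

fn main() {
    let s = b"org.postgresql.Driver\0";
    assert_eq!(
        unsafe { c_str_or_empty(s.as_ptr() as *const c_char) },
        "org.postgresql.Driver"
    );
    assert_eq!(unsafe { c_str_or_empty(std::ptr::null()) }, "");
}
```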
11 changes: 7 additions & 4 deletions connectorx/src/fed_dispatcher.rs
```diff
@@ -20,7 +20,7 @@ pub fn run(
     for (k, v) in db_map.into_iter() {
         db_conn_map.insert(
             k,
-            FederatedDataSourceInfo::new_from_conn_str(SourceConn::try_from(v.as_str())?, false),
+            FederatedDataSourceInfo::new_from_conn_str(SourceConn::try_from(v.as_str())?, false, "", ""),
         );
     }
     let fed_plan = rewrite_sql(sql.as_str(), &db_conn_map, j4rs_base)?;
@@ -36,13 +36,16 @@ pub fn run(
             }
             _ => {
                 debug!("start query {}: {}", i, p.sql);
-                let queries = [CXQuery::naked(p.sql)];
+                let mut queries = vec![];
+                p.sql.split(';').for_each(|ss| {
+                    queries.push(CXQuery::naked(ss));
+                });
                 let source_conn = &db_conn_map[p.db_name.as_str()]
                     .conn_str_info
                     .as_ref()
                     .unwrap();
 
-                let destination = get_arrow(source_conn, None, &queries)?;
+                let destination = get_arrow(source_conn, None, queries.as_slice())?;
                 let rbs = destination.arrow()?;
 
                 let provider = MemTable::try_new(rbs[0].schema(), vec![rbs])?;
@@ -72,7 +75,7 @@ pub fn run(
         Ok(())
     })?;
 
-    debug!("\nexecute query final...");
+    debug!("\nexecute query final...\n{}\n", local_sql);
     let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
     // until datafusion fix the bug: https://github.com/apache/arrow-datafusion/issues/2147
     for alias in alias_names {
```
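Two behavioral changes in the dispatcher: the rewriter may now emit several ';'-separated partition queries for a single plan step, each of which becomes its own CXQuery before get_arrow fetches them, and the final local SQL is included in the debug log. A standalone sketch of the fan-out (with an extra trim/filter step the commit itself does not perform — note that in the committed code a trailing ';' would yield an empty query):

```rust
// Sketch only: mirrors the split-on-';' fan-out above, plus trimming and
// dropping of empty fragments for robustness.
fn split_partitions(sql: &str) -> Vec<String> {
    sql.split(';')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(String::from)
        .collect()
}

fn main() {
    let rewritten = "SELECT * FROM t WHERE id < 100; SELECT * FROM t WHERE id >= 100";
    let parts = split_partitions(rewritten);
    assert_eq!(parts.len(), 2); // two partitions -> two CXQuery values
}
```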
38 changes: 32 additions & 6 deletions connectorx/src/fed_rewriter.rs
```diff
@@ -19,18 +19,22 @@ pub struct Plan {
     pub cardinality: usize,
 }
 
-pub struct FederatedDataSourceInfo {
+pub struct FederatedDataSourceInfo<'a> {
     pub conn_str_info: Option<SourceConn>,
     pub manual_info: Option<HashMap<String, Vec<String>>>,
     pub is_local: bool,
+    pub jdbc_url: &'a str,
+    pub jdbc_driver: &'a str,
 }
 
-impl FederatedDataSourceInfo {
-    pub fn new_from_conn_str(source_conn: SourceConn, is_local: bool) -> Self {
+impl<'a> FederatedDataSourceInfo<'a> {
+    pub fn new_from_conn_str(source_conn: SourceConn, is_local: bool, jdbc_url: &'a str, jdbc_driver: &'a str) -> Self {
         Self {
             conn_str_info: Some(source_conn),
             manual_info: None,
             is_local,
+            jdbc_url,
+            jdbc_driver
         }
     }
     pub fn new_from_manual_schema(
@@ -41,6 +45,8 @@ impl FederatedDataSourceInfo {
             conn_str_info: None,
             manual_info: Some(manual_schema),
             is_local,
+            jdbc_url: "",
+            jdbc_driver: "",
         }
     }
 }
@@ -69,6 +75,7 @@ fn init_jvm(j4rs_base: Option<&str>) -> Jvm {
         .build()?
 }
 
+#[allow(dead_code)]
 #[throws(ConnectorXOutError)]
 fn create_sources(jvm: &Jvm, db_map: &HashMap<String, FederatedDataSourceInfo>) -> Instance {
     let data_sources = jvm.create_instance("java.util.HashMap", &[])?;
@@ -121,7 +128,16 @@ fn create_sources(jvm: &Jvm, db_map: &HashMap<String, FederatedDataSourceInfo>)
                     InvocationArg::try_from(Null::String).unwrap(),
                 ],
             )?,
-            _ => unimplemented!("Connection: {:?} not supported!", url),
+            _ => jvm.create_instance(
+                "ai.dataprep.federated.FederatedDataSource",
+                &[
+                    InvocationArg::try_from(db_info.is_local).unwrap(),
+                    InvocationArg::try_from(db_info.jdbc_url).unwrap(),
+                    InvocationArg::try_from(db_info.jdbc_driver).unwrap(),
+                    InvocationArg::try_from(url.username()).unwrap(),
+                    InvocationArg::try_from(url.password().unwrap_or("")).unwrap(),
+                ],
+            )?,
         };
         jvm.invoke(
            &data_sources,
@@ -166,6 +182,16 @@ fn create_sources(jvm: &Jvm, db_map: &HashMap<String, FederatedDataSourceInfo>)
     data_sources
 }
 
+#[allow(dead_code)]
+#[throws(ConnectorXOutError)]
+fn create_sources2(jvm: &Jvm, db_map: &HashMap<String, FederatedDataSourceInfo>) -> Instance {
+    let mut dbs = vec![];
+    for db in db_map.keys() {
+        dbs.push(String::from(db));
+    }
+    jvm.java_list("java.lang.String", dbs)?
+}
+
 #[throws(ConnectorXOutError)]
 pub fn rewrite_sql(
     sql: &str,
@@ -176,10 +202,10 @@ pub fn rewrite_sql(
     debug!("init jvm successfully!");
 
     let sql = InvocationArg::try_from(sql).unwrap();
-    let data_sources = create_sources(&jvm, db_map)?;
+    let data_sources = create_sources2(&jvm, db_map)?;
     let rewriter = jvm.create_instance("ai.dataprep.federated.FederatedQueryRewriter", &[])?;
     let data_sources = InvocationArg::try_from(data_sources).unwrap();
-    let plan = jvm.invoke(&rewriter, "rewrite", &[sql, data_sources])?;
+    let plan = jvm.invoke(&rewriter, "rewrite2", &[sql, data_sources])?;
 
     let count = jvm.invoke(&plan, "getCount", &[])?;
     let count: i32 = jvm.to_rust(count)?;
```
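`create_sources2` shrinks what crosses the JVM boundary: instead of constructing a FederatedDataSource object per database in Rust, it ships only the list of data-source names, and the new `rewrite2` entry point resolves connection details on the Java side (the old `create_sources` path is kept behind `#[allow(dead_code)]`, and its catch-all arm now builds a generic JDBC-backed source rather than panicking). A minimal sketch of the call sequence, assuming a JVM whose classpath already carries the federated rewriter jar (not the crate's public API):

```rust
use j4rs::{Instance, InvocationArg, Jvm, JvmBuilder};

// Sketch of the rewrite2 handshake: only the data-source names travel to
// Java; connection metadata stays on that side.
fn rewrite2_sketch(sql: &str, db_names: Vec<String>) -> Instance {
    let jvm: Jvm = JvmBuilder::new().build().expect("failed to start JVM");
    let names = jvm
        .java_list("java.lang.String", db_names)
        .expect("failed to build name list");
    let rewriter = jvm
        .create_instance("ai.dataprep.federated.FederatedQueryRewriter", &[])
        .expect("failed to create rewriter");
    jvm.invoke(
        &rewriter,
        "rewrite2",
        &[
            InvocationArg::try_from(sql).unwrap(),
            InvocationArg::try_from(names).unwrap(),
        ],
    )
    .expect("rewrite2 call failed")
}
```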
3 changes: 2 additions & 1 deletion connectorx/src/source_router.rs
```diff
@@ -14,6 +14,7 @@ pub enum SourceType {
     Oracle,
     BigQuery,
     DuckDB,
+    Unknown,
 }
 
 #[derive(Debug, Clone)]
@@ -57,7 +58,7 @@ impl TryFrom<&str> for SourceConn {
             "oracle" => Ok(SourceConn::new(SourceType::Oracle, url, proto)),
             "bigquery" => Ok(SourceConn::new(SourceType::BigQuery, url, proto)),
             "duckdb" => Ok(SourceConn::new(SourceType::DuckDB, url, proto)),
-            _ => unimplemented!("Connection: {} not supported!", conn),
+            _ => Ok(SourceConn::new(SourceType::Unknown, url, proto)),
         }
     }
 }
```
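With the new `Unknown` variant, a connection string whose scheme is not one of the built-in sources no longer panics in `TryFrom`; it parses successfully and is left for downstream code (such as the rewriter's generic JDBC path) to handle. A hypothetical use, assuming the crate exposes `source_router` publicly and the scheme shown is invented for illustration:

```rust
use connectorx::source_router::SourceConn;
use std::convert::TryFrom;

fn main() {
    // "trino" is not a built-in scheme; it now maps to SourceType::Unknown
    // instead of hitting unimplemented!().
    let conn = SourceConn::try_from("trino://user@host:8080/catalog")
        .expect("arbitrary schemes should now parse");
    println!("{:?}", conn);
}
```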