revert arrow version to 9
dmetasoul-opensource committed Mar 21, 2023
1 parent ef92479 commit 0431dbf
Showing 7 changed files with 59 additions and 34 deletions.
11 changes: 6 additions & 5 deletions .github/workflows/flink-cdc-test.yml
@@ -51,7 +51,8 @@ jobs:
docker pull -q dockcross/manylinux2014-x64:latest
- name: Build with Maven
run: |
MAVEN_OPTS="-Xmx4000m" mvn -q -B clean package -f pom.xml -DskipTests
cargo install cross --git https://github.com/cross-rs/cross
MAVEN_OPTS="-Xmx4000m" mvn -q -B clean package -f pom.xml -Prelease-linux-x86-64 -DskipTests
- name: Get jar names
run: |
echo "FLINK_JAR_NAME=$(python script/get_jar_name.py lakesoul-flink)" >> $GITHUB_ENV
@@ -85,7 +86,7 @@ jobs:
- name: Accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --conf spark.hadoop.fs.s3.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.fast.upload.buffer=disk --conf spark.hadoop.fs.s3a.fast.upload=true --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --conf spark.hadoop.fs.s3.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.fast.upload.buffer=disk --conf spark.hadoop.fs.s3a.fast.upload=true --conf spark.dmetasoul.lakesoul.native.io.enable=true --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
- name: Adding columns for tables and deleting some data from tables
run: |
cd ./script/benchmark
@@ -96,7 +97,7 @@ jobs:
- name: Accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.dmetasoul.lakesoul.native.io.enable=true --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
- name: Updating data in tables
run: |
cd ./script/benchmark
@@ -105,7 +106,7 @@ jobs:
- name: Accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.dmetasoul.lakesoul.native.io.enable=true --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
- name: Dropping columns and deleting some data in tables
run: |
cd ./script/benchmark
@@ -116,4 +117,4 @@ jobs:
- name: Accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.dmetasoul.lakesoul.native.io.enable=true --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
LakeSoulSQLConf.scala
@@ -108,10 +108,10 @@ object LakeSoulSQLConf {
.createWithDefault(10)

val NATIVE_IO_ENABLE: ConfigEntry[Boolean] =
buildConf("native.io.scan.enable")
buildConf("native.io.enable")
.doc(
"""
|If true, org.apache.spark.sql.execution.datasources.parquet.NativeVectorizedReader will be used instead of org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
|If true, NativeIO will be enabled for both read and write
""".stripMargin)
.booleanConf
.createWithDefault(false)
@@ -142,7 +142,7 @@ object LakeSoulSQLConf {
|If NATIVE_IO_ENABLE=true, tokio::runtime::Runtime will be built with NATIVE_IO_THREAD_NUM worker threads
""".stripMargin)
.intConf
.createWithDefault(1)
.createWithDefault(2)

val NATIVE_IO_READER_AWAIT_TIMEOUT: ConfigEntry[Int] =
buildConf("native.io.await.timeout")
@@ -151,5 +151,5 @@ object LakeSoulSQLConf {
|If NATIVE_IO_ENABLE=true, the timeout for each read iteration will be set to NATIVE_IO_READER_AWAIT_TIMEOUT milliseconds
""".stripMargin)
.intConf
.createWithDefault(3000)
.createWithDefault(10000)
}
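
A note on the renamed flag above: Spark surfaces it under the spark.dmetasoul.lakesoul. prefix as spark.dmetasoul.lakesoul.native.io.enable, the same key the Benchmark and CI workflow changes in this commit set. The following is a minimal, illustrative Scala sketch of opting in from a Spark session; only the config key comes from this commit, while the app name and master are placeholders.

import org.apache.spark.sql.SparkSession

// Minimal sketch: opt in to LakeSoul's native IO path via the renamed flag.
// Only the config key is taken from this commit; the app name and master
// below are illustrative placeholders.
val spark = SparkSession.builder()
  .appName("lakesoul-native-io-sketch")
  .master("local[4]")
  .config("spark.dmetasoul.lakesoul.native.io.enable", "true")
  .getOrCreate()

The default stays false (createWithDefault(false) above), so existing jobs keep the previous reader path unless they opt in.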
Benchmark.scala
@@ -54,7 +54,9 @@ object Benchmark {
.config("spark.hadoop.fs.s3a.multipart.size", 67108864)
.config("spark.hadoop.fs.s3a.connection.maximum", 100)
.config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
.config("spark.hadoop.fs.s3a.endpoint.region", "us-east-1")
.config("spark.hadoop.fs.s3a.access.key", "minioadmin1")
.config("spark.hadoop.fs.s3a.secret.key", "minioadmin1")
.config("spark.sql.shuffle.partitions", 10)
.config("spark.sql.files.maxPartitionBytes", "1g")
.config("spark.default.parallelism", 8)
@@ -67,6 +69,7 @@ object Benchmark {
.config("spark.sql.catalog.lakesoul", classOf[LakeSoulCatalog].getName)
.config(SQLConf.DEFAULT_CATALOG.key, LakeSoulCatalog.CATALOG_NAME)
.config("spark.default.parallelism", "16")
.config("spark.dmetasoul.lakesoul.native.io.enable", "true")

val spark = builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
ParquetScanBenchmark.scala
@@ -40,7 +40,7 @@ object ParquetScanBenchmark {

var bucketName = "lakesoul-test-bucket"
if (args.length >= 1 && args(0) == "--localtest") {
builder.config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
builder.config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
.config("spark.hadoop.fs.s3a.endpoint.region", "us-east-1")
.config("spark.hadoop.fs.s3a.access.key", "minioadmin1")
.config("spark.hadoop.fs.s3a.secret.key", "minioadmin1")
2 changes: 1 addition & 1 deletion native-io/lakesoul-io-java/pom.xml
@@ -17,7 +17,7 @@
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<arrow.version>11.0.0</arrow.version>
<arrow.version>9.0.0</arrow.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<scalatest.version>3.1.0</scalatest.version>
64 changes: 43 additions & 21 deletions native-io/lakesoul-io/src/lakesoul_reader.rs
@@ -18,15 +18,15 @@ use atomic_refcell::AtomicRefCell;
use std::mem::MaybeUninit;
use std::sync::Arc;

use arrow_schema::SchemaRef;
use arrow::datatypes::Schema;
use arrow_schema::SchemaRef;

pub use datafusion::arrow::error::ArrowError;
pub use datafusion::arrow::error::Result as ArrowResult;
pub use datafusion::arrow::record_batch::RecordBatch;
pub use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::col as logical_col;
use datafusion::physical_plan::expressions::{PhysicalSortExpr, col};
use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};

use datafusion::prelude::SessionContext;

@@ -38,12 +38,11 @@ use tokio::runtime::Runtime;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

use crate::lakesoul_io_config::{create_session_context, LakeSoulIOConfig};
use crate::sorted_merge::sorted_stream_merger::{SortedStream,SortedStreamMerger};
use crate::sorted_merge::merge_operator::MergeOperator;
use crate::default_column_stream::default_column_stream::DefaultColumnStream;
use crate::filter::Parser as FilterParser;

use crate::lakesoul_io_config::{create_session_context, LakeSoulIOConfig};
use crate::sorted_merge::merge_operator::MergeOperator;
use crate::sorted_merge::sorted_stream_merger::{SortedStream, SortedStreamMerger};

pub struct LakeSoulReader {
sess_ctx: SessionContext,
@@ -75,15 +74,21 @@ impl LakeSoulReader {

let file_schema = Arc::new(Schema::from(df.schema()));


let cols = file_schema.fields().iter().filter(|field| schema.index_of(field.name()).is_ok()).map(|field| logical_col(field.name().as_str())).collect::<Vec<_>>();
let cols = file_schema
.fields()
.iter()
.filter(|field| schema.index_of(field.name()).is_ok())
.map(|field| logical_col(field.name().as_str()))
.collect::<Vec<_>>();

df = df.select(cols)?;

df = self.config.filter_strs.iter().try_fold(df, |df, f| df.filter(FilterParser::parse(f.clone(), file_schema.clone())))?;
df = self.config.filter_strs.iter().try_fold(df, |df, f| {
df.filter(FilterParser::parse(f.clone(), file_schema.clone()))
})?;
let stream = df.execute_stream().await?;
let stream = DefaultColumnStream::new_from_stream(stream, schema.clone());

self.schema = Some(stream.schema().clone().into());
self.stream = Box::new(MaybeUninit::new(Box::pin(stream)));
Ok(())
@@ -102,15 +107,18 @@ impl LakeSoulReader {
.await?;

let file_schema = Arc::new(Schema::from(df.schema()));
let cols = file_schema.fields().iter().filter(|field| schema.index_of(field.name()).is_ok()).map(|field| logical_col(field.name().as_str())).collect::<Vec<_>>();
df = df.select(cols)?;
df = self.config.filter_strs.iter().try_fold(df, |df, f| df.filter(FilterParser::parse(f.clone(), file_schema.clone())))?;
let stream = df
.execute_stream()
.await?;
streams.push(SortedStream::new(
stream,
));
let cols = file_schema
.fields()
.iter()
.filter(|field| schema.index_of(field.name()).is_ok())
.map(|field| logical_col(field.name().as_str()))
.collect::<Vec<_>>();
df = df.select(cols)?;
df = self.config.filter_strs.iter().try_fold(df, |df, f| {
df.filter(FilterParser::parse(f.clone(), file_schema.clone()))
})?;
let stream = df.execute_stream().await?;
streams.push(SortedStream::new(stream));
}

let mut sort_exprs = Vec::with_capacity(self.config.primary_keys.len());
@@ -121,7 +129,21 @@
});
}

let merge_ops = self.config.schema.0.fields().iter().map(|field| MergeOperator::from_name(self.config.merge_operators.get(field.name()).unwrap_or(&String::from("UseLast")))).collect::<Vec<_>>();
let merge_ops = self
.config
.schema
.0
.fields()
.iter()
.map(|field| {
MergeOperator::from_name(
self.config
.merge_operators
.get(field.name())
.unwrap_or(&String::from("UseLast")),
)
})
.collect::<Vec<_>>();

let merge_stream = SortedStreamMerger::new_from_streams(
streams,
@@ -202,11 +224,11 @@
#[cfg(test)]
mod tests {
use super::*;
use rand::prelude::*;
use std::mem::ManuallyDrop;
use std::sync::mpsc::sync_channel;
use std::time::Instant;
use tokio::runtime::Builder;
use rand::prelude::*;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::util::pretty::print_batches;
1 change: 0 additions & 1 deletion native-io/lakesoul-io/src/lakesoul_writer.rs
@@ -262,7 +262,6 @@ impl MultiPartAsyncWriter {
impl AsyncBatchWriter for MultiPartAsyncWriter {
async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<()> {
let batch = uniform_record_batch(batch);
println!{"{:?}", batch.schema()};
MultiPartAsyncWriter::write_batch(batch, &mut self.arrow_writer, &mut self.in_mem_buf, &mut self.writer).await
}

