Fix Flink CI #177

Merged: 2 commits, Mar 21, 2023
11 changes: 6 additions & 5 deletions .github/workflows/flink-cdc-test.yml
@@ -51,7 +51,8 @@ jobs:
docker pull -q dockcross/manylinux2014-x64:latest
- name: Build with Maven
run: |
MAVEN_OPTS="-Xmx4000m" mvn -q -B clean package -f pom.xml -DskipTests
cargo install cross --git https://github.com/cross-rs/cross
MAVEN_OPTS="-Xmx4000m" mvn -q -B clean package -f pom.xml -Prelease-linux-x86-64 -DskipTests
- name: Get jar names
run: |
echo "FLINK_JAR_NAME=$(python script/get_jar_name.py lakesoul-flink)" >> $GITHUB_ENV
@@ -85,7 +86,7 @@ jobs:
- name: Accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --conf spark.hadoop.fs.s3.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.fast.upload.buffer=disk --conf spark.hadoop.fs.s3a.fast.upload=true --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --conf spark.hadoop.fs.s3.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.fast.upload.buffer=disk --conf spark.hadoop.fs.s3a.fast.upload=true --conf spark.dmetasoul.lakesoul.native.io.enable=true --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
- name: Adding columns for tables and deleting some data from tables
run: |
cd ./script/benchmark
@@ -96,7 +97,7 @@ jobs:
- name: Accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.dmetasoul.lakesoul.native.io.enable=true --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
- name: Updating data in tables
run: |
cd ./script/benchmark
@@ -105,7 +106,7 @@ jobs:
- name: Accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.dmetasoul.lakesoul.native.io.enable=true --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
- name: Dropping columns and deleting some data in tables
run: |
cd ./script/benchmark
@@ -116,4 +117,4 @@ jobs:
- name: Accuracy verification task
run: |
cd ./script/benchmark
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.dmetasoul.lakesoul.native.io.enable=true --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
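
The workflow change installs cross before the Maven build, activates the release-linux-x86-64 profile (presumably so the native Rust library is cross-built for the CI platform), and passes --conf spark.dmetasoul.lakesoul.native.io.enable=true to every accuracy-verification spark-submit. The sketch below shows the equivalent programmatic setting in a Spark application; it is an illustrative snippet using only the config key visible in this diff, not code from the repository.

import org.apache.spark.sql.SparkSession

object NativeIoEnableExample {
  def main(args: Array[String]): Unit = {
    // Mirror of the new CI flag; the option defaults to false (see the
    // LakeSoulSQLConf change further down), so CI turns it on explicitly.
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("lakesoul-native-io-check")
      .config("spark.dmetasoul.lakesoul.native.io.enable", "true")
      .getOrCreate()

    println(spark.conf.get("spark.dmetasoul.lakesoul.native.io.enable"))
    spark.stop()
  }
}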
4 changes: 0 additions & 4 deletions lakesoul-flink/pom.xml
@@ -390,10 +390,6 @@
<pattern>io.netty</pattern>
<shadedPattern>com.lakesoul.shaded.io.netty</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.arrow</pattern>
<shadedPattern>com.lakesoul.shaded.org.apache.arrow</shadedPattern>
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
4 changes: 0 additions & 4 deletions lakesoul-spark/pom.xml
@@ -375,10 +375,6 @@
<pattern>com.google.flatbuffers</pattern>
<shadedPattern>com.lakesoul.shaded.com.google.flatbuffers</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.arrow</pattern>
<shadedPattern>com.lakesoul.shaded.org.apache.arrow</shadedPattern>
</relocation>
</relocations>
</configuration>
</plugin>
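
Both shade configurations drop the org.apache.arrow relocation, so Arrow classes keep their original package names in the published jars; presumably this lets the Flink and Spark modules share Arrow types with the native IO bridge, though the PR does not state the motivation. The snippet below is a hypothetical illustration (standard Arrow Java API, Scala syntax) of code that only resolves when org.apache.arrow is left unrelocated on the classpath:

import java.util.Arrays
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}

object ArrowPackageCheck {
  def main(args: Array[String]): Unit = {
    // Had the shade plugin rewritten these classes to
    // com.lakesoul.shaded.org.apache.arrow.*, this original package name
    // would no longer match the classes inside the shaded jar.
    val idField = Field.nullable("id", new ArrowType.Int(32, true))
    val schema  = new Schema(Arrays.asList(idField))
    println(schema.toJson)
  }
}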
@@ -108,10 +108,10 @@ object LakeSoulSQLConf {
.createWithDefault(10)

val NATIVE_IO_ENABLE: ConfigEntry[Boolean] =
buildConf("native.io.scan.enable")
buildConf("native.io.enable")
.doc(
"""
|If ture, org.apache.spark.sql.execution.datasources.parquet.NativeVectorizedReader will be used instead of org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
|If ture, NativeIO would be enabled for both read and write
""".stripMargin)
.booleanConf
.createWithDefault(false)
@@ -142,7 +142,7 @@
|If NATIVE_IO_ENABLE=true, tokio::runtime::Runtime will be build with NATIVE_IO_THREAD_NUM thread_num
""".stripMargin)
.intConf
.createWithDefault(1)
.createWithDefault(2)

val NATIVE_IO_READER_AWAIT_TIMEOUT: ConfigEntry[Int] =
buildConf("native.io.await.timeout")
@@ -151,5 +151,5 @@
|If NATIVE_IO_ENABLE=true, timeout for each iterate will be set to NATIVE_IO_READER_AWAIT_TIMEOUT mills
""".stripMargin)
.intConf
.createWithDefault(3000)
.createWithDefault(10000)
}
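
In LakeSoulSQLConf the flag is renamed from native.io.scan.enable to native.io.enable and its documentation now covers both read and write; the native IO thread-count default rises from 1 to 2 and the reader await timeout from 3000 ms to 10000 ms. Below is a minimal sketch of checking the effective values from a session; it assumes these entries resolve under the spark.dmetasoul.lakesoul. prefix used by the CI --conf flags, and is not code from the repository.

import org.apache.spark.sql.SparkSession

object LakeSoulConfProbe {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    // Renamed entry: native.io.enable (was native.io.scan.enable), default false.
    val nativeIoEnabled = spark.conf
      .getOption("spark.dmetasoul.lakesoul.native.io.enable")
      .getOrElse("false")

    // Reader await timeout: buildConf("native.io.await.timeout"), default now 10000 ms.
    val awaitTimeoutMs = spark.conf
      .getOption("spark.dmetasoul.lakesoul.native.io.await.timeout")
      .getOrElse("10000")

    println(s"native io enabled=$nativeIoEnabled, await timeout=${awaitTimeoutMs}ms")
    spark.stop()
  }
}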
@@ -54,7 +54,9 @@ object Benchmark {
.config("spark.hadoop.fs.s3a.multipart.size", 67108864)
.config("spark.hadoop.fs.s3a.connection.maximum", 100)
.config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
.config("spark.hadoop.fs.s3a.endpoint.region", "us-east-1")
.config("spark.hadoop.fs.s3a.access.key", "minioadmin1")
.config("spark.hadoop.fs.s3a.secret.key", "minioadmin1")
.config("spark.sql.shuffle.partitions", 10)
.config("spark.sql.files.maxPartitionBytes", "1g")
.config("spark.default.parallelism", 8)
@@ -67,6 +69,7 @@
.config("spark.sql.catalog.lakesoul", classOf[LakeSoulCatalog].getName)
.config(SQLConf.DEFAULT_CATALOG.key, LakeSoulCatalog.CATALOG_NAME)
.config("spark.default.parallelism", "16")
.config("spark.dmetasoul.lakesoul.native.io.enable", "true")

val spark = builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
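
The benchmark session switches from AnonymousAWSCredentialsProvider to explicit MinIO credentials plus a region, and enables native IO in the builder. A minimal standalone sketch of the same S3A-against-MinIO setup follows (endpoint, region, and keys are copied from the diff; path-style access and the commented read are assumptions added for illustration):

import org.apache.spark.sql.SparkSession

object MinioS3aExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
      .config("spark.hadoop.fs.s3a.endpoint.region", "us-east-1")
      .config("spark.hadoop.fs.s3a.access.key", "minioadmin1")
      .config("spark.hadoop.fs.s3a.secret.key", "minioadmin1")
      // Common for MinIO deployments; not part of this PR.
      .config("spark.hadoop.fs.s3a.path.style.access", "true")
      .config("spark.dmetasoul.lakesoul.native.io.enable", "true")
      .getOrCreate()

    // Hypothetical path, only to show reads going through s3a:
    // spark.read.parquet("s3a://lakesoul-test-bucket/some/table").show()
    spark.stop()
  }
}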
@@ -40,7 +40,7 @@ object ParquetScanBenchmark {

var bucketName = "lakesoul-test-bucket"
if (args.length >= 1 && args(0) == "--localtest") {
builder.config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
builder.config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
.config("spark.hadoop.fs.s3a.endpoint.region", "us-east-1")
.config("spark.hadoop.fs.s3a.access.key", "minioadmin1")
.config("spark.hadoop.fs.s3a.secret.key", "minioadmin1")
2 changes: 1 addition & 1 deletion native-io/lakesoul-io-java/pom.xml
@@ -17,7 +17,7 @@
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<arrow.version>11.0.0</arrow.version>
<arrow.version>9.0.0</arrow.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<scalatest.version>3.1.0</scalatest.version>
64 changes: 43 additions & 21 deletions native-io/lakesoul-io/src/lakesoul_reader.rs
@@ -18,15 +18,15 @@ use atomic_refcell::AtomicRefCell;
use std::mem::MaybeUninit;
use std::sync::Arc;

use arrow_schema::SchemaRef;
use arrow::datatypes::Schema;
use arrow_schema::SchemaRef;

pub use datafusion::arrow::error::ArrowError;
pub use datafusion::arrow::error::Result as ArrowResult;
pub use datafusion::arrow::record_batch::RecordBatch;
pub use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::col as logical_col;
use datafusion::physical_plan::expressions::{PhysicalSortExpr, col};
use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};

use datafusion::prelude::SessionContext;

Expand All @@ -38,12 +38,11 @@ use tokio::runtime::Runtime;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

use crate::lakesoul_io_config::{create_session_context, LakeSoulIOConfig};
use crate::sorted_merge::sorted_stream_merger::{SortedStream,SortedStreamMerger};
use crate::sorted_merge::merge_operator::MergeOperator;
use crate::default_column_stream::default_column_stream::DefaultColumnStream;
use crate::filter::Parser as FilterParser;

use crate::lakesoul_io_config::{create_session_context, LakeSoulIOConfig};
use crate::sorted_merge::merge_operator::MergeOperator;
use crate::sorted_merge::sorted_stream_merger::{SortedStream, SortedStreamMerger};

pub struct LakeSoulReader {
sess_ctx: SessionContext,
@@ -75,15 +74,21 @@ impl LakeSoulReader {

let file_schema = Arc::new(Schema::from(df.schema()));


let cols = file_schema.fields().iter().filter(|field| schema.index_of(field.name()).is_ok()).map(|field| logical_col(field.name().as_str())).collect::<Vec<_>>();
let cols = file_schema
.fields()
.iter()
.filter(|field| schema.index_of(field.name()).is_ok())
.map(|field| logical_col(field.name().as_str()))
.collect::<Vec<_>>();

df = df.select(cols)?;

df = self.config.filter_strs.iter().try_fold(df, |df, f| df.filter(FilterParser::parse(f.clone(), file_schema.clone())))?;
df = self.config.filter_strs.iter().try_fold(df, |df, f| {
df.filter(FilterParser::parse(f.clone(), file_schema.clone()))
})?;
let stream = df.execute_stream().await?;
let stream = DefaultColumnStream::new_from_stream(stream, schema.clone());

self.schema = Some(stream.schema().clone().into());
self.stream = Box::new(MaybeUninit::new(Box::pin(stream)));
Ok(())
@@ -102,15 +107,18 @@ impl LakeSoulReader {
.await?;

let file_schema = Arc::new(Schema::from(df.schema()));
let cols = file_schema.fields().iter().filter(|field| schema.index_of(field.name()).is_ok()).map(|field| logical_col(field.name().as_str())).collect::<Vec<_>>();
df = df.select(cols)?;
df = self.config.filter_strs.iter().try_fold(df, |df, f| df.filter(FilterParser::parse(f.clone(), file_schema.clone())))?;
let stream = df
.execute_stream()
.await?;
streams.push(SortedStream::new(
stream,
));
let cols = file_schema
.fields()
.iter()
.filter(|field| schema.index_of(field.name()).is_ok())
.map(|field| logical_col(field.name().as_str()))
.collect::<Vec<_>>();
df = df.select(cols)?;
df = self.config.filter_strs.iter().try_fold(df, |df, f| {
df.filter(FilterParser::parse(f.clone(), file_schema.clone()))
})?;
let stream = df.execute_stream().await?;
streams.push(SortedStream::new(stream));
}

let mut sort_exprs = Vec::with_capacity(self.config.primary_keys.len());
@@ -121,7 +129,21 @@
});
}

let merge_ops = self.config.schema.0.fields().iter().map(|field| MergeOperator::from_name(self.config.merge_operators.get(field.name()).unwrap_or(&String::from("UseLast")))).collect::<Vec<_>>();
let merge_ops = self
.config
.schema
.0
.fields()
.iter()
.map(|field| {
MergeOperator::from_name(
self.config
.merge_operators
.get(field.name())
.unwrap_or(&String::from("UseLast")),
)
})
.collect::<Vec<_>>();

let merge_stream = SortedStreamMerger::new_from_streams(
streams,
@@ -202,11 +224,11 @@ impl SyncSendableMutableLakeSoulReader {
#[cfg(test)]
mod tests {
use super::*;
use rand::prelude::*;
use std::mem::ManuallyDrop;
use std::sync::mpsc::sync_channel;
use std::time::Instant;
use tokio::runtime::Builder;
use rand::prelude::*;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::util::pretty::print_batches;
1 change: 0 additions & 1 deletion native-io/lakesoul-io/src/lakesoul_writer.rs
@@ -262,7 +262,6 @@ impl MultiPartAsyncWriter {
impl AsyncBatchWriter for MultiPartAsyncWriter {
async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<()> {
let batch = uniform_record_batch(batch);
println!{"{:?}", batch.schema()};
MultiPartAsyncWriter::write_batch(batch, &mut self.arrow_writer, &mut self.in_mem_buf, &mut self.writer).await
}

Expand Down