add entity comments && fix compile option
Signed-off-by: zenghua <huazeng@dmetasoul.com>
zenghua committed Jul 14, 2023
1 parent b57a1b9 commit 5b8b6ad
Showing 8 changed files with 110 additions and 34 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/native-build.yml
@@ -105,7 +105,7 @@ jobs:

build-maven-package:
runs-on: ubuntu-latest
- needs: [build-linux-x86_64, build-windows-x86_64, build-macos-x86_64]
+ needs: [ build-linux-x86_64, build-windows-x86_64, build-macos-x86_64 ]
steps:
- uses: actions/checkout@v3
- uses: actions/download-artifact@v3
@@ -126,6 +126,10 @@ jobs:
java-version: '8'
distribution: 'temurin'
cache: maven
+ - name: Install Protoc
+   uses: arduino/setup-protoc@v2
+   with:
+     version: "23.x"
- name: Build with Maven
run: |
MAVEN_OPTS="-Xmx4000m" mvn -q -B package --file pom.xml -Pcross-build -DskipTests -Dmaven.test.skip=true
@@ -21,6 +21,7 @@
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.dmetasoul.lakesoul.meta.entity.DataFileOp;
+ import com.dmetasoul.lakesoul.meta.entity.FileOp;
import com.zaxxer.hikari.HikariConfig;
import org.apache.commons.lang3.StringUtils;

@@ -174,7 +175,7 @@ public static String changeDataFileOpListToString(List<DataFileOp> dataFileOpLis
sb.append("{");
for (DataFileOp dataFileOp : dataFileOpList) {
String path = dataFileOp.getPath();
- String fileOp = dataFileOp.getFileOp();
+ String fileOp = dataFileOp.getFileOp().name();
long size = dataFileOp.getSize();
String fileExistCols = dataFileOp.getFileExistCols();
sb.append(String.format("\"(%s,%s,%s,\\\"%s\\\")\",", path, fileOp, size, fileExistCols));
@@ -202,7 +203,7 @@ public static List<DataFileOp> changeStringToDataFileOpList(String s) {
dataFileOp.setPath(tmpElem.substring(0, tmpElem.indexOf(",")));
tmpElem = tmpElem.substring(tmpElem.indexOf(",") + 1);
String fileOp = tmpElem.substring(0, tmpElem.indexOf(","));
- dataFileOp.setFileOp(fileOp);
+ dataFileOp.setFileOp(FileOp.valueOf(fileOp));
tmpElem = tmpElem.substring(tmpElem.indexOf(",") + 1);
dataFileOp.setSize(Long.parseLong(tmpElem.substring(0, tmpElem.indexOf(","))));
tmpElem = tmpElem.substring(tmpElem.indexOf(",") + 1);
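The two helpers above now round-trip the file operation through the FileOp enum instead of a raw string: serialization writes the enum's name() and parsing goes back through FileOp.valueOf. A minimal sketch of that round-trip, reusing the builder calls visible in the diff (path, size, and column values are illustrative placeholders):

import com.dmetasoul.lakesoul.meta.entity.DataFileOp;
import com.dmetasoul.lakesoul.meta.entity.FileOp;

public class FileOpRoundTrip {
    public static void main(String[] args) {
        // Build a DataFileOp as the diff does; field values are placeholders.
        DataFileOp op = DataFileOp.newBuilder()
                .setPath("/tmp/table/part-00000.parquet")
                .setFileOp(FileOp.add)
                .setSize(1024L)
                .setFileExistCols("id,name")
                .build();

        // changeDataFileOpListToString now serializes the enum by name ...
        String fileOp = op.getFileOp().name();     // "add"
        // ... and changeStringToDataFileOpList parses it back.
        FileOp parsed = FileOp.valueOf(fileOp);
        System.out.println(parsed == FileOp.add);  // true
    }
}

Because the generated FileOp is an ordinary Java enum, the stored string format stays identical to the old one while the code gains compile-time checking.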
5 changes: 5 additions & 0 deletions lakesoul-flink/pom.xml
@@ -474,6 +474,7 @@
<include>com.fasterxml.jackson.module:jackson-module-scala_2.12</include>
<include>com.fasterxml.jackson.module:jackson-module-paranamer</include>
<include>com.thoughtworks.paranamer:paranamer</include>
+ <include>com.google.protobuf:protobuf-java</include>
</includes>
<excludes>
<exclude>org.apache.logging.log4j:*</exclude>
@@ -540,6 +541,10 @@
<pattern>org.apache.arrow.vector</pattern>
<shadedPattern>com.lakesoul.shaded.org.apache.arrow.vector</shadedPattern>
</relocation>
+ <relocation>
+   <pattern>com.google.protobuf</pattern>
+   <shadedPattern>com.lakesoul.shaded.com.google.protobuf</shadedPattern>
+ </relocation>
</relocations>
<transformers>
<transformer
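Bundling protobuf-java and relocating com.google.protobuf keeps the protoc-generated entity classes working even when a different protobuf version sits on the Flink classpath. A hedged way to spot-check the relocation from inside the shaded lakesoul-flink jar; the class name simply follows the shadedPattern above, and the check is illustrative, not part of the build:

public class ShadeCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        // Resolves only if protobuf-java was bundled and relocated as configured;
        // on an unshaded classpath this throws ClassNotFoundException.
        Class<?> message = Class.forName("com.lakesoul.shaded.com.google.protobuf.Message");
        System.out.println(message.getName());
    }
}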
@@ -19,10 +19,7 @@
package org.apache.flink.lakesoul.sink.committer;

import com.dmetasoul.lakesoul.meta.DBManager;
- import com.dmetasoul.lakesoul.meta.entity.CommitOp;
- import com.dmetasoul.lakesoul.meta.entity.DataCommitInfo;
- import com.dmetasoul.lakesoul.meta.entity.DataFileOp;
- import com.dmetasoul.lakesoul.meta.entity.TableNameId;
+ import com.dmetasoul.lakesoul.meta.entity.*;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
@@ -103,7 +100,7 @@ public List<LakeSoulMultiTableSinkCommittable> commit(List<LakeSoulMultiTableSin
.collect(Collectors.joining(LAKESOUL_FILE_EXISTS_COLUMN_SPLITTER));
for (String file : files) {
DataFileOp.Builder dataFileOp = DataFileOp.newBuilder();
- dataFileOp.setFileOp(LakeSoulSinkOptions.FILE_OPTION_ADD);
+ dataFileOp.setFileOp(FileOp.add);
dataFileOp.setPath(file);
Path path = new Path(file);
FileStatus fileStatus = FileSystem.get(path.toUri()).getFileStatus(path);
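Here the typed FileOp.add replaces the old LakeSoulSinkOptions.FILE_OPTION_ADD string constant, and the file's physical size is fetched through Flink's FileSystem before the DataFileOp is built. A condensed sketch of that per-file step; buildDataFileOp is a hypothetical helper, and feeding fileStatus.getLen() into setSize is an assumption consistent with the truncated snippet:

import com.dmetasoul.lakesoul.meta.entity.DataFileOp;
import com.dmetasoul.lakesoul.meta.entity.FileOp;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.IOException;

public class CommitSketch {
    static DataFileOp buildDataFileOp(String file) throws IOException {
        DataFileOp.Builder dataFileOp = DataFileOp.newBuilder();
        dataFileOp.setFileOp(FileOp.add);  // typed enum instead of the string constant
        dataFileOp.setPath(file);
        Path path = new Path(file);
        // Look up the physical file so its size can be recorded in the meta store.
        FileStatus fileStatus = FileSystem.get(path.toUri()).getFileStatus(path);
        dataFileOp.setSize(fileStatus.getLen());  // assumed from the surrounding code
        return dataFileOp.build();
    }
}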
@@ -109,7 +109,7 @@ object DataOperation {
dataCommitInfoList.foreach(data_commit_info => {
val fileOps = data_commit_info.getFileOpsList.asScala.toArray
fileOps.foreach(file => {
- file_arr_buf += DataFileInfo(data_commit_info.getPartitionDesc, file.getPath, file.getFileOp, file.getSize,
+ file_arr_buf += DataFileInfo(data_commit_info.getPartitionDesc, file.getPath, file.getFileOp.name, file.getSize,
data_commit_info.getTimestamp, file.getFileExistCols)
})
})
@@ -128,7 +128,7 @@
for (metaDataCommitInfo <- dataCommitInfoList) {
val fileOps = metaDataCommitInfo.getFileOpsList.asScala.toArray
for (file <- fileOps) {
- file_arr_buf += DataFileInfo(partition_info.range_value, file.getPath, file.getFileOp, file.getSize,
+ file_arr_buf += DataFileInfo(partition_info.range_value, file.getPath, file.getFileOp.name, file.getSize,
metaDataCommitInfo.getTimestamp, file.getFileExistCols)
}
}
@@ -16,7 +16,7 @@

package com.dmetasoul.lakesoul.meta

- import com.dmetasoul.lakesoul.meta.entity.DataFileOp
+ import com.dmetasoul.lakesoul.meta.entity.{DataFileOp, FileOp}
import com.google.common.collect.Lists
import org.apache.spark.internal.Logging
import org.apache.spark.sql.lakesoul.LakeSoulOptions
@@ -59,11 +59,11 @@ object DataOperation extends Logging {
for (file <- fileOps) {
file_arr_buf += DataFileInfo(
partition_info.range_value,
- file.getPath(),
- file.getFileOp(),
- file.getSize(),
- metaDataCommitInfo.getTimestamp(),
- file.getFileExistCols()
+ file.getPath,
+ file.getFileOp.name,
+ file.getSize,
+ metaDataCommitInfo.getTimestamp,
+ file.getFileExistCols
)
}
}
@@ -72,7 +72,7 @@
if (file_arr_buf(i).file_op.equals("del")) {
dupCheck.add(file_arr_buf(i).path)
} else {
- if (dupCheck.size == 0 || !dupCheck.contains(file_arr_buf(i).path)) {
+ if (dupCheck.isEmpty || !dupCheck.contains(file_arr_buf(i).path)) {
file_res_arr_buf += file_arr_buf(i)
}
}
@@ -93,11 +93,11 @@
fileOps.foreach(file => {
file_arr_buf += DataFileInfo(
data_commit_info.getPartitionDesc,
- file.getPath(),
- file.getFileOp(),
- file.getSize(),
- data_commit_info.getTimestamp(),
- file.getFileExistCols()
+ file.getPath,
+ file.getFileOp.name,
+ file.getSize,
+ data_commit_info.getTimestamp,
+ file.getFileExistCols
)
})
})
@@ -107,7 +107,7 @@
if (file_arr_buf(i).file_op.equals("del")) {
dupCheck.add(file_arr_buf(i).path)
} else {
- if (dupCheck.size == 0 || !dupCheck.contains(file_arr_buf(i).path)) {
+ if (dupCheck.isEmpty || !dupCheck.contains(file_arr_buf(i).path)) {
file_res_arr_buf += file_arr_buf(i)
}
}
@@ -187,11 +187,11 @@ object DataOperation extends Logging {
fileOps.foreach(file => {
file_arr_buf += DataFileInfo(
data_commit_info.getPartitionDesc,
- file.getPath(),
- file.getFileOp(),
- file.getSize(),
- data_commit_info.getTimestamp(),
- file.getFileExistCols()
+ file.getPath,
+ file.getFileOp.name,
+ file.getSize,
+ data_commit_info.getTimestamp,
+ file.getFileExistCols
)
})
})
@@ -201,7 +201,7 @@
if (file_arr_buf(i).file_op.equals("del")) {
dupCheck.add(file_arr_buf(i).path)
} else {
- if (dupCheck.size == 0 || !dupCheck.contains(file_arr_buf(i).path)) {
+ if (dupCheck.isEmpty || !dupCheck.contains(file_arr_buf(i).path)) {
file_res_arr_buf += file_arr_buf(i)
}
}
@@ -225,7 +225,7 @@
modification_time: Long): Unit = {
val dataFileInfo = DataFileOp.newBuilder
dataFileInfo.setPath(file_path)
- dataFileInfo.setFileOp(file_op)
+ dataFileInfo.setFileOp(FileOp.valueOf(file_op))
dataFileInfo.setSize(size)
dataFileInfo.setFileExistCols(file_exist_cols)
val file_arr_buf = new ArrayBuffer[DataFileOp]()
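The dupCheck passes above are untouched in substance: a "del" op records its path as deleted, and any other op survives only if its path was never recorded. A sketch of the same filter in Java, assuming (not shown in this hunk) that entries are visited newest-first so a deletion masks older versions of a file; FileEntry is a hypothetical stand-in for the Scala DataFileInfo:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DupCheckSketch {
    // Stand-in for DataFileInfo with only the fields the filter needs.
    static final class FileEntry {
        final String path;
        final String fileOp;
        FileEntry(String path, String fileOp) { this.path = path; this.fileOp = fileOp; }
    }

    // Keep a file only if no newer op deleted its path.
    static List<FileEntry> liveFiles(List<FileEntry> newestFirst) {
        Set<String> deleted = new HashSet<>();
        List<FileEntry> live = new ArrayList<>();
        for (FileEntry f : newestFirst) {
            if (f.fileOp.equals("del")) {
                deleted.add(f.path);                // a deletion masks older versions
            } else if (!deleted.contains(f.path)) {
                live.add(f);                        // not deleted by any newer op
            }
        }
        return live;
    }
}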
@@ -17,6 +17,7 @@
package com.dmetasoul.lakesoul.meta

import com.alibaba.fastjson.JSONObject
+ import com.dmetasoul.lakesoul.meta.entity.FileOp
import org.apache.spark.internal.Logging
import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
import org.apache.spark.sql.lakesoul.utils._
@@ -112,7 +113,7 @@ object MetaCommit extends Logging {
for (file_info <- dataCommitInfo.file_ops) {
val metaDataFileInfo = entity.DataFileOp.newBuilder
metaDataFileInfo.setPath(file_info.path)
- metaDataFileInfo.setFileOp(file_info.file_op)
+ metaDataFileInfo.setFileOp(FileOp.valueOf(file_info.file_op))
metaDataFileInfo.setSize(file_info.size)
metaDataFileInfo.setFileExistCols(file_info.file_exist_cols)
fileOps.add(metaDataFileInfo.build)
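As at the other call sites, the commit path now converts the stored op string with FileOp.valueOf, which accepts only the names declared in entity.proto. A small illustration of the stricter behavior; the try/catch is illustrative:

import com.dmetasoul.lakesoul.meta.entity.FileOp;

public class FileOpParse {
    public static void main(String[] args) {
        System.out.println(FileOp.valueOf("del"));  // ok: declared in entity.proto
        try {
            FileOp.valueOf("delete");               // not a declared enum name
        } catch (IllegalArgumentException e) {
            // Corrupted or legacy op strings now fail fast here instead of
            // silently flowing through as arbitrary strings.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}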
74 changes: 71 additions & 3 deletions native-metadata/proto/src/entity.proto
@@ -5,77 +5,145 @@ package proto.entity;
option java_multiple_files = true;
option java_package = "com.dmetasoul.lakesoul.meta.entity";

// A collection of PartitionInfo of one TableInfo
message MetaInfo {
repeated PartitionInfo listPartition = 1;
TableInfo tableInfo = 2;
repeated PartitionInfo readPartitionInfo = 3;
}


// Meta Information for LakeSoul Table
message TableInfo {
// Global unique identifier of table
string tableId = 1;
// Namespace of table. A string of 'tableNamespace.tablePath' or 'tableNamespace.tableName' maps to one unique table globally
string tableNamespace = 2;
// Name of table, optional
string tableName = 3;
// Physical qualified path of table
string tablePath = 4;
// Spark-formatted schema of table
string tableSchema = 5;
// Properties of table, used to tag table with information not tracked by SQL
string properties = 7;
// Partition columns of table. Format of partitions is 'comma_separated_range_column;hash_column'
string partitions = 8;
// Domain this entry belongs to.
// Only when the RBAC feature is enabled will this have contents other than 'public'
string domain = 9;
}

// Version information for specific table range partitions
message PartitionInfo {
// TableId of PartitionInfo
string tableId = 1;
// Range partition description, which defines a specific range partition of the table, in the format of comma-separated range_column=range_value
// In particular, a table without range partitions uses LAKESOUL_NON_PARTITION_TABLE_PART_DESC as its partitionDesc
string partitionDesc = 2;
// A version is defined as a number that monotonically increases by 1
int32 version = 3;
// Specific commit operation of this version
CommitOp commitOp = 4;
// Timestamp at which the PartitionInfo was successfully committed
int64 timestamp = 5;
// Collection of the commitIds of the DataCommitInfo entries included in this PartitionInfo
repeated string snapshot = 7;
// Expression used to calculate or filter data; reserved for future use. Currently just an empty string.
string expression = 8;
// Domain this entry belongs to.
// Only when the RBAC feature is enabled will this have contents other than 'public'
string domain = 9;
}

// Namespace of tables
message Namespace {
// Dot-separated namespace string
string namespace = 1;
string properties = 2;
string comment = 3;
// Domain this entry belongs to.
// Only when the RBAC feature is enabled will this have contents other than 'public'
string domain = 4;
}


// Set of {CompactionCommit, AppendCommit, MergeCommit, UpdateCommit, DeleteCommit}, which defines the specific operation of a DataCommit
enum CommitOp {
// CompactionCommit: a commit type indicating that this DataCommit compacts files in a specific table range partition
CompactionCommit = 0;
// AppendCommit: a commit type indicating that this DataCommit appends files to a specific table range partition without hash partitioning
AppendCommit = 1;
// MergeCommit: a commit type indicating that this DataCommit appends files to a specific table range partition with hash partitioning
MergeCommit = 2;
// UpdateCommit: a commit type indicating that this DataCommit updates data (adds new files and deletes old invalid files) in a specific table range partition
UpdateCommit = 3;
// DeleteCommit: a commit type indicating that this DataCommit deletes files in a specific table range partition
DeleteCommit = 4;
}

// Set of {add, del}, which defines the specific operation on a file
enum FileOp {
// add: indicates that the parquet file is newly added
add = 0;
// del: indicates that the parquet file has been deleted
del = 1;
}

// Singleton Data File information
message DataFileOp {
// Physical qualified path of a parquet file
string path = 1;
- string fileOp = 2;
// Specific operation of this file
+ FileOp fileOp = 2;
// File size in bytes
int64 size = 3;
// Columns included in this parquet file, which should be consistent with the parquet file's own metadata
string fileExistCols = 4;
}

// Data Files Commit information for specific table range partitions
message DataCommitInfo {
// TableId of DataCommit
string tableId = 1;
// Range partition description, which defines a specific range partition of the table, in the format of comma-separated range_column=range_value
// In particular, a table without range partitions uses LAKESOUL_NON_PARTITION_TABLE_PART_DESC as its partitionDesc
string partitionDesc = 2;
// Global unique identifier of DataCommit
string commitId = 3;
// Collection of DataFileOps included with DataCommit
repeated DataFileOp fileOps = 4;
// Specific operation of this DataCommit
CommitOp commitOp = 5;
// Creation timestamp of the files of the DataCommit
int64 timestamp = 6;
- bool committed = 8;
- string domain = 9;
// Marks whether this DataCommit has already been committed as a PartitionInfo of the table
+ bool committed = 7;
+ string domain = 8;
}

// Relationship between 'TableNamespace.TableName' and TableId
message TableNameId {
// Name of table
string tableName = 1;
// Global unique identifier of table
string tableId = 2;
// Namespace of table
string tableNamespace = 3;
// Domain this entry belongs to.
// Only when the RBAC feature is enabled will this have contents other than 'public'
string domain = 4;
}

// Relationship between 'TableNamespace.TablePath' and TableId
message TablePathId {
// Physical qualified path of table
string tablePath = 1;
// Global unique identifier of table
string tableId = 2;
// Namespace of table
string tableNamespace = 3;
// Domain this entry belongs to.
// Only when the RBAC feature is enabled will this have contents other than 'public'
string domain = 4;
}
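Taken together, the newly documented messages compose bottom-up: DataFileOp entries roll up into a DataCommitInfo, whose commitId later appears in a PartitionInfo snapshot. A hedged sketch of assembling one commit with the builders protoc generates under the java_package declared above; every field value is a placeholder:

import com.dmetasoul.lakesoul.meta.entity.CommitOp;
import com.dmetasoul.lakesoul.meta.entity.DataCommitInfo;
import com.dmetasoul.lakesoul.meta.entity.DataFileOp;
import com.dmetasoul.lakesoul.meta.entity.FileOp;

import java.util.UUID;

public class CommitAssembly {
    public static void main(String[] args) {
        DataFileOp fileOp = DataFileOp.newBuilder()
                .setPath("/data/table/part-00000.parquet")
                .setFileOp(FileOp.add)
                .setSize(4096L)
                .setFileExistCols("id,name")
                .build();

        DataCommitInfo commit = DataCommitInfo.newBuilder()
                .setTableId("table_" + UUID.randomUUID())
                .setPartitionDesc("date=2023-07-14")
                .setCommitId(UUID.randomUUID().toString())
                .addFileOps(fileOp)                  // repeated DataFileOp fileOps = 4
                .setCommitOp(CommitOp.AppendCommit)
                .setTimestamp(System.currentTimeMillis())
                .setCommitted(false)                 // flipped once published as PartitionInfo
                .setDomain("public")                 // default per the field comments
                .build();

        System.out.println(commit);
    }
}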
