Skip to content

Commit

Permalink
optimize transactionInsert
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <huazeng@dmetasoul.com>
  • Loading branch information
zenghua committed May 30, 2024
1 parent 5ffbe04 commit c952936
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ public boolean commitData(MetaInfo metaInfo, boolean changeSchema, CommitOp comm
} else {
if (middleCommitOps.contains(CommitOp.UpdateCommit) || middleCommitOps.contains(CommitOp.CompactionCommit)) {
partitionDescList.remove(partitionDesc);
snapshotList.removeAll(partitionInfo.getSnapshotList());
snapshotList.removeAll(partitionInfo.getSnapshotList().stream().map(uuid -> DBUtil.toJavaUUID(uuid).toString()).collect(Collectors.toList()));
continue;
}
curPartitionInfo = updateSubmitPartitionSnapshot(partitionInfo, curPartitionInfo, readPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,38 @@ public static String protoUuidToJniString(Uuid uuid) {
sb.append(low);
return sb.toString();
}

public static class Timer {

    // Human-readable label included in report() output.
    private final String name;
    // Accumulated elapsed wall-clock time across all completed start()/end() pairs, in ms.
    private long totalCost;
    // Number of completed start()/end() measurements.
    private long times;
    // Wall-clock timestamp captured by the most recent start(), in ms.
    private long startTimeOnce;

    /**
     * Creates a timer with the given label and all counters zeroed.
     *
     * @param name label printed by {@link #report()}
     */
    public Timer(String name) {
        this.name = name;
        initTimer();
    }

    /** Resets all accumulated state so this timer can be reused from scratch. */
    public void initTimer() {
        totalCost = 0L;
        times = 0L;
        startTimeOnce = 0L;
    }

    /** Marks the beginning of one measured interval. */
    public void start() {
        startTimeOnce = System.currentTimeMillis();
    }

    /**
     * Marks the end of one measured interval, folding the elapsed time into the
     * running total. Must be preceded by a matching {@link #start()}; calling it
     * without one records a meaningless interval.
     */
    public void end() {
        times += 1;
        totalCost += System.currentTimeMillis() - startTimeOnce;
    }

    /**
     * @return total measured time in milliseconds across all completed intervals
     */
    public long getTotalCost() {
        return totalCost;
    }

    /**
     * @return number of completed start()/end() measurements
     */
    public long getTimes() {
        return times;
    }

    /**
     * Prints a one-line summary of the collected statistics to stdout.
     * Reports an average of 0 when no interval has completed yet, instead of
     * the NaN previously produced by dividing by zero.
     */
    public void report() {
        double avgCost = (times == 0) ? 0.0 : (double) totalCost / times;
        System.out.printf("Timer %s: totalCost=%d, times=%d, avgCost=%.3f\n", name, totalCost, times, avgCost);
    }

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.stream.Collectors;

public class PartitionInfoDao {
final DBUtil.Timer transactionInsertTimer = new DBUtil.Timer("transactionInsert");

public void insert(PartitionInfo partitionInfo) {
if (NativeUtils.NATIVE_METADATA_UPDATE_ENABLED) {
Expand All @@ -38,50 +39,62 @@ public void insert(PartitionInfo partitionInfo) {
}

public boolean transactionInsert(List<PartitionInfo> partitionInfoList, List<String> snapshotList) {
if (NativeUtils.NATIVE_METADATA_UPDATE_ENABLED) {
if (partitionInfoList.isEmpty()) return true;
Integer count = NativeMetadataJavaClient.insert(
NativeUtils.CodedDaoType.TransactionInsertPartitionInfo,
JniWrapper.newBuilder().addAllPartitionInfo(partitionInfoList).build());
return count > 0;
}
boolean flag = true;
Connection conn = null;
PreparedStatement pstmt = null;
try {
conn = DBConnector.getConn();
pstmt = conn.prepareStatement("insert into partition_info (table_id, partition_desc, version, " +
"commit_op, snapshot, expression, domain) values (?, ?, ?, ? ,?, ?, ?)");
conn.setAutoCommit(false);
for (PartitionInfo partitionInfo : partitionInfoList) {
insertSinglePartitionInfo(conn, pstmt, partitionInfo);
}
pstmt = conn.prepareStatement("update data_commit_info set committed = 'true' where commit_id = ?");
for (String uuid : snapshotList) {
pstmt.setString(1, uuid);
pstmt.execute();
transactionInsertTimer.start();
if (NativeUtils.NATIVE_METADATA_UPDATE_ENABLED) {
if (partitionInfoList.isEmpty()) return true;
PartitionInfo snapshotContainer = PartitionInfo.newBuilder().addAllSnapshot(snapshotList.stream().map(s -> DBUtil.toProtoUuid(UUID.fromString(s))).collect(Collectors.toList())).build();

Integer count = NativeMetadataJavaClient.insert(
NativeUtils.CodedDaoType.TransactionInsertPartitionInfo,
JniWrapper.newBuilder()
.addAllPartitionInfo(partitionInfoList)
.addPartitionInfo(snapshotContainer)
.build());
return count > 0;
}
conn.commit();
} catch (SQLException e) {
flag = false;
boolean flag = true;
Connection conn = null;
PreparedStatement pstmt = null;
try {
if (conn != null) {
conn.rollback();
conn = DBConnector.getConn();
pstmt = conn.prepareStatement("insert into partition_info (table_id, partition_desc, version, " +
"commit_op, snapshot, expression, domain) values (?, ?, ?, ? ,?, ?, ?)");
conn.setAutoCommit(false);
for (PartitionInfo partitionInfo : partitionInfoList) {
insertSinglePartitionInfo(conn, pstmt, partitionInfo);
}
} catch (SQLException ex) {
ex.printStackTrace();
}
if (e.getMessage().contains("duplicate key value violates unique constraint")) {
// only when primary key conflicts could we ignore the exception
e.printStackTrace();
} else {
// throw exception in all other cases
throw new RuntimeException(e);
pstmt = conn.prepareStatement("update data_commit_info set committed = 'true' where commit_id = ?");
for (String uuid : snapshotList) {
pstmt.setString(1, uuid);
pstmt.execute();
}
conn.commit();
} catch (SQLException e) {
flag = false;
try {
if (conn != null) {
conn.rollback();
}
} catch (SQLException ex) {
ex.printStackTrace();
}
if (e.getMessage().contains("duplicate key value violates unique constraint")) {
// only when primary key conflicts could we ignore the exception
e.printStackTrace();
} else {
// throw exception in all other cases
throw new RuntimeException(e);
}
} finally {
DBConnector.closeConn(pstmt, conn);
}
return flag;

} finally {
DBConnector.closeConn(pstmt, conn);
transactionInsertTimer.end();
// transactionInsertTimer.report();
}
return flag;
}

private void insertSinglePartitionInfo(Connection conn, PreparedStatement pstmt, PartitionInfo partitionInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ public class LakeSoulArrowConnectorCase extends AbstractTestBase {
@Test
public void test() throws Exception {
int parallelism = 2;
StreamExecutionEnvironment execEnv = LakeSoulTestUtils.createStreamExecutionEnvironment(parallelism, 5000L, 5000L);
StreamExecutionEnvironment execEnv = LakeSoulTestUtils.createStreamExecutionEnvironment(parallelism, 2000L, 2000L);
StreamTableEnvironment tableEnv = LakeSoulTestUtils.createTableEnvInStreamingMode(
execEnv, parallelism);
DataStreamSource<LakeSoulArrowWrapper> source = execEnv.addSource(new MockLakeSoulArrowSource.MockSourceFunction(1000, 1000L));
DataStreamSource<LakeSoulArrowWrapper> source = execEnv.addSource(new MockLakeSoulArrowSource.MockSourceFunction(5000, 1000L));
String name = "Print Sink";
PrintSinkFunction<LakeSoulArrowWrapper> printFunction = new PrintSinkFunction<>(name, false);

Configuration conf = new Configuration();
conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, 1);
conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, parallelism);

LakeSoulMultiTableSinkStreamBuilder.Context context = new LakeSoulMultiTableSinkStreamBuilder.Context();
context.env = execEnv;
Expand Down
45 changes: 24 additions & 21 deletions rust/lakesoul-metadata/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,10 +950,11 @@ pub async fn execute_insert(
.await
}
DaoType::TransactionInsertPartitionInfo => {
let partition_info_list = wrapper.partition_info;
let mut partition_info_list = wrapper.partition_info.clone();
let snapshot_container = partition_info_list.pop().unwrap();
let result = {
let transaction = client.transaction().await?;
let prepared = transaction
let transaction_insert_statement = match transaction
.prepare(
"insert into partition_info(
table_id,
Expand All @@ -966,8 +967,16 @@ pub async fn execute_insert(
)
values($1::TEXT, $2::TEXT, $3::INT, $4::TEXT, $5::_UUID, $6::TEXT, $7::TEXT)",
)
.await;
let statement = match prepared {
.await
{
Ok(statement) => statement,
Err(e) => return Err(LakeSoulMetaDataError::from(e)),
};

let update_statement = match transaction
.prepare("update data_commit_info set committed = 'true' where commit_id = $1::UUID")
.await
{
Ok(statement) => statement,
Err(e) => return Err(LakeSoulMetaDataError::from(e)),
};
Expand All @@ -981,7 +990,7 @@ pub async fn execute_insert(

let result = transaction
.execute(
&statement,
&transaction_insert_statement,
&[
&partition_info.table_id,
&partition_info.partition_desc,
Expand All @@ -1001,22 +1010,16 @@ pub async fn execute_insert(
Err(e) => Err(LakeSoulMetaDataError::from(e)),
};
};

for uuid in &snapshot {
let result = transaction
.execute(
"update data_commit_info set committed = 'true' where commit_id = $1::UUID",
&[&uuid],
)
.await;

if let Some(e) = result.err() {
eprintln!("update committed error, err = {:?}", e);
return match transaction.rollback().await {
Ok(()) => Ok(0i32),
Err(e) => Err(LakeSoulMetaDataError::from(e)),
};
}
}
for _uuid in &snapshot_container.snapshot {
let uid = uuid::Uuid::from_u64_pair(_uuid.high, _uuid.low);
let result = transaction.execute(&update_statement, &[&uid]).await;
if let Some(e) = result.err() {
eprintln!("update committed error, err = {:?}", e);
return match transaction.rollback().await {
Ok(()) => Ok(0i32),
Err(e) => Err(LakeSoulMetaDataError::from(e)),
};
}
}
match transaction.commit().await {
Expand Down
3 changes: 2 additions & 1 deletion rust/lakesoul-metadata/src/metadata_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl MetaDataClient {

match commit_op {
CommitOp::AppendCommit | CommitOp::MergeCommit => {
let new_partition_list = meta_info
let mut new_partition_list = meta_info
.list_partition
.iter()
.map(|partition_info| {
Expand Down Expand Up @@ -356,6 +356,7 @@ impl MetaDataClient {
}
})
.collect::<Result<Vec<PartitionInfo>>>()?;
new_partition_list.push(PartitionInfo { ..Default::default() });
let val = self.transaction_insert_partition_info(new_partition_list).await?;
let vec = self.get_all_partition_info(table_info.table_id.as_str()).await?;
debug!("val = {val} ,get partition list after finished: {:?}", vec);
Expand Down

0 comments on commit c952936

Please sign in to comment.