Skip to content

Commit

Permalink
Add queryId to Action in SemiTransactionalHiveMetastore
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Mar 24, 2021
1 parent 01cdc47 commit 9b6fb64
Showing 1 changed file with 24 additions and 12 deletions.
Expand Up @@ -442,7 +442,7 @@ public synchronized void createTable(
TableAndMore tableAndMore = new TableAndMore(table, identity, Optional.of(principalPrivileges), currentPath, Optional.empty(), ignoreExisting, statistics, statistics);
if (oldTableAction == null) {
HdfsContext hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName());
tableActions.put(table.getSchemaTableName(), new Action<>(ActionType.ADD, tableAndMore, hdfsContext, identity));
tableActions.put(table.getSchemaTableName(), new Action<>(ActionType.ADD, tableAndMore, hdfsContext, identity, session.getQueryId()));
return;
}
switch (oldTableAction.getType()) {
Expand All @@ -451,7 +451,7 @@ public synchronized void createTable(
throw new TrinoException(TRANSACTION_CONFLICT, "Operation on the same table with different user in the same transaction is not supported");
}
HdfsContext hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName());
tableActions.put(table.getSchemaTableName(), new Action<>(ActionType.ALTER, tableAndMore, hdfsContext, identity));
tableActions.put(table.getSchemaTableName(), new Action<>(ActionType.ALTER, tableAndMore, hdfsContext, identity, session.getQueryId()));
return;

case ADD:
Expand All @@ -477,7 +477,7 @@ public synchronized void dropTable(ConnectorSession session, String databaseName
if (oldTableAction == null || oldTableAction.getType() == ActionType.ALTER) {
HdfsContext hdfsContext = new HdfsContext(session, databaseName, tableName);
HiveIdentity identity = new HiveIdentity(session);
tableActions.put(schemaTableName, new Action<>(ActionType.DROP, null, hdfsContext, identity));
tableActions.put(schemaTableName, new Action<>(ActionType.DROP, null, hdfsContext, identity, session.getQueryId()));
return;
}
switch (oldTableAction.getType()) {
Expand Down Expand Up @@ -571,7 +571,8 @@ public synchronized void finishInsertIntoExistingTable(
merge(currentStatistics, statisticsUpdate),
statisticsUpdate),
hdfsContext,
identity));
identity,
session.getQueryId()));
return;
}

Expand Down Expand Up @@ -653,7 +654,8 @@ public synchronized void finishRowLevelDelete(
Optional.of(currentLocation),
partitionAndStatementIds),
hdfsContext,
identity));
identity,
session.getQueryId()));
return;
}

Expand Down Expand Up @@ -702,7 +704,8 @@ public synchronized void finishUpdate(
Optional.of(currentLocation),
partitionAndStatementIds),
hdfsContext,
identity));
identity,
session.getQueryId()));
return;
}

Expand Down Expand Up @@ -895,7 +898,7 @@ public synchronized void addPartition(
if (oldPartitionAction == null) {
partitionActionsOfTable.put(
partition.getValues(),
new Action<>(ActionType.ADD, new PartitionAndMore(identity, partition, currentLocation, Optional.empty(), statistics, statistics), hdfsContext, identity));
new Action<>(ActionType.ADD, new PartitionAndMore(identity, partition, currentLocation, Optional.empty(), statistics, statistics), hdfsContext, identity, session.getQueryId()));
return;
}
switch (oldPartitionAction.getType()) {
Expand All @@ -906,7 +909,7 @@ public synchronized void addPartition(
}
partitionActionsOfTable.put(
partition.getValues(),
new Action<>(ActionType.ALTER, new PartitionAndMore(identity, partition, currentLocation, Optional.empty(), statistics, statistics), hdfsContext, identity));
new Action<>(ActionType.ALTER, new PartitionAndMore(identity, partition, currentLocation, Optional.empty(), statistics, statistics), hdfsContext, identity, session.getQueryId()));
return;
case ADD:
case ALTER:
Expand All @@ -927,10 +930,10 @@ public synchronized void dropPartition(ConnectorSession session, String database
HdfsContext hdfsContext = new HdfsContext(session, databaseName, tableName);
HiveIdentity identity = new HiveIdentity(session);
if (deleteData) {
partitionActionsOfTable.put(partitionValues, new Action<>(ActionType.DROP, null, hdfsContext, identity));
partitionActionsOfTable.put(partitionValues, new Action<>(ActionType.DROP, null, hdfsContext, identity, session.getQueryId()));
}
else {
partitionActionsOfTable.put(partitionValues, new Action<>(ActionType.DROP_PRESERVE_DATA, null, hdfsContext, identity));
partitionActionsOfTable.put(partitionValues, new Action<>(ActionType.DROP_PRESERVE_DATA, null, hdfsContext, identity, session.getQueryId()));
}
return;
}
Expand Down Expand Up @@ -985,7 +988,8 @@ public synchronized void finishInsertIntoExistingPartition(
merge(currentStatistics, statisticsUpdate),
statisticsUpdate),
context,
identity));
identity,
session.getQueryId()));
return;
}

Expand Down Expand Up @@ -2598,8 +2602,9 @@ public static class Action<T>
private final T data;
private final HdfsContext hdfsContext;
private final HiveIdentity identity;
private final String queryId;

public Action(ActionType type, T data, HdfsContext hdfsContext, HiveIdentity identity)
public Action(ActionType type, T data, HdfsContext hdfsContext, HiveIdentity identity, String queryId)
{
this.type = requireNonNull(type, "type is null");
if (type == ActionType.DROP || type == ActionType.DROP_PRESERVE_DATA) {
Expand All @@ -2611,6 +2616,7 @@ public Action(ActionType type, T data, HdfsContext hdfsContext, HiveIdentity ide
this.data = data;
this.hdfsContext = requireNonNull(hdfsContext, "hdfsContext is null");
this.identity = requireNonNull(identity, "identity is null");
this.queryId = requireNonNull(queryId, "queryId is null");
}

public ActionType getType()
Expand All @@ -2629,6 +2635,11 @@ public HdfsContext getHdfsContext()
return hdfsContext;
}

public String getQueryId()
{
return queryId;
}

public HiveIdentity getIdentity()
{
return identity;
Expand All @@ -2639,6 +2650,7 @@ public String toString()
{
return toStringHelper(this)
.add("type", type)
.add("queryId", queryId)
.add("data", data)
.toString();
}
Expand Down

0 comments on commit 9b6fb64

Please sign in to comment.