Skip to content

Commit

Permalink
HIVE-20009: Fix runtime stats for merge statement (Zoltan Haindrich via Ashutosh Chauhan)
Browse files Browse the repository at this point in the history

Signed-off-by: Ashutosh Chauhan <hashutosh@apache.org>
  • Loading branch information
kgyrtkirk authored and ashutoshc committed Jun 30, 2018
1 parent 8f57e25 commit 78cbf14
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 4 deletions.
1 change: 1 addition & 0 deletions itests/src/test/resources/testconfiguration.properties
Expand Up @@ -600,6 +600,7 @@ minillaplocal.query.files=\
partition_pruning.q,\
ptf.q,\
ptf_streaming.q,\
runtime_stats_merge.q,\
quotedid_smb.q,\
resourceplan.q,\
results_cache_1.q,\
Expand Down
4 changes: 4 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/Context.java
Expand Up @@ -1093,6 +1093,10 @@ public String getExecutionId() {
return executionId;
}

/**
 * Sets the {@link PlanMapper} for this context.
 *
 * Added so that a rewritten-query context can inherit the plan mapper of the
 * original context (see the {@code parseRewrittenQuery} change in this commit,
 * which calls {@code rewrittenCtx.setPlanMapper(ctx.getPlanMapper())}); this is
 * part of fixing runtime stats for MERGE statements (HIVE-20009).
 *
 * @param planMapper the plan mapper to associate with this context
 */
public void setPlanMapper(PlanMapper planMapper) {
this.planMapper = planMapper;
}

/**
 * Returns the {@link PlanMapper} associated with this context.
 *
 * @return the plan mapper; may be null if none has been set
 */
public PlanMapper getPlanMapper() {
return planMapper;
}
Expand Down
Expand Up @@ -536,6 +536,8 @@ private ReparseResult parseRewrittenQuery(StringBuilder rewrittenQueryStr, Strin
}
rewrittenCtx.setExplainConfig(ctx.getExplainConfig());
rewrittenCtx.setExplainPlan(ctx.isExplainPlan());
rewrittenCtx.setStatsSource(ctx.getStatsSource());
rewrittenCtx.setPlanMapper(ctx.getPlanMapper());
rewrittenCtx.setIsUpdateDeleteMerge(true);
rewrittenCtx.setCmd(rewrittenQueryStr.toString());

Expand Down Expand Up @@ -770,7 +772,7 @@ private static String normalizeColName(String colName) {

/**
* This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it.
* Since HiveLexer.g is written such that it strips away any ` (back ticks) around
* Since HiveLexer.g is written such that it strips away any ` (back ticks) around
* quoted identifiers we need to add those back to generated SQL.
* Additionally, the parser only produces tokens of type Identifier and never
* QuotedIdentifier (HIVE-6013). So here we just quote all identifiers.
Expand Down Expand Up @@ -808,7 +810,7 @@ private void visit(ASTNode n) {
/**
* This allows us to take an arbitrary ASTNode and turn it back into SQL that produced it without
* needing to understand what it is (except for QuotedIdentifiers)
*
*
*/
private String getMatchedText(ASTNode n) {
quotedIdenfierHelper.visit(n);
Expand Down Expand Up @@ -1096,10 +1098,10 @@ private boolean handleCardinalityViolation(StringBuilder rewrittenQueryStr, ASTN
.append("\n SELECT cardinality_violation(")
.append(getSimpleTableName(target)).append(".ROW__ID");
addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target);

rewrittenQueryStr.append(")\n WHERE ").append(onClauseAsString)
.append(" GROUP BY ").append(getSimpleTableName(target)).append(".ROW__ID");

addPartitionColsToSelect(targetTable.getPartCols(), rewrittenQueryStr, target);

rewrittenQueryStr.append(" HAVING count(*) > 1");
Expand Down
41 changes: 41 additions & 0 deletions ql/src/test/queries/clientpositive/runtime_stats_merge.q
@@ -0,0 +1,41 @@

set hive.mapred.mode=nonstrict;
set hive.security.authorization.manager=org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactoryForTest;
set hive.support.concurrency=true;
set hive.explain.user=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;


set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=88888888;
-- set hive.auto.convert.sortmerge.join=true;
-- set hive.auto.convert.sortmerge.join.to.mapjoin=true;

create table lineitem (L_ORDERKEY integer);

insert into lineitem values (1),(2),(3);

create table lineitem2
stored as orc TBLPROPERTIES ('transactional'='true')
as select * from lineitem;
create table lineitem_stage
stored as orc TBLPROPERTIES ('transactional'='true')
as select * from lineitem limit 1;


analyze table lineitem2 compute statistics for columns;
analyze table lineitem_stage compute statistics for columns;

explain reoptimization
merge into lineitem2 using
(select * from lineitem_stage) sub
on sub.L_ORDERKEY = lineitem2.L_ORDERKEY
when matched then delete;

merge into lineitem2 using
(select * from lineitem_stage) sub
on sub.L_ORDERKEY = lineitem2.L_ORDERKEY
when matched then delete;


194 changes: 194 additions & 0 deletions ql/src/test/results/clientpositive/llap/runtime_stats_merge.q.out
@@ -0,0 +1,194 @@
PREHOOK: query: create table lineitem (L_ORDERKEY integer)
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@lineitem
POSTHOOK: query: create table lineitem (L_ORDERKEY integer)
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@lineitem
PREHOOK: query: insert into lineitem values (1),(2),(3)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@lineitem
POSTHOOK: query: insert into lineitem values (1),(2),(3)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@lineitem
POSTHOOK: Lineage: lineitem.l_orderkey SCRIPT []
PREHOOK: query: create table lineitem2
stored as orc TBLPROPERTIES ('transactional'='true')
as select * from lineitem
PREHOOK: type: CREATETABLE_AS_SELECT
PREHOOK: Input: default@lineitem
PREHOOK: Output: database:default
PREHOOK: Output: default@lineitem2
POSTHOOK: query: create table lineitem2
stored as orc TBLPROPERTIES ('transactional'='true')
as select * from lineitem
POSTHOOK: type: CREATETABLE_AS_SELECT
POSTHOOK: Input: default@lineitem
POSTHOOK: Output: database:default
POSTHOOK: Output: default@lineitem2
POSTHOOK: Lineage: lineitem2.l_orderkey SIMPLE [(lineitem)lineitem.FieldSchema(name:l_orderkey, type:int, comment:null), ]
PREHOOK: query: create table lineitem_stage
stored as orc TBLPROPERTIES ('transactional'='true')
as select * from lineitem limit 1
PREHOOK: type: CREATETABLE_AS_SELECT
PREHOOK: Input: default@lineitem
PREHOOK: Output: database:default
PREHOOK: Output: default@lineitem_stage
POSTHOOK: query: create table lineitem_stage
stored as orc TBLPROPERTIES ('transactional'='true')
as select * from lineitem limit 1
POSTHOOK: type: CREATETABLE_AS_SELECT
POSTHOOK: Input: default@lineitem
POSTHOOK: Output: database:default
POSTHOOK: Output: default@lineitem_stage
POSTHOOK: Lineage: lineitem_stage.l_orderkey SIMPLE [(lineitem)lineitem.FieldSchema(name:l_orderkey, type:int, comment:null), ]
PREHOOK: query: analyze table lineitem2 compute statistics for columns
PREHOOK: type: ANALYZE_TABLE
PREHOOK: Input: default@lineitem2
PREHOOK: Output: default@lineitem2
#### A masked pattern was here ####
POSTHOOK: query: analyze table lineitem2 compute statistics for columns
POSTHOOK: type: ANALYZE_TABLE
POSTHOOK: Input: default@lineitem2
POSTHOOK: Output: default@lineitem2
#### A masked pattern was here ####
PREHOOK: query: analyze table lineitem_stage compute statistics for columns
PREHOOK: type: ANALYZE_TABLE
PREHOOK: Input: default@lineitem_stage
PREHOOK: Output: default@lineitem_stage
#### A masked pattern was here ####
POSTHOOK: query: analyze table lineitem_stage compute statistics for columns
POSTHOOK: type: ANALYZE_TABLE
POSTHOOK: Input: default@lineitem_stage
POSTHOOK: Output: default@lineitem_stage
#### A masked pattern was here ####
PREHOOK: query: explain reoptimization
merge into lineitem2 using
(select * from lineitem_stage) sub
on sub.L_ORDERKEY = lineitem2.L_ORDERKEY
when matched then delete
PREHOOK: type: QUERY
PREHOOK: Input: default@lineitem2
PREHOOK: Input: default@lineitem_stage
PREHOOK: Output: default@lineitem2
PREHOOK: Output: default@merge_tmp_table
POSTHOOK: query: explain reoptimization
merge into lineitem2 using
(select * from lineitem_stage) sub
on sub.L_ORDERKEY = lineitem2.L_ORDERKEY
when matched then delete
POSTHOOK: type: QUERY
POSTHOOK: Input: default@lineitem2
POSTHOOK: Input: default@lineitem_stage
POSTHOOK: Output: default@lineitem2
POSTHOOK: Output: default@merge_tmp_table
POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(lineitem2)lineitem2.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), ]
PREHOOK: query: explain reoptimization
merge into lineitem2 using
(select * from lineitem_stage) sub
on sub.L_ORDERKEY = lineitem2.L_ORDERKEY
when matched then delete
PREHOOK: type: QUERY
POSTHOOK: query: explain reoptimization
merge into lineitem2 using
(select * from lineitem_stage) sub
on sub.L_ORDERKEY = lineitem2.L_ORDERKEY
when matched then delete
POSTHOOK: type: QUERY
POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(lineitem2)lineitem2.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), ]
Vertex dependency in root stage
Map 2 <- Map 1 (BROADCAST_EDGE)
Reducer 3 <- Map 2 (SIMPLE_EDGE)
Reducer 4 <- Map 2 (SIMPLE_EDGE)

Stage-4
Stats Work{}
Stage-0
Move Operator
table:{"name:":"default.lineitem2"}
Stage-3
Dependency Collection{}
Stage-2
Reducer 3 vectorized, llap
File Output Operator [FS_61]
table:{"name:":"default.lineitem2"}
Select Operator [SEL_60] (runtime: rows=1 width=76)
Output:["_col0"]
<-Map 2 [SIMPLE_EDGE] llap
SHUFFLE [RS_10]
PartitionCols:UDFToInteger(_col0)
Select Operator [SEL_9] (runtime: rows=1 width=76)
Output:["_col0"]
Filter Operator [FIL_32] (runtime: rows=1 width=84)
predicate:(_col4 = _col0)
Map Join Operator [MAPJOIN_48] (runtime: rows=1 width=84)
Conds:FIL_36.l_orderkey=RS_52._col0(Inner),Output:["_col0","_col3","_col4"]
<-Map 1 [BROADCAST_EDGE] vectorized, llap
BROADCAST [RS_52]
PartitionCols:_col0
Select Operator [SEL_51] (runtime: rows=1 width=4)
Output:["_col0"]
Filter Operator [FIL_50] (runtime: rows=1 width=4)
predicate:l_orderkey is not null
TableScan [TS_0] (runtime: rows=1 width=4)
default@lineitem_stage,lineitem_stage, ACID table,Tbl:COMPLETE,Col:COMPLETE,Output:["l_orderkey"]
<-Filter Operator [FIL_36] (runtime: rows=3 width=4)
predicate:l_orderkey is not null
TableScan [TS_2] (runtime: rows=3 width=4)
default@lineitem2,lineitem2, ACID table,Tbl:COMPLETE,Col:COMPLETE,Output:["l_orderkey"]
Reducer 4 llap
File Output Operator [FS_22]
table:{"name:":"default.merge_tmp_table"}
Select Operator [SEL_21] (runtime: rows=0 width=-1)
Output:["_col0"]
Filter Operator [FIL_33] (runtime: rows=0 width=-1)
predicate:(_col1 > 1L)
Group By Operator [GBY_19] (runtime: rows=1 width=84)
Output:["_col0","_col1"],aggregations:["count(VALUE._col0)"],keys:KEY._col0
<-Map 2 [SIMPLE_EDGE] llap
SHUFFLE [RS_18]
PartitionCols:_col0
Group By Operator [GBY_17] (runtime: rows=1 width=84)
Output:["_col0","_col1"],aggregations:["count()"],keys:_col3
Select Operator [SEL_16] (runtime: rows=1 width=84)
Output:["_col3"]
Filter Operator [FIL_34] (runtime: rows=1 width=84)
predicate:(_col4 = _col0)
Please refer to the previous Map Join Operator [MAPJOIN_48]
File Output Operator [FS_29]
Select Operator [SEL_28] (runtime: rows=1 width=424)
Output:["_col0"]
Group By Operator [GBY_27] (runtime: rows=1 width=424)
Output:["_col0"],aggregations:["compute_stats(val, 'hll')"]
Select Operator [SEL_24] (runtime: rows=0 width=-1)
Output:["val"]
Please refer to the previous Select Operator [SEL_21]
Stage-5
Stats Work{}
Stage-1
Move Operator
table:{"name:":"default.merge_tmp_table"}
Please refer to the previous Stage-3

PREHOOK: query: merge into lineitem2 using
(select * from lineitem_stage) sub
on sub.L_ORDERKEY = lineitem2.L_ORDERKEY
when matched then delete
PREHOOK: type: QUERY
PREHOOK: Input: default@lineitem2
PREHOOK: Input: default@lineitem_stage
PREHOOK: Output: default@lineitem2
PREHOOK: Output: default@merge_tmp_table
POSTHOOK: query: merge into lineitem2 using
(select * from lineitem_stage) sub
on sub.L_ORDERKEY = lineitem2.L_ORDERKEY
when matched then delete
POSTHOOK: type: QUERY
POSTHOOK: Input: default@lineitem2
POSTHOOK: Input: default@lineitem_stage
POSTHOOK: Output: default@lineitem2
POSTHOOK: Output: default@merge_tmp_table
POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(lineitem2)lineitem2.FieldSchema(name:ROW__ID, type:struct<writeId:bigint,bucketId:int,rowId:bigint>, comment:), ]

0 comments on commit 78cbf14

Please sign in to comment.