Skip to content

Commit 4f70bfe

Browse files
author
Harish Butani
committed
refactor PTF ifc;
- clearer separation of duties between Resolver and Evaluator. - document ifc. - provide a PItr to execute method. - linking to LeadLag done in Executor. WindowTblFunc: - hold onto WindowFnDefs - QDeserializer, no special handling for WindowTblFunc
1 parent 17ac585 commit 4f70bfe

18 files changed

+539
-287
lines changed

windowing/src/main/java/com/sap/hadoop/windowing/functions2/TableFunctionEvaluator.java

Lines changed: 73 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -10,127 +10,136 @@
1010
import com.sap.hadoop.windowing.query2.definition.QueryDef;
1111
import com.sap.hadoop.windowing.query2.definition.TableFuncDef;
1212
import com.sap.hadoop.windowing.runtime2.Partition;
13+
import com.sap.hadoop.windowing.runtime2.PartitionIterator;
14+
import com.sap.hadoop.windowing.runtime2.RuntimeUtils;
1315

1416
/**
15-
* Based on Hive {@link GenericUDAFEvaluator}. Break up the responsibility of the old AsbtractTableFunction class into a Resolver and Evaluator.
16-
*
17+
* Based on Hive {@link GenericUDAFEvaluator}. Break up the responsibility of the old AbstractTableFunction
18+
* class into a Resolver and Evaluator.
19+
* <p>
20+
* The Evaluator also holds onto the {@link TableFuncDef}. This provides information
21+
* about the arguments to the function, the shape of the Input partition and the Partitioning details.
22+
* The Evaluator is responsible for providing the 2 execute methods:
23+
* <ol>
24+
* <li><b>execute:</b> which is invoked after the input is partitioned; the contract
25+
* is, it is given an input Partition and must return an output Partition. The shape of the output
26+
* Partition is obtained from the getOutputOI call.
27+
* <li><b>transformRawInput:</b> In the case where this function indicates that it will transform the raw input
28+
* before it is fed through the partitioning mechanics, this function is called. Again the contract is
29+
* it is given an input Partition and must return a Partition. The shape of the output Partition is
30+
* obtained from getRawInputOI() call.
31+
* </ol>
32+
33+
*
1734
*
1835
*/
1936
public abstract class TableFunctionEvaluator
2037
{
21-
protected TableFunctionResolver resolver;
2238
transient protected StructObjectInspector OI;
23-
transient protected StructObjectInspector mapOI;
39+
transient protected StructObjectInspector rawInputOI;
2440
protected TableFuncDef tDef;
2541
protected QueryDef qDef;
2642
String partitionClass;
2743
int partitionMemSize;
28-
44+
boolean transformsRawInput;
45+
2946
static{
3047
SerializationUtils.makeTransient(TableFunctionEvaluator.class, "OI");
31-
SerializationUtils.makeTransient(TableFunctionEvaluator.class, "mapOI");
48+
SerializationUtils.makeTransient(TableFunctionEvaluator.class, "rawInputOI");
3249
}
3350

34-
35-
public TableFunctionResolver getResolver()
36-
{
37-
return resolver;
38-
}
3951

40-
public void setResolver(TableFunctionResolver resolver)
41-
{
42-
this.resolver = resolver;
43-
}
44-
4552
public StructObjectInspector getOutputOI()
4653
{
4754
return OI;
4855
}
4956

50-
public void setTableDef(TableFuncDef tDef)
57+
protected void setOutputOI(StructObjectInspector outputOI)
5158
{
52-
this.tDef = tDef;
53-
}
54-
55-
public void setQueryDef(QueryDef qDef)
56-
{
57-
this.qDef = qDef;
59+
OI = outputOI;
5860
}
5961

60-
public void setPartitionClass(String partitionClass)
62+
public TableFuncDef getTableDef()
6163
{
62-
this.partitionClass = partitionClass;
64+
return tDef;
6365
}
6466

65-
public void setPartitionMemSize(int partitionMemSize)
67+
public void setTableDef(TableFuncDef tDef)
6668
{
67-
this.partitionMemSize = partitionMemSize;
69+
this.tDef = tDef;
6870
}
6971

70-
public StructObjectInspector getMapOutputOI()
71-
{
72-
return mapOI;
73-
}
74-
75-
public TableFuncDef getTableDef()
76-
{
77-
return tDef;
78-
}
79-
80-
public QueryDef getQueryDef()
72+
protected QueryDef getQueryDef()
8173
{
8274
return qDef;
8375
}
84-
85-
public abstract void setupOI() throws WindowingException;
86-
87-
public void setupMapOI() throws WindowingException
76+
77+
protected void setQueryDef(QueryDef qDef)
8878
{
89-
if (!resolver.hasMapPhase())
90-
{
91-
return;
92-
}
93-
mapOI = OI;
79+
this.qDef = qDef;
9480
}
95-
81+
9682
public String getPartitionClass()
9783
{
9884
return partitionClass;
9985
}
10086

87+
public void setPartitionClass(String partitionClass)
88+
{
89+
this.partitionClass = partitionClass;
90+
}
91+
10192
public int getPartitionMemSize()
10293
{
10394
return partitionMemSize;
10495
}
105-
106-
public boolean hasMapPhase() throws WindowingException
96+
97+
public void setPartitionMemSize(int partitionMemSize)
10798
{
108-
resolver.setHasMapPhase();
109-
return resolver.hasMapPhase();
99+
this.partitionMemSize = partitionMemSize;
110100
}
111-
112-
public Partition execute(Partition iPart) throws WindowingException
101+
102+
public StructObjectInspector getRawInputOI()
113103
{
114-
Partition outP = new Partition(getPartitionClass(), getPartitionMemSize(), tDef.getSerde(), OI);
115-
execute(iPart, outP);
116-
return outP;
104+
return rawInputOI;
117105
}
118106

119-
protected void execute(Partition iPart, Partition oPart) throws WindowingException
107+
protected void setRawInputOI(StructObjectInspector rawInputOI)
120108
{
121-
109+
this.rawInputOI = rawInputOI;
110+
}
111+
112+
public boolean isTransformsRawInput() {
113+
return transformsRawInput;
114+
}
115+
116+
public void setTransformsRawInput(boolean transformsRawInput) {
117+
this.transformsRawInput = transformsRawInput;
118+
}
119+
120+
public Partition execute(Partition iPart)
121+
throws WindowingException
122+
{
123+
PartitionIterator<Object> pItr = iPart.iterator();
124+
RuntimeUtils.connectLeadLagFunctionsToPartition(qDef, pItr);
125+
Partition outP = new Partition(getPartitionClass(),
126+
getPartitionMemSize(), tDef.getSerde(), OI);
127+
execute(pItr, outP);
128+
return outP;
122129
}
123130

124-
public Partition mapExecute(Partition iPart) throws WindowingException
131+
protected abstract void execute(PartitionIterator<Object> pItr, Partition oPart) throws WindowingException;
132+
133+
public Partition transformRawInput(Partition iPart) throws WindowingException
125134
{
126-
if ( !resolver.hasMapPhase())
135+
if ( !isTransformsRawInput())
127136
{
128137
throw new WindowingException(sprintf("Internal Error: mapExecute called on function (%s)that has no Map Phase", tDef.getName()));
129138
}
130-
return _mapExecute(iPart);
139+
return _transformRawInput(iPart);
131140
}
132-
133-
protected Partition _mapExecute(Partition iPart) throws WindowingException
141+
142+
protected Partition _transformRawInput(Partition iPart) throws WindowingException
134143
{
135144
return null;
136145
}

windowing/src/main/java/com/sap/hadoop/windowing/functions2/TableFunctionResolver.java

Lines changed: 108 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import org.apache.hadoop.hive.conf.HiveConf;
44
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
5+
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
56

67
import com.sap.hadoop.ds.list.ByteBasedList;
78
import com.sap.hadoop.windowing.Constants;
@@ -11,41 +12,129 @@
1112

1213
/**
1314
* Based on Hive {@link GenericUDAFResolver}. Break up the responsibility of the
14-
* old AsbtractTableFunction class into a Resolver and Evaluator.
15-
*
16-
*
15+
* old AbstractTableFunction class into a Resolver and Evaluator.
16+
* The Resolver is responsible for:
17+
* <ol>
18+
* <li> setting up the {@link TableFunctionEvaluator}
19+
* <li> Setting up the raw and output ObjectInspectors of the Evaluator.
20+
* <li> The Evaluator also holds onto the {@link TableFuncDef}. This provides information
21+
* about the arguments to the function, the shape of the Input partition and the Partitioning details.
22+
* </ol>
23+
* The Resolver for a function is obtained from the {@link FunctionRegistry}. The Resolver is initialized
24+
* by the following 4 step process:
25+
* <ol>
26+
* <li> The initialize method is called; which is passed the {@link QueryDef} and the {@link TableFuncDef}.
27+
* <li> The resolver is then asked to setup the Raw ObjectInspector. This is only required if the Function reshapes
28+
* the raw input.
29+
* <li> Once the Resolver has had a chance to compute the shape of the Raw Input that is fed to the partitioning
30+
* machinery; the translator sets up the partitioning details on the tableFuncDef.
31+
* <li> finally the resolver is asked to setup the output ObjectInspector.
32+
* </ol>
1733
*/
1834
@SuppressWarnings("deprecation")
19-
public abstract class TableFunctionResolver {
20-
protected boolean hasMapPhase;
35+
public abstract class TableFunctionResolver
36+
{
37+
TableFunctionEvaluator evaluator;
38+
QueryDef qDef;
2139

22-
public TableFunctionEvaluator initialize(QueryDef qDef, TableFuncDef tDef)
23-
throws WindowingException {
40+
/*
41+
* - called during translation.
42+
* - invokes createEvaluator which must be implemented by a subclass
43+
* - sets up the evaluator with references to the TableDef, PartitionClass, PartitionMemSize and
44+
* the transformsRawInput boolean.
45+
*/
46+
public void initialize(QueryDef qDef, TableFuncDef tDef)
47+
throws WindowingException
48+
{
49+
this.qDef = qDef;
2450
HiveConf cfg = qDef.getTranslationInfo().getHiveCfg();
2551
String partitionClass = cfg.get(Constants.WINDOW_PARTITION_CLASS,
2652
Constants.DEFAULT_WINDOW_PARTITION_CLASS);
2753
int partitionMemSize = cfg.getInt(Constants.WINDOW_PARTITION_MEM_SIZE,
2854
ByteBasedList.MEDIUM_SIZE);
2955

30-
TableFunctionEvaluator tfEval = createEvaluator(qDef, tDef);
31-
tfEval.setResolver(this);
32-
tfEval.setQueryDef(qDef);
33-
tfEval.setTableDef(tDef);
34-
tfEval.setPartitionClass(partitionClass);
35-
tfEval.setPartitionMemSize(partitionMemSize);
56+
evaluator = createEvaluator(qDef, tDef);
57+
evaluator.setTransformsRawInput(transformsRawInput());
58+
evaluator.setTableDef(tDef);
59+
evaluator.setQueryDef(qDef);
60+
evaluator.setPartitionClass(partitionClass);
61+
evaluator.setPartitionMemSize(partitionMemSize);
3662

37-
return tfEval;
63+
}
64+
65+
/*
66+
* called during deserialization of a QueryDef during runtime.
67+
*/
68+
public void initialize(QueryDef qDef, TableFuncDef tDef, TableFunctionEvaluator evaluator)
69+
throws WindowingException
70+
{
71+
this.evaluator = evaluator;
72+
this.qDef = qDef;
73+
evaluator.setTableDef(tDef);
74+
evaluator.setQueryDef(qDef);
75+
}
76+
77+
public TableFunctionEvaluator getEvaluator()
78+
{
79+
return evaluator;
3880
}
3981

40-
public boolean hasMapPhase() {
41-
return hasMapPhase;
82+
/*
83+
* - a subclass must provide this method.
84+
* - this method is invoked during translation and also when the Operator is initialized during runtime.
85+
* - a subclass must use this call to setup the shape of its output.
86+
* - subsequent to this call, a call to getOutputOI on the {@link TableFunctionEvaluator} must return the OI
87+
* of the output of this function.
88+
*/
89+
public abstract void setupOutputOI() throws WindowingException;
90+
91+
/*
92+
* - Called on functions that transform the raw input.
93+
* - this method is invoked during translation and also when the Operator is initialized during runtime.
94+
* - a subclass must use this call to setup the shape of the raw input, that is fed to the partitioning mechanics.
95+
* - subsequent to this call, a call to getRawInputOI on the {@link TableFunctionEvaluator} must return the OI
96+
* of the output of this function.
97+
*/
98+
public void setupRawInputOI() throws WindowingException
99+
{
100+
if (!transformsRawInput())
101+
{
102+
return;
103+
}
104+
throw new WindowingException(
105+
"Function has map phase, must extend setupMapOI");
106+
}
107+
108+
/*
109+
* callback method used by subclasses to set the RawInputOI on the Evaluator.
110+
*/
111+
protected void setRawInputOI(StructObjectInspector rawInputOI)
112+
{
113+
evaluator.setRawInputOI(rawInputOI);
114+
}
115+
116+
/*
117+
* callback method used by subclasses to set the OutputOI on the Evaluator.
118+
*/
119+
protected void setOutputOI(StructObjectInspector outputOI)
120+
{
121+
evaluator.setOutputOI(outputOI);
42122
}
43123

44-
protected void setHasMapPhase() throws WindowingException {
45-
hasMapPhase = false;
124+
public QueryDef getQueryDef()
125+
{
126+
return qDef;
46127
}
47128

129+
/*
130+
* a subclass must indicate whether it will transform the raw input before it is fed through the
131+
* partitioning mechanics.
132+
*/
133+
public abstract boolean transformsRawInput();
134+
135+
/*
136+
* a subclass must provide the {@link TableFunctionEvaluator} instance.
137+
*/
48138
protected abstract TableFunctionEvaluator createEvaluator(QueryDef qDef,
49139
TableFuncDef tDef);
50-
51140
}

0 commit comments

Comments
 (0)