Skip to content

Commit 21a6df0

Browse files
committed
flesh out NPath Evaluator; initial translation and execution test for
NPath
1 parent 1b7a818 commit 21a6df0

File tree

5 files changed

+262
-3
lines changed

5 files changed

+262
-3
lines changed

windowing/src/main/java/com/sap/hadoop/windowing/functions2/FunctionRegistry.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.sap.hadoop.windowing.functions2.table.Noop.NoopResolver;
2626
import com.sap.hadoop.windowing.functions2.table.NoopWithMap.NoopWithMapResolver;
2727
import com.sap.hadoop.windowing.functions2.table.WindowingTableFunction.WindowingTableFunctionResolver;
28+
import com.sap.hadoop.windowing.functions2.table.npath.NPath.NPathResolver;
2829

2930
@SuppressWarnings({ "deprecation", "static-access" })
3031
public class FunctionRegistry
@@ -130,6 +131,7 @@ public FunctionInfo getfInfo()
130131
registerTableFunction(NOOP_TABLE_FUNCTION, NoopResolver.class);
131132
registerTableFunction(NOOP_MAP_TABLE_FUNCTION, NoopWithMapResolver.class);
132133
registerTableFunction(WINDOWING_TABLE_FUNCTION, WindowingTableFunctionResolver.class);
134+
registerTableFunction("npath", NPathResolver.class);
133135
}
134136

135137
public static boolean isTableFunction(String name)

windowing/src/main/java/com/sap/hadoop/windowing/functions2/table/npath/NPath.java

Lines changed: 185 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,27 @@
11
package com.sap.hadoop.windowing.functions2.table.npath;
22

3+
import java.util.ArrayList;
4+
5+
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
6+
import org.apache.hadoop.hive.ql.metadata.HiveException;
7+
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
8+
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
9+
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
10+
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
11+
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
12+
313
import com.sap.hadoop.windowing.WindowingException;
414
import com.sap.hadoop.windowing.functions2.TableFunctionEvaluator;
15+
import com.sap.hadoop.windowing.functions2.TableFunctionResolver;
16+
import com.sap.hadoop.windowing.functions2.table.npath.SymbolFunction.SymbolFunctionResult;
17+
import com.sap.hadoop.windowing.query2.definition.ArgDef;
18+
import com.sap.hadoop.windowing.query2.definition.QueryDef;
19+
import com.sap.hadoop.windowing.query2.definition.TableFuncDef;
520
import com.sap.hadoop.windowing.runtime2.Partition;
21+
import com.sap.hadoop.windowing.runtime2.PartitionIterator;
22+
import com.sap.hadoop.windowing.runtime2.RuntimeUtils;
23+
24+
import static com.sap.hadoop.Utils.sprintf;
625

726
/**
827
* return rows that meet a specified pattern. Use symbols to specify a list of expressions to match.
@@ -23,6 +42,45 @@
2342
*/
2443
public class NPath extends TableFunctionEvaluator
2544
{
45+
private transient String patternStr;
46+
private transient SymbolsInfo symInfo;
47+
private transient String resultExprStr;
48+
private transient SymbolFunction syFn;
49+
private transient ArrayList<ExprNodeEvaluator> resultExprEvals;
50+
51+
52+
53+
/**
 * Walks the input partition row by row and, for every row at which the symbol
 * pattern matches, evaluates the result expressions and appends one output row.
 *
 * @param iPart the partition to scan
 * @param outP  the partition that receives one row per match
 * @throws WindowingException if evaluating a result expression raises a HiveException
 */
@Override
54+
public void execute(Partition iPart, Partition outP) throws WindowingException
55+
{
56+
PartitionIterator<Object> pItr = iPart.iterator();
57+
RuntimeUtils.connectLeadLagFunctionsToPartition(getQueryDef(), pItr);
58+
while (pItr.hasNext())
59+
{
60+
Object iRow = pItr.next();
61+
62+
// try to match the symbol function chain starting at the current row
SymbolFunctionResult syFnRes = SymbolFunction.match(syFn, iRow, pItr);
63+
if (syFnRes.matches )
64+
{
65+
// presumably the number of rows in the matched path (nextRow relative to the row just read) - TODO confirm against SymbolFunctionResult/PartitionIterator
int sz = syFnRes.nextRow - (pItr.getIndex() - 1);
66+
Object selectListInput = NPathUtils.getSelectListInput(iRow, tDef.getInput().getOI(), pItr, sz);
67+
// one output column per result expression, in select-list order
ArrayList<Object> oRow = new ArrayList<Object>();
68+
for(ExprNodeEvaluator resExprEval : resultExprEvals)
69+
{
70+
try
71+
{
72+
oRow.add(resExprEval.evaluate(selectListInput));
73+
}
74+
catch(HiveException he)
75+
{
76+
throw new WindowingException(he);
77+
}
78+
}
79+
outP.append(oRow);
80+
// NOTE(review): nothing in this method advances pItr past the matched rows; whether matches can overlap depends on SymbolFunction.match / NPathUtils - confirm
}
81+
}
82+
}
83+
2684
/**
2785
* <ul>
2886
* <li> check structure of Arguments:
@@ -40,12 +98,137 @@ public class NPath extends TableFunctionEvaluator
4098
@Override
4199
public void setupOI() throws WindowingException
42100
{
101+
ArrayList<ArgDef> args = tDef.getArgs();
102+
// a null argument list is treated the same as an empty one
int argsNum = args == null ? 0 : args.size();
103+
104+
// fewest possible arguments: pattern, one symbol name, its expression, result expression
if ( argsNum < 4 )
105+
{
106+
throwErrorWithSignature("at least 4 arguments required");
107+
}
108+
109+
/*
110+
* validate and setup patternStr
111+
*/
112+
// the pattern is always the first argument
ArgDef symboPatternArg = args.get(0);
113+
ObjectInspector symbolPatternArgOI = symboPatternArg.getOI();
114+
115+
if ( !ObjectInspectorUtils.isConstantObjectInspector(symbolPatternArgOI) ||
116+
(symbolPatternArgOI.getCategory() != ObjectInspector.Category.PRIMITIVE) ||
117+
((PrimitiveObjectInspector)symbolPatternArgOI).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING )
118+
{
119+
throwErrorWithSignature("Currently the symbol Pattern must be a Constant String.");
120+
}
121+
122+
// throwErrorWithSignature always throws, so this cast is only reached for a constant string inspector
patternStr = ((ConstantObjectInspector)symbolPatternArgOI).getWritableConstantValue().toString();
123+
124+
/*
125+
* validate and setup SymbolInfo
126+
*/
127+
// every argument except the first (pattern) and the last (result expression) is a symbol argument
int symbolArgsSz = argsNum - 2;
128+
if ( symbolArgsSz % 2 != 0)
129+
{
130+
throwErrorWithSignature("Symbol Name, Expression need to be specified in pairs: there are odd number of symbol args");
131+
}
132+
133+
symInfo = new SymbolsInfo(symbolArgsSz/2);
134+
// symbol arguments start at index 1; args[i] is a symbol name and args[i+1] its expression
for(int i=1; i <= symbolArgsSz; i += 2)
135+
{
136+
ArgDef symbolNameArg = args.get(i);
137+
ObjectInspector symbolNameArgOI = symbolNameArg.getOI();
138+
139+
if ( !ObjectInspectorUtils.isConstantObjectInspector(symbolNameArgOI) ||
140+
(symbolNameArgOI.getCategory() != ObjectInspector.Category.PRIMITIVE) ||
141+
((PrimitiveObjectInspector)symbolNameArgOI).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING )
142+
{
143+
throwErrorWithSignature(sprintf("Currently a Symbol Name(%s) must be a Constant String", symbolNameArg.getExpression().toStringTree()));
144+
}
145+
String symbolName = ((ConstantObjectInspector)symbolNameArgOI).getWritableConstantValue().toString();
146+
147+
ArgDef symolExprArg = args.get(i+1);
148+
ObjectInspector symolExprArgOI = symolExprArg.getOI();
149+
// unlike the name, the expression only has to be a boolean primitive; it is not required to be constant
if ( (symolExprArgOI.getCategory() != ObjectInspector.Category.PRIMITIVE) ||
150+
((PrimitiveObjectInspector)symolExprArgOI).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN )
151+
{
152+
throwErrorWithSignature(sprintf("Currently a Symbol Expression(%s) must be a boolean expression", symolExprArg.getExpression().toStringTree()));
153+
}
154+
symInfo.add(symbolName, symolExprArg);
155+
}
156+
157+
/*
158+
* validate and setup resultExprStr
159+
*/
160+
// the result expression is always the last argument
ArgDef resultExprArg = args.get(argsNum - 1);
161+
ObjectInspector resultExprArgOI = resultExprArg.getOI();
162+
163+
if ( !ObjectInspectorUtils.isConstantObjectInspector(resultExprArgOI) ||
164+
(resultExprArgOI.getCategory() != ObjectInspector.Category.PRIMITIVE) ||
165+
((PrimitiveObjectInspector)resultExprArgOI).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING )
166+
{
167+
throwErrorWithSignature("Currently the result Expr parameter must be a Constant String.");
168+
}
169+
170+
resultExprStr = ((ConstantObjectInspector)resultExprArgOI).getWritableConstantValue().toString();
171+
172+
/*
173+
* setup SymbolFunction chain.
174+
*/
175+
// the parser receives the pattern plus the three parallel lists collected in symInfo
SymbolParser syP = new SymbolParser(patternStr, symInfo.symbolExprsNames, symInfo.symbolExprsEvaluators, symInfo.symbolExprsOIs);
176+
syP.parse();
177+
syFn = syP.getSymbolFunction();
178+
179+
/*
180+
* setup OI for input to resultExpr select list
181+
*/
182+
StructObjectInspector selectListInputOI = (StructObjectInspector) NPathUtils.createSelectListInputOI(tDef.getInput().getOI());
183+
184+
/*
185+
* parse ResultExpr Str and setup OI.
186+
*/
187+
ResultExpressionParser resultExprParser = new ResultExpressionParser(resultExprStr, selectListInputOI);
188+
resultExprParser.translate();
189+
resultExprEvals = resultExprParser.getSelectListExprEvaluators();
190+
// OI is not declared in the visible part of this class - presumably the evaluator's output inspector inherited from TableFunctionEvaluator; confirm
OI = resultExprParser.getSelectListOutputOI();
191+
}
192+
193+
/**
 * Always throws: reports an argument error together with the expected NPath signature.
 *
 * @param message detail text appended after the signature description
 * @throws WindowingException on every call
 */
private void throwErrorWithSignature(String message) throws WindowingException
194+
{
195+
throw new WindowingException(sprintf(
196+
"NPath signature is: SymbolPattern, one or more SymbolName, expression pairs, the result expression as a select list. Error %s",
197+
message));
198+
}
199+
200+
/**
 * Resolver for the NPath table function; it hands out {@link NPath} evaluators.
 */
public static class NPathResolver extends TableFunctionResolver
201+
{
202+
203+
/**
 * Returns a new NPath instance; neither argument is used here.
 */
@Override
204+
protected TableFunctionEvaluator createEvaluator(QueryDef qDef, TableFuncDef tDef)
205+
{
206+
207+
return new NPath();
208+
}
43209

44210
}
45211

46-
@Override
47-
public void execute(Partition iPart, Partition outP) throws WindowingException
212+
/**
 * Collects the symbol arguments as three parallel lists: entry i of the names,
 * evaluators and object inspectors lists all describe the same symbol.
 */
static class SymbolsInfo
48213
{
214+
// value passed to the constructor; used as the initial capacity of the lists, not enforced by add()
int sz;
215+
ArrayList<ExprNodeEvaluator> symbolExprsEvaluators;
216+
ArrayList<ObjectInspector> symbolExprsOIs;
217+
ArrayList<String> symbolExprsNames;
218+
219+
/**
 * @param sz initial capacity for each of the three lists
 */
SymbolsInfo(int sz)
220+
{
221+
this.sz = sz;
222+
symbolExprsEvaluators = new ArrayList<ExprNodeEvaluator>(sz);
223+
symbolExprsOIs = new ArrayList<ObjectInspector>(sz);
224+
symbolExprsNames = new ArrayList<String>(sz);
225+
}
49226

227+
/**
 * Appends one symbol: its name, plus the evaluator and object inspector taken from arg.
 */
void add(String name, ArgDef arg)
228+
{
229+
symbolExprsNames.add(name);
230+
symbolExprsEvaluators.add(arg.getExprEvaluator());
231+
symbolExprsOIs.add(arg.getOI());
232+
}
50233
}
51234
}

windowing/src/main/java/com/sap/hadoop/windowing/functions2/table/npath/ResultExpressionParser.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.hadoop.hive.ql.parse.ASTNode;
1010
import org.apache.hadoop.hive.ql.parse.RowResolver;
1111
import org.apache.hadoop.hive.ql.parse.TypeCheckCtx;
12+
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
1213
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
1314
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
1415
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -91,7 +92,7 @@ private void buildSelectListEvaluators() throws WindowingException
9192
throw new WindowingException(he);
9293
}
9394

94-
selectColName = selectColName == null ? "npath_col_" + i : selectColName;
95+
selectColName = getColumnName(selectColName, selectColumnExprNode, i);
9596

9697
selectListExprEvaluators.add(selectColumnExprEval);
9798
selectListExprOIs.add(selectColumnOI);
@@ -137,4 +138,18 @@ private void validateSelectExpr() throws WindowingException
137138
TranslateUtils.validateNoLeadLagInValueBoundarySpec(node, "Lead/Lag not allowed in NPath Result Expression");
138139
}
139140
}
141+
142+
/**
 * Picks the output column name for a select-list entry.
 * Order of preference: the explicit alias, then the column name when the
 * expression is a plain column reference, then a generated "npath_col_" + colIdx.
 *
 * @param alias    alias given in the select list, or null if none
 * @param exprNode the expression descriptor for the select-list entry
 * @param colIdx   index used to build the generated fallback name
 * @return the name chosen by the rules above
 */
private String getColumnName(String alias, ExprNodeDesc exprNode, int colIdx)
143+
{
144+
if ( alias != null )
145+
{
146+
return alias;
147+
}
148+
else if ( exprNode instanceof ExprNodeColumnDesc )
149+
{
150+
ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc) exprNode;
151+
return colDesc.getColumn();
152+
}
153+
// neither an alias nor a bare column: fall back to a generated name
return "npath_col_" + colIdx;
154+
}
140155
}

windowing/src/test/java/com/sap/hadoop/windowing/runtime2/MRExecutorTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,32 @@ public void testSumDelta() throws WindowingException
7575
String e = "";
7676
Assert.assertEquals(r, e);
7777
}
78+
79+
/**
 * Executes an npath query over flights_tiny (symbol LATE defined as arr_delay > 15,
 * pattern 'LATE.LATE+') and prints the captured output.
 * The output comparison is currently commented out, so this test only fails if
 * execution throws.
 */
@Test
80+
public void testNPath() throws WindowingException
81+
{
82+
wshell.execute(
83+
" select origin_city_name, fl_num, year, month, day_of_month, sz, tpath " +
84+
" from npath( " +
85+
" flights_tiny " +
86+
" partition by fl_num " +
87+
" order by year, month, day_of_month, " +
88+
" 'LATE.LATE+', " +
89+
" 'LATE', arr_delay > 15, " +
90+
" 'origin_city_name, fl_num, year, month, day_of_month, size(tpath) as sz, tpath as tpath' " +
91+
" ) " +
92+
" into path='/tmp/testNPath' \n" +
93+
" serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' \n" +
94+
" with serdeproperties('field.delim'=',') \n" +
95+
" format 'org.apache.hadoop.mapred.TextOutputFormat'",
96+
outPrinter);
97+
String r = outStream.toString();
98+
// normalize Windows line endings before the output is printed
r = r.replace("\r\n", "\n");
99+
System.out.println(r);
100+
// TODO: e is a placeholder and unused while the assertion below stays disabled; fill in the expected output and re-enable it
String e = "";
101+
//Assert.assertEquals(r, e);
102+
103+
}
78104

79105

80106
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.sap.hadoop.windowing.translation2;
2+
3+
import junit.framework.Assert;
4+
5+
import org.junit.Test;
6+
7+
import com.sap.hadoop.windowing.WindowingException;
8+
import com.sap.hadoop.windowing.testutils.MRBaseTest;
9+
10+
/**
 * Translation-only tests: queries are passed to wshell.translate and are not executed here.
 */
public class TranslationTest extends MRBaseTest
11+
{
12+
13+
/**
 * Translates an npath query over flights_tiny (symbol LATE defined as arr_delay > 15).
 * There are no assertions; the test passes as long as translate does not throw.
 */
@Test
14+
public void testNPath() throws WindowingException
15+
{
16+
17+
wshell.translate(
18+
" select origin_city_name, fl_num, year, month, day_of_month, sz, tpath " +
19+
" from npath( " +
20+
" flights_tiny " +
21+
" partition by fl_num " +
22+
" order by year, month, day_of_month, " +
23+
" 'LATE.LATE.LATE.LATE.LATE+', " +
24+
" 'LATE', arr_delay > 15, " +
25+
" 'origin_city_name, fl_num, year, month, day_of_month, size(tpath) as sz, tpath as tpath' " +
26+
" ) " +
27+
" into path='/tmp/testNPath' \n" +
28+
" serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' \n" +
29+
" with serdeproperties('field.delim'=',') \n" +
30+
" format 'org.apache.hadoop.mapred.TextOutputFormat'");
31+
32+
}
33+
}

0 commit comments

Comments
 (0)