Skip to content

Commit 9057cb5

Browse files
committed
NPath Symbol Function and Parser
1 parent c8863f4 commit 9057cb5

File tree

2 files changed

+282
-0
lines changed

2 files changed

+282
-0
lines changed
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
package com.sap.hadoop.windowing.functions2.table.npath;
2+
3+
import java.util.ArrayList;
4+
5+
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
6+
import org.apache.hadoop.hive.ql.metadata.HiveException;
7+
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
8+
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
9+
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
10+
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
11+
12+
import com.sap.hadoop.windowing.WindowingException;
13+
import com.sap.hadoop.windowing.runtime2.PartitionIterator;
14+
15+
16+
public abstract class SymbolFunction
17+
{
18+
public static class SymbolFunctionResult
19+
{
20+
/*
21+
* does the row match the pattern represented by this SymbolFunction
22+
*/
23+
public boolean matches;
24+
/*
25+
* what is the index of the row beyond the set of rows that match this pattern.
26+
*/
27+
public int nextRow;
28+
}
29+
30+
SymbolFunctionResult result;
31+
32+
public SymbolFunction()
33+
{
34+
result = new SymbolFunctionResult();
35+
}
36+
37+
public abstract SymbolFunctionResult match(Object row, PartitionIterator<Object> pItr) throws WindowingException;
38+
39+
public abstract boolean isOptional();
40+
41+
public static class Symbol extends SymbolFunction
42+
{
43+
ExprNodeEvaluator symbolExprEval;
44+
Converter converter;
45+
46+
public Symbol(ExprNodeEvaluator symbolExprEval, ObjectInspector symbolOI)
47+
{
48+
this.symbolExprEval = symbolExprEval;
49+
converter = ObjectInspectorConverters.getConverter(
50+
symbolOI,
51+
PrimitiveObjectInspectorFactory.javaBooleanObjectInspector);
52+
}
53+
54+
public SymbolFunctionResult match(Object row, PartitionIterator<Object> pItr) throws WindowingException
55+
{
56+
Object val = null;
57+
try
58+
{
59+
symbolExprEval.evaluate(row);
60+
}
61+
catch(HiveException he)
62+
{
63+
throw new WindowingException(he);
64+
}
65+
val = converter.convert(val);
66+
result.matches = ((Boolean) val).booleanValue();
67+
result.nextRow = pItr.getIndex();
68+
69+
return result;
70+
}
71+
72+
public boolean isOptional()
73+
{
74+
return false;
75+
}
76+
}
77+
78+
public static class Star extends SymbolFunction
79+
{
80+
SymbolFunction symbolFn;
81+
82+
public Star(SymbolFunction symbolFn)
83+
{
84+
this.symbolFn = symbolFn;
85+
}
86+
87+
public SymbolFunctionResult match(Object row, PartitionIterator<Object> pItr) throws WindowingException
88+
{
89+
result.matches = true;
90+
SymbolFunctionResult rowResult = symbolFn.match(row, pItr);
91+
92+
while ( rowResult.matches && pItr.hasNext() )
93+
{
94+
row = pItr.next();
95+
rowResult = symbolFn.match(row, pItr);
96+
}
97+
98+
result.nextRow = pItr.getIndex();
99+
return result;
100+
}
101+
102+
public boolean isOptional()
103+
{
104+
return true;
105+
}
106+
}
107+
108+
public static class Plus extends SymbolFunction
109+
{
110+
SymbolFunction symbolFn;
111+
112+
public Plus(SymbolFunction symbolFn)
113+
{
114+
this.symbolFn = symbolFn;
115+
}
116+
117+
public SymbolFunctionResult match(Object row, PartitionIterator<Object> pItr) throws WindowingException
118+
{
119+
SymbolFunctionResult rowResult = symbolFn.match(row, pItr);
120+
121+
if ( !rowResult.matches )
122+
{
123+
result.matches = false;
124+
result.nextRow = pItr.getIndex();
125+
return result;
126+
}
127+
128+
result.matches = true;
129+
while ( rowResult.matches && pItr.hasNext() )
130+
{
131+
row = pItr.next();
132+
rowResult = symbolFn.match(row, pItr);
133+
}
134+
135+
result.nextRow = pItr.getIndex();
136+
return result;
137+
}
138+
139+
public boolean isOptional()
140+
{
141+
return false;
142+
}
143+
}
144+
145+
public static class Chain extends SymbolFunction
146+
{
147+
ArrayList<SymbolFunction> components;
148+
149+
public Chain(ArrayList<SymbolFunction> components)
150+
{
151+
this.components = components;
152+
}
153+
154+
/*
155+
* Iterate over the Symbol Functions in the Chain:
156+
* - If we are not at the end of the Iterator (i.e. row != null )
157+
* - match the current componentFn
158+
* - if it returns false, then return false
159+
* - otherwise set row to the next row from the Iterator.
160+
* - if we are at the end of the Iterator
161+
* - skip any optional Symbol Fns (star patterns) at the end.
162+
* - but if we come to a non optional Symbol Fn, return false.
163+
* - if we match all Fns in the chain return true.
164+
*/
165+
public SymbolFunctionResult match(Object row, PartitionIterator<Object> pItr) throws WindowingException
166+
{
167+
SymbolFunctionResult componentResult = null;
168+
for(SymbolFunction sFn : components)
169+
{
170+
if ( row != null )
171+
{
172+
componentResult = sFn.match(row, pItr);
173+
if ( !componentResult.matches )
174+
{
175+
result.matches = false;
176+
result.nextRow = componentResult.nextRow;
177+
return result;
178+
}
179+
row = pItr.hasNext() ? pItr.next() : null;
180+
}
181+
else
182+
{
183+
if ( !sFn.isOptional() )
184+
{
185+
result.matches = false;
186+
result.nextRow = componentResult.nextRow;
187+
return result;
188+
}
189+
}
190+
}
191+
192+
result.matches = true;
193+
result.nextRow = componentResult.nextRow;
194+
return result;
195+
}
196+
197+
public boolean isOptional()
198+
{
199+
return false;
200+
}
201+
}
202+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package com.sap.hadoop.windowing.functions2.table.npath;
2+
3+
import java.util.ArrayList;
4+
import java.util.HashMap;
5+
6+
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
7+
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
8+
9+
import com.sap.hadoop.windowing.WindowingException;
10+
import com.sap.hadoop.windowing.functions2.table.npath.SymbolFunction.Symbol;
11+
import com.sap.hadoop.windowing.functions2.table.npath.SymbolFunction.Star;
12+
import com.sap.hadoop.windowing.functions2.table.npath.SymbolFunction.Plus;
13+
import com.sap.hadoop.windowing.functions2.table.npath.SymbolFunction.Chain;
14+
15+
import static com.sap.hadoop.Utils.sprintf;
16+
17+
public class SymbolParser
18+
{
19+
String patternStr;
20+
String[] symbols;
21+
HashMap<String, Object[]> symbolExprEvalMap;
22+
ArrayList<SymbolFunction> symbolFunctions;
23+
Chain symbolFnChain;
24+
25+
26+
public SymbolParser(String patternStr, ArrayList<String> symbolNames,
27+
ArrayList<ExprNodeEvaluator> symbolExprEvals, ArrayList<ObjectInspector> symbolExprOIs)
28+
{
29+
super();
30+
this.patternStr = patternStr;
31+
symbolExprEvalMap = new HashMap<String, Object[]>();
32+
int sz = symbolNames.size();
33+
for(int i=0; i < sz; i++)
34+
{
35+
String symbolName = symbolNames.get(i);
36+
ExprNodeEvaluator symbolExprEval = symbolExprEvals.get(i);
37+
ObjectInspector symbolExprOI = symbolExprOIs.get(i);
38+
symbolExprEvalMap.put(symbolName.toLowerCase(), new Object[] {symbolExprEval, symbolExprOI});
39+
}
40+
}
41+
42+
public SymbolFunction getSymbolFunction()
43+
{
44+
return symbolFnChain;
45+
}
46+
47+
public void parse() throws WindowingException
48+
{
49+
symbols = patternStr.split("\\.");
50+
symbolFunctions = new ArrayList<SymbolFunction>();
51+
52+
for(String symbol : symbols)
53+
{
54+
boolean isStar = symbol.endsWith("*");
55+
boolean isPlus = symbol.endsWith("+");
56+
57+
symbol = (isStar || isPlus) ? symbol.substring(0, symbol.length() - 1) : symbol;
58+
Object[] symbolDetails = symbolExprEvalMap.get(symbol.toLowerCase());
59+
if ( symbolDetails == null )
60+
{
61+
throw new WindowingException(sprintf("Unknown Symbol %s", symbol));
62+
}
63+
64+
ExprNodeEvaluator symbolExprEval = (ExprNodeEvaluator) symbolDetails[0];
65+
ObjectInspector symbolExprOI = (ObjectInspector) symbolDetails[1];
66+
SymbolFunction sFn = new Symbol(symbolExprEval, symbolExprOI);
67+
68+
if ( isStar )
69+
{
70+
sFn = new Star(sFn);
71+
}
72+
else if ( isPlus )
73+
{
74+
sFn = new Plus(sFn);
75+
}
76+
symbolFunctions.add(sFn);
77+
}
78+
symbolFnChain = new Chain(symbolFunctions);
79+
}
80+
}

0 commit comments

Comments
 (0)