Skip to content

Commit dfc3ab4

Browse files
committed
Introduce PartitionsIterator so that LocalExecutor-based tests are closer
to real tests.
1 parent e7d567b commit dfc3ab4

File tree

4 files changed

+288
-129
lines changed

4 files changed

+288
-129
lines changed

windowing/src/main/java/com/sap/hadoop/windowing/runtime2/LocalExecutor.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,17 @@ public void execute(QueryDef qDef, WindowingShell wShell) throws WindowingExcept
3737
int partMemSize = tEval.getPartitionMemSize();
3838

3939
WindowingInput wIn = IOUtils.createTableWindowingInput(hvTblSpec.getDbName(), hvTblSpec.getTableName(), tInfo.getHiveCfg());
40-
Partition p = IOUtils.createPartition(partClassName, partMemSize, wIn);
40+
//Partition p = IOUtils.createPartition(partClassName, partMemSize, wIn);
4141

42-
Partition oP = executeChain(qDef, p);
43-
//IOUtils.dumpPartition(oP, System.out);
44-
executeSelectList(qDef, oP, new SysOutRS(out));
42+
PartitionsIterator partsItr = new PartitionsIterator(wIn, qDef);
43+
44+
while(partsItr.hasNext())
45+
{
46+
Partition p = partsItr.next();
47+
Partition oP = executeChain(qDef, p);
48+
//IOUtils.dumpPartition(oP, System.out);
49+
executeSelectList(qDef, oP, new SysOutRS(out));
50+
}
4551
}
4652

4753
public static class SysOutRS implements ForwardSink

windowing/src/main/java/com/sap/hadoop/windowing/runtime2/PartitionIterator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
import java.util.Iterator;
44

5+
/*
6+
* Provides an Iterator over the rows in a Partition.
7+
* Iterator exposes the index of the next location.
8+
* Client can invoke lead/lag relative to the next location.
9+
*/
510
public interface PartitionIterator<T> extends Iterator<T>
611
{
712
int getIndex();
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package com.sap.hadoop.windowing.runtime2;
2+
3+
import java.util.ArrayList;
4+
import java.util.Iterator;
5+
6+
import org.apache.hadoop.hive.serde2.SerDe;
7+
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
8+
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
9+
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
10+
import org.apache.hadoop.io.Writable;
11+
12+
import com.sap.hadoop.windowing.WindowingException;
13+
import com.sap.hadoop.windowing.functions2.TableFunctionEvaluator;
14+
import com.sap.hadoop.windowing.io.WindowingInput;
15+
import com.sap.hadoop.windowing.query2.definition.ColumnDef;
16+
import com.sap.hadoop.windowing.query2.definition.QueryDef;
17+
import com.sap.hadoop.windowing.query2.definition.TableFuncDef;
18+
19+
/*
20+
* given a WindowingInput, slices it into an iteration of Partitions.
21+
* Used by the LocalExecutor.
22+
*/
23+
public class PartitionsIterator implements Iterator<Partition>
24+
{
25+
WindowingInput wIn;
26+
QueryDef qDef;
27+
String partClassName;
28+
int partMemSize;
29+
Object currObject;
30+
SerDe serDe;
31+
StructObjectInspector OI;
32+
StructObjectInspector stdOI;
33+
Writable w;
34+
ArrayList<String> partColumns;
35+
ArrayList<StructField> objFields;
36+
ArrayList<StructField> stdObjFields;
37+
38+
public PartitionsIterator(WindowingInput wIn, QueryDef qDef) throws WindowingException
39+
{
40+
super();
41+
this.wIn = wIn;
42+
this.qDef = qDef;
43+
TableFuncDef tabDef = (TableFuncDef) qDef.getInput();
44+
TableFunctionEvaluator tEval = tabDef.getFunction();
45+
partClassName = tEval.getPartitionClass();
46+
partMemSize = tEval.getPartitionMemSize();
47+
48+
serDe = (SerDe) wIn.getDeserializer();
49+
try
50+
{
51+
OI = (StructObjectInspector) serDe.getObjectInspector();
52+
w = wIn.createRow();
53+
}
54+
catch(Exception se)
55+
{
56+
throw new WindowingException(se);
57+
}
58+
59+
stdOI = (StructObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(OI);
60+
61+
partColumns = new ArrayList<String>();
62+
objFields = new ArrayList<StructField>();
63+
stdObjFields = new ArrayList<StructField>();
64+
ArrayList<ColumnDef> cols = tabDef.getWindow().getPartDef().getColumns();
65+
for(ColumnDef colDef : cols)
66+
{
67+
String colName = colDef.getAlias();
68+
StructField f = OI.getStructFieldRef(colName);
69+
StructField stdF = stdOI.getStructFieldRef(colName);
70+
partColumns.add(colName);
71+
objFields.add(f);
72+
stdObjFields.add(stdF);
73+
}
74+
}
75+
76+
@Override
77+
public boolean hasNext()
78+
{
79+
return currObject != null || wIn.hasNext();
80+
}
81+
82+
@Override
83+
public Partition next()
84+
{
85+
try
86+
{
87+
Partition p = new Partition(partClassName, partMemSize, serDe, OI);
88+
89+
if (currObject == null)
90+
{
91+
w = (Writable) wIn.next();
92+
Object o = wIn.getDeserializer().deserialize(w);
93+
currObject = ObjectInspectorUtils.copyToStandardObject(o, OI);
94+
}
95+
p.append(w);
96+
97+
boolean reachedNextPart = false;
98+
while (wIn.next(w) != -1)
99+
{
100+
Object o = wIn.getDeserializer().deserialize(w);
101+
if (isInPartition(o))
102+
{
103+
p.append(w);
104+
}
105+
else
106+
{
107+
currObject = ObjectInspectorUtils.copyToStandardObject(o, OI);
108+
reachedNextPart = true;
109+
break;
110+
}
111+
}
112+
if (!reachedNextPart)
113+
{
114+
currObject = null;
115+
}
116+
return p;
117+
}
118+
catch (Exception we)
119+
{
120+
throw new RuntimeException(we);
121+
}
122+
}
123+
124+
@Override
125+
public void remove()
126+
{
127+
throw new UnsupportedOperationException();
128+
}
129+
130+
private boolean isInPartition(Object o)
131+
{
132+
for(int i=0; i < partColumns.size(); i++)
133+
{
134+
if (ObjectInspectorUtils.compare(OI.getStructFieldData(o, objFields.get(i)), objFields.get(i).getFieldObjectInspector(),
135+
stdOI.getStructFieldData(currObject, stdObjFields.get(i)), stdObjFields.get(i).getFieldObjectInspector()) != 0)
136+
{
137+
return false;
138+
}
139+
}
140+
return true;
141+
}
142+
143+
}

0 commit comments

Comments
 (0)