Skip to content

Commit 10174bb

Browse files
committed
add method to create WindowingInput from a File
1 parent 70b9e58 commit 10174bb

File tree

1 file changed

+44
-0
lines changed
  • windowing/src/main/java/com/sap/hadoop/windowing/io

1 file changed

+44
-0
lines changed

windowing/src/main/java/com/sap/hadoop/windowing/io/IOUtils.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import java.io.IOException;
1010
import java.io.PrintStream;
11+
import java.util.Properties;
1112

1213
import org.apache.commons.logging.Log;
1314
import org.apache.commons.logging.LogFactory;
@@ -102,6 +103,49 @@ public static WindowingInput createTableWindowingInput(String dbName, String tab
102103
}
103104
}
104105

106+
@SuppressWarnings("unchecked")
107+
public static WindowingInput createFileWindowingInput(String path, String inputFormatClassName,
108+
String serDeClassName, Properties serDeProperties, Configuration conf) throws WindowingException
109+
{
110+
try
111+
{
112+
HiveConf hConf = new HiveConf(conf, IOUtils.class);
113+
JobConf job = new JobConf(hConf);
114+
Path p = new Path(path);
115+
p = makeQualified(p, conf);
116+
117+
Class<? extends InputFormat<? extends Writable, ? extends Writable>> inputFormatClass =
118+
(Class<? extends InputFormat<? extends Writable, ? extends Writable>>) Class.forName(inputFormatClassName);
119+
hConf.setClass("mapred.input.format.class", inputFormatClass, InputFormat.class);
120+
hConf.set(INPUT_INPUTFORMAT_CLASS, inputFormatClass.getName());
121+
InputFormat<? extends Writable, ? extends Writable> iFmt = inputFormatClass.newInstance();
122+
if (iFmt instanceof TextInputFormat)
123+
{
124+
((TextInputFormat)iFmt).configure(job);
125+
}
126+
FileInputFormat.addInputPath(job, p);
127+
InputSplit[] iSplits = iFmt.getSplits(job, 1);
128+
org.apache.hadoop.mapred.RecordReader<Writable, Writable> rdr =
129+
(org.apache.hadoop.mapred.RecordReader<Writable, Writable>) iFmt.getRecordReader(iSplits[0], job, Reporter.NULL);
130+
131+
hConf.set(INPUT_PATH, path);
132+
hConf.set(INPUT_KEY_CLASS, rdr.createKey().getClass().getName());
133+
hConf.set(INPUT_VALUE_CLASS, rdr.createValue().getClass().getName());
134+
135+
hConf.set(INPUT_SERDE_CLASS, serDeClassName);
136+
137+
TableWindowingInput tIn = new TableWindowingInput();
138+
139+
tIn.initialize(null, hConf, serDeProperties);
140+
141+
return tIn;
142+
}
143+
catch(Exception e)
144+
{
145+
throw new WindowingException(e);
146+
}
147+
}
148+
105149
public static Partition createPartition(String partitionClass,
106150
int partitionMemSize, WindowingInput wIn) throws WindowingException
107151
{

0 commit comments

Comments
 (0)