Fix ExecMapperContext#inputFileChanged NPE problem
kellyzly committed Jan 18, 2018
1 parent 80958ae commit e81b7df
Showing 1 changed file with 15 additions and 3 deletions.
ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (18 changes: 15 additions & 3 deletions)
@@ -538,9 +538,15 @@ public void process(Writable value) throws HiveException {
     // A mapper can span multiple files/partitions.
     // The serializers need to be reset if the input file changed
     ExecMapperContext context = getExecContext();
-    if (context != null && context.inputFileChanged()) {
-      // The child operators cleanup if input file has changed
-      cleanUpInputFileChanged();
+    if (context != null) {
+      if (context.getIoCxt().getInputPath() == null) {
+        // If the MapInput is RDD-cached, context.getIoCxt().getInputPath() returns null,
+        // so currentCtxs must be initialized first; see HIVE-18301 for details.
+        initializeContextsForSparkCacheMode();
+      } else if (context.inputFileChanged()) {
+        // The child operators clean up if the input file has changed.
+        cleanUpInputFileChanged();
+      }
     }
     int childrenDone = 0;
     for (MapOpCtx current : currentCtxs) {
@@ -701,4 +707,10 @@ public Deserializer getCurrentDeserializer() {
 
     return currentCtxs[0].deserializer;
   }
+
+  private void initializeContextsForSparkCacheMode() {
+    // TODO: investigate whether the first element of opCtxMap.values() can always be used as contexts.
+    Map<Operator<?>, MapOpCtx> contexts = opCtxMap.values().iterator().next();
+    currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]);
+  }
 }
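
For background on the failure this commit guards against, here is a minimal, hypothetical sketch in plain Java (the class and field names are stand-ins for the Hive types, with String in place of org.apache.hadoop.fs.Path; it reconstructs the failure mode, not the actual Hive code). When the MapInput RDD is served from Spark's cache, no file split is opened for the traversal, so the input path held by the IO context is null and a path comparison like the one in ExecMapperContext#inputFileChanged dereferences a null reference:

// Hypothetical reconstruction of the NPE described in HIVE-18301.
public class InputFileChangedNpeSketch {

  static String currentInputPath; // stands in for context.getIoCxt().getInputPath()
  static String lastInputPath;    // stands in for the last path the mapper processed

  static boolean inputFileChanged() {
    // With a cached RDD, currentInputPath is null, so equals() throws an NPE.
    return !currentInputPath.equals(lastInputPath);
  }

  public static void main(String[] args) {
    lastInputPath = "hdfs:///warehouse/t1/part-00000";
    currentInputPath = null; // cached-RDD case: no file split was opened
    try {
      inputFileChanged();
    } catch (NullPointerException e) {
      // The commit avoids reaching this comparison by testing getInputPath() == null
      // first and initializing the operator contexts directly in that case.
      System.out.println("NPE, as described in HIVE-18301");
    }
  }
}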

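Similarly, a hypothetical sketch of the map shape that initializeContextsForSparkCacheMode relies on (Operator and MapOpCtx are simplified stand-ins for the Hive classes): opCtxMap associates each input with the MapOpCtx of every operator fed by it, and with a single cached RDD as input there is exactly one entry, so its contexts can seed currentCtxs directly:

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the structure behind initializeContextsForSparkCacheMode.
public class SparkCacheModeInitSketch {

  static class Operator { } // stand-in for org.apache.hadoop.hive.ql.exec.Operator
  static class MapOpCtx { } // stand-in for MapOperator.MapOpCtx

  static Map<String, Map<Operator, MapOpCtx>> opCtxMap = new LinkedHashMap<>();
  static MapOpCtx[] currentCtxs;

  static void initializeContextsForSparkCacheMode() {
    // Take the contexts of the first (and, for a cached RDD, only) input.
    Map<Operator, MapOpCtx> contexts = opCtxMap.values().iterator().next();
    currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]);
  }

  public static void main(String[] args) {
    Map<Operator, MapOpCtx> contexts = new LinkedHashMap<>();
    contexts.put(new Operator(), new MapOpCtx());
    opCtxMap.put("hdfs:///warehouse/t1/part-00000", contexts);
    initializeContextsForSparkCacheMode();
    System.out.println("currentCtxs size: " + currentCtxs.length);
  }
}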