diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java index 6aa11eb8312d69..d427918786f607 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java @@ -43,11 +43,18 @@ public Date createInstance() { @Override public Date copy(Date from) { + if(from == null) { + return null; + } return new Date(from.getTime()); } + @Override public Date copy(Date from, Date reuse) { + if(from == null) { + return null; + } reuse.setTime(from.getTime()); return reuse; } @@ -59,17 +66,30 @@ public int getLength() { @Override public void serialize(Date record, DataOutputView target) throws IOException { - target.writeLong(record.getTime()); + if(record == null) { + target.writeLong(-1L); + } else { + target.writeLong(record.getTime()); + } } @Override public Date deserialize(DataInputView source) throws IOException { - return new Date(source.readLong()); + long v = source.readLong(); + if(v == -1L) { + return null; + } else { + return new Date(v); + } } @Override public Date deserialize(Date reuse, DataInputView source) throws IOException { - reuse.setTime(source.readLong()); + long v = source.readLong(); + if(v == -1L) { + return null; + } + reuse.setTime(v); return reuse; } diff --git a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java index c192ec25312b6b..5c48bf4792020b 100644 --- a/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java +++ b/flink-core/src/main/java/org/apache/flink/types/NullFieldException.java @@ -59,6 +59,18 @@ public NullFieldException(int fieldIdx) { super("Field " + fieldIdx + " is null, but expected to hold a value."); this.fieldPos = fieldIdx; } + + /** + * Constructs an {@code NullFieldException} with a default message, referring to + * given field number as the null field and a cause (Throwable) + * + * @param fieldIdx The index of the field that was null, but expected to hold a value. + * @param cause Pass the root cause of the error + */ + public NullFieldException(int fieldIdx, Throwable cause) { + super("Field " + fieldIdx + " is null, but expected to hold a value.", cause); + this.fieldPos = fieldIdx; + } /** * Gets the field number that was attempted to access. If the number is not set, this method returns diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java index 59bea0ce21a907..998ae1218d789b 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java @@ -182,7 +182,8 @@ public void testSerializeIndividually() { fail("Exception in test: " + e.getMessage()); } } - + + @Test public void testSerializeIndividuallyReusingValues() { try { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java index 231486d13130d6..2b330c25b479e1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java @@ -115,7 +115,7 @@ public void serialize(T value, DataOutputView target) throws IOException { try { fieldSerializers[i].serialize(o, target); } catch (NullPointerException npex) { - throw new NullFieldException(i); + throw new NullFieldException(i, npex); } } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java index 4061195fe3894d..d52055d0b650ae 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java @@ -52,6 +52,7 @@ import scala.math.BigInt; import java.util.Collection; +import java.util.Date; import java.util.Iterator; @SuppressWarnings("serial") @@ -1101,6 +1102,37 @@ public void testJodatimeDateTimeWithKryo() throws Exception { expected = "(1)\n"; } + /** + * Fix for FLINK-2158. + * + * @throws Exception + */ + @Test + public void testDateNullException() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> in = env.fromElements(new Tuple2(0, new Date(1230000000)), + new Tuple2(1, null), + new Tuple2(2, new Date(1230000000)) + ); + + DataSet r = in.groupBy(0).reduceGroup(new GroupReduceFunction, String>() { + @Override + public void reduce(Iterable> values, Collector out) throws Exception { + for (Tuple2 e : values) { + out.collect(Integer.toString(e.f0)); + } + } + }); + + r.writeAsText(resultPath); + env.execute(); + + expected = "0\n1\n2\n"; + } + + + public static class GroupReducer8 implements GroupReduceFunction { @Override public void reduce(