Skip to content

Commit

Permalink
[FLINK-2158] Add support for null to the DateSerializer
Browse files Browse the repository at this point in the history
This closes apache#780
  • Loading branch information
rmetzger authored and Nam-Luc Tran committed Jan 8, 2016
1 parent c68dd0e commit 4916560
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,18 @@ public Date createInstance() {

@Override
public Date copy(Date from) {
if(from == null) {
return null;
}
return new Date(from.getTime());
}


@Override
public Date copy(Date from, Date reuse) {
if(from == null) {
return null;
}
reuse.setTime(from.getTime());
return reuse;
}
Expand All @@ -59,17 +66,30 @@ public int getLength() {

@Override
public void serialize(Date record, DataOutputView target) throws IOException {
target.writeLong(record.getTime());
if(record == null) {
target.writeLong(-1L);
} else {
target.writeLong(record.getTime());
}
}

@Override
public Date deserialize(DataInputView source) throws IOException {
return new Date(source.readLong());
long v = source.readLong();
if(v == -1L) {
return null;
} else {
return new Date(v);
}
}

@Override
public Date deserialize(Date reuse, DataInputView source) throws IOException {
reuse.setTime(source.readLong());
long v = source.readLong();
if(v == -1L) {
return null;
}
reuse.setTime(v);
return reuse;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ public NullFieldException(int fieldIdx) {
super("Field " + fieldIdx + " is null, but expected to hold a value.");
this.fieldPos = fieldIdx;
}

/**
* Constructs an {@code NullFieldException} with a default message, referring to
* given field number as the null field and a cause (Throwable)
*
* @param fieldIdx The index of the field that was null, but expected to hold a value.
* @param cause Pass the root cause of the error
*/
public NullFieldException(int fieldIdx, Throwable cause) {
super("Field " + fieldIdx + " is null, but expected to hold a value.", cause);
this.fieldPos = fieldIdx;
}

/**
* Gets the field number that was attempted to access. If the number is not set, this method returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ public void testSerializeIndividually() {
fail("Exception in test: " + e.getMessage());
}
}



@Test
public void testSerializeIndividuallyReusingValues() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void serialize(T value, DataOutputView target) throws IOException {
try {
fieldSerializers[i].serialize(o, target);
} catch (NullPointerException npex) {
throw new NullFieldException(i);
throw new NullFieldException(i, npex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import scala.math.BigInt;

import java.util.Collection;
import java.util.Date;
import java.util.Iterator;

@SuppressWarnings("serial")
Expand Down Expand Up @@ -1101,6 +1102,37 @@ public void testJodatimeDateTimeWithKryo() throws Exception {
expected = "(1)\n";
}

/**
* Fix for FLINK-2158.
*
* @throws Exception
*/
@Test
public void testDateNullException() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Integer, Date>> in = env.fromElements(new Tuple2<Integer, Date>(0, new Date(1230000000)),
new Tuple2<Integer, Date>(1, null),
new Tuple2<Integer, Date>(2, new Date(1230000000))
);

DataSet<String> r = in.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Integer, Date>, String>() {
@Override
public void reduce(Iterable<Tuple2<Integer, Date>> values, Collector<String> out) throws Exception {
for (Tuple2<Integer, Date> e : values) {
out.collect(Integer.toString(e.f0));
}
}
});

r.writeAsText(resultPath);
env.execute();

expected = "0\n1\n2\n";
}



public static class GroupReducer8 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
@Override
public void reduce(
Expand Down

0 comments on commit 4916560

Please sign in to comment.