Skip to content

Commit

Permalink
fix ClassCastException in FiniteAppenderatorDriver (apache#2896)
Browse files Browse the repository at this point in the history
  • Loading branch information
dclim authored and fjy committed Apr 29, 2016
1 parent 3f71a4a commit 5f0a9cc
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public SegmentIdentifier add(
try {
final int numRows = appenderator.add(identifier, row, wrapCommitterSupplier(committerSupplier));
if (numRows >= maxRowsPerSegment) {
moveSegmentOut(ImmutableList.of(identifier));
moveSegmentOut(sequenceName, ImmutableList.of(identifier));
}
}
catch (SegmentNotWritableException e) {
Expand Down Expand Up @@ -376,13 +376,18 @@ private SegmentIdentifier getSegment(final DateTime timestamp, final String sequ
/**
* Move a set of identifiers out from "active", making way for newer segments.
*/
private void moveSegmentOut(final List<SegmentIdentifier> identifiers)
private void moveSegmentOut(final String sequenceName, final List<SegmentIdentifier> identifiers)
{
synchronized (activeSegments) {
final NavigableMap<Long, SegmentIdentifier> activeSegmentsForSequence = activeSegments.get(sequenceName);
if (activeSegmentsForSequence == null) {
throw new ISE("WTF?! Asked to remove segments for sequenceName[%s] which doesn't exist...", sequenceName);
}

for (final SegmentIdentifier identifier : identifiers) {
log.info("Moving segment[%s] out of active list.", identifier);
final long key = identifier.getInterval().getStartMillis();
if (activeSegments.remove(key) != identifier) {
if (activeSegmentsForSequence.remove(key) != identifier) {
throw new ISE("WTF?! Asked to remove segment[%s] that didn't exist...", identifier);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,33 @@ public void testSimple() throws Exception
Assert.assertEquals(3, segmentsAndMetadata.getCommitMetadata());
}

@Test
public void testMaxRowsPerSegment() throws Exception
{
final int numSegments = 3;
final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
Assert.assertNull(driver.startJob());

for (int i = 0; i < numSegments * MAX_ROWS_PER_SEGMENT; i++) {
committerSupplier.setMetadata(i + 1);
InputRow row = new MapBasedInputRow(
new DateTime("2000T01"),
ImmutableList.of("dim2"),
ImmutableMap.<String, Object>of(
"dim2",
String.format("bar-%d", i),
"met1",
2.0
)
);
Assert.assertNotNull(driver.add(row, "dummy", committerSupplier));
}

final SegmentsAndMetadata segmentsAndMetadata = driver.finish(makeOkPublisher(), committerSupplier.get());
Assert.assertEquals(numSegments, segmentsAndMetadata.getSegments().size());
Assert.assertEquals(numSegments * MAX_ROWS_PER_SEGMENT, segmentsAndMetadata.getCommitMetadata());
}

private Set<SegmentIdentifier> asIdentifiers(Iterable<DataSegment> segments)
{
return ImmutableSet.copyOf(
Expand Down

0 comments on commit 5f0a9cc

Please sign in to comment.