[Backport] Fix three bugs with segment publishing. (apache#6155) (apache#6187)

* [Backport] Fix three bugs with segment publishing. (apache#6155)

* Fix KafkaIndexTask
jon-wei authored and leventov committed Aug 19, 2018
1 parent 4da7d1c commit ad116ad
Showing 10 changed files with 83 additions and 66 deletions.
@@ -1200,7 +1200,7 @@ public void run()

log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());

- return toolbox.getTaskActionClient().submit(action).isSuccess();
+ return toolbox.getTaskActionClient().submit(action);
};

// Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting
@@ -2257,7 +2257,7 @@ public TransactionalSegmentPublisher getPublisher(TaskToolbox toolbox, boolean u

log.info("Publishing with isTransaction[%s].", useTransaction);

- return toolbox.getTaskActionClient().submit(action).isSuccess();
+ return toolbox.getTaskActionClient().submit(action);
};
}
}
@@ -646,7 +646,7 @@ dataSchema, new RealtimeIOConfig(null, null, null), null

final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
- return toolbox.getTaskActionClient().submit(action).isSuccess();
+ return toolbox.getTaskActionClient().submit(action);
};

try (
@@ -122,9 +122,12 @@ SegmentIdentifier allocatePendingSegment(
* {@link DataSourceMetadata#plus(DataSourceMetadata)}. If null, this insert will not
* involve a metadata transaction
*
- * @return segment publish result indicating transaction success or failure, and set of segments actually published
+ * @return segment publish result indicating transaction success or failure, and set of segments actually published.
+ * This method must only return a failure code if it is sure that the transaction did not happen. If it is not sure,
+ * it must throw an exception instead.
*
* @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null
+ * @throws RuntimeException if the state of metadata storage after this call is unknown
*/
SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,
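The contract spelled out above is the heart of the fix: a failure result may only be returned when the transaction definitely did not happen, and an unknown outcome has to surface as an exception. A minimal caller-side sketch of how that distinction is meant to be consumed; coordinator and cleanUpAndCheckForDuplicatePublish are illustrative names, not part of this patch:

try {
  final SegmentPublishResult result = coordinator.announceHistoricalSegments(segments, startMetadata, endMetadata);
  if (!result.isSuccess()) {
    // Definite failure: the metadata transaction provably did not commit, so it is safe
    // to clean up our copies and check whether another task already published the segments.
    cleanUpAndCheckForDuplicatePublish(segments);
  }
}
catch (RuntimeException e) {
  // Unknown outcome: the metadata store may or may not have been updated.
  // Do not delete anything from deep storage here; just propagate.
  throw e;
}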
@@ -323,7 +323,7 @@ public SegmentPublishResult announceHistoricalSegments(
}
}

- final AtomicBoolean txnFailure = new AtomicBoolean(false);
+ final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);

try {
return connector.retryTransaction(
@@ -335,6 +335,9 @@ public SegmentPublishResult inTransaction(
final TransactionStatus transactionStatus
) throws Exception
{
+ // Set definitelyNotUpdated back to false upon retrying.
+ definitelyNotUpdated.set(false);

final Set<DataSegment> inserted = Sets.newHashSet();

if (startMetadata != null) {
@@ -346,8 +349,9 @@
);

if (result != DataSourceMetadataUpdateResult.SUCCESS) {
+ // Metadata was definitely not updated.
transactionStatus.setRollbackOnly();
- txnFailure.set(true);
+ definitelyNotUpdated.set(true);

if (result == DataSourceMetadataUpdateResult.FAILURE) {
throw new RuntimeException("Aborting transaction!");
@@ -371,9 +375,10 @@
);
}
catch (CallbackFailedException e) {
- if (txnFailure.get()) {
-   return new SegmentPublishResult(ImmutableSet.<DataSegment>of(), false);
+ if (definitelyNotUpdated.get()) {
+   return SegmentPublishResult.fail();
} else {
+ // Must throw exception if we are not sure if we updated or not.
throw e;
}
}
@@ -749,7 +754,12 @@ private byte[] getDataSourceMetadataWithHandleAsBytes(
* @param endMetadata dataSource metadata post-insert will have this endMetadata merged in with
* {@link DataSourceMetadata#plus(DataSourceMetadata)}
*
- * @return true if dataSource metadata was updated from matching startMetadata to matching endMetadata
+ * @return SUCCESS if dataSource metadata was updated from matching startMetadata to matching endMetadata, FAILURE or
+ * TRY_AGAIN if it definitely was not updated. This guarantee is meant to help
+ * {@link #announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}
+ * achieve its own guarantee.
*
+ * @throws RuntimeException if state is unknown after this call
*/
protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
final Handle handle,
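Renaming txnFailure to definitelyNotUpdated and clearing it at the start of every retry is what keeps that guarantee honest: only the final attempt may decide that a definite failure can be reported. A stripped-down sketch of the pattern, with retryTransaction, tryUpdateMetadata, and insertSegments standing in for the real calls:

final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
try {
  return retryTransaction(() -> {
    // Reset on every attempt so a stale flag from an earlier try cannot leak through.
    definitelyNotUpdated.set(false);

    if (!tryUpdateMetadata()) {
      // We know for certain that nothing was written in this attempt.
      definitelyNotUpdated.set(true);
      throw new RuntimeException("Aborting transaction!");
    }
    return insertSegments();
  });
}
catch (CallbackFailedException e) {
  if (definitelyNotUpdated.get()) {
    // Safe: the last attempt provably changed nothing.
    return SegmentPublishResult.fail();
  }
  // Otherwise the outcome is unknown, and the caller must treat it that way.
  throw e;
}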
@@ -625,8 +625,15 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink
try {
if (descriptorFile.exists()) {
// Already pushed.
log.info("Segment[%s] already pushed.", identifier);
return objectMapper.readValue(descriptorFile, DataSegment.class);

if (useUniquePath) {
// Don't reuse the descriptor, because the caller asked for a unique path. Leave the old one as-is, since
// it might serve some unknown purpose.
log.info("Pushing segment[%s] again with new unique path.", identifier);
} else {
log.info("Segment[%s] already pushed.", identifier);
return objectMapper.readValue(descriptorFile, DataSegment.class);
}
}

log.info("Pushing merged index for segment[%s].", identifier);
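The reason re-pushing is safe here is that a unique-path push never lands on top of an earlier copy. A purely illustrative sketch of the idea; the path layout below is made up for the example and is not taken from this commit:

// Each unique-path push gets its own suffix, so retried pushes of the same segment
// cannot overwrite one another, and killing our copy cannot delete someone else's.
final String basePath = "dataSource/2000-01-01T00:00_2000-01-01T01:00/abc123/0";
final String firstPush = basePath + "/" + java.util.UUID.randomUUID() + "/index.zip";
final String retriedPush = basePath + "/" + java.util.UUID.randomUUID() + "/index.zip";
assert !firstPush.equals(retriedPush);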
@@ -435,38 +435,33 @@ ListenableFuture<SegmentsAndMetadata> publishInBackground(
final boolean published = publisher.publishSegments(
ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
- );
+ ).isSuccess();

if (published) {
log.info("Published segments.");
} else {
log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
log.info("Transaction failure while publishing segments, removing them from deep storage "
+ "and checking if someone else beat us to publishing.");

+ segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);

final Set<SegmentIdentifier> segmentsIdentifiers = segmentsAndMetadata
.getSegments()
.stream()
.map(SegmentIdentifier::fromDataSegment)
.collect(Collectors.toSet());

if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers)
.equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
- log.info(
-     "Removing our segments from deep storage because someone else already published them: %s",
-     segmentsAndMetadata.getSegments()
- );
- segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);

log.info("Our segments really do exist, awaiting handoff.");
} else {
throw new ISE("Failed to publish segments[%s]", segmentsAndMetadata.getSegments());
throw new ISE("Failed to publish segments.");
}
}
}
catch (Exception e) {
- log.warn(
-     "Removing segments from deep storage after failed publish: %s",
-     segmentsAndMetadata.getSegments()
- );
- segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);

+ // Must not remove segments here, we aren't sure if our transaction succeeded or not.
+ log.warn(e, "Failed publish, not removing segments: %s", segmentsAndMetadata.getSegments());
throw Throwables.propagate(e);
}
}
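Taken together, the driver now separates three outcomes: a definite publish, a definite transaction failure (kill our segment files, then check whether another task already published identical segments), and an exception whose outcome is unknown (leave deep storage untouched). A condensed sketch of the resulting flow; segments, callerMetadata, and segmentsAlreadyPublishedBySomeoneElse stand in for the values and usedSegmentChecker comparison shown above:

try {
  final boolean published = publisher.publishSegments(segments, callerMetadata).isSuccess();

  if (published) {
    log.info("Published segments.");
  } else {
    // Definite failure: our copies in deep storage are ours to delete.
    segments.forEach(dataSegmentKiller::killQuietly);

    if (segmentsAlreadyPublishedBySomeoneElse(segments)) {
      log.info("Our segments really do exist, awaiting handoff.");
    } else {
      throw new ISE("Failed to publish segments.");
    }
  }
}
catch (Exception e) {
  // On any exception we do not remove segment files here: if the publish call itself threw,
  // the metadata transaction may have committed. Just propagate.
  throw Throwables.propagate(e);
}

This mirrors the removal of the killQuietly calls from the catch block in the hunk above.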
@@ -19,6 +19,7 @@

package io.druid.segment.realtime.appenderator;

+ import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.timeline.DataSegment;

import javax.annotation.Nullable;
@@ -30,11 +31,14 @@ public interface TransactionalSegmentPublisher
/**
* Publish segments, along with some commit metadata, in a single transaction.
*
- * @return true if segments were published, false if they were not published due to txn failure with the metadata
+ * @return publish result that indicates if segments were published or not. If it is unclear
+ * if the segments were published or not, this method must throw an exception. The behavior is similar to
+ * IndexerSQLMetadataStorageCoordinator's announceHistoricalSegments.
*
* @throws IOException if there was an I/O error when publishing
+ * @throws RuntimeException if we cannot tell if the segments were published or not, for some other reason
*/
- boolean publishSegments(
+ SegmentPublishResult publishSegments(
Set<DataSegment> segments,
@Nullable Object commitMetadata
) throws IOException;
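Under the new signature, an implementation has to map its own outcomes onto the same three-way contract. A hedged sketch of what conforming publishers might look like; the lambdas below are illustrative and not taken from this commit:

// Published for sure: return the result with the published segments.
TransactionalSegmentPublisher ok = (segments, commitMetadata) ->
    new SegmentPublishResult(segments, true);

// Definitely not published: the transaction provably did not happen.
TransactionalSegmentPublisher failed = (segments, commitMetadata) ->
    SegmentPublishResult.fail();

// Outcome unknown: throw rather than guess, so the driver keeps the segment files.
TransactionalSegmentPublisher unknown = (segments, commitMetadata) -> {
  throw new IOException("Connection lost while committing the metadata transaction");
};

The makeOkPublisher and makeFailingPublisher changes in the test files below follow exactly this shape.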
@@ -24,13 +24,14 @@
import com.google.common.collect.ImmutableSet;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
+ import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
- import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
+ import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
import io.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@@ -194,6 +195,6 @@ private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState exp

static TransactionalSegmentPublisher makeOkPublisher()
{
- return (segments, commitMetadata) -> true;
+ return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true);
}
}
@@ -239,8 +239,7 @@ public void testFailDuringPublish() throws Exception
{
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
- expectedException.expectMessage(
-     "Failed to publish segments[[DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z, dataSource='foo', binaryVersion='0'}, DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z, dataSource='foo', binaryVersion='0'}]]");
+ expectedException.expectMessage("Failed to publish segments.");

testFailDuringPublishInternal(false);
}
@@ -279,31 +278,34 @@ private void testFailDuringPublishInternal(boolean failWithException) throws Exc
Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
}

- dataSegmentKiller.killQuietly(new DataSegment(
-     "foo",
-     Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
-     "abc123",
-     ImmutableMap.of(),
-     ImmutableList.of(),
-     ImmutableList.of(),
-     new NumberedShardSpec(0, 0),
-     0,
-     0
- ));
- EasyMock.expectLastCall().once();
-
- dataSegmentKiller.killQuietly(new DataSegment(
-     "foo",
-     Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
-     "abc123",
-     ImmutableMap.of(),
-     ImmutableList.of(),
-     ImmutableList.of(),
-     new NumberedShardSpec(0, 0),
-     0,
-     0
- ));
- EasyMock.expectLastCall().once();
+ if (!failWithException) {
+   // Should only kill segments if there was _no_ exception.
+   dataSegmentKiller.killQuietly(new DataSegment(
+       "foo",
+       Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
+       "abc123",
+       ImmutableMap.of(),
+       ImmutableList.of(),
+       ImmutableList.of(),
+       new NumberedShardSpec(0, 0),
+       0,
+       0
+   ));
+   EasyMock.expectLastCall().once();
+
+   dataSegmentKiller.killQuietly(new DataSegment(
+       "foo",
+       Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
+       "abc123",
+       ImmutableMap.of(),
+       ImmutableList.of(),
+       ImmutableList.of(),
+       new NumberedShardSpec(0, 0),
+       0,
+       0
+   ));
+   EasyMock.expectLastCall().once();
+ }

EasyMock.replay(dataSegmentKiller);

@@ -30,6 +30,7 @@
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
+ import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
@@ -53,6 +54,7 @@

import java.io.IOException;
import java.util.Arrays;
+ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -359,14 +361,7 @@ private Set<SegmentIdentifier> asIdentifiers(Iterable<DataSegment> segments)

static TransactionalSegmentPublisher makeOkPublisher()
{
- return new TransactionalSegmentPublisher()
- {
-   @Override
-   public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata) throws IOException
-   {
-     return true;
-   }
- };
+ return (segments, commitMetadata) -> new SegmentPublishResult(Collections.emptySet(), true);
}

static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException)
@@ -375,7 +370,7 @@ static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithExcept
if (failWithException) {
throw new RuntimeException("test");
}
- return false;
+ return SegmentPublishResult.fail();
};
}

