Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
Switch to using ConsumerPartition for sideline persistence (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
stanlemon authored Nov 23, 2017
1 parent 6ef1c30 commit 028ed47
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,6 @@ synchronized void loadSidelines() {
fireHoseSpout = delegateSpoutFactory.create(fireHoseIdentifier);
}

final String topic = (String) getSpoutConfig().get(KafkaConsumerConfig.KAFKA_TOPIC);

final List<SidelineRequestIdentifier> existingRequestIds = getPersistenceAdapter().listSidelineRequests();
logger.info("Found {} existing sideline requests that need to be resumed", existingRequestIds.size());

Expand All @@ -258,21 +256,21 @@ synchronized void loadSidelines() {

SidelinePayload payload = null;

final Set<Integer> partitions = getPersistenceAdapter().listSidelineRequestPartitions(id);
final Set<ConsumerPartition> partitions = getPersistenceAdapter().listSidelineRequestPartitions(id);

for (final Integer partition : partitions) {
for (final ConsumerPartition partition : partitions) {
payload = retrieveSidelinePayload(id, partition);

if (payload == null) {
logger.warn("Sideline {} on partition {} payload was null, this is probably a serialization problem.");
continue;
}

startingStateBuilder.withPartition(topic, partition, payload.startingOffset);
startingStateBuilder.withPartition(partition, payload.startingOffset);

// We only have an ending offset on STOP requests
if (payload.endingOffset != null) {
endingStateStateBuilder.withPartition(topic, partition, payload.endingOffset);
endingStateStateBuilder.withPartition(partition, payload.endingOffset);
}
}

Expand Down Expand Up @@ -361,7 +359,7 @@ public void startSidelining(SidelineRequest sidelineRequest) {
SidelineType.START,
sidelineRequest.id, // TODO: Now that this is in the request, we should change the persistence adapter
sidelineRequest,
consumerPartition.partition(),
consumerPartition,
startingState.getOffsetForNamespaceAndPartition(consumerPartition),
null
);
Expand Down Expand Up @@ -421,7 +419,7 @@ public void stopSidelining(SidelineRequest sidelineRequest) {
// This is the state that the VirtualSpout should start with
final SidelinePayload sidelinePayload = retrieveSidelinePayload(
id,
consumerPartition.partition()
consumerPartition
);

if (sidelinePayload == null) {
Expand All @@ -439,7 +437,7 @@ public void stopSidelining(SidelineRequest sidelineRequest) {
SidelineType.STOP,
id,
new SidelineRequest(id, negatedStep), // Persist the negated steps, so they load properly on resume
consumerPartition.partition(),
consumerPartition,
sidelinePayload.startingOffset,
endingState.getOffsetForNamespaceAndPartition(consumerPartition)
);
Expand Down Expand Up @@ -557,9 +555,9 @@ VirtualSpoutIdentifier generateSidelineVirtualSpoutId(final SidelineRequestIdent
return new SidelineVirtualSpoutIdentifier(getVirtualSpoutIdPrefix(), sidelineRequestIdentifier);
}

private SidelinePayload retrieveSidelinePayload(final SidelineRequestIdentifier id, final int partition) {
private SidelinePayload retrieveSidelinePayload(final SidelineRequestIdentifier id, final ConsumerPartition consumerPartition) {
try {
return getPersistenceAdapter().retrieveSidelineRequest(id, partition);
return getPersistenceAdapter().retrieveSidelineRequest(id, consumerPartition);
} catch (InvalidFilterChainStepException ex) {
logger.error("Unable to load sideline payload {}", ex);
// Basically if we can't deserialize the step we're not sending back any part of the payload.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void onVirtualSpoutCompletion(DelegateSpout virtualSpout) {
for (final ConsumerPartition consumerPartition : virtualSpout.getStartingState().getConsumerPartitions()) {
persistenceAdapter.clearSidelineRequest(
sidelineRequestIdentifier,
consumerPartition.partition()
consumerPartition
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.salesforce.storm.spout.dynamic.ConsumerPartition;
import com.salesforce.storm.spout.sideline.trigger.SidelineRequest;
import com.salesforce.storm.spout.sideline.trigger.SidelineRequestIdentifier;
import com.salesforce.storm.spout.sideline.trigger.SidelineType;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
Expand All @@ -59,42 +61,29 @@ public void close() {
storedSidelineRequests.clear();
}

/**
* @param type SidelineType (Start or Stop)
* @param id unique identifier for the sideline request.
* @param partitionId which partition we want to persist.
* @param startingOffset The starting offset to persist.
* @param endingOffset The ending offset to persist.
*/
@Override
public void persistSidelineRequestState(
final SidelineType type,
final SidelineRequestIdentifier id,
final SidelineRequest request,
final int partitionId,
final ConsumerPartition consumerPartition,
final Long startingOffset,
final Long endingOffset
) {
storedSidelineRequests.put(
getSidelineRequestStateKey(id, partitionId),
getSidelineRequestStateKey(id, consumerPartition),
new SidelinePayload(type, id, request, startingOffset, endingOffset)
);
}

/**
* Retrieves a sideline request state for the given SidelineRequestIdentifier.
* @param id SidelineRequestIdentifier you want to retrieve the state for.
* @param partitionId which partition
* @return The ConsumerState that was persisted via persistSidelineRequestState().
*/
@Override
public SidelinePayload retrieveSidelineRequest(SidelineRequestIdentifier id, int partitionId) {
return storedSidelineRequests.get(getSidelineRequestStateKey(id, partitionId));
public SidelinePayload retrieveSidelineRequest(SidelineRequestIdentifier id, ConsumerPartition consumerPartition) {
return storedSidelineRequests.get(getSidelineRequestStateKey(id, consumerPartition));
}

@Override
public void clearSidelineRequest(SidelineRequestIdentifier id, int partitionId) {
storedSidelineRequests.remove(getSidelineRequestStateKey(id, partitionId));
public void clearSidelineRequest(SidelineRequestIdentifier id, ConsumerPartition consumerPartition) {
storedSidelineRequests.remove(getSidelineRequestStateKey(id, consumerPartition));
}

@Override
Expand All @@ -109,54 +98,51 @@ public List<SidelineRequestIdentifier> listSidelineRequests() {
}

@Override
public Set<Integer> listSidelineRequestPartitions(final SidelineRequestIdentifier id) {
final Set<Integer> partitions = Sets.newHashSet();
public Set<ConsumerPartition> listSidelineRequestPartitions(final SidelineRequestIdentifier id) {
final Set<ConsumerPartition> partitions = Sets.newHashSet();

for (SidelineRequestStateKey key : storedSidelineRequests.keySet()) {
if (key.id.equals(id)) {
partitions.add(key.partitionId);
partitions.add(key.consumerPartition);
}
}

return Collections.unmodifiableSet(partitions);
}

private SidelineRequestStateKey getSidelineRequestStateKey(final SidelineRequestIdentifier id, final int partitionId) {
return new SidelineRequestStateKey(id, partitionId);
private SidelineRequestStateKey getSidelineRequestStateKey(
final SidelineRequestIdentifier id,
final ConsumerPartition consumerPartition
) {
return new SidelineRequestStateKey(id, consumerPartition);
}

private static class SidelineRequestStateKey {

public final SidelineRequestIdentifier id;
public final int partitionId;
public final ConsumerPartition consumerPartition;

SidelineRequestStateKey(final SidelineRequestIdentifier id, final int partitionId) {
SidelineRequestStateKey(final SidelineRequestIdentifier id, final ConsumerPartition consumerPartition) {
this.id = id;
this.partitionId = partitionId;
this.consumerPartition = consumerPartition;
}

@Override
public boolean equals(Object other) {
if (this == other) {
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}

SidelineRequestStateKey that = (SidelineRequestStateKey) other;

if (partitionId != that.partitionId) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
return id != null ? id.equals(that.id) : that.id == null;
SidelineRequestStateKey that = (SidelineRequestStateKey) obj;
return Objects.equals(id, that.id)
&& Objects.equals(consumerPartition, that.consumerPartition);
}

@Override
public int hashCode() {
int result = id != null ? id.hashCode() : 0;
result = 31 * result + partitionId;
return result;
return Objects.hash(id, consumerPartition);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

package com.salesforce.storm.spout.sideline.persistence;

import com.salesforce.storm.spout.dynamic.ConsumerPartition;
import com.salesforce.storm.spout.sideline.trigger.SidelineRequest;
import com.salesforce.storm.spout.sideline.trigger.SidelineRequestIdentifier;
import com.salesforce.storm.spout.sideline.trigger.SidelineType;
Expand Down Expand Up @@ -54,40 +55,40 @@ public interface PersistenceAdapter {
* @param type type of the sideline (Start/Stop).
* @param id unique identifier for the sideline request.
* @param request sideline request.
* @param partitionId Partition id
* @param consumerPartition consumer partition.
* @param startingOffset Ending offset
* @param endingOffset Starting offset
*/
void persistSidelineRequestState(
final SidelineType type,
final SidelineRequestIdentifier id,
final SidelineRequest request,
final int partitionId,
final ConsumerPartition consumerPartition,
final Long startingOffset,
final Long endingOffset
);

/**
* Retrieves a sideline request state for the given SidelineRequestIdentifier.
* @param id SidelineRequestIdentifier you want to retrieve the state for.
* @param partitionId PartitionId to persist.
* @return The ConsumerState that was persisted via persistSidelineRequestState().
* @param consumerPartition consumer partition.
* @return Payload of the sideline data that was persisted.
*/
SidelinePayload retrieveSidelineRequest(final SidelineRequestIdentifier id, int partitionId);
SidelinePayload retrieveSidelineRequest(final SidelineRequestIdentifier id, final ConsumerPartition consumerPartition);

/**
* List the partitions for the given sideline request.
* @param id Identifier for the sideline request that you want the partitions for
* @return A list of the partitions for the sideline request
* @return A list of the consumer partitions for the sideline request
*/
Set<Integer> listSidelineRequestPartitions(final SidelineRequestIdentifier id);
Set<ConsumerPartition> listSidelineRequestPartitions(final SidelineRequestIdentifier id);

/**
* Removes a sideline request from the persistence layer.
* @param id - SidelineRequestIdentifier you want to clear.
* @param partitionId - Partition of the sideline request you want to clear
* @param id Identifier to clear.
* @param consumerPartition consumer partition.
*/
void clearSidelineRequest(SidelineRequestIdentifier id, int partitionId);
void clearSidelineRequest(final SidelineRequestIdentifier id, final ConsumerPartition consumerPartition);

/**
* Lists existing sideline requests.
Expand Down
Loading

0 comments on commit 028ed47

Please sign in to comment.