Skip to content

Commit

Permalink
calculate start offset for new topology consistently
Browse files Browse the repository at this point in the history
  • Loading branch information
wurstmeister committed Jan 18, 2014
1 parent 80005ba commit 5b764cd
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 10 deletions.
9 changes: 9 additions & 0 deletions src/jvm/storm/kafka/KafkaUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
}
}


/**
 * Resolves the starting offset for the given topic partition based on spout
 * configuration: when {@code config.forceFromStart} is set, the configured
 * {@code startOffsetTime} is used; otherwise the broker's latest offset time
 * is requested. Delegates to the time-based overload for the actual lookup.
 *
 * @param consumer  broker connection used to issue the offset request
 * @param topic     Kafka topic name
 * @param partition partition index within the topic
 * @param config    spout configuration supplying forceFromStart/startOffsetTime
 * @return the resolved start offset for the partition
 */
public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = config.forceFromStart
            ? config.startOffsetTime
            : kafka.api.OffsetRequest.LatestTime();
    return getOffset(consumer, topic, partition, startOffsetTime);
}

public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
Expand Down
10 changes: 5 additions & 5 deletions src/jvm/storm/kafka/PartitionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ public PartitionManager(DynamicPartitionConnections connections, String topology
LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
}

if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
LOG.info("No partition information found, using configuration to determine offset");
} else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
LOG.info("Using startOffsetTime to choose last commit offset.");
} else if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, kafka.api.OffsetRequest.LatestTime());
LOG.info("Setting last commit offset to HEAD.");
LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
} else {
_committedTo = jsonOffset;
LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
Expand Down
6 changes: 1 addition & 5 deletions src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,7 @@ private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition
offset = (Long) lastMeta.get("nextOffset");
}
} else {
long startTime = kafka.api.OffsetRequest.LatestTime();
if (_config.forceFromStart) {
startTime = _config.startOffsetTime;
}
offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, startTime);
offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
}
ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
long endoffset = offset;
Expand Down

0 comments on commit 5b764cd

Please sign in to comment.