Skip to content

Commit

Permalink
KAFKA-3559: Recycle old tasks when possible
Browse files Browse the repository at this point in the history
Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy, Guozhang Wang

Closes apache#2032 from enothereska/KAFKA-3559-onPartitionAssigned
  • Loading branch information
enothereska authored and guozhangwang committed Oct 30, 2016
1 parent a4ab9d0 commit 29ea4b0
Show file tree
Hide file tree
Showing 10 changed files with 277 additions and 50 deletions.
Expand Up @@ -110,6 +110,9 @@ public final ThreadCache cache() {

public abstract void close();

/**
 * Initializes the processor topology of this task. Task types without a
 * processing topology (see the standby task's no-op override) do nothing here.
 */
public abstract void initTopology();

/**
 * Tears down the processor topology of this task; counterpart to
 * {@link #initTopology()}.
 */
public abstract void closeTopology();

public abstract void commitOffsets();

/**
Expand Down
Expand Up @@ -170,4 +170,11 @@ public void close() {
queuesByTime.clear();
partitionQueues.clear();
}

/**
 * Discards all buffered data held by this group: every partition's record
 * queue is emptied and the time-ordered queue index is reset.
 */
public void clear() {
    for (RecordQueue recordQueue : partitionQueues.values()) {
        recordQueue.clear();
    }
    queuesByTime.clear();
}
}
Expand Up @@ -176,4 +176,11 @@ public boolean isEmpty() {
/**
 * Returns the current partition time tracked by this queue.
 * NOTE(review): presumably the stream time observed for this partition —
 * confirm against the sites that update {@code partitionTime}.
 *
 * @return the tracked partition time
 */
public long timestamp() {
    return partitionTime;
}

/**
 * Removes all buffered records from this queue's FIFO buffer.
 * Only the buffered elements are dropped; other queue state (e.g. the
 * tracked partition time) is left untouched.
 */
public void clear() {
    fifoQueue.clear();
}
}
Expand Up @@ -102,6 +102,16 @@ public void close() {
//no-op
}

@Override
public void initTopology() {
    // no-op: this task type has no processor nodes to initialize
    // (contrast with the stream task's initTopology, which walks the topology).
}

@Override
public void closeTopology() {
    // no-op: nothing was initialized in initTopology, so nothing to close here.
}

@Override
public void commitOffsets() {
// no-op
Expand Down
Expand Up @@ -52,6 +52,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
private final PartitionGroup partitionGroup;
private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
private final PunctuationQueue punctuationQueue;
private final Map<TopicPartition, RecordQueue> partitionQueues;

private final Map<TopicPartition, Long> consumedOffsets;
private final RecordCollector recordCollector;
Expand Down Expand Up @@ -93,7 +94,7 @@ public StreamTask(TaskId id,

// create queues for each assigned partition and associate them
// to corresponding source nodes in the processor topology
Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
partitionQueues = new HashMap<>();

for (TopicPartition partition : partitions) {
SourceNode source = topology.source(partition.topic());
Expand All @@ -119,16 +120,7 @@ public StreamTask(TaskId id,
log.info("{} Initializing state stores", logPrefix);
initializeStateStores();

// initialize the task by initializing all its processor nodes in the topology
log.info("{} Initializing processor nodes of the topology", logPrefix);
for (ProcessorNode node : this.topology.processors()) {
this.currNode = node;
try {
node.init(this.processorContext);
} finally {
this.currNode = null;
}
}
initTopology();

((ProcessorContextImpl) this.processorContext).initialized();
}
Expand Down Expand Up @@ -328,15 +320,24 @@ public void schedule(long interval) {
punctuationQueue.schedule(new PunctuationSchedule(currNode, interval));
}

/**
* @throws RuntimeException if an error happens during closing of processor nodes
*/
@Override
public void close() {
log.debug("{} Closing processor topology", logPrefix);
/**
 * Initializes this task's processor topology by calling {@code init()} on
 * every processor node, handing each node this task's processor context.
 *
 * <p>{@code currNode} is set around each call — presumably so that callbacks
 * made from a node's {@code init()} can be attributed to the node being
 * initialized (TODO confirm against ProcessorContext usage) — and is reset in
 * a {@code finally} block so a failing node never leaves stale state behind.
 *
 * @throws RuntimeException if a processor node fails to initialize
 */
@Override // added: overrides the abstract declaration in AbstractTask, matching sibling overrides
public void initTopology() {
    // initialize the task by initializing all its processor nodes in the topology
    log.info("{} Initializing processor nodes of the topology", logPrefix);
    for (ProcessorNode node : this.topology.processors()) {
        this.currNode = node;
        try {
            node.init(this.processorContext);
        } finally {
            this.currNode = null;
        }
    }
}

this.partitionGroup.close();
this.consumedOffsets.clear();
@Override
public void closeTopology() {

this.partitionGroup.clear();

// close the processors
// make sure close() is called for each node even when there is a RuntimeException
Expand All @@ -357,6 +358,18 @@ public void close() {
}
}

/**
 * Shuts this task down: releases the partition group, forgets the tracked
 * consumed offsets, and closes the processor topology.
 *
 * @throws RuntimeException if an error happens during closing of processor nodes
 */
@Override
public void close() {
    log.debug("{} Closing processor topology", logPrefix);

    // release buffered records and offset bookkeeping before tearing down nodes
    partitionGroup.close();
    consumedOffsets.clear();
    closeTopology();
}

@Override
protected Map<TopicPartition, Long> recordCollectorOffsets() {
return recordCollector.offsets();
Expand Down

0 comments on commit 29ea4b0

Please sign in to comment.