forked from apache/kafka
/
StandbyTask.java
128 lines (108 loc) · 4.68 KB
/
StandbyTask.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* A StandbyTask
*/
public class StandbyTask extends AbstractTask {

    private static final Logger log = LoggerFactory.getLogger(StandbyTask.class);

    // Immutable snapshot of the state manager's checkpointed offsets taken at
    // construction time; exposed read-only via checkpointedOffsets() and
    // changeLogPartitions().
    private final Map<TopicPartition, Long> checkpointedOffsets;

    /**
     * Create {@link StandbyTask} with its assigned partitions
     *
     * @param id                    the ID of this task
     * @param applicationId         the ID of the stream processing application
     * @param partitions            the collection of assigned {@link TopicPartition}
     * @param topology              the instance of {@link ProcessorTopology}
     * @param consumer              the instance of {@link Consumer}
     * @param restoreConsumer       the instance of {@link Consumer} used when restoring state
     * @param config                the {@link StreamsConfig} specified by the user
     * @param metrics               the {@link StreamsMetrics} created by the thread
     * @param stateDirectory        the {@link StateDirectory} created by the thread
     */
    public StandbyTask(final TaskId id,
                       final String applicationId,
                       final Collection<TopicPartition> partitions,
                       final ProcessorTopology topology,
                       final Consumer<byte[], byte[]> consumer,
                       final Consumer<byte[], byte[]> restoreConsumer,
                       final StreamsConfig config,
                       final StreamsMetrics metrics,
                       final StateDirectory stateDirectory) {
        // isStandby = true; standby tasks have no record collector, hence the null
        super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null);

        // initialize the topology with its own context
        this.processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);

        log.info("standby-task [{}] Initializing state stores", id());
        initializeStateStores();

        ((StandbyContextImpl) this.processorContext).initialized();

        this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets());
    }

    /**
     * @return an unmodifiable view of the checkpointed offsets per change log partition,
     *         captured when this task was created
     */
    public Map<TopicPartition, Long> checkpointedOffsets() {
        return checkpointedOffsets;
    }

    /**
     * @return the change log partitions this standby task maintains replicas for
     */
    public Collection<TopicPartition> changeLogPartitions() {
        return checkpointedOffsets.keySet();
    }

    /**
     * Updates a state store using records from one change log partition
     *
     * @param partition the change log partition the records were read from
     * @param records   the records to apply to the standby state store
     * @return a list of records not consumed
     */
    public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition,
                                                       final List<ConsumerRecord<byte[], byte[]>> records) {
        log.debug("standby-task [{}] Updating standby replicas of its state store for partition [{}]", id(), partition);
        return stateMgr.updateStandbyStates(partition, records);
    }

    /**
     * Flushes the state stores and re-reads the offset limits; a standby task
     * has no producer offsets of its own to commit.
     */
    public void commit() {
        log.debug("standby-task [{}] Committing its state", id());
        stateMgr.flush(processorContext);

        // reinitialize offset limits
        initializeOffsetLimits();
    }

    @Override
    public void close() {
        //no-op
    }

    @Override
    public void initTopology() {
        //no-op
    }

    @Override
    public void closeTopology() {
        //no-op
    }

    @Override
    public void commitOffsets() {
        // no-op
    }

    /**
     * Produces a string representation containing useful information about a StandbyTask.
     * This is useful in debugging scenarios.
     *
     * @return A string representation of the StandbyTask instance.
     */
    @Override
    public String toString() {
        return super.toString();
    }
}