/
AutocommitSupport.java
94 lines (83 loc) · 3.85 KB
/
AutocommitSupport.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package org.zalando.nakadi.service.subscription.autocommit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zalando.nakadi.domain.EventTypePartition;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.service.CursorConverter;
import org.zalando.nakadi.service.CursorOperationsService;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Keeps one {@link PartitionSkippedCursorsOperator} per registered partition and, on request,
 * commits the cursors those operators suggest through the {@link ZkSubscriptionClient}.
 *
 * <p>NOTE(review): state lives in a plain {@link HashMap} and no synchronization is visible in this
 * class; presumably all calls come from a single thread - confirm against the callers.
 */
public class AutocommitSupport {
    private static final Logger LOG = LoggerFactory.getLogger(AutocommitSupport.class);

    private final CursorOperationsService cursorOperationsService;
    private final ZkSubscriptionClient zkSubscriptionClient;
    private final CursorConverter cursorConverter;
    private final Map<EventTypePartition, PartitionSkippedCursorsOperator> partitionsState = new HashMap<>();

    /**
     * @param cursorOperationsService handed to every {@link PartitionSkippedCursorsOperator} created here
     * @param zkSubscriptionClient    client that receives the offsets to commit
     * @param cursorConverter         converts cursors to their token-less view before committing
     */
    public AutocommitSupport(
            final CursorOperationsService cursorOperationsService,
            final ZkSubscriptionClient zkSubscriptionClient,
            final CursorConverter cursorConverter) {
        this.cursorOperationsService = cursorOperationsService;
        this.zkSubscriptionClient = zkSubscriptionClient;
        this.cursorConverter = cursorConverter;
    }

    /**
     * Starts tracking the partition of the given cursor. A partition that is already tracked keeps
     * its existing state; the call is then a no-op.
     *
     * @param committed cursor used to identify the partition and to initialise its operator
     */
    public void addPartition(final NakadiCursor committed) {
        final EventTypePartition partition = committed.getEventTypePartition();
        if (!partitionsState.containsKey(partition)) {
            partitionsState.put(
                    partition,
                    new PartitionSkippedCursorsOperator(cursorOperationsService, committed));
        }
    }

    /**
     * Stops tracking the given partition. An autocommit over all tracked partitions is attempted
     * first; a {@link RuntimeException} from it is logged and does not prevent the removal.
     *
     * @param eventTypePartition partition to forget
     */
    public void removePartition(final EventTypePartition eventTypePartition) {
        try {
            autocommit();
        } catch (RuntimeException failure) {
            // Best effort only: the partition has to be dropped whether or not the commit went through.
            LOG.warn("Failed to execute autocommit while removing partition {}", eventTypePartition, failure);
        }
        partitionsState.remove(eventTypePartition);
    }

    /**
     * Forwards a skipped event to the operator of the cursor's partition. Cursors of partitions that
     * are not tracked are ignored.
     *
     * @param cursor cursor of the skipped event
     */
    public void addSkippedEvent(final NakadiCursor cursor) {
        final PartitionSkippedCursorsOperator operator = partitionsState.get(cursor.getEventTypePartition());
        if (operator == null) {
            return;
        }
        operator.addSkippedEvent(cursor);
    }

    /**
     * Forwards a commit notification to the operator of the cursor's partition. Cursors of partitions
     * that are not tracked are ignored.
     *
     * @param cursor cursor that was committed
     */
    public void onCommit(final NakadiCursor cursor) {
        final PartitionSkippedCursorsOperator operator = partitionsState.get(cursor.getEventTypePartition());
        if (operator == null) {
            return;
        }
        operator.onCommit(cursor);
    }

    /**
     * Asks every tracked partition for an autocommit suggestion and commits all non-null suggestions
     * in a single call. Nothing is committed when no partition has a suggestion.
     *
     * <p>Skipped events do not take part in memory consumption or similar accounting, so this method
     * may be invoked at any cadence (once a second, once a minute, ...): consumption of the main
     * (non-skipped) data does not slow down when autocommit is not called. The only trade-off is
     * monitoring - the less often this is called, the less accurate monitoring becomes.
     */
    public void autocommit() {
        // Gather every suggestion before converting anything, so all operators are queried first.
        final List<NakadiCursor> suggestions = partitionsState.values().stream()
                .map(PartitionSkippedCursorsOperator::getAutoCommitSuggestion)
                .filter(suggestion -> null != suggestion)
                .collect(Collectors.toList());
        if (suggestions.isEmpty()) {
            return;
        }

        final List<SubscriptionCursorWithoutToken> converted = new ArrayList<>(suggestions.size());
        for (final NakadiCursor suggestion : suggestions) {
            converted.add(cursorConverter.convertToNoToken(suggestion));
        }
        zkSubscriptionClient.commitOffsets(converted);
    }
}