Skip to content

Commit

Permalink
Merge pull request #1169 from zalando/ARUHA-2687
Browse files Browse the repository at this point in the history
ARUHA-2687 Fix cursor comparison for autocommit
  • Loading branch information
antban committed Feb 21, 2020
2 parents c247f6e + 039b6ea commit 2218cbd
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
Expand Up @@ -106,7 +106,7 @@ private StreamingContext(final Builder builder) {
this.eventTypeChangeListener = builder.eventTypeChangeListener;
this.cursorComparator = builder.cursorComparator;
this.kpiPublisher = builder.kpiPublisher;
this.autocommitSupport = new AutocommitSupport(builder.cursorOperationsService, zkClient);
this.autocommitSupport = new AutocommitSupport(builder.cursorOperationsService, zkClient, cursorConverter);
this.kpiDataStreamedEventType = builder.kpiDataStremedEventType;
this.kpiCollectionFrequencyMs = builder.kpiCollectionFrequencyMs;
this.streamMemoryLimitBytes = builder.streamMemoryLimitBytes;
Expand Down
Expand Up @@ -4,6 +4,7 @@
import org.slf4j.LoggerFactory;
import org.zalando.nakadi.domain.EventTypePartition;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.service.CursorConverter;
import org.zalando.nakadi.service.CursorOperationsService;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;
Expand All @@ -18,13 +19,17 @@ public class AutocommitSupport {
private final CursorOperationsService cursorOperationsService;
private final ZkSubscriptionClient zkSubscriptionClient;
private final Map<EventTypePartition, PartitionSkippedCursorsOperator> partitionsState = new HashMap<>();
private final CursorConverter cursorConverter;

private static final Logger LOG = LoggerFactory.getLogger(AutocommitSupport.class);

public AutocommitSupport(
final CursorOperationsService cursorOperationsService,
final ZkSubscriptionClient zkSubscriptionClient) {
final ZkSubscriptionClient zkSubscriptionClient,
final CursorConverter cursorConverter) {
this.cursorOperationsService = cursorOperationsService;
this.zkSubscriptionClient = zkSubscriptionClient;
this.cursorConverter = cursorConverter;
}

public void addPartition(final NakadiCursor committed) {
Expand Down Expand Up @@ -82,7 +87,7 @@ public void autocommit() {
}

final List<SubscriptionCursorWithoutToken> converted = toAutocommit.stream()
.map(v -> new SubscriptionCursorWithoutToken(v.getEventType(), v.getPartition(), v.getOffset()))
.map(cursorConverter::convertToNoToken)
.collect(Collectors.toList());
zkSubscriptionClient.commitOffsets(converted);
}
Expand Down
Expand Up @@ -6,25 +6,32 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.zalando.nakadi.domain.EventTypePartition;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.service.CursorConverter;
import org.zalando.nakadi.service.CursorOperationsService;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.view.SubscriptionCursorWithoutToken;

import java.util.Arrays;
import java.util.List;
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class AutocommitSupportTest {

@Mock
private ZkSubscriptionClient zkClientMock;
@Mock
private CursorOperationsService cursorOperationsService;
@Mock
private CursorConverter cursorConverter;
private AutocommitSupport autocommitSupport;
private EventTypePartition etp1;
private NakadiCursor[] etp1Cursors;
Expand All @@ -37,7 +44,7 @@ public class AutocommitSupportTest {
@Before
public void before() {
MockitoAnnotations.initMocks(this);
autocommitSupport = new AutocommitSupport(cursorOperationsService, zkClientMock);
autocommitSupport = new AutocommitSupport(cursorOperationsService, zkClientMock, cursorConverter);

etp1 = new EventTypePartition("t", "p1");
etp1Cursors = mockCursors(etp1, LongStream.range(0, 10).toArray());
Expand All @@ -46,6 +53,14 @@ public void before() {
etp2 = new EventTypePartition("t", "p2");
etp2Cursors = mockCursors(etp2, LongStream.range(0, 10).toArray());
autocommitSupport.addPartition(etp2Cursors[0]);

Stream.concat(Arrays.stream(etp1Cursors), Arrays.stream(etp2Cursors))
.forEach(c -> {
final SubscriptionCursorWithoutToken result = new SubscriptionCursorWithoutToken(
c.getEventType(), c.getPartition(), c.getOffset());
when(cursorConverter.convertToNoToken(Mockito.eq(c)))
.thenReturn(result);
});
}

@Test
Expand Down

0 comments on commit 2218cbd

Please sign in to comment.