Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #960 from zalando/aruha-1804-bugfix
Browse files Browse the repository at this point in the history
Bug Fix 1804
  • Loading branch information
Kunal-Jha committed Oct 15, 2018
2 parents 885a8c8 + 756fd9e commit 3f65fd8
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 81 deletions.
7 changes: 6 additions & 1 deletion docs/_data/nakadi-event-bus-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,12 @@ paths:
- $ref: '#/parameters/SubscriptionId'
- name: show_time_lag
in: query
description: show consumer time lag
description: |
Shows consumer time lag as an optional field.
This option is a time consuming operation and Nakadi attempts to compute it in the best possible strategy.
In cases of failures, resulting in Nakadi being unable to compute it within a configurable timeout,
the field might either be partially present or not present (depending on the number of successful requests)
in the output.
type: boolean
default: false
responses:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
import org.zalando.nakadi.exceptions.runtime.InternalNakadiException;
import org.zalando.nakadi.exceptions.runtime.InvalidLimitException;
import org.zalando.nakadi.exceptions.runtime.InvalidVersionNumberException;
import org.zalando.nakadi.exceptions.runtime.LimitReachedException;
import org.zalando.nakadi.exceptions.runtime.NakadiBaseException;
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.runtime.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.runtime.NoSuchSubscriptionException;
import org.zalando.nakadi.exceptions.runtime.RepositoryProblemException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.exceptions.runtime.UnprocessableEntityException;
import org.zalando.problem.MoreStatus;
import org.zalando.problem.Problem;
import org.zalando.problem.spring.web.advice.ProblemHandling;
import org.zalando.problem.spring.web.advice.Responses;
Expand Down Expand Up @@ -117,13 +115,6 @@ public ResponseEntity<Problem> handleInternalError(final NakadiBaseException exc
return Responses.create(Response.Status.INTERNAL_SERVER_ERROR, exception.getMessage(), request);
}

@ExceptionHandler(LimitReachedException.class)
public ResponseEntity<Problem> handleLimitReachedException(
final ServiceTemporarilyUnavailableException exception, final NativeWebRequest request) {
LOG.warn(exception.getMessage());
return Responses.create(MoreStatus.TOO_MANY_REQUESTS, exception.getMessage(), request);
}

@ExceptionHandler(DbWriteOperationsBlockedException.class)
public ResponseEntity<Problem> handleDbWriteOperationsBlockedException(
final DbWriteOperationsBlockedException exception, final NativeWebRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
import org.zalando.nakadi.exceptions.runtime.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.runtime.NoSuchSubscriptionException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.exceptions.runtime.TimeLagStatsTimeoutException;
import org.zalando.nakadi.service.FeatureToggleService;
import org.zalando.nakadi.service.subscription.SubscriptionService;
import org.zalando.nakadi.service.subscription.SubscriptionService.StatsMode;
import org.zalando.problem.Problem;
import org.zalando.problem.spring.web.advice.Responses;

import javax.annotation.Nullable;
import javax.ws.rs.core.Response;
import java.util.Set;

import static javax.ws.rs.core.Response.Status.NOT_IMPLEMENTED;
Expand Down Expand Up @@ -122,12 +120,4 @@ public ResponseEntity<Problem> handleServiceUnavailableResponses(final NakadiBas
exception.getMessage()),
request);
}

@ExceptionHandler(TimeLagStatsTimeoutException.class)
public ResponseEntity<Problem> handleTimeLagStatsTimeoutException(final TimeLagStatsTimeoutException e,
final NativeWebRequest request) {
LOG.warn(e.getMessage());
return Responses.create(Response.Status.REQUEST_TIMEOUT, e.getMessage(), request);
}

}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.zalando.nakadi.exceptions.runtime.SubscriptionUpdateConflictException;
import org.zalando.nakadi.exceptions.runtime.TooManyPartitionsException;
import org.zalando.nakadi.exceptions.runtime.WrongInitialCursorsException;

import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.repository.TopicRepository;
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.google.common.collect.ImmutableList;
import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.zalando.nakadi.domain.ConsumedEvent;
Expand All @@ -11,9 +13,6 @@
import org.zalando.nakadi.exceptions.runtime.ErrorGettingCursorTimeLagException;
import org.zalando.nakadi.exceptions.runtime.InconsistentStateException;
import org.zalando.nakadi.exceptions.runtime.InvalidCursorException;
import org.zalando.nakadi.exceptions.runtime.LimitReachedException;
import org.zalando.nakadi.exceptions.runtime.NakadiBaseException;
import org.zalando.nakadi.exceptions.runtime.TimeLagStatsTimeoutException;
import org.zalando.nakadi.repository.EventConsumer;
import org.zalando.nakadi.service.NakadiCursorComparator;
import org.zalando.nakadi.service.timeline.TimelineService;
Expand All @@ -39,7 +38,7 @@

@Component
public class SubscriptionTimeLagService {

private static final Logger LOG = LoggerFactory.getLogger(SubscriptionTimeLagService.class);
private static final int EVENT_FETCH_WAIT_TIME_MS = 1000;
private static final int REQUEST_TIMEOUT_MS = 30000;
private static final int MAX_THREADS_PER_REQUEST = 20;
Expand All @@ -59,9 +58,7 @@ public SubscriptionTimeLagService(final TimelineService timelineService,
}

public Map<EventTypePartition, Duration> getTimeLags(final Collection<NakadiCursor> committedPositions,
final List<PartitionEndStatistics> endPositions)
throws ErrorGettingCursorTimeLagException, InconsistentStateException, LimitReachedException,
TimeLagStatsTimeoutException {
final List<PartitionEndStatistics> endPositions) {

final TimeLagRequestHandler timeLagHandler = new TimeLagRequestHandler(timelineService, threadPool);
final Map<EventTypePartition, Duration> timeLags = new HashMap<>();
Expand All @@ -82,21 +79,12 @@ public Map<EventTypePartition, Duration> getTimeLags(final Collection<NakadiCurs
for (final EventTypePartition partition : futureTimeLags.keySet()) {
timeLags.put(partition, futureTimeLags.get(partition).get());
}
return timeLags;
} catch (final RejectedExecutionException e) {
throw new LimitReachedException("Time lag statistics thread pool exhausted", e);
} catch (final TimeoutException e) {
throw new TimeLagStatsTimeoutException("Timeout exceeded for time lag statistics", e);
} catch (final ExecutionException e) {
if (e.getCause() instanceof NakadiBaseException) {
throw (NakadiBaseException) e.getCause();
} else {
throw new InconsistentStateException("Unexpected error occurred when getting subscription time lag",
e.getCause());
}
} catch (final Throwable e) {
throw new InconsistentStateException("Unexpected error occurred when getting subscription time lag", e);
} catch (RejectedExecutionException | TimeoutException | ExecutionException e) {
LOG.warn("caught exception the timelag stats are not complete - " + e);
} catch (Throwable e) {
LOG.warn("caught throwable the timelag stats are not complete - " + e);
}
return timeLags;
}

private boolean isCursorAtTail(final NakadiCursor cursor, final List<PartitionEndStatistics> endPositions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
import org.zalando.nakadi.domain.PartitionEndStatistics;
import org.zalando.nakadi.domain.Storage;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.runtime.ErrorGettingCursorTimeLagException;
import org.zalando.nakadi.exceptions.runtime.InconsistentStateException;
import org.zalando.nakadi.exceptions.runtime.InvalidCursorException;
import org.zalando.nakadi.repository.EventConsumer;
import org.zalando.nakadi.service.subscription.SubscriptionTimeLagService;
Expand All @@ -24,6 +21,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -79,28 +77,15 @@ public void testTimeLagsForTailAndNotTailPositions() throws InvalidCursorExcepti
}


@Test(expected = InconsistentStateException.class)
@SuppressWarnings("unchecked")
public void whenNakadiRuntimeExceptionThenInconsistentStateExceptionIsThrown()
throws InvalidCursorException {
when(timelineService.createEventConsumer(any(), any())).thenThrow(NakadiRuntimeException.class);

final Timeline et1Timeline = new Timeline("et1", 0, new Storage("", Storage.Type.KAFKA), "t1", null);
final NakadiCursor committedCursor1 = NakadiCursor.of(et1Timeline, "p1", "o1");

timeLagService.getTimeLags(ImmutableList.of(committedCursor1), ImmutableList.of());
}

@Test(expected = ErrorGettingCursorTimeLagException.class)
@SuppressWarnings("unchecked")
public void whenInvalidCursorThenErrorGettingCursorTimeLagExceptionIsThrown()
throws InvalidCursorException {
when(timelineService.createEventConsumer(any(), any())).thenThrow(InvalidCursorException.class);

@Test
public void whenNoSubscriptionThenReturnSizeZeroMap() {
when(timelineService.createEventConsumer(any(), any())).thenReturn(null);
final Timeline et1Timeline = new Timeline("et1", 0, new Storage("", Storage.Type.KAFKA), "t1", null);
final NakadiCursor committedCursor1 = NakadiCursor.of(et1Timeline, "p1", "o1");

timeLagService.getTimeLags(ImmutableList.of(committedCursor1), ImmutableList.of());
final Map<EventTypePartition, Duration> result = timeLagService.getTimeLags
(ImmutableList.of(committedCursor1), ImmutableList.of());
assertThat(result.size(), is(0));
}

private PartitionEndStatistics mockEndStats(final NakadiCursor nakadiCursor) {
Expand Down

0 comments on commit 3f65fd8

Please sign in to comment.