ARUHA-1664: Subscription time lag stats #873

Merged
merged 25 commits into from May 25, 2018

Conversation

4 participants
@v-stepanov
Member

v-stepanov commented May 8, 2018

Extended subscription statistics endpoint with optional time-lag information.

Zalando ticket: ARUHA-1664

@v-stepanov

Member

v-stepanov commented May 8, 2018

deploy validation please

@codecov-io

codecov-io commented May 8, 2018

Codecov Report

Merging #873 into master will increase coverage by 0.21%.
The diff coverage is 71.3%.

@@             Coverage Diff              @@
##             master     #873      +/-   ##
============================================
+ Coverage     53.39%   53.61%   +0.21%     
- Complexity     1695     1710      +15     
============================================
  Files           310      314       +4     
  Lines          9403     9507     +104     
  Branches        845      857      +12     
============================================
+ Hits           5021     5097      +76     
- Misses         4082     4103      +21     
- Partials        300      307       +7
| Impacted Files | Coverage Δ | Complexity Δ |
|---|---|---|
| ...zalando/nakadi/service/NakadiCursorComparator.java | 65.71% <ø> (ø) | 9 <0> (ø) ⬇️ |
| ...exceptions/runtime/InconsistentStateException.java | 100% <ø> (ø) | 2 <0> (ø) ⬇️ |
| ...g/zalando/nakadi/controller/ExceptionHandling.java | 50% <0%> (-2.5%) | 12 <0> (ø) |
| ...ceptions/runtime/TimeLagStatsTimeoutException.java | 0% <0%> (ø) | 0 <0> (?) |
| ...kadi/exceptions/runtime/LimitReachedException.java | 0% <0%> (ø) | 0 <0> (?) |
| ...ando/nakadi/domain/SubscriptionEventTypeStats.java | 96.29% <100%> (+0.29%) | 3 <0> (ø) ⬇️ |
| ...o/nakadi/repository/kafka/NakadiKafkaConsumer.java | 75.86% <100%> (ø) | 8 <0> (ø) ⬇️ |
| .../java/org/zalando/nakadi/domain/ConsumedEvent.java | 62.5% <100%> (+12.5%) | 6 <2> (+2) ⬆️ |
| ...ando/nakadi/controller/SubscriptionController.java | 51.61% <37.5%> (-9.93%) | 10 <1> (ø) |
| ...kadi/service/subscription/SubscriptionService.java | 75% <70.58%> (+0.43%) | 37 <0> (ø) ⬇️ |

... and 8 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 81bd9f9...dbd04d9.

@v-stepanov v-stepanov changed the title from ARUHA-1664: added debug loggin of events timestamp; to ARUHA-1664: Subscription time lag stats May 11, 2018

InconsistentStateException {
try {
final EventConsumer consumer = timelineService.createEventConsumer(
"time-lag-checker-" + UUID.randomUUID().toString(), ImmutableList.of(cursor));

@rcillo

rcillo May 14, 2018

Member

I did not even remember this "identification" feature still existed. I thought it was "commented out" as an emergency reaction to an incident which we never followed up.

@v-stepanov

v-stepanov May 17, 2018

Member

It is commented out at the place where this parameter is passed to the Kafka consumer.

final EventConsumer consumer = timelineService.createEventConsumer(
"time-lag-checker-" + UUID.randomUUID().toString(), ImmutableList.of(cursor));
final ConsumedEvent nextEvent = executeWithRetry(

@rcillo

rcillo May 14, 2018

Member

I thought this execute-with-retry helper was used exclusively by tests. Would you consider adding a parameter to readEvents to pass the amount of time you are willing to wait? That hands control to that method and lets it use Kafka to manage the wait. I think KafkaConsumer accepts some parameters that define precisely "fetch_wait_time_ms". You could use it when building a new KafkaConsumer. Then you just need to poll and read whatever comes back.

@v-stepanov

v-stepanov May 14, 2018

Member

I think I was just too lazy to do all the refactoring needed to pass that parameter :)
I will do it.

@rcillo

rcillo May 14, 2018

Member

@v-stepanov I think it might be there already as the last parameter in the constructor. It's called "pollTimeout"

@v-stepanov

v-stepanov May 17, 2018

Member

I checked that again. My fear about increasing this timeout to 1 second is that I will get a huge chunk of events. I think that with a smaller timeout I have a better chance of getting a small number of events.

private Duration getNextEventTimeLag(final NakadiCursor cursor) throws ErrorGettingCursorTimeLagException,
InconsistentStateException {
try {
final EventConsumer consumer = timelineService.createEventConsumer(

@rcillo

rcillo May 14, 2018

Member

Creating a consumer is expensive. Would you consider trying to "pack" many partitions into a single consumer? I'm not sure we have control over reading from independent partitions, but I would like to ask anyway: why did you decide not to do it?

@v-stepanov

v-stepanov May 14, 2018

Member

The problem here is exactly that there is no control over reading from specific partitions. So if I create a consumer for many partitions, I don't know when I will get data for a specific partition. I may have to read a lot of data from partition A before I get anything at all for partition B.
That's why I decided to create a consumer per partition. And since that happens in a multi-threaded way, I can't share the same consumer, of course.

@rcillo

rcillo May 14, 2018

Member

Ok, got it.

@rcillo

rcillo May 14, 2018

Member

@v-stepanov I was checking Kafka Consumer parameters and maybe we can create a single consumer that will read a single event from each partition by using max.partition.fetch.bytes = 1 http://kafka.apache.org/documentation.html#consumerconfigs

@v-stepanov

v-stepanov May 17, 2018

Member

Well, I tried that, but it doesn't really work: I get a lot of events per partition.

@v-stepanov

v-stepanov May 17, 2018

Member

Another thing is that it's not so nice to pass a Kafka-specific parameter from a place that is abstracted from Kafka.
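
A possible explanation for the "a lot of events per partition" observation, hedged because the Kafka client version used in this PR is not stated: the Kafka consumer documentation describes `max.partition.fetch.bytes` as a limit on fetched data per partition, but records are fetched in batches, and the first batch is still returned when it is larger than the limit so that the consumer can make progress. A byte limit of 1 therefore cannot cut a fetch down to a single event. The related setting `max.poll.records` caps the number of records returned by one `poll()` call in total, not per partition. The sketch below only builds the configuration. The class and method names are hypothetical and are not part of Nakadi.

```java
import java.util.Properties;

/** Hypothetical helper, for illustration only; not part of the PR. */
final class SingleEventConsumerConfigSketch {

    private SingleEventConsumerConfigSketch() {
    }

    /** Builds consumer properties that try to keep each poll as small as possible. */
    static Properties consumerProperties(final String bootstrapServers) {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Per-partition byte limit. The broker still returns the first whole
        // record batch, so this does not guarantee a single event.
        props.setProperty("max.partition.fetch.bytes", "1");
        // Caps the records handed out by one poll() call, across all assigned
        // partitions (not one record per partition).
        props.setProperty("max.poll.records", "1");
        return props;
    }
}
```

Neither setting gives "exactly one event from each partition", which is consistent with the decision above to keep one consumer per partition.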

new RetryForSpecifiedTimeStrategy<ConsumedEvent>(EVENT_FETCH_WAIT_TIME_MS)
.withResultsThatForceRetry((ConsumedEvent) null));
if (nextEvent == null) {

@rcillo

rcillo May 14, 2018

Member

Shouldn't this be an error condition? If I understood correctly, before invoking this method you check that the cursor is not at the END, so there should always be a "next" event. This should therefore throw an error, because we were not able to fetch the event in time.

@v-stepanov

v-stepanov May 17, 2018

Member

I agree. Fixed 36ff06a
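
The behaviour agreed on in this thread (keep trying to fetch the next event for a bounded time, and treat "nothing fetched" as an error rather than a silent null) can be sketched roughly as follows. This is not the code from commit 36ff06a: `FetchWithDeadline`, `fetchOrFail` and `FetchTimeoutException` are hypothetical names, and the supplier stands in for a call such as `consumer.readEvents()` that is assumed to block for its own poll timeout.

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Hypothetical sketch of "retry until a deadline, then fail"; not Nakadi code. */
final class FetchWithDeadline {

    /** Hypothetical stand-in for a timeout exception such as TimeLagStatsTimeoutException. */
    static final class FetchTimeoutException extends RuntimeException {
        FetchTimeoutException(final String message) {
            super(message);
        }
    }

    private FetchWithDeadline() {
    }

    /**
     * Calls the fetcher until it returns a non-null value or maxWait has elapsed.
     * The fetcher is always called at least once. A null result after the
     * deadline is reported as an exception instead of being returned.
     */
    static <T> T fetchOrFail(final Supplier<T> fetcher, final Duration maxWait) {
        final long deadline = System.nanoTime() + maxWait.toNanos();
        do {
            final T result = fetcher.get();
            if (result != null) {
                return result;
            }
        } while (System.nanoTime() - deadline < 0);
        throw new FetchTimeoutException(
                "no event fetched within " + maxWait.toMillis() + " ms");
    }
}
```

The loop has no sleep of its own because the fetch call is assumed to block; with a non-blocking fetcher a short pause between attempts would be needed to avoid spinning.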

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class SubscriptionTimLagServiceTest {

@rcillo

rcillo May 14, 2018

Member

There's a typo in TimLag.

@v-stepanov
}
@Test
public void testTimeLagsForTailAndNotTailPositions() throws NakadiException, InvalidCursorException {

@rcillo

rcillo May 14, 2018

Member

I like the tests that exercise the exception conditions, but I don't like this one. I think we could achieve a similar result by adding some more testing code to a user journey or another acceptance test. We could have a situation where one partition has everything committed and another one doesn't. Then we can check that we get zero for the former and something greater than zero for the latter.

@v-stepanov

v-stepanov May 17, 2018

Member

Why don't you like this test? It tests exactly one thing: that for cursors at the tail it returns a zero duration, and for cursors not at the tail it reads the next event from the consumer.

The problem with acceptance tests is that they already take more than 8 minutes to run. If we keep adding acceptance tests for each case, they will soon become unusable (I think they have almost reached this point). The test you suggest would take up to ~5 seconds, which is not so cheap.
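
The rule that the unit test checks (zero duration at the tail, otherwise read the next event) can be written down as a small pure function, which is cheap to test without Kafka. This is a minimal sketch under assumptions: the names are hypothetical, offsets are modelled as plain `long` values instead of Nakadi cursors, and the lag is assumed to be the age of the next unconsumed event.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

/** Hypothetical sketch of the time-lag rule; not the PR's implementation. */
final class TimeLagSketch {

    private TimeLagSketch() {
    }

    /**
     * Returns zero when the committed position has reached the end of the
     * partition. Otherwise asks for the timestamp of the next unconsumed event
     * (an expensive read in practice) and returns its age relative to "now".
     */
    static Duration timeLag(final long committedOffset,
                            final long lastOffset,
                            final Supplier<Instant> nextEventTimestamp,
                            final Instant now) {
        if (committedOffset >= lastOffset) {
            // At the tail: nothing left to consume, so no read is needed.
            return Duration.ZERO;
        }
        return Duration.between(nextEventTimestamp.get(), now);
    }
}
```

Passing the timestamp as a supplier keeps the expensive read lazy, so a test can also assert that no read happens for cursors at the tail.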

final ConsumedEvent nextEvent = executeWithRetry(
() -> {
final List<ConsumedEvent> events = consumer.readEvents();

@rcillo

rcillo May 14, 2018

Member

@v-stepanov maybe we should always close the consumer once we no longer plan to use it. This consumer is quite crazy and keeps doing some work until it is garbage collected.

@v-stepanov

v-stepanov May 17, 2018

Member

Oh yes! Thanks for noticing. Fixed 36ff06a
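
One common way to guarantee the close suggested here is try-with-resources, which closes the consumer on both the normal and the exceptional path. The sketch below is an assumption about the shape of such a fix, not the content of commit 36ff06a: `SimpleConsumer` is a hypothetical stand-in and not Nakadi's `EventConsumer`.

```java
import java.io.Closeable;
import java.util.List;

/** Hypothetical sketch of closing a short-lived consumer; not Nakadi code. */
final class ClosingConsumerSketch {

    /** Minimal stand-in for a consumer that holds resources until closed. */
    interface SimpleConsumer extends Closeable {
        List<String> readEvents();

        @Override
        void close(); // narrowed so that callers need no checked-exception handling
    }

    private ClosingConsumerSketch() {
    }

    /** Reads the first available event, always closing the consumer afterwards. */
    static String readFirstEvent(final SimpleConsumer consumer) {
        try (SimpleConsumer c = consumer) {
            final List<String> events = c.readEvents();
            return events.isEmpty() ? null : events.get(0);
        }
    }
}
```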

@v-stepanov

Member

v-stepanov commented May 17, 2018

deploy validation please

throws InconsistentStateException, ServiceTemporarilyUnavailableException {
final List<EventType> eventTypes = getEventTypesForSubscription(subscription);
final ZkSubscriptionClient subscriptionClient = createZkSubscriptionClient(subscription);
final Optional<ZkSubscriptionNode> zkSubscriptionNode = subscriptionClient.getZkSubscriptionNode();
if (includeDistance) {
- return loadStats(eventTypes, zkSubscriptionNode, subscriptionClient);
+ return loadStats(eventTypes, zkSubscriptionNode, subscriptionClient, showTimeLag);

@antban

antban May 18, 2018

Member

I took a look at the code, and it seems that if showTimeLag is true, then includeDistance is always true.
But exactly this `if` looks like a bug.

@antban

antban May 18, 2018

Member

I mean that includeDistance and showTimeLag are connected and produce 4 possible combinations, but only 3 of them are supported. After some time we will forget about it, and it will be a perfect example of spaghetti code.

@v-stepanov

v-stepanov May 18, 2018

Member

Yes, I agree. I will try to replace these two booleans with an enum.
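
A rough sketch of what such an enum could look like is given here. The constant names and the `fromFlags` helper are hypothetical, and the unsupported combination (time lag without distance) is inferred from the review comments above rather than confirmed by the code.

```java
/** Hypothetical sketch of replacing two coupled booleans with one enum. */
final class StatsModeSketch {

    /** Each constant is one supported combination; the fourth one cannot be expressed. */
    enum StatsMode {
        LIGHT(false, false),   // neither distance nor time lag
        NORMAL(true, false),   // distance only
        TIMELAG(true, true);   // distance and time lag

        private final boolean includeDistance;
        private final boolean showTimeLag;

        StatsMode(final boolean includeDistance, final boolean showTimeLag) {
            this.includeDistance = includeDistance;
            this.showTimeLag = showTimeLag;
        }

        boolean includesDistance() {
            return includeDistance;
        }

        boolean showsTimeLag() {
            return showTimeLag;
        }

        /** Maps legacy flags to a mode and rejects the unsupported combination. */
        static StatsMode fromFlags(final boolean includeDistance, final boolean showTimeLag) {
            if (showTimeLag && !includeDistance) {
                throw new IllegalArgumentException("time lag without distance is not supported");
            }
            if (showTimeLag) {
                return TIMELAG;
            }
            return includeDistance ? NORMAL : LIGHT;
        }
    }

    private StatsModeSketch() {
    }
}
```

With a single mode parameter, a method like `loadStats` would no longer need a branch on one flag that silently ignores the other.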

@v-stepanov
final List<PartitionEndStatistics> endPositions)
throws ErrorGettingCursorTimeLagException, InconsistentStateException {
final ExecutorService executor = Executors.newFixedThreadPool(LAG_CALCULATION_PARALLELISM);

@antban

antban May 18, 2018

Member

Does it make sense to constantly create threads in packs of 10?
Why not use a single CachedThreadPool for this service?

@v-stepanov

v-stepanov May 18, 2018

Member

A fixed thread pool will create 10 threads only if they are really needed (the subscription has >= 10 partitions). It adds threads one by one until it reaches 10. Its only problem is that it never decreases the number of threads after that, but that is not a problem for us because the lifetime of this thread pool is very short.

And I don't see how to limit the size of a CachedThreadPool.

@antban

antban May 18, 2018

Member

@v-stepanov
Sometimes I feel like an alien. I mean, why not use a single cached thread pool per instance, not per request?

new ThreadPoolExecutor(
    0, // Minimum number of threads
    1000, // Maximum number of threads
    60L, // Idle timeout to kill threads after
    TimeUnit.SECONDS, // Idle timeout unit
    new LinkedBlockingQueue<>()) // Queue, to which tasks are scheduled

Let's suppose I make 2000 requests targeting 8000 partitions. You will definitely create more than 2000 threads, at most 8000 (and it will be pretty close to the maximum), and destroy them soon after starting them. With a single cached thread pool you will create up to max parallel requests * avg partition count = 20 * 4 = 80 threads (that really is the expected value, in total).
We can calculate the difference and think about the time spent on thread creation and termination.

I don't really see the reason for limiting a single request to 10 threads. You are effectively saying: overall, threads can be created without limit (across different requests), but I will not try to finish this one huge request quickly, because I do not want to.
This limit of 10 will not save the instance from 1000 concurrent requests, but it will definitely make some requests slower.
Also, having a limit on the threads dedicated to this operation will guarantee that Nakadi is not overloaded by this number of requests.
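
One caveat when turning the `ThreadPoolExecutor` suggestion above into code: the executor only starts threads beyond `corePoolSize` when the work queue rejects a task, and an unbounded `LinkedBlockingQueue` never does. With a core size of 0 and an unbounded queue, the pool therefore runs tasks on a single thread regardless of the maximum. A commonly used alternative, sketched here with hypothetical names, sets the core size equal to the maximum and lets idle core threads time out, which gives a bounded pool that still shrinks back to zero.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a shared, bounded, self-shrinking pool; not Nakadi code. */
final class SharedPoolSketch {

    private SharedPoolSketch() {
    }

    /** Creates a pool that grows up to maxThreads and drops idle threads after 60 seconds. */
    static ThreadPoolExecutor newBoundedPool(final int maxThreads) {
        final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads,                  // core size: threads are created up to this number
                maxThreads,                  // maximum size: same as core, the queue absorbs the rest
                60L,                         // idle timeout before a thread is removed
                TimeUnit.SECONDS,            // idle timeout unit
                new LinkedBlockingQueue<>()); // unbounded queue for waiting tasks
        // Let core threads time out too, so an idle pool holds no threads.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }
}
```

Such a pool would be created once per service instance and reused across requests, which matches the "single pool per instance" argument made in this comment.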

@v-stepanov

v-stepanov May 23, 2018

Member

@antban I reworked it.
Now there is a common thread pool. I still implemented throttling per request, though (one request will not occupy more than 20 threads).
I hope this satisfies you.
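
The combination described here (one shared pool plus a cap on how many of its threads a single request may occupy) can be sketched with a per-request `Semaphore`. This is only an illustration of the idea, not the code that was merged: the class and method names are hypothetical, and the limit is passed in as a parameter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

/** Hypothetical sketch of per-request throttling on a shared pool; not Nakadi code. */
final class PerRequestThrottle {

    private PerRequestThrottle() {
    }

    /**
     * Runs all tasks on the shared pool, but never has more than maxParallel
     * of this request's tasks submitted and unfinished at the same time.
     */
    static <T> List<T> runThrottled(final ExecutorService sharedPool,
                                    final List<Callable<T>> tasks,
                                    final int maxParallel)
            throws InterruptedException, ExecutionException {
        final Semaphore permits = new Semaphore(maxParallel);
        final List<Future<T>> futures = new ArrayList<>();
        for (final Callable<T> task : tasks) {
            // Blocks the request thread until one of its own tasks has finished.
            permits.acquire();
            final Callable<T> wrapped = () -> {
                try {
                    return task.call();
                } finally {
                    permits.release();
                }
            };
            try {
                futures.add(sharedPool.submit(wrapped));
            } catch (RejectedExecutionException e) {
                permits.release(); // the task will never run, so give the permit back
                throw e;
            }
        }
        final List<T> results = new ArrayList<>();
        for (final Future<T> future : futures) {
            results.add(future.get()); // results keep the order of the input tasks
        }
        return results;
    }
}
```

The semaphore is local to the call, so each request gets its own budget while the pool itself stays shared across requests.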

@v-stepanov

Member

v-stepanov commented May 23, 2018

deploy validation please

@v-stepanov

Member

v-stepanov commented May 24, 2018

@antban I updated the PR according to your offline comments.

@antban

Member

antban commented May 24, 2018

👍

Merge remote-tracking branch 'origin/master' into ARUHA-1664-time-lag-stats

# Conflicts:
#	CHANGELOG.md
#	src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java
@v-stepanov

Member

v-stepanov commented May 24, 2018

deploy validation please

@v-stepanov

Member

v-stepanov commented May 24, 2018

👍

@antban

Member

antban commented May 25, 2018

👍

@v-stepanov

Member

v-stepanov commented May 25, 2018

👍

@v-stepanov v-stepanov merged commit 322819c into master May 25, 2018

6 of 7 checks passed

Codacy/PR Quality Review: Not up to standards. This pull request quality could be better.
codecov/patch: 71.3% of diff hit (target 53.39%)
codecov/project: 53.61% (+0.21%) compared to 81bd9f9
continuous-integration/travis-ci/pr: The Travis CI build passed
continuous-integration/travis-ci/push: The Travis CI build passed
zappr: Approvals: @antban, @v-stepanov.
zappr/pr/specification: PR has passed specification checks