Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aruha 456 block consumers #489

Merged
merged 39 commits into from Dec 13, 2016
Merged

Aruha 456 block consumers #489

merged 39 commits into from Dec 13, 2016

Conversation

v-stepanov
Copy link
Contributor

@v-stepanov v-stepanov commented Nov 22, 2016

@antban @rcillo @adyach @v1ctor @SergKam please review
(It was not yet tested properly, i will create a separate stack and will test it there, so if you notice some bug in the code - let me know :)

I have no idea how this small feature could grow to 700 lines on code :(

Concept document moved to wiki: https://github.com/zalando/nakadi/wiki/Too-much-consumers-from-a-single-application

@codecov-io
Copy link

codecov-io commented Nov 22, 2016

Current coverage is 59.07% (diff: 14.41%)

Merging #489 into master will decrease coverage by 2.29%

@@             master       #489   diff @@
==========================================
  Files           159        164     +5   
  Lines          4279       4489   +210   
  Methods           0          0          
  Messages          0          0          
  Branches        481        500    +19   
==========================================
+ Hits           2626       2652    +26   
- Misses         1488       1670   +182   
- Partials        165        167     +2   

Powered by Codecov. Last update 17ebb2a...cc44af0

@adyach
Copy link
Member

adyach commented Nov 22, 2016

@v-stepanov are you going to move concept document to wiki and post the link here?

import java.util.stream.Collectors;

import static java.text.MessageFormat.format;
import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't we agree on not to use static imports ? :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hell no! :)
We had a balance of -1 in our radar which doesn't allow neither adopt nor forbid that.

throws ConnectionSlotOccupiedException {

final String parent = zkPathForConsumer(client, eventType, partition);
List<String> children = ImmutableList.of();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can init it only in case of KeeperException.NoNodeException ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Fixed 37f2d31

.filter(slot -> !occupiedSlots.contains(slot))
.collect(Collectors.toList());
final int slotIndex = new Random().nextInt(availableSlots.size());
final String slot = availableSlots.get(slotIndex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue with this approach that the change of nakadi.stream.maxConnections to value grater than 5 will cause an exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good catch. I fixed it dedca7c

for (final String partition : partitions) {
Optional<ConnectionSlot> connectionSlot = Optional.empty();
try {
connectionSlot = executeWithRetry(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for me it is not clear why should we retry here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you see the implementation doesn't use locks - because that puts additional load on ZK and also lock nodes are not cleaned after. So to avoid of creation of more than 5 connections the implementation uses named slots ("1", "2", "3", "4", "5"). And the possible situation is that two instances will try to capture the same free slot at the same time. In that case we should retry to try capture another free slot.
(Maybe not the best explanation but feel free to approach me so that I can describe how it works on board)

@v-stepanov
Copy link
Contributor Author

@adyach I moved now the concept document to project wiki: https://github.com/zalando/nakadi/wiki/Too-much-consumers-from-a-single-application

@@ -327,6 +334,52 @@ private Response readEvents() {
}

@Test(timeout = 10000)
public void whenExceedMaxConsumersNumThen429() throws IOException, InterruptedException, ExecutionException,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@v-stepanov it would be good to have a test case for when a connection is closed. It should release one slot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I extended this test. Now it also checks if one slot is released when connection is closed. c8f0987


public static final String CONNECTIONS_ZK_PATH = "/nakadi/consumers/connections";

private static final Logger LOG = LoggerFactory.getLogger(CursorsService.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/CursorsService/ConsumerLimitingService/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I fixed that now d82978a

try {
connectionSlot = executeWithRetry(
() -> acquireConnectionSlot(client, eventType, partition),
new RetryForSpecifiedCountStrategy<Optional<ConnectionSlot>>(5)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number of retries should be at least maxConnections and not a fixed 5.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood correctly, slots number starts at 0 and goes up to maxConnections . If there is concurrency when getting 0, the next attempt will be to get 1, and so on. So if in the future we increase maxConnections to 10, for example, this algorithm would stop trying on the fifith slot and reply with 429 even though there were another 5 slots.

Makes sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not exactly like you described. The client tries to capture one of FREE slots (so he doesn't try blindly), and also he chooses a random free slot (not the lowest one). So the retry will happen only if two clients were trying to capture the same free slot at the same time.
But anyway I see your point. Will change the number of retries to maxConnection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed fde52a9

@rcillo
Copy link
Contributor

rcillo commented Nov 23, 2016

👍

}
}

public void deletePartitionNodeIfPossible(final String nodePath) {
Copy link
Contributor

@antban antban Nov 23, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this method can be probably renamed to deleteAnyZkNode

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:)

@rcillo
Copy link
Contributor

rcillo commented Nov 30, 2016

I see no more pending comments.

@rcillo
Copy link
Contributor

rcillo commented Nov 30, 2016

:shipit:


private List<String> getChildrenCached(final String zkPath) {
try {
PathChildrenCache cache = SLOTS_CACHES.getOrDefault(zkPath, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you want to use curator TreeCache for that? In that case you won't store the list of caches.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really need to cache all the nodes' children in "/nakadi/consumers/connections"
I'm only interested in nodes's children for connections that exist in this Nakadi instance.
So the TreeCache is too much for me in this case, it will create a huge amount of watchers which I really don't need and that will put a very high load on ZK.

if (cache == null) {
cache = new PathChildrenCache(zkHolder.get(), zkPath, false);
cache.start(BUILD_INITIAL_CACHE);
SLOTS_CACHES.put(zkPath, cache);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of two threads are in this if block It is possible that you put one cache into the map then other thread get it in line 205 but after another thread in if block will replace it with new object.
I suggest to use putIfAbsent or even replace SLOTS_CACHES.getOrDefault(zkPath, null) with computeIfAbsent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good point! Thanks. I fixed it now the way you suggested f41ca10

&& s.getClient().equals(slot.getClient())
&& s.getEventType().equals(slot.getEventType()));
if (!hasMoreConnectionsToPartition) {
final String consumerPath = zkPathForConsumer(slot.getClient(), slot.getEventType(), slot.getPartition());
Copy link
Member

@adyach adyach Dec 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example the thread came to this if block and the same moment another thread will acquire connection for that same slot. The node exists in the ZK but not in the ACQUIRED_SLOTS . Do you think it is gonna be a problem?

Copy link
Contributor Author

@v-stepanov v-stepanov Dec 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I don't completely get it.

The node exists in the ZK but not in the ACQUIRED_SLOTS

Why you think it will not be in ACQUIRED_SLOTS ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I did not get why you need ACQUIRED_SLOTS. You can get acquired slots from cache because ACQUIRED_SLOTS do not give you any synchronisation between cache and ZK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only reason why i need ACQUIRED_SLOTS is to be able to delete the cache which I don't need any more at current Nakadi instance. Without knowing what are the current acquired slots at this instance I will not know if I can delete the cache already or if I still need it.

&& s.getEventType().equals(slot.getEventType()));
if (!hasMoreConnectionsToPartition) {
final String consumerPath = zkPathForConsumer(slot.getClient(), slot.getEventType(), slot.getPartition());
final PathChildrenCache cache = SLOTS_CACHES.getOrDefault(consumerPath, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can do only remove since it returns removed value or null

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Fixed it here: 85ead49

@adyach
Copy link
Member

adyach commented Dec 1, 2016

:shipit:

@v-stepanov
Copy link
Contributor Author

@antban @rcillo @adyach @lmontrieux
This ticket is ready for a review again.
With latest changed I added a lock when creating a connection for client_id|event-type pair


@Test(expected = NoConnectionSlotsException.class)
public void whenNoFreeSlotsThenException() throws Exception {
final String parition = "0";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/parition/partition/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed 9b5b4db


private static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
private static final Logger LOG = LoggerFactory.getLogger(ConsumerLimitingCleaningService.class);
private static final int HANGING_NODES_CLEAN_PERIOD_MS = 6 * 60 * 60 * 1000; // 6 hours
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of moving this number to the application config file?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. That's not possible because of the usage on annotation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly. And even without that, I don't see much sense to make this property configurable.

deleteConsumersData();

// "hanging" node
CURATOR.create().creatingParentsIfNeeded().forPath("/nakadi/consumers/connections/hanging");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it wasn't clear for me that hanging is the name of a client. It took me some time to realise that it's the name of a client which has no consumers anymore. That's why you clean it. Changing it to consumer_with_no_open_streams could make if more explicit. This is just a suggestion to improve clarity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, "hanging" here represents the whole node name that is usually combined from "client_id|event_type|partition". But in this test I test only ConsumerLimitingCleaningService class. And it is agnostic of node name, for this service the node is just a node. So I don't see much sense in this test to have a name close to real because ConsumerLimitingCleaningService doesn't care.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I said, it's a suggestion to improve clarity. I took some time to understand it. I understand that for the machine it doesn't matter. But that's just me.

@rcillo
Copy link
Contributor

rcillo commented Dec 12, 2016

:shipit:

@rcillo
Copy link
Contributor

rcillo commented Dec 12, 2016

:shipit:

@rcillo
Copy link
Contributor

rcillo commented Dec 12, 2016

:shipit:

@v-stepanov v-stepanov merged commit 2195cef into master Dec 13, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants