
ARUHA-1756: initial API for Log Compaction; #905

Merged
merged 4 commits into master from ARUHA-1756-log-compaction-api on Jul 12, 2018

Conversation

7 participants
@v-stepanov
Member

v-stepanov commented Jul 3, 2018

Initial API for Log Compaction

Zalando ticket : ARUHA-1756

Description

Initial API for Log Compaction feature in Nakadi.

Review

  • Tests
  • Documentation
  • CHANGELOG

Deployment Notes

--

The compaction can be not applied to events that were published recently and located at the head of the queue,
which means that the actual amount of events recevied by consumers can be different depending on time when the
consumption happened.
When using 'compact' cleanup policy user should consider that different Nakadi endpoints showing the amount


@rcillo

rcillo Jul 3, 2018

Member

Besides adding this information here, what do you think about reviewing the text for those endpoints and making a simple reference to this attribute? That way, someone reading the API starting from those endpoints also has a chance to learn that the semantics change when this feature is used.


@v-stepanov

v-stepanov Jul 3, 2018

Author Member

Fixed ff56c17

@codecov-io


codecov-io commented Jul 3, 2018

Codecov Report

Merging #905 into master will not change coverage.
The diff coverage is n/a.

Impacted file tree graph

@@            Coverage Diff            @@
##             master     #905   +/-   ##
=========================================
  Coverage     53.95%   53.95%           
  Complexity     1761     1761           
=========================================
  Files           316      316           
  Lines          9712     9712           
  Branches        892      892           
=========================================
  Hits           5240     5240           
  Misses         4144     4144           
  Partials        328      328

Continue to review full report at Codecov.

Powered by Codecov. Last update bbfb446...29a4aab. Read the comment docs.

partition, there is no compaction across partitions. The key that will be used as a compaction key should be
specified in 'partition_compaction_keys' field.
The compaction can be not applied to events that were published recently and located at the head of the queue,
which means that the actual amount of events recevied by consumers can be different depending on time when the


@rcillo

rcillo Jul 3, 2018

Member

s/recevied/received/


@v-stepanov

v-stepanov Jul 3, 2018

Author Member

Fixed ff56c17

event will be available for at least the retention time period. However Nakadi doesn't guarantee that event
will be deleted right after retention time expires.
- 'compact' cleanup policy will keep only the latest event for each event key. The compaction is performed per


@rcillo

rcillo Jul 3, 2018

Member

I know we strive to decouple Nakadi from Kafka, but I think it would be informative to link to the section in the Kafka docs that talks about compaction. A user reading about the "head" and "recent" events might not have the same context as we do, and for that reason have many questions. I think we could point there and say that, for now, this feature is only supported when using Kafka as a backend.


@v-stepanov

v-stepanov Jul 3, 2018

Author Member

Yes, makes sense. Added the link ff56c17

items:
  type: string
description: |
  Indicates the fields used as a key of event for per-partition compaction of this event type.


@rcillo

rcillo Jul 3, 2018

Member

What do you think about writing something along the lines of: "When using the hash partition strategy, one common approach is to use the exact same value as partition_compaction_keys; this way users make sure that events published to the same partition with the same compaction key will be compacted".
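The alignment rcillo describes can be sketched in Python. This is an illustrative sketch only, not Nakadi's actual partitioning code: the function name and the MD5-based hash are assumptions, chosen because a stable hash is what makes "same key, same partition" hold across publishers.

```python
import hashlib

def partition_for(key: str, partition_count: int) -> int:
    # Use a stable hash (unlike Python's built-in hash(), which is
    # randomized per process) so the same key always maps to the
    # same partition across publishers and restarts.
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % partition_count

# Two events sharing the key land on the same partition, so
# per-partition compaction can actually collapse them.
p1 = partition_for("order-42", 8)
p2 = partition_for("order-42", 8)
assert p1 == p2
```

If the partition key and the compaction key were different values, two events with the same compaction key could land on different partitions and would then never be compacted against each other.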


@v-stepanov

v-stepanov Jul 3, 2018

Author Member

Added ff56c17

Indicates the fields used as a key of event for per-partition compaction of this event type.
Required when 'cleanup_policy' is set to 'compact'. Must be absent otherwise.
If this is set it MUST be a valid required field as defined in the schema.
It's not possible to change the value of this field for existing event type.


@SergKam

SergKam Jul 3, 2018

Member

I would mention here that "If you rely on the order of compacted events, this value should be the same as (or include) partition_keys. see more ...manual link..."


@v-stepanov

v-stepanov Jul 3, 2018

Author Member

Added this to description ff56c17

@rcillo

Member

rcillo commented Jul 3, 2018

👍

@v-stepanov

Member Author

v-stepanov commented Jul 3, 2018

👍

@v-stepanov

Member Author

v-stepanov commented Jul 3, 2018

I will merge it together with implementation

in the middle of queue if there is a newer event for the same key published.
For more details about compaction implementation please read the documentation of Log Compaction in Kafka
https://kafka.apache.org/documentation/#compaction, Nakadi currently relies on this implementation.


@lmontrieux

lmontrieux Jul 4, 2018

Member

Should we refer to Kafka-specific details in Nakadi? One of the advantages of Nakadi is that it can (only potentially so far, but still) support multiple storages. This looks a bit like an abstraction leak


@SergKam

SergKam Jul 4, 2018

Member

Good point. I think it should be rephrased, less direct. It is fine only as an example. "For example, you can read the documentation of Log Compaction in Kafka"


@rcillo

rcillo Jul 4, 2018

Member

@lmontrieux if we are not referring to Kafka I would argue that we should rewrite the whole section about log compaction, with diagrams and all. Otherwise it will not be possible to understand why the feature works as it works. We would also not be able to refer to terminology specific from Kafka, like "head of the queue".


@rcillo

rcillo Jul 4, 2018

Member

@lmontrieux @SergKam I don't agree that it's a good point. It's like talking about a chicken without saying the word chicken: it has feathers, eats corn, and lays eggs. In this API we need to use the concepts of the "head" and "tail" of the queue, which are defined only in the Kafka docs. We would have to redefine them here if we wanted to mention these terms; they are not self-explanatory. Putting myself in the shoes of someone who does not know how log compaction works: if I did not read the Kafka docs, I would have a thousand questions. We either follow the pragmatic approach of explicitly pointing to the Kafka docs, or we exhaustively explain how it works in our own docs.


@rcillo

rcillo Jul 4, 2018

Member

One option would be to literally copy and paste the Kafka docs into Nakadi and adjust the wording, if that doesn't break any laws.


@SergKam

SergKam Jul 4, 2018

Member

@rcillo My proposal was to keep the link to the Kafka docs and everything related, but just to mention that it could be another backend system supporting compaction, like https://github.com/apache/incubator-pulsar/wiki/PIP-14:-Topic-compaction

@rcillo

Member

rcillo commented Jul 9, 2018

👍

@@ -1993,6 +1999,16 @@ definitions:
        'user_defined'.
      type: string
      example: '0'
    partition_compaction_key:
      description: |
        Represents a key of event for per-partition compaction of this event type.


@rcillo

rcillo Jul 9, 2018

Member

What do you think about "Value used for per-partition compaction of the event type. If two events with the same partition_compaction_key value are published to the same partition, the later one overwrites the former."? It's kind of redundant, but I suggest it because it would be self-contained information about what it means.
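The "later overwrites the former" semantics in the suggested wording can be simulated in a few lines of Python. This is a sketch of the compaction contract only; the field names are illustrative, not the exact Nakadi event layout.

```python
def compact(partition_events):
    # Per-partition log compaction sketch: keep only the latest event
    # for each compaction key (last write wins). Events are assumed
    # to arrive in offset order within a single partition.
    latest = {}
    for event in partition_events:
        latest[event["partition_compaction_key"]] = event
    return list(latest.values())

events = [
    {"partition_compaction_key": "user-1", "status": "created"},
    {"partition_compaction_key": "user-2", "status": "created"},
    {"partition_compaction_key": "user-1", "status": "deleted"},
]
compacted = compact(events)
# Only the latest event per key survives; for "user-1" that is
# the "deleted" event.
```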


@v-stepanov

v-stepanov Jul 9, 2018

Author Member

I like it. Will change.


@v-stepanov

v-stepanov Jul 9, 2018

Author Member

Fixed 29a4aab

@rcillo

Member

rcillo commented Jul 9, 2018

👍

event will be available for at least the retention time period. However Nakadi doesn't guarantee that event
will be deleted right after retention time expires.
- 'compact' cleanup policy will keep only the latest event for each event key. The compaction is performed per


@conorclifford

conorclifford Jul 10, 2018

I feel it would be better, and less of a burden on producing clients, if the partition_compaction_key were configured as part of the EventType (similar to how partition_key_fields is done today). Existing producing applications would then need zero changes to take advantage of this capability (it would only be a change in the specification of the event type itself), and the API would be more symmetric (having both partitioning and compaction key data defined/handled in the same way would be nice).

Another big benefit of having the compaction key defined in the event-type is that existing Nakadi clients would continue to work for event production; adding to the event metadata of every event may require client library upgrades before client code can be upgraded.


@rcillo

rcillo Jul 11, 2018

Member

@conorclifford our first version was exactly as you describe, and exactly for the same reasons. We later changed to this new approach. I'll try to explain why, and then we can maybe agree on the change or revert it.

There are some problems with the approach used for partition_key_fields:

  1. Non-injective functions: users might specify complete sections of an event as values for hashing, or in this case compaction: arrays, entire objects, and numbers. For partitioning, a non-injective function is not an issue, since it would only mean that two events end up stored in the same partition. For compaction, the function needs to be strictly injective: different sets of values mapping to the same compaction key would cause undesirable deletion of events with conflicting keys. It's very hard to implement an injective function over such a complex domain. Numbers, for example, are problematic due to precision: in Java, if a user decides to compact two events {"foo": 0.99999999999999999} and {"foo": 1}, both numbers could in some cases be mapped to the same value, due to limitations of how Java represents values internally. I know for sure that our implementation transforms 0.0 into 0, and that would definitely cause a conflict, which might be accidental. Only strings are considered 100% safe for implementing an injective function. That would lead to an asymmetry, since partition key fields and compaction fields would have different constraints on the types of values allowed.

  2. Difficulty to debug: since the computed compaction key would only exist in Kafka, it would be hard to explain to users why two events were compacted, because the Nakadi API would not expose the generated value. One could suggest enriching the event metadata with the generated value so that it becomes easier to debug. That's also a possibility, but then we would have to make it clear whether this field is always controlled by Nakadi or whether users can also set it themselves.

I would say point 2 (debugging) is less of a problem, since we already know of one solution. Problem 1 (non-injective functions) is the most complex in my opinion, since it could lead to data loss. There is no solution to the number precision problem, since JSON itself does not precisely define what a number can be. If we document it, someone will not be aware of it and will hit the problem. Limiting it to strings could be a solution, but we considered that this feature should be taken more seriously than partitioning, and that it would be reasonable to expect producers to change their code in order to generate this key in a well-thought-out way using only strings.

Please tell us what you think about these two problems and how we approached them.
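The number-precision hazard from point 1 is easy to reproduce. The Python sketch below (Python's doubles behave like Java's here, both being IEEE-754) shows two syntactically different JSON documents collapsing to the same derived key; the key-derivation step is a hypothetical illustration, not Nakadi's code.

```python
import json

# 0.99999999999999999 has more precision than a 64-bit double can
# hold, so parsing rounds it to exactly 1.0. Any compaction key
# derived from the parsed value therefore collides with 1.0.
a = json.loads('{"foo": 0.99999999999999999}')
b = json.loads('{"foo": 1.0}')

key_a = json.dumps(a["foo"])
key_b = json.dumps(b["foo"])
assert key_a == key_b  # both serialize as "1.0"
```

With string values this cannot happen: two distinct strings always serialize to two distinct keys, which is why restricting compaction keys to strings makes the mapping injective.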


@rcillo

rcillo Jul 11, 2018

Member

@conorclifford I forgot to mention that this key should have a clearly defined maximum size. When the key is the result of an internal Nakadi computation, users could hit problems at runtime in random ways, depending on the combination of values used to generate the key.


@conorclifford

conorclifford Jul 12, 2018

I was unaware of these complex semantics around partition_key_fields (out of interest, do you know what proportion of event-types take advantage of this capability with complex objects, vs. those simply providing a string field?)

With that said, I still think having the "compaction" key defined within the event-type would give more consistency with partitioning; enforcing that this key be a "string" as part of the event-type JSON Schema validation would meet the requirements for the value.

Taking it one step further and mandating that, for log-compacted topics, the partition_key_field be a string and be the value used for compaction could remove any confusing situations where hashed partition assignment and compaction keys do not align as clients expect (it would, however, lose the ability to have compaction differ from partitioning, which can be beneficial in certain, somewhat edge, cases).

Regarding the defined maximum size: I'm not sure what you mean here. Defining the compaction key as a string (whether in the event-type or, as in this proposal, in the metadata), there wouldn't be any "internal Nakadi computation", would there? (Just the string itself being used as the key, no?)


@rcillo

rcillo Jul 12, 2018

Member

@conorclifford I agree that narrowing the possible values to strings would solve the problem 👍 I'll catch up with the team and see if we can implement this way.

Regarding the maximum size, there is a small computation on our side: we collect all the strings mentioned in partition_compaction_keys, put them in a JSON array, and serialize that array as a string. That is the key used at the Kafka level. But I just checked yesterday, and Kafka has no limit on the size of this key, so this problem does not exist =)
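The computation described above can be sketched as follows. Only the "JSON array serialized as a string" shape comes from the comment; the function name and the example event fields are hypothetical.

```python
import json

def kafka_compaction_key(event: dict, compaction_key_fields: list) -> str:
    # Sketch: collect the string values named in
    # partition_compaction_keys, put them in a JSON array, and
    # serialize that array as the Kafka message key.
    return json.dumps([event[field] for field in compaction_key_fields])

event = {"order_id": "42", "warehouse": "berlin-1", "qty": "3"}
key = kafka_compaction_key(event, ["order_id", "warehouse"])
# key == '["42", "berlin-1"]'
```

Serializing as a JSON array (rather than, say, concatenating the values) keeps the mapping injective: `["ab", "c"]` and `["a", "bc"]` produce different keys.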


@rcillo

rcillo Jul 12, 2018

Member

@conorclifford we had a small chat internally, and even though we all agree that it would be feasible to implement an injective function by limiting the values to strings, we see other problems with this approach.

We would have to check that the fields are present in the schema (which we can already do, but not without problems: some paths, such as "anyOf", are not accessible). We would have to extend this check to verify that these fields are marked as "required", which is not in place today. We would also have to check that none of the values are empty strings.

If the publisher then sends an empty string, we would have to detect that and properly inform them that it was not possible to publish the event because one of the values at the specified path was empty, which is harder to communicate clearly than just saying that partition_compaction_key is empty.

In the future, we will be required to implement deletion of events from log-compacted event types. If we have a single string as the compaction key, the API for deletion would be much simpler than if we required from the user a set of fields matching a given structure.

We would like to know if changing the client library is the major pain point for your team or if the main problem you see with the proposed approach is the asymmetry with partitioning feature.


@conorclifford

conorclifford Jul 12, 2018

@rcillo interesting points, especially the deletion one.

In our group alone there are a few different applications that will need enhancing to support this. Some interact directly with the HTTP API (so direct code changes for those, at least), whereas others depend on one or more of the existing client libs (it's quite likely that "nakadi-java" is the most used lib). It's by no means a blocker, just a bit of additional work to be done.

The extra work would have been avoided if adding symmetry were readily feasible, but, as you point out, there are indeed some gotchas. One thing: it's going to be interesting to see essentially duplicated data in the "metadata" block and in the event itself, as for our event-types at least the partition key is the same as the compaction key (it's already a single simple string 😄)


@rcillo

rcillo Jul 12, 2018

Member

@conorclifford yes, we actually even started to consider that the partitioning keys feature might have problems, so we would like not to replicate that approach. It's an interesting situation: while designing a new feature, we learned that an existing feature is not ideal.

So, we truly believe that this new approach will be rock solid. We would like to proceed and deploy a version for you to test =)

Regarding the additional work: since our goal is to help your team achieve its goals, we could even consider contributing to nakadi-java ourselves, if that would speed things up for your team. We believe the additional work will pay off in terms of stability and fewer bug fixes and incidents.


@conorclifford

conorclifford Jul 12, 2018

@rcillo for now, there are some topics we can start with that only require changes to a single in-team repo, so the nakadi-java client change should not block any early testing.

@adyach

Member

adyach commented Jul 12, 2018

👍

@rcillo rcillo merged commit f82c48c into master Jul 12, 2018

7 checks passed

Codacy/PR Quality Review: Up to standards. A positive pull request.
codecov/patch: Coverage not affected when comparing bbfb446...29a4aab
codecov/project: 53.95% remains the same compared to bbfb446
continuous-integration/travis-ci/pr: The Travis CI build passed
continuous-integration/travis-ci/push: The Travis CI build passed
zappr Approvals: @rcillo, @adyach.
zappr/pr/specification PR has passed specification checks

@rcillo rcillo deleted the ARUHA-1756-log-compaction-api branch Jul 12, 2018
