
Aruha 1757 log compaction #907

Merged
merged 27 commits into from Jul 13, 2018

Conversation

4 participants
@v-stepanov
Member

v-stepanov commented Jul 9, 2018

Log Compaction

Zalando ticket : ARUHA-1757

Description

Added log compaction functionality to Nakadi.

Review

  • Tests
  • Documentation
  • CHANGELOG

Deployment Notes

We need to set up a separate Kafka cluster and a storage in Nakadi for it. We also need to define the variable NAKADI_TIMELINES_STORAGE_COMPACTED in the live environment to point to this new storage.
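
As an illustration of the deployment note above, a minimal sketch of how the compacted storage id could be resolved at startup. The class name, method, and fallback behaviour are assumptions for illustration; only the variable name NAKADI_TIMELINES_STORAGE_COMPACTED comes from this PR.

```java
import java.util.Map;

// Sketch only: resolving the compacted storage id from the environment.
// Failing fast when the variable is missing is an assumed policy, not Nakadi code.
public class CompactedStorageConfig {

    static String resolveCompactedStorage(final Map<String, String> env) {
        final String storageId = env.get("NAKADI_TIMELINES_STORAGE_COMPACTED");
        if (storageId == null || storageId.isEmpty()) {
            throw new IllegalStateException(
                    "NAKADI_TIMELINES_STORAGE_COMPACTED must point to the compacted storage");
        }
        return storageId;
    }

    public static void main(final String[] args) {
        final Map<String, String> env = Map.of("NAKADI_TIMELINES_STORAGE_COMPACTED", "compacted-kafka");
        System.out.println(resolveCompactedStorage(env)); // prints compacted-kafka
    }
}
```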

@codecov-io

codecov-io commented Jul 9, 2018

Codecov Report

Merging #907 into master will decrease coverage by 0.34%.
The diff coverage is 37.03%.

Impacted file tree graph

@@             Coverage Diff             @@
##             master    #907      +/-   ##
===========================================
- Coverage     53.95%   53.6%   -0.35%     
- Complexity     1761    1775      +14     
===========================================
  Files           316     323       +7     
  Lines          9712    9850     +138     
  Branches        892     900       +8     
===========================================
+ Hits           5240    5280      +40     
- Misses         4144    4236      +92     
- Partials        328     334       +6
Impacted Files Coverage Δ Complexity Δ
...org/zalando/nakadi/repository/TopicRepository.java 0% <ø> (ø) 0 <0> (ø) ⬇️
...zalando/nakadi/controller/TimelinesController.java 60% <ø> (ø) 4 <0> (ø) ⬇️
...ando/nakadi/repository/kafka/KafkaTopicConfig.java 0% <0%> (ø) 0 <0> (?)
...akadi/exceptions/runtime/TopicConfigException.java 0% <0%> (ø) 0 <0> (ø) ⬇️
...kadi/repository/kafka/KafkaTopicConfigBuilder.java 0% <0%> (ø) 0 <0> (?)
...zalando/nakadi/exceptions/CompactionException.java 0% <0%> (ø) 0 <0> (?)
...ptions/runtime/TimelinesNotSupportedException.java 100% <100%> (ø) 1 <1> (?)
...a/org/zalando/nakadi/service/EventTypeService.java 75.92% <100%> (+0.83%) 59 <5> (+5) ⬆️
.../java/org/zalando/nakadi/domain/CleanupPolicy.java 100% <100%> (ø) 1 <1> (?)
...ando/nakadi/repository/KafkaRepositoryCreator.java 26.92% <100%> (ø) 1 <0> (ø) ⬇️
... and 16 more

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 167c0c1...bd7ae26. Read the comment docs.

private void validatePartitionCompactionKeys(final Schema schema, final EventTypeBase eventType)
throws InvalidEventTypeException, JSONException, SchemaException {
final List<String> absentFields = eventType.getPartitionCompactionKeys().stream()
.filter(field -> !schema.definesProperty(convertToJSONPointer(field)))


@v-stepanov

v-stepanov Jul 9, 2018

Author Member

@rcillo I didn't find a fast way to validate that a required field exists in the schema. So here I only validate that the property exists in the schema (we actually do the same for the other ..._key_fields properties). If you have an idea regarding that, please let me know and I will change it.
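
The snippet under discussion passes each field through a convertToJSONPointer helper before calling schema.definesProperty. A possible shape of that conversion, assuming dotted field paths ("metadata.partition_compaction_key") and an everit-style "#/a/b" pointer; the exact pointer format Nakadi uses is an assumption.

```java
// Sketch of a possible convertToJSONPointer helper. The "#/a/b" output format
// is assumed to match what the schema library's definesProperty expects.
public class JsonPointerUtil {

    static String convertToJSONPointer(final String dottedPath) {
        // "a.b.c" -> "#/a/b/c"
        return "#/" + dottedPath.replace(".", "/");
    }

    public static void main(final String[] args) {
        System.out.println(convertToJSONPointer("metadata.partition_compaction_key"));
        // prints #/metadata/partition_compaction_key
    }
}
```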

v-stepanov added some commits Jul 9, 2018

public ResponseEntity<Problem> notFound(final NotFoundException ex, final NativeWebRequest request) {
@ExceptionHandler({
NotFoundException.class,
TimelinesNotSupportedException.class})


@rcillo

rcillo Jul 9, 2018

Member

Wouldn't it make more sense to reply with 422? It actually found the event type but could not proceed with the request.


@v-stepanov

v-stepanov Jul 9, 2018

Author Member

Oh yes, sorry. I accidentally added it to the wrong handler. Will fix.


@v-stepanov

v-stepanov Jul 9, 2018

Author Member

fixed 9397337

@@ -184,7 +187,7 @@ public boolean topicExists(final String topic) throws TopicRepositoryException {
final ProducerRecord<String, String> kafkaRecord = new ProducerRecord<>(
topicId,
KafkaCursor.toKafkaPartition(item.getPartition()),
item.getPartition(),
item.getEventKey(),


@rcillo

rcillo Jul 9, 2018

Member

Do you know why we used to publish the partition as a key? Is it somehow going to change anything?


@v-stepanov

v-stepanov Jul 9, 2018

Author Member

I think we were using the partition as a key accidentally. In general, if you directly specify the partition to publish to, the key is just ignored; the documentation states as much. So I don't expect anything to change.
Or do you suggest keeping the partition as a key for cleanup_policy=delete (so as not to break anything)?
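
The behaviour being discussed can be modelled in a few lines: when a record carries an explicit partition, the key is still stored with the record but ignored for routing, and only a record without a partition falls back to hashing the key. This is a simplified illustration of the precedence, not the Kafka client implementation (real Kafka hashes keys with murmur2, not String.hashCode()).

```java
// Simplified model of Kafka producer partition selection, illustrating why
// publishing the partition as a key should change nothing when the partition
// is set explicitly.
public class PartitionChoice {

    static int choosePartition(final Integer explicitPartition, final String key, final int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition; // the key does not influence the result
        }
        return Math.abs(key.hashCode() % numPartitions); // key-based routing only as a fallback
    }

    public static void main(final String[] args) {
        // Explicit partition set: routing is unchanged no matter what the key is.
        System.out.println(choosePartition(2, "order-42", 8)); // prints 2
    }
}
```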


@rcillo

rcillo Jul 9, 2018

Member

I think we can test it, no problem. Good that you already checked the docs and they say it should not change anything. We just need to confirm 👍

@@ -99,6 +99,7 @@ EventPublishResult publishInternal(final String events,

validate(batch, eventType);
partition(batch, eventType);
compact(batch, eventType);


@rcillo

rcillo Jul 9, 2018

Member

There is an alternative solution. When building the effective schema, you can make this field mandatory and leverage the validation feature, so that execution never reaches this point. I know it somewhat couples validation with compaction, but I guess that's already the case for almost all aspects of event publishing.

More specifically, extending

by adding another if and then modifying the schema to mark partition_compaction_key as required and longer than 0 characters. The nice thing about doing that is that the error comes with the complete path to the missing field.

Then the error would also be reported as a validation problem, which I think makes more sense than a "partitioning" one.
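
The effective-schema change described here could look roughly like the fragment below. The field name partition_compaction_key and the minimum-length constraint come from this thread; the exact nesting under metadata is an assumption about Nakadi's effective schema.

```json
{
  "properties": {
    "metadata": {
      "properties": {
        "partition_compaction_key": { "type": "string", "minLength": 1 }
      },
      "required": ["partition_compaction_key"]
    }
  }
}
```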


@v-stepanov

v-stepanov Jul 9, 2018

Author Member

Yes, makes sense. Changed 7ab73d2


@adyach

adyach Jul 12, 2018

Member

we introduced another loop here, do you think it is time to combine it and do every step in one loop?


@rcillo

rcillo Jul 13, 2018

Member

@adyach in the API we always point out which step failed. So if something fails during enrichment, for example, it means that everything is fine regarding validation. If we did as you suggest, it could happen that the first event validates fine but then enrichment breaks; the user fixes the payload, sends again, and this time validation fails for the next event. So even though what you suggest is possible, I think it would be more work than it looks to preserve the current behaviour.
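
The trade-off described here can be sketched in a few lines, with invented names: when each step runs over the whole batch before the next step starts, any failure is attributable to exactly one step, which is what the API's error reporting relies on.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the "one loop per step" structure; not Nakadi code.
public class StagedPipeline {

    static String process(final List<String> batch,
                          final List<String> stepNames,
                          final List<Consumer<String>> steps) {
        for (int i = 0; i < steps.size(); i++) {
            for (final String event : batch) {
                try {
                    steps.get(i).accept(event);
                } catch (final RuntimeException e) {
                    // The whole previous step already succeeded for every event,
                    // so the failed step is unambiguous.
                    return "failed during " + stepNames.get(i);
                }
            }
        }
        return "ok";
    }

    public static void main(final String[] args) {
        final String result = process(
                List.of("e1", "bad"),
                List.of("validation", "enrichment"),
                List.of(
                        e -> { /* validation passes for every event */ },
                        e -> { if (e.equals("bad")) throw new RuntimeException("boom"); }));
        System.out.println(result); // prints failed during enrichment
    }
}
```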

@@ -38,7 +39,8 @@ public String getPartition() {
}
}

String createTopic(int partitionCount, Long retentionTimeMs) throws TopicCreationException;
String createTopic(int partitionCount, Long retentionTimeMs, CleanupPolicy cleanupPolicy)


@rcillo

rcillo Jul 9, 2018

Member

I'd say this interface has a problem. Every time we need to tweak some configuration in topic creation, we have to change this interface. Maybe we could pass an object with all the configs? Then if we want to tweak, say, cleanup minimum latency in the future, it becomes easier: just add a property to the object and map it properly in the implementation, while the interface stays unchanged.

But I honestly cannot think of many other configuration options.


@rcillo

rcillo Jul 9, 2018

Member

Just thought about one: max message size, for example.


@rcillo

rcillo Jul 9, 2018

Member

But I guess we could also do this refactoring the next time we edit it.


@v-stepanov

v-stepanov Jul 9, 2018

Author Member

I think it's not that bad yet. We have some places with 15 parameters :)
I think we should start this "Introduce Parameter Object" refactoring with the most problematic places we have.
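
The PR does introduce KafkaTopicConfig and KafkaTopicConfigBuilder (both appear in the coverage report above), so the parameter-object idea from this thread might look roughly as follows. The fields shown (partition count, retention, cleanup policy) are assumptions based on the discussion, not the actual class contents.

```java
// Sketch of the parameter-object refactoring discussed above; field set assumed.
public class KafkaTopicConfig {
    private final int partitionCount;
    private final Long retentionTimeMs;
    private final String cleanupPolicy;

    private KafkaTopicConfig(final Builder builder) {
        this.partitionCount = builder.partitionCount;
        this.retentionTimeMs = builder.retentionTimeMs;
        this.cleanupPolicy = builder.cleanupPolicy;
    }

    @Override
    public String toString() {
        return partitionCount + "/" + retentionTimeMs + "/" + cleanupPolicy;
    }

    public static class Builder {
        private int partitionCount = 1;
        private Long retentionTimeMs;
        private String cleanupPolicy = "delete";

        public Builder partitionCount(final int count) { this.partitionCount = count; return this; }
        public Builder retentionTimeMs(final Long ms) { this.retentionTimeMs = ms; return this; }
        public Builder cleanupPolicy(final String policy) { this.cleanupPolicy = policy; return this; }
        public KafkaTopicConfig build() { return new KafkaTopicConfig(this); }
    }

    public static void main(final String[] args) {
        // Adding a new topic setting later only touches this object,
        // not the createTopic interface.
        final KafkaTopicConfig config = new Builder()
                .partitionCount(8)
                .retentionTimeMs(86400000L)
                .cleanupPolicy("compact")
                .build();
        System.out.println(config); // prints 8/86400000/compact
    }
}
```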

@v-stepanov

Member Author

v-stepanov commented Jul 9, 2018

@rcillo I addressed your comments

@rcillo


@v-stepanov maybe also add a minimum length of 1.

Member Author

v-stepanov replied Jul 9, 2018

done 393c9dd

@@ -164,15 +171,25 @@ private static JSONObject addMetadata(final JSONObject schema, final EventType e
metadataProperties.put("parent_eids", arrayOfUUIDs);
metadataProperties.put("flow_id", string);
metadataProperties.put("partition", string);
metadataProperties.put("partition_compaction_key", string);


@rcillo

rcillo Jul 9, 2018

Member

I guess you are putting it twice. Here and in line 181.


@v-stepanov

v-stepanov Jul 9, 2018

Author Member

🤦‍♂️
Fixed 5363d3f

@rcillo

Member

rcillo commented Jul 9, 2018

👍

@v-stepanov

Member Author

v-stepanov commented Jul 9, 2018

👍

@v-stepanov

Member Author

v-stepanov commented Jul 9, 2018

deploy validation please

v-stepanov added some commits Jul 10, 2018

Merge pull request #909 from zalando/ARUHA-1757-additional-topic-prop…
…erties

Aruha 1757 additional topic properties
@v-stepanov

Member Author

v-stepanov commented Jul 12, 2018

deploy validation please

@rcillo

Member

rcillo commented Jul 13, 2018

👍

1 similar comment
@adyach

Member

adyach commented Jul 13, 2018

👍

@rcillo rcillo merged commit 65fa723 into master Jul 13, 2018

5 of 8 checks passed

Codacy/PR Quality Review Not up to standards. This pull request quality could be better.
codecov/patch 37.03% of diff hit (target 53.95%)
codecov/project 53.6% (-0.35%) compared to 167c0c1
aruha/jenkins Build finished.
continuous-integration/travis-ci/pr The Travis CI build passed
continuous-integration/travis-ci/push The Travis CI build passed
zappr Approvals: @rcillo, @adyach.
zappr/pr/specification PR has passed specification checks

@rcillo rcillo deleted the ARUHA-1757-log-compaction branch Jul 13, 2018
