Aruha 1757 log compaction #907

Merged · 27 commits · Jul 13, 2018

Changes from all commits (27 commits)

4bc216c  ARUHA-1757: Added log compaction functionality; (Jul 4, 2018)
ebb0d36  ARUHA-1757: fixed imports; (Jul 4, 2018)
51de225  ARUHA-1757: fixed codestyle and tests; (Jul 5, 2018)
fe36b2a  ARUHA-1757: created tests for log compaction; (Jul 6, 2018)
d52cae0  ARUHA-1757: added restrictions to log compaction functionality; (Jul 6, 2018)
02907a2  ARUHA-1757: added tests; (Jul 7, 2018)
ae10771  ARUHA-1757: modified test; (Jul 7, 2018)
c881931  ARUHA-1757: added update restrictions; extended tests; (Jul 8, 2018)
50c8927  ARUHA-1757: added handling missing compaction key; (Jul 9, 2018)
57f7e44  ARUHA-1757: updated changelog; (Jul 9, 2018)
f1ffed0  ARUHA-1757: moved log compaction key to metadata; (Jul 9, 2018)
c11538b  ARUHA-1757: fixed test; (Jul 9, 2018)
f1c5e4e  ARUHA-1757: reverted change; (Jul 9, 2018)
cb3b17e  ARUHA-1757: added final; (Jul 9, 2018)
7ab73d2  ARUHA-1757: added compaction key presence validation to schema valida… (Jul 9, 2018)
bdc1649  ARUHA-1757: added validation that compacted event type can't be undef… (Jul 9, 2018)
9397337  ARUHA-1757: fixed exception handler; (Jul 9, 2018)
21005ae  ARUHA-1757: reverted NotFoundException handling; (Jul 9, 2018)
393c9dd  ARUHA-1757: added min length to compaction key field; (Jul 9, 2018)
5363d3f  ARUHA-1757: fixed compaction key validation code; (Jul 9, 2018)
13337d8  ARUHA-1757: refactored code; added additional configuration to kafka … (Jul 10, 2018)
7f908ae  ARUHA-1757: added test testing new properties; (Jul 10, 2018)
2d849d0  ARUHA-1757: changed default values; (Jul 12, 2018)
ab385a5  ARUHA-1757: changed default values; (Jul 12, 2018)
79ae03d  ARUHA-1757: changed default values; (Jul 12, 2018)
fe5b2aa  ARUHA-1757: changed default values; (Jul 12, 2018)
bd7ae26  Merge pull request #909 from zalando/ARUHA-1757-additional-topic-prop… (v-stepanov, Jul 12, 2018)

3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Added
- Log Compaction functionality.

## [2.7.7] - 2018-06-26

### Added
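
To make the new changelog entry concrete from an API client's point of view, here is a minimal sketch of creating a log-compacted event type and publishing a keyed event to it. The field names (`cleanup_policy`, `metadata.partition_compaction_key`) come from the acceptance tests further down in this diff; the event type name, owning application, schema, and the local Nakadi URL are placeholders.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Sketch only: event type name and Nakadi base URL are placeholders; the
// cleanup_policy and partition_compaction_key fields come from this PR.
public class CompactedEventTypeExample {

    private static final String NAKADI = "http://localhost:8080";  // assumed local instance

    public static void main(final String[] args) throws Exception {
        // 1. Create an event type with cleanup_policy "compact" (default is "delete").
        post("/event-types", "{"
                + "\"name\":\"example.compacted\","
                + "\"owning_application\":\"example-app\","
                + "\"category\":\"business\","
                + "\"cleanup_policy\":\"compact\","
                + "\"enrichment_strategies\":[\"metadata_enrichment\"],"
                + "\"schema\":{\"type\":\"json_schema\",\"schema\":\"{\\\"type\\\":\\\"object\\\"}\"}"
                + "}");

        // 2. Publish an event; for compacted event types the metadata must contain
        //    partition_compaction_key, which becomes the Kafka record key.
        post("/event-types/example.compacted/events", "[{\"metadata\":{"
                + "\"eid\":\"329ed3d2-8366-11e8-adc0-fa7ae01bbebc\","
                + "\"occurred_at\":\"1992-08-03T10:00:00Z\","
                + "\"partition_compaction_key\":\"abc\"}}]");
    }

    private static void post(final String path, final String body) throws Exception {
        final HttpURLConnection conn = (HttpURLConnection) new URL(NAKADI + path).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(path + " -> " + conn.getResponseCode());
    }
}
```

Per the compactedEventTypeJourney acceptance test below, publishing to a compacted event type without `metadata.partition_compaction_key` is rejected with 422 Unprocessable Entity.
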
@@ -6,14 +6,15 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.zalando.nakadi.config.NakadiSettings;
import org.zalando.nakadi.domain.BatchFactory;
import org.zalando.nakadi.domain.BatchItem;
import org.zalando.nakadi.domain.CleanupPolicy;
import org.zalando.nakadi.domain.EventPublishingStatus;
import org.zalando.nakadi.repository.NakadiTopicConfig;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
import org.zalando.nakadi.repository.zookeeper.ZookeeperSettings;
import org.zalando.nakadi.util.UUIDGenerator;
@@ -23,6 +24,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry;
@@ -42,13 +44,17 @@ public class KafkaRepositoryAT extends BaseAT {
private static final int DEFAULT_REPLICA_FACTOR = 1;
private static final int MAX_TOPIC_PARTITION_COUNT = 10;
private static final int DEFAULT_TOPIC_ROTATION = 50000000;
private static final int COMPACTED_TOPIC_ROTATION = 60000;
private static final int COMPACTED_TOPIC_SEGMENT_BYTES = 1000000;
private static final int COMPACTED_TOPIC_COMPACTION_LAG = 1000;
private static final int DEFAULT_COMMIT_TIMEOUT = 60;
private static final int ZK_SESSION_TIMEOUT = 30000;
private static final int ZK_CONNECTION_TIMEOUT = 10000;
private static final int NAKADI_SEND_TIMEOUT = 10000;
private static final int NAKADI_POLL_TIMEOUT = 10000;
private static final Long RETENTION_TIME = 100L;
private static final Long DEFAULT_RETENTION_TIME = 100L;
private static final Long DEFAULT_TOPIC_RETENTION = 100000000L;
private static final CleanupPolicy DEFAULT_CLEANUP_POLICY = CleanupPolicy.DELETE;
private static final int KAFKA_REQUEST_TIMEOUT = 30000;
private static final int KAFKA_BATCH_SIZE = 1048576;
private static final long KAFKA_LINGER_MS = 0;
@@ -66,6 +72,8 @@ public class KafkaRepositoryAT extends BaseAT {
private ZookeeperSettings zookeeperSettings;
private KafkaTestHelper kafkaHelper;
private KafkaTopicRepository kafkaTopicRepository;
private NakadiTopicConfig defaultTopicConfig;
private KafkaTopicConfigFactory kafkaTopicConfigFactory;

@Before
public void setup() {
@@ -88,14 +96,19 @@ public void setup() {
KAFKA_LINGER_MS, KAFKA_ENABLE_AUTO_COMMIT, KAFKA_MAX_REQUEST_SIZE);
zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
kafkaHelper = new KafkaTestHelper(KAFKA_URL);
defaultTopicConfig = new NakadiTopicConfig(DEFAULT_PARTITION_COUNT, DEFAULT_CLEANUP_POLICY,
Optional.of(DEFAULT_RETENTION_TIME));
kafkaTopicConfigFactory = new KafkaTopicConfigFactory(new UUIDGenerator(), DEFAULT_REPLICA_FACTOR,
DEFAULT_TOPIC_ROTATION, COMPACTED_TOPIC_ROTATION, COMPACTED_TOPIC_SEGMENT_BYTES,
COMPACTED_TOPIC_COMPACTION_LAG);
kafkaTopicRepository = createKafkaTopicRepository();
}

@Test(timeout = 10000)
@SuppressWarnings("unchecked")
public void whenCreateTopicThenTopicIsCreated() throws Exception {
public void whenCreateTopicThenTopicIsCreated() {
// ACT //
final String topicName = kafkaTopicRepository.createTopic(DEFAULT_PARTITION_COUNT, RETENTION_TIME);
final String topicName = kafkaTopicRepository.createTopic(defaultTopicConfig);

// ASSERT //
executeWithRetry(() -> {
@@ -107,14 +120,60 @@ public void whenCreateTopicThenTopicIsCreated() {

partitionInfos.forEach(pInfo ->
assertThat(pInfo.replicas(), arrayWithSize(DEFAULT_REPLICA_FACTOR)));

final Long retentionTime = KafkaTestHelper.getTopicRetentionTime(topicName, ZOOKEEPER_URL);
assertThat(retentionTime, equalTo(DEFAULT_RETENTION_TIME));

final String cleanupPolicy = KafkaTestHelper.getTopicCleanupPolicy(topicName, ZOOKEEPER_URL);
assertThat(cleanupPolicy, equalTo("delete"));

final String segmentMs = KafkaTestHelper.getTopicProperty(topicName, ZOOKEEPER_URL, "segment.ms");
assertThat(segmentMs, equalTo(String.valueOf(DEFAULT_TOPIC_ROTATION)));
},
new RetryForSpecifiedTimeStrategy<Void>(5000).withExceptionsThatForceRetry(AssertionError.class)
.withWaitBetweenEachTry(500));
}

@Test(timeout = 10000)
@SuppressWarnings("unchecked")
public void whenCreateCompactedTopicThenTopicIsCreated() {
// ACT //
final NakadiTopicConfig compactedTopicConfig = new NakadiTopicConfig(DEFAULT_PARTITION_COUNT,
CleanupPolicy.COMPACT, Optional.empty());
final String topicName = kafkaTopicRepository.createTopic(compactedTopicConfig);

// ASSERT //
executeWithRetry(() -> {
final Map<String, List<PartitionInfo>> topics = getAllTopics();
assertThat(topics.keySet(), hasItem(topicName));

final List<PartitionInfo> partitionInfos = topics.get(topicName);
assertThat(partitionInfos, hasSize(DEFAULT_PARTITION_COUNT));

partitionInfos.forEach(pInfo ->
assertThat(pInfo.replicas(), arrayWithSize(DEFAULT_REPLICA_FACTOR)));

final String cleanupPolicy = KafkaTestHelper.getTopicCleanupPolicy(topicName, ZOOKEEPER_URL);
assertThat(cleanupPolicy, equalTo("compact"));

final String segmentMs = KafkaTestHelper.getTopicProperty(topicName, ZOOKEEPER_URL, "segment.ms");
assertThat(segmentMs, equalTo(String.valueOf(COMPACTED_TOPIC_ROTATION)));

final String segmentBytes = KafkaTestHelper.getTopicProperty(topicName, ZOOKEEPER_URL,
"segment.bytes");
assertThat(segmentBytes, equalTo(String.valueOf(COMPACTED_TOPIC_SEGMENT_BYTES)));

final String compactionLag = KafkaTestHelper.getTopicProperty(topicName, ZOOKEEPER_URL,
"min.compaction.lag.ms");
assertThat(compactionLag, equalTo(String.valueOf(COMPACTED_TOPIC_COMPACTION_LAG)));
},
new RetryForSpecifiedTimeStrategy<Void>(5000).withExceptionsThatForceRetry(AssertionError.class)
.withWaitBetweenEachTry(500));
}

@Test(timeout = 20000)
@SuppressWarnings("unchecked")
public void whenDeleteTopicThenTopicIsDeleted() throws Exception {
public void whenDeleteTopicThenTopicIsDeleted() {

// ARRANGE //
final String topicName = UUID.randomUUID().toString();
@@ -140,7 +199,7 @@ public void whenDeleteTopicThenTopicIsDeleted() {
}

@Test(timeout = 10000)
public void whenBulkSendSuccessfullyThenUpdateBatchItemStatus() throws Exception {
public void whenBulkSendSuccessfullyThenUpdateBatchItemStatus() {
final List<BatchItem> items = new ArrayList<>();
final String topicId = TestUtils.randomValidEventTypeName();
kafkaHelper.createTopic(topicId, ZOOKEEPER_URL);
@@ -158,20 +217,6 @@ public void whenBulkSendSuccessfullyThenUpdateBatchItemStatus() throws Exception
}
}

@Test(timeout = 10000)
@SuppressWarnings("unchecked")
public void whenCreateTopicWithRetentionTime() throws Exception {
// ACT //
final String topicName = kafkaTopicRepository.createTopic(DEFAULT_PARTITION_COUNT, RETENTION_TIME);

// ASSERT //
executeWithRetry(() -> Assert.assertEquals(
KafkaTestHelper.getTopicRetentionTime(topicName, ZOOKEEPER_URL), RETENTION_TIME),
new RetryForSpecifiedTimeStrategy<Void>(5000)
.withExceptionsThatForceRetry(AssertionError.class)
.withWaitBetweenEachTry(500));
}

private Map<String, List<PartitionInfo>> getAllTopics() {
final KafkaConsumer<String, String> kafkaConsumer = kafkaHelper.createConsumer();
return kafkaConsumer.listTopics();
@@ -203,7 +248,7 @@ private KafkaTopicRepository createKafkaTopicRepository() {
nakadiSettings,
kafkaSettings,
zookeeperSettings,
new UUIDGenerator());
kafkaTopicConfigFactory);
}

}
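
The assertions above pin down the per-topic Kafka settings that the new KafkaTopicConfigFactory is expected to apply for each cleanup policy. The factory's implementation is not part of this diff, so the following is only a sketch of that mapping, built from the property names and constants the test asserts; the class and method names are invented for illustration.

```java
import java.util.Properties;
import org.zalando.nakadi.domain.CleanupPolicy;

// Hypothetical sketch of the cleanup-policy-to-Kafka-properties mapping that the
// assertions in KafkaRepositoryAT verify; property names come from the test.
final class TopicConfigSketch {

    static Properties topicProperties(final CleanupPolicy cleanupPolicy,
                                      final long retentionMs,
                                      final long defaultRotationMs,
                                      final long compactedRotationMs,
                                      final long compactedSegmentBytes,
                                      final long compactedCompactionLagMs) {
        final Properties props = new Properties();
        if (cleanupPolicy == CleanupPolicy.COMPACT) {
            props.setProperty("cleanup.policy", "compact");
            props.setProperty("segment.ms", String.valueOf(compactedRotationMs));
            props.setProperty("segment.bytes", String.valueOf(compactedSegmentBytes));
            props.setProperty("min.compaction.lag.ms", String.valueOf(compactedCompactionLagMs));
        } else {
            props.setProperty("cleanup.policy", "delete");
            props.setProperty("retention.ms", String.valueOf(retentionMs));
            props.setProperty("segment.ms", String.valueOf(defaultRotationMs));
        }
        return props;
    }
}
```

With the test's constants, a compacted topic would get segment.ms=60000, segment.bytes=1000000 and min.compaction.lag.ms=1000, while a delete-policy topic keeps retention.ms and the default segment.ms rotation.
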
@@ -68,8 +68,7 @@ public List<Cursor> getOffsetsToReadFromLatest(final String topic) {
.map(cursor -> {
if ("0".equals(cursor.getOffset())) {
return new Cursor(cursor.getPartition(), "001-0001--1");
}
else {
} else {
final long lastEventOffset = toKafkaOffset(cursor.getOffset()) - 1;
final String offset = StringUtils.leftPad(toNakadiOffset(lastEventOffset),
CURSOR_OFFSET_LENGTH, '0');
@@ -103,17 +102,24 @@ public void createTopic(final String topic, final String zkUrl) {
try {
zkUtils = ZkUtils.apply(zkUrl, 30000, 10000, false);
AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Safe$.MODULE$);
}
finally {
} finally {
if (zkUtils != null) {
zkUtils.close();
}
}
}

public static Long getTopicRetentionTime(final String topic, final String zkPath) {
return Long.valueOf(getTopicProperty(topic, zkPath, "retention.ms"));
}

public static String getTopicCleanupPolicy(final String topic, final String zkPath) {
return getTopicProperty(topic, zkPath, "cleanup.policy");
}

public static String getTopicProperty(final String topic, final String zkPath, final String propertyName) {
final ZkUtils zkUtils = ZkUtils.apply(zkPath, 30000, 10000, false);
final Properties topicConfig = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic);
return Long.valueOf(topicConfig.getProperty("retention.ms"));
return topicConfig.getProperty(propertyName);
}
}
@@ -3,10 +3,17 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import org.apache.http.HttpStatus;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.DateTime;
import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.zalando.nakadi.domain.CleanupPolicy;
import org.zalando.nakadi.domain.EnrichmentStrategyDescriptor;
import org.zalando.nakadi.domain.EventCategory;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.domain.ResourceAuthorization;
import org.zalando.nakadi.domain.ResourceAuthorizationAttribute;
@@ -221,6 +228,60 @@ public void whenUpdateRetentionTimeThenUpdateInKafkaAndDB() throws Exception {
assertRetentionTime(newRetentionTime, eventType.getName());
}

@Test(timeout = 10000)
public void compactedEventTypeJourney() throws IOException {
// create event type with 'compact' cleanup_policy
final EventType eventType = EventTypeTestBuilder.builder()
.category(EventCategory.BUSINESS)
.cleanupPolicy(CleanupPolicy.COMPACT)
.enrichmentStrategies(ImmutableList.of(EnrichmentStrategyDescriptor.METADATA_ENRICHMENT))
.build();
final String body = MAPPER.writer().writeValueAsString(eventType);
given().body(body)
.contentType(JSON)
.post(ENDPOINT)
.then()
.statusCode(HttpStatus.SC_CREATED);

// get event type and check that properties are set correctly
given().body(body)
.contentType(JSON)
.get(ENDPOINT + "/" + eventType.getName())
.then()
.statusCode(HttpStatus.SC_OK)
.body("cleanup_policy", equalTo("compact"));

// assert that created topic in kafka has correct cleanup_policy
final String topic = (String) NakadiTestUtils.listTimelines(eventType.getName()).get(0).get("topic");
final String cleanupPolicy = KafkaTestHelper.getTopicCleanupPolicy(topic, ZOOKEEPER_URL);
assertThat(cleanupPolicy, equalTo("compact"));

// publish event to compacted event type
publishEvent(eventType.getName(), "{\"metadata\":{" +
"\"occurred_at\":\"1992-08-03T10:00:00Z\"," +
"\"eid\":\"329ed3d2-8366-11e8-adc0-fa7ae01bbebc\"," +
"\"partition_compaction_key\":\"abc\"}}");

// assert that key was correctly propagated to event key in kafka
final KafkaTestHelper kafkaHelper = new KafkaTestHelper(KAFKA_URL);
final KafkaConsumer<String, String> consumer = kafkaHelper.createConsumer();
final TopicPartition tp = new TopicPartition(topic, 0);
consumer.assign(ImmutableList.of(tp));
consumer.seek(tp, 0);
final ConsumerRecords<String, String> records = consumer.poll(5000);
final ConsumerRecord<String, String> record = records.iterator().next();
assertThat(record.key(), equalTo("abc"));

// publish event with missing compaction key and expect 422
given().body("[{\"metadata\":{" +
"\"occurred_at\":\"1992-08-03T10:00:00Z\"," +
"\"eid\":\"329ed3d2-8366-11e8-adc0-fa7ae01bbebc\"}}]")
.contentType(JSON)
.post(ENDPOINT + "/" + eventType.getName() + "/events")
.then()
.statusCode(HttpStatus.SC_UNPROCESSABLE_ENTITY);
}

@Test
public void whenUpdateRetentionTimeWithNullValueNoChange() throws Exception {
final EventType eventType = NakadiTestUtils.createEventType();
@@ -92,6 +92,7 @@ public void userJourneyM1() throws InterruptedException, IOException {
.body("category", equalTo("undefined"))
.body("audience", equalTo("external-public"))
.body("ordering_key_fields", equalTo(Lists.newArrayList("foo", "bar.baz")))
.body("cleanup_policy", equalTo("delete"))
.body("schema.type", equalTo("json_schema"))
.body("schema.schema", equalTo("{\"type\": \"object\", \"properties\": {\"foo\": " +
"{\"type\": \"string\"}, \"bar\": {\"type\": \"object\", \"properties\": " +
@@ -12,14 +12,15 @@
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.NativeWebRequest;
import org.zalando.nakadi.exceptions.runtime.ConflictException;
import org.zalando.nakadi.exceptions.runtime.NotFoundException;
import org.zalando.nakadi.exceptions.runtime.TimelineException;
import org.zalando.nakadi.exceptions.runtime.UnableProcessException;
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
import org.zalando.nakadi.exceptions.runtime.ConflictException;
import org.zalando.nakadi.exceptions.runtime.InconsistentStateException;
import org.zalando.nakadi.exceptions.runtime.NotFoundException;
import org.zalando.nakadi.exceptions.runtime.RepositoryProblemException;
import org.zalando.nakadi.exceptions.runtime.TimelineException;
import org.zalando.nakadi.exceptions.runtime.TimelinesNotSupportedException;
import org.zalando.nakadi.exceptions.runtime.TopicRepositoryException;
import org.zalando.nakadi.exceptions.runtime.UnableProcessException;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.view.TimelineRequest;
import org.zalando.nakadi.view.TimelineView;
@@ -60,7 +61,9 @@ public ResponseEntity<?> getTimelines(@PathVariable("name") final String eventTy
.collect(Collectors.toList()));
}

@ExceptionHandler(UnableProcessException.class)
@ExceptionHandler({
UnableProcessException.class,
TimelinesNotSupportedException.class})
public ResponseEntity<Problem> unprocessable(final UnableProcessException ex, final NativeWebRequest request) {
LOG.error(ex.getMessage(), ex);
return Responses.create(MoreStatus.UNPROCESSABLE_ENTITY, ex.getMessage(), request);
10 changes: 10 additions & 0 deletions src/main/java/org/zalando/nakadi/domain/BatchItem.java
@@ -61,6 +61,7 @@ public static EmptyInjectionConfiguration build(final int position, final boolea
private final List<Integer> skipCharacters;
private String partition;
private String brokerId;
private String eventKey;
private int eventSize;

public BatchItem(
@@ -101,6 +102,15 @@ public String getPartition() {
return partition;
}

@Nullable
public String getEventKey() {
return eventKey;
}

public void setEventKey(@Nullable final String eventKey) {
this.eventKey = eventKey;
}

@Nullable
public String getBrokerId() {
return brokerId;
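
The new eventKey field carries the value that ends up as the Kafka record key — the compactedEventTypeJourney test above asserts that record.key() equals the event's partition_compaction_key. How the key is extracted and handed to the producer is not shown in this diff; the sketch below only illustrates the intent, and the helper methods and use of the raw event JSONObject are assumptions.

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONObject;
import org.zalando.nakadi.domain.BatchItem;

// Illustrative only: the real extraction and producer wiring live elsewhere in Nakadi.
final class EventKeySketch {

    // For compacted event types, copy metadata.partition_compaction_key into the batch item.
    static void assignCompactionKey(final BatchItem item, final JSONObject event) {
        final JSONObject metadata = event.optJSONObject("metadata");
        if (metadata != null && metadata.has("partition_compaction_key")) {
            item.setEventKey(metadata.getString("partition_compaction_key"));
        }
    }

    // The event key (null for non-compacted event types) becomes the Kafka record key.
    static ProducerRecord<String, String> toProducerRecord(final String topic,
                                                           final BatchItem item,
                                                           final String payload) {
        final Integer partition = item.getPartition() == null
                ? null : Integer.valueOf(item.getPartition());
        return new ProducerRecord<>(topic, partition, item.getEventKey(), payload);
    }
}
```
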
5 changes: 5 additions & 0 deletions src/main/java/org/zalando/nakadi/domain/CleanupPolicy.java
@@ -0,0 +1,5 @@
package org.zalando.nakadi.domain;

public enum CleanupPolicy {
DELETE, COMPACT
}
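
For completeness, a small usage sketch of the new enum. The lower-cased enum name matches the strings the acceptance tests expect both in the API payloads (cleanup_policy "compact"/"delete") and in Kafka's cleanup.policy topic setting; the helper method here is illustrative only.

```java
import org.zalando.nakadi.domain.CleanupPolicy;

final class CleanupPolicyExample {

    // Maps the enum to the string used in Kafka's "cleanup.policy" topic config.
    static String toKafkaValue(final CleanupPolicy policy) {
        return policy.name().toLowerCase();  // DELETE -> "delete", COMPACT -> "compact"
    }

    public static void main(final String[] args) {
        System.out.println(toKafkaValue(CleanupPolicy.COMPACT));  // prints "compact"
    }
}
```
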