Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Please enumerate **all user-facing** changes using format `<githib issue/pr numb

## 0.5.0


* [#838](https://github.com/kroxylicious/kroxylicious/issues/838): Ensure the decryption maintains record ordering, regardless of completion order of the decryptor.
* [#837](https://github.com/kroxylicious/kroxylicious/pull/837): refactor: take advantage of the topic injection in several integration tests including (the SampleFilterIT)
* [#827](https://github.com/kroxylicious/kroxylicious/issues/827): Release process should update version number references in container image versions too
* [#825](https://github.com/kroxylicious/kroxylicious/pull/825): Improve the topic encryption example
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,7 @@ public class EnvelopeEncryptionFilter<K>

@SuppressWarnings("unchecked")
public static <T> CompletionStage<List<T>> join(List<? extends CompletionStage<T>> stages) {
CompletableFuture<T>[] futures = new CompletableFuture[stages.size()];
for (int i = 0; i < stages.size(); i++) {
futures[i] = stages.get(i).toCompletableFuture();
}
CompletableFuture<T>[] futures = stages.stream().map(CompletionStage::toCompletableFuture).toArray(CompletableFuture[]::new);
return CompletableFuture.allOf(futures)
.thenApply(ignored -> Stream.of(futures).map(CompletableFuture::join).toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.kroxylicious.filter.encryption.EncryptionException;
import io.kroxylicious.filter.encryption.EncryptionScheme;
import io.kroxylicious.filter.encryption.EncryptionVersion;
import io.kroxylicious.filter.encryption.EnvelopeEncryptionFilter;
import io.kroxylicious.filter.encryption.KeyManager;
import io.kroxylicious.filter.encryption.Receiver;
import io.kroxylicious.filter.encryption.RecordField;
Expand Down Expand Up @@ -295,32 +296,43 @@ private CompletableFuture<AesGcmEncryptor> makeDecryptor(E edek) {
.thenApply(AesGcmEncryptor::forDecrypt).toCompletableFuture();
}

private record DecryptState(@NonNull Record kafkaRecord, @NonNull ByteBuffer valueWrapper, @Nullable EncryptionVersion decryptionVersion,
@Nullable AesGcmEncryptor encryptor) {}

@NonNull
@Override
public CompletionStage<Void> decrypt(String topicName,
int partition,
@NonNull List<? extends Record> records,
@NonNull Receiver receiver) {
List<CompletionStage<Void>> futures = new ArrayList<>(records.size());
for (var kafkaRecord : records) {
var decryptStateStages = new ArrayList<CompletionStage<DecryptState>>(records.size());

for (Record kafkaRecord : records) {
var decryptionVersion = decryptionVersion(topicName, partition, kafkaRecord);
if (decryptionVersion == null) {
receiver.accept(kafkaRecord, kafkaRecord.value(), kafkaRecord.headers());
futures.add(CompletableFuture.completedFuture(null));
decryptStateStages.add(CompletableFuture.completedStage(new DecryptState(kafkaRecord, kafkaRecord.value(), null, null)));
}
else {
// right now (because we only support topic name based kek selection) once we've resolved the first value we
// can keep the lock and process all the records
ByteBuffer wrapper = kafkaRecord.value();
var x = resolveEncryptor(decryptionVersion.wrapperVersion(), wrapper).thenAccept(encryptor -> {
decryptRecord(decryptionVersion, encryptor, wrapper, kafkaRecord, receiver);
});
futures.add(x);
decryptStateStages.add(
resolveEncryptor(decryptionVersion.wrapperVersion(), wrapper).thenApply(enc -> new DecryptState(kafkaRecord, wrapper, decryptionVersion, enc)));
}
}

return io.kroxylicious.filter.encryption.EnvelopeEncryptionFilter.join(futures).thenAccept(list -> {
});
return EnvelopeEncryptionFilter.join(decryptStateStages)
.thenApply(decryptStates -> {
decryptStates.forEach(decryptState -> {
if (decryptState.encryptor() == null) {
receiver.accept(decryptState.kafkaRecord(), decryptState.valueWrapper(), decryptState.kafkaRecord().headers());
}
else {
decryptRecord(decryptState.decryptionVersion(), decryptState.encryptor(), decryptState.valueWrapper(), decryptState.kafkaRecord(), receiver);
}
});
return null;
});
}

@SuppressWarnings("java:S2445")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class EnvelopeEncryptionFilterIT {
private static final String HELLO_SECRET = "hello secret";

@TestTemplate
void roundTrip(KafkaCluster cluster, Topic topic, TestKmsFacade<?, ?, ?> testKmsFacade) throws Exception {
void roundTripSingleRecord(KafkaCluster cluster, Topic topic, TestKmsFacade<?, ?, ?> testKmsFacade) throws Exception {
var testKekManager = testKmsFacade.getTestKekManager();
testKekManager.generateKek(topic.name());

Expand All @@ -72,6 +72,36 @@ void roundTrip(KafkaCluster cluster, Topic topic, TestKmsFacade<?, ?, ?> testKms
}
}

@TestTemplate
void roundTripManyRecordsFromDifferentProducers(KafkaCluster cluster, Topic topic, TestKmsFacade<?, ?, ?> testKmsFacade) throws Exception {
var testKekManager = testKmsFacade.getTestKekManager();
testKekManager.generateKek(topic.name());

var builder = proxy(cluster);

builder.addToFilters(buildEncryptionFilterDefinition(testKmsFacade));

try (var tester = kroxyliciousTester(builder);
var producer1 = tester.producer();
var producer2 = tester.producer();
var consumer = tester.consumer()) {

producer1.send(new ProducerRecord<>(topic.name(), HELLO_WORLD + 1));
producer1.send(new ProducerRecord<>(topic.name(), HELLO_WORLD + 2));
producer1.send(new ProducerRecord<>(topic.name(), HELLO_WORLD + 3)).get(5, TimeUnit.SECONDS);
producer2.send(new ProducerRecord<>(topic.name(), HELLO_WORLD + 4));
producer2.send(new ProducerRecord<>(topic.name(), HELLO_WORLD + 5)).get(5, TimeUnit.SECONDS);

consumer.subscribe(List.of(topic.name()));
var records = consumer.poll(Duration.ofSeconds(2));
assertThat(records.iterator())
.toIterable()
.hasSize(5)
.extracting(ConsumerRecord::value)
.containsExactly(HELLO_WORLD + 1, HELLO_WORLD + 2, HELLO_WORLD + 3, HELLO_WORLD + 4, HELLO_WORLD + 5);
}
}

// This ensures the decrypt-ability guarantee, post kek rotation
@TestTemplate
void decryptionAfterKekRotation(KafkaCluster cluster, Topic topic, TestKmsFacade<?, ?, ?> testKmsFacade) throws Exception {
Expand Down