Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
Compression must be applied during deferred schema preparation when enableBatching is enabled (apache#9396)
Browse files Browse the repository at this point in the history

* Compression must be applied during deferred schema preparation when enableBatching is enabled.
If you do not set an initial schema on the Producer, the schema must be prepared when the first message carrying a Schema is sent.
There is a bug where compression is not applied in this case, so the consumer receives an uncompressed message and fails to decode it.

* address Matteo's comments

Co-authored-by: Enrico Olivelli <eolivelli@apache.org>
(cherry picked from commit 0143850)
  • Loading branch information
eolivelli authored and codelipenghui committed Feb 7, 2021
1 parent 600f137 commit e72b924
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,15 @@
import java.util.stream.Collectors;

import lombok.Cleanup;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
Expand Down Expand Up @@ -3606,4 +3609,49 @@ public void testIncomingMessageSize(boolean isPartitioned) throws Exception {
Assert.assertEquals(size, 0);
});
}


    @Data
    @EqualsAndHashCode
    public static class MyBean {
        // Single-field POJO used as the Avro payload in the deferred-schema test;
        // Lombok @Data generates the getter/setter/equals/hashCode/toString used below.
        private String field;
    }

@DataProvider(name = "enableBatching")
public static Object[] isEnableBatching() {
return new Object[]{false, true};
}

@Test(dataProvider = "enableBatching")
public void testSendCompressedWithDeferredSchemaSetup(boolean enableBatching) throws Exception {
log.info("-- Starting {} test --", methodName);

final String topic = "persistent://my-property/my-ns/deferredSchemaCompressed";
Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.topic(topic)
.subscriptionName("testsub")
.subscribe();

// initially we are not setting a Schema in the producer
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
.enableBatching(enableBatching)
.compressionType(CompressionType.LZ4)
.create();
MyBean payload = new MyBean();
payload.setField("aaaaaaaaaaaaaaaaaaaaaaaaa");

// now we send with a schema, but we have enabled compression and batching
// the producer will have to setup the schema and resume the send
producer.newMessage(Schema.AVRO(MyBean.class)).value(payload).send();
producer.close();

GenericRecord res = consumer.receive().getValue();
consumer.close();
for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
log.info("field {} {}", f.getName(), res.getField(f));
assertEquals("field", f.getName());
assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", res.getField(f));
}
assertEquals(1, res.getFields().size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,17 @@ public void addCallback(MessageImpl<?> msg, SendCallback scb) {
return future;
}

/**
* Compress the payload if compression is configured
* @param payload
* @return a new payload
*/
private ByteBuf applyCompression(ByteBuf payload) {
ByteBuf compressedPayload = compressor.encode(payload);
payload.release();
return compressedPayload;
}

public void sendAsync(Message<?> message, SendCallback callback) {
checkArgument(message instanceof MessageImpl);

Expand All @@ -363,11 +374,12 @@ public void sendAsync(Message<?> message, SendCallback callback) {
// If compression is enabled, we are compressing, otherwise it will simply use the same buffer
int uncompressedSize = payload.readableBytes();
ByteBuf compressedPayload = payload;
boolean compressed = false;
// Batch will be compressed when closed
// If a message has a delayed delivery time, we'll always send it individually
if (!isBatchMessagingEnabled() || msgMetadataBuilder.hasDeliverAtTime()) {
compressedPayload = compressor.encode(payload);
payload.release();
compressedPayload = applyCompression(payload);
compressed = true;

// validate msg-size (For batching this will be check at the batch completion size)
int compressedSize = compressedPayload.readableBytes();
Expand Down Expand Up @@ -415,7 +427,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
String uuid = UUID.randomUUID().toString();
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
serializeAndSendMessage(msg, msgMetadataBuilder, payload, uuid, chunkId, totalChunks,
readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload,
readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
compressedPayload.readableBytes(), uncompressedSize, callback);
readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());
}
Expand All @@ -430,7 +442,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {

private void serializeAndSendMessage(MessageImpl<?> msg, Builder msgMetadataBuilder, ByteBuf payload,
String uuid, int chunkId, int totalChunks, int readStartIndex, int chunkMaxSizeInBytes, ByteBuf compressedPayload,
int compressedPayloadSize,
boolean compressed, int compressedPayloadSize,
int uncompressedSize, SendCallback callback) throws IOException, InterruptedException {
ByteBuf chunkPayload = compressedPayload;
Builder chunkMsgMetadataBuilder = msgMetadataBuilder;
Expand Down Expand Up @@ -503,7 +515,12 @@ private void serializeAndSendMessage(MessageImpl<?> msg, Builder msgMetadataBuil
doBatchSendAndAdd(msg, callback, payload);
}
} else {
ByteBuf encryptedPayload = encryptMessage(chunkMsgMetadataBuilder, chunkPayload);
// in this case compression has not been applied by the caller
// but we have to compress the payload if compression is configured
if (!compressed) {
chunkPayload = applyCompression(chunkPayload);
}
ByteBuf encryptedPayload = encryptMessage(msgMetadataBuilder, chunkPayload);

MessageMetadata msgMetadata = chunkMsgMetadataBuilder.build();
// When publishing during replication, we need to set the correct number of message in batch
Expand Down

0 comments on commit e72b924

Please sign in to comment.