
Add Batch support (#42)
Fix #16 
In KoP we would like to keep messages batched. In Kafka, messages are passed into the broker as `Records`, and each `Records` holds one or more `Record`; this is similar to Pulsar's batched Message.
But because we have to turn Kafka `Records` into a Pulsar BatchedMessage, so that messages can be recognized by both Pulsar and Kafka clients, we have to read each `Record` out of the `Records` and turn it into a Pulsar Message. This may involve some un-batch/re-batch overhead.
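For intuition, here is a minimal sketch (not code from this patch) of the data shape involved: one Kafka `Records` holds several `Record`s, and the un-batch step is just iterating them before re-encoding the pieces into one Pulsar batched entry. The class and its contents are illustrative assumptions.

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.SimpleRecord;

public class BatchShapeDemo {
    public static void main(String[] args) {
        // One Kafka `Records` (MemoryRecords) containing two `Record`s.
        MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
                new SimpleRecord("k1".getBytes(StandardCharsets.UTF_8), "v1".getBytes(StandardCharsets.UTF_8)),
                new SimpleRecord("k2".getBytes(StandardCharsets.UTF_8), "v2".getBytes(StandardCharsets.UTF_8)));

        // The un-batch step: read each Record out of the Records.
        for (Record record : records.records()) {
            System.out.println("record at batch offset " + record.offset());
            // ...each record would then become one message of a
            // Pulsar BatchedMessage (the re-batch step).
        }
    }
}
```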

Changes:
- Move the message produce/consume logic from KafkaRequestHandler.java into separate files.
- Add support for batched produce/consume.
- Add support for message headers.
- Change the offset format in MessageIdUtils.java to carry a batch index.
- Add tests for the added code.
jiazhai authored and sijie committed Oct 4, 2019
1 parent 2c4ef03 commit 4a88999
Showing 11 changed files with 1,188 additions and 584 deletions.
540 changes: 15 additions & 525 deletions src/main/java/io/streamnative/kop/KafkaRequestHandler.java

Large diffs are not rendered by default.

src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java

@@ -14,6 +14,7 @@
package io.streamnative.kop;

import static com.google.common.base.Preconditions.checkArgument;
import static io.streamnative.kop.utils.MessageIdUtils.offsetAfterBatchIndex;

import io.streamnative.kop.utils.MessageIdUtils;
import java.util.UUID;
@@ -48,6 +49,9 @@ public class KafkaTopicConsumerManager {
}

public CompletableFuture<Pair<ManagedCursor, Long>> remove(long offset) {
// This is for reading a new entry: first check whether the offset comes from a batched message request.
offset = offsetAfterBatchIndex(offset);

CompletableFuture<Pair<ManagedCursor, Long>> cursor = consumers.remove(offset);
if (cursor != null) {
if (log.isDebugEnabled()) {
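For intuition, an illustrative use of the normalization above (the values are made up; the snippet uses the MessageIdUtils methods shown further down in this diff): a fetch offset pointing inside an entry's batch means that whole entry was already delivered, so it is mapped to the start of the next entry, which is the entry-aligned key the cursor was cached under.

```java
// Illustrative only; not code from this commit.
long inBatch = MessageIdUtils.getOffset(5L, 3L, 7);        // ledger 5, entry 3, batch index 7
long normalized = MessageIdUtils.offsetAfterBatchIndex(inBatch);

// Entry 3 was already delivered as one batch, so reads resume at entry 4.
assert normalized == MessageIdUtils.getOffset(5L, 4L);
```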
347 changes: 347 additions & 0 deletions src/main/java/io/streamnative/kop/MessageFetchContext.java

Large diffs are not rendered by default.

161 changes: 161 additions & 0 deletions src/main/java/io/streamnative/kop/MessagePublishContext.java
@@ -0,0 +1,161 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.kop;

import static io.streamnative.kop.utils.MessageRecordUtils.messageToByteBuf;
import static io.streamnative.kop.utils.MessageRecordUtils.recordToEntry;
import static io.streamnative.kop.utils.MessageRecordUtils.recordsToByteBuf;

import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.streamnative.kop.utils.MessageIdUtils;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.Topic.PublishContext;

/**
* Implementation for PublishContext.
*/
@Slf4j
public final class MessagePublishContext implements PublishContext {

private CompletableFuture<Long> offsetFuture;
private Topic topic;
private long startTimeNs;
public static final boolean MESSAGE_BATCHED = true;

/**
* Executed from managed ledger thread when the message is persisted.
*/
@Override
public void completed(Exception exception, long ledgerId, long entryId) {

if (exception != null) {
log.error("Failed write entry: ledgerId: {}, entryId: {}. triggered send callback.",
ledgerId, entryId);
offsetFuture.completeExceptionally(exception);
} else {
if (log.isDebugEnabled()) {
log.debug("Success write topic: {}, ledgerId: {}, entryId: {}"
+ " And triggered send callback.",
topic.getName(), ledgerId, entryId);
}

topic.recordAddLatency(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTimeNs));

offsetFuture.complete(Long.valueOf(MessageIdUtils.getOffset(ledgerId, entryId)));
}

recycle();
}

// recycler
public static MessagePublishContext get(CompletableFuture<Long> offsetFuture,
Topic topic,
long startTimeNs) {
MessagePublishContext callback = RECYCLER.get();
callback.offsetFuture = offsetFuture;
callback.topic = topic;
callback.startTimeNs = startTimeNs;
return callback;
}

private final Handle<MessagePublishContext> recyclerHandle;

private MessagePublishContext(Handle<MessagePublishContext> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

private static final Recycler<MessagePublishContext> RECYCLER = new Recycler<MessagePublishContext>() {
protected MessagePublishContext newObject(Handle<MessagePublishContext> handle) {
return new MessagePublishContext(handle);
}
};

public void recycle() {
offsetFuture = null;
topic = null;
startTimeNs = -1;
recyclerHandle.recycle(this);
}


// Publish Kafka records to the Pulsar topic; callbacks are handled in MessagePublishContext.
public static void publishMessages(MemoryRecords records,
Topic topic,
CompletableFuture<PartitionResponse> future) {

// Count the records in the batch.
AtomicInteger size = new AtomicInteger(0);
records.records().forEach(record -> size.incrementAndGet());
int rec = size.get();

if (log.isDebugEnabled()) {
log.debug("publishMessages for topic partition: {} , records size is {} ", topic.getName(), size.get());
}

if (MESSAGE_BATCHED) {
CompletableFuture<Long> offsetFuture = new CompletableFuture<>();

ByteBuf headerAndPayload = recordsToByteBuf(records, rec);
topic.publishMessage(
headerAndPayload,
MessagePublishContext.get(
offsetFuture, topic, System.nanoTime()));

offsetFuture.whenComplete((offset, ex) -> {
if (ex != null) {
log.error("publishMessages for topic partition: {} failed when write.",
topic.getName(), ex);
future.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR));
} else {
future.complete(new PartitionResponse(Errors.NONE));
}
});
} else {
List<CompletableFuture<Long>> futures = Collections
.synchronizedList(Lists.newArrayListWithExpectedSize(size.get()));

records.records().forEach(record -> {
CompletableFuture<Long> offsetFuture = new CompletableFuture<>();
futures.add(offsetFuture);
ByteBuf headerAndPayload = messageToByteBuf(recordToEntry(record));
topic.publishMessage(
headerAndPayload,
MessagePublishContext.get(
offsetFuture, topic, System.nanoTime()));
});

CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[rec])).whenComplete((ignore, ex) -> {
if (ex != null) {
log.error("publishMessages for topic partition: {} failed when write.",
topic.getName(), ex);
future.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR));
} else {
future.complete(new PartitionResponse(Errors.NONE));
}
});
}
}
}
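For context, a hedged sketch of the caller side (the class and its wiring are illustrative stand-ins, not code from this commit): the handler would resolve a Kafka partition to its Pulsar Topic elsewhere, then delegate the whole batch to publishMessages, which completes one PartitionResponse per partition.

```java
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
import org.apache.pulsar.broker.service.Topic;

// Hypothetical caller-side sketch, not part of this patch.
public class ProducePathSketch {

    public static CompletableFuture<PartitionResponse> produce(Topic pulsarTopic,
                                                               MemoryRecords records) {
        CompletableFuture<PartitionResponse> response = new CompletableFuture<>();
        // With MESSAGE_BATCHED == true, the whole Kafka batch becomes a
        // single Pulsar entry; `response` completes once it is persisted.
        MessagePublishContext.publishMessages(records, pulsarTopic, response);
        return response;
    }
}
```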
51 changes: 44 additions & 7 deletions src/main/java/io/streamnative/kop/utils/MessageIdUtils.java
@@ -23,24 +23,39 @@
* Utils for Pulsar MessageId.
*/
public class MessageIdUtils {
// Use 20 bits for ledgerId,
// 32 bits for entryId,
// 12 bits for batchIndex.
public static final int LEDGER_BITS = 20;
public static final int ENTRY_BITS = 32;
public static final int BATCH_BITS = 12;

public static final long getOffset(long ledgerId, long entryId) {
// Combine ledgerId and entryId to form the offset;
// the low BATCH_BITS bits (the batchIndex) are left as zero.
checkArgument(ledgerId > 0, "Expected ledgerId > 0, but got " + ledgerId);
checkArgument(entryId >= 0, "Expected entryId >= 0, but got " + entryId);

long offset = (ledgerId << 28) | entryId;
long offset = (ledgerId << (ENTRY_BITS + BATCH_BITS) | (entryId << BATCH_BITS));
return offset;
}

public static final long getOffset(long ledgerId, long entryId, int batchIndex) {
checkArgument(ledgerId > 0, "Expected ledgerId > 0, but got " + ledgerId);
checkArgument(entryId >= 0, "Expected entryId >= 0, but got " + entryId);
checkArgument(batchIndex >= 0, "Expected batchIndex >= 0, but got " + batchIndex);
checkArgument(batchIndex < (1 << BATCH_BITS),
"Expected batchIndex to fit in " + BATCH_BITS + " bits, but it is " + batchIndex);

long offset = (ledgerId << (ENTRY_BITS + BATCH_BITS) | (entryId << BATCH_BITS)) + batchIndex;
return offset;
}

public static final MessageId getMessageId(long offset) {
// De-multiplex ledgerId and entryId from the offset.
checkArgument(offset > 0, "Expected offset > 0, but got " + offset);

long ledgerId = offset >>> 28;
long entryId = offset & 0x0F_FF_FF_FFL;
long ledgerId = offset >>> (ENTRY_BITS + BATCH_BITS);
long entryId = (offset & 0x0F_FF_FF_FF_FF_FFL) >>> BATCH_BITS;

return new MessageIdImpl(ledgerId, entryId, -1);
}
@@ -49,9 +64,31 @@ public static final PositionImpl getPosition(long offset) {
// De-multiplex ledgerId and entryId from the offset.
checkArgument(offset >= 0, "Expected offset >= 0, but got " + offset);

long ledgerId = offset >>> 28;
long entryId = offset & 0x0F_FF_FF_FFL;
long ledgerId = offset >>> (ENTRY_BITS + BATCH_BITS);
long entryId = (offset & 0x0F_FF_FF_FF_FF_FFL) >>> BATCH_BITS;

return new PositionImpl(ledgerId, entryId);
}

// Get the batchIndex contained in the offset.
public static final int getBatchIndex(long offset) {
checkArgument(offset >= 0, "Expected offset >= 0, but got " + offset);

return (int) (offset & 0x0F_FF);
}

// Get the next entry-aligned offset after the batch index.
// In KafkaTopicConsumerManager the next read offset is updated after each entry read;
// if a batched message was read previously, the next offset waiting to be read is the next entry.
public static final long offsetAfterBatchIndex(long offset) {
// Extract the batchIndex from the offset.
checkArgument(offset >= 0, "Expected offset >= 0, but got " + offset);

int batchIndex = getBatchIndex(offset);
// If the offset points inside a batch, skip to the first offset of the next entry.
if (batchIndex != 0) {
return (offset - batchIndex) + (1 << BATCH_BITS);
}
return offset;
}
}
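To make the 20/32/12 bit layout concrete, here is a minimal self-contained sketch of the encode/decode round trip (the constants mirror the shifts above; the demo class itself is illustrative, not part of the patch):

```java
public class OffsetLayoutDemo {
    static final int ENTRY_BITS = 32;
    static final int BATCH_BITS = 12;

    public static void main(String[] args) {
        long ledgerId = 5, entryId = 3;
        int batchIndex = 2;

        // Encode: [ledgerId | entryId | batchIndex] = [20 | 32 | 12] bits.
        long offset = (ledgerId << (ENTRY_BITS + BATCH_BITS)) | (entryId << BATCH_BITS) | batchIndex;

        // Decode each field back out.
        System.out.println(offset >>> (ENTRY_BITS + BATCH_BITS));            // 5 (ledgerId)
        System.out.println((offset & 0x0F_FF_FF_FF_FF_FFL) >>> BATCH_BITS);  // 3 (entryId)
        System.out.println(offset & 0x0F_FF);                                // 2 (batchIndex)

        // offsetAfterBatchIndex: inside a batch, so skip to the next entry.
        long next = (offset - batchIndex) + (1L << BATCH_BITS);
        System.out.println((next & 0x0F_FF_FF_FF_FF_FFL) >>> BATCH_BITS);    // 4 (entryId)
        System.out.println(next & 0x0F_FF);                                  // 0 (batchIndex)
    }
}
```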
