diff --git a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java index b23116ba48..1b65fa6fe5 100644 --- a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java +++ b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java @@ -15,48 +15,36 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static io.streamnative.kop.MessagePublishContext.publishMessages; +import static io.streamnative.kop.utils.TopicNameUtils.pulsarTopicName; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.util.Recycler; -import io.netty.util.Recycler.Handle; import io.streamnative.kop.coordinator.group.GroupMetadata.GroupSummary; import io.streamnative.kop.offset.OffsetAndMetadata; import io.streamnative.kop.utils.CoreUtils; import io.streamnative.kop.utils.MessageIdUtils; -import java.io.IOException; -import java.io.UncheckedIOException; import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; -import java.time.Clock; -import java.util.Base64; import java.util.Collections; -import java.util.Date; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.AsyncCallbacks; -import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; -import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -64,17 +52,12 @@ import org.apache.bookkeeper.mledger.impl.OffsetFinder; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.NotImplementedException; -import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.header.Header; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.MemoryRecordsBuilder; -import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.Records; -import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.DeleteGroupsRequest; @@ -84,8 +67,6 @@ import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMember; import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata; import 
org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.requests.FetchResponse.PartitionData; import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatRequest; @@ -109,28 +90,15 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; -import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.Utils; import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; -import org.apache.pulsar.broker.service.Topic; -import org.apache.pulsar.broker.service.Topic.PublishContext; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.CompressionType; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.MessageImpl; -import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; -import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; -import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.common.protocol.Commands.ChecksumType; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.util.Murmur3_32Hash; @@ -148,12 +116,6 @@ public class KafkaRequestHandler extends KafkaCommandDecoder { private final PulsarAdmin admin; private final KafkaTopicManager topicManager; - private static final Clock clock = Clock.systemDefaultZone(); - private static final String FAKE_KOP_PRODUCER_NAME = "fake_kop_producer_name"; - - private static final int DEFAULT_FETCH_BUFFER_SIZE = 1024 * 1024; - private static final int MAX_RECORDS_BUFFER_SIZE = 100 * 1024 * 1024; - public KafkaRequestHandler(KafkaService kafkaService) throws Exception { super(); @@ -236,7 +198,7 @@ protected CompletableFuture handleTopicMetadataRequest(Kafka requestTopics.stream() .forEach(topic -> { - TopicName topicName = pulsarTopicName(topic); + TopicName topicName = pulsarTopicName(topic, kafkaNamespace); // get partition numbers for each topic. 
PersistentTopicsBase .getPartitionedTopicMetadata( @@ -431,6 +393,8 @@ protected CompletableFuture handleProduceRequest(KafkaHeader final int responsesSize = produceRequest.partitionRecordsOrFail().size(); + // TODO: handle un-exist topic: + // nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION) for (Map.Entry entry : produceRequest.partitionRecordsOrFail().entrySet()) { TopicPartition topicPartition = entry.getKey(); @@ -443,7 +407,7 @@ protected CompletableFuture handleProduceRequest(KafkaHeader topicPartition.topic(), topicPartition.partition(), responsesSize); } - TopicName topicName = pulsarTopicName(topicPartition); + TopicName topicName = pulsarTopicName(topicPartition, kafkaNamespace); kafkaService.getBrokerService().getTopic(topicName.toString(), true) .whenComplete((topicOpt, exception) -> { @@ -453,8 +417,9 @@ protected CompletableFuture handleProduceRequest(KafkaHeader partitionResponse.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR)); } else { if (topicOpt.isPresent()) { + // TODO: need to add a produce Manager topicManager.addTopic(topicName.toString(), (PersistentTopic) topicOpt.get()); - publishMessages(entry.getValue(), topicOpt.get(), partitionResponse); + publishMessages((MemoryRecords) entry.getValue(), topicOpt.get(), partitionResponse); } else { log.error("[{}] Request {}: getOrCreateTopic get empty topic for name {}", ctx.channel(), produceHar.getHeader(), topicName); @@ -482,234 +447,6 @@ protected CompletableFuture handleProduceRequest(KafkaHeader return resultFuture; } - // publish Kafka records to pulsar topic, handle callback in MessagePublishContext. - private void publishMessages(Records records, - Topic topic, - CompletableFuture future) { - - // get records size. - AtomicInteger size = new AtomicInteger(0); - records.records().forEach(record -> size.incrementAndGet()); - int rec = size.get(); - - if (log.isDebugEnabled()) { - log.debug("[{}] publishMessages for topic partition: {} , records size is {} ", - ctx.channel(), topic.getName(), size.get()); - } - - // TODO: Handle Records in a batched way: - // https://github.com/streamnative/kop/issues/16 - List> futures = Collections - .synchronizedList(Lists.newArrayListWithExpectedSize(size.get())); - - records.records().forEach(record -> { - CompletableFuture offsetFuture = new CompletableFuture<>(); - futures.add(offsetFuture); - ByteBuf headerAndPayload = messageToByteBuf(recordToEntry(record)); - topic.publishMessage( - headerAndPayload, - MessagePublishContext.get( - offsetFuture, topic, record.sequence(), - record.sizeInBytes(), System.nanoTime())); - }); - - CompletableFuture.allOf(futures.toArray(new CompletableFuture[rec])).whenComplete((ignore, ex) -> { - if (ex != null) { - log.error("[{}] publishMessages for topic partition: {} failed when write.", - ctx.channel(), topic.getName(), ex); - future.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR)); - } else { - future.complete(new PartitionResponse(Errors.NONE)); - } - }); - } - - - private static final class MessagePublishContext implements PublishContext { - private CompletableFuture offsetFuture; - private Topic topic; - private long sequenceId; - private int msgSize; - private long startTimeNs; - - public long getSequenceId() { - return sequenceId; - } - - /** - * Executed from managed ledger thread when the message is persisted. 
- */ - @Override - public void completed(Exception exception, long ledgerId, long entryId) { - - if (exception != null) { - log.error("Failed write entry: ledgerId: {}, entryId: {}, sequenceId: {}. triggered send callback.", - ledgerId, entryId, sequenceId); - offsetFuture.completeExceptionally(exception); - } else { - if (log.isDebugEnabled()) { - log.debug("Success write topic: {}, ledgerId: {}, entryId: {}, sequenceId: {}," - + "messageSize: {}. And triggered send callback.", - topic.getName(), ledgerId, entryId, sequenceId, msgSize); - } - - topic.recordAddLatency(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTimeNs)); - - offsetFuture.complete(Long.valueOf(MessageIdUtils.getOffset(ledgerId, entryId))); - } - - recycle(); - } - - // recycler - static MessagePublishContext get(CompletableFuture offsetFuture, - Topic topic, - long sequenceId, - int msgSize, - long startTimeNs) { - MessagePublishContext callback = RECYCLER.get(); - callback.offsetFuture = offsetFuture; - callback.topic = topic; - callback.sequenceId = sequenceId; - callback.msgSize = msgSize; - callback.startTimeNs = startTimeNs; - return callback; - } - - private final Handle recyclerHandle; - - private MessagePublishContext(Handle recyclerHandle) { - this.recyclerHandle = recyclerHandle; - } - - private static final Recycler RECYCLER = new Recycler() { - protected MessagePublishContext newObject(Recycler.Handle handle) { - return new MessagePublishContext(handle); - } - }; - - public void recycle() { - offsetFuture = null; - topic = null; - sequenceId = -1; - msgSize = 0; - startTimeNs = -1; - recyclerHandle.recycle(this); - } - } - - // convert kafka Record to Pulsar Message. - private Message recordToEntry(Record record) { - @SuppressWarnings("unchecked") - TypedMessageBuilderImpl builder = new TypedMessageBuilderImpl(null, Schema.BYTES); - - // key - if (record.hasKey()) { - byte[] key = new byte[record.keySize()]; - record.key().get(key); - builder.keyBytes(key); - } - - // value - if (record.hasValue()) { - byte[] value = new byte[record.valueSize()]; - record.value().get(value); - builder.value(value); - } else { - builder.value(new byte[0]); - } - - // sequence - if (record.sequence() >= 0) { - builder.sequenceId(record.sequence()); - } - - // timestamp - if (record.timestamp() >= 0) { - builder.eventTime(record.timestamp()); - } - - // header - for (Header h : record.headers()) { - builder.property(h.key(), - Base64.getEncoder().encodeToString(h.value())); - } - - return builder.getMessage(); - } - - // convert message to ByteBuf payload for ledger.addEntry. 
- private ByteBuf messageToByteBuf(Message message) { - checkArgument(message instanceof MessageImpl); - - MessageImpl msg = (MessageImpl) message; - MessageMetadata.Builder msgMetadataBuilder = msg.getMessageBuilder(); - ByteBuf payload = msg.getDataBuffer(); - - // filled in required fields - if (!msgMetadataBuilder.hasSequenceId()) { - msgMetadataBuilder.setSequenceId(-1); - } - if (!msgMetadataBuilder.hasPublishTime()) { - msgMetadataBuilder.setPublishTime(clock.millis()); - } - if (!msgMetadataBuilder.hasProducerName()) { - msgMetadataBuilder.setProducerName(FAKE_KOP_PRODUCER_NAME); - } - - msgMetadataBuilder.setCompression( - CompressionCodecProvider.convertToWireProtocol(CompressionType.NONE)); - msgMetadataBuilder.setUncompressedSize(payload.readableBytes()); - MessageMetadata msgMetadata = msgMetadataBuilder.build(); - - ByteBuf buf = Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload); - - msgMetadataBuilder.recycle(); - msgMetadata.recycle(); - - return buf; - } - - - // convert entries read from BookKeeper into Kafka Records - private static MemoryRecords entriesToRecords(List entries) { - try (ByteBufferOutputStream outputStream = new ByteBufferOutputStream(DEFAULT_FETCH_BUFFER_SIZE)) { - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(outputStream, RecordBatch.CURRENT_MAGIC_VALUE, - org.apache.kafka.common.record.CompressionType.NONE, - TimestampType.CREATE_TIME, - MessageIdUtils.getOffset(entries.get(0).getLedgerId(), 0), - RecordBatch.NO_TIMESTAMP, - RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, - RecordBatch.NO_SEQUENCE, - false, false, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - MAX_RECORDS_BUFFER_SIZE); - - for (Entry entry : entries) { - ByteBuf metadataAndPayload = entry.getDataBuffer(); - MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload); - ByteBuf payload = metadataAndPayload.retain(); - - byte[] data = new byte[payload.readableBytes()]; - payload.readBytes(data); - - builder.appendWithOffset( - MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()), - msgMetadata.getEventTime(), - Base64.getDecoder().decode(msgMetadata.getPartitionKey()), - data); - } - return builder.build(); - } catch (IOException ioe) { - log.error("Meet IOException: {}", ioe); - throw new UncheckedIOException(ioe); - } catch (Exception e) { - log.error("Meet exception: {}", e); - throw e; - } - } - // A simple implementation, returns this broker node. 
protected CompletableFuture handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator) { @@ -874,7 +611,7 @@ private CompletableFuture handleListOffsetRequestV1AndAbove( request.partitionTimestamps().entrySet().stream().forEach(tms -> { TopicPartition topic = tms.getKey(); Long times = tms.getValue(); - String pulsarTopic = pulsarTopicName(topic).toString(); + String pulsarTopic = pulsarTopicName(topic, kafkaNamespace).toString(); CompletableFuture partitionData; @@ -936,7 +673,7 @@ private Map nonExistingTopicErrors(OffsetCommitRequest r return request.offsetData().entrySet().stream() .filter(entry -> // filter not exist topics - !topicManager.topicExists(pulsarTopicName(entry.getKey()).toString())) + !topicManager.topicExists(pulsarTopicName(entry.getKey(), kafkaNamespace).toString())) .collect(Collectors.toMap( e -> e.getKey(), e -> Errors.UNKNOWN_TOPIC_OR_PARTITION @@ -958,7 +695,9 @@ protected CompletableFuture handleOffsetCommitRequest(KafkaH request.memberId(), request.generationId(), CoreUtils.mapValue( - request.offsetData(), + request.offsetData().entrySet().stream() + .filter(entry -> !nonExistingTopic.containsKey(entry.getKey())) + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())), (partitionData) -> OffsetAndMetadata.apply(partitionData.offset, partitionData.metadata, partitionData.timestamp) ) @@ -973,218 +712,6 @@ protected CompletableFuture handleOffsetCommitRequest(KafkaH return resultFuture; } - private void readMessages(KafkaHeaderAndRequest fetch, - Map>> cursors, - CompletableFuture resultFuture) { - AtomicInteger bytesRead = new AtomicInteger(0); - Map> responseValues = new ConcurrentHashMap<>(); - - if (log.isDebugEnabled()) { - log.debug("[{}] Request {}: Read Messages for request.", - ctx.channel(), fetch.getHeader()); - } - - readMessagesInternal(fetch, cursors, bytesRead, responseValues, resultFuture); - } - - private void readMessagesInternal(KafkaHeaderAndRequest fetch, - Map>> cursors, - AtomicInteger bytesRead, - Map> responseValues, - CompletableFuture resultFuture) { - AtomicInteger entriesRead = new AtomicInteger(0); - Map> readFutures = readAllCursorOnce(cursors); - CompletableFuture.allOf(readFutures.values().stream().toArray(CompletableFuture[]::new)) - .whenComplete((ignore, ex) -> { - // keep entries since all read completed. - readFutures.forEach((topic, readEntry) -> { - try { - Entry entry = readEntry.join(); - List entryList = responseValues.computeIfAbsent(topic, l -> Lists.newArrayList()); - - if (entry != null) { - entryList.add(entry); - entriesRead.incrementAndGet(); - bytesRead.addAndGet(entry.getLength()); - if (log.isDebugEnabled()) { - log.debug("[{}] Request {}: For topic {}, entries in list: {}. add new entry {}:{}", - ctx.channel(), fetch.getHeader(), topic.toString(), entryList.size(), - entry.getLedgerId(), entry.getEntryId()); - } - } - } catch (Exception e) { - // readEntry.join failed. ignore this partition - log.error("[{}] Request {}: Failed readEntry.join for topic: {}. ", - ctx.channel(), fetch.getHeader(), topic, e); - cursors.remove(topic); - responseValues.putIfAbsent(topic, Lists.newArrayList()); - } - }); - - FetchRequest request = (FetchRequest) fetch.getRequest(); - int maxBytes = request.maxBytes(); - int minBytes = request.minBytes(); - int waitTime = request.maxWait(); // in ms - // if endTime <= 0, then no time wait, wait for minBytes. - long endTime = waitTime > 0 ? 
System.currentTimeMillis() + waitTime : waitTime; - - int allSize = bytesRead.get(); - - if (log.isDebugEnabled()) { - log.debug("[{}] Request {}: One round read {} entries, " - + "allSize/maxBytes/minBytes/endTime: {}/{}/{}/{}", - ctx.channel(), fetch.getHeader(), entriesRead.get(), - allSize, maxBytes, minBytes, new Date(endTime)); - } - - // all partitions read no entry, return earlier; - // reach maxTime, return; - // reach minBytes if no endTime, return; - if ((allSize == 0 && entriesRead.get() == 0) - || (endTime > 0 && endTime <= System.currentTimeMillis()) - || allSize > minBytes - || allSize > maxBytes){ - if (log.isDebugEnabled()) { - log.debug("[{}] Request {}: Complete read {} entries with size {}", - ctx.channel(), fetch.getHeader(), entriesRead.get(), allSize); - } - - LinkedHashMap> responseData = new LinkedHashMap<>(); - - AtomicBoolean allPartitionsNoEntry = new AtomicBoolean(true); - responseValues.forEach((topicPartition, entries) -> { - final FetchResponse.PartitionData partitionData; - if (entries.isEmpty()) { - partitionData = new FetchResponse.PartitionData( - Errors.NONE, - FetchResponse.INVALID_HIGHWATERMARK, - FetchResponse.INVALID_LAST_STABLE_OFFSET, - FetchResponse.INVALID_LOG_START_OFFSET, - null, - MemoryRecords.EMPTY); - } else { - allPartitionsNoEntry.set(false); - Entry entry = entries.get(entries.size() - 1); - long entryOffset = MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()); - long highWatermark = entryOffset - + cursors.get(topicPartition).join().getLeft().getNumberOfEntries(); - - MemoryRecords records = entriesToRecords(entries); - partitionData = new FetchResponse.PartitionData( - Errors.NONE, - highWatermark, - highWatermark, - highWatermark, - null, - records); - } - responseData.put(topicPartition, partitionData); - }); - - if (allPartitionsNoEntry.get()) { - log.error("[{}] Request {}: All partitions for request read 0 entry", - ctx.channel(), fetch.getHeader()); - - // returned earlier, sleep for waitTime - try { - Thread.sleep(waitTime); - } catch (Exception e) { - log.error("[{}] Request {}: error while sleep.", - ctx.channel(), fetch.getHeader(), e); - } - - resultFuture.complete(ResponseAndRequest.of( - new FetchResponse(Errors.NONE, - responseData, - ((Integer) THROTTLE_TIME_MS.defaultValue), - ((FetchRequest) fetch.getRequest()).metadata().sessionId()), - fetch)); - } else { - resultFuture.complete(ResponseAndRequest.of( - new FetchResponse( - Errors.NONE, - responseData, - ((Integer) THROTTLE_TIME_MS.defaultValue), - ((FetchRequest) fetch.getRequest()).metadata().sessionId()), - fetch)); - } - } else { - //need do another round read - readMessagesInternal(fetch, cursors, bytesRead, responseValues, resultFuture); - } - }); - } - - private Map> readAllCursorOnce( - Map>> cursors) { - Map> readFutures = new ConcurrentHashMap<>(); - - cursors.entrySet().forEach(pair -> { - // non durable cursor create is a sync method. - ManagedCursor cursor; - CompletableFuture readFuture = new CompletableFuture<>(); - - try { - Pair cursorOffsetPair = pair.getValue().join(); - cursor = cursorOffsetPair.getLeft(); - long keptOffset = cursorOffsetPair.getRight(); - - // only read 1 entry currently. could read more in a batch. 
- cursor.asyncReadEntries(1, - new ReadEntriesCallback() { - @Override - public void readEntriesComplete(List list, Object o) { - TopicName topicName = pulsarTopicName(pair.getKey()); - - Entry entry = null; - if (!list.isEmpty()) { - entry = list.get(0); - long offset = MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()); - - if (log.isDebugEnabled()) { - log.debug("Topic {} success read entry: ledgerId: {}, entryId: {}, size: {}," - + " ConsumerManager original offset: {}, entryOffset: {}", - topicName.toString(), entry.getLedgerId(), entry.getEntryId(), - entry.getLength(), keptOffset, offset); - } - - topicManager - .getTopicConsumerManager(topicName.toString()) - .thenAccept(cm -> cm.add(offset + 1, Pair.of(cursor, offset + 1))); - } else { - // since no read entry, add the original offset back. - if (log.isDebugEnabled()) { - log.debug("Read no entry, add offset back: {}", - keptOffset); - } - - topicManager - .getTopicConsumerManager(topicName.toString()) - .thenAccept(cm -> - cm.add(keptOffset, Pair.of(cursor, keptOffset))); - } - - readFuture.complete(entry); - } - - @Override - public void readEntriesFailed(ManagedLedgerException e, Object o) { - log.error("Error read entry for topic: {}", pulsarTopicName(pair.getKey())); - readFuture.completeExceptionally(e); - } - }, null); - } catch (Exception e) { - log.error("Error for cursor to read entry for topic: {}. ", pulsarTopicName(pair.getKey()), e); - readFuture.completeExceptionally(e); - } - - readFutures.putIfAbsent(pair.getKey(), readFuture); - }); - - return readFutures; - } - - protected CompletableFuture handleFetchRequest(KafkaHeaderAndRequest fetch) { checkArgument(fetch.getRequest() instanceof FetchRequest); FetchRequest request = (FetchRequest) fetch.getRequest(); @@ -1200,31 +727,8 @@ protected CompletableFuture handleFetchRequest(KafkaHeaderAn }); } - // Map of partition and related cursor - Map>> topicsAndCursor = request - .fetchData().entrySet().stream() - .map(entry -> { - TopicName topicName = pulsarTopicName(entry.getKey()); - long offset = entry.getValue().fetchOffset; - - if (log.isDebugEnabled()) { - log.debug("[{}] Request {}: Fetch topic {}, remove cursor for fetch offset: {}.", - ctx.channel(), fetch.getHeader(), topicName, offset); - } - - return Pair.of( - entry.getKey(), - topicManager.getTopicConsumerManager(topicName.toString()) - .thenCompose(cm -> cm.remove(offset))); - }) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - // wait to get all the cursor, then readMessages - CompletableFuture - .allOf(topicsAndCursor.entrySet().stream().map(Map.Entry::getValue).toArray(CompletableFuture[]::new)) - .whenComplete((ignore, ex) -> readMessages(fetch, topicsAndCursor, resultFuture)); - - return resultFuture; + MessageFetchContext fetchContext = MessageFetchContext.get(this, fetch, resultFuture); + return fetchContext.handleFetch(); } protected CompletableFuture handleJoinGroupRequest(KafkaHeaderAndRequest joinGroup) { @@ -1481,20 +985,6 @@ private CompletableFuture findBroker(KafkaService kafkaServic return resultFuture; } - private TopicName pulsarTopicName(TopicPartition topicPartition) { - return pulsarTopicName(topicPartition.topic(), topicPartition.partition()); - } - - private TopicName pulsarTopicName(String topic) { - return TopicName.get(TopicDomain.persistent.value(), kafkaNamespace, topic); - } - - private TopicName pulsarTopicName(String topic, int partitionIndex) { - return TopicName.get(TopicDomain.persistent.value(), - kafkaNamespace, - topic + 
PARTITIONED_TOPIC_SUFFIX + partitionIndex); - } - // TODO: handle Kafka Node.id // - https://github.com/streamnative/kop/issues/9 static Node newNode(InetSocketAddress address) { diff --git a/src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java b/src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java index 71733fc119..be9354bd92 100644 --- a/src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java +++ b/src/main/java/io/streamnative/kop/KafkaTopicConsumerManager.java @@ -14,6 +14,7 @@ package io.streamnative.kop; import static com.google.common.base.Preconditions.checkArgument; +import static io.streamnative.kop.utils.MessageIdUtils.offsetAfterBatchIndex; import io.streamnative.kop.utils.MessageIdUtils; import java.util.UUID; @@ -48,6 +49,9 @@ public class KafkaTopicConsumerManager { } public CompletableFuture> remove(long offset) { + // This is for read a new entry, first check if offset is from a batched message request. + offset = offsetAfterBatchIndex(offset); + CompletableFuture> cursor = consumers.remove(offset); if (cursor != null) { if (log.isDebugEnabled()) { diff --git a/src/main/java/io/streamnative/kop/MessageFetchContext.java b/src/main/java/io/streamnative/kop/MessageFetchContext.java new file mode 100644 index 0000000000..0f6f67dcba --- /dev/null +++ b/src/main/java/io/streamnative/kop/MessageFetchContext.java @@ -0,0 +1,347 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.streamnative.kop;
+
+import static io.streamnative.kop.utils.MessageRecordUtils.entriesToRecords;
+import static io.streamnative.kop.utils.TopicNameUtils.pulsarTopicName;
+import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+
+import com.google.common.collect.Lists;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.streamnative.kop.KafkaCommandDecoder.KafkaHeaderAndRequest;
+import io.streamnative.kop.KafkaCommandDecoder.ResponseAndRequest;
+import io.streamnative.kop.utils.MessageIdUtils;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.FetchResponse.PartitionData;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * MessageFetchContext handles a Kafka FetchRequest.
+ */ +@Slf4j +public final class MessageFetchContext { + + private KafkaRequestHandler requestHandler; + private KafkaHeaderAndRequest fetchRequest; + private CompletableFuture fetchResponse; + + // recycler and get for this object + public static MessageFetchContext get(KafkaRequestHandler requestHandler, + KafkaHeaderAndRequest fetchRequest, + CompletableFuture fetchResponse) { + MessageFetchContext context = RECYCLER.get(); + context.requestHandler = requestHandler; + context.fetchRequest = fetchRequest; + context.fetchResponse = fetchResponse; + return context; + } + + private final Handle recyclerHandle; + + private MessageFetchContext(Handle recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + private static final Recycler RECYCLER = new Recycler() { + protected MessageFetchContext newObject(Handle handle) { + return new MessageFetchContext(handle); + } + }; + + public void recycle() { + requestHandler = null; + fetchRequest = null; + fetchResponse = null; + recyclerHandle.recycle(this); + } + + + // handle request + public CompletableFuture handleFetch() { + // Map of partition and related cursor + Map>> topicsAndCursor = + ((FetchRequest) fetchRequest.getRequest()) + .fetchData().entrySet().stream() + .map(entry -> { + TopicName topicName = pulsarTopicName(entry.getKey(), requestHandler.getKafkaNamespace()); + long offset = entry.getValue().fetchOffset; + + if (log.isDebugEnabled()) { + log.debug("Request {}: Fetch topic {}, remove cursor for fetch offset: {}.", + fetchRequest.getHeader(), topicName, offset); + } + + return Pair.of( + entry.getKey(), + requestHandler.getTopicManager().getTopicConsumerManager(topicName.toString()) + .thenCompose(cm -> cm.remove(offset))); + }) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + // wait to get all the cursor, then readMessages + CompletableFuture + .allOf(topicsAndCursor.entrySet().stream().map(Map.Entry::getValue).toArray(CompletableFuture[]::new)) + .whenComplete((ignore, ex) -> readMessages(fetchRequest, topicsAndCursor, fetchResponse)); + + return fetchResponse; + } + + + private void readMessages(KafkaHeaderAndRequest fetch, + Map>> cursors, + CompletableFuture resultFuture) { + AtomicInteger bytesRead = new AtomicInteger(0); + Map> responseValues = new ConcurrentHashMap<>(); + + if (log.isDebugEnabled()) { + log.debug("Request {}: Read Messages for request.", + fetch.getHeader()); + } + + readMessagesInternal(fetch, cursors, bytesRead, responseValues, resultFuture); + } + + private void readMessagesInternal(KafkaHeaderAndRequest fetch, + Map>> cursors, + AtomicInteger bytesRead, + Map> responseValues, + CompletableFuture resultFuture) { + AtomicInteger entriesRead = new AtomicInteger(0); + Map> readFutures = readAllCursorOnce(cursors); + CompletableFuture.allOf(readFutures.values().stream().toArray(CompletableFuture[]::new)) + .whenComplete((ignore, ex) -> { + // keep entries since all read completed. currently only read 1 entry each time. + readFutures.forEach((topic, readEntry) -> { + try { + Entry entry = readEntry.join(); + List entryList = responseValues.computeIfAbsent(topic, l -> Lists.newArrayList()); + + if (entry != null) { + entryList.add(entry); + entriesRead.incrementAndGet(); + bytesRead.addAndGet(entry.getLength()); + if (log.isDebugEnabled()) { + log.debug("Request {}: For topic {}, entries in list: {}. add new entry {}:{}", + fetch.getHeader(), topic.toString(), entryList.size(), + entry.getLedgerId(), entry.getEntryId()); + } + } + } catch (Exception e) { + // readEntry.join failed. 
ignore this partition + log.error("Request {}: Failed readEntry.join for topic: {}. ", + fetch.getHeader(), topic, e); + cursors.remove(topic); + responseValues.putIfAbsent(topic, Lists.newArrayList()); + } + }); + + FetchRequest request = (FetchRequest) fetch.getRequest(); + int maxBytes = request.maxBytes(); + int minBytes = request.minBytes(); + int waitTime = request.maxWait(); // in ms + // if endTime <= 0, then no time wait, wait for minBytes. + long endTime = waitTime > 0 ? System.currentTimeMillis() + waitTime : waitTime; + + int allSize = bytesRead.get(); + + if (log.isDebugEnabled()) { + log.debug("Request {}: One round read {} entries, " + + "allSize/maxBytes/minBytes/endTime: {}/{}/{}/{}", + fetch.getHeader(), entriesRead.get(), + allSize, maxBytes, minBytes, new Date(endTime)); + } + + // all partitions read no entry, return earlier; + // reach maxTime, return; + // reach minBytes if no endTime, return; + if ((allSize == 0 && entriesRead.get() == 0) + || (endTime > 0 && endTime <= System.currentTimeMillis()) + || allSize > minBytes + || allSize > maxBytes){ + if (log.isDebugEnabled()) { + log.debug(" Request {}: Complete read {} entries with size {}", + fetch.getHeader(), entriesRead.get(), allSize); + } + + LinkedHashMap> responseData = new LinkedHashMap<>(); + + AtomicBoolean allPartitionsNoEntry = new AtomicBoolean(true); + responseValues.forEach((topicPartition, entries) -> { + final FetchResponse.PartitionData partitionData; + if (entries.isEmpty()) { + partitionData = new FetchResponse.PartitionData( + Errors.NONE, + FetchResponse.INVALID_HIGHWATERMARK, + FetchResponse.INVALID_LAST_STABLE_OFFSET, + FetchResponse.INVALID_LOG_START_OFFSET, + null, + MemoryRecords.EMPTY); + } else { + allPartitionsNoEntry.set(false); + Entry entry = entries.get(entries.size() - 1); + long entryOffset = MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()); + long highWatermark = entryOffset + + cursors.get(topicPartition).join().getLeft().getNumberOfEntries(); + + // by default kafka is produced message in batched mode. + MemoryRecords records; + records = entriesToRecords(entries); + + partitionData = new FetchResponse.PartitionData( + Errors.NONE, + highWatermark, + highWatermark, + highWatermark, + null, + records); + } + responseData.put(topicPartition, partitionData); + }); + + if (allPartitionsNoEntry.get()) { + log.warn("Request {}: All partitions for request read 0 entry", + fetch.getHeader()); + + // returned earlier, sleep for waitTime + try { + Thread.sleep(waitTime); + } catch (Exception e) { + log.error("Request {}: error while sleep.", + fetch.getHeader(), e); + } + + resultFuture.complete(ResponseAndRequest.of( + new FetchResponse(Errors.NONE, + responseData, + ((Integer) THROTTLE_TIME_MS.defaultValue), + ((FetchRequest) fetch.getRequest()).metadata().sessionId()), + fetch)); + this.recycle(); + } else { + resultFuture.complete(ResponseAndRequest.of( + new FetchResponse( + Errors.NONE, + responseData, + ((Integer) THROTTLE_TIME_MS.defaultValue), + ((FetchRequest) fetch.getRequest()).metadata().sessionId()), + fetch)); + this.recycle(); + } + } else { + //need do another round read + readMessagesInternal(fetch, cursors, bytesRead, responseValues, resultFuture); + } + }); + } + + private Map> readAllCursorOnce( + Map>> cursors) { + Map> readFutures = new ConcurrentHashMap<>(); + + cursors.entrySet().forEach(pair -> { + // non durable cursor create is a sync method. 
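+            // The (cursor, offset) future in the map was completed (possibly exceptionally) before
+            // readMessages was triggered, so the join() below does not block; a failed future
+            // surfaces as an exception handled by the catch block.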
+ ManagedCursor cursor; + CompletableFuture readFuture = new CompletableFuture<>(); + + try { + Pair cursorOffsetPair = pair.getValue().join(); + cursor = cursorOffsetPair.getLeft(); + long keptOffset = cursorOffsetPair.getRight(); + + // only read 1 entry currently. + cursor.asyncReadEntries(1, + new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List list, Object o) { + TopicName topicName = pulsarTopicName(pair.getKey(), requestHandler.getKafkaNamespace()); + + Entry entry = null; + if (!list.isEmpty()) { + entry = list.get(0); + long offset = MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()); + // get next offset + PositionImpl nextPosition = ((NonDurableCursorImpl ) cursor) + .getNextAvailablePosition(PositionImpl + .get(entry.getLedgerId(), entry.getEntryId())); + long nextOffset = MessageIdUtils + .getOffset(nextPosition.getLedgerId(), nextPosition.getEntryId()); + + if (log.isDebugEnabled()) { + log.debug("Topic {} success read entry: ledgerId: {}, entryId: {}, size: {}," + + " ConsumerManager original offset: {}, entryOffset: {}, nextOffset: {}", + topicName.toString(), entry.getLedgerId(), entry.getEntryId(), + entry.getLength(), keptOffset, offset, nextOffset); + } + + requestHandler.getTopicManager() + .getTopicConsumerManager(topicName.toString()) + .thenAccept(cm -> cm.add(nextOffset, Pair.of(cursor, nextOffset))); + } else { + // since no read entry, add the original offset back. + if (log.isDebugEnabled()) { + log.debug("Read no entry, add offset back: {}", + keptOffset); + } + + requestHandler.getTopicManager() + .getTopicConsumerManager(topicName.toString()) + .thenAccept(cm -> + cm.add(keptOffset, Pair.of(cursor, keptOffset))); + } + + readFuture.complete(entry); + } + + @Override + public void readEntriesFailed(ManagedLedgerException e, Object o) { + log.error("Error read entry for topic: {}", + pulsarTopicName(pair.getKey(), requestHandler.getKafkaNamespace())); + readFuture.completeExceptionally(e); + } + }, null); + } catch (Exception e) { + log.error("Error for cursor to read entry for topic: {}. ", + pulsarTopicName(pair.getKey(), requestHandler.getKafkaNamespace()), e); + readFuture.completeExceptionally(e); + } + + readFutures.putIfAbsent(pair.getKey(), readFuture); + }); + + return readFutures; + } + +} diff --git a/src/main/java/io/streamnative/kop/MessagePublishContext.java b/src/main/java/io/streamnative/kop/MessagePublishContext.java new file mode 100644 index 0000000000..6593363017 --- /dev/null +++ b/src/main/java/io/streamnative/kop/MessagePublishContext.java @@ -0,0 +1,161 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.streamnative.kop; + +import static io.streamnative.kop.utils.MessageRecordUtils.messageToByteBuf; +import static io.streamnative.kop.utils.MessageRecordUtils.recordToEntry; +import static io.streamnative.kop.utils.MessageRecordUtils.recordsToByteBuf; + +import com.google.common.collect.Lists; +import io.netty.buffer.ByteBuf; +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; +import io.streamnative.kop.utils.MessageIdUtils; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.Topic.PublishContext; + +/** + * Implementation for PublishContext. + */ +@Slf4j +public final class MessagePublishContext implements PublishContext { + + private CompletableFuture offsetFuture; + private Topic topic; + private long startTimeNs; + public static final boolean MESSAGE_BATCHED = true; + + /** + * Executed from managed ledger thread when the message is persisted. + */ + @Override + public void completed(Exception exception, long ledgerId, long entryId) { + + if (exception != null) { + log.error("Failed write entry: ledgerId: {}, entryId: {}. triggered send callback.", + ledgerId, entryId); + offsetFuture.completeExceptionally(exception); + } else { + if (log.isDebugEnabled()) { + log.debug("Success write topic: {}, ledgerId: {}, entryId: {}" + + " And triggered send callback.", + topic.getName(), ledgerId, entryId); + } + + topic.recordAddLatency(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTimeNs)); + + offsetFuture.complete(Long.valueOf(MessageIdUtils.getOffset(ledgerId, entryId))); + } + + recycle(); + } + + // recycler + public static MessagePublishContext get(CompletableFuture offsetFuture, + Topic topic, + long startTimeNs) { + MessagePublishContext callback = RECYCLER.get(); + callback.offsetFuture = offsetFuture; + callback.topic = topic; + callback.startTimeNs = startTimeNs; + return callback; + } + + private final Handle recyclerHandle; + + private MessagePublishContext(Handle recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + private static final Recycler RECYCLER = new Recycler() { + protected MessagePublishContext newObject(Handle handle) { + return new MessagePublishContext(handle); + } + }; + + public void recycle() { + offsetFuture = null; + topic = null; + startTimeNs = -1; + recyclerHandle.recycle(this); + } + + + // publish Kafka records to pulsar topic, handle callback in MessagePublishContext. + public static void publishMessages(MemoryRecords records, + Topic topic, + CompletableFuture future) { + + // get records size. 
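+        // MemoryRecords.records() iterates every record across all record batches in the
+        // request, so `rec` below is the total record count; the non-batched path also uses
+        // it to size the pending-futures array.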
+        AtomicInteger size = new AtomicInteger(0);
+        records.records().forEach(record -> size.incrementAndGet());
+        int rec = size.get();
+
+        if (log.isDebugEnabled()) {
+            log.debug("publishMessages for topic partition: {} , records size is {} ", topic.getName(), size.get());
+        }
+
+        if (MESSAGE_BATCHED) {
+            CompletableFuture<Long> offsetFuture = new CompletableFuture<>();
+
+            ByteBuf headerAndPayload = recordsToByteBuf(records, rec);
+            topic.publishMessage(
+                headerAndPayload,
+                MessagePublishContext.get(
+                    offsetFuture, topic, System.nanoTime()));
+
+            offsetFuture.whenComplete((offset, ex) -> {
+                if (ex != null) {
+                    log.error("publishMessages for topic partition: {} failed when write.",
+                        topic.getName(), ex);
+                    future.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR));
+                } else {
+                    future.complete(new PartitionResponse(Errors.NONE));
+                }
+            });
+        } else {
+            List<CompletableFuture<Long>> futures = Collections
+                .synchronizedList(Lists.newArrayListWithExpectedSize(size.get()));
+
+            records.records().forEach(record -> {
+                CompletableFuture<Long> offsetFuture = new CompletableFuture<>();
+                futures.add(offsetFuture);
+                ByteBuf headerAndPayload = messageToByteBuf(recordToEntry(record));
+                topic.publishMessage(
+                    headerAndPayload,
+                    MessagePublishContext.get(
+                        offsetFuture, topic, System.nanoTime()));
+            });
+
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[rec])).whenComplete((ignore, ex) -> {
+                if (ex != null) {
+                    log.error("publishMessages for topic partition: {} failed when write.",
+                        topic.getName(), ex);
+                    future.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR));
+                } else {
+                    future.complete(new PartitionResponse(Errors.NONE));
+                }
+            });
+        }
+    }
+}
diff --git a/src/main/java/io/streamnative/kop/utils/MessageIdUtils.java b/src/main/java/io/streamnative/kop/utils/MessageIdUtils.java
index 88757e2de5..7e62b8c18b 100644
--- a/src/main/java/io/streamnative/kop/utils/MessageIdUtils.java
+++ b/src/main/java/io/streamnative/kop/utils/MessageIdUtils.java
@@ -23,15 +23,30 @@
  * Utils for Pulsar MessageId.
  */
 public class MessageIdUtils {
+    // use 20 bits for ledgerId,
+    // 32 bits for entryId,
+    // 12 bits for batchIndex.
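+    // Offset layout (one 64-bit long, high bits first):
+    //   [ ledgerId : 20 bits ][ entryId : 32 bits ][ batchIndex : 12 bits ]
+    // Illustrative example: ledgerId = 5, entryId = 3, batchIndex = 2 gives
+    //   offset = (5L << 44) | (3L << 12) | 2 = 87960930234370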
+    public static final int LEDGER_BITS = 20;
+    public static final int ENTRY_BITS = 32;
+    public static final int BATCH_BITS = 12;

     public static final long getOffset(long ledgerId, long entryId) {
         // Combine ledger id and entry id to form offset
-        // Use less than 32 bits to represent entry id since it will get
-        // rolled over way before overflowing the max int range
         checkArgument(ledgerId > 0, "Expected ledgerId > 0, but get " + ledgerId);
         checkArgument(entryId >= 0, "Expected entryId >= 0, but get " + entryId);

-        long offset = (ledgerId << 28) | entryId;
+        long offset = (ledgerId << (ENTRY_BITS + BATCH_BITS) | (entryId << BATCH_BITS));
+        return offset;
+    }
+
+    public static final long getOffset(long ledgerId, long entryId, int batchIndex) {
+        checkArgument(ledgerId > 0, "Expected ledgerId > 0, but get " + ledgerId);
+        checkArgument(entryId >= 0, "Expected entryId >= 0, but get " + entryId);
+        checkArgument(batchIndex >= 0, "Expected batchIndex >= 0, but get " + batchIndex);
+        checkArgument(batchIndex < (1 << BATCH_BITS),
+            "Expected batchIndex only take " + BATCH_BITS + " bits, but it is " + batchIndex);
+
+        long offset = (ledgerId << (ENTRY_BITS + BATCH_BITS) | (entryId << BATCH_BITS)) + batchIndex;
         return offset;
     }
@@ -39,8 +54,8 @@ public static final MessageId getMessageId(long offset) {
         // De-multiplex ledgerId and entryId from offset
         checkArgument(offset > 0, "Expected Offset > 0, but get " + offset);

-        long ledgerId = offset >>> 28;
-        long entryId = offset & 0x0F_FF_FF_FFL;
+        long ledgerId = offset >>> (ENTRY_BITS + BATCH_BITS);
+        long entryId = (offset & 0x0F_FF_FF_FF_FF_FFL) >>> BATCH_BITS;

         return new MessageIdImpl(ledgerId, entryId, -1);
     }
@@ -49,9 +64,31 @@ public static final PositionImpl getPosition(long offset) {
         // De-multiplex ledgerId and entryId from offset
         checkArgument(offset >= 0, "Expected Offset >= 0, but get " + offset);

-        long ledgerId = offset >>> 28;
-        long entryId = offset & 0x0F_FF_FF_FFL;
+        long ledgerId = offset >>> (ENTRY_BITS + BATCH_BITS);
+        long entryId = (offset & 0x0F_FF_FF_FF_FF_FFL) >>> BATCH_BITS;

         return new PositionImpl(ledgerId, entryId);
     }
+
+    // get the batch index contained in the offset.
+    public static final int getBatchIndex(long offset) {
+        checkArgument(offset >= 0, "Expected Offset >= 0, but get " + offset);
+
+        return (int) (offset & 0x0F_FF);
+    }
+
+    // get the next entry offset that follows the batch index.
+    // In KafkaTopicConsumerManager the next read offset is updated after each entry is read;
+    // if the previous read returned a batched message, the next offset to read is the next entry.
+    public static final long offsetAfterBatchIndex(long offset) {
+        // De-multiplex ledgerId and entryId from offset
+        checkArgument(offset >= 0, "Expected Offset >= 0, but get " + offset);
+
+        int batchIndex = getBatchIndex(offset);
+        // if the offset points inside a batch, skip to the start of the next entry.
+        if (batchIndex != 0) {
+            return (offset - batchIndex) + (1 << BATCH_BITS);
+        }
+        return offset;
+    }
 }
diff --git a/src/main/java/io/streamnative/kop/utils/MessageRecordUtils.java b/src/main/java/io/streamnative/kop/utils/MessageRecordUtils.java
new file mode 100644
index 0000000000..494465a967
--- /dev/null
+++ b/src/main/java/io/streamnative/kop/utils/MessageRecordUtils.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.kop.utils; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.collect.Lists; +import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Clock; +import java.util.Base64; +import java.util.Iterator; +import java.util.List; +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; +import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata; +import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.Commands.ChecksumType; + +/** + * Pulsar Message and Kafka Record utils. + */ +@UtilityClass +@Slf4j +public final class MessageRecordUtils { + private static final int DEFAULT_FETCH_BUFFER_SIZE = 1024 * 1024; + private static final int MAX_RECORDS_BUFFER_SIZE = 100 * 1024 * 1024; + private static final String FAKE_KOP_PRODUCER_NAME = "fake_kop_producer_name"; + + private static final Clock clock = Clock.systemDefaultZone(); + + // convert kafka Record to Pulsar Message. + // called when publish received Kafka Record into Pulsar. 
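+    // Field mapping: Kafka key -> Pulsar key bytes, Kafka value -> payload (empty byte[]
+    // when the record has no value), sequence -> sequenceId, timestamp -> eventTime,
+    // and each Kafka header -> a message property (header value stored as a UTF-8 string).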
+ public static MessageImpl recordToEntry(Record record) { + @SuppressWarnings("unchecked") + TypedMessageBuilderImpl builder = new TypedMessageBuilderImpl(null, Schema.BYTES); + + // key + if (record.hasKey()) { + byte[] key = new byte[record.keySize()]; + record.key().get(key); + builder.keyBytes(key); + } + + // value + if (record.hasValue()) { + byte[] value = new byte[record.valueSize()]; + record.value().get(value); + builder.value(value); + } else { + builder.value(new byte[0]); + } + + // sequence + if (record.sequence() >= 0) { + builder.sequenceId(record.sequence()); + } + + // timestamp + if (record.timestamp() >= 0) { + builder.eventTime(record.timestamp()); + } + + // header + for (Header h : record.headers()) { + builder.property(h.key(), + new String(h.value(), UTF_8)); + } + + return (MessageImpl) builder.getMessage(); + } + + // convert message to ByteBuf payload for ledger.addEntry. + // parameter message is converted from passed in Kafka record. + // called when publish received Kafka Record into Pulsar. + public static ByteBuf messageToByteBuf(Message message) { + checkArgument(message instanceof MessageImpl); + + MessageImpl msg = (MessageImpl) message; + MessageMetadata.Builder msgMetadataBuilder = msg.getMessageBuilder(); + ByteBuf payload = msg.getDataBuffer(); + + // filled in required fields + if (!msgMetadataBuilder.hasSequenceId()) { + msgMetadataBuilder.setSequenceId(-1); + } + if (!msgMetadataBuilder.hasPublishTime()) { + msgMetadataBuilder.setPublishTime(clock.millis()); + } + if (!msgMetadataBuilder.hasProducerName()) { + msgMetadataBuilder.setProducerName(FAKE_KOP_PRODUCER_NAME); + } + + msgMetadataBuilder.setCompression( + CompressionCodecProvider.convertToWireProtocol(CompressionType.NONE)); + msgMetadataBuilder.setUncompressedSize(payload.readableBytes()); + MessageMetadata msgMetadata = msgMetadataBuilder.build(); + + ByteBuf buf = Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload); + + msgMetadataBuilder.recycle(); + msgMetadata.recycle(); + + return buf; + } + + //// for Batch messages + protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024; + protected static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024; + + // If records stored in a batched way, turn MemoryRecords into a pulsar batched message. 
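+    // Flow: convert each Kafka record into a single MessageImpl, initialize the batch
+    // metadata from the first message, serialize every message into one batched payload
+    // with Commands.serializeSingleMessageInBatchWithPayload, set numMessagesInBatch,
+    // then wrap the result with Commands.serializeMetadataAndPayload (CRC32C checksum).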
+ public static ByteBuf recordsToByteBuf(MemoryRecords records, int size) { + long currentBatchSizeBytes = 0; + int numMessagesInBatch = 0; + + long sequenceId = -1; + // TODO: handle different compression type + PulsarApi.CompressionType compressionType = PulsarApi.CompressionType.NONE; + + ByteBuf batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT + .buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES)); + List> messages = Lists.newArrayListWithExpectedSize(size); + MessageMetadata.Builder messageMetaBuilder = MessageMetadata.newBuilder(); + + Iterator iterator = records.records().iterator(); + while (iterator.hasNext()) { + MessageImpl message = recordToEntry(iterator.next()); + if (++numMessagesInBatch == 1) { + sequenceId = Commands.initBatchMessageMetadata(messageMetaBuilder, message.getMessageBuilder()); + } + messages.add(message); + currentBatchSizeBytes += message.getDataBuffer().readableBytes(); + + if (log.isDebugEnabled()) { + log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ", + sequenceId, numMessagesInBatch, currentBatchSizeBytes); + } + } + + for (MessageImpl msg : messages) { + PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder(); + batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder, + msg.getDataBuffer(), batchedMessageMetadataAndPayload); + msgBuilder.recycle(); + } + int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes(); + + if (PulsarApi.CompressionType.NONE != compressionType) { + messageMetaBuilder.setCompression(compressionType); + messageMetaBuilder.setUncompressedSize(uncompressedSize); + } + + messageMetaBuilder.setNumMessagesInBatch(numMessagesInBatch); + + MessageMetadata msgMetadata = messageMetaBuilder.build(); + + ByteBuf buf = Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, + msgMetadata, + batchedMessageMetadataAndPayload); + + messageMetaBuilder.recycle(); + msgMetadata.recycle(); + + return buf; + } + + private static Header[] getHeadersFromMetadata(List properties) { + Header[] headers = new Header[properties.size()]; + + if (log.isDebugEnabled()) { + log.debug("getHeadersFromMetadata. Header size: {}", + properties.size()); + } + + int index = 0; + for (KeyValue kv: properties) { + headers[index] = new RecordHeader(kv.getKey(), kv.getValue().getBytes(UTF_8)); + + if (log.isDebugEnabled()) { + log.debug("index: {} kv.getKey: {}. kv.getValue: {}", + index, kv.getKey(), kv.getValue()); + } + index++; + } + + return headers; + } + + // Convert entries read from BookKeeper into Kafka Records + // Entries can be batched messages, may need un-batch. 
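+    // For each entry: parse the MessageMetadata; if it carries numMessagesInBatch, de-serialize
+    // every single message in the batch and append it at offset
+    // getOffset(ledgerId, entryId, batchIndex); otherwise append the whole payload at
+    // getOffset(ledgerId, entryId). Entries with encryption keys are rejected via checkState.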
+ public static MemoryRecords entriesToRecords(List entries) { + try (ByteBufferOutputStream outputStream = new ByteBufferOutputStream(DEFAULT_FETCH_BUFFER_SIZE)) { + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(outputStream, RecordBatch.CURRENT_MAGIC_VALUE, + org.apache.kafka.common.record.CompressionType.NONE, + TimestampType.CREATE_TIME, + MessageIdUtils.getOffset(entries.get(0).getLedgerId(), 0), + RecordBatch.NO_TIMESTAMP, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, + false, false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + MAX_RECORDS_BUFFER_SIZE); + + for (Entry entry : entries) { + // each entry is a batched message + ByteBuf metadataAndPayload = entry.getDataBuffer(); + MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload); + int batchSize = msgMetadata.getNumMessagesInBatch(); + boolean isBatchMessage = msgMetadata.hasNumMessagesInBatch(); + ByteBuf payload = metadataAndPayload.retain(); + + if (log.isDebugEnabled()) { + log.debug("entriesToRecords. NumMessagesInBatch {}: entries in list: {}. new entryId {}:{}", + batchSize, entries.size(), entry.getLedgerId(), entry.getEntryId()); + log.debug("entriesToRecords. readerIndex:{} writerIndex:{}", + payload.readerIndex(), payload.writerIndex()); + } + + // need handle encryption + checkState(msgMetadata.getEncryptionKeysCount() == 0); + + if (isBatchMessage) { + for (int i = 0; i < batchSize; ++i) { + if (log.isDebugEnabled()) { + log.debug(" processing message num - {} in batch", i); + } + + SingleMessageMetadata.Builder singleMessageMetadataBuilder = SingleMessageMetadata + .newBuilder(); + ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(metadataAndPayload, + singleMessageMetadataBuilder, i, batchSize); + + SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build(); + + byte[] data = new byte[singleMessagePayload.readableBytes()]; + singleMessagePayload.readBytes(data); + Header[] headers = getHeadersFromMetadata(singleMessageMetadata.getPropertiesList()); + + builder.appendWithOffset( + MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId(), i), + msgMetadata.getEventTime(), + Base64.getDecoder().decode(singleMessageMetadata.getPartitionKey()), + data, + headers); + singleMessageMetadataBuilder.recycle(); + } + } else { + byte[] data = new byte[payload.readableBytes()]; + payload.readBytes(data); + Header[] headers = getHeadersFromMetadata(msgMetadata.getPropertiesList()); + + builder.appendWithOffset( + MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()), + msgMetadata.getEventTime(), + Base64.getDecoder().decode(msgMetadata.getPartitionKey()), + data, + headers); + } + } + return builder.build(); + } catch (IOException ioe){ + log.error("Meet IOException: {}", ioe); + throw new UncheckedIOException(ioe); + } catch (Exception e){ + log.error("Meet exception: {}", e); + throw e; + } + } + + + +} diff --git a/src/main/java/io/streamnative/kop/utils/TopicNameUtils.java b/src/main/java/io/streamnative/kop/utils/TopicNameUtils.java new file mode 100644 index 0000000000..03268b1305 --- /dev/null +++ b/src/main/java/io/streamnative/kop/utils/TopicNameUtils.java @@ -0,0 +1,41 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.kop.utils;
+
+import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Utils for Pulsar TopicName.
+ */
+public class TopicNameUtils {
+
+    public static TopicName pulsarTopicName(TopicPartition topicPartition, NamespaceName namespace) {
+        return pulsarTopicName(topicPartition.topic(), topicPartition.partition(), namespace);
+    }
+
+    public static TopicName pulsarTopicName(String topic, NamespaceName namespace) {
+        return TopicName.get(TopicDomain.persistent.value(), namespace, topic);
+    }
+
+    public static TopicName pulsarTopicName(String topic, int partitionIndex, NamespaceName namespace) {
+        return TopicName.get(TopicDomain.persistent.value(),
+            namespace,
+            topic + PARTITIONED_TOPIC_SUFFIX + partitionIndex);
+    }
+}
diff --git a/src/test/java/io/streamnative/kop/KafkaRequestTypeTest.java b/src/test/java/io/streamnative/kop/KafkaRequestTypeTest.java
index 95c196c1f8..05186e2869 100644
--- a/src/test/java/io/streamnative/kop/KafkaRequestTypeTest.java
+++ b/src/test/java/io/streamnative/kop/KafkaRequestTypeTest.java
@@ -14,6 +14,7 @@
 package io.streamnative.kop;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
@@ -28,6 +29,7 @@
 import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -41,6 +43,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -57,6 +60,18 @@
 /**
  * Unit test for Different kafka request type.
+ * Test:
+ *   KafkaProducePulsarConsume
+ *   KafkaProduceKafkaConsume
+ *   PulsarProduceKafkaConsume
+ * with
+ *   different partitions
+ *   batch enabled/disabled.
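+ * (each combination is supplied by the partitionsAndBatch data provider below).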
+ * These tests cover the following classes:
+ *   KafkaRequestHandler
+ *   MessageRecordUtils
+ *   MessagePublishContext
+ *   MessageConsumeContext
 */
 @Slf4j
 public class KafkaRequestTypeTest extends MockKafkaServiceBaseTest {
@@ -66,6 +81,16 @@ public static Object[][] partitions() {
         return new Object[][] { { 1 }, { 7 } };
     }
 
+    @DataProvider(name = "partitionsAndBatch")
+    public static Object[][] partitionsAndBatch() {
+        return new Object[][] {
+            { 1, true },
+            { 1, false },
+            { 7, true },
+            { 7, false }
+        };
+    }
+
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -108,10 +133,14 @@ protected void cleanup() throws Exception {
         super.internalCleanup();
     }
 
-    @Test(timeOut = 20000, dataProvider = "partitions")
-    public void testKafkaProducePulsarConsume(int partitionNumber) throws Exception {
+    @Test(timeOut = 20000, dataProvider = "partitionsAndBatch")
+    public void testKafkaProducePulsarConsume(int partitionNumber, boolean isBatch) throws Exception {
         String topicName = "kopKafkaProducePulsarConsume" + partitionNumber;
         String pulsarTopicName = "persistent://public/default/" + topicName;
+        String key1 = "header_key1_";
+        String key2 = "header_key2_";
+        String value1 = "header_value1_";
+        String value2 = "header_value2_";
 
         // create partitioned topic.
         kafkaService.getAdminClient().topics().createPartitionedTopic(topicName, partitionNumber);
@@ -131,16 +160,29 @@ public void testKafkaProducePulsarConsume(int partitionNumber) throws Exception
         for (int i = 0; i < totalMsgs; i++) {
             String messageStr = messageStrPrefix + i;
-            kProducer.getProducer()
-                .send(new ProducerRecord<>(
-                    topicName,
-                    i,
-                    messageStr))
-                .get();
-            log.debug("Kafka Producer Sent message: ({}, {})", i, messageStr);
+            ProducerRecord<Integer, String> record = new ProducerRecord<>(
+                topicName,
+                i,
+                messageStr);
+
+            record.headers()
+                .add(key1 + i, (value1 + i).getBytes(UTF_8))
+                .add(key2 + i, (value2 + i).getBytes(UTF_8));
+
+            if (isBatch) {
+                kProducer.getProducer()
+                    .send(record);
+            } else {
+                kProducer.getProducer()
+                    .send(record)
+                    .get();
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Kafka Producer Sent message with header: ({}, {})", i, messageStr);
+            }
         }
 
-        // 2. Consume messages use Pulsar client Consumer. verify content and key
+        // 2. Consume messages use Pulsar client Consumer.
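+        //    (Kafka record headers are expected to show up as Pulsar message properties.)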
+        //    Verify content, key and headers.
         Message<byte[]> msg = null;
         for (int i = 0; i < totalMsgs; i++) {
             msg = consumer.receive(100, TimeUnit.MILLISECONDS);
@@ -148,8 +190,25 @@ public void testKafkaProducePulsarConsume(int partitionNumber) throws Exception
             Integer key = kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey()));
             assertEquals(messageStrPrefix + key.toString(), new String(msg.getValue()));
-            log.debug("Pulsar consumer get message: {}, key: {}",
-                new String(msg.getData()), kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey())).toString());
+            // verify the 2 added header key-value pairs
+            Map<String, String> properties = msg.getProperties();
+            assertEquals(properties.size(), 2);
+            for (Map.Entry<String, String> kv : properties.entrySet()) {
+                String k = kv.getKey();
+                String v = kv.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug("headers key: {}, value: {}", k, v);
+                }
+
+                assertTrue(k.contains(key1) || k.contains(key2));
+                assertTrue(v.contains(value1) || v.contains(value2));
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Pulsar consumer get message: {}, key: {}",
+                    new String(msg.getData()),
+                    kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey())).toString());
+            }
             consumer.acknowledge(msg);
         }
@@ -158,35 +217,45 @@ public void testKafkaProducePulsarConsume(int partitionNumber) throws Exception
         assertNull(msg);
     }
 
-    @Test(timeOut = 20000, dataProvider = "partitions")
-    public void testKafkaProduceKafkaConsume(int partitionNumber) throws Exception {
+    @Test(timeOut = 20000, dataProvider = "partitionsAndBatch")
+    public void testKafkaProduceKafkaConsume(int partitionNumber, boolean isBatch) throws Exception {
         String topicName = "kopKafkaProduceKafkaConsume" + partitionNumber;
+        String key1 = "header_key1_";
+        String key2 = "header_key2_";
+        String value1 = "header_value1_";
+        String value2 = "header_value2_";
 
         // create partitioned topic.
         kafkaService.getAdminClient().topics().createPartitionedTopic(topicName, partitionNumber);
 
-        @Cleanup
-        Consumer<byte[]> consumer = pulsarClient.newConsumer()
-            .topic(topicName)
-            .subscriptionName("test_k_producer_k_consumer_sub")
-            .subscribe();
-
         // 1. produce message with Kafka producer.
         int totalMsgs = 10;
         String messageStrPrefix = "Message_Kop_KafkaProduceKafkaConsume_" + partitionNumber + "_";
 
         @Cleanup
-        KProducer producer = new KProducer(topicName, false, getKafkaBrokerPort());
+        KProducer kProducer = new KProducer(topicName, false, getKafkaBrokerPort());
 
         for (int i = 0; i < totalMsgs; i++) {
             String messageStr = messageStrPrefix + i;
-            producer.getProducer()
-                .send(new ProducerRecord<>(
-                    topicName,
-                    i,
-                    messageStr))
-                .get();
-            log.debug("Kafka Producer Sent message: ({}, {})", i, messageStr);
+            ProducerRecord<Integer, String> record = new ProducerRecord<>(
+                topicName,
+                i,
+                messageStr);
+            record.headers()
+                .add(key1 + i, (value1 + i).getBytes(UTF_8))
+                .add(key2 + i, (value2 + i).getBytes(UTF_8));
+
+            if (isBatch) {
+                kProducer.getProducer()
+                    .send(record);
+            } else {
+                kProducer.getProducer()
+                    .send(record)
+                    .get();
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Kafka Producer Sent message with header: ({}, {})", i, messageStr);
+            }
         }
 
         // 2. use kafka consumer to consume.
@@ -199,13 +268,29 @@ public void testKafkaProduceKafkaConsume(int partitionNumber) throws Exception {
         int i = 0;
         while (i < totalMsgs) {
-            log.debug("start poll message: {}", i);
+            if (log.isDebugEnabled()) {
+                log.debug("start poll message: {}", i);
+            }
             ConsumerRecords<Integer, String> records = kConsumer.getConsumer().poll(Duration.ofSeconds(1));
             for (ConsumerRecord<Integer, String> record : records) {
                 Integer key = record.key();
                 assertEquals(messageStrPrefix + key.toString(), record.value());
-                log.debug("Kafka consumer get message: {}, key: {} at offset {}",
-                    record.key(), record.value(), record.offset());
+
+                Header[] headers = record.headers().toArray();
+                for (int j = 1; j <= 2; j++) {
+                    String k = headers[j - 1].key();
+                    String v = new String(headers[j - 1].value(), UTF_8);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("headers key: {}, value: {}", k, v);
+                    }
+                    assertTrue(k.contains(key1) || k.contains(key2));
+                    assertTrue(v.contains(value1) || v.contains(value2));
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug("Kafka consumer get message: {}, key: {} at offset {}",
+                        record.key(), record.value(), record.offset());
+                }
                 i++;
             }
         }
@@ -216,10 +301,14 @@ public void testKafkaProduceKafkaConsume(int partitionNumber) throws Exception {
         assertTrue(records.isEmpty());
     }
 
-    @Test(timeOut = 20000, dataProvider = "partitions")
-    public void testPulsarProduceKafkaConsume(int partitionNumber) throws Exception {
+    @Test(timeOut = 20000, dataProvider = "partitionsAndBatch")
+    public void testPulsarProduceKafkaConsume(int partitionNumber, boolean isBatch) throws Exception {
         String topicName = "kopPulsarProduceKafkaConsume" + partitionNumber;
         String pulsarTopicName = "persistent://public/default/" + topicName;
+        String key1 = "header_key1_";
+        String key2 = "header_key2_";
+        String value1 = "header_value1_";
+        String value2 = "header_value2_";
 
         // create partitioned topic.
         kafkaService.getAdminClient().topics().createPartitionedTopic(topicName, partitionNumber);
@@ -230,7 +319,7 @@ public void testPulsarProduceKafkaConsume(int partitionNumber) throws Exception
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
             .topic(pulsarTopicName)
-            .enableBatching(false);
+            .enableBatching(isBatch);
 
         @Cleanup
         Producer<byte[]> producer = producerBuilder.create();
@@ -239,6 +328,8 @@ public void testPulsarProduceKafkaConsume(int partitionNumber) throws Exception
             producer.newMessage()
                 .keyBytes(kafkaIntSerialize(Integer.valueOf(i)))
                 .value(message.getBytes())
+                .property(key1 + i, value1 + i)
+                .property(key2 + i, value2 + i)
                 .send();
         }
@@ -252,13 +343,29 @@ public void testPulsarProduceKafkaConsume(int partitionNumber) throws Exception
         int i = 0;
         while (i < totalMsgs) {
-            log.debug("start poll message: {}", i);
+            if (log.isDebugEnabled()) {
+                log.debug("start poll message: {}", i);
+            }
             ConsumerRecords<Integer, String> records = kConsumer.getConsumer().poll(Duration.ofSeconds(1));
             for (ConsumerRecord<Integer, String> record : records) {
                 Integer key = record.key();
                 assertEquals(messageStrPrefix + key.toString(), record.value());
-                log.debug("Kafka Consumer Received message: {}, {} at offset {}",
-                    record.key(), record.value(), record.offset());
+                Header[] headers = record.headers().toArray();
+                for (int j = 1; j <= 2; j++) {
+                    String k = headers[j - 1].key();
+                    String v = new String(headers[j - 1].value(), UTF_8);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("headers key: {}, value: {}", k, v);
+                    }
+
+                    assertTrue(k.contains(key1) || k.contains(key2));
+                    assertTrue(v.contains(value1) || v.contains(value2));
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug("Kafka consumer get message: {}, key: {} at offset {}",
+                        record.key(), record.value(), record.offset());
+                }
                 i++;
             }
         }
@@ -304,7 +411,9 @@ public void testPulsarProduceKafkaConsume2(int partitionNumber) throws Exception
         int i = 0;
         while (i < totalMsgs / 2) {
-            log.debug("start poll message: {}", i);
+            if (log.isDebugEnabled()) {
+                log.debug("start poll message: {}", i);
+            }
             ConsumerRecords<Integer, String> records = kConsumer.getConsumer().poll(Duration.ofSeconds(1));
             for (ConsumerRecord<Integer, String> record : records) {
                 Integer key = record.key();
@@ -324,13 +433,17 @@ public void testPulsarProduceKafkaConsume2(int partitionNumber) throws Exception
         kConsumer2.getConsumer().subscribe(Collections.singletonList(topicName));
 
         while (i < totalMsgs) {
-            log.debug("start poll message 2: {}", i);
+            if (log.isDebugEnabled()) {
+                log.debug("start poll message 2: {}", i);
+            }
             ConsumerRecords<Integer, String> records = kConsumer2.getConsumer().poll(Duration.ofSeconds(1));
             for (ConsumerRecord<Integer, String> record : records) {
                 Integer key = record.key();
                 assertEquals(messageStrPrefix + key.toString(), record.value());
-                log.debug("Kafka Consumer Received message 2: {}, {} at offset {}",
-                    record.key(), record.value(), record.offset());
+                if (log.isDebugEnabled()) {
+                    log.debug("Kafka Consumer Received message 2: {}, {} at offset {}",
+                        record.key(), record.value(), record.offset());
+                }
                 i++;
             }
         }
@@ -378,13 +491,17 @@ public void testTopicConsumerManager() throws Exception {
         // read empty entry will remove and add cursor each time.
         int i = 0;
         while (i < 7) {
-            log.debug("start poll empty entry: {}", i);
+            if (log.isDebugEnabled()) {
+                log.debug("start poll empty entry: {}", i);
+            }
             ConsumerRecords<Integer, String> records = kConsumer.getConsumer().poll(Duration.ofSeconds(1));
             for (ConsumerRecord<Integer, String> record : records) {
                 Integer key = record.key();
                 assertEquals(messageStrPrefix + key.toString(), record.value());
-                log.debug("Kafka Consumer Received message: {}, {} at offset {}",
-                    record.key(), record.value(), record.offset());
+                if (log.isDebugEnabled()) {
+                    log.debug("Kafka Consumer Received message: {}, {} at offset {}",
+                        record.key(), record.value(), record.offset());
+                }
             }
             i++;
         }
@@ -417,13 +534,17 @@ public void testTopicConsumerManager() throws Exception {
         i = 0;
         // receive all message.
         while (i < totalMsgs) {
-            log.debug("start poll message: {}", i);
+            if (log.isDebugEnabled()) {
+                log.debug("start poll message: {}", i);
+            }
             ConsumerRecords<Integer, String> records = kConsumer.getConsumer().poll(Duration.ofSeconds(1));
             for (ConsumerRecord<Integer, String> record : records) {
                 Integer key = record.key();
                 assertEquals(messageStrPrefix + key.toString(), record.value());
-                log.debug("Kafka Consumer Received message: {}, {} at offset {}",
-                    record.key(), record.value(), record.offset());
+                if (log.isDebugEnabled()) {
+                    log.debug("Kafka Consumer Received message: {}, {} at offset {}",
+                        record.key(), record.value(), record.offset());
+                }
                 i++;
             }
         }
@@ -444,13 +565,17 @@ public void testTopicConsumerManager() throws Exception {
         // After read all entry, read no entry again, this will remove and add cursor each time.
         i = 0;
         while (i < 7) {
-            log.debug("start poll empty entry again: {}", i);
+            if (log.isDebugEnabled()) {
+                log.debug("start poll empty entry again: {}", i);
+            }
             ConsumerRecords<Integer, String> records = kConsumer.getConsumer().poll(Duration.ofSeconds(1));
             for (ConsumerRecord<Integer, String> record : records) {
                 Integer key = record.key();
                 assertEquals(messageStrPrefix + key.toString(), record.value());
-                log.debug("Kafka Consumer Received message: {}, {} at offset {}",
-                    record.key(), record.value(), record.offset());
+                if (log.isDebugEnabled()) {
+                    log.debug("Kafka Consumer Received message: {}, {} at offset {}",
+                        record.key(), record.value(), record.offset());
+                }
             }
             i++;
         }
     }
diff --git a/src/test/java/io/streamnative/kop/KafkaTopicConsumerManagerTest.java b/src/test/java/io/streamnative/kop/KafkaTopicConsumerManagerTest.java
index 77069b64b4..a1ad900226 100644
--- a/src/test/java/io/streamnative/kop/KafkaTopicConsumerManagerTest.java
+++ b/src/test/java/io/streamnative/kop/KafkaTopicConsumerManagerTest.java
@@ -120,7 +120,7 @@ public void testTopicConsumerManagerRemoveAndAdd() throws Exception {
             i++;
 
             // simulate a read complete;
-            offset++;
+            offset += 1 << MessageIdUtils.BATCH_BITS;
             topicConsumerManager.add(offset, Pair.of(cursor, offset));
 
             assertEquals(topicConsumerManager.getConsumers().size(), 1);
@@ -134,7 +134,7 @@
         assertEquals(cursorCompletableFuture.get().getRight(), Long.valueOf(offset));
 
         // simulate a read complete, add back offset.
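+        // offsets reserve the low MessageIdUtils.BATCH_BITS bits for the batch index,
+        // so advancing past one entry steps the offset by (1 << BATCH_BITS).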
-        offset++;
+        offset += 1 << MessageIdUtils.BATCH_BITS;
         topicConsumerManager.add(offset, Pair.of(cursor2, offset));
 
         // produce another 3 message
diff --git a/src/test/java/io/streamnative/kop/utils/MessageIdUtilsTest.java b/src/test/java/io/streamnative/kop/utils/MessageIdUtilsTest.java
new file mode 100644
index 0000000000..35c93f58bf
--- /dev/null
+++ b/src/test/java/io/streamnative/kop/utils/MessageIdUtilsTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.kop.utils;
+
+import static org.testng.Assert.assertEquals;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.testng.annotations.Test;
+
+/**
+ * Validate MessageIdUtils.
+ */
+@Slf4j
+public class MessageIdUtilsTest {
+
+    @Test(timeOut = 20000)
+    public void testMessageIdConvert() throws Exception {
+        long ledgerId = 77777;
+        long entryId = 7777;
+        PositionImpl position = new PositionImpl(ledgerId, entryId);
+
+        long offset = MessageIdUtils.getOffset(ledgerId, entryId);
+        PositionImpl position1 = MessageIdUtils.getPosition(offset);
+
+        assertEquals(position, position1);
+    }
+}
diff --git a/src/test/java/io/streamnative/kop/utils/TopicNameUtilsTest.java b/src/test/java/io/streamnative/kop/utils/TopicNameUtilsTest.java
new file mode 100644
index 0000000000..a2528e961b
--- /dev/null
+++ b/src/test/java/io/streamnative/kop/utils/TopicNameUtilsTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.kop.utils;
+
+import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
+import static org.testng.Assert.assertTrue;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.Test;
+
+/**
+ * Validate TopicNameUtils.
+ */
+@Slf4j
+public class TopicNameUtilsTest {
+
+    @Test(timeOut = 20000)
+    public void testTopicNameConvert() throws Exception {
+        String topicName = "kopTopicNameConvert";
+        int partitionNumber = 77;
+        TopicPartition topicPartition = new TopicPartition(topicName, partitionNumber);
+
+        String tenantName = "tenant_name";
+        String nsName = "ns_name";
+        NamespaceName ns = NamespaceName.get(tenantName, nsName);
+        String expectedPulsarName = "persistent://" + tenantName + "/" + nsName + "/"
+            + topicName + PARTITIONED_TOPIC_SUFFIX + partitionNumber;
+
+        TopicName topicName1 = TopicNameUtils.pulsarTopicName(topicPartition, ns);
+        TopicName topicName2 = TopicNameUtils.pulsarTopicName(topicName, partitionNumber, ns);
+
+        assertTrue(topicName1.toString().equals(expectedPulsarName));
+        assertTrue(topicName2.toString().equals(expectedPulsarName));
+
+        TopicName topicName3 = TopicNameUtils.pulsarTopicName(topicName, ns);
+        String expectedPulsarName3 = "persistent://" + tenantName + "/" + nsName + "/"
+            + topicName;
+        assertTrue(topicName3.toString().equals(expectedPulsarName3));
+    }
+}