Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
change the following comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jiazhai committed Jul 23, 2019
1 parent 6366342 commit a09a805
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
37 changes: 20 additions & 17 deletions src/main/java/io/streamnative/kop/KafkaRequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.kafka.common.Node;
Expand Down Expand Up @@ -79,6 +79,7 @@
* This class contains all the request handling methods.
*/
@Slf4j
@Getter
public class KafkaRequestHandler extends KafkaCommandDecoder {

private final KafkaService kafkaService;
Expand Down Expand Up @@ -273,7 +274,7 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar) {
new MetadataResponse(
allNodes,
clusterName,
0,
MetadataResponse.NO_CONTROLLER_ID,
allTopicMetadata);
ctx.writeAndFlush(responseToByteBuf(finalResponse, metadataHar));
}
Expand All @@ -285,7 +286,6 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar) {

protected void handleProduceRequest(KafkaHeaderAndRequest produceHar) {
checkArgument(produceHar.getRequest() instanceof ProduceRequest);

ProduceRequest produceRequest = (ProduceRequest) produceHar.getRequest();

if (produceRequest.transactionalId() != null) {
Expand All @@ -304,31 +304,32 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar) {
final int responsesSize = produceRequest.partitionRecordsOrFail().size();

for (Map.Entry<TopicPartition, ? extends Records> entry : produceRequest.partitionRecordsOrFail().entrySet()) {
// 1. create PersistTopic
// 3. tracking each record status.
TopicPartition topicPartition = entry.getKey();

CompletableFuture<PartitionResponse> partitionResponse = new CompletableFuture<>();
responsesFutures.put(topicPartition, partitionResponse);

if (log.isDebugEnabled()) {
log.debug("[{}] Produce messages for topic {} partition {}, requestsize: {} ",
log.debug("[{}] Produce messages for topic {} partition {}, request size: {} ",
ctx.channel(), topicPartition.topic(), topicPartition.partition(), responsesSize);
}

TopicName topicName = pulsarTopicName(topicPartition.topic(), topicPartition.partition());

kafkaService.getBrokerService().getTopic(topicName.toString(), true)
.thenApply(Optional::get)
.thenAccept(topic -> {
publishMessages(entry.getValue(), topic, partitionResponse);
})
.exceptionally(exception -> {
Throwable cause = exception.getCause();
log.error("[{}] Failed to getOrCreateTopic {}", ctx.channel(), topicName, exception);
partitionResponse.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR));

return null;
.whenComplete((topicOpt, exception) -> {
if (exception != null) {
log.error("[{}] Failed to getOrCreateTopic {}. exception:",
ctx.channel(), topicName, exception);
partitionResponse.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR));
} else {
if (topicOpt.isPresent()) {
publishMessages(entry.getValue(), topicOpt.get(), partitionResponse);
} else {
log.error("[{}] getOrCreateTopic get empty topic for name {}", ctx.channel(), topicName);
partitionResponse.complete(new PartitionResponse(Errors.KAFKA_STORAGE_ERROR));
}
}
});
}

Expand Down Expand Up @@ -365,6 +366,8 @@ private void publishMessages(Records records,
ctx.channel(), topic.getName(), size.get());
}

// TODO: Handle Records in a batched way:
// https://github.com/streamnative/kop/issues/16
List<CompletableFuture<Long>> futures = Collections
.synchronizedList(Lists.newArrayListWithExpectedSize(size.get()));

Expand Down Expand Up @@ -409,7 +412,7 @@ public long getSequenceId() {
public void completed(Exception exception, long ledgerId, long entryId) {

if (exception != null) {
log.debug("Failed write entry: {}, entryId: {}, sequenceId: {}. and triggered send callback.",
log.error("Failed write entry: {}, entryId: {}, sequenceId: {}. and triggered send callback.",
ledgerId, entryId, sequenceId);
offsetFuture.completeExceptionally(exception);
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/streamnative/kop/utils/ReflectionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ public static void callNoArgVoidMethod(Object privateObject,
String methodName) throws Exception {
try {
Method privateStringMethod = privateObject.getClass().getSuperclass()
.getDeclaredMethod(methodName, null);
.getDeclaredMethod(methodName, (Class<?>[]) null);

privateStringMethod.setAccessible(true);
privateStringMethod.invoke(privateObject, null);
privateStringMethod.invoke(privateObject, (Object[]) null);
} catch (NoSuchMethodException | IllegalAccessException e) {
throw new RuntimeException("Unable to call method '" + methodName + "'", e);
} catch (InvocationTargetException e) {
Expand Down

0 comments on commit a09a805

Please sign in to comment.