Conversation
Can you first add a unit test for the request handler? You don't need to set up the server to test it. You just need to test the handleProduceRequest logic.
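The suggestion is to test the handler logic directly rather than through a running broker. A minimal, hypothetical sketch of that idea (the `HandlerSketch` class and its types are illustrative stand-ins, not the real `KafkaRequestHandler` API):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for handleProduceRequest: the core logic is a pure
// function from per-partition records to per-partition responses, so it can
// be exercised without starting a server.
public class HandlerSketch {

    static Map<String, String> handleProduce(Map<String, byte[]> partitionRecords) {
        Map<String, String> responses = new HashMap<>();
        for (Map.Entry<String, byte[]> entry : partitionRecords.entrySet()) {
            // A real handler would persist the records; here we just ack.
            responses.put(entry.getKey(), entry.getValue().length > 0 ? "OK" : "EMPTY");
        }
        return responses;
    }

    public static void main(String[] args) {
        Map<String, byte[]> request = new HashMap<>();
        request.put("topic-partition-0", new byte[]{1, 2, 3});
        request.put("topic-partition-1", new byte[]{});

        Map<String, String> responses = handleProduce(request);
        // Every partition in the request gets exactly one response.
        if (responses.size() != 2) throw new AssertionError();
        if (!"OK".equals(responses.get("topic-partition-0"))) throw new AssertionError();
        System.out.println("handler test passed");
    }
}
```

In the real code the same effect is usually achieved by mocking the broker service collaborators and calling the handler method directly.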
@@ -240,7 +273,7 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar) {
         new MetadataResponse(
             allNodes,
             clusterName,
-            MetadataResponse.NO_CONTROLLER_ID,
+            0,
what is 0 for?
Thanks. This is a temporary change; since there is no controller here, I will change it back to NO_CONTROLLER_ID.
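For context: `MetadataResponse.NO_CONTROLLER_ID` is a sentinel meaning "no controller is known" (to my knowledge it is `-1` in the Kafka protocol classes), whereas hardcoding `0` would claim broker 0 is the controller. A minimal sketch of the sentinel pattern, with the constant's value assumed:

```java
public class ControllerIdSketch {
    // Mirrors Kafka's MetadataResponse.NO_CONTROLLER_ID sentinel (assumed -1).
    static final int NO_CONTROLLER_ID = -1;

    static String describeController(int controllerId) {
        // A sentinel id is explicitly distinguishable from any real broker id.
        return controllerId == NO_CONTROLLER_ID
                ? "no controller"
                : "controller is broker " + controllerId;
    }

    public static void main(String[] args) {
        System.out.println(describeController(NO_CONTROLLER_ID)); // no controller
        System.out.println(describeController(0));                // controller is broker 0
    }
}
```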
for (Map.Entry<TopicPartition, ? extends Records> entry : produceRequest.partitionRecordsOrFail().entrySet()) {
    // 1. create PersistTopic
    // 3. tracking each record status.
where is 2?
Thanks. The original thought was to create a producer for each topic at step 2, but then I found it was not needed, so I removed it. Will change this.
// Ignore request.acks() and request.timeout(), which related to kafka replication in this broker.

Map<TopicPartition, CompletableFuture<PartitionResponse>> responsesFutures = new HashMap<>();
do you need to use a synchronized hash map?
Thanks. No need here; the requests are handled in a for loop on a single thread, and there is no async operation during the hash map puts.
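The reply's reasoning can be sketched with plain JDK types (names are illustrative): a plain `HashMap` is safe because all puts happen on one thread; only the *values* (futures) are completed asynchronously later, which never mutates the map itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ResponseMapSketch {
    static Map<String, CompletableFuture<String>> buildResponses(String... partitions) {
        // All puts happen on the calling thread, so no synchronized map is
        // needed; callbacks later complete the futures, not the map.
        Map<String, CompletableFuture<String>> responses = new HashMap<>();
        for (String p : partitions) {
            responses.put(p, new CompletableFuture<>());
        }
        return responses;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<String>> m = buildResponses("t-0", "t-1");
        m.get("t-0").complete("OK"); // async completion touches the future, not the map
        System.out.println(m.get("t-0").join());
    }
}
```

If the map *were* also populated from async callbacks, a `ConcurrentHashMap` would be the right choice instead.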
TopicName topicName = pulsarTopicName(topicPartition.topic(), topicPartition.partition());

kafkaService.getBrokerService().getTopic(topicName.toString(), true)
    .thenApply(Optional::get)
I don't think you should call Optional::get directly. If the value doesn't exist, it will fail. You have to handle the case where the Optional is empty.
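One way to follow this advice, sketched with JDK types only (`unwrapTopic` and the topic-name strings are hypothetical): fail the future chain explicitly when the lookup returns an empty `Optional`, instead of letting `Optional::get` throw `NoSuchElementException` inside `thenApply`.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class OptionalGetSketch {
    // Instead of .thenApply(Optional::get), complete the chain exceptionally
    // (or fall back) when the topic lookup comes back empty.
    static CompletableFuture<String> unwrapTopic(CompletableFuture<Optional<String>> lookup,
                                                 String topicName) {
        return lookup.thenCompose(opt -> {
            if (opt.isPresent()) {
                return CompletableFuture.completedFuture(opt.get());
            }
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                new IllegalStateException("Topic not found: " + topicName));
            return failed;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Optional<String>> found =
            CompletableFuture.completedFuture(Optional.of("persistent://public/default/t"));
        System.out.println(unwrapTopic(found, "t").join());

        CompletableFuture<Optional<String>> missing =
            CompletableFuture.completedFuture(Optional.empty());
        // The caller can now observe the failure instead of a hidden NPE/NoSuchElementException.
        System.out.println(unwrapTopic(missing, "t").isCompletedExceptionally());
    }
}
```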
public void completed(Exception exception, long ledgerId, long entryId) {

    if (exception != null) {
        log.debug("Failed write entry: {}, entryId: {}, sequenceId: {}. and triggered send callback.",
Why is this debug and not error?
will change to error.
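The shape of this callback can be sketched with plain JDK types (names mirror the snippet above but are simplified; the real code uses the managed-ledger add-entry callback): bridge the write result into a `CompletableFuture` and log failures at error level so they are visible in production.

```java
import java.util.concurrent.CompletableFuture;

public class WriteCallbackSketch {
    static void completed(Exception exception, long ledgerId, long entryId,
                          CompletableFuture<Long> offsetFuture) {
        if (exception != null) {
            // Write failures should surface at error level, not debug.
            System.err.println("Failed to write entry, ledgerId: " + ledgerId
                + ", entryId: " + entryId + ": " + exception);
            offsetFuture.completeExceptionally(exception);
        } else {
            offsetFuture.complete(entryId);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Long> ok = new CompletableFuture<>();
        completed(null, 7L, 42L, ok);
        System.out.println(ok.join()); // 42
    }
}
```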
List<CompletableFuture<Long>> futures = Collections
    .synchronizedList(Lists.newArrayListWithExpectedSize(size.get()));

records.records().forEach(record -> {
Can we batch multiple records into one message batch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added issue #16 to handle this.
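The loop above publishes one message per Kafka record; the suggested batching would accumulate records and flush them as one batch. A JDK-only sketch of that accumulation (batch size and flush trigger are illustrative assumptions, and the real change is tracked in the issue above):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSketch {
    // Group a stream of records into batches of at most maxBatchSize,
    // so each batch can be published as a single message batch.
    static List<List<String>> batch(List<String> records, int maxBatchSize) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String record : records) {
            current.add(record);
            if (current.size() == maxBatchSize) { // flush when the batch is full
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);                 // flush the trailing partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        List<List<String>> out = batch(List.of("a", "b", "c", "d", "e"), 2);
        System.out.println(out.size()); // 3 batches: [a,b], [c,d], [e]
    }
}
```

A production version would typically also flush on a size-in-bytes or time threshold, not just a record count.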
This PR is based on [Produce request](a09a805), which is in PR #14; please review and merge that first.

Changes:
- Change to the StreamNative Pulsar version, so we can call getters/setters of the parent class in `KafkaBrokerService` and `KafkaService`; otherwise it is not possible to do the mocking in the test framework.
- Add a test framework.
- Add a simple produce request test, and pass the write test.
Master issue: streamnative/ksn#7

### Motivation
A KDC server is required to run Kerberos authentication. Before adding the implementation of Kerberos authentication, this PR aims at setting up a KDC server along with the KoP service.

### Modification
- Add a `MiniKdc` implementation based on the [Hadoop MiniKdc](https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-minikdc/src/main/java/org/apache/hadoop/minikdc/MiniKdc.java)
- Migrate some classes like `SaslSetup` and `JaasUtils` to save some JAAS configuration files.
- Add a trivial `GssapiAuthenticationTest` to verify the KDC server has started.

### Documentation
Check the box below. Need to update docs?
- [ ] `doc-required` (If you need help on updating docs, create a doc issue)
- [x] `no-need-doc` (Please explain why)
- [ ] `doc` (If this PR contains doc changes)
(cherry picked from commit d977dab)
Master Issue: #4
This is the produce request implementation.
Basic logic:
Manually tested; a Kafka client could send data to the KoP broker:
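The basic logic described in the review threads above (one future per partition, with the overall response completing once every per-partition write finishes) can be sketched with JDK types only; all names here are hypothetical simplifications of the real handler, which persists via Pulsar managed ledgers:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ProduceFlowSketch {
    static CompletableFuture<Map<String, Long>> handleProduce(Map<String, byte[]> partitionRecords) {
        // One future per partition, filled in by async writes.
        Map<String, CompletableFuture<Long>> futures = new HashMap<>();
        long fakeOffset = 0;
        for (Map.Entry<String, byte[]> e : partitionRecords.entrySet()) {
            // Stand-in for the async ledger write; here it completes immediately
            // with a fake offset.
            futures.put(e.getKey(), CompletableFuture.completedFuture(fakeOffset++));
        }
        // The produce response is sent only after every partition write is done.
        return CompletableFuture
            .allOf(futures.values().toArray(new CompletableFuture[0]))
            .thenApply(ignore -> {
                Map<String, Long> responses = new HashMap<>();
                futures.forEach((tp, f) -> responses.put(tp, f.join()));
                return responses;
            });
    }

    public static void main(String[] args) {
        Map<String, byte[]> req = new HashMap<>();
        req.put("topic-0", new byte[]{1});
        req.put("topic-1", new byte[]{2});
        System.out.println(handleProduce(req).join().size()); // 2
    }
}
```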