Skip to content

KafkaProducerMessage: generic key & value #95

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 28, 2023

Conversation

felixschlegel
Copy link
Contributor

Motivation:

We want to allow our users to feed arbitrary data as key and value
of a KafkaProducerMessage.

Modifications:

  • add new public protocol KafkaBuffer which asks implementing types to
    provide safe access to an UnsafeMutableRawBufferPointer containing
    the data
  • implement KafkaBuffer in String
  • implement KafkaBuffer in ByteBuffer
  • update tests

@@ -110,7 +110,7 @@ final class KafkaProducerTests: XCTestCase {
}

let expectedTopic = "test-topic"
let message = KafkaProducerMessage(
let message = KafkaProducerMessage<ByteBuffer, ByteBuffer>(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keys are optional, which leads to the problem that their type cannot be inferred when set to nil. This is a workaround but far from perfect. Any suggestions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we conform Never to KafkaBuffer and provide an init that doesn't take a key when we are Never.

Comment on lines 18 to 24
/// Yields a buffer pointer containing this `KafkaBuffer`'s readable bytes.
///
/// - warning: Do not escape the pointer from the closure for later use.
///
/// - parameters:
/// - body: The closure that will accept the yielded bytes.
/// - returns: The value returned by `body`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just having this doc on the protocol should be enough.

Comment on lines 26 to 28
try ByteBuffer(string: self).withUnsafeReadableBytes { UnsafeRawBufferPointer in
try body(UnsafeRawBufferPointer)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should try to use self.utf8.withContiguousStorageIfAvailable and only fallback to bridging into ByteBuffer in the slow path

Comment on lines 18 to 24
/// Yields a buffer pointer containing this `KafkaBuffer`'s readable bytes.
///
/// - warning: Do not escape the pointer from the closure for later use.
///
/// - parameters:
/// - body: The closure that will accept the yielded bytes.
/// - returns: The value returned by `body`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

@@ -252,7 +252,7 @@ public final class KafkaProducer: Service, Sendable {
/// of the corresponding ``KafkaAcknowledgedMessage``.
/// - Throws: A ``KafkaError`` if sending the message failed.
@discardableResult
public func send(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID {
public func send<K, V>(_ message: KafkaProducerMessage<K, V>) throws -> KafkaProducerMessageID {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we name those generic types Key and Value please

@@ -97,33 +97,45 @@ final class RDKafkaClient: Sendable {
/// - Parameter newMessageID: ID that was assigned to the `message`.
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
/// - Parameter topicHandles: Topic handles that this client uses to produce new messages
func produce(
message: KafkaProducerMessage,
func produce<K, V>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

/// - parameters:
/// - body: The closure that will accept the yielded bytes.
/// - returns: The value returned by `body`.
func withUnsafeRawBufferPointer<T>(_ body: (UnsafeRawBufferPointer) throws -> T) rethrows -> T
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to have such a low-level requirement? It feels like an API smell to me that the only requirement is an unsafe API.

Is it possible, for example, to rephrase the API in terms of e.g. RandomAccessCollection of UInt8 and have encoders/decoders?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about this as well but I am unsure this fulfils the goals that we want to achieve here. We want to make it possible to have both key and especially value be able to pass copy-free to rdkafka. If we take a collection of UInt8 then I am not sure if we can achieve that.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You do have withContiguousStorageIfAvailable for RAC which should fulfil that most of the time (I guess only custom RACs might not implement it).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah you are right we can probable be generic over RAC here instead of creating our own type. In the slow path we just write the RAC into byte buffer. @felixschlegel can you try this out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that having the extra KafkaBuffer type is not ideal. However, I do not find the RandomAccessCollection approach more ergonomic (from what I have investigated):

  1. Our KafkaProducerMessage type's String initializer would have to take String as parameter value, but stores a ContiuousArray<UInt8> which still requires copying
See implementation
/// Message that is sent by the `KafkaProducer`
public struct KafkaProducerMessage<Key: RandomAccessCollection<UInt8>, Value: RandomAccessCollection<UInt8>> {
    public var topic: String
    public var partition: KafkaPartition
    public var key: Key?
    public var value: Value

    /// Create a new `KafkaProducerMessage` with a key and value.
    /// - Parameter topic: The topic the message will be sent to. Topics may be created by the `KafkaProducer` if non-existent.
    /// - Parameter partition: The topic partition the message will be sent to. If not set explicitly, the partiotion will be assigned automatically.
    /// - Parameter key: Used to guarantee that messages with the same key will be sent to the same partittion so that their order is preserved.
    /// - Parameter value: The message body.
    public init(
        topic: String,
        partition: KafkaPartition? = nil,
        key: Key,
        value: Value
    ) {
        self.topic = topic
        self.key = key
        self.value = value

        if let partition = partition {
            self.partition = partition
        } else {
            self.partition = .unassigned
        }
    }
}

// MARK: - KafkaProducerMessage + String Initializer

extension KafkaProducerMessage where Key == ContiguousArray<UInt8>, Value == ContiguousArray<UInt8> {

    /// Create a new `KafkaProducerMessage` with a key and value.
    /// - Parameter topic: The topic the message will be sent to. Topics may be created by the `KafkaProducer` if non-existent.
    /// - Parameter partition: The topic partition the message will be sent to. If not set explicitly, the partiotion will be assigned automatically.
    /// - Parameter key: Used to guarantee that messages with the same key will be sent to the same partittion so that their order is preserved.
    /// - Parameter value: The message body.
    public init(
        topic: String,
        partition: KafkaPartition? = nil,
        key: String? = nil,
        value: String
    ) {
        self.topic = topic
        self.key = key.map { ContiguousArray($0.utf8) }
        self.value = ContiguousArray(value.utf8)

        if let partition = partition {
            self.partition = partition
        } else {
            self.partition = .unassigned
        }
    }
}
  1. We would have to take ByteBufferView as parameter type instead of ByteBuffer as only ByteBufferView conforms to RandomAccessCollection

So yes it is a smell, but we provide default implementations of KafkaBuffer for String and ByteBuffer. The protocol gives the user the freedom to make their own types conform to KafkaBuffer at their own risk. Furthermore this enables them to avoid copying their own types.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, UTF8View is not a RAC, but it is a BidirectionalCollection which should be sufficient to avoid the copy. I don't think having convenience initialisers (for String and ByteBuffer) is an issue either. I'd rather reuse existing protocols if possible but don't feel too strongly.

keyBytes?.count ?? 0,
UnsafeMutableRawPointer(bitPattern: newMessageID)
)
let responseCode = try message.value.withUnsafeRawBufferPointer { valueBuffer in
Copy link
Collaborator

@blindspotbounty blindspotbounty Jul 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create the following method in message, please:

func withUnsafeKeyValueRawBufferPointer(_ closure: (Buffer?, Buffer)) {
    let responseCode = try value.withUnsafeRawBufferPointer { valueBuffer in
        if let key {
            key.withUnsafeRawBufferPointer { keyBuffer in
                closure(keyBuffer, valueBuffer)
            }
        } else {
            closure(nil, valueBuffer)
        }
    }
}

Then we could avoid call key.withUnsafeRawBufferPointer for every topic. Especially, it might make sense for custom types.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ou, sorry, I see that it is only for one topic.

@felixschlegel felixschlegel force-pushed the fs-kafka-data-type branch 2 times, most recently from 900e8cc to 0f21615 Compare July 26, 2023 15:26
Motivation:

We want to allow our users to feed arbitrary data as `key` and `value`
of a `KafkaProducerMessage`.

Modifications:

* add new `public protocol KafkaBuffer` which asks implementing types to
  provide safe access to an `UnsafeMutableRawBufferPointer` containing
  the data
* implement `KafkaBuffer` in `String`
* implement `KafkaBuffer` in `ByteBuffer`
* update tests
Modifications:

* remove duplicate docc comments for `KafkaBuffer` adopters
* spell out `K` and `V` as `Key` and `Value`
* `String+KafkaBuffer`: only convert `String` to `ByteBuffer` on slow
  path (no contiguous bytes availble)
* make `Never` conform to `KafkaBuffer`
* `KafkaProducerMessage`: make `key` optional
Modifications:

* change `KafkaBuffer` `protocol` into `KafkaContiguousBytes` type (similar to
  `SwiftProtobuf/SwiftProtobufContiguousBytes`)
* make `Array where Array.Element == UInt8` conform to
  `KafkaContiguousBytes`
Comment on lines 27 to 29
return try ByteBuffer(string: self).withUnsafeReadableBytes { unsafeRawBufferPointer in
try body(unsafeRawBufferPointer)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: We can just call our other conformance here

Suggested change
return try ByteBuffer(string: self).withUnsafeReadableBytes { unsafeRawBufferPointer in
try body(unsafeRawBufferPointer)
}
return try ByteBuffer(string: self).withUnsafeBytes(body)


/// Create a new `KafkaProducerMessage` with a `String` key and value
extension KafkaProducerMessage where Key == Never {
/// Create a new `KafkaProducerMessage` with a `ByteBuffer` key and value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks outdated

Modifications:

* fix outdated `KafkaProducerMessage` documentation
* simplify `String+KafkaContiguousBytes`
@felixschlegel felixschlegel requested a review from FranzBusch July 28, 2023 08:29
@FranzBusch FranzBusch merged commit a3df8d8 into swift-server:main Jul 28, 2023
@FranzBusch FranzBusch deleted the fs-kafka-data-type branch July 28, 2023 08:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants