Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Producer messages not seen by consumer #17

Closed
patrickmay opened this issue Jun 24, 2019 · 2 comments
Closed

Producer messages not seen by consumer #17

patrickmay opened this issue Jun 24, 2019 · 2 comments

Comments

@patrickmay
Copy link

patrickmay commented Jun 24, 2019

I'm testing the latest release with these two functions:

(defun test-consume (topic-name)
  (let* ((string-serde (lambda (x)
                         (kf:bytes->object x 'string)))
         (conf (kf:conf
	        "bootstrap.servers" "127.0.0.1:9092"
	        "group.id" (write-to-string (get-universal-time))
	        "enable.auto.commit" "false"
	        "auto.offset.reset" "earliest"
	        "offset.store.method" "broker"
	        "enable.partition.eof"  "false"))
         (consumer (make-instance 'kf:consumer
			          :conf conf
                                  :key-serde string-serde
			          :value-serde string-serde))
         (topics (list topic-name)))
    (kf:subscribe consumer topics)
    (format t "Subscribed:  ~S, ~S.~%"
            (gethash "bootstrap.servers" conf)
            (kf:subscription consumer))
    (let ((message (kf:poll consumer 10000)))
      (format t "Message received or poll expired.~%")
      (when message
        (format t "Received message:  (~S) ~S~%"
                (kf:key message) (kf:value message))
        (kf:commit consumer)))
    (format t "Unsubscribing.~%")
    (kf:unsubscribe consumer)))

and

(defun test-produce (topic-name key message)
  (let ((producer (make-instance 'kf:producer
                                 :conf (kf:conf
                                        "bootstrap.servers" "127.0.0.1:9092")
                                 :key-serde #'kf:object->bytes
                                 :value-serde #'kf:object->bytes)))
    (kf:produce producer topic-name message :key key)
    (kf:flush producer 2000)))

When I monitor the topic with kafka-console-consumer, I see the messages produced by test-produce. When I run test-consumer and send messages with kafka-console-producer, it gets the messages. When I send messages with test-producer, test-consumer does not see them.
Please tell me I'm making a foolish mistake.

@SahilKang
Copy link
Owner

SahilKang commented Jun 26, 2019

🤔 The code you posted worked fine for me:

(test-produce "test-topic-1" "key-1" "value-1")

(test-consume "test-topic-1")
;; the call to test-consume printed:
;; Subscribed:  "127.0.0.1:9092", #("test-topic-1").
;; Message received or poll expired.
;; Received message:  ("key-1") "value-1"
;; Unsubscribing.

If you're still not having any luck, can you share your cl-rdkafka and librdkafka versions? Here's what I'm running:

(cl-rdkafka/ll:rd-kafka-version-str) ; => "0.9.3"
(asdf:component-version (asdf:find-system 'cl-rdkafka)) ; => "0.2.2"

@patrickmay
Copy link
Author

The issue appears to be related to Emacs and Slime. When I run the consumer test in a standalone SBCL image, it works fine. Inside of Slime, something gets confused (possibly me).

Thanks for checking.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants