Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

I cannot get messages using clj-kafka.consumer.zk #74

Closed
ogis-hashi opened this issue Dec 27, 2015 · 2 comments
Closed

I cannot get messages using clj-kafka.consumer.zk #74

ogis-hashi opened this issue Dec 27, 2015 · 2 comments

Comments

@ogis-hashi
Copy link

I tried to get kafka messages using clj-kafka.consumer.zk but I cannot do it yet.
The producer code is as follows:

(ns chapter04.kafka
  (:require [clj-kafka.producer :as p]
            [clj-kafka.core :as core]
            [clj-kafka.consumer.zk :as zk-consumer]
            )
  (:import (kafka.consumer Consumer ConsumerConfig KafkaStream)
           (kafka.producer KeyedMessage ProducerConfig)
           (kafka.javaapi.producer Producer)
           (java.util Properties)
           (java.util.concurrent Executors))
  )

(def p (p/producer {"metadata.broker.list" "localhost:9092"
                  "serializer.class" "kafka.serializer.DefaultEncoder"
                  "partitioner.class" "kafka.producer.DefaultPartitioner"}))

(p/send-message p (p/message "test" (.getBytes "this is my message")))
(p/send-message p (p/message "test" (.getBytes "this is my message")))

The below is the consumer code:

(def consumer-config {"zookeeper.connect" "localhost:2181"
             "group.id" "clj-kafka.consumer"
             "auto.offset.reset" "smallest"
             "auto.commit.enable" "false"})

(core/with-resource [c (zk-consumer/consumer consumer-config)]
  zk-consumer/shutdown
  (take 2 (zk-consumer/messages c "test"))
  )

The last consumer expression returns (). I am expecting the result is ("this is my message" "this is my message").

Using kafka bundled kafka-console-consumer.sh can retrieve messages correctly.

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
this is my message
this is my message

I am using kafka_2.10-0.8.2.1. Is there something wrong in my environment ? Or is there any way to solve this problem ?

@ghost
Copy link

ghost commented Dec 28, 2015

I think the docs are just sucky here.

(core/with-resource [c (zk-consumer/consumer consumer-config)]
  zk-consumer/shutdown
  (take 2 (zk-consumer/messages c "test")))

What happens is that this code will open a consumer, create a lazy seq of it's messages, and then immediately close the consumer and return the seq.

However since the seq is lazy, once you start reading, the consumer has already been closed and thus has no new messages for it.

To return the actual messages you have to force the seq's evaluation within the with-resource.

(core/with-resource [c (zk-consumer/consumer consumer-config)]
  zk-consumer/shutdown
  (doall (take 2 (zk-consumer/messages c "test"))))

@ogis-hashi
Copy link
Author

@j-pb Oh great ! Thank you !
It worked as follows:

(def x
     (core/with-resource [c (zk/consumer config)]
       zk/shutdown
       (doall (take 1 (zk/messages c "test"))))
  )
;; #'clj-kafka-example.core/x
(realized? x)
;; true
x
;;(#clj_kafka.core.KafkaMessage{:topic "test", :offset 0, :partition 0, :key nil, :value #object["[B" 0x6bbcc481 "[B@6bbcc481"]})

(def x
     (core/with-resource [c (zk/consumer config)]
       zk/shutdown
       (take 1 (zk/messages c "test"))
       )
  )
;; #'clj-kafka-example.core/x
(realized? x)
;; false
x
;;()

I have another question. When I called a consumer 3 times, it returned the same message.
I would like to know how to get different consequent messages.

(core/with-resource [c (zk/consumer config)]
  zk/shutdown
  (doall (take 1 (zk/messages c "test"))))
;; (#clj_kafka.core.KafkaMessage{:topic "test", :offset 0, :partition 0, :key nil, :value #object["[B" 0x6d1531d "[B@6d1531d"]})
(core/with-resource [c (zk/consumer config)]
  zk/shutdown
  (doall (take 1 (zk/messages c "test"))))
;; (#clj_kafka.core.KafkaMessage{:topic "test", :offset 0, :partition 0, :key nil, :value #object["[B" 0x2cdb87c2 "[B@2cdb87c2"]})
(core/with-resource [c (zk/consumer config)]
  zk/shutdown
  (doall (take 1 (zk/messages c "test"))))
;; (#clj_kafka.core.KafkaMessage{:topic "test", :offset 0, :partition 0, :key nil, :value #object["[B" 0x1b90d67c "[B@1b90d67c"]})

Another problem is it worked well to take messages less than messages in a topic, however it hanged
to try to take messages more than messages in a topic.

Assume there are 10 messages and it worked to take 3 messages from the topic but it hanged to try to
take 11 messages.

(core/with-resource [c (zk/consumer config)]
  zk/shutdown
  (doall (take 3 (zk/messages c "test"))))
;; (#clj_kafka.core.KafkaMessage{:topic "test", :offset 0, :partition 0, :key nil, :value #object["[B" 0x45df4b8d "[B@45df4b8d"]}
;; #clj_kafka.core.KafkaMessage{:topic "test", :offset 1, :partition 0, :key nil, :value #object["[B" 0x645c0a87 "[B@645c0a87"]}
;; #clj_kafka.core.KafkaMessage{:topic "test", :offset 2, :partition 0, :key nil, :value #object["[B" 0x4b45519a "[B@4b45519a"]})
(core/with-resource [c (zk/consumer config)]
  zk/shutdown
  (doall (take 11 (zk/messages c "test"))))

The above case, it seemed waiting for a next message but it didn't exit after produce a new message on the topic(looked hanged).

Thanks,
MH

@pingles pingles closed this as completed Jan 29, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants