Question : I try to use it but nothing ... #26

Closed
richiesgr opened this issue Jan 13, 2015 · 7 comments

@richiesgr

Hi

I've tried to use it but I can't get it working. I get a message from Kafka only once, and then it enters this mode:
[test-akka.actor.default-dispatcher-3] INFO com.sclasen.akka.kafka.ConnectorFSM - at=created-streams
[test-akka.actor.default-dispatcher-3] INFO akka.actor.ActorSystemImpl - at=consumer-started
[test-akka.actor.default-dispatcher-5] INFO com.sclasen.akka.kafka.ConnectorFSM - at=transition from=Receiving to=Committing uncommitted=0
[test-akka.actor.default-dispatcher-5] WARN com.sclasen.akka.kafka.ConnectorFSM - state=Committing msg=StateTimeout drained=1 streams=1
[test-akka.actor.default-dispatcher-5] WARN com.sclasen.akka.kafka.ConnectorFSM - state=Committing msg=StateTimeout drained=1 streams=1
[test-akka.actor.default-dispatcher-2] WARN com.sclasen.akka.kafka.ConnectorFSM - state=Committing msg=StateTimeout drained=1 streams=1

After that I don't get anything anymore.
I've tried playing with CommitConfig, without success.
Could you help me?
My code is:
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.util.Timeout
import com.sclasen.akka.kafka.{AkkaConsumer, AkkaConsumerProps, CommitConfig, StreamFSM}
import kafka.serializer.DefaultDecoder
import scala.concurrent.duration._

object Main extends App {
  val system = ActorSystem("test")

  val senderRequest = system.actorOf(Props[SenderRequest])
  val consumerProps = AkkaConsumerProps.forSystem(
    system = system,
    zkConnect = "localhost:2181",
    topic = "HTTP",
    group = "1",
    streams = 1, // one per partition
    keyDecoder = new DefaultDecoder(),
    msgDecoder = new DefaultDecoder(),
    receiver = senderRequest,
    commitConfig = new CommitConfig(Some(10.seconds), Some(1), Timeout(10.seconds))
  )

  val consumer = new AkkaConsumer(consumerProps)
  consumer.start()
}

// Receiver actor: logs every message and acknowledges it to the stream.
class SenderRequest extends Actor with ActorLogging {
  override def receive: Receive = {
    case data: Any =>
      log.info("RECEIVE HERE !!!!!!!!!!!!!!!")
      sender() ! StreamFSM.Processed
  }
}

Thanks

@sclasen
Owner

sclasen commented Jan 13, 2015

Hmm, perhaps there is an off-by-one error. What happens if you use

commitConfig = new CommitConfig(Some(10 seconds), Some(10), Timeout(10 second))

Also, is there any process sending messages to Kafka?

@sclasen
Owner

sclasen commented Jan 13, 2015

Also, what version are you using?

@richiesgr
Author

Thanks for the super quick help!

I'm using 0.9 from the Maven repo.
I tried your commitConfig, but it doesn't help.

Here is my log with debug enabled:
[test-akka.actor.default-dispatcher-6] INFO com.sclasen.akka.kafka.ConnectorFSM - at=created-streams
2015-01-13 19:21:51 DEBUG ClientCnxn:839 - Reading reply sessionid:0x14ae3aafd0c0028, packet:: clientPath:null serverPath:null finished:false header:: 16,4 replyHeader:: 16,453,0 request:: '/brokers/ids/0,F response:: #7b226a6d785f706f7274223a2d312c2274696d657374616d70223a2231343231313538383536393333222c22686f7374223a223139322e3136382e35302e3731222c2276657273696f6e223a312c22706f7274223a393039327d,s{218,218,1421158856939,1421158856939,0,0,0,93137065843884032,90,0,218}
[test-akka.actor.default-dispatcher-6] INFO akka.actor.ActorSystemImpl - at=consumer-started
2015-01-13 19:21:51 INFO VerifiableProperties:68 - Verifying properties
2015-01-13 19:21:51 INFO VerifiableProperties:68 - Property client.id is overridden to 1
2015-01-13 19:21:51 INFO VerifiableProperties:68 - Property metadata.broker.list is overridden to 192.168.50.71:9092
2015-01-13 19:21:51 INFO VerifiableProperties:68 - Property request.timeout.ms is overridden to 30000
2015-01-13 19:21:51 INFO ClientUtils$:68 - Fetching metadata from broker id:0,host:192.168.50.71,port:9092 with correlation id 0 for 1 topic(s) Set(HTTP)
2015-01-13 19:21:51 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 530904 (requested -1), SO_SNDBUF = 102400 (requested 102400).
2015-01-13 19:21:51 INFO SyncProducer:68 - Connected to 192.168.50.71:9092 for producing
2015-01-13 19:21:51 INFO SyncProducer:68 - Disconnecting from 192.168.50.71:9092
2015-01-13 19:21:51 DEBUG ClientUtils$:52 - Successfully fetched metadata for 1 topic(s) Set(HTTP)
2015-01-13 19:21:51 DEBUG ConsumerFetcherManager$LeaderFinderThread:52 - [1_richard-Latitude-E6440-1421169710915-2dd7b19d-leader-finder-thread], {TopicMetadata for topic HTTP ->
Metadata for partition [HTTP,0] is partition 0 leader: 0 (192.168.50.71:9092) replicas: 0 (192.168.50.71:9092) isr: 0 (192.168.50.71:9092) isUnderReplicated: false}
2015-01-13 19:21:51 INFO ConsumerFetcherThread:68 - [ConsumerFetcherThread-1_richard-Latitude-E6440-1421169710915-2dd7b19d-0-0], Starting
2015-01-13 19:21:51 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 1313280 (requested -1).
2015-01-13 19:21:51 DEBUG PartitionTopicInfo:52 - reset fetch offset of ( HTTP:0: fetched offset = 20996: consumed offset = -1 ) to 20996
2015-01-13 19:21:51 DEBUG PartitionTopicInfo:52 - reset consume offset of HTTP:0: fetched offset = 20996: consumed offset = 20996 to 20996
2015-01-13 19:21:51 INFO ConsumerFetcherManager:68 - [ConsumerFetcherManager-1421169710957] Added fetcher for partitions ArrayBuffer([[HTTP,0], initOffset -1 to broker id:0,host:192.168.50.71,port:9092] )
2015-01-13 19:21:53 DEBUG ClientCnxn:759 - Got ping response for sessionid: 0x14ae3aafd0c0028 after 1ms
2015-01-13 19:21:55 DEBUG ClientCnxn:759 - Got ping response for sessionid: 0x14ae3aafd0c0028 after 1ms
2015-01-13 19:21:57 DEBUG ClientCnxn:759 - Got ping response for sessionid: 0x14ae3aafd0c0028 after 1ms
2015-01-13 19:21:59 DEBUG ClientCnxn:759 - Got ping response for sessionid: 0x14ae3aafd0c0028 after 0ms
2015-01-13 19:22:01 DEBUG PartitionTopicInfo:52 - updated fetch offset of (HTTP:0: fetched offset = 20997: consumed offset = 20996) to 20997
2015-01-13 19:22:01 DEBUG ClientCnxn:759 - Got ping response for sessionid: 0x14ae3aafd0c0028 after 0ms
[test-akka.actor.default-dispatcher-2] INFO com.sclasen.akka.kafka.ConnectorFSM - at=transition from=Receiving to=Committing uncommitted=0
[test-akka.actor.default-dispatcher-4] WARN com.sclasen.akka.kafka.ConnectorFSM - state=Committing msg=StateTimeout drained=1 streams=1

@sclasen
Owner

sclasen commented Jan 13, 2015

Ooh, sorry, I think 0.0.9 is broken. Can you try 0.0.10-SNAPSHOT from
oss.sonatype? I am publishing 0.0.10 right now!
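
For anyone following along with sbt, pulling the snapshot might look roughly like the fragment below. This is only a sketch: the group and artifact coordinates (`com.sclasen` and `akka-kafka`) and the resolver name are assumptions not stated in this thread, so check them against the project README before relying on them. Once 0.0.10 is released, the resolver line should no longer be needed and the version can be changed to `0.0.10`.

```scala
// build.sbt (sketch; the coordinates below are assumed, not confirmed in this thread)

// Sonatype OSS snapshots repository, needed only for -SNAPSHOT versions
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

// Assumed coordinates for akka-kafka; replace the version with "0.0.10" after the release
libraryDependencies += "com.sclasen" %% "akka-kafka" % "0.0.10-SNAPSHOT"
```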


@sclasen
Owner

sclasen commented Jan 13, 2015

0.0.10 should be live in just a few minutes


@richiesgr
Author

Thanks, I'll give it a try.

@richiesgr
Author

Thanks a lot for your help, it's working now on 0.0.10.
