
Event structure example #3

Closed
luckyswede opened this issue Nov 26, 2014 · 17 comments

@luckyswede

Hi,
Do you have an example of what structure the Kafka events should have in order to work with this river?
I'm receiving JSON from Kafka, and I have configured a mapping in ES to describe the messages and configured the river to use that type. I see the messages coming in from Kafka in the ES logs, but no docs are put into ES.
Any clues?

Thanks / Jonas

@mariamhakobyan-zanox

Hi @luckyswede,

Currently the plugin supports reading string messages from Kafka (the DefaultEncoder is used as serializer.class) and puts each message into ES as a value property. Support for JSON Kafka messages is planned as well, but if you need it asap, you are welcome to add the support and send a pull request.

Regarding not seeing docs in ES, the cause might be that you did not specify the bulk.size property while creating the river, in which case the default value of 100 is used. If so, at least 100 messages have to be consumed before any docs are inserted into ES, because the plugin uses the Bulk API to insert messages in bulk.
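
As an illustration (the zookeeper address, topic, index and type names below are placeholders for your own values), a river created with a small bulk.size will flush to ES after every few messages:

curl -XPUT 'localhost:9200/_river/my-kafka-river/_meta' -d '
{
    "type" : "kafka",
    "kafka" : {
        "zookeeper.connect" : "localhost:2181",
        "zookeeper.connection.timeout.ms" : 10000,
        "topic" : "my-topic"
    },
    "index" : {
        "index" : "my-index",
        "type" : "my-type",
        "bulk.size" : 5,
        "concurrent.requests" : 1
    }
}'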

Please let me know if that resolves your problem, or if I can help you with anything else.
Regards,
Mariam

@luckyswede
Author

Ok I see.
What does the "type" configuration parameter mean though? I thought that corresponded to ES mapping types.

/ Jonas

@mariamhakobyan
Owner

I am not sure which type configuration parameter you are referring to: the one at the top level of the river definition, or the one under index?

curl -XPUT 'localhost:9200/_river/<river-name>/_meta' -d '
{
    "type" : "kafka",
    "kafka" : {
        "zookeeper.connect" : <zookeeper.connect>,
        "zookeeper.connection.timeout.ms" : <zookeeper.connection.timeout.ms>,
        "topic" : <topic-name>
    },
    "index" : {
        "index" : <index-name>,
        "type" : <mapping-type-name>,
        "bulk.size" : <bulk.size>,
        "concurrent.requests" : <concurrent.requests>
    }
}'

The top-level one is a static value, "kafka", and cannot be changed by the client.
The one under index is the mapping type of the Elasticsearch index. This type property is a string and can be configured by the client.
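
For example (the index and type names here are just placeholders, and this is only an illustration, not something the plugin requires), you can define that mapping type on the target index before starting the river. Since the plugin stores each message under a value property, a minimal mapping could look like:

curl -XPUT 'localhost:9200/my-index' -d '
{
    "mappings" : {
        "my-type" : {
            "properties" : {
                "value" : { "type" : "string" }
            }
        }
    }
}'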

@rberger

rberger commented Nov 27, 2014

I'm having the same problem. What's strange is that it worked for a little while; then I deleted the river, re-created it, and now I can't get it to work.

I see the data in the Elasticsearch logs:

[2014-11-27 19:31:49,698][INFO ][org.elasticsearch.river.kafka.KafkaWorker] {
 "Topic": "ep-client-stats-staging",
   "MAC": "b8-76-3f-01-02-5b",
   "Wlan": "wlan0",
   "ConnectedTimeSec": 38927,
   "InactiveTimeMilliSec": 352,
   "RSSI": -46,
   "AvgRSSI": -45,
   "TxBitRate": 117.0,
   "TxBytes": 148668399,
   "TxPkts": 253983,
   "TxRetries": 62955,
   "TxFailed": 1,
   "Txbps": 925,
   "Txpps": 1,
   "RxBitRate": 144.4,
   "RxBytes": 64647711,
   "RxPkts": 244225,
   "Rxbps": 1239,
   "Rxpps": 1,
   "ARCStatus": 0
  }
 ]
}

But the search only shows the river metadata, no data:

POST _search
{
   "query": {
      "match_all": {}
   }
}

Here's the result:

{
   "took": 16,
   "timed_out": false,
   "_shards": {
      "total": 21,
      "successful": 21,
      "failed": 0
   },
   "hits": {
      "total": 4,
      "max_score": 1,
      "hits": [
         {
            "_index": ".kibana",
            "_type": "config",
            "_id": "4.0.0-BETA2",
            "_score": 1,
            "_source": {
               "defaultIndex": "*"
            }
         },
         {
            "_index": "_river",
            "_type": "ep-client-stats-staging",
            "_id": "_status",
            "_score": 1,
            "_source": {
               "node": {
                  "id": "P6XexnIGS5SiP5XIjJUW7A",
                  "name": "elasticsearch-000",
                  "transport_address": "inet[/172.31.54.221:9300]"
               }
            }
         },
         {
            "_index": "_river",
            "_type": "ep-client-stats-staging",
            "_id": "_meta",
            "_score": 1,
            "_source": {
               "type": "kafka",
               "kafka": {
                  "zookeeper.connect":  "ec2-54-93-25-195.compute-1.amazonaws.com:2181,ec2-54-165-148-83.compute-1.amazonaws.com:2181,ec2-54-132-88-110.compute-1.amazonaws.com:2181",
                  "zookeeper.connection.timeout.ms": 10000,
                  "topic": "ep-client-stats-staging"
               },
               "index": {
                  "index": "ep-client-stats-staging-index",
                  "type": "status",
                  "bulk.size": 100,
                  "concurrent.requests": 1
               }
            }
         }
      ]
   }
}

(Not showing the kibana entry)

Here's the PUT to create the River:

PUT /_river/ep-client-stats-staging/_meta
{
    "type" : "kafka",
    "kafka" : {
       "zookeeper.connect" :  "ec2-54-93-25-195.compute-1.amazonaws.com:2181,ec2-54-165-148-83.compute-1.amazonaws.com:2181,ec2-54-132-88-110.compute-1.amazonaws.com:2181",
       "zookeeper.connection.timeout.ms" : 10000,
       "topic" : "ep-client-stats-staging"         
   },
   "index" : {
       "index" : "ep-client-stats-staging-index",
       "type" : "status",
       "bulk.size" : 100,
       "concurrent.requests" : 1
   }
}

@rberger

rberger commented Nov 28, 2014

On a related note, you mentioned:

The one under index is the mapping type of elasticsearch index. This type property has a string type, and can be configured by the client.

How do I tell Elasticsearch that documents of this mapping type (in my case status) should be interpreted as JSON objects and not as strings?

Thanks

@rberger

rberger commented Nov 29, 2014

I can't yet tell for sure, but it seems like the
Set<MessageAndMetadata> consumedMessages = consumePartitionMessages(stream);

never returns and so the elasticsearchProducer.addMessagesToBulkProcessor(consumedMessages); is never called.

consume = true;
try {
    logger.info("Kafka consumer started...");

    while (consume) {
        KafkaStream stream = chooseRandomStream(kafkaConsumer.getStreams());
        Set<MessageAndMetadata> consumedMessages = consumePartitionMessages(stream);

        elasticsearchProducer.addMessagesToBulkProcessor(consumedMessages);
    }
} finally {
    logger.info("Kafka consumer has stopped...");
    consume = false;
}

I.e., this block never exits:

try {
    // by default it waits forever for message, but there is timeout configured
    final ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator();

    // Consume all the messages of the stream (partition)
    while (consumerIterator.hasNext() && consume) {

        final MessageAndMetadata messageAndMetadata = consumerIterator.next();
        logMessage(messageAndMetadata);

        messageSet.add(messageAndMetadata);
    }
} catch (ConsumerTimeoutException ex) {
    logger.info("Nothing to be consumed for now. Consume flag is: " + consume);
}
return messageSet;

Could there be conditions where the line

while (consumerIterator.hasNext() && consume) {

never goes false?

@mariamhakobyan
Owner

Hi @rberger,

Regarding your first comment ("I'm having the same problem. What's strange is it worked for a little while, then I deleted the river and re-created it and can't get it to work"): I just tested the same behaviour on my local machine. When I delete the river and create it again, without restarting the Elasticsearch server, I can see new messages being inserted from Kafka into Elasticsearch (I was using plain string messages).

Actually I am using the following command to retrieve all the data inserted into ES:

GET /kafka-index/_search?pretty=1

Could you try the above-mentioned command and see whether it shows any results for you?

Regards,
Mariam

@rberger

rberger commented Nov 29, 2014

I'm pretty sure that for some reason, maybe due to the way the data from my Kafka topic is coming in, the Elasticsearch bulk add is never getting called. I see the logging from the line
logMessage(messageAndMetadata); in the KafkaWorker, but no other logging output that indicates it ever calls elasticsearchProducer.addMessagesToBulkProcessor(consumedMessages).

I am wondering if I have a stream of data coming from kafka that always makes consumerIterator.hasNext == true.

My Java fu is weak and I'm just learning Elasticsearch and Kafka, but I will try to do some experiments. I will also try another Kafka topic that I update manually, so I can see whether it's related to how much continuous data is in the topic.

Any help is appreciated. If you have the interest, please drop me an email at rberger@mistsys.com and I could give you access to our Kafka consumer to see if it's a usage-pattern problem or if I'm just doing something wrong. In any case, I appreciate you creating this tool and your help!

If I do the same command as GET /kafka-index/_search?pretty=1 but with my index:

GET /ep-client-stats-staging-index/_search?pretty=1

I get:

{
   "error": "IndexMissingException[[ep-client-stats-staging-index] missing]",
   "status": 404
}

@mariamhakobyan
Owner

Hi @rberger,

After examining your use case again (an infinite flow of data, without stopping), I now think there is a bug in my code, inside the while loop. Because there is always data to read, consumerIterator.hasNext() keeps returning messages and the loop never exits, so the messages are never added to the ES bulk processor. The loop only exits when there is no data to read and the consumer times out.

I will fix this issue tomorrow, and will update you so you can test it again.
In the meantime, just to make sure the plugin works for you, you can try the same thing locally.

Locally, perform the following steps:

1. install Kafka
2. run the Zookeeper server
3. run the Kafka server
4. create a topic
5. create a console producer
6. type some messages to be produced (manually in the console)
7. install Elasticsearch
8. install the river plugin
9. run the Elasticsearch server
10. create a river (e.g. bulk.size = 5)
11. watch the produced messages being logged
12. execute GET /kafka-index/_search?pretty=1 to see the data inserted into ES

This should work, because when you stop typing, the consumer will time out after 15 ms, the while loop will terminate, and the messages will be added to the ES bulk processor.
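
For reference, steps 2-5 above can be done with the standard Kafka quickstart scripts (Kafka 0.8.x; paths are relative to the Kafka installation directory, and the topic name is just an example), and step 6 is simply typing lines into the producer console:

# steps 2-3: start Zookeeper and the Kafka broker (in separate terminals)
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

# step 4: create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# step 5: start a console producer (then type messages, one per line)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test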

Sorry that this caused trouble for your application, I will fix it asap and let you know.
Thanks for finding it out :)

Cheers,
Mariam

@rberger

rberger commented Nov 29, 2014

Ok, I think I proved that the issue is that the elasticsearchProducer.addMessagesToBulkProcessor(consumedMessages) is never being called due to consumerIterator.hasNext always being true.

I had the river running. In the Elasticsearch log I'm seeing continuous output from logMessage(messageAndMetadata), along the lines of:

[2014-11-29 22:25:19,211][INFO ][org.elasticsearch.river.kafka.KafkaWorker] {
 "Topic": "ep-client-stats-staging",
 "HashKey": "00-11-22-33-44-56",
 "ID": "00-11-22-33-44-56",
 "When": "2014-11-29T22:26:03.797468114Z",
 "Interval": 10001782339,
 "Clients": [
  {
   "MAC": "18-87-96-f4-9d-50",
   "Wlan": "wlan0",
   "ConnectedTimeSec": 3014,
   "InactiveTimeMilliSec": 7876,
   "RSSI": -52,
   "AvgRSSI": -49,
   "TxBitRate": 65.0,
   "TxBytes": 2457502,
   "TxPkts": 2752,
   "TxRetries": 38,
   "TxFailed": 0,
   "Txbps": 0,
   "Txpps": 0,
   "RxBitRate": 65.0,
   "RxBytes": 358103,
   "RxPkts": 3598,
   "Rxbps": 62,
   "Rxpps": 0,
   "ARCStatus": 0
  },
  {
   "MAC": "00-21-00-47-5f-ef",
   "Wlan": "wlan0",
   "ConnectedTimeSec": 9964,
   "InactiveTimeMilliSec": 644,
   "RSSI": -39,
   "AvgRSSI": -38,
   "TxBitRate": 11.0,
   "TxBytes": 11177723,
   "TxPkts": 17284,
   "TxRetries": 2261,
   "TxFailed": 0,
   "Txbps": 14113,
   "Txpps": 1,
   "RxBitRate": 48.0,
   "RxBytes": 4754545,
   "RxPkts": 22700,
   "Rxbps": 2590,
   "Rxpps": 1,
   "ARCStatus": 0
  }
 ]
}

But there are never any log messages showing that elasticsearchProducer.addMessagesToBulkProcessor(consumedMessages) is called.

Then I actually shut down the Kafka brokers, assuming that would make consumerIterator.hasNext == false. And it sure did.

Besides the error messages saying it couldn't connect to the Kafka broker, it said, for the first time:

[2014-11-29 22:23:04,110][INFO ][kafka.consumer.ConsumerFetcherManager] [ConsumerFetcherManager-1417296145707] Added fetcher for partitions ArrayBuffer()
[2014-11-29 22:23:04,228][INFO ][org.elasticsearch.river.kafka.KafkaWorker] Nothing to be consumed for now. Consume flag is: true
[2014-11-29 22:23:04,233][INFO ][org.elasticsearch.river.kafka.ElasticsearchProducer] Going to execute bulk request composed of 100 actions.
[2014-11-29 22:23:04,243][INFO ][org.elasticsearch.river.kafka.ElasticsearchProducer] Going to execute bulk request composed of 100 actions.
[2014-11-29 22:23:04,264][INFO ][cluster.metadata         ] [elasticsearch-000] [ep-client-stats-staging-index] creating index, cause [auto(bulk api)], shards [20]/[2], mappings
[]

And then a flurry of:

[2014-11-29 22:23:12,851][INFO ][org.elasticsearch.river.kafka.ElasticsearchProducer] Going to execute bulk request composed of 100 actions.
[2014-11-29 22:23:12,870][INFO ][org.elasticsearch.river.kafka.ElasticsearchProducer] Executed bulk composed of 100 actions.

And now when I say:

GET /ep-client-stats-staging-index/_search?pretty=1

I get:

{
   "took": 6,
   "timed_out": false,
   "_shards": {
      "total": 20,
      "successful": 20,
      "failed": 0
   },
   "hits": {
      "total": 453300,
      "max_score": 1,
      "hits": [
         {
            "_index": "ep-client-stats-staging-index",
            "_type": "stats",
            "_id": "ed5d1f38-9d75-4f0a-92b0-f44aa07ac9b4",
            "_score": 1,
            "_source": {
               "value": "   \"TxPkts\": 3995,"
            }
         },
         {
            "_index": "ep-client-stats-staging-index",
            "_type": "stats",
            "_id": "dc21e4da-7aec-43bb-a127-2f223e7b79bb",
            "_score": 1,
            "_source": {
               "value": "   \"InactiveTimeMilliSec\": 368,"
            }
         },
...

So we need something that causes the consumerIterator.hasNext() loop to take a break every once in a while.

@rberger

rberger commented Nov 29, 2014

I presume we could just give

while (consumerIterator.hasNext() && consume) {

another condition that breaks out of the loop every bulk.size messages.

That would let elasticsearchProducer.addMessagesToBulkProcessor(consumedMessages) get its messages.

I'll probably take a stab at trying to fix it. It's good for me to learn more about this :-) If you have any hints as to where you think the fix should go and what it would be, let me know and I'll give it a try and send a pull request.
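
As a rough sketch of that idea (not the actual fix that went into the plugin; bulkSize, consume, logger and logMessage are assumed to be members of the worker class quoted above), the consume loop could additionally stop once a full bulk has been collected:

// Sketch only: cap the number of messages pulled per call at bulkSize, so the
// collected set reaches the bulk processor even when the topic never goes idle.
private Set<MessageAndMetadata> consumePartitionMessages(final KafkaStream stream) {
    final Set<MessageAndMetadata> messageSet = new HashSet<MessageAndMetadata>();
    try {
        final ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator();

        // Stop when a full bulk has been collected, the river is stopped,
        // or the consumer times out while waiting in hasNext().
        while (messageSet.size() < bulkSize && consume && consumerIterator.hasNext()) {
            final MessageAndMetadata messageAndMetadata = consumerIterator.next();
            logMessage(messageAndMetadata);
            messageSet.add(messageAndMetadata);
        }
    } catch (ConsumerTimeoutException ex) {
        logger.info("Nothing to be consumed for now. Consume flag is: " + consume);
    }
    return messageSet;
}

Checking the size and the consume flag before hasNext() matters, because hasNext() blocks until a message arrives or the consumer timeout fires.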

@mariamhakobyan
Owner

Hi @rberger,

I actually already fixed the problem, and will deploy the latest version in a couple of minutes :)

Cheers,
Mariam

@rberger

rberger commented Nov 29, 2014

Cool! Thanks!

@mariamhakobyan
Owner

Hi @rberger,

The new fix is available in the latest release, version 1.1.1 - see https://github.com/mariamhakobyan/elasticsearch-river-kafka/releases/tag/v1.1.1.
Please try the new version and let me know if it works for you. You will need to delete the old plugin, install the new version, and restart Elasticsearch.

Cheers,
Mariam

@rberger

rberger commented Nov 29, 2014

Yay! It works! Now I can get back to figuring out how to have the messages from Kafka interpreted as JSON and not text. I will look into what changes are needed to make that happen. I think I learned enough from this exercise to maybe address that issue.

Thanks again!
Rob

@mariamhakobyan
Owner

Great to hear that! In the current version, if you receive JSON messages from Kafka, those messages are inserted into ES as a single value (not extracted per JSON property). This is the default behaviour.

Could we consider this issue as resolved?

Regards,
Mariam

@rberger

rberger commented Nov 30, 2014

Yes please consider this issue resolved.

Thanks Again!
