
Unable to deliver event to Kafka #2014

Open
filgiuff opened this issue Apr 20, 2021 · 6 comments
filgiuff commented Apr 20, 2021

I've installed Fiware Orion (v 3.0.0), Fiware Cygnus (2.8.0) and Kafka (2.7.0) in order to send data from Orion to Kafka via Cygnus.

When the notification arrives from Orion to Cygnus, this error appears:

cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | time=2021-04-16T10:12:13.774Z | lvl=ERROR | corr=36cccd7a-9e9c-11eb-8e44-02420a000004; cbnotif=2 | trans=a6fe0530-c653-4e19-828a-67ec45b37a96 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=run | msg=org.apache.flume.SinkRunner$PollingRunner[158] : Unable to deliver event. Exception follows.
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop |     at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop |     at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop |     at com.telefonica.iot.cygnus.sinks.NGSISink.processNewBatches(NGSISink.java:646)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop |     at com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:373)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop |     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop |     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop |     at java.lang.Thread.run(Thread.java:748)
cygnus_cygnus.1.wfg7cz8r3e1v@docker-desktop | time=2021-04-16T10:12:18.778Z | lvl=INFO | corr=36cccd7a-9e9c-11eb-8e44-02420a000004; cbnotif=2 | trans=a6fe0530-c653-4e19-828a-67ec45b37a96 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[643] : Rollback transaction by Exception (begin() called when transaction is OPEN!) ...

Here are some other details.

@damianhorna

Hi, I've got the same issue. Apparently the error appears when Cygnus checks whether a specific Kafka topic exists.

cygnus_1  | time=2021-04-23T09:04:54.013Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=topicExists | msg=com.telefonica.iot.cygnus.backends.kafka.KafkaBackendImpl[60] : Checking if topic 'libraryxffffx002fcatalogxffffBook1xffffBook' already exists.
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=sendResponse | msg=org.eclipse.jetty.server.HttpChannel[693] : sendResponse info=null content=HeapByteBuffer@587c2b08[p=0,l=0,c=0,r=0]={<<<>>>} complete=true committing=false callback=Blocker@4fec5d4c{null}
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=process | msg=org.eclipse.jetty.server.HttpConnection$SendCallback[694] : org.eclipse.jetty.server.HttpConnection$SendCallback@3e547de7[PROCESSING][i=null,cb=Blocker@4fec5d4c{null}] generate: CONTINUE (null,[p=0,l=0,c=0,r=0],true)@COMPLETING
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=process | msg=org.eclipse.jetty.server.HttpConnection$SendCallback[694] : org.eclipse.jetty.server.HttpConnection$SendCallback@3e547de7[PROCESSING][i=null,cb=Blocker@4fec5d4c{null}] generate: NEED_CHUNK (null,[p=0,l=0,c=0,r=0],true)@COMPLETING
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=process | msg=org.eclipse.jetty.server.HttpConnection$SendCallback[694] : org.eclipse.jetty.server.HttpConnection$SendCallback@3e547de7[PROCESSING][i=null,cb=Blocker@4fec5d4c{null}] generate: FLUSH (null,[p=0,l=0,c=0,r=0],true)@COMPLETING
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=write | msg=org.eclipse.jetty.io.WriteFlusher[315] : write: WriteFlusher@76adc9a1{IDLE}->null [HeapByteBuffer@2ee8a74[p=0,l=5,c=1024,r=5]={<<<0\r\n\r\n>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00}]
cygnus_1  | time=2021-04-23T09:04:54.015Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=updateState | msg=org.eclipse.jetty.io.WriteFlusher[119] : update WriteFlusher@76adc9a1{WRITING}->null:IDLE-->WRITING
cygnus_1  | time=2021-04-23T09:04:54.016Z | lvl=DEBUG | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=library | subsrv=/catalog | comp=cygnus-ngsi | op=flush | msg=org.eclipse.jetty.io.ChannelEndPoint[288] : flushed 5 SocketChannelEndPoint@376222c4{/172.25.0.3:58186<->/172.25.0.4:5050,OPEN,fill=-,flush=W,to=4/30000}{io=0/0,kio=0,kro=1}->HttpConnection@210f4b41[p=HttpParser{s=END,383 of 383},g=HttpGenerator@59427309{s=COMPLETING}]=>HttpChannelOverHttp@f59a675{r=1,c=true,a=COMPLETING,uri=//cygnus:5050/notify}
cygnus_1  | time=2021-04-23T09:04:54.016Z | lvl=ERROR | corr=f7bffea0-a412-11eb-8d4a-0242ac190003 | trans=9aebe718-5668-47af-bb02-306ffbbaa1e1 | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=run | msg=org.apache.flume.SinkRunner$PollingRunner[158] : Unable to deliver event. Exception follows.
cygnus_1  | java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
cygnus_1  |     at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
cygnus_1  |     at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
cygnus_1  |     at com.telefonica.iot.cygnus.sinks.NGSISink.processNewBatches(NGSISink.java:646)
cygnus_1  |     at com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:373)
cygnus_1  |     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
cygnus_1  |     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
cygnus_1  |     at java.lang.Thread.run(Thread.java:748)
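The IllegalStateException in the trace comes from Flume's transaction state machine: a channel transaction may only be close()d after it has been committed or rolled back, so a sink that leaves the transaction OPEN on an error path triggers exactly this message. As an illustration, here is a minimal self-contained model of that check (a sketch only, not the actual org.apache.flume.channel.BasicTransactionSemantics source):

```java
// Minimal model of Flume's transaction state machine (illustration only,
// not the real org.apache.flume.channel.BasicTransactionSemantics code).
public class TransactionSketch {
    enum State { NEW, OPEN, COMPLETED, CLOSED }

    private State state = State.NEW;

    void begin() {
        if (state != State.NEW)
            throw new IllegalStateException("begin() called when transaction is " + state + "!");
        state = State.OPEN;
    }

    void commit()   { requireOpen("commit");   state = State.COMPLETED; }
    void rollback() { requireOpen("rollback"); state = State.COMPLETED; }

    void close() {
        // The check that fires in the Cygnus log: closing an OPEN transaction.
        if (state == State.OPEN)
            throw new IllegalStateException(
                "close() called when transaction is OPEN - you must either commit or rollback first");
        state = State.CLOSED;
    }

    private void requireOpen(String op) {
        if (state != State.OPEN)
            throw new IllegalStateException(op + "() called when transaction is " + state);
    }

    public static void main(String[] args) {
        TransactionSketch tx = new TransactionSketch();
        tx.begin();
        try {
            tx.close(); // skipping commit/rollback, as the sink apparently does
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In a sink, the usual pattern is commit() on the success path, rollback() in the catch block, and close() in a finally block, so the transaction is never still OPEN when close() runs.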

@damianhorna

Possible steps to reproduce:

docker-compose.yml:

version: "3.9"
services:
  mongo:
    image: mongo:4.4
    command: --nojournal
  orion:
    image: fiware/orion
    links:
      - mongo
    ports:
      - "1026:1026"
    command: -dbhost mongo
  cygnus:
    image: fiware/cygnus-ngsi
    volumes:
      - ./agent.conf:/opt/apache-flume/conf/agent.conf
    depends_on:
      - orion
    expose:
      - "5050"
      - "5080"
    ports:
      - "5050:5050"
      - "5080:5080"
    environment:
      - CYGNUS_LOG_LEVEL=DEBUG
      - CYGNUS_SKIP_CONF_GENERATION=true
      - CYGNUS_MULTIAGENT=false

agent.conf:

cygnus-ngsi.sources = http-source
cygnus-ngsi.sinks = kafka-sink
cygnus-ngsi.channels = kafka-channel

cygnus-ngsi.sources.http-source.channels = kafka-channel
cygnus-ngsi.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnus-ngsi.sources.http-source.port = 5050
cygnus-ngsi.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnus-ngsi.sources.http-source.handler.notification_target = /notify
cygnus-ngsi.sources.http-source.handler.default_service = def_serv
cygnus-ngsi.sources.http-source.handler.default_service_path = /def_servpath
cygnus-ngsi.sources.http-source.handler.events_ttl = 2
cygnus-ngsi.sources.http-source.interceptors = ts gi
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp
cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder
cygnus-ngsi.sources.http-source.interceptors.gi.grouping_rules_conf_file = /opt/apache-flume/conf/grouping_rules.conf

cygnus-ngsi.channels.kafka-channel.type = memory
cygnus-ngsi.channels.kafka-channel.capacity = 1000
cygnus-ngsi.channels.kafka-channel.transactionCapacity = 100

cygnus-ngsi.sinks.kafka-sink.type = com.telefonica.iot.cygnus.sinks.NGSIKafkaSink
cygnus-ngsi.sinks.kafka-sink.channel = kafka-channel
cygnus-ngsi.sinks.kafka-sink.enable_grouping = false
cygnus-ngsi.sinks.kafka-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.kafka-sink.broker_list = 172.17.0.1:9092
cygnus-ngsi.sinks.kafka-sink.zookeeper_endpoint = 172.17.0.1:2181
cygnus-ngsi.sinks.kafka-sink.batch_size = 1
cygnus-ngsi.sinks.kafka-sink.batch_timeout = 10

And then send requests as in the tutorial (https://fiware-cygnus.readthedocs.io/en/latest/cygnus-ngsi/integration/orion_cygnus_kafka/index.html):

(curl localhost:1026/v1/subscribeContext -s -S --header 'Content-type: application/json' --header 'Accept: application/json' --header 'Fiware-Service: Library' --header 'Fiware-ServicePath: /catalog' -d @- | python -mjson.tool) <<EOF
{
    "entities": [
        {
            "type": "Book",
            "isPattern": "false",
            "id": "Book1"
        }
    ],
    "attributes": [
    ],
    "reference": "http://cygnus:5050/notify",
    "duration": "P1M",
    "notifyConditions": [
        {
            "type": "ONCHANGE",
            "condValues": [
                "title",
                "pages",
                "price"
            ]
        }
    ],
    "throttling": "PT5S"
}
EOF
(curl localhost:1026/v1/updateContext -s -S --header 'Content-Type: application/json' --header 'Accept: application/json' --header 'Fiware-Service: Library' --header 'Fiware-ServicePath: /catalog' -d @- | python -mjson.tool) <<EOF
{
    "contextElements": [
        {
            "type": "Book",
            "isPattern": "false",
            "id": "Book1",
            "attributes": [
                {
                    "name": "title",
                    "type": "text",
                    "value": "Game of Thrones: Book 1"
                },
                {
                    "name": "pages",
                    "type": "integer",
                    "value": "927"
                },
                {
                    "name": "price",
                    "type": "float",
                    "value": "18.50"
                }
            ]
        }
    ],
    "updateAction": "APPEND"
}
EOF

After that the error appears in the console.
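For what it's worth, the topic name in the DEBUG line above ('libraryxffffx002fcatalogxffffBook1xffffBook') is the FIWARE service, service path, entity id and entity type joined with Cygnus's name encoding: 'xffff' as separator and 'x002f' for '/'. A rough sketch of just that visible behaviour (hypothetical helper names; it does not reproduce the full Cygnus character encoding):

```java
// Rough sketch of how the topic name in the DEBUG log could be assembled.
// Mirrors only what is visible in the log line, not the full Cygnus encoding.
public class TopicNameSketch {
    // Lowercase and encode '/' as "x002f", as seen for the service path.
    static String encode(String s) {
        return s.toLowerCase().replace("/", "x002f");
    }

    // Join service, service path, entity id and entity type with "xffff".
    static String topicName(String service, String servicePath,
                            String entityId, String entityType) {
        return String.join("xffff",
                encode(service), encode(servicePath), entityId, entityType);
    }

    public static void main(String[] args) {
        System.out.println(topicName("Library", "/catalog", "Book1", "Book"));
        // -> libraryxffffx002fcatalogxffffBook1xffffBook
    }
}
```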

@mapedraza (Collaborator)

Thanks for adding the information, Damian. This seems to be a problem specific to the Kafka sink, which does not have high priority on the Cygnus development team's maintenance list at the moment, so we cannot offer a quick answer. However, we are open to contributions to get the Kafka sink working properly.

@AlvaroVega (Member)

Could you also describe how you start the Kafka broker?

@filgiuff (Author)

I've tried different versions of Kafka and docker-compose.
Here is one that I used:

version: "3"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    deploy:
      replicas: 1
      restart_policy:
        condition: any
        delay: 5s
        max_attempts: 3
        window: 120s  
      
  kafka:
    image: docker.io/bitnami/kafka:2
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_LISTENERS=PLAINTEXT://:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.142:9092
    depends_on:
      - zookeeper
    deploy:
      replicas: 1
      restart_policy:
        condition: any
        delay: 5s
        max_attempts: 3
        window: 120s  
      
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

Started using
docker stack deploy -c docker-compose.yml kafka

The result was always the same error.
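One thing worth ruling out with this setup: Cygnus points at broker_list = 172.17.0.1:9092, while the broker advertises PLAINTEXT://192.168.1.142:9092. Kafka clients connect to the bootstrap address first and then use the advertised listener from the metadata for the follow-up connections, so both addresses must be reachable from inside the Cygnus container. A quick reachability check (EndpointProbe is a hypothetical helper, not part of Cygnus; the host/port values are the ones from the configs above):

```java
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: verify that the broker and ZooKeeper endpoints
// configured in agent.conf are reachable from the network Cygnus runs in.
public class EndpointProbe {
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            // Attempt a plain TCP connect with a bounded timeout.
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Addresses taken from the agent.conf and docker-compose above.
        System.out.println("kafka bootstrap: " + canConnect("172.17.0.1", 9092, 2000));
        System.out.println("advertised:      " + canConnect("192.168.1.142", 9092, 2000));
        System.out.println("zookeeper:       " + canConnect("172.17.0.1", 2181, 2000));
    }
}
```

Running this from inside the Cygnus container (or any container on the same network) shows whether a plain networking problem is hiding underneath the sink error.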

@jaimeventura

Hey,
I'm having the same issue.
Is there any update on this?
Thanks
