unable to connect MQTT source connector with confluent kafka v5.4 #3

Open
ntsh999 opened this issue Jun 1, 2020 · 5 comments

ntsh999 commented Jun 1, 2020

Objective: set up the MQTT source connector with Confluent Kafka v5.4.

Steps performed:

1. git clone https://github.com/johanvandevenne/kafka-connect-mqtt.git
2. cd kafka-connect-mqtt
3. mvn clean install
4. Copied the jar from /kafka-connect-mqtt/target to the plugins directory. The path of this directory is set in the worker configuration.
5. Requesting http://:8083/connector-plugins gives the expected output.
6. curl -d @./mqtt-source-connector-1.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Expected: no error logs in Kafka Connect, and the connector available for use. But after the 6th step, the following error appears in the Kafka Connect logs:

[2020-06-01 09:42:38,755] ERROR WorkerSourceTask{id=johan-mqtt-source-connector-1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Not authorized to connect (5)
        at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:53)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: Not authorized to connect (5)
        at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:28)
        at org.eclipse.paho.client.mqttv3.internal.ClientState.notifyReceivedAck(ClientState.java:1040)
        at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:151)
        ... 1 more
[2020-06-01 09:42:38,756] ERROR WorkerSourceTask{id=johan-mqtt-source-connector-1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)

The error indicates a connection failure between the MQTT broker and Kafka Connect, although I have entered the correct MQTT broker credentials. I verified them by running the mosquitto pub/sub commands from the VM on which my Confluent Platform is running. Any idea why I am getting this error?
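
For readers landing on this trace: the "(5)" in "Not authorized to connect (5)" appears to correspond to the CONNACK return code 5 of MQTT 3.1.1 ("connection refused, not authorized"), which is distinct from code 4 ("bad user name or password"). If that reading is right, the broker was reachable and answered, and it was the broker that refused the session. The following is a minimal sketch of decoding a raw CONNACK packet; the sample bytes are hypothetical and were not captured from this setup.

```python
from typing import Tuple

# Return codes defined for the CONNACK packet in MQTT 3.1.1.
CONNACK_RETURN_CODES = {
    0: "connection accepted",
    1: "unacceptable protocol version",
    2: "client identifier rejected",
    3: "server unavailable",
    4: "bad user name or password",
    5: "not authorized",
}


def decode_connack(packet: bytes) -> Tuple[int, str]:
    """Return (code, meaning) for a raw 4-byte MQTT 3.1.1 CONNACK packet."""
    # Byte 0 is the packet type (0x20), byte 1 the remaining length (always 2),
    # byte 2 the acknowledge flags, byte 3 the return code.
    if len(packet) != 4 or packet[0] != 0x20 or packet[1] != 0x02:
        raise ValueError("not a well-formed MQTT 3.1.1 CONNACK packet")
    code = packet[3]
    return code, CONNACK_RETURN_CODES.get(code, "reserved / unknown")


if __name__ == "__main__":
    # Hypothetical bytes a broker would send when refusing with code 5.
    print(decode_connack(bytes([0x20, 0x02, 0x00, 0x05])))
```

Because the refusal comes from the broker, the broker-side log is usually the quickest place to see which user name (if any) actually arrived.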

ntsh999 changed the title from "To connect MQTT source connector with confluent kafka v5.4" to "unable to connect MQTT source connector with confluent kafka v5.4" on Jun 1, 2020
@johanvandevenne
Owner

johanvandevenne commented Jun 1, 2020 via email

@ntsh999
Author

ntsh999 commented Jun 1, 2020

I did not check the logs on the broker side, because I tried another broker and it worked. The broker that produced the above error had an '@' symbol in its password, so that may have been causing the problem.

@PetitCedric

Hello,

I have the same issue with the sink connector.

Have you found any clues? Or do you maybe have a solution?

Regards

@fjhuanca

I'm not sure whether this is the same problem as above, but I ended up here while searching for a solution to the same trace log. I noticed that in the documentation the parameter for the username is mqtt.username, but in MQTTSinkConnectorConfig.java and MQTTSourceConnectorConfig.java it is defined as mqtt.userName. The solution, as you might guess, was to use mqtt.userName in the properties file.
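
Connector property names are case-sensitive, so a key such as mqtt.username is not matched against mqtt.userName, and the connector would presumably connect without a user name. The sketch below checks a configuration for keys that differ from an expected name only by letter case. The helper name find_case_mismatches is made up for illustration, mqtt.userName is the only property name confirmed in this thread, and mqtt.password plus all values are assumptions.

```python
from typing import Dict, Iterable, List, Tuple


def find_case_mismatches(
    config: Dict[str, str], expected_keys: Iterable[str]
) -> List[Tuple[str, str]]:
    """Return (found, expected) pairs for keys that differ only by case."""
    expected_by_lower = {key.lower(): key for key in expected_keys}
    mismatches = []
    for key in config:
        expected = expected_by_lower.get(key.lower())
        if expected is not None and expected != key:
            mismatches.append((key, expected))
    return mismatches


if __name__ == "__main__":
    # Only mqtt.userName is confirmed by the thread; extend this list from
    # the connector's *ConnectorConfig.java files.
    expected = ["mqtt.userName"]
    # Hypothetical configuration with the documented (wrong-case) key.
    config = {"mqtt.username": "alice", "mqtt.password": "secret"}
    for found, wanted in find_case_mismatches(config, expected):
        print(f"'{found}' should be '{wanted}'")
```

Running a check like this over the JSON or properties file before posting it to Kafka Connect catches this class of typo without a round trip through the worker logs.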

@BilalHammas

Do you get any valuable logging on the MQTT broker side?

On Mon, Jun 1, 2020 at 11:37 AM ntsh999 @.***> wrote:

> Objective: to setup MQTT source connector with Confluent kafka v 5.4
> Steps Performed:
>
> 1. git clone https://github.com/johanvandevenne/kafka-connect-mqtt.git
> 2. cd kafka-connect-mqtt
> 3. mvn clean install
> 4. copied the jar from /kafka-connect-mqtt/target to the plugins directory. The path of this directory is mentioned in the worker configuration.
> 5. executing http://:8083/connector-plugins gives the expected output.
> 6. curl -d @./mqtt-source-connector-1.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors
>
> Expected: There should be no error logs in kafka connect and connector should be available for usage. But after 6th step is performed following is the error received in kafka connect logs :
>
> [2020-06-01 08:24:15,107] INFO [Producer clientId=connector-producer-johan-mqtt-source-connector-1-0] Cluster ID: 0ZK4cUcPSkmwpRsrIxZKxg (org.apache.kafka.clients.Metadata:259)
> [2020-06-01 08:24:15,416] INFO Connected to MQTT Broker (be.jovacon.kafka.connect.MQTTSourceConnector:38)
> [2020-06-01 08:24:15,416] INFO Subscribing to my_mqtt_topic with QOS 1 (be.jovacon.kafka.connect.MQTTSourceConnector:43)
> [2020-06-01 08:24:15,420] INFO WorkerSourceTask{id=johan-mqtt-source-connector-1-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2020-06-01 08:24:15,420] INFO WorkerSourceTask{id=johan-mqtt-source-connector-1-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2020-06-01 08:24:15,420] ERROR WorkerSourceTask{id=johan-mqtt-source-connector-1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
> org.apache.kafka.connect.errors.ConnectException: MqttException (128)
>         at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:53)
>         at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: MqttException (128)
>         at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:466)
>         at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:454)
>         at be.jovacon.kafka.connect.MQTTSourceTask.start(MQTTSourceTask.java:44)
>         ... 8 more
> [2020-06-01 08:24:15,421] ERROR WorkerSourceTask{id=johan-mqtt-source-connector-1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
> [2020-06-01 08:24:15,425] INFO [Producer clientId=connector-producer-johan-mqtt-source-connector-1-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)
>
> The error shows connection failure between mqtt broker and kafka connect. Although, I have entered correct mqtt broker credentials. I verified this by running the mosquitto pub sub commands from the VM on which my confluent platform is running. Any idea why am I getting this error?

Hello, do you have a solution for this MqttException (128)? Please let me know; I can't click on the link ;)
Great, thanks ;)
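
On the MqttException (128) in the quoted log: 128 is 0x80, which MQTT 3.1.1 defines as the "failure" return code in a SUBACK packet, and the trace is thrown from MqttClient.subscribe right after "Connected to MQTT Broker". That suggests the connection and login succeeded in that run and the broker rejected the subscription itself, which commonly happens when the account is not allowed to read the topic; this is an interpretation, not something confirmed in the thread. Below is a minimal sketch of decoding SUBACK return codes, assuming a single-byte remaining length; the sample bytes are hypothetical.

```python
from typing import List

# Per-topic return codes defined for the SUBACK packet in MQTT 3.1.1.
SUBACK_RETURN_CODES = {
    0x00: "success, maximum QoS 0",
    0x01: "success, maximum QoS 1",
    0x02: "success, maximum QoS 2",
    0x80: "failure",
}


def decode_suback(packet: bytes) -> List[str]:
    """Return one description per topic filter in a raw SUBACK packet."""
    # Layout: type byte 0x90, remaining length, 2-byte packet identifier,
    # then one return code per requested topic filter.
    if len(packet) < 5 or packet[0] != 0x90:
        raise ValueError("not a SUBACK packet")
    remaining = packet[1]
    # Simplification: only single-byte remaining lengths (< 128) are handled.
    if remaining >= 0x80 or remaining != len(packet) - 2:
        raise ValueError("unsupported or inconsistent remaining length")
    return [SUBACK_RETURN_CODES.get(code, "reserved") for code in packet[4:]]


if __name__ == "__main__":
    # Hypothetical SUBACK for packet id 1 with a single rejected subscription.
    print(decode_suback(bytes([0x90, 0x03, 0x00, 0x01, 0x80])))
```

If the broker uses access control lists, checking that the connector's user may subscribe to the configured topic (my_mqtt_topic in the log above) would be a reasonable first step.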
