### importing jars

In [2]:
%classpath add jar libs/component-catalog.jar
%classpath add jar libs/mqttv3-1.1.2.jar

In [3]:
// import org.eclipse.paho.client.mqttv3.;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.UUID;

public class MQTTConsumer implements MqttCallback {

    private MqttClient client;
    private String uri;
    private String user;
    private String password;
    private String topic;
    private MqttConnectOptions connectionOptions;
    private String clientID;
    
    
    public MQTTConsumer(String uri, String user, String password, String topic) {

        this.uri = uri;
        this.user = user;
        this.password = password;
        this.topic = topic;
        this.connect();
    }

    public MQTTConsumer(String uri, String topic) {

        this.uri = uri;
        this.topic = topic;    
        this.connect();
    }


    private void connect() {
        
        this.clientID = UUID.randomUUID().toString();      
        this.connectionOptions = new MqttConnectOptions();
        this.connectionOptions.setAutomaticReconnect(true);
        this.connectionOptions.setCleanSession(true);
        this.connectionOptions.setConnectionTimeout(20);
        
        
        try {
            client = new MqttClient(this.uri, this.clientID);
            client.connect(this.connectionOptions);            
            client.setCallback(this);
            client.subscribe(this.topic);
            System.out.println("[MQTTConsumer] connected");            

        } catch (MqttException e) {
            
            e.printStackTrace();
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("[MQTTConsumer] Connection lost.  Reason: " + cause);
        System.exit(1);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        System.out.println(String.format("[MQTTConsumer] New message received | topic: %s | payload: %s", topic, new String(message.getPayload())));
    }


    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {

    }

}

com.twosigma.beaker.javash.bkr02a73051.MQTTConsumer

In [4]:
MQTTConsumer consumer = new MQTTConsumer("tcp://localhost:1883","news/stream");
return "";

[MQTTConsumer] connected




[MQTTConsumer] New message received | topic: news/stream | payload: test1
[MQTTConsumer] New message received | topic: news/stream | payload: test2
[MQTTConsumer] New message received | topic: news/stream | payload: test3


In [5]:
// import org.eclipse.paho.client.mqttv3.;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.UUID;

public class MQTTProducer implements MqttCallback {

    private final int qos = 1;    
    private MqttClient client;
    private String uri;
    private String user;
    private String password;
    private MqttConnectOptions connectionOptions;
    private String clientID;
    
    
    public MQTTProducer(String uri, String user, String password) {

        this.uri = uri;
        this.user = user;
        this.password = password;
        this.connect();
        
    }

    public MQTTProducer(String uri) {

        this.uri = uri;
        this.connect();
        
    }

    private void connect() {
        
        this.clientID = UUID.randomUUID().toString();      
        this.connectionOptions = new MqttConnectOptions();
        this.connectionOptions.setAutomaticReconnect(true);
        this.connectionOptions.setCleanSession(true);
        this.connectionOptions.setConnectionTimeout(20);
        
        
        try {
            client = new MqttClient(this.uri, this.clientID);
            client.connect(this.connectionOptions);            
            client.setCallback(this);
            System.out.println("[MQTTProducer] Connected");

        } catch (MqttException e) {
            
            e.printStackTrace();
        }
    }        

    public void sendMessage(String topic, String payload) {
        
        try{
            MqttMessage message = new MqttMessage(payload.getBytes());
            message.setQos(qos);
            this.client.publish(topic, message); // Blocking publish
            System.out.println("[MQTTProducer] Message sent");    
        } catch (MqttException e) {
            
            e.printStackTrace();
        }        
        
    }        

    

    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("[MQTTProducer] Connection lost.  Reason: " + cause);
        System.exit(1);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        
        System.out.println(String.format("[%s] %s", topic, new String(message.getPayload())));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {

    }

}

com.twosigma.beaker.javash.bkr02a73051.MQTTProducer

In [6]:
MQTTConsumer consumer = new MQTTConsumer("tcp://localhost:1883","news/stream");
MessageFilter mfilter
mfilter

consumer.connect(filter)
filter.connect(pro)

MQTTProducer producer = new MQTTProducer("tcp://localhost:1883");
producer.sendMessage("news/stream","test1");
producer.sendMessage("news/stream","test2");
producer.sendMessage("news/stream","test3");

[MQTTConsumer] connected
[MQTTProducer] Connected
[MQTTProducer] Message sent
[MQTTConsumer] New message received | topic: news/stream | payload: test1
[MQTTConsumer] New message received | topic: news/stream | payload: test2
[MQTTProducer] Message sent
[MQTTProducer] Message sent
[MQTTConsumer] New message received | topic: news/stream | payload: test3


null