# Catálogo de Componentes com Mensagens

# Interface ITableProducer compartilhada

In [1]:
/**
 * Provided interface of any component that can hand out a table:
 * an array of attribute names plus a matrix of instances.
 */
public interface ITableProducer {
  /**
   * @return the attribute names of the table
   */
  String[] requestAttributes();

  /**
   * @return the instances of the table, one {@code String[]} per instance
   */
  String[][] requestInstances();
}

com.twosigma.beaker.javash.bkr605576e1.ITableProducer

In [2]:
/**
 * Required interface of a component that consumes a table: it must be
 * connected to an {@link ITableProducer} to obtain its data.
 */
public interface ITableReceptacle {
  /**
   * Connects this component to the producer it will request tables from.
   *
   * @param producer the table producer to use
   */
  void connect(ITableProducer producer);
}

com.twosigma.beaker.javash.bkr605576e1.ITableReceptacle

# Interface IUpdate compartilhada

In [3]:
/**
 * Notification interface: a component implementing it can be told that
 * something changed and that it should react.
 */
public interface IUpdate {
  /** Called by the notifying component to trigger this component. */
  void update();
}

com.twosigma.beaker.javash.bkr605576e1.IUpdate

In [4]:
/**
 * Required interface of a component that sends notifications: it must be
 * connected to the {@link IUpdate} that will receive them.
 */
public interface IUpdateReceptacle {
  /**
   * Registers the component to be notified.
   *
   * @param notify the receiver of the notifications
   */
  void connect(IUpdate notify);
}

com.twosigma.beaker.javash.bkr605576e1.IUpdateReceptacle

# Console

In [5]:
/**
 * Full interface of a console component: it receives a table producer
 * ({@link ITableReceptacle}) and can be notified ({@link IUpdate}).
 */
public interface IConsole extends ITableReceptacle, IUpdate {
}

com.twosigma.beaker.javash.bkr605576e1.IConsole

In [6]:
/**
 * Console sink: every time {@link #update()} is called it pulls the table from
 * the connected {@link ITableProducer} and prints it to standard output.
 */
public class ConsoleComponent implements IConsole {
  private ITableProducer iProducer;

  /**
   * Sets the producer whose table is printed by {@link #update()}.
   *
   * @param producer the table producer to read from
   */
  public void connect(ITableProducer producer) {
    iProducer = producer;
  }

  /**
   * Prints the attribute names on one line and then one line per instance,
   * columns separated by {@code ", "}. Each instance line shows as many columns
   * as there are attributes. Does nothing when no producer is connected.
   *
   * <p>Null or empty attribute arrays, a null instance matrix and null rows
   * are tolerated, because other producers in this notebook can return them.
   */
  public void update() {
    if (iProducer == null)
      return;

    System.out.println("=== Attributes ===");
    String[] attributes = iProducer.requestAttributes();
    // Without a header there is no column count to enforce: print whole rows.
    int columns = (attributes == null) ? Integer.MAX_VALUE : attributes.length;
    System.out.println(joinColumns(attributes, columns));

    System.out.println();
    System.out.println("=== Instances ===");
    String[][] instances = iProducer.requestInstances();
    if (instances == null)
      return;
    for (String[] row : instances)
      if (row != null)
        System.out.println(joinColumns(row, columns));
  }

  /** Joins the first {@code limit} cells with ", "; a null array yields an empty line. */
  private static String joinColumns(String[] cells, int limit) {
    StringBuilder line = new StringBuilder();
    int last = (cells == null) ? 0 : Math.min(cells.length, limit);
    for (int c = 0; c < last; c++) {
      if (c > 0)
        line.append(", ");
      line.append(cells[c]);
    }
    return line.toString();
  }
}

com.twosigma.beaker.javash.bkr605576e1.ConsoleComponent

# BusConsumer

In [7]:
/**
 * Properties shared by the components that talk to the message bus:
 * the bus address and the topic.
 */
public interface IBusProperties {
  /** @return the URI of the message bus */
  String getBusURI();

  /** @param dataSource the URI of the message bus */
  void setBusURI(String dataSource);

  /** @return the topic used on the bus */
  String getTopic();

  /** @param topic the topic used on the bus */
  void setTopic(String topic);
}

com.twosigma.beaker.javash.bkr605576e1.IBusProperties

In [8]:
/**
 * Properties of a bus consumer: the bus properties plus the block size and
 * the verbosity level.
 */
public interface IBusConsumerProperties extends IBusProperties {
  /** @return the block size */
  int getBlockSize();

  /** @param blockSize the block size */
  void setBlockSize(int blockSize);

  /** @return the verbosity level */
  int getVerbose();

  /** @param verbose the verbosity level */
  void setVerbose(int verbose);
}

com.twosigma.beaker.javash.bkr605576e1.IBusConsumerProperties

In [9]:
/**
 * Full interface of a bus consumer: it produces a table, exposes the consumer
 * properties and notifies a connected {@link IUpdate}.
 */
public interface IBusConsumer
    extends ITableProducer, IBusConsumerProperties, IUpdateReceptacle {
}

com.twosigma.beaker.javash.bkr605576e1.IBusConsumer

In [10]:
import java.util.Date;

/**
 * Plain bean holding one sensor reading: when it was taken, which dimension
 * was measured, the measured value and its unit.
 */
public class SensorReading {
  private Date timestamp;
  private String dimension;
  private double value;
  private String unity;

  /** Creates an empty reading; every field keeps its default value. */
  public SensorReading() {
  }

  /**
   * Creates a fully populated reading.
   *
   * @param timestamp instant of the reading
   * @param dimension name of the measured dimension
   * @param value     measured value
   * @param unity     unit of the value
   */
  public SensorReading(Date timestamp, String dimension, double value, String unity) {
    this.timestamp = timestamp;
    this.dimension = dimension;
    this.value = value;
    this.unity = unity;
  }

  /** @return instant of the reading */
  public Date getTimestamp() { return timestamp; }

  /** @param timestamp instant of the reading */
  public void setTimestamp(Date timestamp) { this.timestamp = timestamp; }

  /** @return name of the measured dimension */
  public String getDimension() { return dimension; }

  /** @param dimension name of the measured dimension */
  public void setDimension(String dimension) { this.dimension = dimension; }

  /** @return measured value */
  public double getValue() { return value; }

  /** @param value measured value */
  public void setValue(double value) { this.value = value; }

  /** @return unit of the value */
  public String getUnity() { return unity; }

  /** @param unity unit of the value */
  public void setUnity(String unity) { this.unity = unity; }
}

com.twosigma.beaker.javash.bkr605576e1.SensorReading

In [11]:
/**
 * Plain bean for a bus message: three descriptive strings (source, name, type)
 * and a {@link SensorReading} as body.
 */
public class Message {
  private String source;
  private String name;
  private String type;
  private SensorReading body;

  /** Creates an empty message; every field is left null. */
  public Message() {
  }

  /**
   * Creates a fully populated message.
   *
   * @param source source of the message
   * @param name   name of the message
   * @param type   type of the message
   * @param body   reading carried by the message
   */
  public Message(String source, String name, String type, SensorReading body) {
    super();
    this.source = source;
    this.name = name;
    this.type = type;
    this.body = body;
  }

  /** @return source of the message */
  public String getSource() { return source; }

  /** @param source source of the message */
  public void setSource(String source) { this.source = source; }

  /** @return name of the message */
  public String getName() { return name; }

  /** @param name name of the message */
  public void setName(String name) { this.name = name; }

  /** @return type of the message */
  public String getType() { return type; }

  /** @param type type of the message */
  public void setType(String type) { this.type = type; }

  /** @return reading carried by the message */
  public SensorReading getBody() { return body; }

  /** @param body reading carried by the message */
  public void setBody(SensorReading body) { this.body = body; }
}

com.twosigma.beaker.javash.bkr605576e1.Message

In [12]:
%classpath add jar ../../../../src/lib/mqttv3-1.1.2.jar
%classpath add jar ../../../../src/lib/mysql-connector-java-5.1.17-bin.jar
%classpath add jar ../../../../src/lib/genson-1.6.jar

In [13]:
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.UUID;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import com.owlike.genson.Genson;
import com.owlike.genson.GensonBuilder;

/**
 * MQTT consumer that turns the JSON messages received on a topic into table
 * rows and notifies a connected {@link IUpdate} once a block of messages has
 * been collected.
 *
 * <p>The connection is opened by {@link #setTopic(String)}, so the bus URI must
 * be set first. Rows handed out by {@link #requestInstances()} are removed from
 * the internal buffer.
 */
public class BusConsumerComponent implements IBusConsumer, MqttCallback {
  private String busURI = null;
  private String topic = null;
  private int blockSize = 10;
  private int verbose = 2;
  
  private String[] attributes = {"message", "timestamp", "dimension", "value", "unit"};
  private ArrayList<String[]> instArray = new ArrayList<String[]>();
  private IUpdate notify = null;
  private int count = 0;
  
  private Genson genson;

  private MqttClient client;
  private MqttConnectOptions connectionOptions;
  private String clientID;  
  
  public BusConsumerComponent() {
    /* nothing */
  }

  /** @return the URI of the MQTT broker */
  public String getBusURI() {
    return busURI;
  }

  /** @param busURI the URI of the MQTT broker; must be set before the topic */
  public void setBusURI(String busURI) {
    this.busURI = busURI;
  }
  
  /** @return the subscribed topic */
  public String getTopic() {
     return topic;
  }
  
  /**
   * Stores the topic and immediately connects to the broker and subscribes to it.
   *
   * @param topic the topic filter to subscribe to
   */
  public void setTopic(String topic) {
     this.topic = topic;
     readDS();
  }
  
  /** @return number of messages collected before the listener is notified */
  public int getBlockSize() {
     return blockSize;
  }
  
  /** @param blockSize number of messages collected before the listener is notified */
  public void setBlockSize(int blockSize) {
     this.blockSize = blockSize;
  }
  
  /** @return verbosity: 1 prints the message counter, 2 also prints topic and JSON */
  public int getVerbose() {
     return verbose;
  }
  
  /** @param verbose verbosity: 1 prints the message counter, 2 also prints topic and JSON */
  public void setVerbose(int verbose) {
     this.verbose = verbose;
  }
  
  /** @param notify the component notified when a block of messages is complete */
  public void connect(IUpdate notify) {
     this.notify = notify;
  }
  
  /** @return the fixed attribute names of the produced table */
  public String[] requestAttributes() {
    return attributes;
  }
  
  /**
   * Returns the rows collected so far and empties the buffer.
   *
   * @return the buffered rows, possibly an empty matrix
   */
  public String[][] requestInstances() {
    String instances[][] = instArray.toArray(new String[0][]);
    instArray = new ArrayList<String[]>();
    return instances;
  }
  
  /** Builds the JSON mapper, connects to the broker and subscribes to the topic. */
  private void readDS() {
    genson = new GensonBuilder()
      .useDateFormat(new SimpleDateFormat("yyyy-MM-dd"))
      .useIndentation(true)
      .useConstructorWithArguments(false)
      .create();
     
    clientID = UUID.randomUUID().toString();      
    connectionOptions = new MqttConnectOptions();
    connectionOptions.setAutomaticReconnect(true);
    connectionOptions.setCleanSession(true);
    connectionOptions.setConnectionTimeout(20);
     
    try {
      client = new MqttClient(busURI, clientID);
      client.connect(connectionOptions);            
      client.setCallback(this);
      client.subscribe(topic);
      System.out.println("[MQTTConsumer] connected");            
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }
  
  // NOTE(review): this terminates the JVM even though automatic reconnect is
  // enabled in readDS(); kept as is, confirm it is intended.
  @Override
  public void connectionLost(Throwable cause) {
    System.out.println("[BusConsumerComponent] Connection lost.  Reason: " + cause);
    System.exit(1);
  }

  /**
   * Converts an incoming JSON message into a table row, buffers it, and notifies
   * the connected listener once at least {@code blockSize} rows were counted.
   * Messages without a body or timestamp are ignored.
   */
  @Override
  public void messageArrived(String topic, MqttMessage message) throws MqttException {
    // JSON is exchanged as UTF-8; the platform default could corrupt "°C".
    String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
    Message mess = genson.deserialize(payload, Message.class);
    
    SensorReading body = (mess == null) ? null : mess.getBody();
    if (body == null || body.getTimestamp() == null) {
      // An exception here would propagate out of the MQTT callback; skip the message instead.
      if (verbose > 0)
        System.out.println("[BusConsumerComponent] message without reading ignored, topic: " + topic);
      return;
    }
    
    count++;
    String reading[] = {Integer.toString(count),
                        Long.toString(body.getTimestamp().getTime()), body.getDimension(),
                        Double.toString(body.getValue()), body.getUnity()}; 
    instArray.add(reading);
    
    switch (verbose) {
       case 1: System.out.println("message: " + count); break;
       case 2: System.out.println("message: " + count);
               System.out.println("topic: " + topic);
               String json = genson.serialize(mess);
               System.out.println(json);
               break;
    }

    // ">=" rather than "==": if the counter passes blockSize while no listener
    // is connected (or blockSize is lowered later), "==" would never fire again.
    if (count >= blockSize && notify != null) {
       count = 0;
       notify.update();
    }
  }


  @Override
  public void deliveryComplete(IMqttDeliveryToken token) {
  }
}

com.twosigma.beaker.javash.bkr605576e1.BusConsumerComponent

# Conectando BusConsumer

## Propriedades
* `verbose` - define se mostrará na tela as mensagens recebidas:
  * `0` - não mostra
  * `1` - mostra somente contagem
  * `2` - mostra contagem e mensagem

In [14]:
  // Creates a bus consumer and subscribes it to the sensor topics.
  // setBusURI must come before setTopic: setTopic is what opens the connection.
  try {
    IBusConsumer bc = new BusConsumerComponent();
    bc.setBusURI("tcp://localhost:1883");
    bc.setTopic("sensor/+/+");
    bc.setVerbose(2);  // print counter, topic and JSON of every message
  } catch (Exception e) {
    e.printStackTrace();
  }

[MQTTConsumer] connected


null

message: 1
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":22.699374070018592
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 2
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":22.2903575137629
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 3
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":21.529366319368304
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 4
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":23.092041761632775
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 5
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000

# Conectando com o Console

In [15]:
  // Wires the bus consumer to a console: the console reads the table from the
  // consumer and the consumer notifies the console after each block of messages.
  try {
     IBusConsumer bc = new BusConsumerComponent();
     bc.setBusURI("tcp://localhost:1883");
     bc.setTopic("sensor/+/+");
     bc.setBlockSize(10);
     bc.setVerbose(2);  // print counter, topic and JSON of every message

     IConsole console = new ConsoleComponent();
     console.connect(bc);

     bc.connect(console);
  } catch (Exception e) {
    e.printStackTrace();
  }

[MQTTConsumer] connected


null

message: 1
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":22.2903575137629
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 2
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":21.529366319368304
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 3
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":23.092041761632775
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 4
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":24.438068573053545
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 5
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000

## Projeção

In [16]:
/**
 * Properties of a projection: the names of the attributes to keep.
 */
public interface IProjectionProperties {
  /** @return the names of the selected attributes */
  String[] getAttributes();

  /** @param attribute the names of the attributes to select */
  void setAttributes(String attribute[]);
}

com.twosigma.beaker.javash.bkr605576e1.IProjectionProperties

In [17]:
/**
 * Full interface of a projection component: it is configured through
 * {@link IProjectionProperties}, consumes a table and produces a table.
 */
public interface IProjection
    extends IProjectionProperties, ITableProducer, ITableReceptacle {
}

com.twosigma.beaker.javash.bkr605576e1.IProjection

In [18]:
/**
 * Table filter that keeps only the selected attributes (columns) of the table
 * supplied by the connected provider.
 */
public class ProjectionComponent implements IProjection {
  private ITableProducer provider;

  private String attributes[] = null;

  /** @return the names of the selected attributes */
  public String[] getAttributes() {
    return attributes;
  }

  /** @param attributes the names of the attributes to keep */
  public void setAttributes(String attributes[]) {
    this.attributes = attributes;
  }

  /** @param provider the producer of the table to be projected */
  public void connect(ITableProducer provider) {
    this.provider = provider;
  }

  /** @return the selected attribute names, which form the header of the projected table */
  public String[] requestAttributes() {
    return attributes;
  }

  /**
   * Requests the provider's instances and copies, for each one, only the
   * selected attributes. Attribute names are matched ignoring case; a selected
   * attribute the provider does not have yields a null cell.
   *
   * @return the projected instances, or null when there is no provider, the
   *         provider returned null, or no attributes were selected
   */
  public String[][] requestInstances() {
    if (provider == null)
      return null;

    String[][] sourceRows = provider.requestInstances();
    if (sourceRows == null || attributes == null)
      return null;

    // locate each selected attribute in the provider's table
    String[] sourceNames = provider.requestAttributes();
    int[] sourceColumn = new int[attributes.length];
    for (int t = 0; t < attributes.length; t++)
      sourceColumn[t] = positionOf(attributes[t], sourceNames);

    // copy only the selected columns
    String[][] projected = new String[sourceRows.length][];
    for (int r = 0; r < sourceRows.length; r++) {
      String[] row = new String[attributes.length];
      for (int t = 0; t < attributes.length; t++)
        if (sourceColumn[t] > -1)
          row[t] = sourceRows[r][sourceColumn[t]];
      projected[r] = row;
    }
    return projected;
  }

  /** Returns the index of the first name equal to {@code wanted} ignoring case, or -1. */
  private static int positionOf(String wanted, String[] names) {
    for (int p = 0; p < names.length; p++)
      if (wanted.equalsIgnoreCase(names[p]))
        return p;
    return -1;
  }
}

com.twosigma.beaker.javash.bkr605576e1.ProjectionComponent

In [19]:
  // Pipeline: bus consumer -> projection (keeps "dimension" and "value") -> console.
  // The consumer notifies the console, which pulls the table through the projection.
  try {
    IBusConsumer bc = new BusConsumerComponent();
    bc.setBusURI("tcp://localhost:1883");
    bc.setTopic("sensor/+/+");
    bc.setBlockSize(10);
    bc.setVerbose(2);  // print counter, topic and JSON of every message

    IProjection projection = new ProjectionComponent();
    String[] attributes = {"dimension", "value"};
    projection.setAttributes(attributes);
    projection.connect(bc);

    IConsole console = new ConsoleComponent();
    console.connect(projection);

    bc.connect(console);
  } catch (Exception e) {
    e.printStackTrace();
  }

[MQTTConsumer] connected


null

message: 1
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":21.529366319368304
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 2
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":23.092041761632775
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 3
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":24.438068573053545
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 4
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":24.95818772175452
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 5
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":156600

## Seleção

In [20]:
/**
 * Properties of a selection: the attribute to test, the comparison operator,
 * the value to compare against and the kind of comparison.
 */
public interface ISelectionProperties {
  /** @return the name of the attribute being tested */
  String getAttribute();

  /** @param attributeA the name of the attribute to test */
  void setAttribute(String attributeA);

  /** @return the comparison operator */
  String getOperator();

  /** @param operator the comparison operator */
  void setOperator(String operator);

  /** @return the value the attribute is compared against */
  String getValue();

  /** @param value the value the attribute is compared against */
  void setValue(String value);

  /** @return the nominal-comparison flag */
  boolean isNominalComparison();

  /** @param nominalComparison the nominal-comparison flag */
  void setNominalComparison(boolean nominalComparison);
}

com.twosigma.beaker.javash.bkr605576e1.ISelectionProperties

In [21]:
/**
 * Full interface of a selection component: it is configured through
 * {@link ISelectionProperties}, consumes a table and produces a table.
 */
public interface ISelection
    extends ISelectionProperties, ITableProducer, ITableReceptacle {
}

com.twosigma.beaker.javash.bkr605576e1.ISelection

In [22]:
import java.util.ArrayList;

/**
 * Table filter that keeps only the instances whose value for one attribute
 * satisfies a comparison ({@code =}, {@code <}, {@code >} or {@code !})
 * against a configured value.
 */
public class SelectionComponent implements ISelection {
  private ITableProducer provider;
  
  private String attribute = null,
                 operator = null,
                 value = null;
  private boolean nominalComparison = true;
  
  /** @return the name of the attribute being tested */
  public String getAttribute() {
    return attribute;
  }

  /** @param attribute the name of the attribute to test (matched ignoring case) */
  public void setAttribute(String attribute) {
    this.attribute = attribute;
  }
  
  /** @return the comparison operator */
  public String getOperator() {
    return operator;
  }
  
  /** @param operator the comparison operator; only its first character is used */
  public void setOperator(String operator) {
    this.operator = operator;
  }
  
  /** @return the value the attribute is compared against */
  public String getValue() {
    return value;
  }
  
  /** @param value the value the attribute is compared against */
  public void setValue(String value) {
    this.value = value;
  }
  
  /** @return true for string comparison, false for numeric comparison */
  public boolean isNominalComparison() {
    return nominalComparison;
  }

  /** @param nominalComparison true for string comparison, false for numeric comparison */
  public void setNominalComparison(boolean nominalComparison) {
    this.nominalComparison = nominalComparison;
  }

  /** @param provider the producer of the table to be filtered */
  public void connect(ITableProducer provider) {
    this.provider = provider;
  }
  
  /** @return the provider's attribute names, or null when no provider is connected */
  public String[] requestAttributes() {
    return (provider == null) ? null : provider.requestAttributes();
  }
  
  /**
   * Requests the provider's instances and returns those that satisfy the
   * configured comparison.
   *
   * @return the matching instances (an empty matrix when none match), or null
   *         when there is no provider, the provider returned null, or the
   *         attribute does not exist in the provider's table
   * @throws IllegalStateException if the attribute or the operator was not set
   */
  public String[][] requestInstances() {
    if (provider == null)
      return null;

    // Fail with a clear message, and before consuming the provider's instances.
    if (attribute == null || operator == null || operator.isEmpty())
      throw new IllegalStateException(
          "SelectionComponent: attribute and operator must be set before requesting instances");

    String[][] allInstances = provider.requestInstances();
    if (allInstances == null)
      return null;

    // locate the tested attribute in the provider's table
    String[] allAttributes = provider.requestAttributes();
    int atrPos;
    for (atrPos = 0; atrPos < allAttributes.length &&
         !attribute.equalsIgnoreCase(allAttributes[atrPos]); atrPos++)
      /* nothing */;
    if (atrPos >= allAttributes.length)
      return null;

    char op = operator.charAt(0);
    // The numeric threshold is the same for every row: parse it once, and only
    // when there is at least one row to compare.
    double threshold = (!nominalComparison && allInstances.length > 0)
                       ? Double.parseDouble(value) : Double.NaN;

    ArrayList<String[]> instances = new ArrayList<String[]>();
    for (String[] ai: allInstances) {
      boolean match = nominalComparison
                      ? matchesNominal(op, ai[atrPos])
                      : matchesNumeric(op, Double.parseDouble(ai[atrPos]), threshold);
      if (match)
        instances.add(ai);
    }

    // A zero-length seed array: a seed of length 1 would produce a matrix
    // holding a single null row whenever nothing matched.
    return instances.toArray(new String[0][]);
  }

  /** String comparison: equality ignores case, ordering uses {@code compareTo}. */
  private boolean matchesNominal(char op, String cell) {
    switch (op) {
      case '=': return cell.equalsIgnoreCase(value);
      case '<': return cell.compareTo(value) < 0;
      case '>': return cell.compareTo(value) > 0;
      case '!': return !cell.equalsIgnoreCase(value);
      default:  return false;
    }
  }

  /** Numeric comparison of a parsed cell against the parsed reference value. */
  private static boolean matchesNumeric(char op, double cell, double threshold) {
    switch (op) {
      case '=': return cell == threshold;
      case '<': return cell < threshold;
      case '>': return cell > threshold;
      case '!': return cell != threshold;
      default:  return false;
    }
  }
}

com.twosigma.beaker.javash.bkr605576e1.SelectionComponent

In [23]:
  // Pipeline: bus consumer -> selection (numeric test: value > 22) -> console.
  // The consumer notifies the console, which pulls the table through the selection.
  try {
    IBusConsumer bc = new BusConsumerComponent();
    bc.setBusURI("tcp://localhost:1883");
    bc.setTopic("sensor/+/+");
    bc.setBlockSize(10);
    bc.setVerbose(1);  // print only the message counter

    ISelection selection = new SelectionComponent();
    selection.connect(bc);
    selection.setAttribute("value");
    selection.setOperator(">");
    selection.setValue("22");
    selection.setNominalComparison(false);

    IConsole console = new ConsoleComponent();
    console.connect(selection);

    bc.connect(console);
  } catch (Exception e) {
    e.printStackTrace();
  }

[MQTTConsumer] connected


null

message: 1
message: 2
message: 3
message: 4
message: 5
message: 6
message: 7
message: 8
message: 9
message: 10
=== Attributes ===
message, timestamp, dimension, value, unit

=== Instances ===
1, 1566000000000, temperature, 23.092041761632775, °C
2, 1566000000000, temperature, 24.438068573053545, °C
3, 1566000000000, temperature, 24.95818772175452, °C
4, 1566000000000, temperature, 24.79980553253465, °C
5, 1566000000000, temperature, 25.813849327556408, °C
6, 1566000000000, temperature, 24.74537954247924, °C
7, 1566000000000, temperature, 23.908460544937885, °C
8, 1566000000000, temperature, 23.547730593675183, °C
9, 1566000000000, temperature, 23.219009460017094, °C
message: 1
message: 2
message: 3
message: 4
message: 5
message: 6
message: 7
message: 8
message: 9
message: 10
=== Attributes ===
message, timestamp, dimension, value, unit

=== Instances ===
2, 1566000000000, temperature, 24.649524682230584, °C
3, 1566038372554, temperature, 23.800193511554745, °C
4, 1566000000000, tempera

# Producer Component - Média

In [24]:
/**
 * Properties of a bus producer: the bus properties plus the verbosity level.
 */
public interface IBusProducerProperties extends IBusProperties {
  /** @return the verbosity level */
  int getVerbose();

  /** @param verbose the verbosity level */
  void setVerbose(int verbose);
}

com.twosigma.beaker.javash.bkr605576e1.IBusProducerProperties

In [25]:
/**
 * Full interface of a bus producer: it consumes a table, exposes the producer
 * properties and can be notified through {@link IUpdate}.
 */
public interface IBusProducer
    extends ITableReceptacle, IBusProducerProperties, IUpdate {
}

com.twosigma.beaker.javash.bkr605576e1.IBusProducer

In [26]:
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.UUID;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import com.owlike.genson.Genson;
import com.owlike.genson.GensonBuilder;

/**
 * MQTT producer that, each time it is notified, averages the values of the
 * table supplied by the connected producer and publishes the average as a
 * JSON {@link Message} on the configured topic.
 *
 * <p>The connection to the broker is opened by {@link #setBusURI(String)}.
 */
public class BusProducerComponent implements IBusProducer, MqttCallback {
  /** Positions of the value and unit columns in the consumed rows. */
  private static final int VALUE_COLUMN = 3,
                           UNIT_COLUMN = 4;

  private String busURI = null;
  private String topic = null;
  private int verbose = 2;
  
  private int count = 0;
  
  private ITableProducer iProducer;
  
  private Genson genson;

  private MqttClient client;
  private MqttConnectOptions connectionOptions;
  private String clientID;
  private final int qos = 1; 
  
  /** Creates the component and its JSON mapper; no connection is opened yet. */
  public BusProducerComponent() {
     genson = new GensonBuilder()
           .useDateFormat(new SimpleDateFormat("yyyy-MM-dd"))
           .useIndentation(true)
           .useConstructorWithArguments(false)
           .create();
  }

  /** @return the URI of the MQTT broker */
  public String getBusURI() {
    return busURI;
  }

  /**
   * Stores the broker URI and immediately connects to it.
   *
   * @param busURI the URI of the MQTT broker
   */
  public void setBusURI(String busURI) {
    this.busURI = busURI;
    clientID = UUID.randomUUID().toString();      
    connectionOptions = new MqttConnectOptions();
    connectionOptions.setAutomaticReconnect(true);
    connectionOptions.setCleanSession(true);
    connectionOptions.setConnectionTimeout(20);
     
    try {
      client = new MqttClient(busURI, clientID);
      client.connect(connectionOptions);            
      client.setCallback(this);
      System.out.println("[MQTTProducer] connected");            
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }
  
  /** @return the topic the average is published on */
  public String getTopic() {
     return topic;
  }
  
  /** @param topic the topic the average is published on */
  public void setTopic(String topic) {
     this.topic = topic;
  }
  
  /** @return verbosity: 1 prints the message counter, 2 also prints the JSON */
  public int getVerbose() {
     return verbose;
  }
  
  /** @param verbose verbosity: 1 prints the message counter, 2 also prints the JSON */
  public void setVerbose(int verbose) {
     this.verbose = verbose;
  }
  
  /** @param producer the producer of the rows to be averaged */
  public void connect(ITableProducer producer) {
     iProducer = producer;
  }
   
  /**
   * Requests the current rows, averages their value column and publishes the
   * result. Does nothing when no producer is connected or when the producer
   * returns no rows; the message is not sent when the client or the topic is
   * missing.
   */
  public void update() {
     if (iProducer == null)
        return;

     String instances[][] = iProducer.requestInstances();
     // Without rows there is neither an average nor a unit to read.
     if (instances == null || instances.length == 0)
        return;

     if (client == null || topic == null) {
        System.out.println("[BusProducerComponent] not connected or topic not set; average not published");
        return;
     }

     double sum = 0;
     for (int i = 0; i < instances.length; i++)
        sum += Double.parseDouble(instances[i][VALUE_COLUMN]);
     double av = sum / instances.length;
        
     Message mess = new Message("statistics", "average", "calculus",
           new SensorReading(Calendar.getInstance().getTime(), "temperature", av,
                             instances[0][UNIT_COLUMN])
     );
     String json = genson.serialize(mess);
     // Explicit UTF-8 so that units such as "°C" do not depend on the platform charset.
     MqttMessage message = new MqttMessage(json.getBytes(java.nio.charset.StandardCharsets.UTF_8));
     message.setQos(qos);
        
     try {
        client.publish(topic, message);
     } catch (MqttException e) {
        e.printStackTrace();
     }
     count++;
     switch (verbose) {
        case 1: System.out.println("message: " + count); break;
        case 2: System.out.println("message: " + count);
                System.out.println(json);
                break;
     }        
  }
  
  // NOTE(review): this terminates the JVM even though automatic reconnect is
  // enabled in setBusURI(); kept as is, confirm it is intended.
  @Override
  public void connectionLost(Throwable cause) {
    System.out.println("[BusProducerComponent] Connection lost.  Reason: " + cause);
    System.exit(1);
  }

  @Override
  public void messageArrived(String topic, MqttMessage message) throws MqttException {
  }

  @Override
  public void deliveryComplete(IMqttDeliveryToken token) {
  }
}

com.twosigma.beaker.javash.bkr605576e1.BusProducerComponent

# Publicando a Média

In [27]:
  // Pipeline: bus consumer -> bus producer. After each block of messages the
  // consumer notifies the producer, which publishes the average of the block.
  try {
     IBusConsumer bc = new BusConsumerComponent();
     bc.setBusURI("tcp://localhost:1883");
     bc.setTopic("sensor/+/+");
     bc.setBlockSize(10);
     bc.setVerbose(2);  // print counter, topic and JSON of every message

     IBusProducer bp = new BusProducerComponent();
     bp.setBusURI("tcp://localhost:1883");
     // NOTE(review): this topic also matches the consumer's "sensor/+/+" filter,
     // so published averages are presumably received back as readings - confirm.
     bp.setTopic("sensor/avg/temperature_avg");
     bp.connect(bc);

     bc.connect(bp);
  } catch (Exception e) {
    e.printStackTrace();
  }

[MQTTConsumer] connected
[MQTTProducer] connected


null

message: 1
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":24.95818772175452
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 2
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":24.79980553253465
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 3
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":25.813849327556408
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 4
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":1566000000000,
    "unity":"°C",
    "value":24.74537954247924
  },
  "name":"1",
  "source":"sensor",
  "type":"reading"
}
message: 5
topic: sensor/1/temperature
{
  "body":{
    "dimension":"temperature",
    "timestamp":15660000

## Chart

In [28]:
/**
 * Properties of a chart: its title and the titles of both axes.
 */
public interface IChartProperties {
  /** @return the chart title */
  String getTitle();

  /** @param title the chart title */
  void setTitle(String title);

  /** @return the X axis title */
  String getXTitle();

  /** @param title the X axis title */
  void setXTitle(String title);

  /** @return the Y axis title */
  String getYTitle();

  /** @param title the Y axis title */
  void setYTitle(String title);
}

com.twosigma.beaker.javash.bkr605576e1.IChartProperties

In [29]:
/**
 * Interface of a component that can be started and stopped.
 */
public interface IRun {
  /** @return a status flag for the start operation */
  boolean start();

  /** @return a status flag for the stop operation */
  boolean stop();
}

com.twosigma.beaker.javash.bkr605576e1.IRun

In [30]:
/**
 * Full interface of a chart component: it consumes a table, can be started
 * and stopped, and exposes the chart properties.
 */
public interface IChart extends ITableReceptacle, IRun, IChartProperties {
}

com.twosigma.beaker.javash.bkr605576e1.IChart

In [31]:
%classpath add jar ../../../../src/lib/xchart-3.5.2.jar

In [32]:
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;

import java.io.IOException;

import org.knowm.xchart.BubbleChart;
import org.knowm.xchart.BubbleChartBuilder;
import org.knowm.xchart.SwingWrapper;
import org.knowm.xchart.BitmapEncoder;
import org.knowm.xchart.BitmapEncoder.BitmapFormat;

import com.twosigma.beakerx.mimetype.MIMEContainer;
import java.io.File;
import java.nio.file.Files;

/**
 * Bubble chart built with XChart from the table of a connected producer.
 *
 * <p>Column 0 of each instance is the X value and column 1 the Y value. An
 * optional column 2 is the category (one series per category) and an optional
 * column 3 is the bubble size (10 when absent).
 */
public class ChartBubbleComponent implements IChart {
  private BubbleChart chart = null;

  private String title = "Chart";
  private String xTitle = "X",
                 yTitle = "Y";

  private ITableProducer producer = null;
 
  /** @param producer the producer of the table to be plotted */
  public void connect(ITableProducer producer) {
    this.producer = producer;
  }
  
  public ChartBubbleComponent() {
  }
  
  /** @return the chart title */
  public String getTitle() {
    return title;
  }
  
  /** @param title the chart title */
  public void setTitle(String title) {
    this.title = title;
  }

  /** @return the X axis title */
  public String getXTitle() {
    return xTitle;
  }
  
  /** @param title the X axis title */
  public void setXTitle(String title) {
    xTitle = title;
  }

  /** @return the Y axis title */
  public String getYTitle() {
    // Previously returned the chart title instead of the Y axis title.
    return yTitle;
  }
  
  /** @param title the Y axis title */
  public void setYTitle(String title) {
    yTitle = title;
  }

  /**
   * Builds the chart, saves it as {@code chart.png} in the working directory
   * and displays that image in the notebook.
   *
   * @return true on success, false when writing or reading the image failed
   */
  public boolean start() {
    boolean status = true;
    
    chart = new BubbleChartBuilder().width(600).height(400).title(title).xAxisTitle(xTitle).yAxisTitle(yTitle).build();
    try{
        
        buildChart();         
        BitmapEncoder.saveBitmap(chart, "./chart", BitmapFormat.PNG);
        
        File file = new File("chart.png");
        byte[] data = Files.readAllBytes(file.toPath());
    
        MIMEContainer image = new MIMEContainer(MIMEContainer.MIME.IMAGE_PNG, data);
        display(image);
      }catch(IOException ex){
        status = false;
        System.out.println (ex.toString());      
      }
        
    return status;
  }
  
  /** @return always true; there is nothing to stop */
  public boolean stop() {
    return true;
  }
  
  /**
   * Parses one column of the table as numbers.
   *
   * @param instances the table rows
   * @param column    index of the column to convert
   * @return the parsed column, one element per row
   */
  public double[] toDouble(String[][] instances, int column ) {
    double[] numbers = new double[instances.length];
    for (int i = 0; i < instances.length; i++)
      numbers[i] = Double.parseDouble(instances[i][column]);
    return numbers;
  }

  /** Requests the table once and adds its series to the chart. */
  private void buildChart() {
    if (chart == null || producer == null)
      return;

    String[][] instances = producer.requestInstances();
    // An empty table has no first row to inspect and nothing to plot.
    if (instances == null || instances.length == 0)
      return;

    double[] xData = toDouble(instances, 0),
             yData = toDouble(instances, 1);
    
    String[] categoryData = null;
    if (instances[0].length > 2) {
      categoryData = new String[instances.length];
      for (int c = 0; c < instances.length; c++)
        categoryData[c] = instances[c][2];
    }
    
    double[] bubbleData;
    if (instances[0].length > 3)
      // Reuse the rows already requested: a second request may return a
      // different table (a bus consumer empties its buffer on every request).
      bubbleData = toDouble(instances, 3);
    else {
      bubbleData = new double[instances.length];
      for (int i = 0; i < bubbleData.length; i++)
        bubbleData[i] = 10;
    }

    // All arrays come from the same rows, so they have the same length;
    // only the presence of a category column decides the layout.
    if (categoryData == null) {
      chart.addSeries(" ", xData, yData, bubbleData);
      return;
    }

    // one series per category, in order of first appearance
    Hashtable<String,String> hash = new Hashtable<String,String>();
    int size = categoryData.length;
    for (int outer = 0; outer < size; outer++) {
      if (hash.containsKey(categoryData[outer]))
        continue;
      hash.put(categoryData[outer],categoryData[outer]);
      List<Double> xSub = new ArrayList<Double>(),
                   ySub = new ArrayList<Double>(),
                   bubbleSub = new ArrayList<Double>();
      for (int inner = outer; inner < size; inner++)
        if (categoryData[inner].equalsIgnoreCase(categoryData[outer])) {
          xSub.add(xData[inner]);
          ySub.add(yData[inner]);
          bubbleSub.add(bubbleData[inner]);
        }
      chart.addSeries(categoryData[outer], xSub, ySub, bubbleSub);
    }
  }
  
}

com.twosigma.beaker.javash.bkr605576e1.ChartBubbleComponent

In [33]:
// Pipeline: CSV data set -> projection ("days_recovery", "age") -> bubble chart.
// NOTE(review): IDataSet and DataSetComponent are not declared in any cell shown
// in this notebook; the recorded output of this cell is "cannot find symbol".
IDataSet dataset = new DataSetComponent();
dataset.setDataSource("../../../../src/db/datasets/zombie/complete/zombie-health-spreadsheet-ml-training.csv");

IProjection projection = new ProjectionComponent();
projection.connect(dataset);
String attributes[] = {"days_recovery", "age"};
projection.setAttributes(attributes);

IChart chart = new ChartBubbleComponent();
chart.setTitle("Zombie Health");
chart.setXTitle("Days Recovery");
chart.setYTitle("Age");
chart.connect(projection);

// start() builds and displays the chart; its status flag is the cell result
return chart.start();


cannot find symbol: cannot find symbol

## Statistics

In [34]:
/**
 * Property-style access to the set of values held by a statistics component.
 * How out-of-range indexes are treated is left to the implementing class.
 */
public interface IStatisticsProperties {

    /**
     * @return the number of values in the set
     */
    public int getSize();

    /**
     * @return the values of the set as an array
     */
    public double[] getValueSet();

    /**
     * @param valueSet the values to be stored in the set
     */
    public void setValueSet(double[] valueSet);

    /**
     * @param index position of the requested value
     * @return the value associated with {@code index}
     */
    public double getValueSet(int index);

    /**
     * @param index position of the value to be written
     * @param value the value to store for {@code index}
     */
    public void setValueSet(int index, double value);

}

com.twosigma.beaker.javash.bkr605576e1.IStatisticsProperties

In [35]:
/**
 * Services of a statistics component: it accumulates a set of numbers
 * and reports their sum and their average.
 *
 * @author Andre Santanche
 */
public interface IStatisticsServices {

    /**
     * Adds one more value to the set.
     *
     * @param value the number to be added
     */
    void insertValue(double value);

    /**
     * Adds up every value in the set.
     *
     * @return the total of the values, or zero when the set is empty
     */
    double sum();

    /**
     * Computes the mean of the values in the set.
     *
     * @return the average of the values, or zero when the set is empty
     */
    double average();
}


com.twosigma.beaker.javash.bkr605576e1.IStatisticsServices

In [36]:
/**
 * Complete provided interface of a statistics component: the union of its
 * services ({@link IStatisticsServices}) and its properties
 * ({@link IStatisticsProperties}). Declares no members of its own.
 */
public interface IStatistics
        extends IStatisticsServices, IStatisticsProperties {
}

com.twosigma.beaker.javash.bkr605576e1.IStatistics

In [37]:
import java.util.Vector;

/**
 * Registers a set of numbers and calculates the sum and average of these numbers.
 * Values are kept in insertion order in a {@link Vector}.
 *
 * @author Andre Santanche
 */
public class StatisticsComponent implements IStatistics {
   /** Backing store; created once per component and never replaced. */
   private final Vector<Double> valueSet;

   /*
    * Constructor
    **************/

   /**
    * Creates an empty component with the default Vector capacity.
    */
   public StatisticsComponent() {
      super();
      valueSet = new Vector<Double>();
   }

   /**
    * Creates an empty component whose backing Vector starts with the given capacity.
    * The capacity is only an initial allocation; the set still grows on demand.
    *
    * @param capacity initial capacity of the backing Vector
    * @throws IllegalArgumentException if {@code capacity} is negative (raised by Vector)
    */
   public StatisticsComponent(int capacity) {
      super();
      valueSet = new Vector<Double>(capacity);
   }

   /* Properties
    ************/

   /**
    * @return the number of values currently in the set
    */
   public int getSize() {
      return valueSet.size();
   }

   /**
    * @return a newly allocated array holding the values in insertion order;
    *         changing it does not affect the component
    */
   public double[] getValueSet() {
      int size = valueSet.size();
      double result[] = new double[size];
      for (int d = 0; d < size; d++) {
         result[d] = valueSet.get(d);
      }
      return result;
   }

   /**
    * Replaces the whole set with the given values, so that
    * {@code setValueSet(getValueSet())} leaves the component unchanged.
    * Use {@link #insertValue(double)} to append to the existing values.
    *
    * @param valueSet the new content of the set
    * @throws NullPointerException if {@code valueSet} is null; the current
    *         content is left untouched in that case
    */
   public void setValueSet(double[] valueSet) {
      // Validate before clearing so a bad argument cannot wipe the current content.
      if (valueSet == null) {
         throw new NullPointerException("valueSet must not be null");
      }
      this.valueSet.clear();
      for (double value : valueSet) {
         this.valueSet.add(value);
      }
   }

   /**
    * @param index position of the requested value
    * @return the value at {@code index}, or zero when {@code index} is
    *         outside the set (negative, or not smaller than the size)
    */
   public double getValueSet(int index) {
      return (index >= 0 && index < getSize()) ? valueSet.get(index) : 0;
   }

   /**
    * Overwrites the value at {@code index}; when {@code index} is at or
    * beyond the current size the value is appended at the end instead.
    *
    * @param index position to overwrite
    * @param value the value to store
    * @throws ArrayIndexOutOfBoundsException if {@code index} is negative (raised by Vector)
    */
   public void setValueSet(int index, double value) {
      if (index < getSize()) {
         valueSet.set(index, value);
      } else {
         valueSet.add(value);
      }
   }

   /*
    * IStatisticsServices Interface
    *******************************/

   /**
    * Appends a value to the end of the set.
    *
    * @param value the value to be inserted
    */
   public void insertValue(double value) {
      valueSet.add(value);
   }

   /**
    * @return the sum of all values in the set, or zero when the set is empty
    */
   public double sum() {
      double theSum = 0.0;

      for (double value : valueSet) {
         theSum += value;
      }

      return theSum;
   }

   /**
    * @return the average of all values in the set, or zero when the set is empty
    */
   public double average() {
      double avg = 0;

      // Guard against dividing by zero on an empty set.
      if (valueSet.size() > 0) {
         avg = sum() / valueSet.size();
      }

      return avg;
   }
}

com.twosigma.beaker.javash.bkr605576e1.StatisticsComponent

In [38]:
try {
   IStatistics stat = new StatisticsComponent(3);

   System.out.println("inserido valor: " + 50.0f);
   stat.insertValue(50.0f);
   System.out.println("inserido valor: " + 70.0f);
   stat.insertValue(70.0f);
   System.out.println("inserido valor: " + 30.0f);
   stat.insertValue(30.0f);

   System.out.println("-- somatorio: " + stat.sum());
   System.out.println("-- media: " + stat.average());
   System.out.println("-- tamanho: " + stat.getSize());
} catch (Exception e) {
   e.printStackTrace();
}

inserido valor: 50.0
inserido valor: 70.0
inserido valor: 30.0
-- somatorio: 150.0
-- media: 50.0
-- tamanho: 3


null

## Statistics Required

In [39]:
/**
 * Implemented by components that can be told to run.
 */
public interface IRun {

    /**
     * Triggers the component's run; what the run does is defined by the
     * implementing component.
     */
    void start();

}

com.twosigma.beaker.javash.bkr605576e1.IRun

In [40]:
/**
 * Required-interface side of a component that depends on an
 * {@link IStatistics} provider.
 */
public interface IStatisticsReceptacle {

    /**
     * Hands the component the statistics provider it should use.
     *
     * @param provider the statistics component to connect
     */
    void connect(IStatistics provider);

}

com.twosigma.beaker.javash.bkr605576e1.IStatisticsReceptacle

In [41]:
/**
 * Properties exposed by a client component: a single integer {@code size}.
 */
public interface IClientProperties {

    /**
     * @return the current value of the size property
     */
    int getSize();

    /**
     * @param size the new value of the size property
     */
    void setSize(int size);
}

com.twosigma.beaker.javash.bkr605576e1.IClientProperties

In [42]:
/**
 * Complete interface of a client component: it accepts an IStatistics
 * provider ({@link IStatisticsReceptacle}), exposes the size property
 * ({@link IClientProperties}) and can be started ({@link IRun}).
 * Declares no members of its own.
 */
public interface IClient
        extends IStatisticsReceptacle, IClientProperties, IRun {
}

com.twosigma.beaker.javash.bkr605576e1.IClient

In [43]:
/**
 * Client that produces terms of the Fibonacci sequence (1, 1, 2, 3, 5, ...)
 * and feeds them into a connected {@link IStatistics} provider.
 *
 * The sequence position is kept in the instance, so consecutive calls to
 * {@link #next()} or {@link #start()} continue where the previous one stopped.
 * Terms are computed with plain int arithmetic and wrap around once they
 * exceed the int range.
 */
public class ClientComponent implements IClient {
    /** Number of terms produced by {@link #start()} unless setSize is called. */
    public final static int STANDARD_SIZE = 5;

    private int size;
    // The next two terms of the sequence still to be handed out.
    private int first, second;

    private IStatistics provider = null;

    /*
     * Constructor
     *************/

    /**
     * Creates a client positioned at the beginning of the sequence with
     * size {@link #STANDARD_SIZE} and no provider connected.
     */
    public ClientComponent() {
      super();
      size = STANDARD_SIZE;
      first = 1;
      second = 1;
    }

    /*
     * Properties
     */

    /**
     * @return how many terms one call to {@link #start()} produces
     */
    public int getSize() {
        return size;
    }

    /**
     * @param size how many terms one call to {@link #start()} should produce;
     *             zero or a negative number produces none
     */
    public void setSize(int size) {
        this.size = size;
    }

    /*
     * IStatisticsReceptacle Interface
     **********************************/

    /**
     * Sets the statistics provider that receives the produced terms.
     *
     * @param provider the statistics component to use
     */
    public void connect(IStatistics provider)
    {
        this.provider = provider;
    }

    /**
     * Returns the current term and advances the sequence by one position.
     *
     * @return the next term of the sequence
     */
    public int next() {
       int returnNumber = first;
       first = second;
       second = returnNumber + first;
       return returnNumber;
    }

    /**
     * Produces {@code size} terms, printing each one and inserting it into
     * the connected provider, then prints the provider's sum and average.
     *
     * @throws IllegalStateException if no provider has been connected
     */
    public void start()
    {
       // Fail before printing or advancing the sequence, instead of hitting a
       // NullPointerException halfway through the first iteration.
       if (provider == null) {
          throw new IllegalStateException(
                "no IStatistics provider connected; call connect() before start()");
       }

       for (int f = 1; f <= size; f++) {
          int nextNumber = next();
          System.out.println("produzido: " + nextNumber);
          provider.insertValue(nextNumber);
       }

       System.out.println("-- somatorio: " + provider.sum());
       System.out.println("-- media: " + provider.average());
    }

}

com.twosigma.beaker.javash.bkr605576e1.ClientComponent

# Exercícios

## Exercício 1

Conecte os componentes que você achar necessários para mostrar no console o nome e a idade de zumbis com infecção bacteriana.

## Exercício 2

Considerando que o `ClientComponent` gera uma sequência de Fibonacci e que a propriedade `size` deste componente define quantos elementos haverá nessa sequência, conecte `StatisticsComponent` e o `ClientComponent` para que em conjunto calculem a média dos 10 primeiros termos da sequência de Fibonacci.

## Exercício 3

Escreva um componente que se ligue a um componente que forneça a interface `ITableProducer` e que gere uma saída convertendo a primeira coluna da tabela em um vetor de double. Este componente espera que a primeira coluna seja double.

## Exercício 4

Usando o seu componente e o `StatisticsComponent`, escreva um código que calcule a média de idade de zumbis.

## Exercício 5

Construa um componente que, dado um dataset e duas colunas alvo, calcule a média dos valores da segunda coluna para cada valor distinto da primeira coluna. Seu componente espera o dataset e duas variáveis do tipo `String` referentes às colunas alvo. A primeira `String` refere-se a uma coluna com valores categóricos do dataset. A segunda `String` refere-se a uma coluna numérica do dataset. Seu componente deverá calcular, para cada valor distinto da primeira coluna alvo, a média dos valores da segunda coluna alvo.


Considere, por exemplo, o csv disponível no caminho `"../../../../src/db/datasets/zombie/complete/zombie-health-spreadsheet-ml-training.csv"`. 
Dadas as Strings `column1 = "diagnostic"` e `column2 = "age"`, seu componente deverá fazer a média das idades associadas a cada um dos diagnósticos.

Este componente deve obrigatoriamente ser feito pela conexão de outros componentes.

In [44]:
import java.util.Date;

/**
 * Plain data holder for one sensor reading: a timestamp, a dimension,
 * a numeric value and a unity.
 *
 * NOTE(review): "dimension" presumably names the measured quantity and
 * "unity" its unit of measure — confirm against the code that fills readings.
 */
public class SensorReading {
   private Date timestamp;
   private String dimension;
   private double value;
   private String unity;

   /**
    * @return a copy of the timestamp, or null when none was set; changing
    *         the returned Date does not affect this reading
    */
   public Date getTimestamp() {
      // Date is mutable: hand out a copy so callers cannot alter our state.
      return (timestamp == null) ? null : new Date(timestamp.getTime());
   }

   /**
    * Stores a copy of the given timestamp, so later changes to the
    * caller's Date do not affect this reading.
    *
    * @param timestamp the moment of the reading; may be null
    */
   public void setTimestamp(Date timestamp) {
      this.timestamp = (timestamp == null) ? null : new Date(timestamp.getTime());
   }

   /**
    * @return the dimension of the reading, or null when none was set
    */
   public String getDimension() {
      return dimension;
   }

   /**
    * @param dimension the dimension of the reading
    */
   public void setDimension(String dimension) {
      this.dimension = dimension;
   }

   /**
    * @return the numeric value of the reading (0.0 until one is set)
    */
   public double getValue() {
      return value;
   }

   /**
    * @param value the numeric value of the reading
    */
   public void setValue(double value) {
      this.value = value;
   }

   /**
    * @return the unity of the reading, or null when none was set
    */
   public String getUnity() {
      return unity;
   }

   /**
    * @param unity the unity of the reading
    */
   public void setUnity(String unity) {
      this.unity = unity;
   }
}

com.twosigma.beaker.javash.bkr605576e1.SensorReading