-
Notifications
You must be signed in to change notification settings - Fork 40
/
PublishAccountStatusUpdate.java
82 lines (68 loc) · 2.71 KB
/
PublishAccountStatusUpdate.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.hortonworks.iot.financial.bolts;
import java.util.HashMap;
import java.util.Map;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import com.hortonworks.iot.financial.events.CustomerResponse;
import com.hortonworks.iot.financial.util.Constants;
/*
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
*/
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class PublishAccountStatusUpdate extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private Constants constants;
private BayeuxClient bayuexClient;
private OutputCollector collector;
public void execute(Tuple tuple) {
CustomerResponse customerResponse = (CustomerResponse) tuple.getValueByField("CustomerResponse");
Map<String, Object> data = new HashMap<String, Object>();
data.put("accountNumber", customerResponse.getAccountNumber());
data.put("transactionId", customerResponse.getTransactionId());
data.put("fraudulent", customerResponse.getFraudulent());
bayuexClient.getChannel(constants.getAccountStatusUpdateChannel()).publish(data);
collector.emit(tuple, new Values((CustomerResponse) customerResponse));
collector.ack(tuple);
}
public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
this.collector = collector;
constants = new Constants();
HttpClient httpClient = new HttpClient();
try {
httpClient.start();
} catch (Exception e) {
e.printStackTrace();
}
// Prepare the transport
Map<String, Object> options = new HashMap<String, Object>();
ClientTransport transport = new LongPollingTransport(options, httpClient);
// Create the BayeuxClient
bayuexClient = new BayeuxClient(constants.getPubSubUrl(), transport);
bayuexClient.handshake();
boolean handshaken = bayuexClient.waitFor(3000, BayeuxClient.State.CONNECTED);
if (handshaken)
{
System.out.println("Connected to Cometd Http PubSub Platform");
}
else{
System.out.println("Could not connect to Cometd Http PubSub Platform");
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("CustomerResponse"));
}
}