/
PowerBIPush.java
346 lines (315 loc) · 11.6 KB
/
PowerBIPush.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
/* Generated by Streams Studio: January 1, 2016 at 11:12:21 PM GMT+1 */
package com.ibm.streams.powerbi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.http.client.ClientProtocolException;
import org.apache.log4j.Logger;
import org.com.powerbi.streams.PowerBI;
import org.com.powerbi.streams.PowerBI.TableValue;
import com.ibm.streams.operator.AbstractOperator;
import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.StreamSchema;
import com.ibm.streams.operator.StreamingData.Punctuation;
import com.ibm.streams.operator.StreamingInput;
import com.ibm.streams.operator.Tuple;
import com.ibm.streams.operator.Type;
import com.ibm.streams.operator.log4j.TraceLevel;
import com.ibm.streams.operator.model.InputPortSet;
import com.ibm.streams.operator.model.InputPortSet.WindowMode;
import com.ibm.streams.operator.model.InputPortSet.WindowPunctuationInputMode;
import com.ibm.streams.operator.model.InputPorts;
import com.ibm.streams.operator.model.Libraries;
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streams.operator.model.PrimitiveOperator;
/**
 * Streams sink operator that pushes incoming tuples as rows into a Power BI
 * table. It has a single, non-windowed input port and no output ports.
 * <p>
 * During {@link #initialize(OperatorContext)} the operator authenticates with
 * the supplied OAuth credentials, derives a Power BI table schema from the
 * input stream schema, checks for (and if necessary creates) the data set and
 * table, and optionally clears the table.
 * </p>
 * <p>
 * Each incoming tuple is converted into a row and buffered. The buffer is sent
 * to Power BI when it holds {@code flushsize} rows or, when {@code flushsize}
 * is 0, whenever a punctuation mark arrives. Rows still buffered when the final
 * punctuation marker arrives are always sent, so an incomplete last batch is
 * not lost.
 * </p>
 * <p>
 * The following event methods from the Operator interface can be called:
 * </p>
 * <ul>
 * <li><code>initialize()</code> to perform operator initialization</li>
 * <li>allPortsReady() notification indicates the operator's ports are ready to
 * process and submit tuples</li>
 * <li>process() handles a tuple arriving on an input port</li>
 * <li>processPunctuation() handles a punctuation mark arriving on an input
 * port</li>
 * <li>shutdown() to shutdown the operator. A shutdown request may occur at any
 * time, such as a request to stop a PE or cancel a job. Thus the shutdown() may
 * occur while the operator is processing tuples, punctuation marks, or even
 * during port ready notification.</li>
 * </ul>
 * <p>
 * With the exception of operator initialization, all the other events may occur
 * concurrently with each other, which leads to these methods being called
 * concurrently by different threads. The row buffer is therefore only accessed
 * from synchronized methods.
 * </p>
 */
@Libraries({ "impl/lib/*" })
@PrimitiveOperator(name = "PowerBI", namespace = "com.ibm.streams.powerbi", description = "Java Operator PowerBI")
@InputPorts({
        @InputPortSet(description = "Port that ingests tuples", cardinality = 1, optional = false, windowingMode = WindowMode.NonWindowed, windowPunctuationInputMode = WindowPunctuationInputMode.Oblivious) })
public class PowerBIPush extends AbstractOperator {
    private final Logger log = Logger.getLogger(this.getClass());

    /** parameter: OAuth user name used to obtain the access token. */
    private String oauth_username;
    /** parameter: OAuth password used to obtain the access token (never logged). */
    private String oauth_password;
    /** parameter: Azure AD client id used to obtain the access token. */
    private String oauth_clientid;
    /** parameter: name of the Power BI data set. */
    private String datasetName;
    /** parameter: name of the table inside the data set. */
    private String tablename;
    /** parameter: rows per batch; 0 means "flush on punctuation". */
    private int flushsize = 1;
    /** parameter: whether the table is cleared during initialization. */
    private boolean clearfirstly = false;
    /** token id after authentication */
    private String token = null;
    /** data set id */
    private String datasetid = null;
    /** Buffer for converted tuples. Only accessed from synchronized methods. */
    private final List<Map<String, TableValue>> rlist = new ArrayList<Map<String, TableValue>>();

    /**
     * Tells whether the operator runs in punctuation mode.
     *
     * @return {@code true} when {@code flushsize} is 0, i.e. the buffer is sent
     *         only when a punctuation mark arrives
     */
    private boolean isPunctuation() {
        return flushsize == 0;
    }

    /** Checked exception raised for configuration or Power BI setup failures. */
    public static class PowerException extends Exception {
        private static final long serialVersionUID = 1L;

        PowerException(String mess) {
            super(mess);
        }
    }

    /**
     * Logs {@code mess} at ERROR level and throws it as a {@link PowerException}.
     * This method never returns normally.
     *
     * @param mess
     *            error message
     * @throws PowerException
     *             always
     */
    private void failure(String mess) throws PowerException {
        log.log(TraceLevel.ERROR, mess);
        throw new PowerException(mess);
    }

    /**
     * Builds the Power BI column schema from the schema of input port 0.
     *
     * @param context
     *            operator context used to reach the input port schema
     * @return map from attribute name to Power BI column type constant
     * @throws PowerException
     *             if an attribute has a type that has no Power BI mapping
     */
    private Map<String, String> createTableSchema(OperatorContext context) throws PowerException {
        StreamSchema sche = context.getStreamingInputs().get(0).getStreamSchema();
        Map<String, String> bischema = new HashMap<String, String>();
        for (String name : sche.getAttributeNames()) {
            Type.MetaType ty = sche.getAttribute(name).getType().getMetaType();
            String biType = null;
            switch (ty) {
            case RSTRING:
            case USTRING:
            case ENUM:
                biType = PowerBI.STRING_TYPE;
                break;
            case INT8:
            case INT32:
            case INT64:
            case UINT8:
            case UINT16:
            case UINT32:
            case UINT64:
            case INT16:
                biType = PowerBI.INT64_TYPE;
                break;
            case DECIMAL128:
            case DECIMAL32:
            case DECIMAL64:
            case FLOAT32:
            case FLOAT64:
                biType = PowerBI.DOUBLE_TYPE;
                break;
            case BOOLEAN:
                biType = PowerBI.BOOL_TYPE;
                break;
            case TIMESTAMP:
                biType = PowerBI.DATETIME_TYPE;
                break;
            default:
                break;
            }
            if (biType == null) {
                failure("Attribute " + name + " type " + ty.getLanguageType() + " not supported");
            }
            log.log(TraceLevel.DEBUG, "Attribute " + name + " type " + ty.getLanguageType() + " mapped to " + biType);
            bischema.put(name, biType);
        }
        return bischema;
    }

    /**
     * Initialize this operator. Called once before any tuples are processed.
     * Authenticates with Power BI, checks or creates the target table and
     * optionally clears it.
     *
     * @param context
     *            OperatorContext for this operator.
     * @throws Exception
     *             Operator failure, will cause the enclosing PE to terminate.
     */
    @Override
    public synchronized void initialize(OperatorContext context) throws Exception {
        // Must call super.initialize(context) to correctly setup an operator.
        super.initialize(context);
        log.trace("Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: "
                + context.getPE().getJobId());
        log.log(TraceLevel.INFO, "Logging on the Power BI using " + oauth_username);
        log.log(TraceLevel.INFO, "Database set " + datasetName + " table name " + tablename);
        if (clearfirstly) {
            log.log(TraceLevel.INFO, "Data is cleared at the beginning");
        } else {
            log.log(TraceLevel.INFO, "Data is not cleared at the beginning");
        }
        log.log(TraceLevel.INFO, "Flush buffer size " + flushsize);
        if (isPunctuation()) {
            log.log(TraceLevel.INFO, "Data is flushed and sent to PowerBI at the punctuation marker.");
        }
        token = PowerBI.getAuthToken(oauth_username, oauth_password, oauth_clientid);
        if (token == null) {
            failure("Authentication failed");
        }
        log.log(TraceLevel.INFO, "Authentication successfull");
        log.log(TraceLevel.INFO, "Check or create table schema if necessary");
        Map<String, String> schema = createTableSchema(context);
        datasetid = PowerBI.checkTableDataSet(token, datasetName, tablename, schema);
        // Validate the data set id BEFORE clearing: otherwise clearTable would be
        // called with a null data set id before the missing table is reported.
        if (datasetid == null) {
            failure("Cannot find or create table " + tablename + " in data set " + datasetName);
        }
        if (clearfirstly) {
            log.log(TraceLevel.INFO, "Data cleansing started");
            PowerBI.clearTable(token, datasetid, tablename);
            log.log(TraceLevel.INFO, "Data cleansing completed");
        }
        log.log(TraceLevel.INFO, "Success, table name " + tablename + " in " + datasetName + " accessible");
    }

    /**
     * Notification that initialization is complete and all input and output
     * ports are connected and ready to receive and submit tuples.
     *
     * @throws Exception
     *             Operator failure, will cause the enclosing PE to terminate.
     */
    @Override
    public synchronized void allPortsReady() throws Exception {
        // This method is commonly used by source operators.
        // Operators that process incoming tuples generally do not need this
        // notification.
        OperatorContext context = getOperatorContext();
        log.trace("Operator " + context.getName() + " all ports are ready in PE: " + context.getPE().getPEId()
                + " in Job: " + context.getPE().getJobId());
    }

    /**
     * Appends one row to the buffer. In batch mode ({@code flushsize > 0}) the
     * buffer is sent to Power BI and emptied as soon as it holds
     * {@code flushsize} rows.
     *
     * @param elem
     *            converted row, keyed by attribute name
     */
    private synchronized void pushData(Map<String, TableValue> elem) throws ClientProtocolException, IOException {
        rlist.add(elem);
        if (rlist.size() >= flushsize && !isPunctuation()) {
            PowerBI.addTableRows(token, datasetid, tablename, rlist);
            rlist.clear();
        }
    }

    /**
     * Sends every buffered row to Power BI and empties the buffer. Nothing is
     * sent when the buffer is empty, so punctuation marks that arrive with no
     * tuples in between do not trigger empty requests.
     */
    private synchronized void pushPunctuation() throws ClientProtocolException, IOException {
        if (rlist.isEmpty()) {
            return;
        }
        PowerBI.addTableRows(token, datasetid, tablename, rlist);
        rlist.clear();
    }

    /**
     * Process an incoming tuple that arrived on the specified port. Each
     * attribute is converted into a {@link TableValue} and the resulting row is
     * buffered via {@link #pushData(Map)}.
     *
     * @param stream
     *            Port the tuple is arriving on.
     * @param tuple
     *            Object representing the incoming tuple.
     * @throws Exception
     *             Operator failure, will cause the enclosing PE to terminate.
     */
    @Override
    public void process(StreamingInput<Tuple> stream, Tuple tuple) throws Exception {
        StreamSchema sche = stream.getStreamSchema();
        Map<String, TableValue> elem = new HashMap<String, TableValue>();
        for (String name : sche.getAttributeNames()) {
            Type.MetaType ty = sche.getAttribute(name).getType().getMetaType();
            TableValue val = null;
            switch (ty) {
            case RSTRING:
            case USTRING:
            case ENUM:
                val = new TableValue(tuple.getString(name));
                break;
            case INT8:
            case INT32:
            case INT64:
            case INT16:
            case UINT8:
            case UINT16:
            case UINT32:
            case UINT64:
                // NOTE(review): uint64 values above Long.MAX_VALUE presumably come
                // back from getLong as negative numbers — confirm if such values occur.
                val = new TableValue(tuple.getLong(name));
                break;
            case DECIMAL128:
            case DECIMAL32:
            case DECIMAL64:
            case FLOAT32:
            case FLOAT64:
                val = new TableValue(tuple.getDouble(name));
                break;
            case BOOLEAN:
                val = new TableValue(tuple.getBoolean(name));
                break;
            case TIMESTAMP:
                val = new TableValue(new Date(tuple.getTimestamp(name).getTime()), null);
                break;
            default:
                // Unsupported types are already rejected by createTableSchema().
                break;
            }
            if (val != null) {
                elem.put(name, val);
            }
        }
        pushData(elem);
    }

    /**
     * Process an incoming punctuation that arrived on the specified port.
     * In punctuation mode ({@code flushsize == 0}) every mark flushes the
     * buffer. In batch mode the final marker still flushes, so rows from an
     * incomplete last batch are sent instead of being dropped.
     *
     * @param stream
     *            Port the punctuation is arriving on.
     * @param mark
     *            The punctuation mark
     * @throws Exception
     *             Operator failure, will cause the enclosing PE to terminate.
     */
    @Override
    public void processPunctuation(StreamingInput<Tuple> stream, Punctuation mark) throws Exception {
        if (isPunctuation() || mark == Punctuation.FINAL_MARKER) {
            pushPunctuation();
        }
    }

    /**
     * Shutdown this operator. Buffered rows are not sent from here; they are
     * flushed by the final punctuation marker instead.
     *
     * @throws Exception
     *             Operator failure, will cause the enclosing PE to terminate.
     */
    @Override
    public synchronized void shutdown() throws Exception {
        OperatorContext context = getOperatorContext();
        log.trace("Operator " + context.getName() + " shutting down in PE: " + context.getPE().getPEId() + " in Job: "
                + context.getPE().getJobId());
        // TODO: If needed, close connections or release resources related to
        // any external system or data store.
        // Must call super.shutdown()
        super.shutdown();
    }

    @Parameter(description = "This parameter is mandatory. It is oauth username.")
    public void setOauth_username(String oauth_username) {
        this.oauth_username = oauth_username;
    }

    @Parameter(description = "This parameter is mandatory. It is oauth password.")
    public void setOauth_password(String oauth_password) {
        this.oauth_password = oauth_password;
    }

    @Parameter(description = "This parameter is mandatory. It is AZURE AD client id.")
    public void setOauth_clientid(String oauth_clientid) {
        this.oauth_clientid = oauth_clientid;
    }

    @Parameter(description = "This parameter is mandatory. It is table name inside data set.")
    public void setTablename(String tablename) {
        this.tablename = tablename;
    }

    @Parameter(description = "This parameter is mandatory. It is data set name.")
    public void setDatasetName(String datasetName) {
        this.datasetName = datasetName;
    }

    @Parameter(optional = true, description = "This parameter is optional. Number of tuples to be sent to PowerBI. If not specified default is 1. If 0 then data is flushed at the puctuation marker.")
    public void setFlushsize(int flushsize) {
        this.flushsize = flushsize;
    }

    @Parameter(optional = true, description = "This parameter is optional. If true then table is clear at the begining.")
    public void setClearfirstly(boolean clearfirstly) {
        this.clearfirstly = clearfirstly;
    }
}