forked from apache/ignite
-
Notifications
You must be signed in to change notification settings - Fork 0
/
StormIgniteStreamerSelfTest.java
176 lines (152 loc) · 6.47 KB
/
StormIgniteStreamerSelfTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.stream.storm;
import backtype.storm.ILocalCluster;
import backtype.storm.LocalCluster;
import backtype.storm.Testing;
import backtype.storm.generated.StormTopology;
import backtype.storm.testing.CompleteTopologyParam;
import backtype.storm.testing.MkClusterParam;
import backtype.storm.testing.MockedSources;
import backtype.storm.testing.TestJob;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.Config;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* Tests {@link StormStreamer}.
*/
public class StormIgniteStreamerSelfTest extends GridCommonAbstractTest {
/** Storm stream object initialization. */
StormStreamer<String, String, String> stormStreamer = null;
/** Count. */
private static final int CNT = 100;
/** Cache Name */
private static final String cacheName = "testCache";
private Ignite ignite;
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected void beforeTest() throws Exception {
IgniteConfiguration cfg = loadConfiguration("modules/storm/src/test/resources/example-ignite.xml");
cfg.setClientMode(false);
ignite = startGrid("igniteServerNode", cfg);
//
// IgniteConfiguration cfg2 = loadConfiguration("modules/storm/src/test/resources/example-ignite.xml");
//
// cfg2.setClientMode(false);
//
// ignite = startGrid("igniteServerNode2", cfg2);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
}
/**
* Test with the bolt Ignite started in bolt.
* NOTE: the only working solutions for now.
* @throws TimeoutException
* @throws InterruptedException
*/
public void testStormStreamerIgniteBolt() throws TimeoutException, InterruptedException {
stormStreamer = new StormStreamer<>();
stormStreamer.setThreads(5);
startSimulatedTopology(stormStreamer);
}
/**
* Note to run this on TC: the time out has to be setted in according
* to power of the server. In a simple dual core it takes 6 sec.
* look setMessageTimeoutSecs parameter.
* @param stormStreamer the storm streamer in Ignite
*/
public void startSimulatedTopology (final StormStreamer stormStreamer) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout( "spout", new StormSpout() );
builder.setBolt( "bolt", stormStreamer )
.shuffleGrouping("spout");
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
compareStreamCacheData(new StormSpout().getKeyValMap());
cluster.killTopology("test");
cluster.shutdown();
// MkClusterParam mkClusterParam = new MkClusterParam();
// mkClusterParam.setSupervisors(4);
//
// Config daemonConf = new Config();
// daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, false);
//
// mkClusterParam.setDaemonConf(daemonConf);
//
// Testing.withSimulatedTimeLocalCluster(mkClusterParam, new TestJob() {
// @Override
// public void run(ILocalCluster cluster) throws IOException {
// /* Storm topology builder. */
// TopologyBuilder builder = new TopologyBuilder();
//
// StormSpout stormSpout = new StormSpout();
//
// /*Set storm spout in topology builder. */
// builder.setSpout("spout", stormSpout);
//
// /*Set bolt spout in topology builder. */
// builder.setBolt("bolt", stormStreamer)
// .shuffleGrouping("spout");
//
// /* Create storm topology. */
// StormTopology topology = builder.createTopology();
//
// MockedSources mockedSources = new MockedSources();
//
// //Our spout will be processing this values.
// mockedSources.addMockData("spout", new Values(stormSpout.getKeyValMap()));
//
// // prepare the config
// Config conf = new Config();
//
// // this parameter is necessary
// conf.setMessageTimeoutSecs(10);
//
// CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
// completeTopologyParam.setMockedSources(mockedSources);
// completeTopologyParam.setStormConf(conf);
//
// Map result = Testing.completeTopology(cluster, topology, completeTopologyParam);
// compareStreamCacheData(stormSpout.getKeyValMap());
// }
// }
// );
}
public void compareStreamCacheData(HashMap<String, String> keyValMap){
// Ignite ignite = grid();
// Get the cache.
IgniteCache<String, String> cache = ignite.getOrCreateCache(cacheName);
for (Map.Entry<String, String> entry : keyValMap.entrySet()) {
System.out.println(" Key === " +entry.getKey() + " Value ==== " + cache.get(entry.getKey()));
assertEquals(entry.getValue(), cache.get(entry.getKey()));
}
}
}