Skip to content
Permalink
Browse files

Merge pull request #3086 from telstra/parralelism_for_spouts

Decrease parallelism for reply kafka spouts in flr
  • Loading branch information
niksv committed Dec 30, 2019
2 parents 2a978cc + 197616e commit 3c823fc3d82dd4a5bac7096b7375c2925942be2e
@@ -74,9 +74,10 @@ private void createKildaFlowHsKafkaBolt(TopologyBuilder builder, int parallelism
.shuffleGrouping(ComponentType.KILDA_FLOW_HS_REPLY_BOLT, Stream.KILDA_HS_FLOW);
}

private void createKildaFlowReplyStream(TopologyBuilder builder, int parallelism, KafkaTopicsConfig topicsConfig,
private void createKildaFlowReplyStream(TopologyBuilder builder, int spoutParallelism,
int parallelism, KafkaTopicsConfig topicsConfig,
List<String> kildaFlowTopics) {
createKildaFlowSpout(builder, parallelism, kildaFlowTopics);
createKildaFlowSpout(builder, spoutParallelism, kildaFlowTopics);
createKildaFlowKafkaBolt(builder, parallelism, topicsConfig);

ReplyBolt replyBolt = new ReplyBolt(Stream.KILDA_FLOW);
@@ -85,9 +86,9 @@ private void createKildaFlowReplyStream(TopologyBuilder builder, int parallelism

}

private void createKildaFlowHsReplyStream(TopologyBuilder builder, int parallelism,
private void createKildaFlowHsReplyStream(TopologyBuilder builder, int spoutParallelism, int parallelism,
KafkaTopicsConfig topicsConfig, List<String> kildaFlowHsTopics) {
createKildaFlowHsSpout(builder, parallelism, kildaFlowHsTopics);
createKildaFlowHsSpout(builder, spoutParallelism, kildaFlowHsTopics);
createKildaFlowHsKafkaBolt(builder, parallelism, topicsConfig);

ReplyBolt replyBolt = new ReplyBolt(Stream.KILDA_HS_FLOW);
@@ -109,9 +110,10 @@ private void createKildaPingKafkaBolt(TopologyBuilder builder, int parallelism,
}


private void createKildaPingReplyStream(TopologyBuilder builder, int parallelism, KafkaTopicsConfig topicsConfig,
private void createKildaPingReplyStream(TopologyBuilder builder, int spoutParallelism, int parallelism,
KafkaTopicsConfig topicsConfig,
List<String> kildaPingTopics) {
createKildaPingSpout(builder, parallelism, kildaPingTopics);
createKildaPingSpout(builder, spoutParallelism, kildaPingTopics);
createKildaPingKafkaBolt(builder, parallelism, topicsConfig);

ReplyBolt replyBolt = new ReplyBolt(Stream.KILDA_PING);
@@ -133,9 +135,10 @@ private void createKildaStatsKafkaBolt(TopologyBuilder builder, int parallelism,
}


private void createKildaStatsReplyStream(TopologyBuilder builder, int parallelism, KafkaTopicsConfig topicsConfig,
private void createKildaStatsReplyStream(TopologyBuilder builder, int spoutParallelism, int parallelism,
KafkaTopicsConfig topicsConfig,
List<String> kildaStatsTopics) {
createKildaStatsSpout(builder, parallelism, kildaStatsTopics);
createKildaStatsSpout(builder, spoutParallelism, kildaStatsTopics);
createKildaStatsKafkaBolt(builder, parallelism, topicsConfig);

ReplyBolt replyBolt = new ReplyBolt(Stream.KILDA_STATS);
@@ -157,9 +160,9 @@ private void createKildaIslLatencyKafkaBolt(TopologyBuilder builder, int paralle
.shuffleGrouping(ComponentType.KILDA_ISL_LATENCY_REPLY_BOLT, Stream.KILDA_ISL_LATENCY);
}

private void createKildaIslLatencyReplyStream(TopologyBuilder builder, int parallelism,
private void createKildaIslLatencyReplyStream(TopologyBuilder builder, int spoutParallelism, int parallelism,
KafkaTopicsConfig topicsConfig, List<String> kildaIslLatencyTopics) {
createKildaIslLatencySpout(builder, parallelism, kildaIslLatencyTopics);
createKildaIslLatencySpout(builder, spoutParallelism, kildaIslLatencyTopics);
createKildaIslLatencyKafkaBolt(builder, parallelism, topicsConfig);

ReplyBolt replyBolt = new ReplyBolt(Stream.KILDA_ISL_LATENCY);
@@ -179,9 +182,9 @@ private void createKildaConnectedDevicesKafkaBolt(TopologyBuilder builder, int p
.shuffleGrouping(ComponentType.KILDA_CONNECTED_DEVICES_REPLY_BOLT, Stream.KILDA_CONNECTED_DEVICES);
}

private void createKildaConnectedDevicesReplyStream(TopologyBuilder builder, int parallelism,
private void createKildaConnectedDevicesReplyStream(TopologyBuilder builder, int spoutParallelism, int parallelism,
KafkaTopicsConfig topicsConfig, List<String> topics) {
createKildaConnectedDevicesSpout(builder, parallelism, topics);
createKildaConnectedDevicesSpout(builder, spoutParallelism, topics);
createKildaConnectedDevicesKafkaBolt(builder, parallelism, topicsConfig);

ReplyBolt replyBolt = new ReplyBolt(Stream.KILDA_CONNECTED_DEVICES);
@@ -205,10 +208,10 @@ private void createKildaSwitchManagerKafkaBolt(TopologyBuilder builder, int para
}


private void createKildaSwitchManagerReplyStream(TopologyBuilder builder, int parallelism,
private void createKildaSwitchManagerReplyStream(TopologyBuilder builder, int spoutParallelism, int parallelism,
KafkaTopicsConfig topicsConfig,
List<String> kildaSwitchManagerTopics) {
createKildaSwitchManagerSpout(builder, parallelism, kildaSwitchManagerTopics);
createKildaSwitchManagerSpout(builder, spoutParallelism, kildaSwitchManagerTopics);
createKildaSwitchManagerKafkaBolt(builder, parallelism, topicsConfig);

ReplyBolt replyBolt = new ReplyBolt(Stream.KILDA_SWITCH_MANAGER);
@@ -231,10 +234,10 @@ private void createKildaNorthboundKafkaBolt(TopologyBuilder builder, int paralle
}


private void createKildaNorthboundReplyStream(TopologyBuilder builder, int parallelism,
private void createKildaNorthboundReplyStream(TopologyBuilder builder, int spoutParallelism, int parallelism,
KafkaTopicsConfig topicsConfig,
List<String> kildaNorthboundTopics) {
createKildaNorthboundSpout(builder, parallelism, kildaNorthboundTopics);
createKildaNorthboundSpout(builder, spoutParallelism, kildaNorthboundTopics);
createKildaNorthboundKafkaBolt(builder, parallelism, topicsConfig);

ReplyBolt replyBolt = new ReplyBolt(Stream.NORTHBOUND_REPLY);
@@ -257,10 +260,10 @@ private void createKildaNbWorkerKafkaBolt(TopologyBuilder builder, int paralleli
}


private void createKildaNbWorkerReplyStream(TopologyBuilder builder, int parallelism,
private void createKildaNbWorkerReplyStream(TopologyBuilder builder, int spoutParallelism, int parallelism,
KafkaTopicsConfig topicsConfig,
List<String> kildaNbWorkerTopics) {
createKildaNbWorkerSpout(builder, parallelism, kildaNbWorkerTopics);
createKildaNbWorkerSpout(builder, spoutParallelism, kildaNbWorkerTopics);
createKildaNbWorkerKafkaBolt(builder, parallelism, topicsConfig);

ReplyBolt replyBolt = new ReplyBolt(Stream.NB_WORKER);
@@ -376,10 +379,10 @@ private void createKildaTopoDiscoKafkaBolt(TopologyBuilder builder, int parallel
}


private void createKildaTopoDiscoReplyStream(TopologyBuilder builder, int parallelism,
private void createKildaTopoDiscoReplyStream(TopologyBuilder builder, int spoutParallelism, int parallelism,
KafkaTopicsConfig topicsConfig,
List<String> kildaTopoDiscoTopics) {
createKildaTopoDiscoSpout(builder, parallelism, kildaTopoDiscoTopics);
createKildaTopoDiscoSpout(builder, spoutParallelism, kildaTopoDiscoTopics);
createKildaTopoDiscoKafkaBolt(builder, parallelism, topicsConfig);

DiscoReplyBolt replyBolt = new DiscoReplyBolt(Stream.KILDA_TOPO_DISCO);
@@ -476,6 +479,8 @@ public StormTopology createTopology() {

TopologyBuilder builder = new TopologyBuilder();
Integer newParallelism = topologyConfig.getNewParallelism();
Integer parallelism = topologyConfig.getParallelism();

KafkaTopicsConfig topicsConfig = topologyConfig.getKafkaTopics();
Set<String> regions = topologyConfig.getFloodlightRegions();
// Floodlight -- kilda.flow --> Router
@@ -485,59 +490,62 @@ public StormTopology createTopology() {
kildaFlowTopics.add(Stream.formatWithRegion(topicsConfig.getFlowRegionTopic(), region));
kildaFlowHsTopics.add(Stream.formatWithRegion(topicsConfig.getFlowHsSpeakerRegionTopic(), region));
}
createKildaFlowReplyStream(builder, newParallelism, topicsConfig, kildaFlowTopics);
createKildaFlowHsReplyStream(builder, newParallelism, topicsConfig, kildaFlowHsTopics);
createKildaFlowReplyStream(builder, parallelism, newParallelism, topicsConfig, kildaFlowTopics);
createKildaFlowHsReplyStream(builder, parallelism, newParallelism, topicsConfig, kildaFlowHsTopics);

// Floodlight -- kilda.ping --> Router
List<String> kildaPingTopics = new ArrayList<>();
for (String region: regions) {
kildaPingTopics.add(Stream.formatWithRegion(topicsConfig.getPingRegionTopic(), region));
}
createKildaPingReplyStream(builder, newParallelism, topicsConfig, kildaPingTopics);
createKildaPingReplyStream(builder, parallelism, newParallelism, topicsConfig, kildaPingTopics);

// Floodlight -- kilda.stats --> Router
List<String> kildaStatsTopics = new ArrayList<>();
for (String region: regions) {
kildaStatsTopics.add(Stream.formatWithRegion(topicsConfig.getStatsRegionTopic(), region));
}
createKildaStatsReplyStream(builder, newParallelism, topicsConfig, kildaStatsTopics);
createKildaStatsReplyStream(builder, parallelism, newParallelism, topicsConfig, kildaStatsTopics);

// Floodlight -- kilda.topo.isl.latency --> Router
List<String> kildaIslLatencyTopics = new ArrayList<>();
for (String region: regions) {
kildaIslLatencyTopics.add(Stream.formatWithRegion(topicsConfig.getTopoIslLatencyRegionTopic(), region));
}
createKildaIslLatencyReplyStream(builder, newParallelism, topicsConfig, kildaIslLatencyTopics);
createKildaIslLatencyReplyStream(builder, parallelism, newParallelism, topicsConfig, kildaIslLatencyTopics);

// Floodlight -- kilda.floodlight.connected.devices.priv --> Router
List<String> kildaConnectedDevicesTopics = new ArrayList<>();
for (String region: regions) {
kildaConnectedDevicesTopics.add(Stream.formatWithRegion(
topicsConfig.getTopoConnectedDevicesRegionTopic(), region));
}
createKildaConnectedDevicesReplyStream(builder, newParallelism, topicsConfig, kildaConnectedDevicesTopics);
createKildaConnectedDevicesReplyStream(builder, parallelism, newParallelism,
topicsConfig, kildaConnectedDevicesTopics);

// Floodlight -- kilda.topo.switch.manager --> Router
List<String> kildaSwitchManagerTopics = new ArrayList<>();
for (String region: regions) {
kildaSwitchManagerTopics.add(
Stream.formatWithRegion(topicsConfig.getTopoSwitchManagerRegionTopic(), region));
}
createKildaSwitchManagerReplyStream(builder, newParallelism, topicsConfig, kildaSwitchManagerTopics);
createKildaSwitchManagerReplyStream(builder, parallelism,
newParallelism, topicsConfig, kildaSwitchManagerTopics);

// Floodlight -- kilda.northbound --> Router
List<String> kildaNorthboundTopics = new ArrayList<>();
for (String region: regions) {
kildaNorthboundTopics.add(Stream.formatWithRegion(topicsConfig.getNorthboundRegionTopic(), region));
}
createKildaNorthboundReplyStream(builder, newParallelism, topicsConfig, kildaNorthboundTopics);
createKildaNorthboundReplyStream(builder, parallelism,
newParallelism, topicsConfig, kildaNorthboundTopics);

// Floodlight -- kilda.topo.nb --> Router
List<String> kildaNbWorkerTopics = new ArrayList<>();
for (String region: regions) {
kildaNbWorkerTopics.add(Stream.formatWithRegion(topicsConfig.getTopoNbRegionTopic(), region));
}
createKildaNbWorkerReplyStream(builder, newParallelism, topicsConfig, kildaNbWorkerTopics);
createKildaNbWorkerReplyStream(builder, parallelism, newParallelism, topicsConfig, kildaNbWorkerTopics);

// Part3 Request to Floodlights
// Storm -- kilda.speaker.flow --> Floodlight
@@ -558,9 +566,8 @@ public StormTopology createTopology() {
kildaTopoDiscoTopics.add(Stream.formatWithRegion(topicsConfig.getTopoDiscoRegionTopic(), region));

}
createKildaTopoDiscoReplyStream(builder, newParallelism, topicsConfig, kildaTopoDiscoTopics);
createKildaTopoDiscoReplyStream(builder, parallelism, newParallelism, topicsConfig, kildaTopoDiscoTopics);

Integer parallelism = topologyConfig.getParallelism();
createDiscoveryPipelines(builder, parallelism, topicsConfig, kildaTopoDiscoTopics);

// Storm -- kilda.stats.stats-request.priv --> Floodlight

0 comments on commit 3c823fc

Please sign in to comment.
You can’t perform that action at this time.