Skip to content
Permalink
Browse files

Merge pull request #2785 from telstra/flow_status_during_reroutes

Reroute topology updates flow status for flows
  • Loading branch information
niksv committed Dec 30, 2019
2 parents 0852ba9 + 2f3e863 commit 2a978ccd258b439e74af821366f9815331f16ff7
@@ -56,6 +56,7 @@ class AutoRerouteSpec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()
}

@Tags(SMOKE)
@@ -97,6 +98,7 @@ class AutoRerouteSpec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()
}

@Tags([VIRTUAL, LOW_PRIORITY])
@@ -211,8 +213,11 @@ class AutoRerouteSpec extends HealthCheckSpecification {
Wrappers.wait(WAIT_OFFSET) { assert northbound.activeSwitches*.switchId.contains(flowPath[1].switchId) }

then: "The flow is #flowStatus"
TimeUnit.SECONDS.sleep(discoveryInterval + rerouteDelay + 2)
Wrappers.wait(WAIT_OFFSET) { assert northbound.getFlowStatus(flow.id).status == flowStatus }
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getLinks(flowPath[1].switchId, null, null, null)
.each { assert it.state == IslChangeType.DISCOVERED }
}
Wrappers.wait(WAIT_OFFSET + rerouteDelay) { assert northbound.getFlowStatus(flow.id).status == flowStatus }

and: "Restore topology to the original state, remove the flow, reset toggles"
flowHelper.deleteFlow(flow.id)
@@ -221,6 +226,7 @@ class AutoRerouteSpec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()

where:
flowsRerouteOnIslDiscovery | flowStatus
@@ -268,6 +274,7 @@ class AutoRerouteSpec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()
}

@Tags(SMOKE)
@@ -305,6 +312,7 @@ class AutoRerouteSpec extends HealthCheckSpecification {

and: "Delete the flow"
flowHelper.deleteFlow(flow.id)
database.resetCosts()
}

@Tags([VIRTUAL, SMOKE])
@@ -367,6 +375,7 @@ class AutoRerouteSpec extends HealthCheckSpecification {
and: "Bring flow ports up and delete the flow"
["source", "destination"].each { antiflap.portUp(flow."$it".datapath, flow."$it".portNumber) }
flowHelper.deleteFlow(flow.id)
database.resetCosts()
}

def "System doesn't reroute flow to a path with not enough bandwidth available"() {
@@ -56,6 +56,7 @@ class AutoRerouteV2Spec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()
}

@Tags(SMOKE)
@@ -97,6 +98,7 @@ class AutoRerouteV2Spec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()
}

@Tags([VIRTUAL, LOW_PRIORITY])
@@ -211,8 +213,11 @@ class AutoRerouteV2Spec extends HealthCheckSpecification {
Wrappers.wait(WAIT_OFFSET) { assert northbound.activeSwitches*.switchId.contains(flowPath[1].switchId) }

then: "The flow is #flowStatus"
TimeUnit.SECONDS.sleep(discoveryInterval + rerouteDelay + 2)
Wrappers.wait(WAIT_OFFSET) { assert northbound.getFlowStatus(flow.flowId).status == flowStatus }
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getLinks(flowPath[1].switchId, null, null, null)
.each { assert it.state == IslChangeType.DISCOVERED }
}
Wrappers.wait(WAIT_OFFSET + rerouteDelay) { assert northbound.getFlowStatus(flow.flowId).status == flowStatus }

and: "Restore topology to the original state, remove the flow, reset toggles"
flowHelperV2.deleteFlow(flow.flowId)
@@ -221,6 +226,7 @@ class AutoRerouteV2Spec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()

where:
flowsRerouteOnIslDiscovery | flowStatus
@@ -268,6 +274,7 @@ class AutoRerouteV2Spec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()
}

@Tags(SMOKE)
@@ -305,6 +312,7 @@ class AutoRerouteV2Spec extends HealthCheckSpecification {

and: "Delete the flow"
flowHelperV2.deleteFlow(flow.flowId)
database.resetCosts()
}

@Tags([VIRTUAL, SMOKE])
@@ -367,6 +375,7 @@ class AutoRerouteV2Spec extends HealthCheckSpecification {
and: "Bring flow ports up and delete the flow"
["source", "destination"].each { antiflap.portUp(flow."$it".switchId, flow."$it".portNumber) }
flowHelperV2.deleteFlow(flow.flowId)
database.resetCosts()
}

def "System doesn't reroute flow to a path with not enough bandwidth available"() {
@@ -65,7 +65,7 @@ class ThrottlingRerouteSpec extends HealthCheckSpecification {

then: "The oldest broken flow is still not rerouted before rerouteDelay run out"
sleep(untilReroutesBegin() - (long) (rerouteDelay * 1000 * 0.3)) //check after 70% of rerouteDelay has passed
northbound.getFlowStatus(flows.first().flowId).status == FlowState.UP
northbound.getFlowHistory(flows.first().flowId).last().action == "Flow creating" //reroute didn't start yet

and: "The oldest broken flow is rerouted when the rerouteDelay runs out"
def waitTime = untilReroutesBegin() / 1000.0 + PATH_INSTALLATION_TIME
@@ -0,0 +1,23 @@
/* Copyright 2019 Telstra Open Source
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openkilda.persistence.repositories;

import org.openkilda.model.PathId;
import org.openkilda.model.PathSegment;

public interface PathSegmentRepository extends Repository<PathSegment> {
void updateFailedStatus(PathId pathId, PathSegment segment, boolean failed);
}
@@ -69,4 +69,6 @@
PortHistoryRepository createPortHistoryRepository();

PortPropertiesRepository createPortPropertiesRepository();

PathSegmentRepository createPathSegmentRepository();
}
@@ -0,0 +1,83 @@
/* Copyright 2019 Telstra Open Source
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openkilda.persistence.repositories.impl;

import static java.lang.String.format;

import org.openkilda.model.PathId;
import org.openkilda.model.PathSegment;
import org.openkilda.persistence.PersistenceException;
import org.openkilda.persistence.TransactionManager;
import org.openkilda.persistence.converters.PathIdConverter;
import org.openkilda.persistence.converters.SwitchIdConverter;
import org.openkilda.persistence.repositories.PathSegmentRepository;

import org.neo4j.ogm.session.Neo4jSession;
import org.neo4j.ogm.session.Session;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* Neo4j OGM implementation of {@link PathSegmentRepository}.
*/
public class Neo4jPathSegmentRepository extends Neo4jGenericRepository<PathSegment> implements PathSegmentRepository {
private final SwitchIdConverter switchIdConverter = new SwitchIdConverter();
private final PathIdConverter pathIdConverter = new PathIdConverter();

public Neo4jPathSegmentRepository(Neo4jSessionFactory sessionFactory, TransactionManager transactionManager) {
super(sessionFactory, transactionManager);
}

@Override
public void updateFailedStatus(PathId pathId, PathSegment segment, boolean failed) {
Map<String, Object> parameters = new HashMap<>();
parameters.put("src_switch", switchIdConverter.toGraphProperty(segment.getSrcSwitch().getSwitchId()));
parameters.put("src_port", segment.getSrcPort());
parameters.put("dst_switch", switchIdConverter.toGraphProperty(segment.getDestSwitch().getSwitchId()));
parameters.put("dst_port", segment.getDestPort());
parameters.put("path_id", pathIdConverter.toGraphProperty(pathId));
parameters.put("failed", failed);

Session session = getSession();
Optional<Long> updatedEntityId = queryForLong(
"MATCH (src:switch)-[:source]-(ps:path_segment)-[:destination]-(dst:switch) "
+ "WHERE src.name = $src_switch AND ps.src_port = $src_port "
+ "AND dst.name = $dst_switch AND ps.dst_port = $dst_port "
+ "MATCH (fp:flow_path {path_id: $path_id})-[:owns]-(ps) "
+ "SET ps.failed=$failed "
+ "RETURN id(ps) as id", parameters, "id");
if (!updatedEntityId.isPresent()) {
throw new PersistenceException(format("PathSegment not found to be updated: %s_%d - %s_%d. Path id: %s.",
segment.getSrcSwitch().getSwitchId(), segment.getSrcPort(),
segment.getDestSwitch().getSwitchId(), segment.getDestPort(), pathId));
}

Object updatedEntity = ((Neo4jSession) session).context().getNodeEntity(updatedEntityId.get());
if (updatedEntity instanceof PathSegment) {
PathSegment updatedPathSegment = (PathSegment) updatedEntity;
updatedPathSegment.setFailed(failed);
} else if (updatedEntity != null) {
throw new PersistenceException(format("Expected a PathSegment entity, but found %s.", updatedEntity));
}
}

@Override
protected Class<PathSegment> getEntityType() {
return PathSegment.class;
}
}
@@ -29,6 +29,7 @@
import org.openkilda.persistence.repositories.IslRepository;
import org.openkilda.persistence.repositories.KildaConfigurationRepository;
import org.openkilda.persistence.repositories.LinkPropsRepository;
import org.openkilda.persistence.repositories.PathSegmentRepository;
import org.openkilda.persistence.repositories.PortPropertiesRepository;
import org.openkilda.persistence.repositories.RepositoryFactory;
import org.openkilda.persistence.repositories.SwitchPropertiesRepository;
@@ -171,4 +172,9 @@ public PortHistoryRepository createPortHistoryRepository() {
public PortPropertiesRepository createPortPropertiesRepository() {
return new Neo4jPortPropertiesRepository(sessionFactory, transactionManager);
}

@Override
public PathSegmentRepository createPathSegmentRepository() {
return new Neo4jPathSegmentRepository(sessionFactory, transactionManager);
}
}
@@ -24,7 +24,6 @@
import org.openkilda.model.FlowPath;
import org.openkilda.model.PathId;
import org.openkilda.persistence.PersistenceManager;
import org.openkilda.persistence.repositories.RepositoryFactory;
import org.openkilda.wfm.AbstractBolt;
import org.openkilda.wfm.error.PipelineException;
import org.openkilda.wfm.topology.reroute.RerouteTopology;
@@ -63,8 +62,7 @@ public RerouteBolt(PersistenceManager persistenceManager) {
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
RepositoryFactory repositoryFactory = persistenceManager.getRepositoryFactory();
this.rerouteService = new RerouteService(repositoryFactory);
this.rerouteService = new RerouteService(persistenceManager);
super.prepare(stormConf, context, collector);
}

0 comments on commit 2a978cc

Please sign in to comment.
You can’t perform that action at this time.