Skip to content
Permalink
Browse files

Merge branch 'hotfix-1.45.1' into develop

  • Loading branch information
niksv committed Dec 30, 2019
2 parents 6884e59 + c53b369 commit 1e44e902790274efa8c41cbe96bc82c7b3123ef5
Showing with 297 additions and 108 deletions.
  1. +15 −0 CHANGELOG.md
  2. +11 −2 .../functional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/AutoRerouteSpec.groovy
  3. +11 −2 ...unctional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/AutoRerouteV2Spec.groovy
  4. +1 −1 ...ional-tests/src/test/groovy/org/openkilda/functionaltests/spec/flows/ThrottlingRerouteSpec.groovy
  5. +23 −0 ...a-persistence-api/src/main/java/org/openkilda/persistence/repositories/PathSegmentRepository.java
  6. +2 −0 ...kilda-persistence-api/src/main/java/org/openkilda/persistence/repositories/RepositoryFactory.java
  7. +83 −0 ...e-neo4j/src/main/java/org/openkilda/persistence/repositories/impl/Neo4jPathSegmentRepository.java
  8. +6 −0 ...tence-neo4j/src/main/java/org/openkilda/persistence/repositories/impl/Neo4jRepositoryFactory.java
  9. +38 −31 services/wfm/src/main/java/org/openkilda/wfm/topology/floodlightrouter/FloodlightRouterTopology.java
  10. +1 −1 ...org/openkilda/wfm/topology/flowhs/fsm/reroute/actions/OnReceivedRemoveOrRevertResponseAction.java
  11. +15 −24 ...wfm/src/main/java/org/openkilda/wfm/topology/flowhs/fsm/reroute/actions/RevertNewRulesAction.java
  12. +0 −1 services/wfm/src/main/java/org/openkilda/wfm/topology/flowhs/utils/SpeakerRequestEmitter.java
  13. +1 −3 services/wfm/src/main/java/org/openkilda/wfm/topology/reroute/bolts/RerouteBolt.java
  14. +53 −38 services/wfm/src/main/java/org/openkilda/wfm/topology/reroute/service/RerouteService.java
  15. +37 −5 services/wfm/src/test/java/org/openkilda/wfm/topology/reroute/service/RerouteServiceTest.java
@@ -1,5 +1,20 @@
# Changelog

## v1.45.1 (30/12/2019)

### Bug Fixes:
- [#2785](https://github.com/telstra/open-kilda/pull/2785) Reroute topology updates flow status for flows (Issue: [#2781](https://github.com/telstra/open-kilda/issues/2781)) [**storm-topologies**]
- [#3086](https://github.com/telstra/open-kilda/pull/3086) Decrease parallelism for reply kafka spouts in flr [**storm-topologies**]
- [#3062](https://github.com/telstra/open-kilda/pull/3062) Fix rollback in flow reroute [**storm-topologies**]


For the complete list of changes, check out [the commit log](https://github.com/telstra/open-kilda/compare/v1.45.0...v1.45.1).

### Affected Components:
flow-hs, router, reroute

---

## v1.45.0 (10/12/2019)

### Features:
@@ -56,6 +56,7 @@ class AutoRerouteSpec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()
}

@Tags(SMOKE)
@@ -97,6 +98,7 @@ class AutoRerouteSpec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()
}

@Tags([VIRTUAL, LOW_PRIORITY])
@@ -211,8 +213,11 @@ class AutoRerouteSpec extends HealthCheckSpecification {
Wrappers.wait(WAIT_OFFSET) { assert northbound.activeSwitches*.switchId.contains(flowPath[1].switchId) }

then: "The flow is #flowStatus"
TimeUnit.SECONDS.sleep(discoveryInterval + rerouteDelay + 2)
Wrappers.wait(WAIT_OFFSET) { assert northbound.getFlowStatus(flow.id).status == flowStatus }
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getLinks(flowPath[1].switchId, null, null, null)
.each { assert it.state == IslChangeType.DISCOVERED }
}
Wrappers.wait(WAIT_OFFSET + rerouteDelay) { assert northbound.getFlowStatus(flow.id).status == flowStatus }

and: "Restore topology to the original state, remove the flow, reset toggles"
flowHelper.deleteFlow(flow.id)
@@ -221,6 +226,7 @@ class AutoRerouteSpec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()

where:
flowsRerouteOnIslDiscovery | flowStatus
@@ -268,6 +274,7 @@ class AutoRerouteSpec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()
}

@Tags(SMOKE)
@@ -305,6 +312,7 @@ class AutoRerouteSpec extends HealthCheckSpecification {

and: "Delete the flow"
flowHelper.deleteFlow(flow.id)
database.resetCosts()
}

@Tags([VIRTUAL, SMOKE])
@@ -367,6 +375,7 @@ class AutoRerouteSpec extends HealthCheckSpecification {
and: "Bring flow ports up and delete the flow"
["source", "destination"].each { antiflap.portUp(flow."$it".datapath, flow."$it".portNumber) }
flowHelper.deleteFlow(flow.id)
database.resetCosts()
}

def "System doesn't reroute flow to a path with not enough bandwidth available"() {
@@ -59,6 +59,7 @@ class AutoRerouteV2Spec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()
}

@Tags(SMOKE)
@@ -100,6 +101,7 @@ class AutoRerouteV2Spec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()
}

@Tags([VIRTUAL, LOW_PRIORITY])
@@ -214,8 +216,11 @@ class AutoRerouteV2Spec extends HealthCheckSpecification {
Wrappers.wait(WAIT_OFFSET) { assert northbound.activeSwitches*.switchId.contains(flowPath[1].switchId) }

then: "The flow is #flowStatus"
TimeUnit.SECONDS.sleep(discoveryInterval + rerouteDelay + 2)
Wrappers.wait(WAIT_OFFSET) { assert northbound.getFlowStatus(flow.flowId).status == flowStatus }
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getLinks(flowPath[1].switchId, null, null, null)
.each { assert it.state == IslChangeType.DISCOVERED }
}
Wrappers.wait(WAIT_OFFSET + rerouteDelay) { assert northbound.getFlowStatus(flow.flowId).status == flowStatus }

and: "Restore topology to the original state, remove the flow, reset toggles"
flowHelperV2.deleteFlow(flow.flowId)
@@ -224,6 +229,7 @@ class AutoRerouteV2Spec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()

where:
flowsRerouteOnIslDiscovery | flowStatus
@@ -271,6 +277,7 @@ class AutoRerouteV2Spec extends HealthCheckSpecification {
Wrappers.wait(discoveryInterval + WAIT_OFFSET) {
northbound.getAllLinks().each { assert it.state != IslChangeType.FAILED }
}
database.resetCosts()
}

@Tags(SMOKE)
@@ -308,6 +315,7 @@ class AutoRerouteV2Spec extends HealthCheckSpecification {

and: "Delete the flow"
flowHelperV2.deleteFlow(flow.flowId)
database.resetCosts()
}

@Tags([VIRTUAL, SMOKE])
@@ -370,6 +378,7 @@ class AutoRerouteV2Spec extends HealthCheckSpecification {
and: "Bring flow ports up and delete the flow"
["source", "destination"].each { antiflap.portUp(flow."$it".switchId, flow."$it".portNumber) }
flowHelperV2.deleteFlow(flow.flowId)
database.resetCosts()
}

def "System doesn't reroute flow to a path with not enough bandwidth available"() {
@@ -65,7 +65,7 @@ class ThrottlingRerouteSpec extends HealthCheckSpecification {

then: "The oldest broken flow is still not rerouted before rerouteDelay run out"
sleep(untilReroutesBegin() - (long) (rerouteDelay * 1000 * 0.3)) //check after 70% of rerouteDelay has passed
northbound.getFlowStatus(flows.first().flowId).status == FlowState.UP
northbound.getFlowHistory(flows.first().flowId).last().action == "Flow creating" //reroute didn't start yet

and: "The oldest broken flow is rerouted when the rerouteDelay runs out"
def waitTime = untilReroutesBegin() / 1000.0 + PATH_INSTALLATION_TIME
@@ -0,0 +1,23 @@
/* Copyright 2019 Telstra Open Source
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openkilda.persistence.repositories;

import org.openkilda.model.PathId;
import org.openkilda.model.PathSegment;

public interface PathSegmentRepository extends Repository<PathSegment> {
void updateFailedStatus(PathId pathId, PathSegment segment, boolean failed);
}
@@ -70,4 +70,6 @@
PortHistoryRepository createPortHistoryRepository();

PortPropertiesRepository createPortPropertiesRepository();

PathSegmentRepository createPathSegmentRepository();
}
@@ -0,0 +1,83 @@
/* Copyright 2019 Telstra Open Source
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openkilda.persistence.repositories.impl;

import static java.lang.String.format;

import org.openkilda.model.PathId;
import org.openkilda.model.PathSegment;
import org.openkilda.persistence.PersistenceException;
import org.openkilda.persistence.TransactionManager;
import org.openkilda.persistence.converters.PathIdConverter;
import org.openkilda.persistence.converters.SwitchIdConverter;
import org.openkilda.persistence.repositories.PathSegmentRepository;

import org.neo4j.ogm.session.Neo4jSession;
import org.neo4j.ogm.session.Session;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* Neo4j OGM implementation of {@link PathSegmentRepository}.
*/
public class Neo4jPathSegmentRepository extends Neo4jGenericRepository<PathSegment> implements PathSegmentRepository {
private final SwitchIdConverter switchIdConverter = new SwitchIdConverter();
private final PathIdConverter pathIdConverter = new PathIdConverter();

public Neo4jPathSegmentRepository(Neo4jSessionFactory sessionFactory, TransactionManager transactionManager) {
super(sessionFactory, transactionManager);
}

@Override
public void updateFailedStatus(PathId pathId, PathSegment segment, boolean failed) {
Map<String, Object> parameters = new HashMap<>();
parameters.put("src_switch", switchIdConverter.toGraphProperty(segment.getSrcSwitch().getSwitchId()));
parameters.put("src_port", segment.getSrcPort());
parameters.put("dst_switch", switchIdConverter.toGraphProperty(segment.getDestSwitch().getSwitchId()));
parameters.put("dst_port", segment.getDestPort());
parameters.put("path_id", pathIdConverter.toGraphProperty(pathId));
parameters.put("failed", failed);

Session session = getSession();
Optional<Long> updatedEntityId = queryForLong(
"MATCH (src:switch)-[:source]-(ps:path_segment)-[:destination]-(dst:switch) "
+ "WHERE src.name = $src_switch AND ps.src_port = $src_port "
+ "AND dst.name = $dst_switch AND ps.dst_port = $dst_port "
+ "MATCH (fp:flow_path {path_id: $path_id})-[:owns]-(ps) "
+ "SET ps.failed=$failed "
+ "RETURN id(ps) as id", parameters, "id");
if (!updatedEntityId.isPresent()) {
throw new PersistenceException(format("PathSegment not found to be updated: %s_%d - %s_%d. Path id: %s.",
segment.getSrcSwitch().getSwitchId(), segment.getSrcPort(),
segment.getDestSwitch().getSwitchId(), segment.getDestPort(), pathId));
}

Object updatedEntity = ((Neo4jSession) session).context().getNodeEntity(updatedEntityId.get());
if (updatedEntity instanceof PathSegment) {
PathSegment updatedPathSegment = (PathSegment) updatedEntity;
updatedPathSegment.setFailed(failed);
} else if (updatedEntity != null) {
throw new PersistenceException(format("Expected a PathSegment entity, but found %s.", updatedEntity));
}
}

@Override
protected Class<PathSegment> getEntityType() {
return PathSegment.class;
}
}
@@ -28,6 +28,7 @@
import org.openkilda.persistence.repositories.IslRepository;
import org.openkilda.persistence.repositories.KildaConfigurationRepository;
import org.openkilda.persistence.repositories.LinkPropsRepository;
import org.openkilda.persistence.repositories.PathSegmentRepository;
import org.openkilda.persistence.repositories.PortPropertiesRepository;
import org.openkilda.persistence.repositories.RepositoryFactory;
import org.openkilda.persistence.repositories.SwitchConnectedDeviceRepository;
@@ -171,4 +172,9 @@ public PortHistoryRepository createPortHistoryRepository() {
public PortPropertiesRepository createPortPropertiesRepository() {
return new Neo4jPortPropertiesRepository(sessionFactory, transactionManager);
}

@Override
public PathSegmentRepository createPathSegmentRepository() {
return new Neo4jPathSegmentRepository(sessionFactory, transactionManager);
}
}

0 comments on commit 1e44e90

Please sign in to comment.
You can’t perform that action at this time.