Skip to content

Commit

Permalink
Fix IPFIX cleanup, add unit test (#7611)
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Yurchenko <koolzz@fb.com>
  • Loading branch information
koolzz committed Jun 17, 2021
1 parent 805c1f3 commit b695df6
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 19 deletions.
Expand Up @@ -25,6 +25,7 @@ LimitNOFILE=65536
LimitNPROC=65536
LimitSIGPENDING=65536
EnvironmentFile=/etc/environment
ExecStartPre=/usr/bin/ovs-vsctl --all destroy Flow_Sample_Collector_Set
ExecStartPre=/usr/bin/ovs-vsctl set bridge gtp_br0 protocols=OpenFlow10,OpenFlow13,OpenFlow14 other-config:disable-in-band=true
ExecStartPre=/usr/bin/ovs-vsctl set-controller gtp_br0 tcp:127.0.0.1:6633 tcp:127.0.0.1:6654
ExecStartPre=/usr/bin/ovs-vsctl set-fail-mode gtp_br0 secure
Expand Down
8 changes: 7 additions & 1 deletion lte/gateway/deploy/roles/magma/tasks/main.yml
Expand Up @@ -387,12 +387,18 @@
- ipfix0
when: full_provision

- name: Copy ryu ipfix patch
- name: Copy ryu ipfix patch for 3.5
copy:
src: nx_actions.py
dest: /home/vagrant/build/python/lib/python3.5/site-packages/ryu/ofproto/
when: full_provision

- name: Copy ryu ipfix patch for 3.8
copy:
src: nx_actions_3.5.py
dest: /home/vagrant/build/python/lib/python3.8/site-packages/ryu/ofproto/nx_actions.py
when: full_provision

- name: Change build folder ownership
ansible.builtin.file:
path: /home/vagrant/build
Expand Down
8 changes: 0 additions & 8 deletions lte/gateway/python/magma/pipelined/app/ipfix.py
Expand Up @@ -10,7 +10,6 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import shlex
import subprocess
from typing import Dict, NamedTuple

Expand Down Expand Up @@ -119,13 +118,6 @@ def initialize_on_connect(self, datapath: Datapath):
self._delete_all_flows(datapath)
self._install_default_flows(datapath)

rm_cmd = "ovs-vsctl destroy Flow_Sample_Collector_Set {}" \
.format(self.ipfix_config.collector_set_id)

args = shlex.split(rm_cmd)
ret = subprocess.call(args)
self.logger.debug("Removed old Flow_Sample_Collector_Set ret %d", ret)

if not self.ipfix_config.enabled:
return

Expand Down
4 changes: 3 additions & 1 deletion lte/gateway/python/magma/pipelined/bridge_util.py
Expand Up @@ -147,6 +147,8 @@ def create_bridge(bridge_name, iface_name):
Creates a simple bridge, sets up an interface.
Used when running unit tests
"""
subprocess.Popen(["ovs-vsctl", "--all", "destroy",
"Flow_Sample_Collector_Set"]).wait()
subprocess.Popen(["ovs-vsctl", "--if-exists", "del-br",
bridge_name]).wait()
subprocess.Popen(["ovs-vsctl", "add-br", bridge_name]).wait()
Expand Down Expand Up @@ -216,7 +218,7 @@ def get_flows_for_bridge(bridge_name, table_num=None, include_stats=True):
set_cmd.append("table=%s" % table_num)

flows = \
subprocess.check_output(set_cmd).decode('utf-8').split('\n')
subprocess.check_output(set_cmd).decode('ascii', 'ignore').split('\n')
flows = list(filter(lambda x: (x is not None and
x != '' and
x.find("NXST_FLOW") == -1),
Expand Down
13 changes: 8 additions & 5 deletions lte/gateway/python/magma/pipelined/tests/pipelined_test_util.py
Expand Up @@ -437,7 +437,7 @@ def fail(test_case: TestCase, err_msg: str, _bridge_name: str,
p = subprocess.Popen([ofctl_cmd],
stdout=subprocess.PIPE,
shell=True)
ofctl_dump = p.stdout.read().decode("utf-8").strip()
ofctl_dump = p.stdout.read().decode("utf-8", 'ignore').strip()
logging.error("cmd ofctl_dump: %s", ofctl_dump)

msg = 'Snapshot mismatch with error:\n' \
Expand Down Expand Up @@ -514,7 +514,7 @@ def wait_for_snapshots(test_case: TestCase,
snapshot_name: Optional[str] = None,
wait_time: int = 1, max_sleep_time: int = 20,
datapath=None,
try_snapshot=False):
try_snapshot=False, include_stats=True):
"""
Wait after checking ovs snapshot as new changes might still come in,
Expand All @@ -527,13 +527,15 @@ def wait_for_snapshots(test_case: TestCase,
Throws a WaitTimeExceeded Exception if max_sleep_time exceeded
"""
sleep_time = 0
old_snapshot = _get_current_bridge_snapshot(bridge_name, service_manager)
old_snapshot = _get_current_bridge_snapshot(bridge_name, service_manager,
include_stats=include_stats)
while True:
if datapath:
flows.set_barrier(datapath)
hub.sleep(wait_time)

new_snapshot = _get_current_bridge_snapshot(bridge_name, service_manager)
new_snapshot = _get_current_bridge_snapshot(bridge_name, service_manager,
include_stats=include_stats)
if try_snapshot:
snapshot_file, expected_ = expected_snapshot(test_case,
bridge_name,
Expand Down Expand Up @@ -600,7 +602,8 @@ def __exit__(self, type, value, traceback):
self._snapshot_name,
max_sleep_time=self._max_sleep_time,
datapath=self._datapath,
try_snapshot=self._try_snapshot)
try_snapshot=self._try_snapshot,
include_stats=self._include_stats)
except WaitTimeExceeded as e:
ofctl_cmd = "sudo ovs-ofctl dump-flows %s".format(self._bridge_name)
p = subprocess.Popen([ofctl_cmd],
Expand Down
Expand Up @@ -3,9 +3,7 @@
priority=12,dl_dst=5e:cc:cc:b1:49:4b actions=set_field:0x48c2739fd9c3->metadata,resubmit(,ue_mac(scratch_table_0)),set_field:0->reg0,set_field:0->reg3
priority=10,arp actions=resubmit(,ingress(main_table)),set_field:0->reg0,set_field:0->reg3
table=dpi(main_table), priority=0 actions=resubmit(,dpi(scratch_table_0)),resubmit(,ipfix(main_table)),set_field:0->reg0,set_field:0->reg3
table=dpi(scratch_table_0), idle_timeout=42, priority=10,tcp,nw_src=1.2.3.0,nw_dst=45.10.0.1,tp_src=51115,tp_dst=80 actions=set_field:0xa->reg10,set_field:0->reg0,set_field:0->reg3
table=dpi(scratch_table_0), idle_timeout=42, priority=10,tcp,nw_src=45.10.0.1,nw_dst=1.2.3.0,tp_src=80,tp_dst=51115 actions=set_field:0xa->reg10,set_field:0->reg0,set_field:0->reg3
table=dpi(scratch_table_0), priority=0 actions=set_field:0xffffffff->reg10,set_field:0->reg0,set_field:0->reg3
table=ipfix(main_table), priority=0 actions=resubmit(,egress(main_table)),set_field:0->reg0,set_field:0->reg3
table=ue_mac(scratch_table_0), priority=15,udp,tp_src=68,tp_dst=67 actions=set_field:0x1->reg6,resubmit(,ingress(main_table)),set_field:0->reg0,set_field:0->reg3
table=ue_mac(scratch_table_0), priority=15,udp,tp_src=67,tp_dst=68 actions=set_field:0x1->reg6,resubmit(,ue_mac(scratch_table_1)),set_field:0->reg0,set_field:0->reg3
table=ue_mac(scratch_table_0), priority=15,udp,tp_dst=53 actions=set_field:0x1->reg6,resubmit(,ingress(main_table)),set_field:0->reg0,set_field:0->reg3
Expand All @@ -22,3 +20,6 @@
cookie=0xb, table=201, priority=0 actions=resubmit(,dpi(scratch_table_0)),resubmit(,202),set_field:0->reg0,set_field:0->reg3
table=202, priority=12,dl_src=5e:cc:cc:b1:49:4b actions=set_field:0x48c2739fd9c3->metadata,resubmit(,203),set_field:0->reg0,set_field:0->reg3
table=202, priority=12,dl_dst=5e:cc:cc:b1:49:4b actions=set_field:0x48c2739fd9c3->metadata,resubmit(,203),set_field:0->reg0,set_field:0->reg3
cookie=0xc, table=202, priority=0 actions=resubmit(,203),set_field:0->reg0,set_field:0->reg3
table=203, priority=12,metadata=0x48c2739fd9c3 actions=sample(probability=65535,collector_set_id=1,obs_domain_id=1,obs_point_id=1,pdp_start_epoch=145,msisdn=magma_is_awesome,apn_name=apn_name123456789g15,sampling_port=32768)
table=203, priority=10 actions=sample(probability=65535,collector_set_id=1,obs_domain_id=1,obs_point_id=1,pdp_start_epoch=1,msisdn=defau,apn_name=default,sampling_port=32768)
Expand Up @@ -19,6 +19,7 @@
from lte.protos.pipelined_pb2 import FlowRequest
from lte.protos.policydb_pb2 import FlowMatch
from magma.pipelined.app.dpi import DPIController
from magma.pipelined.app.ipfix import IPFIXController
from magma.pipelined.bridge_util import BridgeTools
from magma.pipelined.policy_converters import convert_ipv4_str_to_ip_proto
from magma.pipelined.tests.app.start_pipelined import (
Expand Down Expand Up @@ -61,10 +62,12 @@ def setUpClass(cls):

ue_mac_controller_reference = Future()
dpi_controller_reference = Future()
ipfix_controller_reference = Future()
testing_controller_reference = Future()
test_setup = TestSetup(
apps=[PipelinedController.UEMac,
PipelinedController.DPI,
PipelinedController.IPFIX,
PipelinedController.Testing,
PipelinedController.StartupFlows],
references={
Expand All @@ -74,6 +77,8 @@ def setUpClass(cls):
dpi_controller_reference,
PipelinedController.Arp:
Future(),
PipelinedController.IPFIX:
ipfix_controller_reference,
PipelinedController.Testing:
testing_controller_reference,
PipelinedController.StartupFlows:
Expand All @@ -89,11 +94,25 @@ def setUpClass(cls):
'clean_restart': True,
'setup_type': 'CWF',
'dpi': {
'enabled': False,
'enabled': True,
'mon_port': 'mon1',
'mon_port_number': 32769,
'idle_timeout': 42,
},
'ipfix': {
'enabled': True,
'probability': 65,
'collector_set_id': 1,
'collector_ip': '1.1.1.1',
'collector_port': 65010,
'cache_timeout': 60,
'obs_domain_id': 1,
'obs_point_id': 1,
},
'conntrackd': {
'enabled': True,
},
'ovs_gtp_port_number': 32768,
},
mconfig=PipelineD(),
loop=None,
Expand All @@ -109,6 +128,7 @@ def setUpClass(cls):

cls.ue_mac_controller = ue_mac_controller_reference.result()
cls.dpi_controller = dpi_controller_reference.result()
cls.ipfix_controller = ipfix_controller_reference.result()
cls.testing_controller = testing_controller_reference.result()

cls.dpi_controller._policy_dict = cls._static_rule_dict
Expand Down Expand Up @@ -139,6 +159,8 @@ def test_subscriber_policy(self):
self.dpi_controller.add_classify_flow(
flow_match, FlowRequest.FLOW_FINAL_CLASSIFICATION,
'base.ip.http.facebook', 'tbd')
self.ipfix_controller.add_ue_sample_flow(imsi, "magma_is_awesome_msisdn",
"00:11:22:33:44:55", "apn_name123456789", 145)

snapshot_verifier = SnapshotVerifier(self, self.BRIDGE,
self.service_manager,
Expand Down

0 comments on commit b695df6

Please sign in to comment.