Skip to content

Commit

Permalink
[lte][agw] Support to test pco option ipcp (#4767)
Browse files Browse the repository at this point in the history
* Added support to send pco option ipcp

Signed-off-by: Pruthvi Hebbani <pruthvi.hebbani@radisys.com>
  • Loading branch information
pruthvihebbani committed May 5, 2021
1 parent 32df459 commit 0703ad4
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 6 deletions.
2 changes: 1 addition & 1 deletion lte/gateway/c/oai/tasks/sgw/pgw_pco.c
Expand Up @@ -172,7 +172,7 @@ int pgw_process_pco_request_ipcp(
OAILOG_DEBUG(
LOG_SPGW_APP,
"PCO: Protocol identifier IPCP option "
"SECONDARY_DNS_SERVER_IP_ADDRESS ipcp_out_dns_prim_ipv4_addr "
"PRIMARY_DNS_SERVER_IP_ADDRESS ipcp_out_dns_prim_ipv4_addr "
"0x%x\n",
ipcp_out_dns_prim_ipv4_addr);
}
Expand Down
3 changes: 2 additions & 1 deletion lte/gateway/c/oai/tasks/sgw/spgw_state.cpp
Expand Up @@ -89,7 +89,8 @@ void spgw_free_s11_bearer_context_information(
if (*context_p) {
sgw_free_pdn_connection(
&(*context_p)->sgw_eps_bearer_context_information.pdn_connection);

clear_protocol_configuration_options(
&(*context_p)->sgw_eps_bearer_context_information.saved_message.pco);
pgw_delete_procedures(*context_p);
if ((*context_p)->pgw_eps_bearer_context_information.apns) {
obj_hashtable_ts_destroy(
Expand Down
41 changes: 37 additions & 4 deletions lte/gateway/python/integ_tests/s1aptests/s1ap_utils.py
Expand Up @@ -88,6 +88,7 @@ class S1ApUtil(object):
PROT_CFG_CID_PCSCF_IPV6_ADDR_REQUEST = 0x0001
PROT_CFG_CID_PCSCF_IPV4_ADDR_REQUEST = 0x000C
PROT_CFG_CID_DNS_SERVER_IPV6_ADDR_REQUEST = 0x0003
PROT_CFG_PID_IPCP = 0x8021

lib_name = "libtfw.so"

Expand Down Expand Up @@ -193,14 +194,16 @@ def get_response(self):
return self._msg.get(True)

def populate_pco(
self, protCfgOpts_pr, pcscf_addr_type=None, dns_ipv6_addr=False
self, protCfgOpts_pr, pcscf_addr_type=None, dns_ipv6_addr=False,
ipcp=False
):
"""
Populates the PCO values.
Args:
protCfgOpts_pr: PCO structure
pcscf_addr_type: ipv4/ipv6/ipv4v6 flag
dns_ipv6_addr: True/False flag
ipcp: True/False flag
Returns:
None
"""
Expand Down Expand Up @@ -249,6 +252,35 @@ def populate_pco(
idx
].cid = S1ApUtil.PROT_CFG_CID_DNS_SERVER_IPV6_ADDR_REQUEST

if ipcp:
protCfgOpts_pr.numProtId += 1
protCfgOpts_pr.p[
0
].pid = S1ApUtil.PROT_CFG_PID_IPCP
protCfgOpts_pr.p[
0
].len = 0x10

# PPP IP Control Protocol packet as per rfc 1877
# 01 00 00 10 81 06 00 00 00 00 83 06 00 00 00 00

protCfgOpts_pr.p[0].val[0] = 0x01 # code = 01 - Config Request
protCfgOpts_pr.p[0].val[1] = 0x00 # Identifier : 00
protCfgOpts_pr.p[0].val[2] = 0x00 # Length : 16
protCfgOpts_pr.p[0].val[3] = 0x10
protCfgOpts_pr.p[0].val[4] = 0x81 # Options:Primary DNS IP Addr
protCfgOpts_pr.p[0].val[5] = 0x06 # len = 6
protCfgOpts_pr.p[0].val[6] = 0x00 # 00.00.00.00
protCfgOpts_pr.p[0].val[7] = 0x00
protCfgOpts_pr.p[0].val[8] = 0x00
protCfgOpts_pr.p[0].val[9] = 0x00
protCfgOpts_pr.p[0].val[10] = 0x83 # Options:Secondary DNS IP Addr
protCfgOpts_pr.p[0].val[11] = 0x06 # len = 6
protCfgOpts_pr.p[0].val[12] = 0x00 # 00.00.00.00
protCfgOpts_pr.p[0].val[13] = 0x00
protCfgOpts_pr.p[0].val[14] = 0x00
protCfgOpts_pr.p[0].val[15] = 0x00

def attach(
self,
ue_id,
Expand All @@ -261,6 +293,7 @@ def attach(
pdn_type=1,
pcscf_addr_type=None,
dns_ipv6_addr=False,
ipcp=False,
):
"""Given a UE issue the attach request of specified type
Expand Down Expand Up @@ -293,10 +326,10 @@ def attach(
attach_req.pdnType_pr.pres = True
attach_req.pdnType_pr.pdn_type = pdn_type

# Populate PCO only if pcscf_addr_type is set
if pcscf_addr_type or dns_ipv6_addr:
# Populate PCO if pcscf_addr_type/dns_ipv6_addr/ipcp is set
if pcscf_addr_type or dns_ipv6_addr or ipcp:
self.populate_pco(
attach_req.protCfgOpts_pr, pcscf_addr_type, dns_ipv6_addr,
attach_req.protCfgOpts_pr, pcscf_addr_type, dns_ipv6_addr, ipcp
)
assert self.issue_cmd(attach_type, attach_req) == 0

Expand Down
@@ -0,0 +1,61 @@
"""
Copyright 2020 The Magma Authors.
This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import unittest

import s1ap_types

from integ_tests.s1aptests import s1ap_wrapper


class TestAttachDetachWithPcoIpcp(unittest.TestCase):
def setUp(self):
self._s1ap_wrapper = s1ap_wrapper.TestWrapper()

def tearDown(self):
self._s1ap_wrapper.cleanup()

def test_attach_detach_with_pco_ipcp(self):
""" Test case to test PCO option IPCP during attach"""
num_ues = 1
self._s1ap_wrapper.configUEDevice(num_ues)
req = self._s1ap_wrapper.ue_req

print(
"************************* Running End to End attach for ",
"UE id ",
req.ue_id,
)
# Now actually complete the attach
self._s1ap_wrapper._s1_util.attach(
req.ue_id,
s1ap_types.tfwCmd.UE_END_TO_END_ATTACH_REQUEST,
s1ap_types.tfwCmd.UE_ATTACH_ACCEPT_IND,
s1ap_types.ueAttachAccept_t,
ipcp=True,
)

# Wait on EMM Information from MME
self._s1ap_wrapper._s1_util.receive_emm_info()
print(
"************************* Running UE detach for UE id ",
req.ue_id,
)
# Now detach the UE
self._s1ap_wrapper.s1_util.detach(
req.ue_id, s1ap_types.ueDetachType_t.UE_NORMAL_DETACH.value, True,
)


if __name__ == "__main__":
unittest.main()

0 comments on commit 0703ad4

Please sign in to comment.