Skip to content

Commit

Permalink
chore(pipelined): Fix selection of mypy errors (#12875)
Browse files Browse the repository at this point in the history
* mypy errors ’Need type annotation' are removed

Signed-off-by: Marco Pfirrmann <marco.pfirrmann@tngtech.com>

* mypy errors in 'encoding.py' are removed

Signed-off-by: Marco Pfirrmann <marco.pfirrmann@tngtech.com>
  • Loading branch information
mpfirrmann committed May 30, 2022
1 parent d065beb commit 7ea7292
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 30 deletions.
12 changes: 6 additions & 6 deletions lte/gateway/python/magma/pipelined/encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import gzip
import hashlib
import logging
from typing import Union

from Crypto.Cipher import AES, ARC4
from Crypto.Hash import HMAC
Expand All @@ -27,7 +28,7 @@ def pad(m):


def encrypt_str(s: str, key: bytes, encryption_algorithm, mac: bytes = None):
ret = ""
ret: Union[str, bytes]
if encryption_algorithm == PipelineD.HEConfig.RC4:
cipher = ARC4.new(key)
ret = cipher.encrypt(s.encode('utf-8')).hex()
Expand Down Expand Up @@ -69,7 +70,7 @@ def encrypt_str(s: str, key: bytes, encryption_algorithm, mac: bytes = None):
return ret


def decrypt_str(data: str, key: bytes, encryption_algorithm, mac) -> str:
def decrypt_str(data, key: bytes, encryption_algorithm, mac) -> str:
ret = ""
if encryption_algorithm == PipelineD.HEConfig.RC4:
cipher = ARC4.new(key)
Expand Down Expand Up @@ -99,8 +100,7 @@ def decrypt_str(data: str, key: bytes, encryption_algorithm, mac) -> str:
ret = decrypted.decode("utf-8").strip()
elif encryption_algorithm == PipelineD.HEConfig.GZIPPED_AES256_ECB_SHA1:
# Convert to hex str
hexlify = codecs.getencoder('hex')
data = hexlify(gzip.decompress(data))[0].decode('utf-8')
data = gzip.decompress(data).hex()

verify = data[0:32]
hmac = HMAC.new(mac)
Expand All @@ -117,8 +117,8 @@ def decrypt_str(data: str, key: bytes, encryption_algorithm, mac) -> str:
return ret


def get_hash(s: str, hash_function) -> bytes:
hash_bytes = bytes()
def get_hash(s, hash_function) -> bytes:
hash_bytes: bytes
if hash_function == PipelineD.HEConfig.MD5:
m = hashlib.md5()
m.update(s.encode('utf-8'))
Expand Down
3 changes: 2 additions & 1 deletion lte/gateway/python/magma/pipelined/policy_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
limitations under the License.
"""
import ipaddress
from typing import Dict

from lte.protos.mobilityd_pb2 import IPAddress
from lte.protos.policydb_pb2 import FlowMatch
Expand Down Expand Up @@ -219,7 +220,7 @@ def ipv4_address_to_str(ipaddr: IPAddress):


def get_ue_ip_match_args(ip_addr: IPAddress, direction: Direction):
ip_match = {}
ip_match: Dict = {}

if ip_addr:
if ip_addr.version == ip_addr.IPV4:
Expand Down
8 changes: 4 additions & 4 deletions lte/gateway/python/magma/pipelined/qos/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import threading
import traceback
from enum import Enum
from typing import Any, Dict, List, Tuple # noqa
from typing import Any, Dict, List, Set, Tuple

from lte.protos.policydb_pb2 import FlowMatch
from magma.common.redis.client import get_default_client
Expand Down Expand Up @@ -62,7 +62,7 @@ def __init__(self, ip_addr: str):
self.ambr_dl = 0
self.ambr_dl_leaf = 0

self.rules = set()
self.rules: Set = set()

def set_ambr(self, d: FlowMatch.Direction, root: int, leaf: int) -> None:
if d == FlowMatch.UPLINK:
Expand Down Expand Up @@ -90,8 +90,8 @@ class SubscriberState(object):

def __init__(self, imsi: str, qos_store: Dict):
self.imsi = imsi
self.rules = {} # rule -> qos handle
self.sessions = {} # IP -> [sessions(ip, qos_argumets, rule_no), ...]
self.rules: Dict = {} # rule -> qos handle
self.sessions: Dict = {} # IP -> [sessions(ip, qos_argumets, rule_no), ...]
self._redis_store = qos_store

def check_empty(self) -> bool:
Expand Down
17 changes: 8 additions & 9 deletions lte/gateway/python/magma/pipelined/rpc_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
import logging
import os
import queue
from collections import OrderedDict
from concurrent.futures import Future
from typing import List, Tuple
from typing import List, OrderedDict, Tuple

import grpc
from lte.protos import pipelined_pb2_grpc
Expand Down Expand Up @@ -74,7 +73,7 @@
convert_ipv6_bytes_to_ip_proto,
)

grpc_msg_queue = queue.Queue()
grpc_msg_queue: queue.Queue = queue.Queue()
DEFAULT_CALL_TIMEOUT = 5


Expand Down Expand Up @@ -147,7 +146,7 @@ def SetupDefaultControllers(self, request, context) -> SetupFlowsResult:
return SetupFlowsResult(result=SetupFlowsResult.SUCCESS)

def _get_setup_result(self, controller, context):
fut = Future()
fut: Future = Future()
self._loop.call_soon_threadsafe(self._setup_default_controller, controller, fut)
try:
return fut.result(timeout=self._call_timeout)
Expand Down Expand Up @@ -185,7 +184,7 @@ def SetupPolicyFlows(self, request, context) -> SetupFlowsResult:
if ret is not None:
return SetupFlowsResult(result=ret)

fut = Future()
fut: Future = Future()
self._loop.call_soon_threadsafe(self._setup_flows, request, fut)
try:
return fut.result(timeout=self._call_timeout)
Expand Down Expand Up @@ -682,7 +681,7 @@ def SetupUEMacFlows(self, request, context) -> SetupFlowsResult:
if ret is not None:
return SetupFlowsResult(result=ret)

fut = Future()
fut: Future = Future()
self._loop.call_soon_threadsafe(
self._setup_ue_mac,
request, fut,
Expand Down Expand Up @@ -819,7 +818,7 @@ def SetupQuotaFlows(self, request, context) -> SetupFlowsResult:
if ret is not None:
return SetupFlowsResult(result=ret)

fut = Future()
fut: Future = Future()
self._loop.call_soon_threadsafe(
self._setup_quota,
request, fut,
Expand Down Expand Up @@ -945,7 +944,7 @@ def ng_update_session_flows(
request.subscriber_id, request.session_version,
)
# Convert message containing PDR to Named Tuple Rules.
process_pdr_rules = OrderedDict()
process_pdr_rules: OrderedDict = OrderedDict()
response = self._ng_servicer_app.ng_session_message_handler(
request,
process_pdr_rules,
Expand Down Expand Up @@ -1027,7 +1026,7 @@ def _ng_qer_update(
self, request: SessionSet, pdr_entry: PDRRuleEntry,
) -> Tuple[List[RuleModResult], List[RuleModResult]]:
enforcement_res = []
failed_policy_rules_results = []
failed_policy_rules_results: List = []

local_f_teid_ng = request.local_f_teid

Expand Down
3 changes: 2 additions & 1 deletion lte/gateway/python/magma/pipelined/tests/test_ebpf_dl_dp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import socket
import subprocess
import unittest
from typing import List

from lte.protos.mobilityd_pb2 import IPAddress
from magma.pipelined.bridge_util import BridgeTools
Expand Down Expand Up @@ -49,7 +50,7 @@ class eBpfDatapathDLTest(unittest.TestCase):

imsi = '122321231222333'

packet_cap1 = []
packet_cap1: List = []
sniffer = None
ebpf_man = None

Expand Down
3 changes: 2 additions & 1 deletion lte/gateway/python/magma/pipelined/tests/test_ebpf_ul_dp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import socket
import subprocess
import unittest
from typing import List

from lte.protos.mobilityd_pb2 import IPAddress
from magma.pipelined.bridge_util import BridgeTools
Expand Down Expand Up @@ -44,7 +45,7 @@ class eBpfDatapathULTest(unittest.TestCase):
gtp_pkt_dst = '11.1.1.1'
gtp_pkt_src = '11.1.1.2'

packet_cap1 = []
packet_cap1: List = []
sniffer = None
ebpf_man = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class EnforcementTableTest(unittest.TestCase):
BRIDGE = 'testing_br'
IFACE = 'testing_br'
MAC_DEST = "5e:cc:cc:b1:49:4b"
he_controller_reference = Future()
he_controller_reference: Future = Future()
VETH = 'tveth1'
VETH_NS = 'tveth1_ns'
PROXY_PORT = '16'
Expand Down
2 changes: 1 addition & 1 deletion lte/gateway/python/magma/pipelined/tests/test_he.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ class EnforcementTableHeTest(unittest.TestCase):
BRIDGE = 'testing_br'
IFACE = 'testing_br'
MAC_DEST = "5e:cc:cc:b1:49:4b"
he_controller_reference = Future()
he_controller_reference: Future = Future()
VETH = 'tveth1'
VETH_NS = 'tveth1_ns'
PROXY_PORT = '15'
Expand Down
12 changes: 6 additions & 6 deletions lte/gateway/python/magma/pipelined/tests/test_inout_non_nat.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import unittest
import warnings
from concurrent.futures import Future
from typing import List
from typing import Dict, List

from lte.protos.mobilityd_pb2 import GWInfo, IPAddress, IPBlock
from magma.pipelined.app import egress
Expand All @@ -37,7 +37,7 @@
from ryu.lib import hub
from ryu.ofproto.ofproto_v1_4 import OFPP_LOCAL

gw_info_map = {}
gw_info_map: Dict = {}
gw_info_lock = threading.RLock() # re-entrant locks


Expand Down Expand Up @@ -155,10 +155,10 @@ def setUpNetworkAndController(

cls.service_manager = create_service_manager([])

ingress_controller_reference = Future()
middle_controller_reference = Future()
egress_controller_reference = Future()
testing_controller_reference = Future()
ingress_controller_reference: Future = Future()
middle_controller_reference: Future = Future()
egress_controller_reference: Future = Future()
testing_controller_reference: Future = Future()

if non_nat_arp_egress_port is None:
non_nat_arp_egress_port = cls.DHCP_PORT
Expand Down

0 comments on commit 7ea7292

Please sign in to comment.