Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(pipelined): Fix selection of mypy errors #12875

Merged
merged 2 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions lte/gateway/python/magma/pipelined/encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import gzip
import hashlib
import logging
from typing import Union

from Crypto.Cipher import AES, ARC4
from Crypto.Hash import HMAC
Expand All @@ -27,7 +28,7 @@ def pad(m):


def encrypt_str(s: str, key: bytes, encryption_algorithm, mac: bytes = None):
ret = ""
ret: Union[str, bytes]
if encryption_algorithm == PipelineD.HEConfig.RC4:
cipher = ARC4.new(key)
ret = cipher.encrypt(s.encode('utf-8')).hex()
Expand Down Expand Up @@ -69,7 +70,7 @@ def encrypt_str(s: str, key: bytes, encryption_algorithm, mac: bytes = None):
return ret


def decrypt_str(data: str, key: bytes, encryption_algorithm, mac) -> str:
def decrypt_str(data, key: bytes, encryption_algorithm, mac) -> str:
ret = ""
if encryption_algorithm == PipelineD.HEConfig.RC4:
cipher = ARC4.new(key)
Expand Down Expand Up @@ -99,8 +100,7 @@ def decrypt_str(data: str, key: bytes, encryption_algorithm, mac) -> str:
ret = decrypted.decode("utf-8").strip()
elif encryption_algorithm == PipelineD.HEConfig.GZIPPED_AES256_ECB_SHA1:
# Convert to hex str
hexlify = codecs.getencoder('hex')
data = hexlify(gzip.decompress(data))[0].decode('utf-8')
data = gzip.decompress(data).hex()

verify = data[0:32]
hmac = HMAC.new(mac)
Expand All @@ -117,8 +117,8 @@ def decrypt_str(data: str, key: bytes, encryption_algorithm, mac) -> str:
return ret


def get_hash(s: str, hash_function) -> bytes:
hash_bytes = bytes()
def get_hash(s, hash_function) -> bytes:
hash_bytes: bytes
if hash_function == PipelineD.HEConfig.MD5:
m = hashlib.md5()
m.update(s.encode('utf-8'))
Expand Down
3 changes: 2 additions & 1 deletion lte/gateway/python/magma/pipelined/policy_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
limitations under the License.
"""
import ipaddress
from typing import Dict

from lte.protos.mobilityd_pb2 import IPAddress
from lte.protos.policydb_pb2 import FlowMatch
Expand Down Expand Up @@ -219,7 +220,7 @@ def ipv4_address_to_str(ipaddr: IPAddress):


def get_ue_ip_match_args(ip_addr: IPAddress, direction: Direction):
ip_match = {}
ip_match: Dict = {}

if ip_addr:
if ip_addr.version == ip_addr.IPV4:
Expand Down
8 changes: 4 additions & 4 deletions lte/gateway/python/magma/pipelined/qos/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import threading
import traceback
from enum import Enum
from typing import Any, Dict, List, Tuple # noqa
from typing import Any, Dict, List, Set, Tuple

from lte.protos.policydb_pb2 import FlowMatch
from magma.common.redis.client import get_default_client
Expand Down Expand Up @@ -62,7 +62,7 @@ def __init__(self, ip_addr: str):
self.ambr_dl = 0
self.ambr_dl_leaf = 0

self.rules = set()
self.rules: Set = set()

def set_ambr(self, d: FlowMatch.Direction, root: int, leaf: int) -> None:
if d == FlowMatch.UPLINK:
Expand Down Expand Up @@ -90,8 +90,8 @@ class SubscriberState(object):

def __init__(self, imsi: str, qos_store: Dict):
self.imsi = imsi
self.rules = {} # rule -> qos handle
self.sessions = {} # IP -> [sessions(ip, qos_argumets, rule_no), ...]
self.rules: Dict = {} # rule -> qos handle
self.sessions: Dict = {} # IP -> [sessions(ip, qos_argumets, rule_no), ...]
self._redis_store = qos_store

def check_empty(self) -> bool:
Expand Down
17 changes: 8 additions & 9 deletions lte/gateway/python/magma/pipelined/rpc_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
import logging
import os
import queue
from collections import OrderedDict
from concurrent.futures import Future
from typing import List, Tuple
from typing import List, OrderedDict, Tuple

import grpc
from lte.protos import pipelined_pb2_grpc
Expand Down Expand Up @@ -74,7 +73,7 @@
convert_ipv6_bytes_to_ip_proto,
)

grpc_msg_queue = queue.Queue()
grpc_msg_queue: queue.Queue = queue.Queue()
DEFAULT_CALL_TIMEOUT = 5


Expand Down Expand Up @@ -147,7 +146,7 @@ def SetupDefaultControllers(self, request, context) -> SetupFlowsResult:
return SetupFlowsResult(result=SetupFlowsResult.SUCCESS)

def _get_setup_result(self, controller, context):
fut = Future()
fut: Future = Future()
self._loop.call_soon_threadsafe(self._setup_default_controller, controller, fut)
try:
return fut.result(timeout=self._call_timeout)
Expand Down Expand Up @@ -185,7 +184,7 @@ def SetupPolicyFlows(self, request, context) -> SetupFlowsResult:
if ret is not None:
return SetupFlowsResult(result=ret)

fut = Future()
fut: Future = Future()
self._loop.call_soon_threadsafe(self._setup_flows, request, fut)
try:
return fut.result(timeout=self._call_timeout)
Expand Down Expand Up @@ -682,7 +681,7 @@ def SetupUEMacFlows(self, request, context) -> SetupFlowsResult:
if ret is not None:
return SetupFlowsResult(result=ret)

fut = Future()
fut: Future = Future()
self._loop.call_soon_threadsafe(
self._setup_ue_mac,
request, fut,
Expand Down Expand Up @@ -819,7 +818,7 @@ def SetupQuotaFlows(self, request, context) -> SetupFlowsResult:
if ret is not None:
return SetupFlowsResult(result=ret)

fut = Future()
fut: Future = Future()
self._loop.call_soon_threadsafe(
self._setup_quota,
request, fut,
Expand Down Expand Up @@ -945,7 +944,7 @@ def ng_update_session_flows(
request.subscriber_id, request.session_version,
)
# Convert message containing PDR to Named Tuple Rules.
process_pdr_rules = OrderedDict()
process_pdr_rules: OrderedDict = OrderedDict()
response = self._ng_servicer_app.ng_session_message_handler(
request,
process_pdr_rules,
Expand Down Expand Up @@ -1027,7 +1026,7 @@ def _ng_qer_update(
self, request: SessionSet, pdr_entry: PDRRuleEntry,
) -> Tuple[List[RuleModResult], List[RuleModResult]]:
enforcement_res = []
failed_policy_rules_results = []
failed_policy_rules_results: List = []

local_f_teid_ng = request.local_f_teid

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import socket
import subprocess
import unittest
from typing import List

from lte.protos.mobilityd_pb2 import IPAddress
from magma.pipelined.bridge_util import BridgeTools
Expand Down Expand Up @@ -49,7 +50,7 @@ class eBpfDatapathDLTest(unittest.TestCase):

imsi = '122321231222333'

packet_cap1 = []
packet_cap1: List = []
sniffer = None
ebpf_man = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import socket
import subprocess
import unittest
from typing import List

from lte.protos.mobilityd_pb2 import IPAddress
from magma.pipelined.bridge_util import BridgeTools
Expand Down Expand Up @@ -44,7 +45,7 @@ class eBpfDatapathULTest(unittest.TestCase):
gtp_pkt_dst = '11.1.1.1'
gtp_pkt_src = '11.1.1.2'

packet_cap1 = []
packet_cap1: List = []
sniffer = None
ebpf_man = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class EnforcementTableTest(unittest.TestCase):
BRIDGE = 'testing_br'
IFACE = 'testing_br'
MAC_DEST = "5e:cc:cc:b1:49:4b"
he_controller_reference = Future()
he_controller_reference: Future = Future()
VETH = 'tveth1'
VETH_NS = 'tveth1_ns'
PROXY_PORT = '16'
Expand Down
2 changes: 1 addition & 1 deletion lte/gateway/python/magma/pipelined/tests/test_he.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ class EnforcementTableHeTest(unittest.TestCase):
BRIDGE = 'testing_br'
IFACE = 'testing_br'
MAC_DEST = "5e:cc:cc:b1:49:4b"
he_controller_reference = Future()
he_controller_reference: Future = Future()
VETH = 'tveth1'
VETH_NS = 'tveth1_ns'
PROXY_PORT = '15'
Expand Down
12 changes: 6 additions & 6 deletions lte/gateway/python/magma/pipelined/tests/test_inout_non_nat.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import unittest
import warnings
from concurrent.futures import Future
from typing import List
from typing import Dict, List

from lte.protos.mobilityd_pb2 import GWInfo, IPAddress, IPBlock
from magma.pipelined.app import egress
Expand All @@ -37,7 +37,7 @@
from ryu.lib import hub
from ryu.ofproto.ofproto_v1_4 import OFPP_LOCAL

gw_info_map = {}
gw_info_map: Dict = {}
gw_info_lock = threading.RLock() # re-entrant locks


Expand Down Expand Up @@ -155,10 +155,10 @@ def setUpNetworkAndController(

cls.service_manager = create_service_manager([])

ingress_controller_reference = Future()
middle_controller_reference = Future()
egress_controller_reference = Future()
testing_controller_reference = Future()
ingress_controller_reference: Future = Future()
middle_controller_reference: Future = Future()
egress_controller_reference: Future = Future()
testing_controller_reference: Future = Future()

if non_nat_arp_egress_port is None:
non_nat_arp_egress_port = cls.DHCP_PORT
Expand Down