Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(agw): activate mypy code scanning in lte except integ_tests #13187

Merged
merged 1 commit into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def check_rules(
port: str,
enodebd_public_ip: str,
private_ip: str,
) -> None:
) -> bool:
unexpected_rules = []
expected_rules_present = False
pattern = r'DNAT\s+tcp\s+--\s+anywhere\s+{pub_ip}\s+tcp\s+dpt:{dport} to:{ip}'.format(
Expand Down
5 changes: 2 additions & 3 deletions lte/gateway/python/magma/enodebd/stats_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,10 @@ def _get_enb_label_from_request(self, request) -> str:
logger.error("Couldn't find serial for ip", ip)
return label

@asyncio.coroutine
def _post_and_put_handler(self, request) -> web.Response:
async def _post_and_put_handler(self, request) -> web.Response:
""" HTTP POST handler """
# Read request body and convert to XML tree
body = yield from request.read()
body = await request.read()

root = ElementTree.fromstring(body)
label = self._get_enb_label_from_request(request)
Expand Down
93 changes: 39 additions & 54 deletions lte/gateway/python/magma/pipelined/encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import gzip
import hashlib
import logging
from typing import Union
from typing import Optional

from Crypto.Cipher import AES, ARC4
from Crypto.Hash import HMAC
Expand All @@ -27,94 +27,79 @@ def pad(m):
return m + ' ' * (16 - len(m) % 16)


def encrypt_str(s: str, key: bytes, encryption_algorithm, mac: bytes = None):
ret: Union[str, bytes]
def encrypt_str(s: str, key: bytes, encryption_algorithm, mac: Optional[bytes] = None):
if encryption_algorithm == PipelineD.HEConfig.RC4:
cipher = ARC4.new(key)
ret = cipher.encrypt(s.encode('utf-8')).hex()
elif encryption_algorithm == PipelineD.HEConfig.AES256_CBC_HMAC_MD5:
iv = get_random_bytes(16)
key_val = key
key_mac = mac

cipher = AES.new(key_val, AES.MODE_CBC, iv)
enc = cipher.encrypt(pad(s).encode('utf-8'))
return cipher.encrypt(s.encode('utf-8')).hex()
github-advanced-security[bot] marked this conversation as resolved.
Show resolved Hide resolved

hmac = HMAC.new(key_mac)
hmac.update(iv + enc)

ret = hmac.hexdigest() + iv.hex() + enc.hex()
elif encryption_algorithm == PipelineD.HEConfig.AES256_ECB_HMAC_MD5:
if mac is not None:
key_val = key
key_mac = mac

cipher = AES.new(key_val, AES.MODE_ECB)
enc = cipher.encrypt(pad(s).encode('utf-8'))

hmac = HMAC.new(key_mac)
hmac.update(enc)

ret = hmac.hexdigest() + enc.hex()
elif encryption_algorithm == PipelineD.HEConfig.GZIPPED_AES256_ECB_SHA1:
key_val = key
key_mac = mac

cipher = AES.new(key_val, AES.MODE_ECB)
enc = cipher.encrypt(pad(s).encode('utf-8'))

hmac = HMAC.new(key_mac)
hmac.update(enc)
ret = gzip.compress(hmac.digest() + enc)
else:
logging.error("Unsupported encryption algorithm")
return ret
if encryption_algorithm == PipelineD.HEConfig.AES256_CBC_HMAC_MD5:
iv = get_random_bytes(16)
aes_cipher = AES.new(key_val, AES.MODE_CBC, iv)
enc = aes_cipher.encrypt(pad(s).encode('utf-8'))
hmac.update(iv + enc)
return hmac.hexdigest() + iv.hex() + enc.hex()
elif encryption_algorithm == PipelineD.HEConfig.AES256_ECB_HMAC_MD5:
aes_cipher = AES.new(key_val, AES.MODE_ECB)
enc = aes_cipher.encrypt(pad(s).encode('utf-8'))

Check failure

Code scanning / CodeQL

Use of a broken or weak cryptographic algorithm

The block mode ECB is broken or weak, and should not be used.
hmac.update(enc)
return hmac.hexdigest() + enc.hex()
elif encryption_algorithm == PipelineD.HEConfig.GZIPPED_AES256_ECB_SHA1:
aes_cipher = AES.new(key_val, AES.MODE_ECB)
enc = aes_cipher.encrypt(pad(s).encode('utf-8'))

Check failure

Code scanning / CodeQL

Use of a broken or weak cryptographic algorithm

The block mode ECB is broken or weak, and should not be used.
hmac.update(enc)
return gzip.compress(hmac.digest() + enc)

raise ValueError("Unsupported encryption algorithm")


def decrypt_str(data, key: bytes, encryption_algorithm, mac) -> str:
ret = ""
if encryption_algorithm == PipelineD.HEConfig.RC4:
cipher = ARC4.new(key)
ret = cipher.decrypt(data).hex()
elif encryption_algorithm == PipelineD.HEConfig.AES256_CBC_HMAC_MD5:
return cipher.decrypt(data).hex()
github-advanced-security[bot] marked this conversation as resolved.
Show resolved Hide resolved

hmac = HMAC.new(mac)

if encryption_algorithm == PipelineD.HEConfig.AES256_CBC_HMAC_MD5:
verify = data[0:32]
hmac = HMAC.new(mac)
hmac.update(codecs.decode(data[32:], 'hex_codec'))

if hmac.hexdigest() != verify:
return ""

iv = codecs.decode(data[32:64], 'hex_codec')
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = cipher.decrypt(codecs.decode(data[64:], 'hex_codec'))
ret = decrypted.decode("utf-8").strip()
aes_cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = aes_cipher.decrypt(codecs.decode(data[64:], 'hex_codec'))
return decrypted.decode("utf-8").strip()

elif encryption_algorithm == PipelineD.HEConfig.AES256_ECB_HMAC_MD5:
verify = data[0:32]
hmac = HMAC.new(mac)
hmac.update(codecs.decode(data[32:], 'hex_codec'))

if hmac.hexdigest() != verify:
return ""

cipher = AES.new(key, AES.MODE_ECB)
decrypted = cipher.decrypt(codecs.decode(data[32:], 'hex_codec'))
ret = decrypted.decode("utf-8").strip()
aes_cipher = AES.new(key, AES.MODE_ECB)
decrypted = aes_cipher.decrypt(codecs.decode(data[32:], 'hex_codec'))

Check failure

Code scanning / CodeQL

Use of a broken or weak cryptographic algorithm

The block mode ECB is broken or weak, and should not be used.
return decrypted.decode("utf-8").strip()

elif encryption_algorithm == PipelineD.HEConfig.GZIPPED_AES256_ECB_SHA1:
# Convert to hex str
data = gzip.decompress(data).hex()

verify = data[0:32]
hmac = HMAC.new(mac)
hmac.update(codecs.decode(data[32:], 'hex_codec'))

if hmac.hexdigest() != verify:
return ""

cipher = AES.new(key, AES.MODE_ECB)
decrypted = cipher.decrypt(codecs.decode(data[32:], 'hex_codec'))
ret = decrypted.decode("utf-8").strip()
else:
logging.error("Unsupported encryption algorithm")
return ret
aes_cipher = AES.new(key, AES.MODE_ECB)
decrypted = aes_cipher.decrypt(codecs.decode(data[32:], 'hex_codec'))

Check failure

Code scanning / CodeQL

Use of a broken or weak cryptographic algorithm

The block mode ECB is broken or weak, and should not be used.
return decrypted.decode("utf-8").strip()
raise ValueError("Unsupported encryption algorithm")


def get_hash(s, hash_function) -> bytes:
Expand Down
2 changes: 1 addition & 1 deletion lte/gateway/python/magma/pipelined/rpc_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from typing import List, OrderedDict

import grpc
from lte.protos import pipelined_pb2_grpc
from lte.protos import pipelined_pb2_grpc # type: ignore[attr-defined]
from lte.protos.apn_pb2 import AggregatedMaximumBitrate
from lte.protos.mobilityd_pb2 import IPAddress
from lte.protos.pipelined_pb2 import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import logging

from grpc import StatusCode
from lte.protos import (
from lte.protos import ( # type: ignore[attr-defined]
diam_errors_pb2,
subscriberauth_pb2,
subscriberauth_pb2_grpc,
Expand Down
6 changes: 5 additions & 1 deletion lte/gateway/python/magma/subscriberdb/rpc_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
from typing import NamedTuple

import grpc
from lte.protos import apn_pb2, subscriberdb_pb2, subscriberdb_pb2_grpc
from lte.protos import ( # type: ignore[attr-defined]
apn_pb2,
subscriberdb_pb2,
subscriberdb_pb2_grpc,
)
from magma.common.rpc_utils import print_grpc, return_void
from magma.subscriberdb.sid import SIDUtils
from magma.subscriberdb.store.base import (
Expand Down
16 changes: 8 additions & 8 deletions lte/gateway/python/scripts/generate_oai_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def _get_congestion_control_config(service_mconfig):
return True


def _get_converged_core_config(service_mconfig: object) -> bool:
def _get_converged_core_config(service_mconfig: MME) -> bool:
"""Retrieve enable5g_features config value. If it does not exist it defaults to False. It gives precedence to the service_mconfig file.

Args:
Expand All @@ -273,7 +273,7 @@ def _get_converged_core_config(service_mconfig: object) -> bool:
return False


def _get_default_slice_service_type_config(service_mconfig: object) -> str:
def _get_default_slice_service_type_config(service_mconfig: MME) -> str:
"""Retrieve default_slice_service_type config value. If it does not exist, it defaults to DEFAULT_NGAP_S_NSSAI_SST.

Args:
Expand All @@ -294,7 +294,7 @@ def _get_default_slice_service_type_config(service_mconfig: object) -> str:
return service_mconfig.amf_default_slice_service_type or DEFAULT_NGAP_S_NSSAI_SST


def _get_default_slice_differentiator_type_config(service_mconfig: object) -> str:
def _get_default_slice_differentiator_type_config(service_mconfig: MME) -> str:
"""Retrieve default_slice_differentiator config value. If it does not exist it defaults to DEFAULT_NGAP_S_NSSAI_SD.

Args:
Expand All @@ -313,7 +313,7 @@ def _get_default_slice_differentiator_type_config(service_mconfig: object) -> st
return service_mconfig.amf_default_slice_differentiator or DEFAULT_NGAP_S_NSSAI_SD


def _get_amf_name_config(service_mconfig: object) -> str:
def _get_amf_name_config(service_mconfig: MME) -> str:
"""Retrieve amf_name config value. If it does not exist, it defaults to DEFAULT_NGAP_AMF_NAME.

Args:
Expand Down Expand Up @@ -360,7 +360,7 @@ def _get_default_auth_timer_expire_msec() -> str:
)


def _get_default_dnn_config(service_mconfig: object) -> str:
def _get_default_dnn_config(service_mconfig: MME) -> str:
"""Retrieve default_dnn config value. If it does not exist, it defaults to DEFAULT_DEFAULT_DNN.

Args:
Expand All @@ -379,7 +379,7 @@ def _get_default_dnn_config(service_mconfig: object) -> str:
return DEFAULT_DEFAULT_DNN


def _get_amf_region_id(service_mconfig: object) -> str:
def _get_amf_region_id(service_mconfig: MME) -> str:
"""Retrieve amf_region_id config value. If it does not exist it defaults to DEFAULT_NGAP_AMF_REGION_ID.

Args:
Expand All @@ -398,7 +398,7 @@ def _get_amf_region_id(service_mconfig: object) -> str:
return service_mconfig.amf_region_id or DEFAULT_NGAP_AMF_REGION_ID


def _get_amf_set_id(service_mconfig: object) -> str:
def _get_amf_set_id(service_mconfig: MME) -> str:
"""Retrieve amf_set_id config value. If it does not exist it defaults to DEFAULT_NGAP_SET_ID.

Args:
Expand All @@ -417,7 +417,7 @@ def _get_amf_set_id(service_mconfig: object) -> str:
return service_mconfig.amf_set_id or DEFAULT_NGAP_SET_ID


def _get_amf_pointer(service_mconfig: object) -> str:
def _get_amf_pointer(service_mconfig: MME) -> str:
"""Retrieve amf_pointer config value. If it does not exist it defaults to DEFAULT_NGAP_AMF_POINTER.

Args:
Expand Down
4 changes: 0 additions & 4 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,4 @@ install_types = True
non_interactive = True
exclude = (?x)(
^lte/gateway/python/integ_tests/ |
^lte/gateway/python/magma/pipelined/ |
^lte/gateway/python/magma/enodebd/ |
^lte/gateway/python/magma/subscriberdb/protocols/m5g_auth_servicer.py$ |
^lte/gateway/python/scripts/generate_oai_config.py$
)