Skip to content

Commit

Permalink
mba: making MBA policy parser and checker pluggable
Browse files Browse the repository at this point in the history
Signed-off-by: George Almasi <gheorghe@us.ibm.com>
  • Loading branch information
George Almasi authored and maugustosilva committed Jul 13, 2023
1 parent e76a4a7 commit c99103b
Show file tree
Hide file tree
Showing 12 changed files with 196 additions and 115 deletions.
2 changes: 1 addition & 1 deletion keylime/cli/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def process_policy(args: ArgsType) -> Tuple[Dict[str, Any], Optional[str], str,
if isinstance(args["mb_refstate"], str):
if args["mb_refstate"] == "default":
args["mb_refstate"] = config.get("tenant", "mb_refstate")
mb_refstate_data = mba.load_policy_file(args["mb_refstate"])
mb_refstate_data = mba.policy_load(args["mb_refstate"])
else:
raise UserError("Invalid measured boot reference state (intended state) provided")

Expand Down
4 changes: 2 additions & 2 deletions keylime/cloud_verifier_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from keylime.failure import Component, Event, Failure
from keylime.ima import file_signatures
from keylime.ima.types import RuntimePolicyType
from keylime.mba import mba
from keylime.tpm import tpm_util
from keylime.tpm.tpm_main import Tpm

Expand Down Expand Up @@ -259,8 +260,7 @@ def prepare_get_quote(agent: Dict[str, Any]) -> Dict[str, Union[str, int]]:
def process_get_status(agent: VerfierMain) -> Dict[str, Any]:
has_mb_refstate = 0
try:
mb_refstate = json.loads(cast(str, agent.mb_refstate))
if mb_refstate and mb_refstate.keys():
if mba.policy_is_valid(cast(str, agent.mb_refstate)):
has_mb_refstate = 1
except Exception as e:
logger.warning(
Expand Down
6 changes: 2 additions & 4 deletions keylime/cmd/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ def main() -> None:
if config.has_option("verifier", "auto_migrate_db") and config.getboolean("verifier", "auto_migrate_db"):
apply("cloud_verifier")

# Load explicitly the policy modules into Keylime for the verifier,
# so that they are not loaded accidentally from other components
mba.load_policy_engine()
mba.load_parser_engine()
# Explicitly load and initialize measured boot components
mba.load_imports()
cloud_verifier_tornado.main()


Expand Down
3 changes: 1 addition & 2 deletions keylime/da/attest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ def main() -> None:

args = parser.parse_args()

mba.load_policy_engine()
mba.load_parser_engine()
mba.load_imports()

rmcb = config.get("registrar", "durable_attestation_import", fallback="")
rmc = record.get_record_mgt_class(rmcb)
Expand Down
3 changes: 2 additions & 1 deletion keylime/mba/elchecking/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import json
import sys

from keylime.mba import mba
from keylime.mba.elparsing import tpm2_tools_elparser

from . import policies

policies.load_policies()
# This main module is just for command-line based testing.
# It implements a command to do one test.
# Invoke it with `python3 -m $packagename`, for some value of
Expand All @@ -18,6 +18,7 @@
parser.add_argument("refstate_file", type=argparse.FileType("rt"))
parser.add_argument("eventlog_file", type=argparse.FileType("rb"), default=sys.stdin)
args = parser.parse_args()
mba.load_imports()
policy = policies.get_policy(args.policy_name)
if policy is None:
print(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,63 @@

from keylime import config, keylime_logging
from keylime.failure import Component, Failure
from keylime.mba.elchecking import tests
from keylime.mba.elchecking import policies, tests

logger = keylime_logging.init_logging("measured_boot")


def evaluate_bootlog(
def policy_load(policy_path: Optional[str] = None) -> str:
"""
Load (and validates) an actual policy file.
:param policy_path: <optional> name of policy file to load
:returns: a string defining the policy.
Errors: if the policy file cannot be read, or contains errors, this function may
cause exceptions. Validation in this case is to confirm that the file
is formatted as proper JSON; nothing more.
TODO: default policy should probably not be defined, because it depends on the policy engine itself?
"""
try:
if policy_path is None:
policy_path = config.get("tenant", "mb_refstate")
with open(policy_path, encoding="utf-8") as f:
mb_policy_data = json.load(f)
return json.dumps(mb_policy_data)
except Exception as e:
raise ValueError from e


def policy_is_valid(mb_refstate: Optional[str]) -> bool:
"""
Returns true if the policy argument is a valid nonempty policy
"""
if not mb_refstate:
return False
try:
mb_refstate_obj = json.loads(mb_refstate)
except Exception as _:
return False
if not mb_refstate_obj:
return False
if len(mb_refstate_obj) == 0:
return False
return True


def bootlog_evaluate(
mb_refstate_str: Optional[str],
mb_measurement_data: tests.Data,
pcrs_inquote: Set[int],
agent_id: str,
) -> Failure:
"""
Evaluating a measured boot event log against a policy
:param policy_data: policy definition (aka "refstate") (as a string).
:param measurement_data: parsed measured boot event log as produced by `parse_bootlog`
:param pcrsInQuote: a set of PCRs provided by the quote.
:param agent_id: the UUID of the keylime agent sending this data.
:returns: list of all failures encountered while evaluating the boot log against the policy.
"""
failure = Failure(Component.MEASURED_BOOT)

# no evaluation if the refstate is an empty string
Expand All @@ -29,18 +75,15 @@ def evaluate_bootlog(
# load policy name
mb_policy_name = config.get("verifier", "measured_boot_policy_name", fallback="accept-all")

# pylint: disable=import-outside-toplevel
from keylime.mba.elchecking import policies as eventlog_policies

# pylint: enable=import-outside-toplevel
mb_policy = eventlog_policies.get_policy(mb_policy_name)
mb_policy = policies.get_policy(mb_policy_name)

# fallback if we cannot find policy
# Should not happen in the verifier because we check on startup if the policy exists
if mb_policy is None:
logger.warning("Invalid measured boot policy name %s -- using reject-all instead.", mb_policy_name)
mb_policy_name = "reject-all"
mb_policy = eventlog_policies.RejectAll()
mb_policy = policies.RejectAll()

# figure out whether the quote contains all quotes to evaluate the policy
# if there are any PCRs in the policy that are not in the quote, we canot evaluate.
Expand Down Expand Up @@ -76,3 +119,6 @@ def evaluate_bootlog(
True,
)
return failure


logger.debug("mba.elchecking.elchecker: policy names = %s", str(policies.get_policy_names()))
12 changes: 0 additions & 12 deletions keylime/mba/elchecking/policies.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import abc
import importlib
import typing

from keylime import config

from . import tests

# This module defines Policy for testing measured boot logs.
Expand Down Expand Up @@ -101,12 +98,3 @@ def evaluate(policy_name: str, refstate: RefState, eventlog: tests.Data) -> str:
"""
tester = refstate_to_test(policy_name, refstate)
return tester.why_not({}, eventlog)


def load_policies() -> None:
imports = config.getlist("verifier", "measured_boot_imports")
imports.append(".example")
if imports:
for imp in imports:
if imp:
importlib.import_module(imp, __package__)
25 changes: 12 additions & 13 deletions keylime/mba/elparsing/tpm2_tools_elparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from packaging.version import Version

from keylime import cmd_exec, config, keylime_logging
from keylime.common import algorithms
from keylime.failure import Component, Failure

if typing.TYPE_CHECKING:
Expand All @@ -18,10 +17,11 @@
logger = keylime_logging.init_logging("elparsing")


def parse_bootlog(
mb_measurement_list: Optional[str], hash_alg: algorithms.Hash
def bootlog_parse(
mb_measurement_list: Optional[str], hash_alg: str
) -> typing.Tuple["MBPCRDict", "MBAgg", "MBLog", Failure]:
"""Parse the measured boot log and return its object and the state of the PCRs
"""
Parse the measured boot log and return its object and the state of the PCRs
:param mb_measurement_list: The measured boot measurement list
:param hash_alg: the hash algorithm that should be used for the PCRs
:returns: Returns a map of the state of the PCRs, measured boot data object and True for success
Expand All @@ -39,11 +39,9 @@ def parse_bootlog(
logger.error("Parse of measured boot event log has unexpected value for .pcrs: %r", log_pcrs)
failure.add_event("invalid_pcrs", {"got": log_pcrs}, True)
return {}, None, {}, failure
pcr_hashes = log_pcrs.get(str(hash_alg))
pcr_hashes = log_pcrs.get(hash_alg)
if (not isinstance(pcr_hashes, dict)) or not pcr_hashes:
logger.error(
"Parse of measured boot event log has unexpected value for .pcrs.%s: %r", str(hash_alg), pcr_hashes
)
logger.error("Parse of measured boot event log has unexpected value for .pcrs.%s: %r", hash_alg, pcr_hashes)
failure.add_event("invalid_pcrs_hashes", {"got": pcr_hashes}, True)
return {}, None, {}, failure
boot_aggregates = mb_measurement_data.get("boot_aggregates")
Expand Down Expand Up @@ -245,28 +243,29 @@ def tpm2_tools_getversion() -> str:
and len(tools_version) > 1
and int(tools_version[1]) >= 24
):
logger.info("TPM2-TOOLS Version: %s", tools_version[0])
_tpm2_tools_version = "5.4"
return _tpm2_tools_version
if Version(tools_version[0]) == Version("5.2"):
# GA, MaS: experimentally found that version 5.2 of the tpm2_eventlog package produces output with
# zero-terminated strings on both centos 9 and Ubuntu 22.04.
# Adding a separate category for tpm2_tools_version so we can control whether strings are unescaped properly.
logger.info("TPM2-TOOLS Version: %s", tools_version[0])
_tpm2_tools_version = "5.2"
return _tpm2_tools_version
if Version(tools_version[0]) >= Version("4.2"):
logger.info("TPM2-TOOLS Version: %s", tools_version[0])
_tpm2_tools_version = "4.2"
return _tpm2_tools_version
if Version(tools_version[0]) >= Version("4.0.0"):
logger.info("TPM2-TOOLS Version: %s", tools_version[0])
_tpm2_tools_version = "4.0"
return _tpm2_tools_version
if Version(tools_version[0]) >= Version("3.2.0"):
logger.info("TPM2-TOOLS Version: %s", tools_version[0])
_tpm2_tools_version = "3.2"
return _tpm2_tools_version
logger.error("TPM2-TOOLS Version %s is not supported.", tools_version[0])
_tpm2_tools_version = "unknown"
return _tpm2_tools_version


toolversion = tpm2_tools_getversion()
if toolversion == "unknown":
raise ValueError("TPM2-TOOLS: version cannot be determined or unsupported")
logger.debug("mba.elparser.tpm2_tools_elparser: TPM2-TOOLS %s detected.", toolversion)
Loading

0 comments on commit c99103b

Please sign in to comment.