Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple configuration files #829

Merged
merged 1 commit into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion keylime/cloud_verifier_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def process_get_status(agent):

def notify_error(agent, msgtype='revocation', event=None):
send_mq = config.getboolean('cloud_verifier', 'revocation_notifier')
send_webhook = config.getboolean('cloud_verifier', 'revocation_notifier_webhook', False)
send_webhook = config.getboolean('cloud_verifier', 'revocation_notifier_webhook', fallback=False)
if not (send_mq or send_webhook):
return

Expand Down
8 changes: 4 additions & 4 deletions keylime/cloud_verifier_tornado.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def delete(self):
logger.info('DELETE returning 404 response. agent id: %s not found.', agent_id)
return

verifier_id = config.get('cloud_verifier', 'cloudverifier_id', cloud_verifier_common.DEFAULT_VERIFIER_ID)
verifier_id = config.get('cloud_verifier', 'cloudverifier_id', fallback=cloud_verifier_common.DEFAULT_VERIFIER_ID)
if verifier_id != agent.verifier_id:
config.echo_json_response(self, 404, "agent id associated to this verifier")
logger.info('DELETE returning 404 response. agent id: %s not associated to this verifer.', agent_id)
Expand Down Expand Up @@ -416,7 +416,7 @@ def post(self):
agent_data['pcr10'] = None
agent_data['next_ima_ml_entry'] = 0
agent_data['learned_ima_keyrings'] = {}
agent_data['verifier_id'] = config.get('cloud_verifier', 'cloudverifier_id', cloud_verifier_common.DEFAULT_VERIFIER_ID)
agent_data['verifier_id'] = config.get('cloud_verifier', 'cloudverifier_id', fallback=cloud_verifier_common.DEFAULT_VERIFIER_ID)
agent_data['verifier_ip'] = config.get('cloud_verifier', 'cloudverifier_ip')
agent_data['verifier_port'] = config.get('cloud_verifier', 'cloudverifier_port')

Expand Down Expand Up @@ -491,7 +491,7 @@ def put(self):
config.echo_json_response(self, 400, "uri not supported")
logger.warning("PUT returning 400 response. uri not supported")
try:
verifier_id = config.get('cloud_verifier', 'cloudverifier_id', cloud_verifier_common.DEFAULT_VERIFIER_ID)
verifier_id = config.get('cloud_verifier', 'cloudverifier_id', fallback=cloud_verifier_common.DEFAULT_VERIFIER_ID)
agent = session.query(VerfierMain).filter_by(
agent_id=agent_id, verifier_id=verifier_id).one()
except SQLAlchemyError as e:
Expand Down Expand Up @@ -970,7 +970,7 @@ def main():

cloudverifier_port = config.get('cloud_verifier', 'cloudverifier_port')
cloudverifier_host = config.get('cloud_verifier', 'cloudverifier_ip')
cloudverifier_id = config.get('cloud_verifier', 'cloudverifier_id', cloud_verifier_common.DEFAULT_VERIFIER_ID)
cloudverifier_id = config.get('cloud_verifier', 'cloudverifier_id', fallback=cloud_verifier_common.DEFAULT_VERIFIER_ID)

# allow tornado's max upload size to be configurable
max_upload_size = None
Expand Down
93 changes: 31 additions & 62 deletions keylime/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import os
import os.path
import configparser
import sys
import urllib.parse
import re
from http.server import BaseHTTPRequestHandler
Expand Down Expand Up @@ -115,69 +114,39 @@ def environ_bool(env_name, default):
print("WARNING: running without root access")


CONFIG_FILE = os.getenv('KEYLIME_CONFIG', '/etc/keylime.conf')


WARN = False
if not os.path.exists(CONFIG_FILE):
# try to locate the config file next to the script if bundled
if getattr(sys, 'frozen', False):
CONFIG_FILE = os.path.dirname(
os.path.abspath(sys.executable)) + "/keylime.conf"
else:
# instead try to get config file from python data_files install
CONFIG_FILE = os.path.dirname(os.path.abspath(
__file__)) + "/../package_default/keylime.conf"
WARN = True

if not os.path.exists(CONFIG_FILE):
raise Exception(f"{CONFIG_FILE} does not exist. Please set environment"
f"variable KEYLIME_CONFIG or see {__file__} for more"
f"details")
print(f"Using config file {CONFIG_FILE}")
if WARN:
print("WARNING: Keylime is using the config file from its installation location. \n\tWe recommend you copy keylime.conf to /etc/ to customize it.")


_CURRENT_CONFIG = None
# Config files can be merged together, reading from the system to the
# user.
CONFIG_FILES = [
os.path.expanduser("~/.config/keylime.conf"), "/etc/keylime.conf", "/usr/etc/keylime.conf"
]
if "KEYLIME_CONFIG" in os.environ:
CONFIG_FILES.insert(0, os.environ["KEYLIME_CONFIG"])


def get_config():
global _CURRENT_CONFIG
if _CURRENT_CONFIG is None:
# read the config file
_CURRENT_CONFIG = configparser.ConfigParser()
_CURRENT_CONFIG.read(CONFIG_FILE)
return _CURRENT_CONFIG


def get(section, option, fallback=None):
if fallback is not None:
return get_config().get(section, option, fallback=fallback)
return get_config().get(section, option)


def getint(section, option, fallback=None):
if fallback is not None:
return get_config().get(section, option, fallback=fallback)
return get_config().getint(section, option)


def getboolean(section, option, fallback=None):
if fallback is not None:
return get_config().getboolean(section, option, fallback=fallback)
return get_config().getboolean(section, option)


def getfloat(section, option, fallback=None):
if fallback is not None:
return get_config().get(section, option, fallback=fallback)
return get_config().getfloat(section, option)


def has_option(section, option):
return get_config().has_option(section, option)

"""Read configuration files and merge them together."""
if not getattr(get_config, "config", None):
# TODO - use logger and be sure that all variables have a
# propper default, and the sections are initialized
if not any(os.path.exists(c) for c in CONFIG_FILES):
print(f"Config file not found in {CONFIG_FILES}. Please set "
f"environment variable KEYLIME_CONFIG or see {__file__} "
"for more details")

# Validate that at least one config file is present
get_config.config = configparser.ConfigParser()
config_files = get_config.config.read(CONFIG_FILES)
# TODO - use the logger
print(f"Reading configuration from {config_files}")
return get_config.config


# Re-export some utility functions
get = get_config().get
getint = get_config().getint
getboolean = get_config().getboolean
getfloat = get_config().getfloat
has_option = get_config().has_option

if not REQUIRE_ROOT:
DEFAULT_WORK_DIR = os.path.abspath(".")
Expand Down Expand Up @@ -332,6 +301,6 @@ def valid_hex(value: str):
BOOTSTRAP_KEY_SIZE = 32

# choose between cfssl or openssl for creating CA certificates
CA_IMPL = get_config().get('general', 'ca_implementation')
CA_IMPL = get_config().get('general', 'ca_implementation', fallback='openssl')

CRL_PORT = 38080
2 changes: 1 addition & 1 deletion keylime/ima_ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def invalid(self):
if self.ima_template_hash == get_FF_HASH(self._ima_hash_alg):
logger.warning("Skipped template_hash validation entry with FF_HASH")
# By default ToMToU errors are not treated as a failure
if config.getboolean("cloud_verifier", "tomtou_errors", False):
if config.getboolean("cloud_verifier", "tomtou_errors", fallback=False):
failure.add_event("tomtou", "hash validation was skipped", True)
return failure
if self.ima_template_hash != self._ima_hash_alg.hash(self._bytes):
Expand Down
4 changes: 3 additions & 1 deletion keylime/keylime_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
else:
LOGSTREAM = LOGDIR + '/keylime-stream.log'

logging.config.fileConfig(config.CONFIG_FILE)
for config_file in reversed(config.CONFIG_FILES):
if os.path.exists(config_file):
logging.config.fileConfig(config_file)


def set_log_func(loglevel, logger):
Expand Down
2 changes: 1 addition & 1 deletion keylime/revocation_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def worker(tosend):


def notify_webhook(tosend):
url = config.get('cloud_verifier', 'webhook_url', '')
url = config.get('cloud_verifier', 'webhook_url', fallback='')
# Check if a url was specified
if url == '':
return
Expand Down
3 changes: 3 additions & 0 deletions test/data/config/keylime-1.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[default]

attribute_1 = value_1
3 changes: 3 additions & 0 deletions test/data/config/keylime-2.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[default]

attribute_2= value_2
4 changes: 4 additions & 0 deletions test/data/config/keylime-3.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[default]

attribute_1 = value_1_3
attribute_3 = value_3
82 changes: 82 additions & 0 deletions test/test_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import importlib
import os
import unittest

from keylime import config

CONFIG_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "data/config"))


class TestConfig(unittest.TestCase):
def setUp(self):
"""The config module should be reloaded."""
# Because we can alter global state, we should reload the
# config module before every test
importlib.reload(config)
# Remove a side effect of some variables pre-loading the
# configuration files
del config.get_config.config

def test_default_config_files(self):
"""Test default config file list."""
self.assertEqual(
config.CONFIG_FILES,
[
os.path.expanduser("~/.config/keylime.conf"),
"/etc/keylime.conf",
"/usr/etc/keylime.conf",
],
)

def test_no_config(self):
"""Test that no config files is an empty set."""
c = config.get_config()
self.assertEqual(c.get("default", "value_1", fallback=None), None)

def test_single_config(self):
"""Test reading a single config file."""
config.CONFIG_FILES = [os.path.join(CONFIG_DIR, "keylime-1.conf")]
c = config.get_config()
self.assertEqual(c.get("default", "attribute_1"), "value_1")

def test_multiple_config(self):
"""Test reading multiple config files."""
config.CONFIG_FILES = [
os.path.join(CONFIG_DIR, "keylime-1.conf"),
os.path.join(CONFIG_DIR, "keylime-2.conf"),
]
c = config.get_config()
self.assertEqual(c.get("default", "attribute_1"), "value_1")
self.assertEqual(c.get("default", "attribute_2"), "value_2")

def test_merge_config(self):
"""Test reading multiple config files and merging them."""
config.CONFIG_FILES = [
os.path.join(CONFIG_DIR, "keylime-1.conf"),
os.path.join(CONFIG_DIR, "keylime-2.conf"),
os.path.join(CONFIG_DIR, "keylime-3.conf"),
]
c = config.get_config()
self.assertEqual(c.get("default", "attribute_1"), "value_1_3")
self.assertEqual(c.get("default", "attribute_2"), "value_2")
self.assertEqual(c.get("default", "attribute_3"), "value_3")

def test_cache_config(self):
"""Test the config is properly cached between calls."""
c = config.get_config()
self.assertEqual(c.get("default", "attribute", fallback=None), None)

c.add_section("default")
c.set("default", "attribute", "value")
self.assertEqual(c.get("default", "attribute"), "value")

c_copy = config.get_config()
self.assertEqual(c_copy.get("default", "attribute"), "value")

def test_reexport_function(self):
"""Test re-exported functions to access data."""
self.assertEqual(config.get("default", "attribute", fallback=None), None)


if __name__ == "__main__":
unittest.main()