Skip to content

Commit

Permalink
ima emulator: use IMA AST and support multiple hash algorithms
Browse files Browse the repository at this point in the history
Signed-off-by: Thore Sommer <mail@thson.de>
  • Loading branch information
THS-on committed Dec 3, 2021
1 parent 335cb6c commit a312c80
Showing 1 changed file with 48 additions and 60 deletions.
108 changes: 48 additions & 60 deletions keylime/cmd/ima_emulator_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,103 +4,91 @@
Copyright 2017 Massachusetts Institute of Technology.
'''

import hashlib
import codecs
import sys
import select
import time
import itertools
import argparse

from keylime.tpm.tpm_main import tpm
from keylime.tpm.tpm_abstract import config
from keylime.common import algorithms
from keylime import ima_ast

# Instaniate tpm
tpm_instance = tpm(need_hw_tpm=True)

START_HASH = '0000000000000000000000000000000000000000'
FF_HASH = 'ffffffffffffffffffffffffffffffffffffffff'


def ml_extend(ml, position, searchHash=None):
f = open(ml, encoding="utf-8")
def measure_list(file_path, position, hash_alg, search_val=None):
f = open(file_path, encoding="utf-8")
lines = itertools.islice(f, position, None)

runninghash = ima_ast.get_START_HASH(hash_alg)

if search_val is not None:
search_val = codecs.decode(search_val.encode('utf-8'), 'hex')

for line in lines:
line = line.strip()
tokens = line.split()

if line == '':
continue
if len(tokens) < 5:
print("ERROR: invalid measurement list file line: -%s-" % (line))
return position
position += 1

# get the filename roughly
path = str(line[line.rfind(tokens[3]) + len(tokens[3]) + 1:])
template_hash = tokens[1]
entry = ima_ast.Entry(line, None, ima_hash_alg=hash_alg, pcr_hash_alg=hash_alg)

# this is some IMA weirdness
if template_hash == START_HASH:
template_hash = FF_HASH

if searchHash is None:
print("extending hash %s for %s" % (template_hash, path))
# TODO: Add support for other hash algorithms
tpm_instance.extendPCR(config.IMA_PCR, template_hash, algorithms.Hash.SHA1)
if search_val is None:
val = codecs.encode(entry.pcr_template_hash, 'hex').decode("utf8")
tpm_instance.extendPCR(config.IMA_PCR, val, hash_alg)
else:
# Let's only encode if its not a byte
try:
runninghash = START_HASH.encode('utf-8')
except AttributeError:
pass
# Let's only encode if its not a byte
try:
template_hash = template_hash.encode('utf-8')
except AttributeError:
pass

runninghash = hashlib.sha1(runninghash + template_hash).digest()

if runninghash == searchHash:
print("Located last IMA file updated: %s" % (path))
runninghash = hash_alg.hash(runninghash + entry.pcr_template_hash)
if runninghash == search_val:
return position

if searchHash is not None:
raise Exception(
"Unable to find current measurement list position, Resetting the TPM emulator may be neccesary")
if search_val is not None:
raise Exception("Unable to find current measurement list position, Resetting the TPM emulator may be neccesary")

return position


def main():
def main(argv):
parser = argparse.ArgumentParser()
parser.add_argument('-a', '--hash_algs', nargs='*', default=["sha1"], help='PCR banks hash algorithms')
args = parser.parse_args(argv[1:])

if not tpm_instance.is_emulator():
raise Exception("This stub should only be used with a TPM emulator")

# initialize position in ML
pos = 0
position = {}
for hash_alg in args.hash_algs:
hash_alg = algorithms.Hash(hash_alg)
position[hash_alg] = 0

# check if pcr is clean
pcrval = tpm_instance.readPCR(config.IMA_PCR, algorithms.Hash.SHA1)
if pcrval != START_HASH:
print("Warning: IMA PCR is not empty, trying to find the last updated file in the measurement list...")
pos = ml_extend(config.IMA_ML, 0, pcrval)
for hash_alg in position.keys():
print(hash)
pcr_val = tpm_instance.readPCR(config.IMA_PCR, hash_alg)
if codecs.decode(pcr_val.encode('utf-8'), 'hex') != ima_ast.get_START_HASH(hash_alg):
print(f"Warning: IMA PCR is not empty for hash algorithm {hash_alg}, "
"trying to find the last updated file in the measurement list...")
position[hash_alg] = measure_list(config.IMA_ML, position[hash_alg], hash_alg, pcr_val)

print("Monitoring %s" % (config.IMA_ML))
print(f"Monitoring {config.IMA_ML}")
poll_object = select.poll()
fd_object = open(config.IMA_ML, encoding="utf-8")
number = fd_object.fileno()
poll_object.register(fd_object, select.POLLIN | select.POLLPRI)

while True:
results = poll_object.poll()
for result in results:
if result[0] != number:
continue
pos = ml_extend(config.IMA_ML, pos)
time.sleep(0.2)
sys.exit(1)
try:
while True:
results = poll_object.poll()
for result in results:
if result[0] != number:
continue
for hash_alg, pos in position.items():
position[hash_alg] = measure_list(config.IMA_ML, pos, hash_alg)

time.sleep(0.2)
except (SystemExit, KeyboardInterrupt):
sys.exit(1)


if __name__ == '__main__':
main()
main(sys.argv)

0 comments on commit a312c80

Please sign in to comment.