Skip to content
Permalink
Browse files Browse the repository at this point in the history
tpm: do not compress quote with zlib by default
Pythons zlib decompression has no mitigations against zip bombs or similar
attacks, so we remove the compression for the quote data.

The data itself is rather small compared to the IMA or UEFI event log.

Signed-off-by: Thore Sommer <mail@thson.de>
  • Loading branch information
THS-on authored and mpeters committed Jan 27, 2022
1 parent 796ed49 commit 6e44758
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 14 deletions.
3 changes: 2 additions & 1 deletion keylime/cloud_verifier_common.py
Expand Up @@ -152,7 +152,8 @@ def process_quote_response(agent, json_response, agentAttestState) -> Failure:
algorithms.Hash(hash_alg),
ima_keyrings,
mb_measurement_list,
agent['mb_refstate'])
agent['mb_refstate'],
compressed=(agent['supported_version'] == "1.0")) # TODO: change this to always False after initial update
failure.merge(quote_validation_failure)

if not failure:
Expand Down
4 changes: 3 additions & 1 deletion keylime/tenant.py
Expand Up @@ -545,7 +545,9 @@ def validate_tpm_quote(self, public_key, quote, hash_alg):
logger.warning("AIK not found in registrar, quote not validated")
return False

failure = self.tpm_instance.check_quote(AgentAttestState(self.agent_uuid), self.nonce, public_key, quote, self.registrar_data['aik_tpm'], hash_alg=hash_alg)
failure = self.tpm_instance.check_quote(AgentAttestState(self.agent_uuid), self.nonce, public_key, quote,
self.registrar_data['aik_tpm'], hash_alg=hash_alg,
compressed=(self.supported_version == "1.0"))
if failure:
if self.registrar_data['regcount'] > 1:
logger.error("WARNING: This UUID had more than one ek-ekcert registered to it! This might indicate that your system is misconfigured or a malicious host is present. Run 'regdelete' for this agent and restart")
Expand Down
4 changes: 2 additions & 2 deletions keylime/tpm/tpm_abstract.py
Expand Up @@ -158,11 +158,11 @@ def tpm_init(self, self_activate=False, config_pw=None):

# tpm_quote
@abstractmethod
def create_quote(self, nonce, data=None, pcrmask=EMPTYMASK, hash_alg=None):
def create_quote(self, nonce, data=None, pcrmask=EMPTYMASK, hash_alg=None, compress=False):
pass

@abstractmethod
def check_quote(self, agentAttestState, nonce, data, quote, aikTpmFromRegistrar, tpm_policy={}, ima_measurement_list=None, allowlist={}, hash_alg=None, ima_keyrings=None, mb_measurement_list=None, mb_refstate=None):
def check_quote(self, agentAttestState, nonce, data, quote, aikTpmFromRegistrar, tpm_policy={}, ima_measurement_list=None, allowlist={}, hash_alg=None, ima_keyrings=None, mb_measurement_list=None, mb_refstate=None, compressed=False):
pass

def START_HASH(self, algorithm=None):
Expand Down
32 changes: 22 additions & 10 deletions keylime/tpm/tpm_main.py
Expand Up @@ -959,7 +959,7 @@ def __pcr_mask_to_list(mask):
pcr_list.append(str(pcr))
return ",".join(pcr_list)

def create_quote(self, nonce, data=None, pcrmask=tpm_abstract.AbstractTPM.EMPTYMASK, hash_alg=None):
def create_quote(self, nonce, data=None, pcrmask=tpm_abstract.AbstractTPM.EMPTYMASK, hash_alg=None, compress=False):
if hash_alg is None:
hash_alg = self.defaults['hash']

Expand Down Expand Up @@ -992,11 +992,15 @@ def create_quote(self, nonce, data=None, pcrmask=tpm_abstract.AbstractTPM.EMPTYM
command = ["tpm2_quote", "-c", keyhandle, "-l", "%s:%s" % (hash_alg, pcrlist), "-q", nonce, "-m", quotepath.name, "-s", sigpath.name, "-o", pcrpath.name, "-g", hash_alg, "-p", aik_pw]
retDict = self.__run(command, lock=False, outputpaths=[quotepath.name, sigpath.name, pcrpath.name])
quoteraw = retDict['fileouts'][quotepath.name]
quote_b64encode = base64.b64encode(zlib.compress(quoteraw))
sigraw = retDict['fileouts'][sigpath.name]
sigraw_b64encode = base64.b64encode(zlib.compress(sigraw))
pcrraw = retDict['fileouts'][pcrpath.name]
pcrraw_b64encode = base64.b64encode(zlib.compress(pcrraw))
if compress:
quoteraw = zlib.compress(quoteraw)
sigraw = zlib.compress(sigraw)
pcrraw = zlib.compress(pcrraw)
quote_b64encode = base64.b64encode(quoteraw)
sigraw_b64encode = base64.b64encode(sigraw)
pcrraw_b64encode = base64.b64encode(pcrraw)
quote = quote_b64encode.decode('utf-8') + ":" + sigraw_b64encode.decode('utf-8') + ":" + pcrraw_b64encode.decode('utf-8')

return 'r' + quote
Expand All @@ -1018,12 +1022,13 @@ def __tpm2_checkquote(self, pubaik, nonce, quoteFile, sigFile, pcrFile, hash_alg
retDict = self.__run(command, lock=False)
return retDict

def _tpm2_checkquote(self, aikTpmFromRegistrar, quote, nonce, hash_alg):
def _tpm2_checkquote(self, aikTpmFromRegistrar, quote, nonce, hash_alg, compressed):
"""Write the files from data returned from tpm2_quote for running tpm2_checkquote
:param aikTpmFromRegistrar: AIK used to generate the quote and is needed for verifying it now.
:param quote: quote data in the format 'r<b64-compressed-quoteblob>:<b64-compressed-sigblob>:<b64-compressed-pcrblob>
:param nonce: nonce that was used to create the quote
:param hash_alg: the hash algorithm that was used
:param compressed: if the quote data is compressed with zlib or not
:returns: Returns the 'retout' from running tpm2_checkquote and True in case of success, None and False in case of error.
This function throws an Exception on bad input.
"""
Expand All @@ -1042,9 +1047,16 @@ def _tpm2_checkquote(self, aikTpmFromRegistrar, quote, nonce, hash_alg):
if len(quote_tokens) < 3:
raise Exception("Quote is not compound! %s" % quote)

quoteblob = zlib.decompress(base64.b64decode(quote_tokens[0]))
sigblob = zlib.decompress(base64.b64decode(quote_tokens[1]))
pcrblob = zlib.decompress(base64.b64decode(quote_tokens[2]))
quoteblob = base64.b64decode(quote_tokens[0])
sigblob = base64.b64decode(quote_tokens[1])
pcrblob = base64.b64decode(quote_tokens[2])

if compressed:
logger.warning("Decompressing quote data which is unsafe!")
quoteblob = zlib.decompress(quoteblob)
sigblob = zlib.decompress(sigblob)
pcrblob = zlib.decompress(pcrblob)


qfd = sfd = pfd = afd = -1
quoteFile = None
Expand Down Expand Up @@ -1100,12 +1112,12 @@ def _tpm2_checkquote(self, aikTpmFromRegistrar, quote, nonce, hash_alg):

def check_quote(self, agentAttestState, nonce, data, quote, aikTpmFromRegistrar, tpm_policy={},
ima_measurement_list=None, allowlist={}, hash_alg=None, ima_keyrings=None,
mb_measurement_list=None, mb_refstate=None) -> Failure:
mb_measurement_list=None, mb_refstate=None, compressed=False) -> Failure:
failure = Failure(Component.QUOTE_VALIDATION)
if hash_alg is None:
hash_alg = self.defaults['hash']

retout, success = self._tpm2_checkquote(aikTpmFromRegistrar, quote, nonce, hash_alg)
retout, success = self._tpm2_checkquote(aikTpmFromRegistrar, quote, nonce, hash_alg, compressed)
if not success:
# If the quote validation fails we will skip all other steps therefore this failure is irrecoverable.
failure.add_event("quote_validation", {"message": "Quote validation using tpm2-tools", "data": retout}, False)
Expand Down

0 comments on commit 6e44758

Please sign in to comment.