Skip to content

Commit

Permalink
Reproduce Kyber issue found by PQShield
Browse files Browse the repository at this point in the history
Co-Authored-By: Fabian Albert <fabian.albert@rohde-schwarz.com>
  • Loading branch information
reneme and FAlbertDev committed Jul 5, 2024
1 parent 9438217 commit 487eb50
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 16 deletions.
91 changes: 85 additions & 6 deletions src/valgrind_selftest/valgrind_selftest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#if defined(BOTAN_HAS_VALGRIND)

#include <botan/auto_rng.h>
#include <botan/hex.h>
#include <botan/internal/ct_utils.h>
#include <botan/internal/fmt.h>

Expand Down Expand Up @@ -140,6 +141,57 @@ void test_conditional_branch_on_poisoned_bit() {
}
}

void test_clang_conditional_jump_on_bare_metal_ct_mask() {
Botan::AutoSeeded_RNG rng;
const auto poisoned_byte = rng.next_byte();
Botan::CT::poison(&poisoned_byte, sizeof(poisoned_byte));

std::array<uint8_t, 16> output_bytes;
std::memset(output_bytes.data(), 0x42, sizeof(output_bytes));

// This mimicks what went wrong in Kyber's secret message expansion
// that was found by PQShield in Kyber's reference implementation and
// was fixed in https://github.com/randombit/botan/pull/4107.
//
// Certain versions of Clang, namely 15, 16, 17 and 18 (maybe more) with
// specific optimization flags (-Os, -O1, -O2 -fno-vectorize, ...) do
// realize that `poisoned_mask` can only ever be all-zero or all-one and
// conditionally jump over the loop execution below.
//
// See: https://pqshield.com/pqshield-plugs-timing-leaks-in-kyber-ml-kem-to-improve-pqc-implementation-maturity/
const uint8_t poisoned_mask = -static_cast<uint8_t>(poisoned_byte & 1);
for(size_t i = 0; i < sizeof(output_bytes); ++i) {
output_bytes[i] &= poisoned_mask;
}

// Unpoison output_bytes to safely print them. The actual side channel
// happened above.
Botan::CT::unpoison(output_bytes.data(), output_bytes.size());
std::cout << Botan::hex_encode(output_bytes) << std::endl;
}

// Similar to test_clang_conditional_jump_on_bare_metal_ct_mask() but with Botan's CT::Masks.
// Should not fail, because CT::Mask uses CT::value_barrier().
void regression_test_conditional_jump_in_ct_mask() {
Botan::AutoSeeded_RNG rng;
const auto poisoned_byte = rng.next_byte();
Botan::CT::poison(&poisoned_byte, sizeof(poisoned_byte));

std::array<uint8_t, 16> output_bytes;
std::memset(output_bytes.data(), 0x42, sizeof(output_bytes));

// Before the introduction of CT::value_barrier, this did generate a
// conditional jump when compiled with clang using certain compiler
// optimizations. See the test case above for further details.
auto poisoned_mask = Botan::CT::Mask<uint8_t>::expand(poisoned_byte & 1);
for(size_t i = 0; i < sizeof(output_bytes); ++i) {
output_bytes[i] = poisoned_mask.select(output_bytes[i], 0);
}

Botan::CT::unpoison(output_bytes.data(), output_bytes.size());
std::cout << Botan::hex_encode(output_bytes) << std::endl;
}

struct Test {
bool expect_failure;
std::function<void()> test;
Expand All @@ -150,19 +202,35 @@ constexpr bool SHOULD_SUCCEED = false;

void print_help(std::string_view path) {
std::cerr << "Usage: valgrind [...] " << path << " [testname]" << std::endl;
std::cerr << "Usage: " << path << " [--list|--help]" << std::endl;
std::cerr << "Usage: " << path << " [--list|--list-special|--help]" << std::endl;
std::cerr << "By design, this can only run one test at a time."
<< "Some tests are expected to cause valgrind warnings about misusing uninitialized values." << std::endl;
}

void list_tests(const std::map<std::string, Test>& tests) {
std::cout << "fail?\ttest name\n\n";
for(const auto& [name, test_info] : tests) {
std::cout << Botan::fmt("{}\t{}\n", test_info.expect_failure ? "true" : "false", name);
}
}

template <typename MapT>
auto merge_maps(const MapT& a, const MapT& b) {
MapT result = a;
result.insert(b.begin(), b.end());
return result;
}

bool running_on_valgrind() {
return RUNNING_ON_VALGRIND;
}

} // namespace

int main(int argc, char* argv[]) {
std::map<std::string, Test> available_tests = {
// Those tests should run as expected on any compiler and any set of
// optimization flags.
std::map<std::string, Test> generic_tests = {
{"poisoned_conditional_jump", {SHOULD_FAIL, test_conditional_jump_on_poisoned_data}},
{"poisoned_memory_lookup", {SHOULD_FAIL, test_array_access_on_poisoned_data}},
{"transitive_poisoned_conditional_jump", {SHOULD_FAIL, test_conditional_jump_on_transitively_poisoned_data}},
Expand All @@ -171,8 +239,17 @@ int main(int argc, char* argv[]) {
{"poisoned_and_cleared", {SHOULD_SUCCEED, test_poisoned_and_cleared_data}},
{"conditional_jump_on_unpoisoned_bit", {SHOULD_SUCCEED, test_conditional_branch_on_unpoisoned_bit}},
{"conditional_jump_on_poisoned_bit", {SHOULD_FAIL, test_conditional_branch_on_poisoned_bit}},
{"regression_test_clang_vs_ct_mask", {SHOULD_SUCCEED, regression_test_conditional_jump_in_ct_mask}},
};

// These tests require special conditions to run. E.g. certain compiler
// versions and optimization flags.
std::map<std::string, Test> special_case_tests = {
{"clang_vs_bare_metal_ct_mask", {SHOULD_FAIL, test_clang_conditional_jump_on_bare_metal_ct_mask}},
};

auto available_tests = merge_maps(generic_tests, special_case_tests);

if(argc != 2) {
print_help(argv[0]);
return 1;
Expand All @@ -185,10 +262,12 @@ int main(int argc, char* argv[]) {
}

if(argument == "--list") {
std::cout << "fail?\ttest name\n\n";
for(const auto& [name, test_info] : available_tests) {
std::cout << Botan::fmt("{}\t{}\n", test_info.expect_failure ? "true" : "false", name);
}
list_tests(generic_tests);
return 0;
}

if(argument == "--list-special") {
list_tests(special_case_tests);
return 0;
}

Expand Down
48 changes: 38 additions & 10 deletions src/valgrind_selftest/valgrind_selftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import sys
from typing import Self
from enum import StrEnum, auto
import json

def run_command(cmd: list[str], is_text = True):
""" Run the command . """
Expand Down Expand Up @@ -45,15 +46,19 @@ class Status(StrEnum):
ERROR = auto()
UNKNOWN = auto()

def __init__(self, line: str):
info = line.split("\t")
assert len(info) == 2
self.name = info[1]
self.expect_failure = info[0] == "true"
def __init__(self, name, expect_failure):
self.name = name
self.expect_failure = expect_failure
self.status = self.Status.UNKNOWN
self.stdout = None
self.stderr = None

@staticmethod
def from_line(line: str):
info = line.split("\t")
assert len(info) == 2
return ValgrindTest(info[1], info[0] == "true")

def run(self, exe_path: str):
""" Run the test and return whether it succeeded """

Expand All @@ -79,25 +84,48 @@ def run(self, exe_path: str):
def read_test_list(valgrind_selftest_test_list: str) -> list[Self]:
""" Read the list of tests from the output of `valgrind_selftest --list`. """

return [ValgrindTest(line) for line in valgrind_selftest_test_list.split("\n")[2:] if line]
return [ValgrindTest.from_line(line) for line in valgrind_selftest_test_list.split("\n")[2:] if line]


def main(): # pylint: disable=missing-function-docstring
parser = argparse.ArgumentParser("valgrind_selftests")
parser.add_argument("valgrind_selftest_path", help="Path to the valgrind_selftest executable")
parser.add_argument("--filter", help="Regex filter for test names", default=".*")
parser.add_argument("--build-config-path", help="Path to Botan's build-config.json file", default="")

args = parser.parse_args()

# Check if the path is valid
if not os.path.isfile(args.valgrind_selftest_path):
raise FileNotFoundError(f"Invalid path: {args.valgrind_selftest_path}")

test_list = run_command([args.valgrind_selftest_path, "--list"])
if test_list.returncode != 0:
raise RuntimeError("Failed to collect the test list from valgrind_selftest")
test_list = ValgrindTest.read_test_list(test_list.stdout)
def find_test_list():
test_list_args = [args.valgrind_selftest_path, "--list"]

test_list_result = run_command(test_list_args)
if test_list_result.returncode != 0:
raise RuntimeError("Failed to collect the test list from valgrind_selftest")
test_list = ValgrindTest.read_test_list(test_list_result.stdout)

if args.build_config_path:
if not os.path.isfile(args.build_config_path):
raise FileNotFoundError(f"Invalid path: {args.build_config_path}")

# Figure out what build configuration we're running under, to decide
# whether or not to run certain special-case valgrind tests.
with open(args.build_config_path, encoding="utf-8") as build_info_file:
build_info = json.load(build_info_file)
cc_compile_flags = build_info["cc_compile_flags"]
cc_macro = build_info["cc_macro"]

# This is certainly not an exhaustive list of compiler configurations
# that could be problematic for this specific test
if cc_macro == "CLANG" and "-Os" in cc_compile_flags:
test_list.append(ValgrindTest("clang_vs_bare_metal_ct_mask", True))

return test_list

test_list = find_test_list()
filtered_test_list = [test for test in test_list if re.search(args.filter, test.name)]
for test in filtered_test_list:
print(f"running {test.name}... ", end="", flush=True)
Expand Down

0 comments on commit 487eb50

Please sign in to comment.