Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 111 additions & 4 deletions misc/scripts/models-as-data/bulk_generate_mad.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
Experimental script for bulk generation of MaD models based on a list of projects.

Note: This file must be formatted using the Black Python formatter.

Security Requirements:
- All subprocess.check_call invocations MUST use shell=False to prevent shell injection
- Language parameter MUST be validated against VALID_LANGUAGES before use
- Extractor options MUST be validated to reject shell metacharacters
- These requirements are enforced by validate_language() and validate_extractor_options()

References:
- https://docs.python.org/3/library/subprocess.html#security-considerations
- https://owasp.org/www-community/attacks/Command_Injection
"""

import pathlib
Expand Down Expand Up @@ -43,6 +53,64 @@ def missing_module(module_name: str) -> None:
)
build_dir = pathlib.Path(gitroot, "mad-generation-build")

# List of valid CodeQL languages to prevent command injection
# Reference: https://docs.python.org/3/library/subprocess.html#security-considerations
VALID_LANGUAGES = {
"cpp",
"csharp",
"go",
"java",
"javascript",
"python",
"ruby",
"rust",
"swift",
}


def validate_language(language: str) -> None:
"""
Validate that the language is from a known safe list.

This is a security measure to prevent command injection attacks.
Reference: https://owasp.org/www-community/attacks/Command_Injection

Args:
language: The language identifier to validate

Raises:
ValueError: If the language is not in the valid list
"""
if language not in VALID_LANGUAGES:
raise ValueError(
f"Invalid language '{language}'. Must be one of: {', '.join(sorted(VALID_LANGUAGES))}"
)


def validate_extractor_options(extractor_options: list) -> None:
"""
Validate that extractor options are safe to use in subprocess calls.

This is a security measure to prevent command injection attacks.
Reference: https://owasp.org/www-community/attacks/Command_Injection

Args:
extractor_options: List of extractor options to validate

Raises:
ValueError: If any option contains potentially dangerous characters
"""
dangerous_chars = set(";&|<>`$(){}[]\\'\"\n*")
for option in extractor_options:
if not isinstance(option, str):
raise ValueError(
f"Extractor option must be a string, got {type(option).__name__}"
)
if any(char in option for char in dangerous_chars):
raise ValueError(
f"Extractor option '{option}' contains potentially dangerous characters"
)


# A project to generate models for
Project = TypedDict(
Expand Down Expand Up @@ -95,6 +163,13 @@ def clone_project(project: Project) -> str:
else:
print(f"Cloning {name} from {repo_url}")

# Security: Always use shell=False (explicit) and pass arguments as a list
# to prevent shell injection attacks. This is critical even though we're
# already passing arguments as a list, as it makes the security posture explicit.
# References:
# - https://docs.python.org/3/library/subprocess.html#security-considerations
# - https://owasp.org/www-community/attacks/Command_Injection
# IMPORTANT: Keep shell=False for all subprocess calls to maintain security.
subprocess.check_call(
[
"git",
Expand All @@ -107,7 +182,8 @@ def clone_project(project: Project) -> str:
), # Add branch if tag is provided
repo_url,
target_dir,
]
],
shell=False,
)
print(f"Completed cloning {name}")
else:
Expand Down Expand Up @@ -190,6 +266,14 @@ def build_database(
"""
name = project["name"]

# Security: Validate language and extractor_options before using in subprocess
# to prevent command injection attacks.
# References:
# - https://docs.python.org/3/library/subprocess.html#security-considerations
# - https://owasp.org/www-community/attacks/Command_Injection
validate_language(language)
validate_extractor_options(extractor_options)

# Create database directory path
database_dir = build_dir / f"{name}-db"

Expand All @@ -198,18 +282,28 @@ def build_database(
print(f"Building CodeQL database for {name}...")
extractor_options = [option for x in extractor_options for option in ("-O", x)]
try:
# Security: Always use shell=False (explicit) and pass arguments as a list
# to prevent shell injection attacks. This is critical even though we're
# already passing arguments as a list, as it makes the security posture explicit.
# References:
# - https://docs.python.org/3/library/subprocess.html#security-considerations
# - https://owasp.org/www-community/attacks/Command_Injection
# IMPORTANT: Keep shell=False for all subprocess calls to maintain security.
subprocess.check_call(
[
"codeql",
"database",
"create",
f"--language={language}",
"--source-root=" + project_dir,
"--language",
language,
"--source-root",
project_dir,
"--overwrite",
*extractor_options,
"--",
database_dir,
]
],
shell=False,
)
print(f"Successfully created database at {database_dir}")
except subprocess.CalledProcessError as e:
Expand Down Expand Up @@ -457,13 +551,26 @@ def main(config, args) -> None:
sys.exit(1)
language = config["language"]

# Security: Validate language early to prevent command injection
try:
validate_language(language)
except ValueError as e:
print(f"ERROR: {e}")
sys.exit(1)

# Create build directory if it doesn't exist
build_dir.mkdir(parents=True, exist_ok=True)

database_results = []
match get_strategy(config):
case "repo":
extractor_options = config.get("extractor_options", [])
# Security: Validate extractor_options early to prevent command injection
try:
validate_extractor_options(extractor_options)
except ValueError as e:
print(f"ERROR: {e}")
sys.exit(1)
database_results = build_databases_from_projects(
language,
extractor_options,
Expand Down
136 changes: 136 additions & 0 deletions misc/scripts/models-as-data/test_bulk_generate_mad_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
#!/usr/bin/env python3
"""
Security tests for bulk_generate_mad.py to validate subprocess security improvements.

These tests ensure that:
1. Language validation works correctly
2. Extractor options validation works correctly
3. Invalid inputs are rejected to prevent command injection
"""

import unittest
import sys
import pathlib

# Add the directory to the path so we can import the module
sys.path.insert(0, str(pathlib.Path(__file__).parent))

import bulk_generate_mad


class TestLanguageValidation(unittest.TestCase):
"""Test cases for language validation."""

def test_valid_languages(self):
"""Test that all valid languages are accepted."""
valid_languages = [
"cpp",
"csharp",
"go",
"java",
"javascript",
"python",
"ruby",
"rust",
"swift",
]
for language in valid_languages:
try:
bulk_generate_mad.validate_language(language)
except ValueError:
self.fail(f"Valid language '{language}' was rejected")

def test_invalid_language(self):
"""Test that invalid languages are rejected."""
invalid_languages = [
"invalid",
"shell",
"bash",
"perl",
'"; rm -rf /',
]
for language in invalid_languages:
with self.assertRaises(
ValueError, msg=f"Invalid language '{language}' was accepted"
):
bulk_generate_mad.validate_language(language)


class TestExtractorOptionsValidation(unittest.TestCase):
"""Test cases for extractor options validation."""

def test_valid_extractor_options(self):
"""Test that valid extractor options are accepted."""
valid_options = [
["--option1", "--option2"],
["--verbose"],
["--timeout=100"],
[], # Empty list should be valid
]
for options in valid_options:
try:
bulk_generate_mad.validate_extractor_options(options)
except ValueError:
self.fail(f"Valid extractor options {options} were rejected")

def test_dangerous_characters_rejected(self):
"""Test that extractor options with dangerous characters are rejected."""
dangerous_options = [
["; rm -rf /"],
["| cat /etc/passwd"],
["&& malicious_command"],
["`whoami`"],
["$(malicious)"],
["test;malicious"],
["test|other"],
["test&background"],
["test<input"],
["test>output"],
["test\\escape"],
["test'quote"],
['test"doublequote'],
["test\nmalicious"],
["test{brace}"],
["test[bracket]"],
["test(paren)"],
["test*glob"],
]
for options in dangerous_options:
with self.assertRaises(
ValueError,
msg=f"Dangerous extractor option {options} was accepted",
):
bulk_generate_mad.validate_extractor_options(options)

def test_non_string_options_rejected(self):
"""Test that non-string options are rejected."""
invalid_options = [
[123],
[None],
[{"key": "value"}],
[["nested", "list"]],
]
for options in invalid_options:
with self.assertRaises(
ValueError, msg=f"Non-string option {options} was accepted"
):
bulk_generate_mad.validate_extractor_options(options)


class TestSubprocessSecurity(unittest.TestCase):
"""Test cases to ensure subprocess calls are secure."""

def test_valid_languages_constant_exists(self):
"""Test that VALID_LANGUAGES constant is defined and contains expected languages."""
self.assertIn("VALID_LANGUAGES", dir(bulk_generate_mad))
self.assertIsInstance(bulk_generate_mad.VALID_LANGUAGES, set)
# Check for a few expected languages
expected = {"python", "java", "javascript", "go", "rust"}
self.assertTrue(
expected.issubset(bulk_generate_mad.VALID_LANGUAGES),
f"Expected languages {expected} not found in VALID_LANGUAGES",
)


if __name__ == "__main__":
unittest.main()