diff --git a/integration_tests/test_jwt_decode_verify.py b/integration_tests/test_jwt_decode_verify.py index de0fb0a6..8ea2f136 100644 --- a/integration_tests/test_jwt_decode_verify.py +++ b/integration_tests/test_jwt_decode_verify.py @@ -1,4 +1,4 @@ -from core_codemods.jwt_decode_verify import JwtDecodeVerify +from core_codemods.jwt_decode_verify import JwtDecodeVerify, JwtDecodeVerifyTransformer from codemodder.codemods.test import ( BaseIntegrationTest, original_and_expected_from_code_path, @@ -24,4 +24,4 @@ class TestJwtDecodeVerify(BaseIntegrationTest): expected_diff = '--- \n+++ \n@@ -8,7 +8,7 @@\n \n encoded_jwt = jwt.encode(payload, SECRET_KEY, algorithm="HS256")\n \n-decoded_payload = jwt.decode(encoded_jwt, SECRET_KEY, algorithms=["HS256"], verify=False)\n-decoded_payload = jwt.decode(encoded_jwt, SECRET_KEY, algorithms=["HS256"], options={"verify_signature": False})\n+decoded_payload = jwt.decode(encoded_jwt, SECRET_KEY, algorithms=["HS256"], verify=True)\n+decoded_payload = jwt.decode(encoded_jwt, SECRET_KEY, algorithms=["HS256"], options={"verify_signature": True})\n \n var = "something"\n' expected_line_change = "11" num_changes = 2 - change_description = JwtDecodeVerify.change_description + change_description = JwtDecodeVerifyTransformer.change_description diff --git a/integration_tests/test_sonar_exception_without_raise.py b/integration_tests/test_sonar_exception_without_raise.py index 73e0a0f4..c3655b21 100644 --- a/integration_tests/test_sonar_exception_without_raise.py +++ b/integration_tests/test_sonar_exception_without_raise.py @@ -1,7 +1,5 @@ -from core_codemods.exception_without_raise import ( - ExceptionWithoutRaise, - ExceptionWithoutRaiseTransformer, -) +from core_codemods.exception_without_raise import ExceptionWithoutRaiseTransformer +from core_codemods.sonar.sonar_exception_without_raise import SonarExceptionWithoutRaise from codemodder.codemods.test import ( BaseIntegrationTest, original_and_expected_from_code_path, @@ -9,7 +7,7 @@ class TestSonarExceptionWithoutRaise(BaseIntegrationTest): - codemod = ExceptionWithoutRaise + codemod = SonarExceptionWithoutRaise code_path = "tests/samples/exception_without_raise.py" original_code, expected_new_code = original_and_expected_from_code_path( code_path, diff --git a/integration_tests/test_sonar_jwt_decode_verify.py b/integration_tests/test_sonar_jwt_decode_verify.py new file mode 100644 index 00000000..ac1d44f3 --- /dev/null +++ b/integration_tests/test_sonar_jwt_decode_verify.py @@ -0,0 +1,32 @@ +from core_codemods.sonar.sonar_jwt_decode_verify import ( + SonarJwtDecodeVerify, + JwtDecodeVerifySonarTransformer, +) +from codemodder.codemods.test import ( + BaseIntegrationTest, + original_and_expected_from_code_path, +) + + +class TestJwtDecodeVerify(BaseIntegrationTest): + codemod = SonarJwtDecodeVerify + code_path = "tests/samples/jwt_decode_verify.py" + original_code, expected_new_code = original_and_expected_from_code_path( + code_path, + [ + ( + 10, + """decoded_payload = jwt.decode(encoded_jwt, SECRET_KEY, algorithms=["HS256"], verify=True)\n""", + ), + ( + 11, + """decoded_payload = jwt.decode(encoded_jwt, SECRET_KEY, algorithms=["HS256"], options={"verify_signature": True})\n""", + ), + ], + ) + sonar_issues_json = "tests/samples/sonar_issues.json" + + expected_diff = '--- \n+++ \n@@ -8,7 +8,7 @@\n \n encoded_jwt = jwt.encode(payload, SECRET_KEY, algorithm="HS256")\n \n-decoded_payload = jwt.decode(encoded_jwt, SECRET_KEY, algorithms=["HS256"], verify=False)\n-decoded_payload = jwt.decode(encoded_jwt, SECRET_KEY, algorithms=["HS256"], options={"verify_signature": False})\n+decoded_payload = jwt.decode(encoded_jwt, SECRET_KEY, algorithms=["HS256"], verify=True)\n+decoded_payload = jwt.decode(encoded_jwt, SECRET_KEY, algorithms=["HS256"], options={"verify_signature": True})\n \n var = "something"\n' + expected_line_change = "11" + num_changes = 2 + change_description = JwtDecodeVerifySonarTransformer.change_description diff --git a/src/codemodder/codemods/base_visitor.py b/src/codemodder/codemods/base_visitor.py index cfd243d1..d9bd9b31 100644 --- a/src/codemodder/codemods/base_visitor.py +++ b/src/codemodder/codemods/base_visitor.py @@ -23,6 +23,8 @@ def __init__( def filter_by_result(self, node): pos_to_match = self.node_position(node) if self.results is None: + # Returning True here means codemods without detectors (and results) + # will still run their transformations. return True return any(result.match_location(pos_to_match, node) for result in self.results) diff --git a/src/codemodder/result.py b/src/codemodder/result.py index 8320b07d..0d8f3ed3 100644 --- a/src/codemodder/result.py +++ b/src/codemodder/result.py @@ -34,12 +34,11 @@ class Result(ABCDataclass): def match_location(self, pos: CodeRange, node: cst.CSTNode) -> bool: del node return any( - pos.start.line == location.start.line + same_line(pos, location) and ( pos.start.column in ((start_column := location.start.column) - 1, start_column) ) - and pos.end.line == location.end.line and ( pos.end.column in ((end_column := location.end.column) - 1, end_column) ) @@ -47,6 +46,18 @@ def match_location(self, pos: CodeRange, node: cst.CSTNode) -> bool: ) +def same_line(pos: CodeRange, location: Location) -> bool: + return pos.start.line == location.start.line and pos.end.line == location.end.line + + +def fuzzy_column_match(pos: CodeRange, location: Location) -> bool: + """Checks that a result location is within the range of node's `pos` position""" + return ( + pos.start.column <= location.start.column <= pos.end.column + 1 + and pos.start.column <= location.end.column <= pos.end.column + 1 + ) + + class ResultSet(dict[str, dict[Path, list[Result]]]): def add_result(self, result: Result): for loc in result.locations: diff --git a/src/codemodder/scripts/generate_docs.py b/src/codemodder/scripts/generate_docs.py index 88b25670..cce5e15e 100644 --- a/src/codemodder/scripts/generate_docs.py +++ b/src/codemodder/scripts/generate_docs.py @@ -287,6 +287,11 @@ class DocMetadata: ].guidance_explained, need_sarif="Yes (Sonar)", ), + "jwt-decode-verify-S5659": DocMetadata( + importance=CORE_METADATA["jwt-decode-verify"].importance, + guidance_explained=CORE_METADATA["jwt-decode-verify"].guidance_explained, + need_sarif="Yes (Sonar)", + ), } diff --git a/src/core_codemods/__init__.py b/src/core_codemods/__init__.py index 86712f5f..a7e721e5 100644 --- a/src/core_codemods/__init__.py +++ b/src/core_codemods/__init__.py @@ -63,6 +63,7 @@ from .str_concat_in_seq_literal import StrConcatInSeqLiteral from .fix_async_task_instantiation import FixAsyncTaskInstantiation from .django_model_without_dunder_str import DjangoModelWithoutDunderStr +from .sonar.sonar_jwt_decode_verify import SonarJwtDecodeVerify registry = CodemodCollection( origin="pixee", @@ -134,5 +135,6 @@ SonarRemoveAssertionInPytestRaises, SonarFlaskJsonResponseType, SonarDjangoJsonResponseType, + SonarJwtDecodeVerify, ], ) diff --git a/src/core_codemods/jwt_decode_verify.py b/src/core_codemods/jwt_decode_verify.py index 604a5c82..b810f973 100644 --- a/src/core_codemods/jwt_decode_verify.py +++ b/src/core_codemods/jwt_decode_verify.py @@ -5,41 +5,17 @@ Metadata, Reference, ReviewGuidance, - SimpleCodemod, ) +from core_codemods.api.core_codemod import CoreCodemod +from codemodder.codemods.libcst_transformer import ( + LibcstTransformerPipeline, + LibcstResultTransformer, +) +from codemodder.codemods.semgrep import SemgrepRuleDetector -class JwtDecodeVerify(SimpleCodemod): - metadata = Metadata( - name="jwt-decode-verify", - summary="Verify JWT Decode", - review_guidance=ReviewGuidance.MERGE_WITHOUT_REVIEW, - references=[ - Reference(url="https://pyjwt.readthedocs.io/en/stable/api.html"), - Reference( - url="https://owasp.org/www-project-web-security-testing-guide/latest/4-Web_Application_Security_Testing/06-Session_Management_Testing/10-Testing_JSON_Web_Tokens" - ), - ], - ) +class JwtDecodeVerifyTransformer(LibcstResultTransformer): change_description = "Enable all verifications in `jwt.decode` call." - detector_pattern = r""" - rules: - - pattern-either: - - patterns: - - pattern: jwt.decode(..., verify=False, ...) - - pattern-inside: | - import jwt - ... - - patterns: - - pattern: | - jwt.decode(..., options={..., "$KEY": False, ...}, ...) - - metavariable-regex: - metavariable: $KEY - regex: verify_ - - pattern-inside: | - import jwt - ... - """ def _replace_opts_dict(self, opts_dict): new_dict_elements = [] @@ -100,3 +76,39 @@ def is_verify_keyword(element: cst.DictElement) -> bool: matchers.matches(element.key, matchers.SimpleString()) and "verify" in element.key.value ) + + +JwtDecodeVerify = CoreCodemod( + metadata=Metadata( + name="jwt-decode-verify", + summary="Verify JWT Decode", + review_guidance=ReviewGuidance.MERGE_WITHOUT_REVIEW, + references=[ + Reference(url="https://pyjwt.readthedocs.io/en/stable/api.html"), + Reference( + url="https://owasp.org/www-project-web-security-testing-guide/latest/4-Web_Application_Security_Testing/06-Session_Management_Testing/10-Testing_JSON_Web_Tokens" + ), + ], + ), + transformer=LibcstTransformerPipeline(JwtDecodeVerifyTransformer), + detector=SemgrepRuleDetector( + r""" + rules: + - pattern-either: + - patterns: + - pattern: jwt.decode(..., verify=False, ...) + - pattern-inside: | + import jwt + ... + - patterns: + - pattern: | + jwt.decode(..., options={..., "$KEY": False, ...}, ...) + - metavariable-regex: + metavariable: $KEY + regex: verify_ + - pattern-inside: | + import jwt + ... + """ + ), +) diff --git a/src/core_codemods/sonar/sonar_jwt_decode_verify.py b/src/core_codemods/sonar/sonar_jwt_decode_verify.py new file mode 100644 index 00000000..70835b79 --- /dev/null +++ b/src/core_codemods/sonar/sonar_jwt_decode_verify.py @@ -0,0 +1,42 @@ +import libcst as cst +from codemodder.codemods.base_codemod import Reference +from codemodder.result import same_line, fuzzy_column_match +from codemodder.codemods.sonar import SonarCodemod +from codemodder.codemods.libcst_transformer import ( + LibcstTransformerPipeline, +) +from core_codemods.jwt_decode_verify import JwtDecodeVerify, JwtDecodeVerifyTransformer + + +class JwtDecodeVerifySonarTransformer(JwtDecodeVerifyTransformer): + def filter_by_result(self, node) -> bool: + """ + Special case result-matching for this rule because the sonar + results returned have a start/end column for the verify keyword + within the `decode` call, not for the entire call like semgrep returns. + """ + match node: + case cst.Call(): + pos_to_match = self.node_position(node) + return any( + self.match_location(pos_to_match, result) + for result in self.results or [] + ) + return False + + def match_location(self, pos, result): + return any( + same_line(pos, location) and fuzzy_column_match(pos, location) + for location in result.locations + ) + + +SonarJwtDecodeVerify = SonarCodemod.from_core_codemod( + name="jwt-decode-verify-S5659", + other=JwtDecodeVerify, + rules=["python:S5659"], + new_references=[ + Reference(url="https://rules.sonarsource.com/python/RSPEC-5659/"), + ], + transformer=LibcstTransformerPipeline(JwtDecodeVerifySonarTransformer), +) diff --git a/tests/codemods/test_sonar_jwt_decode_verify.py b/tests/codemods/test_sonar_jwt_decode_verify.py new file mode 100644 index 00000000..e5bc8573 --- /dev/null +++ b/tests/codemods/test_sonar_jwt_decode_verify.py @@ -0,0 +1,68 @@ +import json +from core_codemods.sonar.sonar_jwt_decode_verify import SonarJwtDecodeVerify +from codemodder.codemods.test import BaseSASTCodemodTest + + +class TestSonarJwtDecodeVerify(BaseSASTCodemodTest): + codemod = SonarJwtDecodeVerify + tool = "sonar" + + def test_name(self): + assert self.codemod.name == "jwt-decode-verify-S5659" + + def test_simple(self, tmpdir): + input_code = """ + import jwt + + SECRET_KEY = "mysecretkey" + payload = { + "user_id": 123, + "username": "john", + } + + encoded_jwt = jwt.encode(payload, SECRET_KEY, algorithm="HS256") + decoded_payload = jwt.decode(encoded_jwt, SECRET_KEY, algorithms=["HS256"], verify=False) + decoded_payload = jwt.decode(encoded_jwt, SECRET_KEY, algorithms=["HS256"], options={"verify_signature": False}) + """ + expected = """ + import jwt + + SECRET_KEY = "mysecretkey" + payload = { + "user_id": 123, + "username": "john", + } + + encoded_jwt = jwt.encode(payload, SECRET_KEY, algorithm="HS256") + decoded_payload = jwt.decode(encoded_jwt, SECRET_KEY, algorithms=["HS256"], verify=True) + decoded_payload = jwt.decode(encoded_jwt, SECRET_KEY, algorithms=["HS256"], options={"verify_signature": True}) + """ + issues = { + "issues": [ + { + "rule": "python:S5659", + "status": "OPEN", + "component": f"{tmpdir / 'code.py'}", + "textRange": { + "startLine": 11, + "endLine": 11, + "startOffset": 76, + "endOffset": 88, + }, + }, + { + "rule": "python:S5659", + "status": "OPEN", + "component": f"{tmpdir / 'code.py'}", + "textRange": { + "startLine": 12, + "endLine": 12, + "startOffset": 84, + "endOffset": 111, + }, + }, + ] + } + self.run_and_assert( + tmpdir, input_code, expected, results=json.dumps(issues), num_changes=2 + )