Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure to inject our code after PROTO and FRAME #31

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 32 additions & 2 deletions fickling/analysis.py
@@ -1,12 +1,12 @@
import sys
from typing import Optional, TextIO, Tuple
from typing import Optional, Set, TextIO, Tuple

if sys.version_info < (3, 9):
from astunparse import unparse
else:
from ast import unparse

from .pickle import Pickled, Interpreter
from .pickle import Pickled, Proto, Interpreter


def check_safety(pickled: Pickled, stdout: Optional[TextIO] = None, stderr: Optional[TextIO] = None) -> bool:
Expand All @@ -33,6 +33,36 @@ def shorten_code(ast_node) -> Tuple[str, bool]:
reported_shortened_code.add(shortened_code)
return shortened_code, was_already_reported

had_proto = False
proto_versions: Set[int] = set()
for i, opcode in enumerate(pickled):
if isinstance(opcode, Proto):
if had_proto:
likely_safe = False
if i == 2:
suffix = "rd"
elif i == 1:
suffix = "nd"
elif i == 0:
suffix = "st"
else:
suffix = "th"
if opcode.version in proto_versions:
stdout.write(f"The {i+1}{suffix} opcode is a duplicate PROTO, which is unusual and may be "
f"indicative of a tampered pickle\n")
else:
stdout.write(f"The {i+1}{suffix} opcode is a duplicate PROTO with a different version than "
f"reported in the previous PROTO opcode, which is almost certainly a sign of a "
f"tampered pickle\n")
else:
had_proto = True
if opcode.version >= 2 and i > 0:
likely_safe = False
stdout.write(f"The protocol version is {opcode.version}, but the PROTO opcode is not the first "
f"opcode in the pickle, as required for versions 2 and later; this may be "
f"indicative of a tampered pickle\n")
proto_versions.add(opcode.version)

for node in pickled.non_standard_imports():
likely_safe = False
shortened, already_reported = shorten_code(node)
Expand Down
35 changes: 29 additions & 6 deletions fickling/pickle.py
Expand Up @@ -416,15 +416,21 @@ def insert_python(
# and then either immediately call the `eval` with a `Reduce` opcode (the default)
# or optionally insert the `Reduce` at the end (and hope that the existing code cleans up its stack so it
# remains how we left it! TODO: Add code to emulate the code afterward and confirm that the stack is sane!
self.insert(0, Global.create(module, attr))
self.insert(1, Mark())
i = 1
for arg in args:
i = 0
while isinstance(self[i], (Proto, Frame)):
i += 1
self.insert(i, Global.create(module, attr))
i += 1
self.insert(i, Mark())
i += 1
for arg in args:
self.insert(i, ConstantOpcode.new(arg))
self.insert(i + 1, Tuple())
i += 1
self.insert(i, Tuple())
i += 1
if run_first:
self.insert(i + 2, Reduce())
self.insert(i, Reduce())
i += 1
if use_output_as_unpickle_result:
self.insert(-1, Pop())
else:
Expand Down Expand Up @@ -729,6 +735,23 @@ def __str__(self):
class Proto(NoOp):
name = "PROTO"

@staticmethod
def create(version: int) -> "Proto":
return Proto(version)

def encode_body(self) -> bytes:
return bytes([self.version])

@property
def version(self) -> int:
if self.arg is None:
return 0
elif isinstance(self.arg, int):
return self.arg
else:
# Endianness shouldn't really matter here because there is only one byte for the version
return int.from_bytes(self.arg, "big", signed=False)


class Global(Opcode):
name = "GLOBAL"
Expand Down
8 changes: 8 additions & 0 deletions test/test_pickle.py
Expand Up @@ -112,3 +112,11 @@ def test_unused_variables(self):
self.assertEqual(len(unused), 1)
self.assertIn("_var0", unused)
self.assertFalse(check_safety(loaded))

def test_duplicate_proto(self):
pickled = dumps([1, 2, 3, 4])
loaded = Pickled.load(pickled)
self.assertTrue(check_safety(loaded))
loaded.insert(-1, fpickle.Proto.create(1))
loaded.insert(-1, fpickle.Proto.create(2))
self.assertFalse(check_safety(loaded))