diff --git a/pypdf/_page.py b/pypdf/_page.py index 520d71653..86e438216 100644 --- a/pypdf/_page.py +++ b/pypdf/_page.py @@ -883,14 +883,29 @@ def _content_stream_rename( raise KeyError(f"type of operands is {type(operands)}") return stream + @staticmethod + def _content_bytes_rename( + stream: bytes, + rename: Dict[bytes, bytes], + ) -> bytes: + for k, v in rename.items(): + stream = re.sub(k + WHITESPACES_AS_REGEXP, v + b" ", stream) + return stream + @staticmethod def _add_transformation_matrix( contents: Any, pdf: Union[None, PdfReaderProtocol, PdfWriterProtocol], ctm: CompressedTransformationMatrix, - ) -> ContentStream: + ) -> Union[ContentStream, bytes]: """Add transformation matrix at the beginning of the given contents stream.""" a, b, c, d, e, f = ctm + if isinstance(contents, bytes): + return ( + f"{FloatObject(a).myrepr()} {FloatObject(b).myrepr()} " + f"{FloatObject(c).myrepr()} {FloatObject(d).myrepr()} " + f"{FloatObject(e).myrepr()} {FloatObject(f).myrepr()} cm\n" + ).encode() + contents contents = ContentStream(contents, pdf) contents.operations.insert( 0, @@ -1053,27 +1068,33 @@ def _merge_page( # First we work on merging the resource dictionaries. This allows us # to find out what symbols in the content streams we might need to # rename. + within_writer = False + pdf = None try: assert isinstance(self.indirect_reference, IndirectObject) - if hasattr( - self.indirect_reference.pdf, "_add_object" - ): # ---------- to detect PdfWriter - return self._merge_page_writer( - page2, page2transformation, ctm, over, expand - ) + pdf = self.indirect_reference.pdf + if hasattr(pdf, "_add_object"): # ---------- to detect PdfWriter + # return self._merge_page_writer( + # page2, page2transformation, ctm, over, expand + # ) + within_writer = True except (AssertionError, AttributeError): pass - new_resources = DictionaryObject() + new_resources = DictionaryObject() # !!!!!!!!! Reader only? rename = {} try: - original_resources = cast(DictionaryObject, self[PG.RESOURCES].get_object()) + original_resources = cast(DictionaryObject, self[PG.RESOURCES]) except KeyError: original_resources = DictionaryObject() + if within_writer: + self[NameObject(PG.RESOURCES)] = original_resources try: - page2resources = cast(DictionaryObject, page2[PG.RESOURCES].get_object()) + page2resources = cast(DictionaryObject, page2[PG.RESOURCES]) except KeyError: page2resources = DictionaryObject() + + # !!!!!!!!!!! a revoir new_annots = ArrayObject() for page in (self, page2): @@ -1092,207 +1113,209 @@ def _merge_page( RES.SHADING, RES.PROPERTIES, ): - new, newrename = self._merge_resources( - original_resources, page2resources, res - ) - if new: - new_resources[NameObject(res)] = new - rename.update(newrename) - - # Combine /ProcSet sets, making sure there's a consistent order - new_resources[NameObject(RES.PROC_SET)] = ArrayObject( - sorted( - set( - original_resources.get(RES.PROC_SET, ArrayObject()).get_object() - ).union( - set(page2resources.get(RES.PROC_SET, ArrayObject()).get_object()) - ) - ) - ) - - new_content_array = ArrayObject() - original_content = self.get_contents() - if original_content is not None: - original_content.isolate_graphics_state() - new_content_array.append(original_content) - - page2content = page2.get_contents() - if page2content is not None: - rect = getattr(page2, MERGE_CROP_BOX) - page2content.operations.insert( - 0, - ( - map( - FloatObject, - [ - rect.left, - rect.bottom, - rect.width, - rect.height, - ], - ), - "re", - ), - ) - page2content.operations.insert(1, ([], "W")) - page2content.operations.insert(2, ([], "n")) - if page2transformation is not None: - page2content = page2transformation(page2content) - page2content = PageObject._content_stream_rename( - page2content, rename, self.pdf - ) - page2content.isolate_graphics_state() - if over: - new_content_array.append(page2content) + if within_writer: + newrename : Dict[str, Any] = {} + if res in page2resources: + if res not in original_resources: + original_resources[NameObject(res)] = DictionaryObject() + _, newrename = self._merge_resources( + original_resources, page2resources, res, False + ) else: - new_content_array.insert(0, page2content) - - # if expanding the page to fit a new page, calculate the new media box size - if expand: - self._expand_mediabox(page2, ctm) - - self.replace_contents(ContentStream(new_content_array, self.pdf)) - self[NameObject(PG.RESOURCES)] = new_resources - self[NameObject(PG.ANNOTS)] = new_annots - - def _merge_page_writer( - self, - page2: "PageObject", - page2transformation: Optional[Callable[[Any], ContentStream]] = None, - ctm: Optional[CompressedTransformationMatrix] = None, - over: bool = True, - expand: bool = False, - ) -> None: - # First we work on merging the resource dictionaries. This allows us - # to find out what symbols in the content streams we might need to - # rename. - assert isinstance(self.indirect_reference, IndirectObject) - pdf = self.indirect_reference.pdf - - rename = {} - if PG.RESOURCES not in self: - self[NameObject(PG.RESOURCES)] = DictionaryObject() - original_resources = cast(DictionaryObject, self[PG.RESOURCES].get_object()) - if PG.RESOURCES not in page2: - page2resources = DictionaryObject() - else: - page2resources = cast(DictionaryObject, page2[PG.RESOURCES].get_object()) - - for res in ( - RES.EXT_G_STATE, - RES.FONT, - RES.XOBJECT, - RES.COLOR_SPACE, - RES.PATTERN, - RES.SHADING, - RES.PROPERTIES, - ): - if res in page2resources: - if res not in original_resources: - original_resources[NameObject(res)] = DictionaryObject() - _, newrename = self._merge_resources( - original_resources, page2resources, res, False + new, newrename = self._merge_resources( + original_resources, page2resources, res ) - rename.update(newrename) + if new: + new_resources[NameObject(res)] = new # !!!!! + rename.update(newrename) # Combine /ProcSet sets. - if RES.PROC_SET in page2resources: - if RES.PROC_SET not in original_resources: - original_resources[NameObject(RES.PROC_SET)] = ArrayObject() - arr = cast(ArrayObject, original_resources[RES.PROC_SET]) - for x in cast(ArrayObject, page2resources[RES.PROC_SET]): - if x not in arr: - arr.append(x) - arr.sort() - - if PG.ANNOTS in page2: - if PG.ANNOTS not in self: - self[NameObject(PG.ANNOTS)] = ArrayObject() - annots = cast(ArrayObject, self[PG.ANNOTS].get_object()) - if ctm is None: - trsf = Transformation() - else: - trsf = Transformation(ctm) - for a in cast(ArrayObject, page2[PG.ANNOTS]): - a = a.get_object() - aa = a.clone( - pdf, - ignore_fields=("/P", "/StructParent", "/Parent"), - force_duplicate=True, - ) - r = cast(ArrayObject, a["/Rect"]) - pt1 = trsf.apply_on((r[0], r[1]), True) - pt2 = trsf.apply_on((r[2], r[3]), True) - aa[NameObject("/Rect")] = ArrayObject( - ( - min(pt1[0], pt2[0]), - min(pt1[1], pt2[1]), - max(pt1[0], pt2[0]), - max(pt1[1], pt2[1]), + if within_writer: + if RES.PROC_SET in page2resources: + if RES.PROC_SET not in original_resources: + original_resources[NameObject(RES.PROC_SET)] = ArrayObject() + arr = cast(ArrayObject, original_resources[RES.PROC_SET]) + for x in cast(ArrayObject, page2resources[RES.PROC_SET]): + if x not in arr: + arr.append(x) + arr.sort() + else: + # Combine /ProcSet sets, making sure there's a consistent order + new_resources[NameObject(RES.PROC_SET)] = ArrayObject( + sorted( + set( + original_resources.get(RES.PROC_SET, ArrayObject()).get_object() + ).union( + set( + page2resources.get(RES.PROC_SET, ArrayObject()).get_object() + ) ) ) - if "/QuadPoints" in a: - q = cast(ArrayObject, a["/QuadPoints"]) - aa[NameObject("/QuadPoints")] = ArrayObject( - cast(tuple, trsf.apply_on((q[0], q[1]), True)) - + cast(tuple, trsf.apply_on((q[2], q[3]), True)) - + cast(tuple, trsf.apply_on((q[4], q[5]), True)) - + cast(tuple, trsf.apply_on((q[6], q[7]), True)) - ) - try: - aa["/Popup"][NameObject("/Parent")] = aa.indirect_reference - except KeyError: - pass - try: - aa[NameObject("/P")] = self.indirect_reference - annots.append(aa.indirect_reference) - except AttributeError: - pass + ) - new_content_array = ArrayObject() - original_content = self.get_contents() + original_content = self._get_contents_as_bytes() if original_content is not None: - original_content.isolate_graphics_state() - new_content_array.append(original_content) - - page2content = page2.get_contents() + original_content = b"q\n" + original_content + b"\nQ\n" + else: + original_content = b"" + page2content = page2._get_contents_as_bytes() + before_page2 = b"" if page2content is not None: - rect = getattr(page2, MERGE_CROP_BOX) - page2content.operations.insert( - 0, - ( - map( - FloatObject, - [ - rect.left, - rect.bottom, - rect.width, - rect.height, - ], - ), - "re", - ), - ) - page2content.operations.insert(1, ([], "W")) - page2content.operations.insert(2, ([], "n")) + rec = getattr(page2, MERGE_CROP_BOX) + before_page2 = ( + f"{FloatObject(rec.left).myrepr()} {FloatObject(rec.bottom).myrepr()} " + f"{FloatObject(rec.width).myrepr()} {FloatObject(rec.width).myrepr()} re\n" + "W\nn\n" + ).encode() if page2transformation is not None: - page2content = page2transformation(page2content) - page2content = PageObject._content_stream_rename( - page2content, rename, self.pdf - ) - page2content.isolate_graphics_state() + trsf = page2transformation(b"") + if not isinstance(trsf,bytes): + trsf = cast(StreamObject, trsf).get_data() + before_page2 += trsf + page2content = PageObject._content_bytes_rename(page2content, rename) + page2content = b"q\n" + before_page2 + page2content + b"\nQ\n" if over: - new_content_array.append(page2content) + original_content += page2content else: - new_content_array.insert(0, page2content) + original_content = page2content + original_content # if expanding the page to fit a new page, calculate the new media box size if expand: self._expand_mediabox(page2, ctm) - self.replace_contents(new_content_array) - # self[NameObject(PG.CONTENTS)] = ContentStream(new_content_array, pdf) - # self[NameObject(PG.RESOURCES)] = new_resources - # self[NameObject(PG.ANNOTS)] = new_annots + st = EncodedStreamObject() + st.set_data(original_content) + # st.flate_encode() + self.replace_contents(st) + + if not within_writer: + self[NameObject(PG.RESOURCES)] = new_resources + self[NameObject(PG.ANNOTS)] = new_annots + + ## def _merge_page_writer( + ## self, + ## page2: "PageObject", + ## page2transformation: Optional[Callable[[Any], ContentStream]] = None, + ## ctm: Optional[CompressedTransformationMatrix] = None, + ## over: bool = True, + ## expand: bool = False, + ## ) -> None: + ## # First we work on merging the resource dictionaries. This allows us + ## # to find out what symbols in the content streams we might need to + ## # rename. + ## assert isinstance(self.indirect_reference, IndirectObject) + ## pdf = self.indirect_reference.pdf + ## + ## rename = {} + ## if PG.RESOURCES not in self: + ## self[NameObject(PG.RESOURCES)] = DictionaryObject() + ## original_resources = cast(DictionaryObject, self[PG.RESOURCES].get_object()) + ## if PG.RESOURCES not in page2: + ## page2resources = DictionaryObject() + ## else: + ## page2resources = cast(DictionaryObject, page2[PG.RESOURCES].get_object()) + ## + ## for res in ( + ## RES.EXT_G_STATE, + ## RES.FONT, + ## RES.XOBJECT, + ## RES.COLOR_SPACE, + ## RES.PATTERN, + ## RES.SHADING, + ## RES.PROPERTIES, + ## ): + ## if res in page2resources: + ## if res not in original_resources: + ## original_resources[NameObject(res)] = DictionaryObject() + ## _, newrename = self._merge_resources( + ## original_resources, page2resources, res, False + ## ) + ## rename.update(newrename) + ## # Combine /ProcSet sets. + ## if RES.PROC_SET in page2resources: + ## if RES.PROC_SET not in original_resources: + ## original_resources[NameObject(RES.PROC_SET)] = ArrayObject() + ## arr = cast(ArrayObject, original_resources[RES.PROC_SET]) + ## for x in cast(ArrayObject, page2resources[RES.PROC_SET]): + ## if x not in arr: + ## arr.append(x) + ## arr.sort() + ## + ## if PG.ANNOTS in page2: + ## if PG.ANNOTS not in self: + ## self[NameObject(PG.ANNOTS)] = ArrayObject() + ## annots = cast(ArrayObject, self[PG.ANNOTS].get_object()) + ## if ctm is None: + ## trsf = Transformation() + ## else: + ## trsf = Transformation(ctm) + ## for a in cast(ArrayObject, page2[PG.ANNOTS]): + ## a = a.get_object() + ## aa = a.clone( + ## pdf, + ## ignore_fields=("/P", "/StructParent", "/Parent"), + ## force_duplicate=True, + ## ) + ## r = cast(ArrayObject, a["/Rect"]) + ## pt1 = trsf.apply_on((r[0], r[1]), True) + ## pt2 = trsf.apply_on((r[2], r[3]), True) + ## aa[NameObject("/Rect")] = ArrayObject( + ## ( + ## min(pt1[0], pt2[0]), + ## min(pt1[1], pt2[1]), + ## max(pt1[0], pt2[0]), + ## max(pt1[1], pt2[1]), + ## ) + ## ) + ## if "/QuadPoints" in a: + ## q = cast(ArrayObject, a["/QuadPoints"]) + ## aa[NameObject("/QuadPoints")] = ArrayObject( + ## cast(tuple, trsf.apply_on((q[0], q[1]), True)) + ## + cast(tuple, trsf.apply_on((q[2], q[3]), True)) + ## + cast(tuple, trsf.apply_on((q[4], q[5]), True)) + ## + cast(tuple, trsf.apply_on((q[6], q[7]), True)) + ## ) + ## try: + ## aa["/Popup"][NameObject("/Parent")] = aa.indirect_reference + ## except KeyError: + ## pass + ## try: + ## aa[NameObject("/P")] = self.indirect_reference + ## annots.append(aa.indirect_reference) + ## except AttributeError: + ## pass + ## + ## original_content = self._get_contents_as_bytes() + ## if original_content is not None: + ## original_content = b"q\n" + original_content + "\nQ\n" + ## else: + ## original_content = b"" + ## + ## page2content = page2._get_contents_as_bytes() + ## before_page2 = b"" + ## if page2content is not None: + ## rect = getattr(page2, MERGE_CROP_BOX) + ## before_page2 = ( + ## f"{FloatObject(rect.left).myrepr()} {FloatObject(rect.bottom).myrepr()} " + ## f"{FloatObject(rect.width).myrepr()} {FloatObject(rect.width).myrepr()} re\n" + ## "W\nn\n" + ## ).encode() + ## if page2transformation is not None: + ## before_page2 += page2transformation(b"") + ## page2content = PageObject._content_bytes_rename(page2content, rename) + ## page2content = b"q\n" + page2content + "\nQ\n" + ## if over: + ## original_content += page2content + ## else: + ## original_content = page2content + original_content + ## + ## # if expanding the page to fit a new page, calculate the new media box size + ## if expand: + ## self._expand_mediabox(page2, ctm) + ## + ## self.replace_contents(original_content) + ## # self[NameObject(PG.CONTENTS)] = ContentStream(new_content_array, pdf) + ## # self[NameObject(PG.RESOURCES)] = new_resources + ## # self[NameObject(PG.ANNOTS)] = new_annots def _expand_mediabox( self, page2: "PageObject", ctm: Optional[CompressedTransformationMatrix] diff --git a/pypdf/generic/_data_structures.py b/pypdf/generic/_data_structures.py index 88a17d85a..8a4b0cb12 100644 --- a/pypdf/generic/_data_structures.py +++ b/pypdf/generic/_data_structures.py @@ -961,6 +961,10 @@ def get_data(self) -> Union[bytes, str]: def set_data(self, data: bytes) -> None: # deprecated from ..filters import FlateDecode + if SA.FILTER not in self: + self[NameObject(SA.FILTER)] = NameObject(FT.FLATE_DECODE) + self.decoded_self = DecodedStreamObject() + if self.get(SA.FILTER, "") == FT.FLATE_DECODE: if not isinstance(data, bytes): raise TypeError("data must be bytes") diff --git a/tests/test_writer.py b/tests/test_writer.py index 358fdb1cf..5c844fc8e 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -1621,6 +1621,7 @@ def test_watermark_rendering(tmp_path): assert image_similarity(png_path, target_png_path) >= 0.95 + @pytest.mark.skipif(GHOSTSCRIPT_BINARY is None, reason="Requires Ghostscript") def test_watermarking_reportlab_rendering(tmp_path): """