Gateway down when doing simple PDF extraction #5576

Closed
themantalope opened this issue Jan 6, 2023 · 4 comments

@themantalope
Contributor

Describe the bug
Using the PDFSegmenter, a gateway runtime error occurs. I can confirm that the issue is not with the text or image extraction in the PDFSegmenter, as that runs without error. After the executor code finishes, the process hangs for about 10 more seconds and then I get this error message:

ERROR  gateway/rep-0/GatewayRuntime@29851 Error while getting responses from deployments: failed to connect to all addresses |Gateway: Communication error with deployment segmenter at         [01/05/23 23:57:41]
       address(es) {'0.0.0.0:59488'}. Head or worker(s) may be down.                                                                                                                                               
ERROR  gateway/rep-0/GatewayRuntime@29851 Error while getting responses from deployments: failed to connect to all addresses |Gateway: Communication error with deployment segmenter at                            
       address(es) {'0.0.0.0:59488'}. Head or worker(s) may be down. 

Additional details:

test.py:

from docarray import DocumentArray
from jina import Flow
from pdf_segmenter import PDFSegmenter

# print-to-pdf of https://courses.cs.vt.edu/csonline/AI/Lessons/VisualProcessing/OCRscans_files/bowers.jpg
docs = DocumentArray.from_files(["data/*.pdf"])

print(docs.summary())

print(docs[0].summary())

for doc in docs:
  doc.load_uri_to_blob()

flow = Flow(
    protocol='http',
    timeout_ctrl=100000,
    timeout_ready=100000,
    timeout_send=100000,
    prefetch=1,
).add(uses=PDFSegmenter, name="segmenter", install_requirements=True)

with flow:
    output = flow.index(docs, return_results=True)


# print(output.summary())

# for o in output:
#     print('-----------------')
#     print(o.summary())
#     print(len(o.chunks))

pdf_segmenter.py:

import io
from typing import List

import fitz
import numpy as np
import pdfplumber
from jina import Document, DocumentArray, Executor, requests
from jina.logging.logger import JinaLogger


class PDFSegmenter(Executor):
    def __init__(
        self,
        *args,
        **kwargs,
    ):
        """
        :class:`PDFSegmenter` Extracts data (text and images) from PDF files.
        Stores images (`mime_type`=image/*) on chunk level ('c') and text segments (`mime_type`=text/plain)
        on chunk level ('c') in the root ('r') Document.
        """
        super().__init__(*args, **kwargs)
        self.logger = JinaLogger(context=self.__class__.__name__)

    @requests
    def craft(self, docs: DocumentArray, **kwargs):
        """
        Read PDF files. Extracts data from them.
        Checks if the input is a string of the filename,
        or if it's the file in bytes.
        It will then extract the data from the file, creating a list for images,
        and text.
        :param docs: Array of Documents.
        """
        for doc in docs:
            pdf_img, pdf_text = self._parse_pdf(doc)
            # print('getting images....')
            if pdf_img is not None:
                images = self._extract_image(pdf_img)
                doc.chunks.extend(
                    [Document(tensor=img, mime_type='image/*') for img in images]
                )
            # print('getting text....')
            if pdf_text is not None:
                texts = self._extract_text(pdf_text)
                # print('texts: ', texts)
                # for t in texts:
                #     print(t)
                doc.chunks.extend(
                    [Document(text=t, mime_type='text/plain') for t in texts]
                )
            print('doc: ', doc)
        print('should be done')
        return

    def _parse_pdf(self, doc: Document):
        pdf_img = None
        pdf_text = None
        try:
            # when loading from URI, we should prioritize blob
            # order is important. check test `tests/unit/test_exec.py::test_order_blob_uri`
            if doc.blob:
                pdf_img = fitz.open(stream=doc.blob, filetype='pdf')
                pdf_text = pdfplumber.open(io.BytesIO(doc.blob))
            elif doc.uri:
                pdf_img = fitz.open(doc.uri)
                pdf_text = pdfplumber.open(doc.uri)
        except Exception as ex:
            self.logger.error(f'Failed to open due to: {ex}')
        return pdf_img, pdf_text

    def _extract_text(self, pdf_text) -> List[str]:
        # Extract text
        with pdf_text:
            texts = []
            count = len(pdf_text.pages)
            for i in range(count):
                page = pdf_text.pages[i]
                texts.append(page.extract_text(x_tolerance=1, y_tolerance=1))
            return texts

    def _extract_image(self, pdf_img) -> List['np.ndarray']:
        with pdf_img:
            images = []
            for page in range(len(pdf_img)):
                print('current images: ', images)
                for img in pdf_img.get_page_images(page):
                    xref = img[0]
                    pix = fitz.Pixmap(pdf_img, xref)
                    # read data from buffer and reshape the array into 3-d format
                    np_arr = (
                        np.frombuffer(pix.samples, dtype=np.uint8)
                        .reshape(pix.h, pix.w, pix.n)
                        .astype('float32')
                    )
                    if pix.n - pix.alpha < 4:  # if gray or RGB
                        if pix.n == 1:  # convert gray to rgb
                            images.append(np.concatenate((np_arr,) * 3, -1))
                        elif pix.n == 4:  # remove transparency layer
                            images.append(np_arr[..., :3])
                        else:
                            images.append(np_arr)
                    else:  # if CMYK:
                        pix = fitz.Pixmap(fitz.csRGB, pix)  # Convert to RGB
                        np_arr = (
                            np.frombuffer(pix.samples, dtype=np.uint8)
                            .reshape(pix.h, pix.w, pix.n)
                            .astype('float32')
                        )
                        images.append(np_arr)
        return images

Here is the output with running JINA_LOG_LEVEL=DEBUG python test.py:

╭────────────────── Documents Summary ──────────────────╮
│                                                       │
│   Type                   DocumentArrayInMemory        │
│   Length                 1                            │
│   Homogenous Documents   True                         │
│   Common Attributes      ('id', 'mime_type', 'uri')   │
│   Multimodal dataclass   False                        │
│                                                       │
╰───────────────────────────────────────────────────────╯
╭───────────────────── Attributes Summary ─────────────────────╮
│                                                              │
│   Attribute   Data type   #Unique values   Has empty value   │
│  ──────────────────────────────────────────────────────────  │
│   id          ('str',)    1                False             │
│   mime_type   ('str',)    1                False             │
│   uri         ('str',)    1                False             │
│                                                              │
╰──────────────────────────────────────────────────────────────╯
None
📄 Document: b791272a4c237eacf92d9538b50405e8
╭─────────────────────────┬────────────────────────────────────────────────────╮
│ Attribute               │ Value                                              │
├─────────────────────────┼────────────────────────────────────────────────────┤
│ mime_type               │ application/pdf                                    │
│ uri                     │ data/rg.25si055505.pdf                             │
╰─────────────────────────┴────────────────────────────────────────────────────╯
None
WARNI… JINA@42018 Error getting the directory name from PDFSegmenter. `--install-requirements` option is only valid when `uses` is a configuration file.                                        [01/06/23 00:12:18]
⠋ Waiting ... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0/0 -:--:--DEBUG  segmenter/rep-0@42052 <pdf_segmenter.PDFSegmenter object at 0x7f950401ac40> is successfully loaded!                                                                                      [01/06/23 00:12:18]
DEBUG  segmenter/rep-0@42052 start listening on 0.0.0.0:53273                                                                                                                                                      
DEBUG  segmenter/rep-0@42052 run grpc server forever                                                                                                                                                               
DEBUG  gateway/rep-0/GatewayRuntime@42053 adding connection for deployment segmenter/heads/0 to grpc://0.0.0.0:53273                                                                            [01/06/23 00:12:18]
DEBUG  gateway/rep-0/GatewayRuntime@42053 start server bound to 0.0.0.0:56301                                                                                                                                      
DEBUG  segmenter/rep-0@42018 ready and listening                                                                                                                                                [01/06/23 00:12:18]
DEBUG  gateway/rep-0@42018 ready and listening                                                                                                                                                  [01/06/23 00:12:18]
─────────────────────────────────────────────────────────────────────────────────────────── 🎉 Flow is ready to serve! ────────────────────────────────────────────────────────────────────────────────────────────
╭────────────── 🔗 Endpoint ───────────────╮
│  ⛓      Protocol                   GRPC  │
│  🏠        Local          0.0.0.0:56301  │
│  🔒      Private     10.120.14.79:56301  │
│  🌍       Public    165.124.167.4:56301  │
╰──────────────────────────────────────────╯
DEBUG  Flow@42018 2 Deployments (i.e. 2 Pods) are running in this Flow                                                                                                                          [01/06/23 00:12:19]
DEBUG  GRPCClient@42018 connected to 0.0.0.0:56301                                                                                                                                              [01/06/23 00:12:20]
DEBUG  segmenter/rep-0@42052 got an endpoint discovery request                                                                                                                                  [01/06/23 00:12:20]
DEBUG  segmenter/rep-0@42052 recv DataRequest at /index with id: c6f4046393a447f4b794046211a3b2cb                                                                                                                  
doc:  <Document ('id', 'blob', 'mime_type', 'uri', 'chunks') at b791272a4c237eacf92d9538b50405e8>
should be done
DEBUG  gateway/rep-0/GatewayRuntime@42053 GRPC call to segmenter errored, with error <AioRpcError of RPC that terminated with:                                                                  [01/06/23 00:12:39]
               status = StatusCode.UNAVAILABLE                                                                                                                                                                     
               details = "Socket closed"                                                                                                                                                                           
               debug_error_string = "{"created":"@1672985559.056961950","description":"Error received from peer                                                                                                    
       ipv4:0.0.0.0:53273","file":"src/core/lib/surface/call.cc","file_line":966,"grpc_message":"Socket closed","grpc_status":14}"                                                                                 
       > and for the 1th time.                                                                                                                                                                                     
DEBUG  gateway/rep-0/GatewayRuntime@42053 GRPC call to deployment segmenter failed with error <AioRpcError of RPC that terminated with:                                                                            
               status = StatusCode.UNAVAILABLE                                                                                                                                                                     
               details = "Socket closed"                                                                                                                                                                           
               debug_error_string = "{"created":"@1672985559.056961950","description":"Error received from peer                                                                                                    
       ipv4:0.0.0.0:53273","file":"src/core/lib/surface/call.cc","file_line":966,"grpc_message":"Socket closed","grpc_status":14}"                                                                                 
       >, for retry attempt 1/3. Trying next replica, if available.                                                                                                                                                
DEBUG  gateway/rep-0/GatewayRuntime@42053 GRPC call to segmenter errored, with error <AioRpcError of RPC that terminated with:                                                                                     
               status = StatusCode.UNAVAILABLE                                                                                                                                                                     
               details = "failed to connect to all addresses"                                                                                                                                                      
               debug_error_string = "{"created":"@1672985559.072485749","description":"Failed to pick                                                                                                              
       subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3260,"referenced_errors":[{"created":"@1672985559.072484937","description":"failed to connect to                     
       all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"                                                                                                          
       > and for the 2th time.                                                                                                                                                                                     
DEBUG  gateway/rep-0/GatewayRuntime@42053 GRPC call to deployment segmenter failed with error <AioRpcError of RPC that terminated with:                                                                            
               status = StatusCode.UNAVAILABLE                                                                                                                                                                     
               details = "failed to connect to all addresses"                                                                                                                                                      
               debug_error_string = "{"created":"@1672985559.072485749","description":"Failed to pick                                                                                                              
       subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3260,"referenced_errors":[{"created":"@1672985559.072484937","description":"failed to connect to                     
       all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"                                                                                                          
       >, for retry attempt 2/3. Trying next replica, if available.                                                                                                                                                
DEBUG  gateway/rep-0/GatewayRuntime@42053 GRPC call to segmenter errored, with error <AioRpcError of RPC that terminated with:                                                                                     
               status = StatusCode.UNAVAILABLE                                                                                                                                                                     
               details = "failed to connect to all addresses"                                                                                                                                                      
               debug_error_string = "{"created":"@1672985559.083677854","description":"Failed to pick                                                                                                              
       subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3260,"referenced_errors":[{"created":"@1672985559.083662525","description":"failed to connect to                     
       all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"                                                                                                          
       > and for the 3th time.                                                                                                                                                                                     
DEBUG  gateway/rep-0/GatewayRuntime@42053 GRPC call to deployment segmenter failed with error <AioRpcError of RPC that terminated with:                                                                            
               status = StatusCode.UNAVAILABLE                                                                                                                                                                     
               details = "failed to connect to all addresses"                                                                                                                                                      
               debug_error_string = "{"created":"@1672985559.083677854","description":"Failed to pick                                                                                                              
       subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3260,"referenced_errors":[{"created":"@1672985559.083662525","description":"failed to connect to                     
       all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"                                                                                                          
       >, for retry attempt 3/3. Trying next replica, if available.                                                                                                                                                
DEBUG  gateway/rep-0/GatewayRuntime@42053 GRPC call to segmenter errored, with error <AioRpcError of RPC that terminated with:                                                                                     
               status = StatusCode.UNAVAILABLE                                                                                                                                                                     
               details = "failed to connect to all addresses"                                                                                                                                                      
               debug_error_string = "{"created":"@1672985559.095512863","description":"Failed to pick                                                                                                              
       subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3260,"referenced_errors":[{"created":"@1672985559.095511871","description":"failed to connect to                     
       all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":167,"grpc_status":14}]}"                                                                                                          
       > and for the 4th time.                                                                                                                                                                                     
DEBUG  gateway/rep-0/GatewayRuntime@42053 GRPC call for segmenter failed, retries exhausted                                                                                                                        
DEBUG  gateway/rep-0/GatewayRuntime@42053 resetting connection for segmenter to 0.0.0.0:53273                                                                                                                      
ERROR  gateway/rep-0/GatewayRuntime@42053 Error while getting responses from deployments: failed to connect to all addresses |Gateway: Communication error with deployment segmenter at                            
       address(es) {'0.0.0.0:53273'}. Head or worker(s) may be down.                                                                                                                                               
ERROR  GRPCClient@42018 gRPC error: StatusCode.UNAVAILABLE failed to connect to all addresses |Gateway: Communication error with deployment segmenter at address(es) {'0.0.0.0:53273'}. Head or [01/06/23 00:12:39]
       worker(s) may be down.                                                                                                                                                                                      
       The ongoing request is terminated as the server is not available or closed already.                                                                                                                         
DEBUG  gateway/rep-0@42018 waiting for ready or shutdown signal from runtime                                                                                                                    [01/06/23 00:12:39]
DEBUG  gateway/rep-0@42018 terminate                                                                                                                                                                               
DEBUG  gateway/rep-0@42018 terminating the runtime process                                                                                                                                                         
DEBUG  gateway/rep-0@42018 runtime process properly terminated                                                                                                                                                     
DEBUG  gateway/rep-0/GatewayRuntime@42053 Received signal SIGTERM                                                                                                                                                  
DEBUG  gateway/rep-0@42053 process terminated                                                                                                                                                   [01/06/23 00:13:03]
DEBUG  gateway/rep-0@42018 terminated                                                                                                                                                           [01/06/23 00:13:03]
DEBUG  gateway/rep-0@42018 joining the process                                                                                                                                                                     
DEBUG  gateway/rep-0@42018 successfully joined the process                                                                                                                                                         
DEBUG  segmenter/rep-0@42018 waiting for ready or shutdown signal from runtime                                                                                                                  [01/06/23 00:13:03]
DEBUG  segmenter/rep-0@42018 terminate                                                                                                                                                                             
DEBUG  segmenter/rep-0@42018 terminating the runtime process                                                                                                                                                       
DEBUG  segmenter/rep-0@42018 runtime process properly terminated                                                                                                                                                   

I also note that this does not occur with every PDF. The PDF which is causing problems is attached.
rg.25si055505.pdf


Environment

- jina 3.13.0
- docarray 0.20.1
- jcloud 0.1.6
- jina-hubble-sdk 0.28.0
- jina-proto 0.1.13
- protobuf 4.21.12
- proto-backend upb
- grpcio 1.47.2
- pyyaml 6.0
- python 3.8.15
- platform Linux
- platform-release 4.15.0-200-generic
- platform-version #211-Ubuntu SMP Thu Nov 24 18:16:04 UTC 2022
- architecture x86_64
- processor x86_64
- uid 185410937616950
- session-id 60a8000a-8d89-11ed-af64-a8a15912fa36
- uptime 2023-01-06T00:14:28.766910
- ci-vendor (unset)
- internal False
* JINA_DEFAULT_HOST (unset)
* JINA_DEFAULT_TIMEOUT_CTRL (unset)
* JINA_DEPLOYMENT_NAME (unset)
* JINA_DISABLE_UVLOOP (unset)
* JINA_EARLY_STOP (unset)
* JINA_FULL_CLI (unset)
* JINA_GATEWAY_IMAGE (unset)
* JINA_GRPC_RECV_BYTES (unset)
* JINA_GRPC_SEND_BYTES (unset)
* JINA_HUB_NO_IMAGE_REBUILD (unset)
* JINA_LOG_CONFIG (unset)
* JINA_LOG_LEVEL (unset)
* JINA_LOG_NO_COLOR (unset)
* JINA_MP_START_METHOD (unset)
* JINA_OPTOUT_TELEMETRY (unset)
* JINA_RANDOM_PORT_MAX (unset)
* JINA_RANDOM_PORT_MIN (unset)
* JINA_LOCKS_ROOT (unset)
* JINA_K8S_ACCESS_MODES (unset)
* JINA_K8S_STORAGE_CLASS_NAME (unset)
* JINA_K8S_STORAGE_CAPACITY (unset)
* JINA_STREAMER_ARGS (unset)
@AnneYang720
Contributor

This error is actually due to "Message jina.DataRequestProto exceeds maximum protobuf size of 2GB", which is why it only occurs with certain PDF files. I get this error message with protobuf==3.20.3. Maybe the error is somehow hidden by grpcio with the latest version of protobuf.

For example, if you change
doc.chunks.extend([Document(tensor=img, mime_type='image/*') for img in images])
to
doc.chunks.extend([Document(tensor=img, mime_type='image/*') for img in images[:1]])
and similarly change text to
doc.chunks.extend([Document(text=t, mime_type='text/plain') for t in texts[:1]])
you will see that the error is gone.

@themantalope
Contributor Author

@AnneYang720 thank you for your help.

Yes, I also downgraded the installed protobuf to 3.18 and got an error about the message size exceeding 2GB. Setting the gRPC server message size limits to more than 2GB in the Flow setup also throws an error.

The issue with your suggested approach is that it discards everything except the data from the first page of the document. For example, the output of print(len(texts[0]), sum([len(t) for t in texts])) for the document in question is 2538 47318, so I'd just be removing data.

Is there a guide for handling large files with Jina? Seems like this could be an issue, especially for videos, images, and large PDF files. Or is this happening because the data is all getting loaded as a DocumentArray in memory?

For example in the example-video-search-app I see that documents are indexed one at a time. Does that affect message size?
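On the batching question, here is a minimal sketch of how splitting inputs into requests bounds the size of each message. It assumes Jina's client `request_size` argument (for example `flow.index(docs, request_size=1)`) groups Documents roughly like the hypothetical `batched` helper below; the per-document sizes are invented for illustration. Note that batching cannot help when a single Document, with all of its chunks, already exceeds the limit on its own, which appears to be the case here.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], request_size: int) -> Iterator[List[T]]:
    """Yield consecutive groups of at most `request_size` items."""
    if request_size < 1:
        raise ValueError("request_size must be at least 1")
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == request_size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly shorter, group
        yield batch


# Hypothetical payload sizes in bytes for ten Documents (not measured values).
doc_sizes = [300_000_000] * 10

# Largest single request when sending 4 Documents per request vs. 1 per request.
largest_with_4 = max(sum(group) for group in batched(doc_sizes, 4))
largest_with_1 = max(sum(group) for group in batched(doc_sizes, 1))
print(largest_with_4, largest_with_1)
```

Smaller requests lower the peak message size only as far as the largest single Document allows.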

@AnneYang720
Contributor

My example was just another attempt to verify the error is because of the size limit.

After the images are extracted from the PDF file, each one has shape (4350, 3300, 3), so together they are much larger than the file itself. You can simply store them in your executor and change return to return DocumentArray() or return None in def craft. By default, it returns the original docs object (see the doc).
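To make the size argument concrete, here is a rough back-of-the-envelope sketch. It assumes the limit is 2**31 bytes (the error message only says "2GB"), uses the float32 cast from pdf_segmenter.py, and ignores protobuf overhead and the text chunks; `tensor_nbytes` is a hypothetical helper, not part of Jina or docarray.

```python
import math

# Assumed serialization cap; the error message only states "2GB".
PROTOBUF_LIMIT_BYTES = 2**31


def tensor_nbytes(shape, itemsize):
    """Bytes needed for a dense array with the given shape and element size."""
    return math.prod(shape) * itemsize


page_shape = (4350, 3300, 3)

# pdf_segmenter.py casts every image to float32 (4 bytes per value).
per_image_float32 = tensor_nbytes(page_shape, 4)
# The raw pixmap samples are uint8 (1 byte per value).
per_image_uint8 = tensor_nbytes(page_shape, 1)

max_float32_images = PROTOBUF_LIMIT_BYTES // per_image_float32
max_uint8_images = PROTOBUF_LIMIT_BYTES // per_image_uint8

print(per_image_float32, max_float32_images)
print(per_image_uint8, max_uint8_images)
```

Under these assumptions each float32 image takes about 172 MB, so a response carrying more than 12 full-page images would already pass the limit; keeping the data as uint8 would raise that to 49 images but not remove the problem.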

A more general approach we suggest is to extract the images first and store them elsewhere (such as the local file system). Instead of

doc.chunks.extend([Document(tensor=img, mime_type='image/*') for img in images])

you can do

doc.chunks.extend([Document(uri='data/your_image.png', mime_type='image/*') for img in images])

and call the function load_uri_to_image_tensor() when you need to work with the images.
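As an illustration of the "store elsewhere, pass only the URI" idea, here is a dependency-free sketch that writes 8-bit RGB pixels to a PNG file and returns its path. `write_rgb_png` and the file name are hypothetical; in the executor the bytes could come from something like `np_arr.astype('uint8').tobytes()`, each image would need its own file name, and the returned path would go into `Document(uri=...)`. In practice an imaging library, or PyMuPDF's own `Pixmap.save`, would likely be the simpler choice.

```python
import struct
import tempfile
import zlib
from pathlib import Path

PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"


def _png_chunk(tag: bytes, data: bytes) -> bytes:
    """Build one PNG chunk: 4-byte length, tag, data, CRC-32 over tag + data."""
    crc = zlib.crc32(tag + data) & 0xFFFFFFFF
    return struct.pack(">I", len(data)) + tag + data + struct.pack(">I", crc)


def write_rgb_png(path, width: int, height: int, rgb: bytes) -> str:
    """Write row-major 8-bit RGB pixels to `path` as a PNG and return the path."""
    stride = width * 3
    if len(rgb) != stride * height:
        raise ValueError("rgb must hold exactly width * height * 3 bytes")
    # Every scanline is prefixed with filter type 0 (no filtering).
    raw = b"".join(
        b"\x00" + rgb[row * stride:(row + 1) * stride] for row in range(height)
    )
    # IHDR: width, height, bit depth 8, colour type 2 (RGB), then
    # compression, filter and interlace methods, all 0.
    ihdr = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0)
    png = (
        PNG_SIGNATURE
        + _png_chunk(b"IHDR", ihdr)
        + _png_chunk(b"IDAT", zlib.compress(raw))
        + _png_chunk(b"IEND", b"")
    )
    Path(path).write_bytes(png)
    return str(path)


# Hypothetical usage: a 2x2 solid red image written to a temporary directory.
out_dir = Path(tempfile.mkdtemp())
uri = write_rgb_png(out_dir / "page0_img0.png", 2, 2, bytes([255, 0, 0] * 4))
print(uri)
```

With this pattern the response carries only short path strings, and the pixel data is read back from disk when it is actually needed.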

@themantalope
Contributor Author

@AnneYang720 thank you for your input.
