In [1]:
import sys

# Make the backend package importable from this notebook. The path is only
# appended when it is not already present, so re-running the cell is harmless.
backend_path = '../backend'
if sys.path.count(backend_path) == 0:
    sys.path.append(backend_path)

In [2]:
import logging
import os
import re
import tempfile
from zipfile import BadZipfile

import pendulum
import requests
import textract
from dotenv import load_dotenv
from sqlalchemy import create_engine, select, values, update, and_, or_
from sqlalchemy.orm import sessionmaker
from tqdm import tqdm

from app.models.models import Notice, ResourceLink
from app.models.schema import NoticeBase, ResourceLinkBase

# Root logging configuration for the notebook: INFO and above, with a
# timestamp and level in front of every message.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Logger used by the cells below.
logger = logging.getLogger(__name__)

In [3]:
# Load variables from a local .env file (if one exists) into the environment.
load_dotenv()
# Prefer a DATABASE_URL provided by the environment/.env so credentials do not
# have to live in the notebook; the fallback is the local development database.
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql+psycopg2://airflow:airflow@localhost:5432/airflow",
)

In [4]:
# Engine bound to the configured database.
engine = create_engine(DATABASE_URL)

# Session factory: explicit commits, no automatic flushing.
SessionLocal = sessionmaker(
    bind=engine,
    autoflush=False,
    autocommit=False,
)

# Yesterday's date (UTC) formatted as YYYYMMDD.
selected_date = (
    pendulum.now("utc")
    .subtract(days=1)
    .strftime("%Y%m%d")
)

In [5]:
def get_doc_text(file_name, rm=True):
    """Textract a doc given its path

    Arguments:
        file_name {str} -- path to a doc

    Keyword Arguments:
        rm {bool} -- delete the file from disk once extraction has finished,
            whether or not it succeeded (default: {True})

    Returns:
        str -- the extracted text with surrounding whitespace stripped, or an
            empty string when no text could be extracted
    """
    # Extraction may rename the file to correct a wrong extension. Track where
    # the file currently lives so cleanup deletes the path that really exists.
    current_path = file_name
    text = ""
    try:
        b_text = None
        try:
            b_text = textract.process(file_name, encoding="utf-8", errors="ignore")
        # ShellError with antiword occurs when an rtf is saved with a doc extension
        except textract.exceptions.ShellError as e:
            if "antiword" in str(e) and file_name.endswith(".doc"):
                # Swap only the extension; str.replace would also rewrite any
                # ".doc" that appears earlier in the path.
                new_name = os.path.splitext(file_name)[0] + ".rtf"
                os.rename(file_name, new_name)
                current_path = new_name
                try:
                    b_text = textract.process(
                        new_name, encoding="utf-8", errors="ignore"
                    )
                except textract.exceptions.ShellError as ex:
                    logger.error(
                        "Error extracting text from a DOC file. Check that all dependencies of textract are installed.\n{}".format(
                            ex
                        )
                    )
            else:
                # Previously this case was dropped silently; record it.
                logger.warning(
                    f"Exception occurred textracting {file_name}: {e}", exc_info=True
                )
        except textract.exceptions.MissingFileError as e:
            b_text = None
            logger.error(
                f"Couldn't textract {file_name} since the file couldn't be found: {e}",
                exc_info=True,
            )
        # This can be raised when a pdf is incorrectly saved as a .docx (GH183)
        except BadZipfile as e:
            if file_name.endswith(".docx"):
                new_name = os.path.splitext(file_name)[0] + ".pdf"
                os.rename(file_name, new_name)
                current_path = new_name
                b_text = textract.process(
                    new_name, encoding="utf-8", method="pdftotext", errors="ignore"
                )
            else:
                b_text = None
                logger.warning(
                    f"Exception occurred textracting {file_name}: {e}", exc_info=True
                )
        # TypeError is raised when None is passed to str.decode()
        # This happens when textract can't extract text from scanned documents
        except TypeError:
            b_text = None
        except Exception as e:
            if re.match("^(.*) file; not supported", str(e)):
                logger.warning(f"'{file_name}' is type {str(e)}")
            elif re.match("^The filename extension .zip is not yet supported", str(e)):
                logger.warning(
                    f"'{file_name}' is type zip and not supported by textract"
                )
            else:
                logger.warning(
                    f"Exception occurred textracting {file_name}: {e}", exc_info=True
                )
            b_text = None
        text = b_text.decode("utf8", errors="ignore").strip() if b_text else ""
    except Exception as e:
        logger.error(
            f"Error uncaught when trying to parse file {file_name}. Giving up and returning an empty string. {e}",
            exc_info=True,
        )
        text = ""
    finally:
        # Cleanup runs on every path (including unexpected failures) and does
        # not return from inside `finally`, so in-flight exceptions such as
        # KeyboardInterrupt are no longer swallowed.
        if rm:
            try:
                os.remove(current_path)
            except Exception as e:
                logger.error(f"Unable to remove {current_path}: {e}", exc_info=True)

    return text

In [25]:
# Maximum number of unparsed resource links fetched per run.
RESOURCE_LINK_BATCH_SIZE = 20

with SessionLocal() as session:
    # Distinct notice ids referenced by resource links. Because this reads the
    # same table as the outer query, the IN filter effectively only drops rows
    # whose notice_id is NULL.
    subquery = (
        select(ResourceLink.notice_id).
        distinct()
    )
    # Resource links whose text has not been extracted yet.
    stmt = (
        select(ResourceLink).
        where(and_(
            ResourceLink.notice_id.in_(subquery),
            ResourceLink.text.is_(None),
        )).
        limit(RESOURCE_LINK_BATCH_SIZE)
    )
    results = session.execute(stmt).scalars().all()
    # Convert ORM rows to plain dicts while the session is still open.
    # model_dump() is the Pydantic v2 counterpart of the deprecated .dict().
    resource_links = [
        ResourceLinkBase.model_validate(result).model_dump() for result in results
    ]

In [26]:
# Sanity check: how many unparsed resource links the query returned.
len(resource_links)

20

In [8]:
# Download the first resource link to inspect the response. A timeout keeps
# the cell from hanging indefinitely if the server never answers.
res = requests.get(resource_links[0].get("url"), timeout=60)

In [9]:
# Inspect the response headers; Content-Disposition carries the file name and
# Content-Length the size in bytes.
res.headers

{'x-amz-id-2': 'Ee6kw1V3LokAembJL2dfp0P7+3Yitk1JQjoe1i5eRBOXWebezb1+Rw+5pjrEZUjAP7oWrCbok+CWKUEAcZVv1wYhgnL9t80KErFz238P0Zs=', 'x-amz-request-id': 'K2N00QYJAFJ56HV8', 'Date': 'Wed, 13 Mar 2024 13:15:21 GMT', 'Last-Modified': 'Mon, 11 Mar 2024 20:07:29 GMT', 'ETag': '"d2aec7bda6c1aff3588ffa8534f93f48"', 'x-amz-tagging-count': '2', 'x-amz-server-side-encryption': 'AES256', 'x-amz-version-id': 'u9LVwxTi_5WJGGXNi.V6QfuKc49VbazI', 'Content-Disposition': 'attachment; filename=Sol_1305M324Q0123.pdf', 'Accept-Ranges': 'bytes', 'Content-Type': 'application/octet-stream', 'Server': 'AmazonS3', 'Content-Length': '436199'}

The request is redirected to S3: the final response is served by Amazon S3 (see the `Server` and `x-amz-*` headers above).

In [10]:
# Display the response object to see its HTTP status code.
res

<Response [200]>

In [11]:
# Display the response headers once more for reference.
res.headers

{'x-amz-id-2': 'Ee6kw1V3LokAembJL2dfp0P7+3Yitk1JQjoe1i5eRBOXWebezb1+Rw+5pjrEZUjAP7oWrCbok+CWKUEAcZVv1wYhgnL9t80KErFz238P0Zs=', 'x-amz-request-id': 'K2N00QYJAFJ56HV8', 'Date': 'Wed, 13 Mar 2024 13:15:21 GMT', 'Last-Modified': 'Mon, 11 Mar 2024 20:07:29 GMT', 'ETag': '"d2aec7bda6c1aff3588ffa8534f93f48"', 'x-amz-tagging-count': '2', 'x-amz-server-side-encryption': 'AES256', 'x-amz-version-id': 'u9LVwxTi_5WJGGXNi.V6QfuKc49VbazI', 'Content-Disposition': 'attachment; filename=Sol_1305M324Q0123.pdf', 'Accept-Ranges': 'bytes', 'Content-Type': 'application/octet-stream', 'Server': 'AmazonS3', 'Content-Length': '436199'}

In [13]:
# Try a HEAD request (following redirects) to read the headers without
# downloading the body. A timeout keeps the cell from hanging indefinitely.
res = requests.head(resource_links[0].get("url"), allow_redirects=True, timeout=60)
res

<Response [403]>

In [14]:
# Content-Length reported by the HEAD response. No output was recorded for
# this cell (presumably None, given the 403 above -- TODO confirm).
res.headers.get("Content-Length")

Because the `HEAD` request above is rejected with a 403, it seems the entire file has to be downloaded with a `GET` request before its `Content-Length` can be checked. Alternative optimizations will be explored once a baseline of the application logic is up and running.

In [15]:
# Re-issue the GET so `res` once again holds the full download response
# (the HEAD experiment above overwrote it). The timeout prevents an
# indefinite hang.
res = requests.get(resource_links[0].get("url"), timeout=60)

In [16]:
def get_file_name(res):
    """Return the attachment file name advertised by a response.

    Arguments:
        res -- response object exposing a ``headers`` mapping, such as the
            result of ``requests.get``

    Returns:
        str -- the file name from the ``Content-Disposition`` header, without
            surrounding quotes, trailing parameters or directory components

    Raises:
        ValueError -- if the header is missing or carries no file name
    """
    # Local import keeps this cell self-contained.
    from email.message import Message

    disposition = res.headers.get("Content-Disposition")
    if not disposition:
        raise ValueError("Response has no Content-Disposition header")

    # Let the stdlib header parser handle quoting, additional parameters and
    # RFC 2231 encoded names instead of splitting the string by hand.
    parser = Message()
    parser["Content-Disposition"] = disposition
    file_name = parser.get_filename()

    if file_name:
        # The name comes from the server and is later used to build a local
        # path, so keep only the final path component.
        file_name = os.path.basename(file_name.replace("\\", "/"))
    if not file_name:
        raise ValueError(
            f"No file name found in Content-Disposition header: {disposition!r}"
        )
    return file_name

def get_file_size(res):
    """Return the size in bytes of a response body.

    Arguments:
        res -- response object exposing ``headers`` and ``content``, such as
            the result of ``requests.get``

    Returns:
        int -- the value of the ``Content-Length`` header, or the length of
            the body when the header is missing or not a number
    """
    raw_size = res.headers.get("Content-Length")
    try:
        return int(raw_size)
    except (TypeError, ValueError):
        # The header is optional (e.g. chunked responses omit it), so fall
        # back to measuring the body itself.
        return len(res.content)

In [17]:
# Try both helpers on the response fetched above.
get_file_name(res), get_file_size(res)

('Sol_1305M324Q0123.pdf', 436199)

In [18]:
# Attachments larger than this many bytes are skipped rather than parsed.
MAX_FILE_SIZE_BYTES = 3000000
# Seconds to wait on the server before giving up on a download.
REQUEST_TIMEOUT_SECONDS = 60

for resource_link in tqdm(resource_links):
    url = resource_link.get("url")
    try:
        # stream=True defers the body download until `.content` is read, so an
        # oversized file can be rejected from its headers alone.
        with requests.get(url, stream=True, timeout=REQUEST_TIMEOUT_SECONDS) as res:
            res.raise_for_status()
            file_name = get_file_name(res)
            file_size = get_file_size(res)
            logger.info(f"Name: {file_name}")
            logger.info(f"Size: {file_size}")
            if file_size > MAX_FILE_SIZE_BYTES:
                logger.info("File size exceeds threshold and will be skipped")
                continue
            content = res.content
    except requests.RequestException as e:
        # One unreachable link should not abort the whole batch.
        logger.warning(f"Skipping {url}: download failed: {e}")
        continue
    except (AttributeError, IndexError, TypeError, ValueError) as e:
        # Raised by the header helpers when the name or size cannot be read.
        logger.warning(f"Skipping {url}: could not read file name/size from headers: {e}")
        continue

    # A per-file temporary directory guarantees cleanup even when the parser
    # renames the file to correct a wrong extension.
    with tempfile.TemporaryDirectory() as tmp_dir:
        temp_path = os.path.join(tmp_dir, os.path.basename(file_name) or "download")
        with open(temp_path, "wb") as tmp:
            tmp.write(content)
        # The handle is closed before parsing; removal is left to the
        # temporary directory, hence rm=False.
        text = get_doc_text(temp_path, rm=False)

    if text:
        # clean null characters before committing
        new_text = text.replace("\x00", "\uFFFD")
    else:
        # Mark the link so it is not selected again as "not yet parsed".
        new_text = "unparsable"

    with SessionLocal() as session:
        stmt = (
            update(ResourceLink).
            where(ResourceLink.id == resource_link['id']).
            values(text=new_text)
        )
        session.execute(stmt)
        session.commit()

  0%|          | 0/20 [00:00<?, ?it/s]2024-03-13 09:15:40,980 - INFO - Name: Sol_1305M324Q0123.pdf
2024-03-13 09:15:40,981 - INFO - Size: 436199
  5%|▌         | 1/20 [00:03<01:13,  3.89s/it]2024-03-13 09:15:45,014 - INFO - Name: W911PT24Q0051+-+3D+Printer+Maintenance.+Sintering+Furnace.pdf
2024-03-13 09:15:45,016 - INFO - Size: 1112243
 10%|█         | 2/20 [00:08<01:19,  4.42s/it]2024-03-13 09:15:49,817 - INFO - Name: Sol_140G0124Q0045_Amd_0003.pdf
2024-03-13 09:15:49,818 - INFO - Size: 353789
 15%|█▌        | 3/20 [00:11<01:03,  3.74s/it]2024-03-13 09:15:52,605 - INFO - Name: Sol_140G0124Q0045_Amd_0004.pdf
2024-03-13 09:15:52,606 - INFO - Size: 87063
 20%|██        | 4/20 [00:12<00:40,  2.54s/it]2024-03-13 09:15:53,454 - INFO - Name: B03_DOL_Wage_Determination.pdf
2024-03-13 09:15:53,455 - INFO - Size: 102546
 25%|██▌       | 5/20 [00:13<00:31,  2.11s/it]2024-03-13 09:15:54,667 - INFO - Name: Sol_140G0124Q0045_Amd_0002.pdf
2024-03-13 09:15:54,668 - INFO - Size: 89965
 30%|███       

In [22]:
# Sanity check: SessionLocal is the sessionmaker factory, not a Session instance.
assert isinstance(SessionLocal, sessionmaker)