# Transformer

In [6]:
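# `import fitz` is provided by PyMuPDF. Do not `pip install fitz`: that is an
# unrelated PyPI package which shadows PyMuPDF's `fitz` module.
# !pip install PyMuPDF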

In [8]:
import sys
sys.path.insert(0, "../")

import os, gc, re, psutil, traceback
import timeit, datetime
import pandas as pd
import fitz
import ftfy
import nltk
import warnings
import pdfplumber, spacy
import time, warnings, traceback, spacy
import spacy
import os, time, warnings, traceback
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.multiprocessing

from abc import ABC, abstractmethod
from tqdm.notebook import tqdm
from transformers import pipeline
from googletrans import Translator
from collections import deque
from concurrent.futures import ThreadPoolExecutor

from etl.core.logger import WattleLogger
from etl.core.concrete import WattleExtract, WattleTransform
from etl.utils.base import WattleUtils
from etl.core.constants import (
    MSG_FILE_EXIST,
    MSG_FILE_DOWNLOADED,
    METHOD_EXEC,
    MSG_CSV_FILE_ERROR,
    MSG_FILE_CREATED,
    MSG_NOT_FOUND
)

def msg(txt):
    """Return ``txt`` prefixed with the current local timestamp in brackets."""
    return f"[{datetime.datetime.now()}]: {txt}"


def execution_time(func):
    """Decorate ``func`` so each call prints its duration and memory usage.

    After every call the wrapper prints one timestamped line with the elapsed
    wall-clock time (``timeit.default_timer``) and the figures returned by
    ``psutil.virtual_memory()``, then returns the wrapped result unchanged.
    """
    from functools import wraps

    gib = 1024 ** 3

    @wraps(func)  # keep the wrapped function's __name__/__doc__/__wrapped__
    def wrapper(*args, **kwargs):
        start = timeit.default_timer()
        result = func(*args, **kwargs)
        end = timeit.default_timer()
        m = psutil.virtual_memory()
        print(
            f"{msg(func.__name__)} took {end - start:.2f} sec | "
            f"Memory | Used: {m.used / gib:.2f}GB ({m.percent}%) | "
            f"Available: {m.available / gib:.2f}GB of {m.total / gib:.3f}GB "
        )
        return result

    return wrapper


print(msg("Done."))

ModuleNotFoundError: No module named 'PyMuPdf'

In [7]:
# Default limits and shared literals for the extract / summarise / translate stages.
EXTRACTOR_MAX_WORDS  = 400
EXTRACTOR_MAX_LENGTH = 2000
SUMMARISER_MAX_WORDS = 500
TRANSLATOR_MAX_WORDS = 400
TRANSLATOR_SRC       = 'hr'
TRANSLATOR_DEST      = 'en'
DEFAULT_MULTIPLIER   = 0.7
SUMMARY_TEXT         = 'summary_text'

# Silence all warnings for the rest of the session.
warnings.filterwarnings("ignore")


def coeficient(w, m):
    """Return ``w * m`` truncated to an int."""
    return int(w * m)


def inc(i):
    """Return ``i + 1``."""
    return i + 1


def merge(c, l):
    """Join the items of ``l`` using ``str(c)`` as the separator."""
    separator = f"{c}"
    return separator.join(l)


def length(l):
    """Return True when ``l`` has at least one element."""
    return len(l) > 0


def zero(l):
    """Return True when ``l`` is empty."""
    return len(l) == 0


def bigger(a, b):
    """Return ``a > b``; when ``a`` is a list its length is compared instead."""
    if isinstance(a, list):
        return len(a) > b
    return a > b


def smaller(a, b):
    """Return ``a < b``; when ``a`` is a list its length is compared instead."""
    if isinstance(a, list):
        return len(a) < b
    return a < b


def cols(df, x):
    """Return, in order, the column names of ``df`` that start with ``x``."""
    return [name for name in df.columns if name.startswith(x)]

# Rows are (lower_wc, upper_wc, max_ratio, min_ratio). For a text whose word
# count wc satisfies lower_wc <= wc < upper_wc, the summary length bounds are
# int(wc * max_ratio) and int(wc * min_ratio).
min_max_range = [
    (  0,    20, 0.85, 0.31),
    ( 20,    50, 0.75, 0.32),
    ( 50,   100, 0.74, 0.33),
    (100,   200, 0.73, 0.34),
    (200,   300, 0.72, 0.35),
    (300,   400, 0.71, 0.36),
    (400,   500, 0.70, 0.37)
]


def min_max(x):
    """Return ``(max_length, min_length)`` summary bounds for ``x`` words.

    The first row of ``min_max_range`` whose half-open interval
    ``[lower, upper)`` contains ``x`` supplies the two ratios. Values outside
    every interval fall back to the ratios 0.75 / 0.35. Both bounds are
    truncated to int.
    """
    for lower, upper, max_ratio, min_ratio in min_max_range:
        # Half-open so that boundary counts (0, 20, 50, 100, ...) match a row;
        # with both ends exclusive they previously matched nothing.
        if lower <= x < upper:
            return int(x * max_ratio), int(x * min_ratio)
    return int(x * 0.75), int(x * 0.35)

class Stats:
    """Accumulate paragraph, word and character counts per page of one file.

    ``pages`` maps a page key to a dict with the keys ``'Page'``,
    ``'Paragraphs'``, ``'Words'`` and ``'Characters'``.
    """

    def __init__(self, filename):
        self.pages = {}
        self.filename = filename

    def __str__(self):
        # Each key is one distinct page regardless of insertion order; the old
        # "if pg != p: p += 1" walk undercounted out-of-order keys.
        p = len(self.pages)
        r = sum(int(page['Paragraphs']) for page in self.pages.values())
        w = sum(int(page['Words']) for page in self.pages.values())
        c = sum(int(page['Characters']) for page in self.pages.values())
        return f"Stats: {p} pages, {r} paragraphs and {w} words, also with {c} characters. [{self.filename}]"

    def add(self, pg, text):
        """Count ``text`` as one more paragraph of page ``pg``.

        Args:
            pg: page key.
            text: paragraph text; words are whitespace-separated tokens.
        """
        entry = self.pages.setdefault(
            pg, {'Page': pg, 'Paragraphs': 0, 'Words': 0, 'Characters': 0})
        entry['Paragraphs'] += 1
        entry['Words'] += len(text.split())
        entry['Characters'] += len(text)

class ExtractPaper(WattleExtract):
    """Extract the text of every ``ext`` file in ``path`` into CSV chunks.

    Each ``.pdf`` file is read with PyMuPDF (``fitz``). The text of every block
    is split into sentences with spaCy (``en_core_web_sm``) and packed into
    chunks of fewer than ``max_words`` words and ``max_length`` characters.
    Each PDF's chunks are written to a CSV file next to it (same stem,
    ``.csv`` suffix) with the columns ``text``, ``pg`` (1-based page),
    ``block``, ``wc`` and ``chars``. All work runs from the constructor.

    Params keys:
        path (str): directory to scan.
        ext (str): file-name suffix to select, e.g. ``".pdf"``.
        pages (optional): 0-based page indices to read; default is all pages.
        max_words / max_length (optional): chunk limits, defaulting to
            ``EXTRACTOR_MAX_WORDS`` / ``EXTRACTOR_MAX_LENGTH``.
    """

    # Column order of the CSV written by process_file.
    _COLUMNS = ['text', 'pg', 'block', 'wc', 'chars']

    def __init__(self, log, params):
        super().__init__(log, params)
        assert isinstance(params['path'], str)
        assert isinstance(params['ext'], str)

        self.path = params['path']
        self.ext = params['ext']
        self.progress = None
        self.filelist = deque()
        self.statlist = deque()
        self.buffer = deque()
        self.counter = 0
        self.pages = params['pages'] if 'pages' in params else None
        self.max_words = params['max_words'] if 'max_words' in params else EXTRACTOR_MAX_WORDS
        self.max_length = params['max_length'] if 'max_length' in params else EXTRACTOR_MAX_LENGTH
        self.nlp = spacy.load("en_core_web_sm")
        self.lines = deque()
        self.extract()

    def _flush_chunk(self, pg):
        """Move the sentences collected in ``self.lines`` into one buffer row."""
        text = ' '.join(self.lines)
        self.lines.clear()
        self.buffer.append({'text': text, 'pg': pg, 'block': self.counter,
                            'wc': len(text.split()), 'chars': len(text)})

    def make_senteces(self, block, pg):
        """Split one PyMuPDF text block into size-limited sentence chunks.

        Appends one row per chunk to ``self.buffer`` and records the block's
        text in ``self.stats``.

        Args:
            block: one entry of ``page.get_text("dict")['blocks']``. Blocks
                without ``'lines'`` (e.g. image blocks) are ignored.
            pg: 0-based page index; rows store it 1-based.
        """
        if 'lines' not in block:
            return
        self.lines.clear()
        self.counter += 1
        for line in block['lines']:
            # A line holds one span per font/style run; keep all of them,
            # not just the first.
            txt = ''.join(span['text'] for span in line.get('spans', ())).strip()
            # NOTE: drops every non-ASCII character (ligatures, diacritics, ...).
            txt = re.sub(r'[^\x00-\x7F]+', '', txt)
            self.lines.append(txt)

        pg = inc(pg)
        text = ' '.join(self.lines).strip()
        self.lines.clear()
        if not text:
            return
        self.stats.add(pg, text)

        words, length = 0, 0
        for sent in self.nlp(text).sents:
            s = f"{sent}"
            s_words, s_len = len(s.split()), len(s)
            # Close the current chunk before it would reach either limit. The
            # counters restart from the sentence that opens the next chunk. A
            # single oversized sentence still becomes its own (non-empty) chunk.
            if self.lines and (words + s_words >= self.max_words
                               or length + s_len >= self.max_length):
                self._flush_chunk(pg)
                words, length = 0, 0
            self.lines.append(s)
            words += s_words
            length += s_len

        # Emit the remainder itself (previously a stale chunk was re-emitted).
        if self.lines:
            self._flush_chunk(pg)

    def process_file(self, filename):
        """Extract chunks from one PDF and write them to a sibling CSV file.

        Args:
            filename: path of the file. Files not ending in ``.pdf`` are
                skipped; previously they were overwritten with an empty CSV.

        Raises:
            Exception: wrapping any error, with the traceback in its message.
        """
        try:
            if not filename.endswith('.pdf'):
                return
            # Each CSV must contain only this file's chunks.
            self.buffer.clear()
            with fitz.open(filename) as pdf:
                page_range = range(pdf.page_count) if self.pages is None else list(self.pages)
                self.progress = tqdm(total=len(page_range), desc=f"Extracting: [{filename}]")
                with self.progress:
                    for page_num in page_range:
                        page_dict = pdf[page_num].get_text("dict")
                        for block in page_dict['blocks']:
                            self.make_senteces(block, page_num)
                        self.progress.update(1)
            csv_name = os.path.splitext(filename)[0] + ".csv"
            df = pd.DataFrame(list(self.buffer), columns=self._COLUMNS)
            df.to_csv(csv_name, index=False)
            del df
        except Exception as e:
            msg = f"{self.__class__}._process_file: {e}\n"
            msg += traceback.format_exc()
            raise Exception(msg) from e
        finally:
            gc.collect()

    def extract(self):
        """Process every file in ``self.path`` whose name ends with ``self.ext``.

        Raises:
            FileNotFoundError: ``self.path`` does not exist.
            Exception: no matching file was found, or processing failed.
        """
        super().extract()
        if not WattleUtils.path_exists(self.path):
            msg = MSG_NOT_FOUND.format("File", self.path)
            self.log.error(msg)
            raise FileNotFoundError(msg)

        for name in sorted(os.listdir(self.path)):
            if not name.endswith(self.ext):
                continue
            filename = os.path.join(self.path, name)
            self.filelist.append(filename)
            self.stats = Stats(name)
            # Pass the full path: a bare name would be resolved against the CWD.
            self.process_file(filename)
            self.statlist.append(self.stats)
            self.counter += 1
            self.log.info(self.stats)

        if zero(self.filelist):
            msg=f"WARNING: Files {self.ext} not found."
            self.log.warning(msg)
            raise Exception(msg)

class SummarisePaper(WattleTransform):
    """Summarise a text column of every ``ext`` CSV file in ``path``.

    For each CSV file, the ``from_column`` cells with at least
    ``min_threshold`` words are summarised by a Hugging Face ``summarization``
    pipeline built from ``model``; shorter (or empty) cells get ``''``. The
    results are inserted as the first column ``to_column`` and the file is
    overwritten in place. All work runs from the constructor.

    Params keys: ``path``, ``ext``, ``model``, ``from_column``, ``to_column``
    (str); ``num_threads``, ``min_threshold`` (int); ``drop`` (optional: when
    True an existing ``to_column`` is dropped before inserting).
    """

    def __init__(self, log, params):
        super().__init__(log, params)
        assert isinstance(params['path'], str)
        assert isinstance(params['ext'], str)
        assert isinstance(params['model'], str)
        assert isinstance(params['from_column'], str)
        assert isinstance(params['to_column'], str)
        assert isinstance(params['num_threads'], int)
        assert isinstance(params['min_threshold'], int)
        self.path = params['path']
        self.ext = params['ext']
        self.filelist = deque()
        self.statlist = deque()
        self.buffer   = deque()
        self.counter  = 0
        self.model         = params['model']
        self.from_column   = params['from_column']
        self.to_column     = params['to_column']
        # Read by _column_check; it was never set, so an existing to_column
        # raised AttributeError.
        self.drop          = params['drop'] if 'drop' in params else None
        self.num_threads   = int(params['num_threads'])
        self.min_threshold = int(params['min_threshold'])
        self.nlp           = spacy.load("en_core_web_sm")
        self.summariser    = pipeline("summarization", model=self.model)
        self.progress      = None
        self.transform()

    def transform(self):
        """Summarise every file in ``self.path`` whose name ends with ``self.ext``.

        Raises:
            FileNotFoundError: ``self.path`` does not exist.
            Exception: no matching file was found, or processing failed.
        """
        super().transform()
        if not WattleUtils.path_exists(self.path):
            msg = MSG_NOT_FOUND.format("File", self.path)
            self.log.error(msg)
            raise FileNotFoundError(msg)

        for name in sorted(os.listdir(self.path)):
            if not name.endswith(self.ext):
                continue
            filename = os.path.join(self.path, name)
            self.filelist.append(filename)
            self.stats = Stats(name)
            # Pass the full path: a bare name would be resolved against the CWD.
            self.process_file(filename)
            self.statlist.append(self.stats)
            self.counter += 1
            self.log.info(self.stats)

        if zero(self.filelist):
            msg=f"WARNING: Files {self.ext} not found."
            self.log.warning(msg)
            raise Exception(msg)

    def _column_check(self, df):
        """Ensure ``from_column`` exists; drop ``to_column`` if ``drop`` is True."""
        if self.from_column not in df.columns:
            raise Exception( f"Column {self.from_column} doesn't exist in csv file." )

        if self.to_column in df.columns:
            if self.drop == True: df.drop(self.to_column, axis=1, inplace=True)

    def _apply(self, text, key=None):
        """Summarise one cell, or return '' when it is below ``min_threshold`` words.

        Args:
            text: cell value; NaN (an empty CSV cell) is treated as ''.
            key: Stats key for this cell. When omitted, the shared ``counter``
                is incremented and used, which is not safe across threads.
        """
        if key is None:
            self.counter += 1
            key = self.counter
        text = '' if pd.isna(text) else f"{text}"
        wc = len(text.split())
        if wc < self.min_threshold:
            self.stats.add(key, text)
            self.progress.update(1)
            return ''

        max_len, min_len = min_max(wc)
        summary = self.summariser(text, max_length=max_len, min_length=min_len, do_sample=False)[0][SUMMARY_TEXT]
        self.stats.add(key, summary)
        self.progress.update(1)
        return summary

    def process_file(self, filename):
        """Summarise ``from_column`` of one CSV file and rewrite it in place.

        Raises:
            Exception: wrapping any error, with the traceback in its message.
        """
        try:
            df = pd.read_csv(filename)
            self._column_check(df)
            texts = df[self.from_column].tolist()
            self.progress = tqdm(total=len(df), desc=f"Summarisng: {filename}")
            with self.progress:
                with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
                    # One distinct 1-based Stats key per row, rather than the
                    # shared counter that worker threads would race on.
                    column = list(executor.map(self._apply, texts, range(1, len(texts) + 1)))
            df.insert(0, self.to_column, column)
            df.to_csv(filename, index=False)
            del df
        except Exception as e:
            msg = f"{self.__class__}._process_file: {e}\n"
            msg += traceback.format_exc()
            raise Exception(msg) from e
        finally:
            gc.collect()

class TranslatePaper(WattleTransform):
    """Translate a text column of every ``ext`` CSV file in ``path``.

    For each CSV file, the ``from_column`` cells with at least
    ``min_threshold`` words are translated from ``src`` to ``dest`` with
    googletrans; shorter (or empty) cells get ``''``. The results are
    inserted as the first column ``to_column`` and the file is overwritten in
    place. All work runs from the constructor.

    Params keys: ``path``, ``ext``, ``from_column``, ``to_column``, ``src``,
    ``dest`` (str); ``min_threshold``, ``num_threads`` (int); ``drop``
    (optional: when True an existing ``to_column`` is dropped first).
    """

    def __init__(self, log, params):
        super().__init__(log, params)
        assert isinstance(params['path'], str)
        assert isinstance(params['ext'], str)
        assert isinstance(params['from_column'], str)
        assert isinstance(params['to_column'], str)
        assert isinstance(params['src'], str)
        assert isinstance(params['dest'], str)
        assert isinstance(params['min_threshold'], int)
        assert isinstance(params['num_threads'], int)
        self.path = params['path']
        self.ext = params['ext']
        self.from_column = params['from_column']
        self.to_column = params['to_column']
        self.src = params['src']
        self.dest = params['dest']
        self.drop = params['drop'] if 'drop' in params else None
        self.min_threshold = params['min_threshold']
        self.num_threads = params['num_threads']
        self.progress = None
        self.filelist = deque()
        self.statlist = deque()
        self.buffer = deque()
        self.counter = 0
        self.transform()

    def _apply(self, text, key=None):
        """Translate one cell, or return '' when it is below ``min_threshold`` words.

        Args:
            text: cell value; NaN (an empty CSV cell) is treated as ''.
            key: Stats key for this cell. When omitted, the shared ``counter``
                is incremented and used, which is not safe across threads.
        """
        if key is None:
            self.counter += 1
            key = self.counter
        text = '' if pd.isna(text) else f"{text}"
        wc = len(text.split())
        if wc < self.min_threshold:
            self.stats.add(key, '')
            self.progress.update(1)
            return ''
        # A fresh Translator per call, so worker threads do not share one.
        translator = Translator()
        translated = translator.translate(text, src=self.src, dest=self.dest).text
        self.stats.add(key, translated)
        self.progress.update(1)
        return translated

    def _column_check(self, df):
        """Ensure ``from_column`` exists; drop ``to_column`` if ``drop`` is True."""
        if self.from_column not in df.columns:
            raise Exception( f"Column {self.from_column} doesn't exist in csv file." )

        if self.to_column in df.columns:
            if self.drop == True: df.drop(self.to_column, axis=1, inplace=True)

    def process_file(self, filename):
        """Translate ``from_column`` of one CSV file and rewrite it in place.

        Raises:
            Exception: wrapping any error (also logged via ``self.log.error``).
        """
        try:
            df = pd.read_csv(filename)
            self._column_check(df)
            texts = df[self.from_column].tolist()
            self.progress = tqdm(total=len(df), desc=f"Translating: {filename}")
            with self.progress:
                with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
                    # One distinct 1-based Stats key per row, rather than the
                    # shared counter that worker threads would race on.
                    column = list(executor.map(self._apply, texts, range(1, len(texts) + 1)))
            df.insert(0, self.to_column, column)
            df.to_csv(filename, index=False)
            del df
        except Exception as e:
            msg = f"{self.__class__}._process_file: {e}\n"
            self.log.error(msg)
            # Chain the cause so the original traceback is not lost.
            raise Exception(msg) from e
        finally:
            gc.collect()

    def transform(self):
        """Translate every file in ``self.path`` whose name ends with ``self.ext``.

        Raises:
            FileNotFoundError: ``self.path`` does not exist.
            Exception: no matching file was found, or processing failed.
        """
        super().transform()
        if not WattleUtils.path_exists(self.path):
            msg = MSG_NOT_FOUND.format("File", self.path)
            self.log.error(msg)
            raise FileNotFoundError(msg)

        for name in sorted(os.listdir(self.path)):
            if not name.endswith(self.ext):
                continue
            filename = os.path.join(self.path, name)
            self.filelist.append(filename)
            self.stats = Stats(name)
            # Pass the full path: a bare name would be resolved against the CWD.
            self.process_file(filename)
            self.statlist.append(self.stats)
            self.counter += 1
            self.log.info(self.stats)

        if zero(self.filelist):
            msg=f"WARNING: Files {self.ext} not found."
            self.log.warning(msg)
            raise Exception(msg)
params_str = """
logger:
    name: etl.log
    level: ERROR
extract:
    path: ../data
    ext: .pdf
    max_words: 400
    max_length: 2000
translate1:
    path: ../data
    ext: .csv
    from_column: text
    to_column: text-hr
    src: en
    dest: hr
    min_threshold: 2
    num_threads: 100
summarise:
    path: ../data
    ext: ".csv"
    model1: "sshleifer/distilbart-cnn-12-6"
    model2: "google/pegasus-xsum"
    model: "facebook/bart-large-cnn"
    from_column: text
    to_column: summary-en
    min_threshold: 50
    num_threads: 50
    drop: True
translate2:
    path: ../data
    ext: .csv
    from_column: summary-en
    to_column: summary-hr
    src: en
    dest: hr
    min_threshold: 50
    num_threads: 100
    drop: True
"""
import yaml
params = yaml.safe_load(params_str)
log = WattleLogger(params["logger"])
tt = ExtractPaper(log, params["extract"]); del tt
# tt = TranslatePaper(log, params['translate1']); del tt
# tt = SummarisePaper(log, params['summarise']);  del tt
# tt = TranslatePaper(log, params['translate2']); del tt

Exception: <class '__main__.ExtractPaper'>._process_file: name 'fitz' is not defined
Traceback (most recent call last):
  File "<ipython-input-7-ef0233b6dd1d>", line 116, in process_file
    pdf = fitz.open(filename);
NameError: name 'fitz' is not defined


In [25]:
!python3 -m pip uninstall PyMuPdf
!python3 -m pip install --no-cache-dir PyMuPdf

Collecting PyMuPdf
  Downloading PyMuPDF-1.19.6.tar.gz (2.3 MB)
     |################################| 2.3 MB 1.0 MB/s            
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: PyMuPdf
  Building wheel for PyMuPdf (setup.py) ... [?25lerror
[31m  ERROR: Command errored out with exit status 1:
   command: /usr/bin/python3 -u -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-6zc1_42u/pymupdf_6e42cad86ea74e39b4c821d2f205de53/setup.py'"'"'; __file__='"'"'/tmp/pip-install-6zc1_42u/pymupdf_6e42cad86ea74e39b4c821d2f205de53/setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' bdist_wheel -d /tmp/pip-wheel-lnru1j5h
       cwd: /tmp/pip-install-6zc1_42u/pymupdf_6e42cad86ea74e39b4c821d2f205de53/