In [None]:
!pip install transformers

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from transformers import BertModel, BertTokenizer
import torch
from torch import nn
from textwrap import wrap

In [None]:
# Mount Google Drive: the document and model paths used below live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Initial parameters
# PDF document to classify (path on the mounted Google Drive)
DOCUMENT_PATH = '/content/drive/My Drive/Naveler/03.TECH/data_discovery/documents/BOE-A-2022-4972.pdf'
ind = 1 # set to 0 to classify by categories, set to 1 to classify by subcategories
# Classification level; it selects both the label set and the checkpoint file name
CLASSIFY_BY = ['category', 'subcategory'][ind]
# Token length every paragraph is truncated/padded to before it is fed to the model
MAX_LEN = 250

# Use the first CUDA GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [None]:
# Label set for the selected classification level. The list position of each
# label is the class index it is looked up by when predictions are decoded.
if CLASSIFY_BY == 'category':
  possible_labels = [
      'Impacto energético',
      'Innovación',
      'Sector eléctrico',
      'Alimentación y bebidas',
  ]
elif CLASSIFY_BY == 'subcategory':
  possible_labels = [
      'Calidad aire',
      'Cambio climático',
      'Gestión del agua',
      'Eficiencia energética',
      'GNL maritimo',
      'Movilidad sostenible',
      'Smart Grids',
      'Consumo eléctrico',
      'Redes eléctricas',
      'Tarifas  / mediciones eléctricas',
      'Vulnerabilidad energética',
      'Bebidas energéticas',
  ]
NCLASSES = len(possible_labels)

# Running per-label probability totals, filled in paragraph by paragraph
output_dict = dict.fromkeys(possible_labels, 0)

In [None]:
# Define tokenizer
# Hugging Face model id; the same id is used by BERTTextClassifier to load its encoder
PRE_TRAINED_MODEL_NAME = 'dccuchile/bert-base-spanish-wwm-cased'
tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)

In [None]:
# Define class for building model
class BERTTextClassifier(nn.Module):
  """Pre-trained BERT encoder followed by dropout and a linear classification head.

  The attribute names (bert, drop, linear) determine the state_dict keys, so
  they have to stay in sync with any checkpoint loaded into this module.
  """

  def __init__(self, n_classes):
    """Build the encoder and the classification head.

    :param int n_classes: number of output classes (size of the returned score vector)
    """
    super().__init__()
    # return_dict=False: the encoder returns a plain tuple that forward() unpacks
    self.bert = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME, return_dict=False)
    self.drop = nn.Dropout(p=0.3)
    hidden_size = self.bert.config.hidden_size
    self.linear = nn.Linear(hidden_size, n_classes)

  def forward(self, input_ids, attention_mask):
    """Return the unnormalised class scores for the given encoded inputs.

    :param input_ids: token ids, passed to the encoder unchanged
    :param attention_mask: attention mask, passed to the encoder unchanged
    :return: output of the linear head (one score per class)
    """
    encoder_outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
    # The encoder yields exactly two values; only the second one feeds the head
    # (per the Hugging Face docs this is the pooled output -- TODO confirm).
    _sequence_output, pooled_output = encoder_outputs
    return self.linear(self.drop(pooled_output))

In [None]:
# Model path
# The checkpoint file name depends on the classification level chosen in CLASSIFY_BY
if CLASSIFY_BY == 'category':
  s = 'cat'
elif CLASSIFY_BY == 'subcategory':
  s = 'subcat'
MODEL_NAME = 'BERT_text_classifier_'+s
# Location of the saved model weights on the mounted Google Drive
MODEL_PATH = '/content/drive/My Drive/Naveler/03.TECH/data_discovery/models/'+MODEL_NAME

In [None]:
# Load the fine-tuned classifier weights
loaded_model = BERTTextClassifier(NCLASSES)
loaded_model = loaded_model.to(device)
# This notebook only runs inference: eval mode switches off the Dropout layers so
# the same paragraph always yields the same probabilities.
loaded_model.eval()
# map_location lets a checkpoint that was saved on a GPU load on a CPU-only runtime
# (the config cell falls back to the CPU when CUDA is unavailable).
# Kept as the last expression so the cell still shows the key-matching result.
loaded_model.load_state_dict(torch.load(MODEL_PATH, map_location=device))

Some weights of the model checkpoint at dccuchile/bert-base-spanish-wwm-cased were not used when initializing BertModel: ['cls.predictions.transform.LayerNorm.weight', 'cls.predictions.decoder.bias', 'cls.predictions.bias', 'cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertModel were not initialized from the model checkpoint at dccuchile/bert-base-spanish-wwm-cased and are newly initialized: ['bert.pooler.dense.we

<All keys matched successfully>

In [None]:
# Classify new data
def classifyParagraph(paragraph_text):
  """Classify one paragraph and add its class probabilities to output_dict.

  The paragraph is tokenized (truncated/padded to MAX_LEN tokens), run through
  loaded_model, and the softmax probability of every class is added to the
  running total kept in output_dict under the matching label.

  :param str paragraph_text: text of the paragraph to classify
  :return: None; the result is accumulated in the global output_dict
  """
  encoding_paragraph = tokenizer.encode_plus(
      paragraph_text,
      max_length = MAX_LEN,
      truncation = True,
      add_special_tokens = True,
      return_token_type_ids = False,
      padding='max_length',
      return_attention_mask = True,
      return_tensors = 'pt'
      )

  input_ids = encoding_paragraph['input_ids'].to(device)
  attention_mask = encoding_paragraph['attention_mask'].to(device)

  # Inference only: eval mode disables dropout so the prediction is deterministic.
  loaded_model.eval()
  # The forward pass itself must run under no_grad, otherwise an autograd graph
  # is built (and its memory held) for every paragraph.
  with torch.no_grad():
    logits = loaded_model(input_ids, attention_mask)
    # Batch of one paragraph: keep the probability vector of that single row
    probabilities = nn.functional.softmax(logits, dim=1)[0]

  # possible_labels is ordered by class index, so labels and probabilities pair up
  # positionally; tolist() brings all values to the host in a single transfer.
  for label, probability in zip(possible_labels, probabilities.tolist()):
    output_dict[label] += probability

In [None]:
# generic.py
import logging


class FileHandler:
    """Base class for file handlers: holds the loaded content and a shared logger."""

    def __init__(self):
        # All handlers log under the same logger name
        self.logger = logging.getLogger('naveler.file_handlers')
        # Nothing is loaded yet
        self.content = None

In [None]:
!pip install textract

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# pdf_handler.py
import re

import textract

#from .generic import FileHandler
from typing import List


# Matches a line that ends with a page marker such as "Pág. 12" or "pag 3":
# "pag"/"pág" (either case for the p), an optional dot, whitespace, then digits.
page_pattern = re.compile(r'\b[pP][aá]g\.?\s+\d+\b\s*$')


class ParagraphContent:
    """Lines that make up one paragraph of a PDF, plus the page number assigned to it."""

    def __init__(self, line=None, page_number=1):
        """Start a paragraph, optionally seeded with a first line.

        :param line: first line of the paragraph; falsy values (None, '') are ignored
        :param int page_number: page number to store on the paragraph
        """
        self.page_number = page_number
        self.content = []
        if line:
            self.add_line(line)

    def add_line(self, line):
        """Append one line to the paragraph."""
        self.content.append(line)

    def has_content(self):
        """Return True when at least one line has been added."""
        return bool(self.content)

    def get_content(self):
        """Return the paragraph text: its lines joined by single spaces."""
        separator = ' '
        return separator.join(self.content)


class PDFHandler(FileHandler):
    """Load a PDF file, extract its text and group the lines into paragraphs."""

    def __init__(self):
        super().__init__()
        # Full extracted text of the last file passed to get_page (None until then)
        self.content: str = None
        # Lazily built cache of content.splitlines(), used by get_line
        self.split_lines_content: List[str] = None

    def get_page(self, file_path: str) -> None:
        """Parse pdf file to text

        :param str file_path: pdf file path
        """
        # Drop the line cache so get_line never serves lines of a previous file
        self.split_lines_content = None
        self.content = textract.process(file_path).decode('utf-8')

    def get_valid_lines(self) -> list:
        """Attempt to group lines into paragraphs.

        A paragraph is a run of consecutive kept lines that ends with the first
        line whose last non-whitespace character is a dot ('.'). Lines with fewer
        than 15 non-space characters and whitespace-only lines are dropped.

        :return: ParagraphContent objects in document order
        :rtype: list
        """

        paragraphs = []
        # NOTE(review): the first paragraph gets the default page_number (1) while
        # later ones get the running counter below, which starts at 0 -- confirm
        # which numbering is intended before relying on page_number.
        paragraph = ParagraphContent()

        page_number = 0

        for line in self.content.splitlines():
            # Page markers are counted before filtering, because marker lines may
            # themselves be dropped by the filters below.
            if page_pattern.search(line):
                page_number += 1

            # Skip useless lines such as numbers, pages...
            if len(line.replace(" ", "")) < 15:
                continue

            # Skip lines made only of whitespace (e.g. long runs of tabs)
            if not line.strip():
                continue

            paragraph.add_line(line)

            # Trailing whitespace is ignored when looking for the closing dot, so
            # a line such as "... fin. " still terminates its paragraph.
            if line.rstrip().endswith('.'):
                paragraphs.append(paragraph)
                # Define empty paragraph starting from now
                paragraph = ParagraphContent(
                    page_number=page_number
                )

        # Include the final paragraph
        if paragraph.has_content():
            paragraphs.append(paragraph)

        return paragraphs

    def get_content(self) -> list:
        """Return all the elements in the document.

        :return: the paragraphs produced by get_valid_lines
        :rtype: list
        """

        return self.get_valid_lines()

    def get_content_from_regex(self, regex_expression: str, group: int = 1) -> str:
        """Get content from regex expression

        :param str regex_expression: regular expression to extract the content
        :param int group: (optional) index of the capture group to return from
            the first match (0 is the whole match)
        :return: content extracted from regex
        :rtype: str
        :raises AttributeError: if the expression does not match the content
            (search() returns None, which has no group())
        """
        expression = re.compile(regex_expression)
        value = expression.search(self.content).group(group)

        return value

    def get_line(self, line_number: int) -> str:
        """Get exact line from the content

        :param int line_number: index of the line to extract, as used to index
            the list returned by content.splitlines()
        :return: content of that line
        :rtype: str
        """

        # Split once and reuse; get_page resets the cache when a new file is loaded
        if self.split_lines_content is None:
            self.split_lines_content = self.content.splitlines()
        return self.split_lines_content[line_number]

In [None]:
def pdf_to_list_of_paragraphs(file_name):
    """Extract the text of a PDF and return it as a list of paragraph strings.

    :param file_name: path of the PDF file, handed to PDFHandler.get_page
    :return: one string per paragraph, in the order the handler returns them
    :rtype: list
    """
    handler = PDFHandler()
    handler.get_page(file_name)
    return [item.get_content() for item in handler.get_content()]

In [None]:
# Convert pdf to list of paragraphs
text = pdf_to_list_of_paragraphs(DOCUMENT_PATH)
number_of_paragraphs = len(text)

In [None]:
# Classification and output
# Reset the running totals first: output_dict is a global accumulator, so without
# this a second run of the cell would add to the totals of the previous run.
for label in possible_labels:
  output_dict[label] = 0

for paragraph in text:
  classifyParagraph(paragraph)

if number_of_paragraphs == 0:
  # Nothing was extracted from the document; avoid dividing by zero below.
  print("No paragraphs were extracted from the document; nothing to classify.")
else:
  # Mean probability of each label over all paragraphs, shown as a percentage
  for label in possible_labels:
    print("{}:\t{}%".format(label, round(100*output_dict[label]/number_of_paragraphs, 2)))

Calidad aire:	0.33%
Cambio climático:	0.04%
Gestión del agua:	0.31%
Eficiencia energética:	0.41%
GNL maritimo:	0.03%
Movilidad sostenible:	0.02%
Smart Grids:	0.11%
Consumo eléctrico:	0.11%
Redes eléctricas:	97.99%
Tarifas  / mediciones eléctricas:	0.15%
Vulnerabilidad energética:	0.49%
Bebidas energéticas:	0.01%
