<a href="https://colab.research.google.com/github/sejas/Ticket-Entity-Recognizer---AI-Master-s-thesis/blob/master/MUIA_TFM_RECEIPT_Ticket_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Receipts Recognizer
- Author: Antonio Sejas
- UPM - MUIA - TFM
- License: MIT

In [None]:
from enum import Enum
class ShopType(Enum):
  retailer = 'retailer'
  restaurant = 'restaurant'
  supermarket = 'supermarket'

## OCRs

### Keras OCR
pip install keras-ocr

In [None]:
cd /content/drive/My\ Drive/proyectos/ai/receipts-recognizer

/content/drive/My Drive/proyectos/ai/receipts-recognizer


In [None]:
!pip install keras-ocr

Collecting keras-ocr
[?25l  Downloading https://files.pythonhosted.org/packages/13/72/c93a239e6cb6c6d604054c6d418f368338c2e23f83882600cbd5a888cc91/keras-ocr-0.8.4.tar.gz (166kB)
[K     |████████████████████████████████| 174kB 9.1MB/s 
[?25hCollecting essential_generators
[?25l  Downloading https://files.pythonhosted.org/packages/59/b1/979b823497488e5f13c9070fcd6a2e24f6d9c6fd5398e0fbeccc8158bd3b/essential_generators-0.9.2-py3-none-any.whl (9.5MB)
[K     |████████████████████████████████| 9.5MB 14.5MB/s 
Collecting validators
  Downloading https://files.pythonhosted.org/packages/41/4a/3360ff3cf2b4a1b9721ac1fbff5f84663f41047d9874b3aa1ac82e862c44/validators-0.18.1-py3-none-any.whl
Collecting fonttools
[?25l  Downloading https://files.pythonhosted.org/packages/58/89/932dc9478aae3c74f80f1cb390064fd4620d791e7f98b705160551e0ea39/fonttools-4.14.0-py3-none-any.whl (811kB)
[K     |████████████████████████████████| 819kB 64.3MB/s 
Collecting pyclipper
[?25l  Downloading https://files.pytho

In [None]:
images_paths_list = [
      './dataset/retailer/retailer-benetton.jpg',
      './dataset/retailer/retailer-benetton-2.jpg',
  ]

In [None]:
import matplotlib.pyplot as plt
import keras_ocr

pipeline = keras_ocr.pipeline.Pipeline()
images = [
    keras_ocr.tools.read(path) for path in images_paths_list
]

# predictions list of (word, box) tuples.
prediction_groups = pipeline.recognize(images)
print(prediction_groups)

# Plot the predictions
fig, axs = plt.subplots(nrows=len(images), figsize=(40, 40))
for ax, image, predictions in zip(axs, images, prediction_groups):
    keras_ocr.tools.drawAnnotations(image=image, predictions=predictions, ax=ax)

Looking for /root/.keras-ocr/craft_mlt_25k.h5
Downloading /root/.keras-ocr/craft_mlt_25k.h5


AssertionError: ignored

### Pytesseract OCR
pip install pytesseract

In [None]:
!sudo apt install tesseract-ocr
!pip install pytesseract

In [None]:
import pytesseract
import cv2

def image_path_to_text(image_path:str) -> str:
  try:
    img = cv2.imread(images_paths_list[1])
    return pytesseract.image_to_string(img)
  except Exception:
    print('failed: '+image_path)
    return ''

receipt_texts_list = []
for image_path in images_paths_list:
  print('image_path',image_path)
  receipt_texts_list.append(image_path_to_text(image_path))

print(receipt_texts_list[0])

## Spacy POS and NER
The performance is very low.
We cannot extract any result from this response.
Spacy doesn't perform well in this context of spanish and receipts.

In [None]:
!pip install spacy
!python -m spacy download es_core_news_sm

In [None]:
import spacy
import es_core_news_sm
nlp = es_core_news_sm.load()
doc  = nlp(receipt_texts_list[0])
for token in doc:
    print(token.text, token.lemma_, token.pos_, token.tag_, token.dep_,
            token.shape_, token.is_alpha, token.is_stop)

In [None]:
for ent in doc.ents:
  print(ent.label,ent.label_, ent.text)

## My own specific NER Tagger

### Generator of tickets and tags

In [None]:
from typing import List

In [None]:
class Attribute:
  def __init__(self, code='', value='', accuracy=0.0):
    self.code = code
    self.value = value
    self.accuracy = accuracy

class Receipt:
  trade_name = Attribute(code='TRADE_NAME')
  name = Attribute(code='NAME')
  address = Attribute(code='ADDR')
  phone = Attribute(code='PHONE')
  receipt_number = Attribute(code='RNUMBER')
  date = Attribute(code='DATE')
  total = Attribute(code='TOTAL')
  tax = Attribute(code='TAX')
  rate = Attribute(code='RATE')
  fiscal_id = Attribute(code='FISCAL_ID')
  
  @classmethod
  def _props(cls):   
    return [key for key in cls.__dict__.keys() if key[0] != '_']
  
  @classmethod
  def _prop_codes(cls):   
    return [getattr(cls, key).code for key in cls._props()]

# Products for future developments

In [None]:
def add_prefix_to_list(output_list:str)->str:
  biluo = ['B', 'I', 'L', 'U', 'O'] # Begin, In, Last, Unit, Outside
  new_output = []
  for word in output_list:
    for prefix in biluo:
      new_output.append(prefix+'-'+word)
  return new_output

OUTPUTS = add_prefix_to_list(Receipt._prop_codes())+['O', 'SEP']
OUTPUTS

['B-TRADE_NAME',
 'I-TRADE_NAME',
 'L-TRADE_NAME',
 'U-TRADE_NAME',
 'O-TRADE_NAME',
 'B-NAME',
 'I-NAME',
 'L-NAME',
 'U-NAME',
 'O-NAME',
 'B-ADDR',
 'I-ADDR',
 'L-ADDR',
 'U-ADDR',
 'O-ADDR',
 'B-PHONE',
 'I-PHONE',
 'L-PHONE',
 'U-PHONE',
 'O-PHONE',
 'B-RNUMBER',
 'I-RNUMBER',
 'L-RNUMBER',
 'U-RNUMBER',
 'O-RNUMBER',
 'B-DATE',
 'I-DATE',
 'L-DATE',
 'U-DATE',
 'O-DATE',
 'B-TOTAL',
 'I-TOTAL',
 'L-TOTAL',
 'U-TOTAL',
 'O-TOTAL',
 'B-TAX',
 'I-TAX',
 'L-TAX',
 'U-TAX',
 'O-TAX',
 'B-RATE',
 'I-RATE',
 'L-RATE',
 'U-RATE',
 'O-RATE',
 'B-FISCAL_ID',
 'I-FISCAL_ID',
 'L-FISCAL_ID',
 'U-FISCAL_ID',
 'O-FISCAL_ID',
 'O',
 'SEP']

In [None]:
from enum import Enum
class FakeType(Enum):
  fiscal_id = 'fiscal_id'
  trade_name = 'trade_name'
  name = 'name'
  address = 'address'
  phone = 'phone'
  subtotal = 'subtotal'
  total = 'total'
  ticket_number = 'ticket_number'
  tax_rate = 'tax_rate'
class Faker:

  of = ''
  def __init__(self, of:FakeType):
    self.of = of
    if self.of == FakeType.fiscal_id:
      return 'B38103792'
    elif self.of == FakeType.trade_name:
      return 'UNITED COLOR OF BENETTON'
    elif self.of == FakeType.name:
      return 'RAZZIA SL'
    elif self.of == FakeType.address:
      return 'C/. Mesa y Lopez 12'
    elif self.of == FakeType.phone:
      return '928 22 97 50'
    elif self.of == FakeType.subtotal:
      return ''
    elif self.of == FakeType.total:
      return ''
    elif self.of == FakeType.ticket_number:
      return '36 8390T'
    elif self.of == FakeType.tax_rate:
      return ''

class TicketGenerator:
  template = ''
  tags = ''

  def _sequence(self, text:str, process_sequence)->List[str]:
    result = text
    for fn in process_sequence:
      result = fn(result)
    return result
  def _replaces(self, text:str)->str:
    # generates extra empty lines that later are removed
    return text.replace('\n',' \n ').replace(':',' : ').replace('.',' . ')
  def _remove_stop_tokens(self, tokens_list=[]):
    stop_tokens = ['']
    return [token for token in tokens_list if token not in stop_tokens]
  def _split(self, text:str)->List[str]:
    return text.split(' ')
  def _lower(self, text:str)->str:
    return text.lower()
  def _tokenize(self):
    process_sequence = [
      self._lower,
      self._replaces,
      self._split,
      self._remove_stop_tokens
    ]
    return self._sequence(self.template.lower(), process_sequence)
  def _tags(self):
    """ To be implemented in the subclass """
    process_sequence = [
      self._replaces,
      self._split,
      self._remove_stop_tokens
    ]
    return self._sequence(self.tags, process_sequence)
  
  def generate(self, group_by_pairs=False):
    """ Returns a tuple of tokens, tags
    """
    tokens = self._tokenize()
    tags = self._tags()
    if len(tokens) == len(tags):
      return list(zip(tokens, tags)) if group_by_pairs else (list(tokens), list(tags))
    else:
      print('ERROR, tokens and tags must have the same len')
      return None

class SimpleBenettonGenerator(TicketGenerator):
  template = """<START> UNITED COLOR OF BENETTON
RAZZIA SL
CIF:B38103792

Comercio Minorista
C/. Mesa y Lopez 12
TLF: 928 22 97 50

CAJA: MERCHERIA2

FACT. SIMP.: 36 8390T

FECHA: 27/07/20

HORA: 13:65

OPERARIO: OPERARIO1 <END>""" % ()
  tags = """START B-NAME I-NAME I-NAME L-NAME
B-NAME L-NAME
O-FISCAL_ID SEP U-FISCAL_ID

O O
B-ADDR SEP I-ADDR I-ADDR I-ADDR L-ADDR
O-PHONE SEP B-PHONE I-PHONE I-PHONE L-PHONE

O SEP O

O-RNUMBER  SEP O-RNUMBER SEP SEP B-FICAL_ID L-FISCAL_ID

O-DATE SEP U-DATE

O SEP O SEP O

O SEP O END"""

class BenettonGenerator(TicketGenerator):
  template = """UNITED COLOR OF BENETTON
RAZZIA SL
CIF:B38103792

Comercio Minorista
B-ADDR. Mesa y Lopez 12
TLF: 928 22 97 50

CAJA: MERCHERIA2

FACT. SIMP.: 36 8390T

FECHA: 27/07/20

HORA: 13:65

OPERARIO: OPERARIO1

COD DESCRIPCION UND DTO PVP TOTAL
N74429 FELPA LETRAS NIÑA 4,99 4,99
ZI721IO FELPA+PANTY ALG NINO y 12,99 12,99

POLO M/C LISO BASICO
218110 ove nino 1 4,99 4,99

POLO M/C LISO BASICO
216110 our Wino 1 4,99 4,99

TOTAL LINEAS: 4
TOTAL : 32,96
ENTREGA: 32,96
CAMBIO: 0,00
FORMAS DECOBRO IMPORTE

TARJETAS 32,96

Gracias por su visita.

Los productos son de saldo. Durante los 20 dias
naturales siguientes a la compra (excepto ropa
interior, bao, fiesta y fantasia) podran ser
cambiados por otra prenda © por un vale, siempre
que no hayan sido usados, para lo que se precisa la
presentacion de la compra. Para realizar cambios y
emitir vales de promociones, presentar todas los
articulos. Se admiten devoluciones, estas solo
podran ser realizadas en la misma tienda de la
compra. Garantia comercial adicional segun Texto
Refundido de la Ley General para la Defensa de los
Consumidores y Usuarios. En caso de reclamacion,
puede dirigirse a sugerenclas@razzia es.

Puede consultar la informacion sobre Proteccion de
Datos en nuestra Pagina web:
http: //razzia.avisolegal, jnfo/,

We do not accept refund, To change execpt night
clothes, underwear, SW/MW8ar) gy Be is Pe raitial
the ticket until 19 days
"""


In [None]:
words, tags = SimpleBenettonGenerator().generate()
print(words)
print(tags)

['<start>', 'united', 'color', 'of', 'benetton', '\n', 'razzia', 'sl', '\n', 'cif', ':', 'b38103792', '\n', '\n', 'comercio', 'minorista', '\n', 'c/', '.', 'mesa', 'y', 'lopez', '12', '\n', 'tlf', ':', '928', '22', '97', '50', '\n', '\n', 'caja', ':', 'mercheria2', '\n', '\n', 'fact', '.', 'simp', '.', ':', '36', '8390t', '\n', '\n', 'fecha', ':', '27/07/20', '\n', '\n', 'hora', ':', '13', ':', '65', '\n', '\n', 'operario', ':', 'operario1', '<end>']
['START', 'B-NAME', 'I-NAME', 'I-NAME', 'L-NAME', '\n', 'B-NAME', 'L-NAME', '\n', 'O-FISCAL_ID', 'SEP', 'U-FISCAL_ID', '\n', '\n', 'O', 'O', '\n', 'B-ADDR', 'SEP', 'I-ADDR', 'I-ADDR', 'I-ADDR', 'L-ADDR', '\n', 'O-PHONE', 'SEP', 'B-PHONE', 'I-PHONE', 'I-PHONE', 'L-PHONE', '\n', '\n', 'O', 'SEP', 'O', '\n', '\n', 'O-RNUMBER', 'SEP', 'O-RNUMBER', 'SEP', 'SEP', 'B-FICAL_ID', 'L-FISCAL_ID', '\n', '\n', 'O-DATE', 'SEP', 'U-DATE', '\n', '\n', 'O', 'SEP', 'O', 'SEP', 'O', '\n', '\n', 'O', 'SEP', 'O', 'END']


### Create the vocabulary: Index words

In [None]:
# Wrods Indexing
word_to_ix = {word:ix for ix, word in enumerate(set(words))}
ix_to_word = {ix:word for ix, word in enumerate(set(words))}
# Tags indexing
tag_to_ix = {tag:ix for ix, tag in enumerate(set(tags))}
ix_to_tag = {ix:tag for ix, tag in enumerate(set(tags))}

In [None]:
print(tag_to_ix)
print(ix_to_tag)

{'O-PHONE': 0, 'U-DATE': 1, '\n': 2, 'O-FISCAL_ID': 3, 'I-NAME': 4, 'O-RNUMBER': 5, 'I-PHONE': 6, 'L-PHONE': 7, 'B-PHONE': 8, 'L-NAME': 9, 'SEP': 10, 'B-ADDR': 11, 'U-FISCAL_ID': 12, 'I-ADDR': 13, 'L-ADDR': 14, 'O': 15, 'L-FISCAL_ID': 16, 'O-DATE': 17, 'END': 18, 'B-NAME': 19, 'START': 20, 'B-FICAL_ID': 21}
{0: 'O-PHONE', 1: 'U-DATE', 2: '\n', 3: 'O-FISCAL_ID', 4: 'I-NAME', 5: 'O-RNUMBER', 6: 'I-PHONE', 7: 'L-PHONE', 8: 'B-PHONE', 9: 'L-NAME', 10: 'SEP', 11: 'B-ADDR', 12: 'U-FISCAL_ID', 13: 'I-ADDR', 14: 'L-ADDR', 15: 'O', 16: 'L-FISCAL_ID', 17: 'O-DATE', 18: 'END', 19: 'B-NAME', 20: 'START', 21: 'B-FICAL_ID'}


In [None]:
print(word_to_ix)
print(ix_to_word)

{':': 0, 'hora': 1, '65': 2, 'tlf': 3, '\n': 4, 'cif': 5, 'lopez': 6, 'mercheria2': 7, '36': 8, 'razzia': 9, 'color': 10, '928': 11, 'of': 12, 'fact': 13, '27/07/20': 14, 'operario1': 15, '8390t': 16, 'fecha': 17, 'caja': 18, 'operario': 19, 'sl': 20, '22': 21, '.': 22, '13': 23, '<end>': 24, 'comercio': 25, '97': 26, 'united': 27, 'c/': 28, 'y': 29, '<start>': 30, '50': 31, 'simp': 32, '12': 33, 'mesa': 34, 'benetton': 35, 'minorista': 36, 'b38103792': 37}
{0: ':', 1: 'hora', 2: '65', 3: 'tlf', 4: '\n', 5: 'cif', 6: 'lopez', 7: 'mercheria2', 8: '36', 9: 'razzia', 10: 'color', 11: '928', 12: 'of', 13: 'fact', 14: '27/07/20', 15: 'operario1', 16: '8390t', 17: 'fecha', 18: 'caja', 19: 'operario', 20: 'sl', 21: '22', 22: '.', 23: '13', 24: '<end>', 25: 'comercio', 26: '97', 27: 'united', 28: 'c/', 29: 'y', 30: '<start>', 31: '50', 32: 'simp', 33: '12', 34: 'mesa', 35: 'benetton', 36: 'minorista', 37: 'b38103792'}


### Words and Tags encoding

In [273]:
def hot_encoder_word(size, ix) -> List[int]:
  """ No need to one hot encoding. Because torch indexes the row/column """
  return [1 if ix == i else 0 for i in range(size)]

def _encode_seq(dictionary, seq) -> List[int]:
  hot_list = [dictionary[key] for key in seq]
  return torch.tensor(hot_list, dtype=torch.long)

def word_encoder(seq):
  return _encode_seq(dictionary=word_to_ix, seq=seq)

def tag_encoder(seq):
  return _encode_seq(dictionary=tag_to_ix, seq=seq)

print(word_encoder(['hora', 'united', 'benetton']))

tensor([ 1, 27, 35])


## Model

### LSTM

In [274]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# reproducible results
torch.manual_seed(7)

class Lstm(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h = self.lstm(x)[0]
        x = self.linear(h)
        return x
    
    def get_states_across_time(self, x):
        h_c = None
        h_list, c_list = list(), list()
        with torch.no_grad():
            for t in range(x.size(1)):
                h_c = self.lstm(x[:, [t], :], h_c)[1]
                h_list.append(h_c[0])
                c_list.append(h_c[1])
            h = torch.cat(h_list)
            c = torch.cat(c_list)
        return h, c

class LSTM_NER(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size):
        super().__init__()
        self.hidden_dim = hidden_dim

        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)

        # The LSTM takes word embeddings as inputs, and outputs hidden states
        # with dimensionality hidden_dim.
        self.lstm = nn.LSTM(embedding_dim, hidden_dim)

        # The linear layer that maps from hidden state space to tag space
        self.hidden2tag = nn.Linear(hidden_dim, tagset_size)

    def forward(self, sentence):
        embeds = self.word_embeddings(sentence)
        lstm_out, _ = self.lstm(embeds.view(len(sentence), 1, -1))
        tag_space = self.hidden2tag(lstm_out.view(len(sentence), -1))
        tag_scores = F.log_softmax(tag_space, dim=1)
        return tag_scores

### Train

In [275]:
def train(training_data, input_size, output_size, optimizer, loss_function, epochs=300):
  # See what the scores are before training
  # Note that element i,j of the output is the score for tag j for word i.
  # Here we don't need to train, so the code is wrapped in torch.no_grad()
  with torch.no_grad():
      inputs = word_encoder(training_data[0][0])
      tag_scores = model(inputs)
      print(tag_scores)

  for epoch in range(epochs):  # again, normally you would NOT do 300 epochs, it is toy data
      for sentence, tags in training_data:
          # Step 1. Remember that Pytorch accumulates gradients.
          # We need to clear them out before each instance
          model.zero_grad()

          # Step 2. Get our inputs ready for the network, that is, turn them into
          # Tensors of word indices.
          sentence_in = word_encoder(sentence)
          targets = tag_encoder(tags)

          # Step 3. Run our forward pass.
          tag_scores = model(sentence_in)

          # Step 4. Compute the loss, gradients, and update the parameters by
          #  calling optimizer.step()
          loss = loss_function(tag_scores, targets)
          loss.backward()
          optimizer.step()
  
  return model

### Tran Configuration

In [276]:
training_data = [(words, tags)]

# Setup the LSTM with training settings
embedding_dim = 32
hidden_dim = 32
input_size  = len(word_to_ix)
output_size = len(tag_to_ix)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1) # optimizer   = torch.optim.RMSprop(model.parameters(), lr=0.001)
model = LSTM_NER(embedding_dim, hidden_dim, input_size, output_size) # model       = Lstm(input_size, hidden_size, output_size)
loss_function = nn.NLLLoss() # criterion   = torch.nn.CrossEntropyLoss() # only accepts one target
epochs  = 10

# Train the model
model = train(training_data, input_size, output_size, optimizer, loss_function, epochs)

tensor([[-3.3436, -3.1289, -3.2070,  ..., -3.0821, -2.9878, -3.0584],
        [-3.1972, -3.1369, -3.2617,  ..., -3.0855, -3.0518, -3.1784],
        [-3.1786, -3.0638, -3.2062,  ..., -3.0675, -2.9697, -3.1823],
        ...,
        [-3.3234, -3.0209, -3.2405,  ..., -3.1183, -2.9928, -3.0963],
        [-3.1898, -3.0225, -3.2752,  ..., -3.1920, -3.0296, -3.0548],
        [-3.1967, -2.9491, -3.2255,  ..., -3.1978, -3.0319, -3.0848]])


### Test

In [285]:
class Performance:
  def __init__(self, correct, total):
    self.correct = correct
    self.total = total
  
  @property
  def acc(self):
    return self.correct/self.total
  
  def __str__(self):
    return f'{self.correct}/{self.total} = {self.acc}'

def test(model, testing_data):
  batch_predicted = []
  batch_scores = []
  batch_performance = []
  with torch.no_grad():
      for receipt, tags  in testing_data:
        inputs = word_encoder(receipt)
        scores = model(inputs)
        predicted_labels = scores.argmax(dim=1)
        batch_predicted.append(predicted_labels)
        batch_scores.append(scores)
        n_correct = (predicted_labels == tag_encoder(tags)).sum().item()
        batch_performance.append(Performance(correct=n_correct, total=len(tags)))
  return batch_scores, batch_predicted, batch_performance

### Test Configuration

In [287]:
print(testing_data[0])
testing_data = training_data
tags_predicted, scores, performances = test(model, testing_data)
first_tags_predicted = tags_predicted[0]
print(first_tags_predicted) # Get the tags
#print(tags_scores[0].softmax(dim=1)) # Get the scores
print(performances[0])

(['<start>', 'united', 'color', 'of', 'benetton', '\n', 'razzia', 'sl', '\n', 'cif', ':', 'b38103792', '\n', '\n', 'comercio', 'minorista', '\n', 'c/', '.', 'mesa', 'y', 'lopez', '12', '\n', 'tlf', ':', '928', '22', '97', '50', '\n', '\n', 'caja', ':', 'mercheria2', '\n', '\n', 'fact', '.', 'simp', '.', ':', '36', '8390t', '\n', '\n', 'fecha', ':', '27/07/20', '\n', '\n', 'hora', ':', '13', ':', '65', '\n', '\n', 'operario', ':', 'operario1', '<end>'], ['START', 'B-NAME', 'I-NAME', 'I-NAME', 'L-NAME', '\n', 'B-NAME', 'L-NAME', '\n', 'O-FISCAL_ID', 'SEP', 'U-FISCAL_ID', '\n', '\n', 'O', 'O', '\n', 'B-ADDR', 'SEP', 'I-ADDR', 'I-ADDR', 'I-ADDR', 'L-ADDR', '\n', 'O-PHONE', 'SEP', 'B-PHONE', 'I-PHONE', 'I-PHONE', 'L-PHONE', '\n', '\n', 'O', 'SEP', 'O', '\n', '\n', 'O-RNUMBER', 'SEP', 'O-RNUMBER', 'SEP', 'SEP', 'B-FICAL_ID', 'L-FISCAL_ID', '\n', '\n', 'O-DATE', 'SEP', 'U-DATE', '\n', '\n', 'O', 'SEP', 'O', 'SEP', 'O', '\n', '\n', 'O', 'SEP', 'O', 'END'])
tensor([[-3.3436, -3.1289, -3.2070,  