# Information Extraction from Electricity Invoices


In [1]:
import os, sys, pickle, time, uuid, shutil
import numpy as np
import pandas as pd
from ast import literal_eval
from joblib import dump, load
import re, subprocess
import matplotlib.pyplot as plt
import seaborn as sn

# Sliding-window configuration shared by the whole notebook
NWORDS = 11  # words in each training sentence
NCF = 5      # words used to compute the custom features (at most NWORDS/2)


In [None]:
# Code for converting part of the training/test set into our (sentence,label) format

from idsem2list import idsem_to_list

generate_train = True
nbills = 100

# Choose the dataset split to convert together with its output folder prefix
if generate_train:
    split_name, out_prefix = "training", "train_files_"
else:
    split_name, out_prefix = "test", "test_files_"

base_dir = "../dataset/idsem_txt/" + split_name + "/"
out_dir = out_prefix + str(nbills)

# List template directories and convert them one by one, numbering from 1
directories = os.listdir(base_dir)
ext = 0  # stays 0 when there is no template directory
for ext, directory in enumerate(directories, start=1):
    train_data = idsem_to_list(base_dir + directory, nbills, out_dir, ext)


In [3]:
# Use label codes for indexing the DataFrame.
# The file holds a Python literal mapping label name -> code.
with open("dictionaries/label_codes.txt", mode="r", encoding="utf-8") as f:
    label_codes = literal_eval(f.read())

# Inverse mapping: code -> label name
code_labels = {code: label for label, code in label_codes.items()}

# Read Spanish stop words; kept as a list because it is later passed to
# TfidfVectorizer(stop_words=...)
with open("dictionaries/spanish_stop_words.txt", mode="r", encoding="utf-8") as f:
    stop_words = list(literal_eval(f.read()))

In [4]:
from sklearn.base import BaseEstimator, TransformerMixin

class WordVectorizer(TransformerMixin, BaseEstimator):
   """Extract hand-crafted features from the central word and its context.

   Every input sentence is split on whitespace.  The word at position
   ``CENTER`` is the central word; it and up to ``NCF`` words on each side
   are described with five categorical features each (type, pre-type,
   measure unit, capitalisation and punctuation).

   Parameters
   ----------
   NCF : int, default=5
      Number of context words used on each side of the central word.
      0 uses the central word only; a negative value produces no feature
      columns at all.
   """

   # Index of the central word in a whitespace-split sentence.
   CENTER = 5

   def __init__(self, NCF=5):
      self.NCF = NCF

   @staticmethod
   def _hits(result):
      """Turn a ``Series.str`` result into a boolean mask (missing -> False)."""
      return result.eq(True)

   def word_type(self, S):
      """WordNType code of every word in S.

      0 other, 1 alphanumeric, 2 alphabetic, 3 digits only,
      4 number written with ',' or '.' separators (e.g. ``1.234,56``).
      Later rules override earlier ones.
      """
      # Raw string so that the regex escapes reach the regex engine untouched.
      # Requires at least one digit and one separator, so pure digit strings
      # keep code 3.
      numeric_sep = r'^(?=.*[0-9])(?=.*[,.])[0-9,.]+$'

      f = pd.Series(0, index=S.index)
      f[self._hits(S.str.isalnum())] = 1
      f[self._hits(S.str.isalpha())] = 2
      f[self._hits(S.str.isdigit())] = 3
      f[self._hits(S.str.contains(numeric_sep, regex=True))] = 4
      return f

   def word_pretype(self, S):
      """WordNPretype code of every word in S.

      0 none, 1 money, 2 DNI/NIE, 3 email, 4 web address, 5 postcode, 6 date.
      Later rules override earlier ones.
      """
      money = r'^\d+[,.]\d\d$'
      email = (r"^[a-zA-Z0-9.!#$%&'*+/=?^_`{|}~-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}"
               r"[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$")
      # Non-capturing groups only: str.contains warns about capturing groups.
      web   = (r"^(?:http:\/\/www\.|https:\/\/www\.|http:\/\/|https:\/\/)?"
               r"[a-z0-9]+(?:[\-\.]{1}[a-z0-9]+)*\.[a-z]{2,5}(?::[0-9]{1,5})?(?:\/.*)?$")
      dni1  = r'^[klmxyzKLMXYZ][-]?\d{7}[-]?[a-zA-Z]$'
      dni2  = r'^\d{8}[-]?[a-zA-Z]$'
      pcode = r"^[0-5]\d{4}$"
      date  = r"^\d\d[/\-.]\d\d[/\-.]\d{4}$"

      rules = ((money, 1), (dni1, 2), (dni2, 2), (email, 3),
               (web, 4), (pcode, 5), (date, 6))

      f = pd.Series(0, index=S.index)
      for pattern, code in rules:
         f[self._hits(S.str.contains(pattern, regex=True))] = code
      return f

   def word_mesure(self, S):
      """WordNMeasure code of every word in S (case-insensitive exact match).

      1 euro, 2 euro/day, 3 euro/kW, 4 euro/kWh, 6 kW, 7 kW/month, 8 kWh,
      9 %, 0 anything else (including missing words).
      """
      measures = {'€':1,      '€/día': 2,    '€/kw': 3,     '€/kwh': 4,
                  'eur': 1,   'eur/día': 2,  'eur/kw': 3,   'eur/kwh': 4,
                  'euros': 1, 'euros/día': 2,'euros/kw': 3, 'euros/kwh': 4,
                  'kw': 6,    'kw/mes': 7,   'kwh': 8,      '%': 9}
      return S.str.lower().map(measures).fillna(0)

   def word_capital(self, S):
      """WordNCapital code of every word in S.

      1 capital followed by lowercase ASCII letters, 2 single ASCII capital,
      3 ``str.isupper``, 4 ``str.islower``, 5 ``str.istitle``, 0 otherwise.
      Later rules override earlier ones.
      """
      # NOTE(review): every word matching rules 1 or 2 is also title-case, so
      # it ends up with code 5; confirm whether that precedence is intended.
      f = pd.Series(0, index=S.index)
      f[self._hits(S.str.contains(r"^[A-Z][a-z]+$", regex=True))] = 1
      f[self._hits(S.str.contains(r"^[A-Z]$", regex=True))] = 2
      f[self._hits(S.str.isupper())] = 3
      f[self._hits(S.str.islower())] = 4
      f[self._hits(S.str.istitle())] = 5
      return f

   def word_colons(self, S):
      """WordNBeforeColons code of every word in S.

      1 the word contains ':', 2 it contains ';', 3 it contains '.',
      0 otherwise.  Later rules override earlier ones.
      """
      f = pd.Series(0, index=S.index)
      # Literal matching: in regex mode "." would match any character.
      for mark, code in ((":", 1), (";", 2), (".", 3)):
         f[self._hits(S.str.contains(mark, regex=False))] = code
      return f

   def fit(self, X, y=None):
      """Nothing to learn; present for scikit-learn pipeline compatibility."""
      return self

   def transform(self, X, y=None):
      """Create the custom feature columns for a collection of sentences.

      Parameters
      ----------
      X : pandas.Series (or any iterable) of str
         Whitespace-separated sentences; each must contain the words at
         positions ``CENTER - NCF`` to ``CENTER + NCF``.
      y : ignored

      Returns
      -------
      pandas.DataFrame
         One row per sentence (same index as X).  Columns are named
         ``<Feature><offset>``, e.g. ``Type0``, ``Capital-1``, ``Colons+2``.

      Raises
      ------
      KeyError
         If no sentence is long enough to provide a required word position.
      """
      if not isinstance(X, pd.Series):
         X = pd.Series(list(X), dtype=object)

      # One column per word position of the sentence
      words = pd.DataFrame([t.split() for t in X], index=X.index)

      # Keep the row index even when there are no feature columns
      W = pd.DataFrame(index=X.index)
      if self.NCF < 0:
         return W

      extractors = (("Type", self.word_type),
                    ("PreType", self.word_pretype),
                    ("Measure", self.word_mesure),
                    ("Capital", self.word_capital),
                    ("Colons", self.word_colons))

      # Central word first, then previous/following words by distance
      center = self.CENTER
      positions = [("0", center)]
      for i in range(1, self.NCF + 1):
         positions.append(("-" + str(i), center - i))
         positions.append(("+" + str(i), center + i))

      for suffix, column in positions:
         for name, extract in extractors:
            W[name + suffix] = extract(words[column])

      return W


In [6]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import RidgeClassifier, LogisticRegression
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF
from sklearn.naive_bayes import MultinomialNB, GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC, LinearSVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier
from sklearn.pipeline import Pipeline, FeatureUnion

# Models 
def create_model(type):
   """Return a new, unfitted classifier for the given model name.

   Parameters
   ----------
   type : str
      One of "logistic", "nbayes", "svmrbf", "svmlin", "decisiontree"
      or "randomforest".

   Raises
   ------
   KeyError
      If ``type`` is not one of the supported names.
   """
   # Factories are lazy so that only the requested estimator is instantiated.
   factories = {
      "logistic": lambda: LogisticRegression(C=5, solver="saga", tol=0.01),
      "nbayes": lambda: MultinomialNB(alpha=0.01),
      "svmrbf": lambda: SVC(gamma=0.001, C=100.0),
      "svmlin": lambda: LinearSVC(random_state=0, class_weight="balanced", tol=1e-5),
      "decisiontree": lambda: DecisionTreeClassifier(random_state=42),
      "randomforest": lambda: RandomForestClassifier(n_estimators=6, random_state=42, n_jobs=-1),
   }

   try:
      factory = factories[type]
   except KeyError:
      raise KeyError(
         f"Unknown model {type!r}; expected one of {sorted(factories)}"
      ) from None

   return factory()

# Every model name understood by create_model
all_models = [
   "logistic",
   "nbayes",
   "svmrbf",
   "svmlin",
   "decisiontree",
   "randomforest",
]

# models for testing
# (currently left out: "svmlin", "decisiontree", "randomforest", "svmrbf")
models = ["logistic", "nbayes"]



In [None]:
def process_input_file(file_bill, verbose=False):
    """Convert a bill file (PDF, plain text or image) into a list of lines.

    The file type is taken from the extension of ``file_bill``
    (case-insensitive): ``pdf`` is read with ``pdftotext``, ``txt`` is copied
    as is, and anything else is passed to the ``tesseract`` command line tool
    with the Spanish language model.  The text goes through a uniquely named
    temporary file under ``tmp/`` that is always removed before returning.

    Parameters
    ----------
    file_bill : str
        Path of the input file.
    verbose : bool, default=False
        Print progress messages for PDF input.

    Returns
    -------
    list of str
        Lines of the extracted text, line endings included.

    Raises
    ------
    subprocess.CalledProcessError
        If ``tesseract`` exits with a non-zero status.
    UnicodeDecodeError
        If the text is not valid UTF-8.
    """
    file_type = os.path.splitext(file_bill)[1][1:].lower()

    # The temporary folder may not exist yet on a fresh checkout
    os.makedirs('tmp', exist_ok=True)
    file_base = 'tmp/' + str(uuid.uuid4())
    file_txt = file_base + '.txt'

    try:
        if file_type == 'pdf':
            # Imported here so that the dependency is only needed for PDFs
            import pdftotext

            if verbose: print("Reading pdf file...")
            with open(file_bill, "rb") as f:
                pdf = pdftotext.PDF(f)
            if verbose: print("Number of pages: ", len(pdf))

            if verbose: print("Converting to txt in ", file_txt)
            with open(file_txt, "w", encoding="utf-8") as fd:
                fd.write("\n".join(pdf))
        elif file_type == 'txt':
            # a txt file is just copied to its destination
            shutil.copyfile(file_bill, file_txt)
        else:
            # convert image to text; tesseract receives the output base name
            # and the result is read back from base name + '.txt'
            subprocess.run(['tesseract', file_bill, file_base,
                            '--oem', '1', '-l', 'spa'], check=True)

        with open(file_txt, 'r', encoding="utf-8") as fd:
            return fd.readlines()
    finally:
        # Never leave the temporary file behind, even after a failure
        if os.path.exists(file_txt):
            os.remove(file_txt)


def read_file(filename, verbose=False):
   """
   Load one data file.

   The file is read as UTF-8 text and parsed as a Python literal with
   ``literal_eval``; the parsed object is returned unchanged.
   """
   if verbose:
      print("Reading file ", filename)

   with open(filename, 'r', encoding="utf-8") as handle:
      content = handle.read()

   return literal_eval(content)
   

def read_directory(directory, verbose=False):
   """
   Read every file of a directory and concatenate their contents.

   Files are visited in sorted name order so that the resulting sample order,
   and therefore any seeded shuffle or split applied to it later, is
   reproducible.  Entries that are not regular files (for example a
   ``.ipynb_checkpoints`` folder) are skipped.

   Parameters
   ----------
   directory : str
      Folder that contains the data files.
   verbose : bool, default=False
      Print progress messages.

   Returns
   -------
   list
      Concatenation of the lists returned by ``read_file`` for each file.
   """
   X = []
   if verbose: print("Accessing directory ", directory)

   for filename in sorted(os.listdir(directory)):
      path = os.path.join(directory, filename)
      if not os.path.isfile(path):
         continue
      X += read_file(path, verbose)

   if verbose: print('Training data contains ', len(X), ' samples')

   return X


def create_dataframe(data):
   """
   Build the working DataFrame from a list of (text, label) pairs.

   Besides the "Text" and "Label" columns, the words at positions 5 and 6
   of each whitespace-split text are stored in "Word" and "NextWord".
   The first character of every label is dropped and the index is
   named "Id".
   """
   frame = pd.DataFrame(data, columns=["Text", "Label"])

   # Center word and the word right after it
   tokens = frame["Text"].str.split(expand=True)
   for column, position in (("Word", 5), ("NextWord", 6)):
      frame[column] = tokens[position]

   # Labels come prefixed with '#': strip that first character
   frame["Label"] = frame["Label"].map(lambda label: label[1:])
   frame.index.name = 'Id'

   return frame


def read_data(filename, no_split, is_directory=True, verbose=False):
   ''' Read a dataset and separate the features from the labels.

       The percentage of samples carrying each label is written to
       "count-labels.txt" in the current directory (overwritten on every
       call).  Label names are translated with ``label_codes``.

       Parameters
       ----------
       filename : str
          Directory (``is_directory=True``) or single file to read.
       no_split : bool
          True returns the whole dataset; False splits it 80/20 into
          train and validation sets (shuffled, ``random_state=0``).
       is_directory : bool, default=True
          Whether ``filename`` is a directory of data files.
       verbose : bool, default=False
          Forwarded to the reading functions.

       Returns
       -------
       (X, y) when ``no_split`` is true, otherwise
       (X_train, X_valid, y_train, y_valid).  The X frames hold the columns
       "Text", "Word" and "NextWord"; the y frames hold one "Label" column
       with label codes.  All of them are indexed by "Id".

       Raises
       ------
       KeyError
          If a label is missing from ``label_codes``.
   '''

   # Read the raw (text, label) samples
   if is_directory:
      samples = read_directory(filename, verbose)
   else:
      samples = read_file(filename, verbose)

   # DataFrame with the text, the label and the derived word columns
   X = create_dataframe(samples)

   # Take out the label column
   y = X[["Label"]].copy()

   # Percentage of samples per label, most frequent first
   percentages = y.value_counts(normalize=True) * 100
   with open("count-labels.txt", "w", encoding="utf-8") as f:
      f.write(percentages.to_string())

   # Encode label names with the dictionary codes; unknown labels raise
   y["Label"] = y["Label"].map(lambda name: label_codes[name])
   y.index.rename('Id', inplace=True)

   # Remove label from feature set DataFrame
   X = X.drop(columns="Label")

   if no_split:
      return X, y

   X_train, X_valid, y_train, y_valid = train_test_split(
      X, y, train_size=0.8, test_size=0.2, random_state=0,
      shuffle=True
   )

   return X_train, X_valid, y_train, y_valid

In [None]:
from sklearn import metrics

def evaluate(clf, X, y, outputfile=None):
   """ Evaluate the performance of a model with the validation/test set.

   Parameters
   ----------
   clf : fitted estimator
      Its ``predict`` method is called with ``X.Text``.
   X : DataFrame
      Must expose the sentences in a ``Text`` column.
   y : Series
      True label codes for the rows of X.
   outputfile : str or None, default=None
      Path prefix for the saved results.  When given, the MAE and the
      classification report are written to ``<outputfile>_mae_clr.txt`` and
      the confusion-matrix heat map to ``<outputfile>_cm.svg``; the parent
      directory is created if needed.

   Returns
   -------
   (mae, clr, cm) : the mean absolute error between true and predicted
   label codes, the classification report text, and the confusion matrix
   over all known label codes, normalised per predicted label.
   """

   # Get predictions and the error measured on the label codes
   predicted = clf.predict(X.Text)
   mae = metrics.mean_absolute_error(y, predicted)
   print('  - MAE:', mae)

   # precision / recall / f1-score / support for the labels present in y
   present_codes = y.unique()
   present_names = [code_labels[code] for code in present_codes]

   clr = metrics.classification_report(y, predicted,
         labels=present_codes, target_names=present_names, digits=4)

   # Confusion matrix over every known label code
   all_codes = list(code_labels.keys())
   cm = metrics.confusion_matrix(y, predicted, labels=all_codes, normalize='pred')
   # NOTE(review): the figure is left open after returning; confirm whether
   # it should be closed once it has been saved.
   plt.figure(figsize = (10,7))
   sn.heatmap(cm, annot=False, xticklabels=all_codes, yticklabels=all_codes,
              cmap="Greens", fmt='.1g')

   if outputfile is not None:
      out_dir = os.path.dirname(outputfile)
      if out_dir:
         os.makedirs(out_dir, exist_ok=True)
      with open(outputfile+"_mae_clr.txt", "w", encoding="utf-8") as fd:
         # Keep the MAE on its own line, ahead of the report table
         fd.write(str(mae) + "\n")
         fd.write(str(clr))
      plt.savefig(outputfile+"_cm.svg")

   return mae, clr, cm


In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

# Training
def train(models, training_dir, ncf=NCF, verbose=False):
   """ Train one classification pipeline per model name.

   Parameters
   ----------
   models : iterable of str
      Model names accepted by ``create_model``.
   training_dir : str
      Directory with the training files; 80% is used for fitting and 20%
      is held out for validation.
   ncf : int, default=NCF
      Number of context words for the custom word features; -1 trains on
      TF-IDF features only.
   verbose : bool, default=False
      Print progress and evaluate each model on the validation split,
      saving the results under ``train_results/``.

   Returns
   -------
   dict
      Fitted pipelines keyed by model name.  Each pipeline is also pickled
      to ``models/<model>_model.pckl``.
   """

   # Read training data
   X_train, X_valid, y_train, y_valid = \
               read_data(training_dir, no_split=False, is_directory=True, verbose=verbose)

   # Create the output folders up front so that a missing directory cannot
   # abort the run after a model has already been fitted
   os.makedirs('models', exist_ok=True)
   if verbose:
      os.makedirs('train_results', exist_ok=True)

   clfs = {}
   for model in models:

      if verbose: print(" * Training ", model)

      # Bag of words (uni- and bi-grams) used by every pipeline
      tfidf = TfidfVectorizer(stop_words=stop_words,
                              ngram_range=(1, 2), use_idf=True)

      if ncf == -1:
         # without custom features
         first_step = ('tfidf', tfidf)
      else:
         # with ncf custom features
         first_step = ('features', FeatureUnion([
            ('tfidf', tfidf),
            ('word_features', WordVectorizer(ncf))
         ]))

      clf = Pipeline([first_step, ('clf', create_model(model))])

      # Fitting model to training data
      clf.fit(X_train.Text, y_train["Label"])
      if verbose: evaluate(clf, X_valid, y_valid["Label"],
               outputfile=f'train_results/{model}_{ncf}F')

      # NOTE: the file name does not include ncf, so training the same model
      # again with another ncf overwrites the previous pickle
      with open('models/'+model+'_model.pckl', "wb") as f:
         pickle.dump(clf, f)

      clfs[model] = clf

   return clfs


# Test with all test files in all templates
def test(clfs, test_dir, ncf=NCF, verbose=False):
   """ Evaluate the trained classifiers on all the files of a test directory.

   Parameters
   ----------
   clfs : dict
      Fitted classifiers keyed by model name.
   test_dir : str
      Directory with the test files.
   ncf : int, default=NCF
      Only used to name the output files
      (``test_results/<model>_<ncf>F...``).
   verbose : bool, default=False
      Forwarded to ``read_data``.

   Returns
   -------
   (mae, clr, cm) of the LAST classifier in ``clfs``; the results of every
   classifier are saved under ``test_results/``.

   Raises
   ------
   ValueError
      If ``clfs`` is empty.
   """
   if not clfs:
      raise ValueError("clfs is empty: there is no classifier to test")

   # Read test data
   X_test, y_test = read_data(test_dir, no_split=True, is_directory=True, verbose=verbose)

   os.makedirs('test_results', exist_ok=True)

   for model, clf in clfs.items():
      print("Testing with", model)
      mae, clr, cm = evaluate(clf, X_test, y_test["Label"],
               outputfile=f'test_results/{model}_{ncf}F')
   return mae, clr, cm



In [None]:
# Fit every selected model on the training files, then score them on the test files

clfs = train(models, training_dir="train_files", verbose=True)
mae, clr, cm = test(clfs, test_dir="test_files", verbose=False)

In [None]:
# Test with each template separately

def test_T(clfs, test_dir, ncf=NCF, verbose=False):
    """ Evaluate the trained classifiers on each test file separately.

    The files of ``test_dir`` are visited in sorted name order and numbered
    from 1, so that template numbers are the same on every run.

    Parameters
    ----------
    clfs : dict
        Fitted classifiers keyed by model name.
    test_dir : str
        Directory holding one test file per template.
    ncf : int, default=NCF
        Only used to name the output files
        (``test_results/<model>_T<t>_<ncf>F...``).
    verbose : bool, default=False
        Forwarded to ``read_data``.

    Returns
    -------
    dict
        ``(mae, clr, cm)`` tuples.  Key ``"<model>_T<t>"`` holds the result
        of one model on template t.  Key ``"T<t>"`` is kept for backward
        compatibility and holds the result of the last model in ``clfs``.
    """
    os.makedirs('test_results', exist_ok=True)

    results = {}
    for t, filename in enumerate(sorted(os.listdir(test_dir)), start=1):
        # Read test data in the template
        X_test, y_test = read_data(os.path.join(test_dir, filename), no_split=True,
                                   is_directory=False, verbose=verbose)

        for model, clf in clfs.items():
            print(f"Testing with {model} and Template {t}")
            outcome = evaluate(clf, X_test, y_test["Label"],
                               outputfile=f'test_results/{model}_T{t}_{ncf}F')
            # One entry per (model, template) so that no result is lost
            results[f"{model}_T{t}"] = outcome
            # Legacy key: overwritten by each model, the last one wins
            results[f"T{t}"] = outcome

    return results
    
results = test_T(clfs, test_dir="test_files", verbose=True)


In [None]:
# Train and test with a varying number of custom-feature words (ncf):
# -1 uses no custom features; the remaining values run from 0 to NCF-1

train_files = "train_files"
test_files = "test_files"

for ncf in (-1, *range(NCF)):
    clfs = train(models, train_files, ncf=ncf, verbose=True)
    mae, clr, cm = test(clfs, test_files, ncf=ncf, verbose=False)
