# Tables

In [8]:

import re
import logging

logger = logging.getLogger(__name__)

# Defaults layered underneath every table definition; a table entry in the
# template may override either key.
DEFAULT_OPTIONS = {"field_separator": r"\s+", "line_separator": r"\n"}


def tables(self, content, output):
    """Extract named fields from the table regions of an invoice.

    For every entry of ``self["tables"]`` the text between the first match
    of its ``start`` regex and the first match of its ``end`` regex is split
    with ``line_separator``; each non-blank line is matched against the
    ``body`` regex and its named groups are copied into ``output``.

    Parameters
    ----------
    self : mapping
        The template. Must provide ``self["tables"]`` (an iterable of
        dicts with ``start``, ``end`` and ``body`` regexes) as well as
        ``parse_date`` and ``parse_number`` methods.
    content : str
        The invoice text to search.
    output : dict
        Updated in place. Keys that are already present are never
        overwritten.

    Returns
    -------
    None
        Always; results are delivered through ``output``. Extraction stops
        early (still returning ``None``) when a date field cannot be
        parsed; the unparsable field is not stored.

    Raises
    ------
    AssertionError
        If a table definition lacks ``start``, ``end`` or ``body``.
    """

    for table_config in self["tables"]:
        # Layer the template's settings over the defaults without
        # modifying either dict.
        settings = DEFAULT_OPTIONS.copy()
        settings.update(table_config)

        # Validate settings
        assert "start" in settings, "Table start regex missing"
        assert "end" in settings, "Table end regex missing"
        assert "body" in settings, "Table body regex missing"

        start = re.search(settings["start"], content)
        end = re.search(settings["end"], content)

        if not start or not end:
            logger.warning("no table body found - start %s, end %s", start, end)
            continue

        table_body = content[start.end() : end.start()]

        for line in re.split(settings["line_separator"], table_body):
            # Skip empty and whitespace-only lines.
            if not line.strip():
                continue

            match = re.search(settings["body"], line)
            if not match:
                # Only report lines that really were ignored.
                logger.debug(
                    "ignoring *%s* because it doesn't match anything", line
                )
                continue

            for field, value in match.groupdict().items():
                # If a field name already exists, do not overwrite it
                if field in output:
                    continue

                if field.startswith("date") or field.endswith("date"):
                    parsed = self.parse_date(value)
                    if not parsed:
                        # Do not store the failed result: callers that
                        # ignore the return value would otherwise see the
                        # field as present.
                        logger.error("Date parsing failed on date '%s'", value)
                        return None
                    output[field] = parsed
                elif field.startswith("amount"):
                    output[field] = self.parse_number(value)
                else:
                    output[field] = value

# Lines

In [9]:

import re
import logging

logger = logging.getLogger(__name__)

# Defaults layered underneath the template's ``lines`` settings; the template
# may override either key.
DEFAULT_OPTIONS = {"field_separator": r"\s+", "line_separator": r"\n"}


def _merge_fields(row, match):
    """Add the named groups of *match* to *row*, newline-joining repeated fields."""
    for field, value in match.groupdict().items():
        text = value.strip() if value else ""
        previous = row.get(field, "")
        row[field] = "%s\n%s" % (previous, text) if previous else text


def lines(self, content, output):
    """Extract the line items of an invoice into ``output["lines"]``.

    The text between the first match of the ``start`` regex and the first
    match of the ``end`` regex is split with ``line_separator``. Each
    non-blank line is tried against ``first_line`` (opens a new row),
    ``last_line`` (completes the current row) and ``line`` (adds to the
    current row), in that order. When neither ``first_line`` nor
    ``last_line`` is configured, ``line`` is used as ``first_line`` so
    every matching line becomes its own row.

    Parameters
    ----------
    self : mapping
        The template. Must provide ``self["lines"]`` (a dict with
        ``start``, ``end`` and ``line`` regexes and optionally
        ``first_line``, ``last_line``, ``line_separator`` and ``types``)
        and, when ``types`` is used, a ``coerce_type`` method. The
        template's settings are read but not modified.
    content : str
        The invoice text to search.
    output : dict
        Receives the list of row dicts under ``"lines"`` when at least one
        row was found; otherwise left untouched.

    Returns
    -------
    None
        Results are delivered through ``output``.

    Raises
    ------
    AssertionError
        If ``start``, ``end`` or ``line`` is missing from the settings.
    """

    # Work on a merged copy so the template itself is left as loaded.
    settings = DEFAULT_OPTIONS.copy()
    settings.update(self["lines"])

    # Validate settings
    assert "start" in settings, "Lines start regex missing"
    assert "end" in settings, "Lines end regex missing"
    assert "line" in settings, "Line regex missing"

    start = re.search(settings["start"], content)
    end = re.search(settings["end"], content)
    if not start or not end:
        logger.warning("no lines found - start %s, end %s", start, end)
        return

    if "first_line" not in settings and "last_line" not in settings:
        settings["first_line"] = settings["line"]
    has_first_line = "first_line" in settings
    has_last_line = "last_line" in settings

    body = content[start.end() : end.start()]
    rows = []
    current_row = {}
    for line in re.split(settings["line_separator"], body):
        # Skip empty and whitespace-only lines.
        if not line.strip():
            continue

        if has_first_line:
            match = re.search(settings["first_line"], line)
            if match:
                # A new first line closes whatever row was being built.
                if current_row:
                    rows.append(current_row)
                current_row = {
                    field: value.strip() if value else ""
                    for field, value in match.groupdict().items()
                }
                continue

        if has_last_line:
            match = re.search(settings["last_line"], line)
            if match:
                _merge_fields(current_row, match)
                if current_row:
                    rows.append(current_row)
                current_row = {}
                continue

        match = re.search(settings["line"], line)
        if match:
            _merge_fields(current_row, match)
            continue

        logger.debug("ignoring *%s* because it doesn't match anything", line)

    if current_row:
        rows.append(current_row)

    # ``types`` maps a field name to the target type name for coerce_type.
    types = settings.get("types", {})
    for row in rows:
        for name in row:
            if name in types:
                row[name] = self.coerce_type(row[name], types[name])

    if rows:
        output["lines"] = rows

# Template script that joins the lines and tables plugins

### (check the path) 

In [11]:

import re
import dateparser
from unidecode import unidecode
import logging
from collections import OrderedDict
#from .plugins import lines, tables

logger = logging.getLogger(__name__)

# Baseline template options. InvoiceTemplate copies this dict and overlays
# the template's own "options" section on top of it.
OPTIONS_DEFAULT = dict(
    remove_whitespace=False,
    remove_accents=False,
    lowercase=False,
    currency="EUR",
    date_formats=[],
    languages=[],
    decimal_separator=".",
    replace=[],  # example: see templates/fr/fr.free.mobile.yml
)

# Template keyword -> plugin that runs when the template contains that keyword.
PLUGIN_MAPPING = dict(lines=lines, tables=tables)


class InvoiceTemplate(OrderedDict):
    """
    Represents single template files that live as .yml files on the disk.

    The template itself is an ordered mapping of the keys found in the
    file (``keywords``, ``fields``, optional ``options``, ``issuer``,
    ``required_fields`` and plugin sections). The effective options, i.e.
    the defaults overlaid with the template's ``options`` section, are kept
    in the ``options`` attribute.

    Methods
    -------
    prepare_input(extracted_str)
        Input raw string and do transformations, as set in template file.
    matches_input(optimized_str)
        See if string matches keywords set in template file
    parse_number(value)
        Parse number, remove decimal separator and add other options
    parse_date(value)
        Parses date and returns date after parsing
    coerce_type(value, target_type)
        change type of values
    extract(optimized_str)
        Given a template file and a string, extract matching data fields.
    """

    def __init__(self, *args, **kwargs):
        """Build the template mapping and compute its effective options.

        Raises
        ------
        AssertionError
            If a configured language code is not exactly two characters.
        """
        super(InvoiceTemplate, self).__init__(*args, **kwargs)

        # Merge template-specific options with defaults
        self.options = OPTIONS_DEFAULT.copy()
        if "options" in self:
            self.options.update(self["options"])

        # Validate only after merging, so the template's own language list
        # is checked rather than the (empty) default.
        for lang in self.options["languages"]:
            assert len(lang) == 2, "lang code must have 2 letters"

        # Set issuer, if it doesn't exist.
        if "issuer" not in self:
            self["issuer"] = self["keywords"][0]

    def prepare_input(self, extracted_str):
        """
        Input raw string and do transformations, as set in template file.

        Applies, in order and each only when enabled in ``self.options``:
        removal of space characters, accent removal, lower-casing and the
        literal ``replace`` pairs. Returns the transformed string.
        """

        # Remove whitespace (runs of the space character only)
        if self.options["remove_whitespace"]:
            optimized_str = re.sub(" +", "", extracted_str)
        else:
            optimized_str = extracted_str

        # Remove accents
        if self.options["remove_accents"]:
            optimized_str = unidecode(optimized_str)

        # convert to lower case
        if self.options["lowercase"]:
            optimized_str = optimized_str.lower()

        # specific replace
        for replace in self.options["replace"]:
            assert len(replace) == 2, "A replace should be a list of 2 items"
            optimized_str = optimized_str.replace(replace[0], replace[1])

        return optimized_str

    def matches_input(self, optimized_str):
        """Return True when every template keyword occurs in the string, else False."""

        if all(keyword in optimized_str for keyword in self["keywords"]):
            logger.debug("Matched template %s", self["template_name"])
            return True
        return False

    def parse_number(self, value):
        """Convert a textual amount to ``float``.

        The configured decimal separator may appear at most once; every
        other ``.``, ``,`` or whitespace character is treated as a
        thousands separator and dropped.

        Raises
        ------
        AssertionError
            If the decimal separator occurs more than once in ``value``.
        """
        assert (
            value.count(self.options["decimal_separator"]) < 2
        ), "Decimal separator cannot be present several times"
        # replace decimal separator by a |
        amount_pipe = value.replace(self.options["decimal_separator"], "|")
        # remove all possible thousands separators
        amount_pipe_no_thousand_sep = re.sub(r"[.,\s]", "", amount_pipe)
        # put dot as decimal sep
        return float(amount_pipe_no_thousand_sep.replace("|", "."))

    def parse_date(self, value):
        """Parses date and returns date after parsing"""
        res = dateparser.parse(
            value,
            date_formats=self.options["date_formats"],
            languages=self.options["languages"],
        )
        logger.debug("result of date parsing=%s", res)
        return res

    def coerce_type(self, value, target_type):
        """Convert the string ``value`` to ``"int"``, ``"float"`` or ``"date"``.

        Blank input yields ``0`` / ``0.0`` for the numeric types.

        Raises
        ------
        AssertionError
            If ``target_type`` is not one of the supported names.
        """
        if target_type == "int":
            if not value.strip():
                return 0
            return int(self.parse_number(value))
        elif target_type == "float":
            if not value.strip():
                return 0.0
            return float(self.parse_number(value))
        elif target_type == "date":
            return self.parse_date(value)
        # Raised explicitly so the failure is not compiled away under -O.
        raise AssertionError("Unknown type")

    def extract(self, optimized_str):
        """
        Given a template file and a string, extract matching data fields.

        Returns the dict of extracted fields when all required fields were
        found. Returns None when a date field cannot be parsed or a
        required field is missing.
        """

        logger.debug("START optimized_str ========================")
        logger.debug(optimized_str)
        logger.debug("END optimized_str ==========================")
        logger.debug(
            "Date parsing: languages=%s date_formats=%s",
            self.options["languages"],
            self.options["date_formats"],
        )
        logger.debug(
            "Float parsing: decimal separator=%s", self.options["decimal_separator"]
        )
        logger.debug("keywords=%s", self["keywords"])
        logger.debug(self.options)

        # Try to find data for each field.
        output = {"issuer": self["issuer"]}

        for k, v in self["fields"].items():
            if k.startswith("static_"):
                logger.debug("field=%s | static value=%s", k, v)
                output[k.replace("static_", "")] = v
                continue

            logger.debug("field=%s | regexp=%s", k, v)

            # Fields can have multiple expressions
            multiple = isinstance(v, list)
            sum_field = multiple and k.startswith("sum_amount")
            field_name = k[4:] if sum_field else k  # remove 'sum_' prefix

            res_find = []
            for pattern in v if multiple else [v]:
                res_find.extend(re.findall(pattern, optimized_str))

            if not res_find:
                logger.warning("regexp for field %s didn't match", field_name)
                continue

            logger.debug("res_find=%s", res_find)
            if field_name.startswith("date") or field_name.endswith("date"):
                output[field_name] = self.parse_date(res_find[0])
                if not output[field_name]:
                    logger.error("Date parsing failed on date '%s'", res_find[0])
                    return None
            elif field_name.startswith("amount"):
                if sum_field:
                    output[field_name] = 0
                    for amount_to_parse in res_find:
                        output[field_name] += self.parse_number(amount_to_parse)
                else:
                    output[field_name] = self.parse_number(res_find[0])
            else:
                # De-duplicate while keeping first-seen order so the result
                # is the same on every run.
                unique = list(OrderedDict.fromkeys(res_find))
                output[field_name] = unique[0] if len(unique) == 1 else unique

        output["currency"] = self.options["currency"]

        # Run plugins:
        for plugin_keyword, plugin in PLUGIN_MAPPING.items():
            if plugin_keyword in self:
                # A plugin may be a module exposing extract() or a plain
                # callable taking (template, text, output).
                run_plugin = getattr(plugin, "extract", plugin)
                run_plugin(self, optimized_str, output)

        # If required fields were found, return output, else log error.
        if "required_fields" not in self:
            required_fields = ["date", "amount", "invoice_number", "issuer"]
        else:
            required_fields = list(self["required_fields"])

        if set(required_fields).issubset(output.keys()):
            output["desc"] = "Invoice from %s" % (self["issuer"])
            logger.debug(output)
            return output

        logger.error(
            "Unable to match all required fields. "
            "The required fields are: {0}. "
            "Output contains the following fields: {1}.".format(
                required_fields, list(output)
            )
        )
        return None