In [1]:
import json
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Union  # From mypy library

import numpy as np
import pandas as pd
import requests
import xmltodict  # type: ignore
from dotenv import find_dotenv, load_dotenv
from jinja2 import Template
from PIL import Image, ImageSequence  # type: ignore

from pandas.api.types import is_datetime64_any_dtype, is_numeric_dtype, is_integer_dtype, is_object_dtype, is_string_dtype

## Set up test environment

In [2]:
number_of_rows = 100

load_dotenv(find_dotenv())

api_key = os.getenv('XTRACTA_API_KEY')
database_id = os.getenv('XTRACTA_DATABASE_ID')
header_workflow = os.getenv('XTRACTA_HEADER_ID')
line_workflow = os.getenv('XTRACTA_LINE_ID')

def delete_files(folder):
    file_generator = folder.glob('**/*')
    file_list = list(file_generator)
    for file in file_list:
        file.unlink()
    return list(file_list)

def move_files(samples_folder, file_to_copy, destination_folder):
    (destination_folder / file_to_copy).write_bytes((samples_folder / file_to_copy).read_bytes())
    return file_to_copy

def convert_to_numeric_and_date(df, dayfirst=True):
    for column in df.columns:
        if is_object_dtype(df[column]) or is_string_dtype(df[column]):
            try:
                df[column] = pd.to_numeric(df[column], downcast='integer')
            except:
                try:
                    df[column] = df[column].str.replace('$', '')
                    df[column] = df[column].str.replace(',', '')
                    df[column] = pd.to_numeric(df[column])
                except:
                    try:
                        df[column] = pd.to_datetime(df[column], dayfirst=dayfirst)
                    except:
                        pass
    return df


def random_dates(start, end, seed=1, replace=True, number_of_rows=100):
    dates = pd.date_range(start, end).to_series()
    return dates.sample(number_of_rows, replace=replace, random_state=seed).index
    
    
def dataframe_obfuscator(df, number_of_rows=100):
    for column in df.columns:
        if is_datetime64_any_dtype(df[column]):
            df[column] = random_dates(min(df[column]),max(df[column]), seed=1)
        elif is_integer_dtype(df[column]):
            df[column] = df[column].fillna(0)
            if min(df[column]) < max(df[column]):
                df[column] = np.random.randint(min(df[column]),max(df[column]),size=(number_of_rows))
            else:
                df[column] = min(df[column])
        elif is_numeric_dtype(df[column]):
            df[column] = df[column].fillna(0)
            df[column] = np.random.uniform(min(df[column]),max(df[column]),size=(number_of_rows))
        else:
            df[column] = 'random text'
    return df


def obfuscate_csv(data_file, dayfirst=True, number_of_rows=100):
    df = pd.read_csv(data_file, nrows=number_of_rows)
    df = convert_to_numeric_and_date(df)
    df = dataframe_obfuscator(df)
    df.to_csv(data_file, header=True, index=False)
    return df


def obfuscate_excel(data_file, dayfirst=True, number_of_rows=100):
    df = pd.read_excel(data_file, nrows=number_of_rows)
    display(df.head())
    df = convert_to_numeric_and_date(df)
    df = dataframe_obfuscator(df)
    df.to_excel(data_file, header=True, index=False)
    return df

In [3]:
test_p = Path.cwd()
test_dp = test_p / 'data'
test_sp = test_p / 'test_samples'
test_ip = test_p / 'input'
test_op = test_p / 'output'
test_jp = test_p / 'junk'
test_lp = test_p / 'lines'

In [4]:
test_file = '20190131_invoice.pdf'

delete_files(test_ip)
delete_files(test_op)
delete_files(test_lp)
move_files(test_sp, test_file, test_ip)

'20190131_invoice.pdf'

In [5]:
df = obfuscate_excel(test_dp / 'Purchase Order Master Data' / 'SP004.xls')
df.head(5)

Unnamed: 0,District Code,Purchase Order Number,Purchase Order Item Number,Warehouse ID,Item Description Detail,Date of Purchase Order,Current Due Date,Original QTY UOI,Current QTY UOI,Original Net Price UOI,...,Purchase Officer Name,Requested By ID,Requested By Name,Authorised By ID,Authorised By Name,Request by Pos,New Requested,New Requested By,Unnamed: 35,Waiver Indicator
0,random text,random text,1,random text,random text,2004-03-10,2018-03-11,0,0,6491104.0,...,random text,78202,random text,77024,random text,3589,74736.519189,random text,0,random text
1,random text,random text,1,random text,random text,2014-06-11,2004-09-18,0,0,8679200.0,...,random text,68862,random text,70172,random text,4188,51843.832059,random text,0,random text
2,random text,random text,1,random text,random text,2006-01-09,2014-12-20,0,0,4842345.0,...,random text,74320,random text,72947,random text,1698,76984.528703,random text,0,random text
3,random text,random text,1,random text,random text,2011-02-10,2018-04-15,0,0,1710717.0,...,random text,67547,random text,79324,random text,1818,51094.667632,random text,0,random text
4,random text,random text,1,random text,random text,2011-06-22,2006-07-20,0,0,6859257.0,...,random text,77710,random text,78016,random text,3595,77868.014697,random text,0,random text


Unnamed: 0,District Code,Purchase Order Number,Purchase Order Item Number,Warehouse ID,Item Description Detail,Date of Purchase Order,Current Due Date,Original QTY UOI,Current QTY UOI,Original Net Price UOI,...,Purchase Officer Name,Requested By ID,Requested By Name,Authorised By ID,Authorised By Name,Request by Pos,New Requested,New Requested By,Unnamed: 35,Waiver Indicator
0,random text,random text,1,random text,random text,2004-03-25,2018-03-26,0,0,4376045.0,...,random text,75376,random text,82468,random text,1341,74987.962448,random text,0,random text
1,random text,random text,1,random text,random text,2014-06-26,2004-10-03,0,0,2052403.0,...,random text,76744,random text,69648,random text,2367,30940.485275,random text,0,random text
2,random text,random text,1,random text,random text,2006-01-24,2015-01-04,0,0,9667424.0,...,random text,67138,random text,79816,random text,2036,12913.147772,random text,0,random text
3,random text,random text,1,random text,random text,2011-02-25,2018-04-30,0,0,10105500.0,...,random text,67674,random text,75500,random text,2243,46888.062599,random text,0,random text
4,random text,random text,1,random text,random text,2011-07-07,2006-08-04,0,0,9234465.0,...,random text,79500,random text,74527,random text,1262,56789.16526,random text,0,random text


In [6]:
df = obfuscate_csv(test_dp / 'Stock PO Master Data' / 'SP005.csv')
df.head()

Unnamed: 0,Purchase_Order_Number_Combined,District_Code,Warehouse_Identification,FPA_Group,FPA_Agreement_Number,FPA_Item_Number,Supplier_Price_Code1,UOI_Original_Quantity,UOI_Original_Net_Price1,UOI_Current_Quantity1,...,Total,Stock_Code,Item_Name_line,Stock_Description,Employee_Identification,FullName,Purchase_Order_Date,Supplier_Number,Supplier_Name,Item_Status_Code
0,random text,random text,random text,random text,0.592755,258,random text,162.545474,2719909.0,411.342288,...,1863.071059,1307289.0,random text,random text,random text,random text,2014-10-26,random text,random text,1
1,random text,random text,random text,random text,0.880898,133,random text,331.093679,2958159.0,206.001962,...,1199.869461,479984.2,random text,random text,random text,random text,2001-05-05,random text,random text,1
2,random text,random text,random text,random text,0.206622,119,random text,300.072629,6052685.0,284.818319,...,2696.81008,2034239.0,random text,random text,random text,random text,2011-08-06,random text,random text,1
3,random text,random text,random text,random text,0.693971,182,random text,410.838163,2313641.0,323.669329,...,3304.503022,1009348.0,random text,random text,random text,random text,2003-03-06,random text,random text,1
4,random text,random text,random text,random text,0.859946,40,random text,97.827652,1256672.0,372.690342,...,987.823371,2702589.0,random text,random text,random text,random text,2008-04-06,random text,random text,1


In [7]:
df = obfuscate_excel(test_dp / 'Supplier Master Data' / 'SP001.xls')
df.head(5)

Unnamed: 0,Supplier Number,Supplier Company Name,Address For Orders,Address For Invoices,Bank Account Name,ABN Number,Unnamed: 6,Unnamed: 7,Bank Account Number,Unnamed: 9,Branch Code
0,11243,random text,random text,random text,random text,random text,0,0,428766900.0,0,random text
1,13485,random text,random text,random text,random text,random text,0,0,203345800.0,0,random text
2,8468,random text,random text,random text,random text,random text,0,0,379038700.0,0,random text
3,6298,random text,random text,random text,random text,random text,0,0,651475700.0,0,random text
4,2717,random text,random text,random text,random text,random text,0,0,276714200.0,0,random text


Unnamed: 0,Supplier Number,Supplier Company Name,Address For Orders,Address For Invoices,Bank Account Name,ABN Number,Unnamed: 6,Unnamed: 7,Bank Account Number,Unnamed: 9,Branch Code
0,9122,random text,random text,random text,random text,random text,0,0,518868300.0,0,random text
1,3912,random text,random text,random text,random text,random text,0,0,487116400.0,0,random text
2,9589,random text,random text,random text,random text,random text,0,0,100497500.0,0,random text
3,4960,random text,random text,random text,random text,random text,0,0,588428200.0,0,random text
4,5666,random text,random text,random text,random text,random text,0,0,180507000.0,0,random text


## Functions for interacting with Xtracta's API

### Upload file

Uploads a PDf or image file for extraction. The classifier field is used if you want to assign a specific classifier to the document rather than letting Xtracta make its own classification decision

In [8]:
def upload_file(api_key, workflow_id, filename, classifier=""):
    classifier_xml = (
        f'<field_data><field name="Classifier">{classifier}</field></field_data>'
    )
    upload_url = "https://api-app.xtracta.com/v1/documents/upload"
    file = {"userfile": open(filename, mode="rb")}
    data = {
        "api_key": api_key,
        "workflow_id": workflow_id,
        "field_data": classifier_xml,
    }
    r = requests.post(upload_url, data=data, files=file)
    if r.status_code != 200:
        print(r.status_code)
        return t.text
    else:
        response = xmltodict.parse(r.text)
        return response["xml"]["document_id"]

In [9]:
test_document_id = upload_file(api_key, header_workflow, test_ip / test_file)
test_document_id

'99606933'

In [10]:
def get_document(api_key: str, document_id: str):
    
    """retrieves the full xml document from Xtracta and converts it to a dict"""
    
    documents_url = "https://api-app.xtracta.com/v1/documents"
    data = {"api_key": api_key, "document_id": document_id}
    try:
        r = requests.post(documents_url, data=data)
        response = xmltodict.parse(r.text)
        return response
    except Exception as e:
        return e.args

In [11]:
test_document = get_document(api_key, test_document_id)
test_document

OrderedDict([('documents_response',
              OrderedDict([('status', '200'),
                           ('message',
                            'The request has been successfully processed'),
                           ('document',
                            OrderedDict([('@revision', '1'),
                                         ('document_id', '99606933'),
                                         ('document_status', 'pre-processing'),
                                         ('api_download_status', 'active'),
                                         ('free_form', None),
                                         ('classification', None),
                                         ('classification_class', None),
                                         ('classification_design', None),
                                         ('document_url',
                                          'https://web1-akl.xtracta.com/akl_northcote_storage2_datasource4/1/93/19/iq204587316-jB5yJehe.pdf'),

In [12]:
def get_xtracta_status(
    api_key: str,
    workflow_id: str,
    status: str,
    api_download_status: str = "active",
    detailed: int = 0,
    documents_order: str = "asc",
) -> list:
    """Returns a list of all Xtracta documents with a particular status"""
    documents_url = "https://api-app.xtracta.com/v1/documents"
    data = {
        "api_key": api_key,
        "workflow_id": workflow_id,
        "document_status": status,
        "api_download_status": api_download_status,
        "items_per_page": 1000,
        "detailed": detailed,
        "documents_order": documents_order,
    }
    try:
        r = requests.post(documents_url, data=data)
        response = xmltodict.parse(r.text)
    except Exception as e:
        return [e.__str__]

    try:
        response_content = response["documents_response"]["document"]
        if type(response_content) == list:
            return response_content
        else:
            return [response_content]
    except Exception as e:
        if type(e).__name__ == "KeyError":
            return [f"No {status} documents in queue"]
        else:
            return [e]

In [13]:
test_reject_list = get_xtracta_status(api_key, header_workflow, 'reject')
test_reject_list

[OrderedDict([('@revision', '2'),
              ('document_id', '99585648'),
              ('document_status', 'reject'),
              ('number_of_pages', '1'),
              ('api_download_status', 'active'),
              ('free_form', None),
              ('classification', 'full'),
              ('classification_class', '1049514'),
              ('classification_design', '0'),
              ('rejection',
               OrderedDict([('reason',
                             [OrderedDict([('message',
                                            'Value is not present in "db_po_number" column of the "POs" database'),
                                           ('validation_rule',
                                            OrderedDict([('type',
                                                          'DATABASE_LIST')])),
                                           ('linked_field',
                                            OrderedDict([('field_id',
                                       

In [14]:
def find_documents_to_skip(api_key, header_workflow):
    
    """You only want to process documents that have data in the document body. 
    This function finds documents that are not in this state"""
    
    status_to_skip = ['reject', 'preprocessing', 'output-in-progress']
    items_to_skip = []
    for status in status_to_skip:
        queue = get_xtracta_status(api_key, header_workflow, status)
        for item in queue:
            if item != f'No {status} documents in queue':
                items_to_skip.append(item['document_id'])
    return items_to_skip      

In [15]:
find_documents_to_skip(api_key, header_workflow)

['99585648']

## Build the output dictionary from Xtracta data

In [16]:
def create_output(document: Dict[Any, Any]) -> Dict[Any, Any]:
    """Returns a dictionary with document_id, status and version as top level values 
    and remaining fields as key value pairs in a header section"""
    output = {}
    header_dict = document["documents_response"]["document"]["field_data"]["field"]
    header = transform_dict(header_dict)
    output["document_id"] = document["documents_response"]["document"]["document_id"]
    output["status"] = document["documents_response"]["document"]["document_status"]
    output["version"] = document["documents_response"]["document"]["@revision"]
    output["header"] = header
    return output


def transform_dict(start_dict):
    end_dict = {}
    for item in start_dict:
        end_dict[item["field_name"]] = item["field_value"]
    return end_dict

In [17]:
test_document = get_document(api_key, test_document_id)
test_output = create_output(test_document)
test_output

{'document_id': '99606933',
 'status': 'pre-processing',
 'version': '1',
 'header': {'supplier_abn': None,
  'supplier': None,
  'supplier_id': None,
  'invoice_number': None,
  'po_number': None,
  'line_count': None,
  'account_number': None,
  'invoice_date': None,
  'net_total': '0.00',
  'gst_total': None,
  'gross_total': None,
  'freight': '0.00',
  'bsb_number': None,
  'bank_account_number': None,
  'ok_to_process': None,
  'period_start_date': None,
  'period_end_date': None,
  'abn_from_db_by_po': None,
  'filename': '20190131_invoice.pdf'}}

In [18]:
def get_documents_wo_json(folder):
    json_files = []
    pdfs = []
    json_list = list(folder.glob("*.json"))
    pdf_list = list(folder.glob("*.pdf"))
    for file in json_list:
        json_files.append(file.stem)
    for pdf in pdf_list:
        pdfs.append(pdf.stem)
    new_documents = list(set(pdfs) - set(json_files))
    return new_documents

In [19]:
get_documents_wo_json(test_ip)

['20190131_invoice']

In [20]:
def open_document_ui(api_key: str, document_id: str) -> str:
    """Opens the Xtracta UI to fix and train documents"""
    documents_url = "https://api-app.xtracta.com/v1/documents/ui"
    data = {
        "api_key": api_key,
        "document_id": int(document_id),
        "buttons": "output,archive",
        "no_lockout": 1,
        "expire": 86400,
    }
    r = requests.post(documents_url, data=data)
    response = xmltodict.parse(r.text)
    return response["documents_response"]["url"]

In [21]:
test_document = get_document(api_key, test_document_id)
test_output = create_output(test_document)
display(test_output)
if test_output['status'] in ['reject', 'output']:
    display(open_document_ui(api_key, test_document_id))

{'document_id': '99606933',
 'status': 'pre-processing',
 'version': '1',
 'header': {'supplier_abn': None,
  'supplier': None,
  'supplier_id': None,
  'invoice_number': None,
  'po_number': None,
  'line_count': None,
  'account_number': None,
  'invoice_date': None,
  'net_total': '0.00',
  'gst_total': None,
  'gross_total': None,
  'freight': '0.00',
  'bsb_number': None,
  'bank_account_number': None,
  'ok_to_process': None,
  'period_start_date': None,
  'period_end_date': None,
  'abn_from_db_by_po': None,
  'filename': '20190131_invoice.pdf'}}

In [22]:
def update_document(
    api_key: str, document_id: str, delete: int = 0, api_download_status: str = "active"
) -> Dict[str, str]:
    """Updates document on Xtracta"""
    documents_url = "https://api-app.xtracta.com/v1/documents/update"
    data = {
        "api_key": api_key,
        "document_id": int(document_id),
        "delete": delete,
        "api_download_status": api_download_status,
    }
    r = requests.post(documents_url, data=data)
    response = xmltodict.parse(r.text)
    return response["documents_response"]

In [23]:
update_document(api_key, test_document_id, api_download_status='active')

OrderedDict([('status', '200'),
             ('message', 'The request has been successfully processed')])

In [24]:
def get_lines(document):
    lines_dict = document['documents_response']['document']['field_data']['field_set']['row']
    lines = []
    if len(lines_dict) > 1:
        for line_dict in lines_dict:
            line = transform_dict(line_dict['field'])
            lines.append(line)
    else:
        line = transform_dict(lines_dict['field'])
        lines.append(line)
    return lines

In [25]:
test_line_document_id = upload_file(api_key, line_workflow, test_ip / test_file)
test_line_document = get_document(api_key, test_line_document_id)
test_lines = get_lines(test_line_document)
test_lines

[{'po_item': None, 'description': None, 'qty': None, 'total': None}]

## Build output once in output status

In [26]:
def build_out_output(document, output):
    output['stem'] = output['header']['filename'].split('.')[0]
    output['new_filename'] = f"{output['header']['supplier_id']}-{output['header']['invoice_number']}"
    output['header']['emaildate'] = get_email_date(output['stem'])
    output['document_url'] = document['documents_response']['document']['document_url']
    output['image_urls'] = get_image_urls(document['documents_response']['document']['image_url'])
    return output

def get_email_date(stem):
    year = stem[:4]
    month = stem[4:6]
    day = stem[6:8]
    return f"{year}-{month}-{day}"

def get_image_urls(image_urls):
    if type(image_urls) != list:
        image_urls = [image_urls]
    return image_urls

In [45]:
test_document = get_document(api_key, test_document_id)
test_output = create_output(test_document)
full_test_output = build_out_output(test_document, test_output)
full_test_output

{'document_id': '99606933',
 'status': 'reject',
 'version': '2',
 'header': {'supplier_abn': None,
  'supplier': None,
  'supplier_id': None,
  'invoice_number': 'INV-3337',
  'po_number': '12345',
  'line_count': None,
  'account_number': None,
  'invoice_date': '2016-01-25',
  'net_total': '85.00',
  'gst_total': '8.50',
  'gross_total': '93.50',
  'freight': '0.00',
  'bsb_number': '4321432',
  'bank_account_number': '12341234',
  'ok_to_process': '0',
  'period_start_date': None,
  'period_end_date': None,
  'abn_from_db_by_po': None,
  'filename': '20190131_invoice.pdf',
  'emaildate': '2019-01-31'},
 'stem': '20190131_invoice',
 'new_filename': 'None-INV-3337',
 'document_url': 'https://web1-akl.xtracta.com/akl_northcote_storage2_datasource4/1/93/19/iq204587316-jB5yJehe.pdf',
 'image_urls': ['https://web1-akl.xtracta.com/akl_northcote_storage2_datasource4/1/24/e5/ds99606933-iq204587316-jB5yJehe7808-1-800.jpg']}

## Pull company name and location from filename

In [46]:
def add_company_location(output):
    company_extract = re.compile(r'.*\[<<(.*)>>\].*')
    company_location = company_extract.match(output['header']['email_subject'])[1]
    try:
        output['company'], output['location'] = company_location.split('-')
    except:
        output['company'] = company_location
        output['location'] = 'NA'
    return output

In [47]:
full_test_output['header']['email_subject'] = '234 [<<ABC-123>>] Here is a subject'
full_test_output = add_company_location(full_test_output)
full_test_output

{'document_id': '99606933',
 'status': 'reject',
 'version': '2',
 'header': {'supplier_abn': None,
  'supplier': None,
  'supplier_id': None,
  'invoice_number': 'INV-3337',
  'po_number': '12345',
  'line_count': None,
  'account_number': None,
  'invoice_date': '2016-01-25',
  'net_total': '85.00',
  'gst_total': '8.50',
  'gross_total': '93.50',
  'freight': '0.00',
  'bsb_number': '4321432',
  'bank_account_number': '12341234',
  'ok_to_process': '0',
  'period_start_date': None,
  'period_end_date': None,
  'abn_from_db_by_po': None,
  'filename': '20190131_invoice.pdf',
  'emaildate': '2019-01-31',
  'email_subject': '234 [<<ABC-123>>] Here is a subject'},
 'stem': '20190131_invoice',
 'new_filename': 'None-INV-3337',
 'document_url': 'https://web1-akl.xtracta.com/akl_northcote_storage2_datasource4/1/93/19/iq204587316-jB5yJehe.pdf',
 'image_urls': ['https://web1-akl.xtracta.com/akl_northcote_storage2_datasource4/1/24/e5/ds99606933-iq204587316-jB5yJehe7808-1-800.jpg'],
 'company'

## Write JSON files, create TIFs and move PDFs

In [48]:
def write_json_simple(filename, output):
    filename = filename.with_suffix('.json')
    with open(f"{filename}", "w") as f:
        f.write(json.dumps(output, indent=4))
    return filename

In [49]:
test_json_file = write_json_simple(test_ip / test_output['header']['filename'], test_output)
test_json_file

WindowsPath('C:/Users/hudge/Desktop/2019_projects/pipomatic/hudge/xtracta/input/20190131_invoice.json')

In [64]:
def move_from_input(api_key, document, ip, lp, op, jp):
    output = create_output(document)
    json_source = (ip / output['header']['filename']).with_suffix('.json')
    pdf_source = (ip / output['header']['filename']).with_suffix('.pdf')
    if document['documents_response']['document']['document_status'] == 'output':
        output = build_out_output(document, output)
        json_destination = json_destination = (op / output['new_filename']).with_suffix('.json')
        if not output['header']['line_count']:
            output['header']['line_count'] = 1
        if float(output['header']['line_count']) > 1:
            json_destination = (lp / output['new_filename']).with_suffix('.json')
            pdf_destination = (lp / output['new_filename']).with_suffix('.pdf')
            if pdf_source.exists():
                pdf_destination.write_bytes(pdf_source.read_bytes())
        with open(f"{json_destination}", "w") as f:
            f.write(json.dumps(output, indent=4))
        save_tif(output, op)
        if json_destination.exists() and json_source.exists():
            json_source.unlink()
            if pdf_source.exists():
                pdf_source.unlink()
        return 'File moved to output / lines'
    elif document['documents_response']['document']['document_status'] == 'qa':
        json_destination = (jp / output['header']['filename']).with_suffix('.json')
        json_source.replace(json_destination)
        if json_destination.exists():
            json_source.unlink()
            if pdf_source.exists():
                pdf_source.unlink()
            return 'File moved to junk'
    else:
        return 'File not moved'
    
        
def create_tif_image(image_urls):
    images = []
    for i, url in enumerate(image_urls):
        r = requests.get(url, stream=True)
        if i == 0:
            im = Image.open(r.raw)
        else:
            images.append(Image.open(r.raw))
    return im, images

def save_tif(output, op):
    new_name = (op / output['new_filename']).with_suffix('.tif')
    im, images = create_tif_image(output['image_urls'])
    im.save(f'{new_name}', save_all=True, append_images=images)
    return im, images
    

In [66]:
move_files(test_sp, test_file, test_ip)
write_json_simple(test_ip / test_output['header']['filename'], test_output)
move_from_input(api_key, test_document, test_ip, test_lp, test_op, test_jp)

'File not moved'

## Moving files in the file system

In [52]:
def clean_input_folder(ip):
    json_files = ip.glob('*.json')
    pdf_files = ip.glob('*.pdf')
    files = list(json_files)
    files.extend(list(pdf_files))
    all_files = ip.glob('*.*')
    for file in all_files:
        if file not in files:
            file.unlink()
    return True

In [53]:
move_files(test_sp, test_file, test_ip)
(test_ip / 'test.png').open('w').write('some text')
write_json_simple(test_ip / test_output['header']['filename'], test_output)
clean_input_folder(test_ip)

True

## Formatting XML for upload into Xtracta's database

Take a list of dicts and format it for uploading to Xtracta's database API

In [54]:
def update_database_data(api_key, database_id, out, refresh):
    documents_url = 'https://api-app.xtracta.com/v1/databases/data_add'
    data = {'api_key': api_key, 'database_id': int(database_id), 'data': out, 'refresh': refresh}
    r = requests.post(documents_url, data=data)
    response = xmltodict.parse(r.text)
    return response

In [55]:
def build_xml_data(supplier_data_dict):
    xml_rows = []
    for row in supplier_data_dict:
        po = {'column': [{'@id': '55261', '#text': f"{row['po_number']}"}, 
                         {'@id': '55264', '#text': f"{row['supplier_number']}"},
                         {'@id': '60223', '#text': f"{row['line_number']}"},
                         {'@id': '58133', '#text': f"{row['abn']}"},
                         {'@id': '58134', '#text': f"{row['bsb']}"},
                         {'@id': '58135', '#text': f"{row['bank_account']}"},
                         {'@id': '58242', '#text': f"{row['supplier_name']}"}]}
        xml_rows.append(po)
    xml_data = {'xml': {'row': xml_rows}}
    return xmltodict.unparse(xml_data, pretty=True)

## Build HTML file for handling rejections

In [56]:
def create_html_section(data, html_template):
    template = Template(html_template)
    html = template.render(data=data)
    return html


reject_queue_html_template = """
    <div class="column is-full">
        <p><strong>Total number of rejects in queue: {{data.reject_count}}</strong></p>
    </div>
    <div class="column is-full">  
        <h2><strong>{{data.output.header.supplier}}</strong></h2>
        <p><strong>Invoice number:</strong> {{data.output.header.invoice_number}}</p>
    </div>
    <div class="column is-two-fifths">
    <section class="section has-text-right">
        <p><strong>Net:</strong> {{"$%.2f"|format(data.output.header.net_total|float)}}</p>
        <p><strong>GST:</strong> {{"$%.2f"|format(data.output.header.gst_total|float)}}</p>
        <p><strong>Total:</strong> {{"$%.2f"|format(data.output.header.gross_total|float)}}</p>
    </section>
    <section class="section">
        <table class="table">
        <thead><tr><th>Field</th><th>Message</th></tr></thead>
        <tbody>
        {% for message in data.messages %}
        <tr>
            <th>{{message.field}}</th>
            <td>{{message.message}}</td>
        </tr>
        {% endfor %}
        </tbody>
        </table>
    </section>
    </div>
    <div class="column is-three-fifths has-text-centered">
        <p><a href="{{data.review_link}}" target="_blank">Review invoice</a></p>
        <p><img src="{{data.invoice_image}}" alt="Invoice Image" width="250"></p>
    </div>
"""


def get_reject_html(api_key, workflow_id, status, html_template):
    queue = get_xtracta_status(api_key, workflow_id, status)
    reject_count = len(queue)
    document_id = queue[0]["document_id"]
    reasons = queue[0]["rejection"]["reason"]
    messages = get_reject_info(api_key, document_id, reasons)
    document = get_document(api_key, document_id)
    image_url = document["documents_response"]["document"]["image_url"][0]
    output = create_output(document)
    review_link = open_document_ui(api_key, document_id)
    data = {
        "output": output,
        "reject_count": reject_count,
        "review_link": review_link,
        "invoice_image": image_url,
        "messages": messages,
    }
    html = create_html_section(data=data, html_template=html_template)
    return html


In [57]:
def get_reject_info(api_key, document_id, reasons):
    messages = []
    document = get_document(api_key, document_id)
    field_ids = get_field_ids(document)
    if type(reasons) != list:
        field_id = reasons["linked_field"]["field_id"]
        message = reasons["message"]
        messages.append({"field": field_ids[field_id], "message": message})
    else:
        for sub_item in reasons:
            field_id = sub_item["linked_field"]["field_id"]
            message = sub_item["message"]
            messages.append({"field": field_ids[field_id], "message": message})
    return messages


def get_field_ids(document):
    field_ids = {}
    fields = document["documents_response"]["document"]["field_data"]["field"]
    for field in fields:
        field_ids[field["field_id"]] = field["field_name"]
    return field_ids

## Build code

The remaining cells load the code to PIP

In [74]:
!jupyter nbconvert \
    --TagRemovePreprocessor.enabled=True \
    --TagRemovePreprocessor.remove_cell_tags="['build']" \
    --TemplateExporter.exclude_output=True \
    --to python "xtracta.ipynb"

first_line = """'Xtracta package'

__version__ = '1.5'

"""
script_file = Path.cwd() / 'xtracta.py'
script = script_file.read_text()
script_file.write_text(first_line + script)
username = script_file.parent.parent.name
system_name = script_file.parent.name
standardised_script_name = f'pipomatic_{username}_{system_name}.py'
script_file.replace(script_file.parent / standardised_script_name)
standardised_script_name
# 
#

[NbConvertApp] Converting notebook xtracta.ipynb to python
[NbConvertApp] Writing 17438 bytes to xtracta.py


'pipomatic_hudge_xtracta.py'

In [75]:
!black "pipomatic_hudge_xtracta.py"

reformatted pipomatic_hudge_xtracta.py
All done! \u2728 \U0001f370 \u2728
1 file reformatted.


In [42]:
# !flit publish