In 2023, India's Ministry of Corporate Affairs (MCA) mandated that all enterprises operating in India, including US-based companies with branches in India, maintain daily backups of all financial data and related documents (see MCA Notification, 21st August 2022). For many Finance teams, complying with this mandate means an added burden of manual work and costly implementations.
Finance Data Pipeline is a purpose-built data pipeline that helps Finance teams comply with evolving accounting standards and regulations without increasing manual work, by automatically orchestrating large volumes of financial data across multiple systems. The pipeline leverages parallel programming, REST APIs, and an incremental-load ELT architecture to support the following stack:
- ERP: NetSuite, SuiteScript
- Orchestrator: Airflow
- Data storage: Postgres, AWS S3
- Analytics: Tableau, Mode, Adaptive Planning
- Internal Tool: Retool
Finance Data Pipeline leverages Airflow, parallel programming, and an incremental-load ELT architecture to automatically:
- Extract data from NetSuite ERP by executing a pool of DAG tasks hourly that call a custom NetSuite SuiteScript API asynchronously and dynamically, pulling transactions and related metadata such as vendors, customers, employees, departments, and files.
- Load data into a SQL warehouse and an India-hosted S3 lake.
- Transform data into structured, audit-ready, reportable datasets, empowering Finance and Accounting teams to consume financial data directly in analytics tools like Mode, Tableau, Adaptive, and Pigment, as well as custom reconciliation tools such as Retool and Superblocks.
Data engineers interested in this project must have knowledge of NetSuite, SuiteScript 2.1+, Accounting, and REST APIs to effectively operate the custom Airflow NetSuite operators. The asynchronous and dynamic processing capability is available at both the task-pool and DAG levels (see Step 4: Configure DAG).
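As a high-level illustration, the skeleton below sketches how the hourly extract-load-transform flow could be wired with Airflow's TaskFlow API. The DAG and task names are illustrative placeholders, not the repo's actual operators:

```python
# Hypothetical skeleton of the hourly ELT flow; task bodies are placeholders.
from datetime import datetime

from airflow.decorators import dag, task

@dag(schedule='@hourly', start_date=datetime(2023, 1, 1), catchup=False)
def finance_data_pipeline():
    @task
    def extract():
        ...  # call the custom NetSuite RESTlet and return raw rows

    @task
    def load(rows):
        ...  # write rows to the SQL warehouse and the India-hosted S3 lake

    @task
    def transform():
        ...  # shape raw rows into audit-ready, reportable tables

    load(extract()) >> transform()

finance_data_pipeline()
```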
### Step 1: Create NetSuite RESTlet API

Create a SuiteScript 2.1+ RESTlet script to deploy a custom NetSuite REST API, retrieve its `script_id`, and store it in a `.env` file (similar to `.env.example`) or in the Airflow UI under Admin/Variables and Admin/Connections.
```javascript
/**
 * @NApiVersion 2.1
 * @NScriptType Restlet
 */
define(['N/search'], (search) => {
    // GET entry point: run the requested saved search and return its rows.
    const onRequest = (c) => {
        const { type, id, filters, columns } = c;
        return search
            .create({ type, id, filters, columns })
            .run()
            .getRange({ start: 0, end: 1000 })
            .map((row) => row.getAllValues());
    };
    return { get: onRequest };
});
```
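Once deployed, the RESTlet can be smoke-tested outside of Airflow. Below is a minimal sketch assuming token-based authentication (OAuth 1.0a) is enabled; the account ID, credentials, and `NETSUITE_SCRIPT_ID` variable name are placeholders to be matched to your `.env.example`:

```python
# Hypothetical smoke test for the deployed RESTlet; all credentials are placeholders.
import os

import requests
from requests_oauthlib import OAuth1

ACCOUNT = '1234567'  # placeholder NetSuite account ID
script_id = os.environ.get('NETSUITE_SCRIPT_ID', 'customscript_placeholder')

url = (
    f'https://{ACCOUNT}.restlets.api.netsuite.com'
    f'/app/site/hosting/restlet.nl?script={script_id}&deploy=1'
)

auth = OAuth1(
    client_key='CONSUMER_KEY',
    client_secret='CONSUMER_SECRET',
    resource_owner_key='TOKEN_ID',
    resource_owner_secret='TOKEN_SECRET',
    signature_method='HMAC-SHA256',
    realm=ACCOUNT,
)

resp = requests.get(url, auth=auth, params={'type': 'transaction'})
resp.raise_for_status()
print(resp.json())  # rows returned by the saved search
```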
### Step 2: Install Airflow

```bash
git clone https://github.com/mykkelol/netsuite-data-pipeline
cd netsuite-data-pipeline
python3 -m pip install docker
python3 -m pip install docker-compose
docker build -t extending-airflow:latest .
docker-compose up -d
```
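Once the containers are up, the Airflow UI is typically reachable at http://localhost:8080 (the default for Airflow's Docker setup), where the DAGs below can be enabled and monitored.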
### Step 3: Create SQL table

```sql
CREATE TABLE IF NOT EXISTS my_table_name (
    id VARCHAR(255),
    duedate DATE,
    trandate DATE,
    amount DECIMAL,
    tranid VARCHAR(255),
    entity VARCHAR(255),
    status VARCHAR(255),
    currency VARCHAR(255),
    department VARCHAR(255),
    record_type VARCHAR(255),
    requester_name VARCHAR(255),
    requester_email VARCHAR(255),
    nextapprover_name VARCHAR(255),
    nextapprover_email VARCHAR(255),
    transaction_number VARCHAR(255),
    PRIMARY KEY(id)
);
```
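Since the pipeline is incremental-load, re-extracted rows should overwrite prior versions rather than duplicate them. Below is a minimal upsert sketch keyed on the table's primary key; the `PostgresHook` connection ID and the column subset are illustrative:

```python
# Hypothetical incremental upsert into the table above (subset of columns shown).
from airflow.providers.postgres.hooks.postgres import PostgresHook

UPSERT_SQL = """
    INSERT INTO my_table_name (id, trandate, amount, tranid, status)
    VALUES (%(id)s, %(trandate)s, %(amount)s, %(tranid)s, %(status)s)
    ON CONFLICT (id) DO UPDATE SET
        trandate = EXCLUDED.trandate,
        amount = EXCLUDED.amount,
        tranid = EXCLUDED.tranid,
        status = EXCLUDED.status;
"""

def load_rows(rows):
    # rows: list of dicts produced by the extract tasks
    hook = PostgresHook(postgres_conn_id='postgres_default')
    conn = hook.get_conn()
    with conn, conn.cursor() as cur:
        cur.executemany(UPSERT_SQL, rows)
```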
### Step 4: Configure DAG

In `dags_config.py`, `RECORD_TYPE`/`search_id` is required to run the DAG. In the DAG below, the highlighted tasks represent the base transaction query required in `RECORD_TYPE` and `search_id`.
Optionally, leverage parallel programming and Airflow Task Groups to optimize and enrich the data pipeline by configuring the `subsearches` property to add subqueries that support the base query. To do so, add tuples of (`record_type`, `search_id`, `filters`) to configure the DAG to run multiple task instances automatically, dynamically, and asynchronously.
```python
{
    'type': 'transaction',
    'search_id': 'search_id_india',
    'subsearches': [
        ('transaction', 'customsearch_gl_posting_transactions_india', []),
        ('customer', 'customsearch_customer', []),
        ('currency', 'customsearch_currency', []),
        ('some_record_type', 'customsearch_id', []),
    ]
}
```
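For illustration, here is a minimal sketch of how the `subsearches` tuples could fan out into parallel task instances using dynamic task mapping inside a Task Group; `extract_subsearch` and the DAG wiring are illustrative names, not the repo's actual operators:

```python
# Hypothetical fan-out: one mapped task instance per subsearch tuple.
from datetime import datetime

from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup

CONFIG = {
    'type': 'transaction',
    'search_id': 'search_id_india',
    'subsearches': [
        ('transaction', 'customsearch_gl_posting_transactions_india', []),
        ('customer', 'customsearch_customer', []),
    ],
}

@dag(schedule='@hourly', start_date=datetime(2023, 1, 1), catchup=False)
def subsearch_fanout():
    @task
    def extract_subsearch(subsearch):
        record_type, search_id, filters = subsearch
        ...  # call the RESTlet for this subsearch and stage the rows

    with TaskGroup(group_id='subsearches'):
        # Dynamic task mapping runs the tuples in parallel, asynchronously.
        extract_subsearch.expand(subsearch=CONFIG['subsearches'])

subsearch_fanout()
```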
Lastly, the pipeline can be extended even further to execute multiple DAGs dynamically and asynchronously. To do so, add additional base-query dictionaries to `RECORD_TYPE`. For example, a multi-subsidiary enterprise operating two subsidiaries (US, IN) can configure the pipeline to execute two DAGs for two base queries (one per subsidiary):
```python
[
    {
        'type': 'transaction',
        'search_id': 'search_id_usa',
        'subsearches': [
            ('transaction', 'customsearch_gl_posting_transactions_usa', []),
            ('some_record_type', 'customsearch_id', []),
        ]
    },
    {
        'type': 'transaction',
        'search_id': 'search_id_india',
        'subsearches': [
            ('transaction', 'customsearch_gl_posting_transactions_india', []),
            ('some_record_type', 'customsearch_id', []),
        ]
    }
]
```
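A sketch of how multiple base queries could become multiple DAGs at parse time, using the common Airflow DAG-factory pattern (the `create_dag` factory and DAG IDs below are illustrative names):

```python
# Hypothetical dynamic DAG generation: one DAG per base-query dictionary.
from datetime import datetime

from airflow.decorators import dag, task

RECORD_TYPE = [
    {'type': 'transaction', 'search_id': 'search_id_usa', 'subsearches': []},
    {'type': 'transaction', 'search_id': 'search_id_india', 'subsearches': []},
]

def create_dag(config):
    @dag(
        dag_id=f"netsuite_{config['search_id']}",
        schedule='@hourly',
        start_date=datetime(2023, 1, 1),
        catchup=False,
    )
    def _pipeline():
        @task
        def extract_base():
            ...  # run the base saved search for this subsidiary

        extract_base()

    return _pipeline()

# Register one DAG per base query so Airflow discovers all of them at parse time.
for cfg in RECORD_TYPE:
    globals()[f"netsuite_{cfg['search_id']}"] = create_dag(cfg)
```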
Finance Data Pipeline is used by finance systems engineers in the developer community:
- You are free to fork and use this code directly according to the Apache License (see LICENSE).
- Please do not copy it directly.
- Crediting the author is appreciated.
- GitHub: @mykkelol
- LinkedIn: /msihavong