talc87/priority_data_pipeline_postgres_db

Priority ERP DWH

  1. About The Project
  2. Prerequisites
  3. The extractionConfig JSON
  4. API endpoints
  5. Roadmap
  6. Contributing
  7. License
  8. Contact
  9. Acknowledgments

About The Project

Priority is an ERP system for small and medium-sized businesses.
This project builds a Postgres-based DWH for your Priority data using the Priority API. The API lets you get the data exactly as the business user sees it in the Priority GUI, unlike a direct connection to the Priority SQL DB, which requires the BI developer to recreate the business logic with SQL queries.

(back to top)



Data Platform Architecture

Will be Added

Prerequisites

Priority Prerequisites

This document does not include a step-by-step guide to setting up the Priority user, API access rights, or API subscription fees. I assume you know how to grant access to the API, how to set up the relevant screens, and so on. For any issues in this area, I recommend contacting your Priority support team, system admin, or implementer.

  1. Create a dedicated user with API access. The data platform doesn't write any data to your Priority system and requires only read-only (HTTP GET) access.
  2. Make sure you have the API address, username and password.
  3. For testing purposes you can use the free Priority API sandbox.

Setting the .env file

# SQL DB connection str:
sqlConnStr=postgresql+psycopg2://<username>:<password>@<host>:<port>/


# Mongo DB connection str:
mongoDbConnStr=mongodb+srv://<username>:<password>@<url>/


# Include in the .env file BUT DON'T MODIFY THE VALUES
metadataDbName=metadataDB
configDbName=admin
configCollectionName=configCollection
datatypeMappingCollectionName=datatypeMapping

  • sqlConnStr: URI of the Postgres DB. Don't delete the '/' at the end.
  • mongoDbConnStr: URI of the MongoDB you will use. Don't delete the '/' at the end.

DON'T MODIFY THE FOLLOWING VARIABLES OR VALUES

  • metadataDbName: the MongoDB database that stores the metadata of the Priority ERP system.
  • configDbName: the MongoDB database that stores the extractionConfig.
  • configCollectionName: the MongoDB collection that stores the extractionConfig JSON documents.
  • datatypeMappingCollectionName: the MongoDB collection that stores the datatypes mapping.
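
A minimal sketch of how the .env values could be read and checked before starting the app, assuming a plain KEY=value file format. The helper names parse_env and check_conn_strings are hypothetical and not part of this project; the only rule enforced is the trailing '/' described above.

```python
def parse_env(text: str) -> dict[str, str]:
    """Parse KEY=value lines, skipping blank lines and '#' comments."""
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first '=' only, so values may contain '=' themselves.
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def check_conn_strings(env: dict[str, str]) -> list[str]:
    """Return a list of problems; an empty list means both checks passed."""
    problems = []
    for key in ("sqlConnStr", "mongoDbConnStr"):
        value = env.get(key, "")
        if not value:
            problems.append(f"{key} is missing")
        elif not value.endswith("/"):
            problems.append(f"{key} must end with '/'")
    return problems


# Hypothetical sample: the Mongo connection string lacks the trailing '/'.
sample = """
# SQL DB connection str:
sqlConnStr=postgresql+psycopg2://user:secret@localhost:5432/
mongoDbConnStr=mongodb+srv://user:secret@cluster.example.net
"""
print(check_conn_strings(parse_env(sample)))
```

Running a check like this before the first request makes a missing trailing '/' visible early instead of surfacing later as a connection error.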

MongoDB prerequisites

The MongoDB stores metadata, extraction logs and other app-related data. While developing I used the MongoDB free tier. Using a free tier in production is not recommended. While testing I used a paid tier of Azure Cosmos DB for MongoDB. Feel free to use a MongoDB-compatible database from another cloud provider.

Don't forget to allow incoming traffic from the IP you are running the API on.


The extractionConfig JSON

The extractionConfig JSON describes all the information needed to extract the data from the Priority system. The extractionConfig defines the API credentials, URL and extraction scope.

After reading this section, create your extractionConfig and insert the JSON document into the MongoDB database named by configDbName (defined in the .env file). The '_id' is a unique identifier of the extractionConfig, as described below.

If you have more than one accounting instance (more than one Priority company code in your environment), each accounting instance is represented by a different extractionConfig. All accounting instances are included in the same accountID.

For example: the company "The best smartphone" has 3 major business activities: it sells smartphones to customers (B2C), provides repair and spare-part services, and imports smartphones and sells them as a wholesaler (B2B). Each business activity is managed in a different company (accounting instance) in the Priority system. In this example, "The best smartphone" is represented by one accountID (choose any string you want). Inside this accountID there are 3 extractionConfigs, one for each company code.

Check out the folder "extractionConfig_examples" to see examples of the extractionConfig json.
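
To illustrate the relationship between one accountID and several extractionConfigs, here is a small sketch. The datasourceName and accountID values are made up for "The best smartphone" example, and group_by_account is a hypothetical helper, not part of the project.

```python
from collections import defaultdict

# Hypothetical extractionConfigs, one per company code, sharing one accountID.
configs = [
    {"datasourceName": "priority_b2c", "accountID": "best_smartphone"},
    {"datasourceName": "priority_repairs", "accountID": "best_smartphone"},
    {"datasourceName": "priority_b2b", "accountID": "best_smartphone"},
]


def group_by_account(configs):
    """Map each accountID to the datasourceNames that belong to it."""
    grouped = defaultdict(list)
    for config in configs:
        grouped[config["accountID"]].append(config["datasourceName"])
    return dict(grouped)


print(group_by_account(configs))
```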

Populate extractionConfig JSON

{
	"_id": { "$oid": "678a5c74ee789f0826b9466a" },
	"datasourceName": "priority_companyA",
	"uri": "https://www.eshbelsaas.com/ui/odata/Priority/tabmob.ini/usdemo/",
	"apiUsername": "apidemo",
	"apiPassword": "123",
	"accountID": "03445d66",
	"systemTimezone": "Israel",
	"sourceSystem": "priority",
	"entities": [
		{
			"entityID": "ORDERS",
			"filterFlag": true,
			"filterField": "CURDATE",
			"expand": ["ORDERITEMS"],
			"lastRun": "2024-06-12 21:53:35",
			"datarStartDate": "2020-05-30 00:00:00"
		},
		{
			"entityID": "CTYPE",
			"filterFlag": false,
			"filterField": "",
			"expand": [],
			"lastRun": "2024-06-12 21:53:40",
			"datarStartDate": "2020-05-30 00:00:00"
		}
	]
}

Field details and description

  • _id- object id. A unique identifier of the extractionConfig, generated automatically by MongoDB when a new extractionConfig is inserted.

  • datasourceName- string. A name of your choice for the extractionConfig.

  • uri- string. The Priority API address of your Priority system.

  • apiUsername- Username of the API user you created for the data extraction.

  • apiPassword- Password of the API user you created for the data extraction.

  • accountID- string. The accountID you chose for your organization, as described above.

  • systemTimezone- string. The Priority system timezone ("Israel" in the example above). For the full list of timezones, refer to pytz-time-zones.

  • sourceSystem- string. No need to modify, keep 'priority'.

  • entities- a list of the entities (Priority screens) you want to extract data from. Each entity is represented by a list item. You need to complete all the fields below:

    • entityID- entity name (Priority screen technical name).

    • filterFlag- boolean. A flag that determines whether a filter is applied when extracting the data from this entity.

    • filterField- the field by which the data is filtered.

    • expand- a list of sub-forms/sub-screens that you want to extract together with the entityID. In the example above, the sub-screen "ORDERITEMS" is extracted together with the screen "ORDERS". The sub-screen is stored in a separate table inside the Postgres DB.

    • lastRun- auto-generated by the app, no need to modify. The last timestamp at which the entityID was extracted from the API.

    • datarStartDate- timestamp (yyyy-MM-dd HH:mm:ss). The baseline timestamp from which you want to extract the data. In the example above, a non-incremental extraction fetches all the ORDERS from '2020-05-30 00:00:00' onward.
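
The field descriptions above can be turned into a quick sanity check before inserting a document. This is only a sketch: validate_extraction_config is a hypothetical helper, and the rules (including "filterField is required when filterFlag is true") are my reading of the descriptions, not checks the app is known to perform.

```python
from datetime import datetime

# Keys taken from the example extractionConfig (without the auto-generated _id).
TOP_LEVEL_KEYS = ("datasourceName", "uri", "apiUsername", "apiPassword",
                  "accountID", "systemTimezone", "sourceSystem", "entities")
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"  # yyyy-MM-dd HH:mm:ss


def validate_extraction_config(config: dict) -> list[str]:
    """Return a list of human-readable errors; empty means no problem found."""
    errors = [f"missing key: {key}" for key in TOP_LEVEL_KEYS if key not in config]
    for entity in config.get("entities", []):
        name = entity.get("entityID", "<unknown>")
        if not isinstance(entity.get("filterFlag"), bool):
            errors.append(f"{name}: filterFlag must be true or false")
        # Assumption: a filtered entity needs a field to filter on.
        if entity.get("filterFlag") and not entity.get("filterField"):
            errors.append(f"{name}: filterField is required when filterFlag is true")
        if not isinstance(entity.get("expand"), list):
            errors.append(f"{name}: expand must be a list")
        try:
            datetime.strptime(entity.get("datarStartDate", ""), TIMESTAMP_FORMAT)
        except (TypeError, ValueError):
            errors.append(f"{name}: datarStartDate must look like yyyy-MM-dd HH:mm:ss")
    return errors


example = {
    "datasourceName": "priority_companyA",
    "uri": "https://www.eshbelsaas.com/ui/odata/Priority/tabmob.ini/usdemo/",
    "apiUsername": "apidemo",
    "apiPassword": "123",
    "accountID": "03445d66",
    "systemTimezone": "Israel",
    "sourceSystem": "priority",
    "entities": [
        {"entityID": "ORDERS", "filterFlag": True, "filterField": "CURDATE",
         "expand": ["ORDERITEMS"], "datarStartDate": "2020-05-30 00:00:00"},
    ],
}
print(validate_extraction_config(example))
```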

For example:

In the extractionConfig above, each time the data in the DWH is refreshed, the entity "ORDERS" is filtered by the "CURDATE" field. The baseline value of the filter depends on whether the refresh is incremental. If the refresh is incremental, all the orders from '2024-06-12 21:53:35' (lastRun) onward are loaded into the DWH. If the refresh is not incremental, all the orders from '2020-05-30 00:00:00' (datarStartDate) onward are loaded into the DWH. The entity 'CTYPE' is not filtered (filterFlag is set to false and there is no filterField), so each refresh loads all its historical data into the DWH. For more information about incremental refresh, refer to the API documentation.
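
The rule in the example above can be sketched as a small function. filter_baseline is a hypothetical name; the sketch only picks the field and the baseline timestamp and makes no claim about how the app builds the actual API filter.

```python
from datetime import datetime

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def filter_baseline(entity: dict, incremental: bool):
    """Return (filterField, baseline datetime), or None for an unfiltered entity."""
    if not entity["filterFlag"]:
        return None  # no filter: all historical data is loaded on every refresh
    key = "lastRun" if incremental else "datarStartDate"
    return entity["filterField"], datetime.strptime(entity[key], TIMESTAMP_FORMAT)


orders = {
    "entityID": "ORDERS", "filterFlag": True, "filterField": "CURDATE",
    "lastRun": "2024-06-12 21:53:35", "datarStartDate": "2020-05-30 00:00:00",
}
ctype = {
    "entityID": "CTYPE", "filterFlag": False, "filterField": "",
    "lastRun": "2024-06-12 21:53:40", "datarStartDate": "2020-05-30 00:00:00",
}

print(filter_baseline(orders, incremental=True))
print(filter_baseline(orders, incremental=False))
print(filter_baseline(ctype, incremental=True))
```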


Postgres DB Prerequisites

The Postgres DB stores the data extracted from the Priority system (invoices, JE, PO etc.) and serves as the DWH. I used Azure Database for PostgreSQL flexible server.

  • Create a DB and name it after the accountID in the extractionConfig JSON above.
  • Allow incoming traffic from the IP you are running the API on.
  • You don't need to create any tables; all tables will be created automatically later.


API endpoints


GET info

Return the configuration information related to database connections and collections.

import requests

url = "http://localhost:5000/info"

payload = ""
headers = {}

response = requests.request("GET", url, headers=headers, data=payload)
print(response.json())

Response:

{
	"SQL connection string": "postgresql+psycopg2://postgres@localhost:5432/",
	"mongoDB collection that stores the datatypes mapping": "datatypeMapping",
	"mongoDB collection that stores the extraction cofnig documents": "configCollection",
	"mongoDB connection string": "mongodb+srv://<username>:<password>@<url>/",
	"mongoDB that stores the extraction config documents": "priority_dwh_admin",
	"mongoDB that stores the metadata": "metadataDB"
}

POST extractionConfig

Insert a new extractionConfig json to the MongoDB under priority_dwh_admin.configCollection.

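import json
import requests

# url is the address of the extractionConfig endpoint on your API host.
# Replace the placeholder with the extractionConfig JSON you want to insert.
payload = json.dumps(<extractionConfig JSON>)


headers = {
  'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)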

Response:
_id- the object id generated by MongoDB. success- boolean, true if the JSON was inserted successfully.

{
	"_id": "6766b998583ca0fa2c45ff63",
	"success": true
}

GET extractionConfig

Retrieve the requested extractionConfig json from the MongoDB under priority_dwh_admin.configCollection.

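import json
import requests

# url is the address of the extractionConfig endpoint on your API host.
payload = json.dumps({
	"datasourceId": "<datasource _id>"
})


headers = {
  'Content-Type': 'application/json'
}

response = requests.request("GET", url, headers=headers, data=payload)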

Response:

{
	"_id": "6766bcf9583ca0fa2c45ff6a",
	"accountID": "03445d66",
	"apiPassword": "123",
	"apiUsername": "apidemo",
	"datasourceName": "priority_companyA",
	"entities": [
		{
			"EntityID": "ORDERS",
			"datarStartDate": "2020-05-30 00:00:00",
			"expand": ["ORDERITEMS"],
			"filterField": "",
			"filterFlag": false,
			"lastRun": "2024-06-12 21:53:35"
		},
		{
			"EntityID": "CTYPE",
			"datarStartDate": "2020-05-30 00:00:00",
			"expand": [],
			"filterField": "",
			"filterFlag": false,
			"lastRun": "2024-06-12 21:53:40"
		}
	],
	"sourceSystem": "priority",
	"submitTimestampUTC": "Sat, 21 Dec 2024 13:04:57 GMT",
	"systemTimezone": "Israel",
	"uri": "https://www.eshbelsaas.com/ui/odata/Priority/tabmob.ini/usdemo/"
}

GET pingApi

Ping the Priority API, the Postgres DB and the MongoDB to check that the API can reach them.

Python syntax:

import json
import requests

url = "http://localhost:5000/pingApi"

Method = 'GET'

payload = json.dumps({
           "datasourceId": "<data source _id>"
          })

headers = {
            'Content-Type': 'application/json'
          }

response = requests.request(Method, url, headers=headers, data=payload)

Response:
{
	"MomongoDB response": "{'ok': 1}",
	"Priority api response": "priority_api_response:200 reponse_text: OK",
	"SQL reponse": "Database connection is OK."
}

GET testExtractionconfigEntities

Tests access to the Priority entities by fetching the first record of each entity and sub-entity in the extractionConfig JSON.


Python syntax:

import json
import requests

url = "http://localhost:5000/testExtractionconfigEntities"

Method = 'GET'

payload = json.dumps({
           "datasourceId": "<data source _id>"
          })

headers = {
            'Content-Type': 'application/json'
          }

response = requests.request(Method, url, headers=headers, data=payload)

Payload example:

{
	"datasourceId": "678a5c74ee789f0826b9466a"
}

Response:

[
	{
		"entity": "ORDERS", // entity name
		"result": 200, // response code from the Priority API
		"url": "https://www.eshbelsaas.com/ui/odata/Priority/tabmob.ini/usdemo/ORDERS?%24expand=ORDERITEMS_SUBFORM&%24top=1" // request URL that was sent to the Priority API
	},
	{
		"entity": "CTYPE",
		"result": 200,
		"url": "https://www.eshbelsaas.com/ui/odata/Priority/tabmob.ini/usdemo/CTYPE?%24top=1"
	}
]
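
The URLs in the response above suggest how each test request is formed. The sketch below reproduces them under two assumptions inferred only from this sample: each name in expand gets a "_SUBFORM" suffix, and several sub-forms would be comma-separated. build_test_url and failed_entities are hypothetical helpers, not functions of this project.

```python
from urllib.parse import urlencode


def build_test_url(base_uri: str, entity: dict) -> str:
    """Build a URL that fetches only the first record of an entity."""
    params = {}
    if entity["expand"]:
        # Assumption: sub-forms are addressed as <NAME>_SUBFORM.
        params["$expand"] = ",".join(f"{name}_SUBFORM" for name in entity["expand"])
    params["$top"] = 1
    # urlencode percent-encodes '$' as %24, matching the sample response.
    return f"{base_uri}{entity['entityID']}?{urlencode(params)}"


def failed_entities(results: list[dict]) -> list[str]:
    """Return the entities whose Priority API response code was not 200."""
    return [item["entity"] for item in results if item["result"] != 200]


base = "https://www.eshbelsaas.com/ui/odata/Priority/tabmob.ini/usdemo/"
print(build_test_url(base, {"entityID": "ORDERS", "expand": ["ORDERITEMS"]}))
print(build_test_url(base, {"entityID": "CTYPE", "expand": []}))
```

After calling the endpoint, failed_entities(response.json()) gives a quick list of the screens the API user cannot read.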

POST initialDataLoad

Deploy the entire data platform from scratch:

  • Collect the Priority screens metadata and insert it into the metadata collection in the MongoDB.

  • Fetch the data from the Priority API and insert it into the Postgres DB.

When deploying the app for the first time, this is the first endpoint you should use after checking the API with the pingApi endpoint.

Actions:

  1. Delete all metadata documents from the MongoDB and write them again.
  2. Send GET requests to the Priority API and collect the data according to the extractionConfig JSON. The data will be stored in the tables "stg_<entity_name>".
  3. For each entity and subform in the extractionConfig JSON, create a table with all the fields of the entity, according to the entity datatypes.

Payload:

{
	"datasourceId": "<data source _id>"
}

Response:

{
	"initialDataLoad": {
		"metadataRefreshResults": {
			"documentsDeleted": 3755, // # of metadata documents that were in the MongoDB and were deleted.
			"endTimestamp": "Sat, 18 Jan 2025 09:06:04 GMT",
			"metadataInserted": true, // boolean, whether the new metadata was inserted successfully.
			"metadataRecordsBeforeDelete": 3755, // # of new metadata documents entered into the MongoDB.
			"metadataRecordsDatatypedModified": 3755,
			"metadataRecordsExtractedFromApi": 3755, // # of new metadata documents extracted from the API.
			"startTimestamp": "Sat, 18 Jan 2025 09:05:43 GMT",
			"totalTimeSeconds": 21.802948
		},
		"refreshtTablesData": [
			{
				"entityName": "orders", // entity name according to Priority
				"recordsWritten": 276, // # of records inserted into the table
				"tableName": "stg_orders" // table name
			},
			{
				"entityName": "orderitems_subform",
				"recordsWritten": 123,
				"tableName": "stg_orderitems"
			},
			{
				"entityName": "ctype",
				"recordsWritten": 5,
				"tableName": "stg_ctype"
			}
		],
		"sqlDeployedTables": {
			"exists": ["ORDERS", "ORDERITEMS", "CTYPE"], // Tables that are already in the DWH and did not need to be recreated or modified.
			"failed": [], // Tables that failed to be created.
			"success": [] // Tables created in the DWH successfully.
		}
	}
}
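
A response like the one above can be condensed into a short summary. This is a sketch only: summarize_initial_load is a hypothetical helper, the dictionary below is a trimmed copy of the sample response, and the key names (including "refreshtTablesData") are copied as they appear in it.

```python
def summarize_initial_load(response: dict) -> dict:
    """Condense an initialDataLoad response into a few headline values."""
    load = response["initialDataLoad"]
    return {
        "metadataInserted": load["metadataRefreshResults"]["metadataInserted"],
        "totalRecordsWritten": sum(t["recordsWritten"] for t in load["refreshtTablesData"]),
        "failedTables": load["sqlDeployedTables"]["failed"],
    }


# Trimmed copy of the sample response above (comments removed).
sample_response = {
    "initialDataLoad": {
        "metadataRefreshResults": {"documentsDeleted": 3755, "metadataInserted": True},
        "refreshtTablesData": [
            {"entityName": "orders", "recordsWritten": 276, "tableName": "stg_orders"},
            {"entityName": "orderitems_subform", "recordsWritten": 123, "tableName": "stg_orderitems"},
            {"entityName": "ctype", "recordsWritten": 5, "tableName": "stg_ctype"},
        ],
        "sqlDeployedTables": {"exists": ["ORDERS", "ORDERITEMS", "CTYPE"], "failed": [], "success": []},
    }
}
print(summarize_initial_load(sample_response))
```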

POST refreshData

Refresh the data and write it into the stg_ tables.

incremental parameter set to False: all data in the stg tables is dropped and the refresh is performed from datarStartDate until today.

incremental parameter set to True: the refresh is performed from lastRun until today, according to the filterField key.

refreshData endpoint request example:

Python syntax:

import json
import requests

url = "http://localhost:5000/refreshData?incremental=False"

Method = 'POST'

payload = json.dumps({
            "datasourceId": "<data source _id>"
          })

headers = {
            'Content-Type': 'application/json'
          }

response = requests.request(Method, url, headers=headers, data=payload)

POST refreshData endpoint response

{
	"refreshData": [
		{
			"entityName": "ORDERS", // the entity/sub-entity name according to the Priority system
			"recordsWritten": 284, // number of records written to the table
			"tableName": "stg_ORDERS" // Postgres table the data was written to
		},
		{
			"entityName": "ORDERITEMS_SUBFORM",
			"recordsWritten": 13,
			"tableName": "stg_ORDERITEMS"
		},
		{
			"entityName": "CTYPE",
			"recordsWritten": 5,
			"tableName": "stg_CTYPE"
		}
	]
}

POST resetDataPlatform

Reset the entire data platform. The endpoint runs the following steps:

  1. Delete all tables in the DWH.
  2. Run initialDataLoad.

Python syntax:

import json
import requests

url = "http://localhost:5000/resetDataPlatform"

Method = 'POST'

payload = json.dumps({
            "datasourceId": "<data source _id>"
          })

headers = {
            'Content-Type': 'application/json'
          }

response = requests.request(Method, url, headers=headers, data=payload)

POST resetDataPlatform endpoint response

{
	"resetDataPlatform": {
		"deleteAllTables": {
			// Tables which were in the DWH and were deleted.
			"dwhTables": [
				"stg_orders",
				"stg_orderitems",
				"orders",
				"orderitems",
				"ctype",
				"stg_ctype"
			],
			"results": "All 6 tables dropped."
		},

		"initialDataLoad": {
			"metadataRefreshResults": {
				"documentsDeleted": 3755, // # of metadata documents that were in the MongoDB and were deleted.
				"endTimestamp": "Sat, 18 Jan 2025 09:06:04 GMT",
				"metadataInserted": true, // boolean, whether the new metadata was inserted successfully.
				"metadataRecordsBeforeDelete": 3755, // # of new metadata documents entered into the MongoDB.
				"metadataRecordsDatatypedModified": 3755,
				"metadataRecordsExtractedFromApi": 3755, // # of new metadata documents extracted from the API.
				"startTimestamp": "Sat, 18 Jan 2025 09:05:43 GMT",
				"totalTimeSeconds": 21.802948
			},
			"refreshtTablesData": [
				{
					"entityName": "orders", // entity name according to Priority
					"recordsWritten": 276, // # of records inserted into the table
					"tableName": "stg_orders" // table name
				},
				{
					"entityName": "orderitems_subform",
					"recordsWritten": 123,
					"tableName": "stg_orderitems"
				},
				{
					"entityName": "ctype",
					"recordsWritten": 5,
					"tableName": "stg_ctype"
				}
			],
			"sqlDeployedTables": {
				"exists": ["ORDERS", "ORDERITEMS", "CTYPE"], // Tables that are already in the DWH and did not need to be recreated or modified.
				"failed": [], // Tables that failed to be created.
				"success": [] // Tables created in the DWH successfully.
			}
		}
	}
}
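
One way to sanity-check a reset is to confirm that every staging table that was dropped has been loaded again. The sketch below does that against a trimmed copy of the sample response; missing_after_reset is a hypothetical helper and the "stg_" prefix test is an assumption based on the table names shown above.

```python
def missing_after_reset(response: dict) -> list[str]:
    """Return dropped stg_ tables that do not appear in the reload results."""
    reset = response["resetDataPlatform"]
    dropped = {
        name for name in reset["deleteAllTables"]["dwhTables"]
        if name.startswith("stg_")
    }
    reloaded = {
        table["tableName"]
        for table in reset["initialDataLoad"]["refreshtTablesData"]
    }
    return sorted(dropped - reloaded)


# Trimmed copy of the sample response above (comments removed).
sample_reset = {
    "resetDataPlatform": {
        "deleteAllTables": {
            "dwhTables": ["stg_orders", "stg_orderitems", "orders",
                          "orderitems", "ctype", "stg_ctype"],
            "results": "All 6 tables dropped.",
        },
        "initialDataLoad": {
            "refreshtTablesData": [
                {"entityName": "orders", "recordsWritten": 276, "tableName": "stg_orders"},
                {"entityName": "orderitems_subform", "recordsWritten": 123, "tableName": "stg_orderitems"},
                {"entityName": "ctype", "recordsWritten": 5, "tableName": "stg_ctype"},
            ]
        },
    }
}
print(missing_after_reset(sample_reset))
```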

(back to top)

Roadmap

Will be updated soon.

See the open issues for a full list of proposed features (and known issues).

(back to top)

Contributing

Will be updated soon.

(back to top)

License

Distributed under the MIT License.

Contact

Tal Cohen - LinkedIn
