API endpoint that validates and processes incoming messages sent by the Backend Team to our Database. It is similar to a CDC process, backed by a message queue built on Google PubSub.

okzapradhana/pubsub-gatekeeper-api

Gatekeeper API with Google PubSub

About

An API endpoint that validates and processes incoming messages sent by the Backend Team to our Database.

Payloads MUST pass the gatekeeper's schema rules (specified in validator/schema.py) before they can be processed into the Database.

It is similar to a CDC process, backed by a message queue built on Google PubSub.

Architecture

(Architecture diagram: archi)

In this architecture, the API receives payloads sent as requests to our endpoint. Each payload is validated with Flask + jsonschema.

If the payload is invalid, the API pushes metrics to Pushgateway, from which Prometheus can pull them later.

If the payload is valid, the API pushes metrics to Pushgateway as well, and also publishes the payload as a message to a Topic with Google PubSub.
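The branching described above can be sketched as a single function. This is a minimal illustration rather than the project's actual handler: the names `handle_request`, `has_activities`, the metric names, and the 200/400 status codes are all assumptions, and Pushgateway and PubSub are replaced by in-memory lists.

```python
from typing import Callable


def handle_request(
    payload: dict,
    validate: Callable[[dict], bool],
    push_metric: Callable[[str], None],
    publish: Callable[[dict], None],
) -> int:
    """Return the HTTP status the gatekeeper would answer with (assumed codes)."""
    if not validate(payload):
        # Invalid payloads are only counted, never published.
        push_metric("invalid_schema_count")
        return 400
    # Valid payloads are counted and then published to the topic.
    push_metric("valid_schema_count")
    publish(payload)
    return 200


# In-memory stand-ins for Pushgateway and PubSub (hypothetical).
metrics = []
published = []


def has_activities(payload: dict) -> bool:
    # Placeholder for the real jsonschema validation.
    return isinstance(payload.get("activities"), list)


ok = handle_request({"activities": []}, has_activities, metrics.append, published.append)
bad = handle_request({"foo": 1}, has_activities, metrics.append, published.append)
```

Passing the validator, metric pusher, and publisher in as callables keeps the decision logic testable without a running Pushgateway or a Google Cloud project.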

Subscribers then listen to a specific topic and consume the messages published to it. Every message is processed as a Database transaction consisting of delete operations, insert operations, or both.
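On the subscriber side, a message body arrives as bytes and has to be turned back into a list of operations. The sketch below assumes the publisher sends the payload as UTF-8 encoded JSON with the `activities` structure shown in the Explanation section; `decode_activities` is a hypothetical helper, not a function from this repository.

```python
import json


def decode_activities(data: bytes) -> list:
    """Decode a message body into ordered (operation, table) pairs."""
    payload = json.loads(data.decode("utf-8"))
    steps = []
    for activity in payload["activities"]:
        operation = activity["operation"]
        if operation not in ("insert", "delete"):
            raise ValueError(f"unsupported operation: {operation}")
        steps.append((operation, activity["table"]))
    return steps


# Example message body, as the publisher might encode it (assumption).
message_data = json.dumps({
    "activities": [
        {"operation": "delete", "table": "table32"},
        {"operation": "insert", "table": "table32"},
    ]
}).encode("utf-8")

steps = decode_activities(message_data)
```

The order of the pairs matters, because the operations in one message are meant to be applied together as a single transaction.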

However, BigQuery is not designed as a transactional Database and, when this project was written, did not support multi-statement transactions. So how do we handle this case? I have implemented a workaround, described under How to Handle Transactions in BigQuery.

Tech Stack:

  1. Python
  2. Flask
  3. Google PubSub
  4. Google BigQuery
  5. Prometheus Pushgateway

Setup

Initial

To use this project on your computer, clone this repository first.

git clone https://github.com/okzapradhana/pubsub-gatekeeper-api.git

Then refer to the Python and Prometheus Pushgateway sections.

Python

  1. Create your python virtual environment
    python -m venv venv
    
  2. Activate the environment
    source venv/bin/activate
    
  3. Install the dependencies
    pip install -r requirements.txt
    

Prometheus Pushgateway

Prometheus Pushgateway allows batch jobs to expose their metrics to Prometheus, for example how many times a specific endpoint was hit, or how many valid and invalid payloads were sent to the endpoint. You can read more in the Pushgateway documentation.

First, pull the image from DockerHub:

docker pull prom/pushgateway

The steps for running the pulled image are in How To Use.

Environment Variables

Create a .env file in the root of your project directory, matching .env.example in this repository, with these variables:

GOOGLE_APPLICATION_CREDENTIALS=
PROJECT_ID=
DATASET_ID=
TOPIC_ID=
SUBSCRIPTION_ID=
PUSHGATEWAY_PROMETHEUS_HOST=

Note:
Point GOOGLE_APPLICATION_CREDENTIALS to the path of your service account file.
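A missing variable is easier to diagnose at startup than in the middle of a request. The following is a small sketch of such a check, assuming the .env values have already been loaded into the process environment (how the project loads them is not shown here); `load_settings` is a hypothetical helper.

```python
import os

REQUIRED_VARS = (
    "GOOGLE_APPLICATION_CREDENTIALS",
    "PROJECT_ID",
    "DATASET_ID",
    "TOPIC_ID",
    "SUBSCRIPTION_ID",
    "PUSHGATEWAY_PROMETHEUS_HOST",
)


def load_settings(environ=os.environ) -> dict:
    """Return the required settings, or fail listing every missing name."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_VARS}


# Usage with an explicit mapping instead of the real environment.
example_env = {name: "example-value" for name in REQUIRED_VARS}
settings = load_settings(example_env)
```

Accepting the mapping as a parameter makes the check usable in tests without touching the real environment.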

Google Cloud

  1. Install gcloud if you haven't installed it yet.
  2. Run the bash script named pubsub.sh to create the topic and the subscription that will listen to the created topic:
    ./pubsub.sh
    
  3. You MUST create and download a Service Account key, then point GOOGLE_APPLICATION_CREDENTIALS to the path of the downloaded file.

How To Use

  1. Run the Flask app:

    FLASK_APP=app/main.py FLASK_ENV=development flask run --port <port>
    

    Use the command above during development, when you want the app to reload automatically on every code change. In production, use:

    FLASK_APP=app/main.py FLASK_ENV=production flask run --port <port>
    

    Specify the port based on your needs, for example 8080.

  2. Open another terminal, then run the subscriber, which listens for and fetches every message the publisher sends to the topic:

    python services/subscriber.py
    
  3. Run the Docker image:

    docker run -d -p 9091:9091 prom/pushgateway
    

    If the container is running as expected, you can open localhost:9091 in your browser.

    Then set the PUSHGATEWAY_PROMETHEUS_HOST variable to localhost:9091. The port does not have to be 9091; you can map a different port when running the image.
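To check the Pushgateway setup by hand, a metric can be pushed over plain HTTP: Pushgateway accepts the Prometheus text format on `/metrics/job/<job>`. The sketch below uses only the standard library; the job name `gatekeeper` and the metric name `valid_schema_count` are assumptions, and the project itself may use a client library instead.

```python
import urllib.request


def build_push_request(host: str, job: str, metric: str, value: float) -> urllib.request.Request:
    """Build (but do not send) a push of one gauge value to Pushgateway."""
    # Text exposition format; the body must end with a newline.
    body = f"# TYPE {metric} gauge\n{metric} {value}\n".encode("utf-8")
    url = f"http://{host}/metrics/job/{job}"
    # POST replaces only metrics with the same name inside the job group.
    return urllib.request.Request(url, data=body, method="POST")


request = build_push_request("localhost:9091", "gatekeeper", "valid_schema_count", 3)

# With the container running, send it with:
# urllib.request.urlopen(request)
```

After sending, the pushed value should be visible on the Pushgateway page at the host set in PUSHGATEWAY_PROMETHEUS_HOST.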

Pushgateway Metrics Result

  1. How many valid payloads have been sent to our API? (image: valid-schema-count)

  2. How many invalid payloads have been sent to our API? (image: invalid-schema-count)

    These two examples are not enough on their own; adding more metrics is on the list of things to improve in this project.

Testing

Functional Test

To run the functional tests for this project, run:

pytest

Load Test

This project uses locust for performance testing. To run it, open your terminal and execute:

locust

Then go to http://localhost:8089 and set:

  1. Max users
  2. Spawn rate (users spawned per second)
  3. Host

Locust tutorial: https://docs.locust.io/en/stable/what-is-locust.html

Load test result (image: locust-result)

For the load test I used a maximum of 1000 users with a spawn rate of 5 users per second.

Explanation

We have to create an API endpoint that receives messages from the Backend Team. An example of a valid payload with the right structure is:

{
    "activities": [
        {
            "operation": "delete",
            "table": "table32",
            "value_to_delete": {
                "col_names": ["a", "c"],
                "col_types": ["INTEGER", "TEXT"],
                "col_values": [1, "2018-03-27 11:58:28.988414"]
            }
        },
        {
            "operation": "insert",
            "table": "table32",
            "col_names": ["a", "c"],
            "col_types": ["INTEGER", "TEXT"],
            "col_values": [1, "2018-03-27 11:58:28.988414"]
        }
    ]
}
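To make the structure of this payload explicit, here is a dependency-free sketch of a structural check. It is a stand-in written for illustration, not the jsonschema rules from validator/schema.py (whose contents are not shown here); the function names and the rule that the three `col_*` lists must have equal length are assumptions inferred from the example.

```python
def validate_activity(activity: dict) -> list:
    """Return a list of problems found in one activity; empty means valid."""
    errors = []
    operation = activity.get("operation")
    if operation not in ("insert", "delete"):
        return [f"unknown operation: {operation!r}"]
    if not isinstance(activity.get("table"), str):
        errors.append("table must be a string")
    # Delete nests its columns under value_to_delete; insert carries them inline.
    columns = activity.get("value_to_delete") if operation == "delete" else activity
    if not isinstance(columns, dict):
        return errors + ["value_to_delete must be an object"]
    lengths = set()
    for key in ("col_names", "col_types", "col_values"):
        value = columns.get(key)
        if isinstance(value, list):
            lengths.add(len(value))
        else:
            errors.append(f"{key} must be a list")
    if len(lengths) > 1:
        errors.append("col_names, col_types and col_values must have equal length")
    return errors


def validate_payload(payload: dict) -> list:
    """Return every problem in the payload, prefixed with its position."""
    activities = payload.get("activities")
    if not isinstance(activities, list):
        return ["activities must be a list"]
    errors = []
    for index, activity in enumerate(activities):
        if not isinstance(activity, dict):
            errors.append(f"activities[{index}]: must be an object")
            continue
        errors += [f"activities[{index}]: {e}" for e in validate_activity(activity)]
    return errors
```

Calling `validate_payload` on the example payload above should return an empty list, while a payload with an unsupported operation returns a message naming the offending activity.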

We not only need to validate that the payload follows the right structure, we also have to handle the operation logic. For example, we have to handle the case where the table does not exist when a delete operation is processed.

The requirement for this project is that if the table does not exist in the Database, the entire transaction fails and an error is output. Here a transaction means all the operations inside the activities array, so on failure none of the operations take effect.

When this project was written, BigQuery did not support multi-statement transactions. Transactions are fully supported by transactional databases such as PostgreSQL and MySQL.

So how do we deal with this BigQuery limitation?

How to Handle Transactions in BigQuery

In this project, I did something hacky to enable this feature while still using BigQuery as our Database. By combining a Queue with BigQuery's dry run queries, I was able to make BigQuery behave transactionally (commit and rollback): when a delete operation has an error, it rolls back and changes nothing in BigQuery.

To see more, look at the implementation in services/bigquery_client.py.
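The general idea of pairing a queue with a dry run can be sketched with an in-memory stand-in for BigQuery. This is an illustration under stated assumptions, not the code in services/bigquery_client.py: `FakeWarehouse`, `run_transaction`, and the simplified `row` field are hypothetical, and the fake `dry_run` only checks that the table exists, which is the failure case named in the requirement.

```python
from collections import deque


class FakeWarehouse:
    """In-memory stand-in for BigQuery: table name -> list of rows."""

    def __init__(self, tables):
        self.tables = {name: list(rows) for name, rows in tables.items()}

    def dry_run(self, activity):
        # Checks the operation without writing, like a dry-run query would.
        if activity["table"] not in self.tables:
            raise LookupError(f"table not found: {activity['table']}")

    def execute(self, activity):
        rows = self.tables[activity["table"]]
        if activity["operation"] == "insert":
            rows.append(activity["row"])
        else:
            rows[:] = [row for row in rows if row != activity["row"]]


def run_transaction(warehouse, activities):
    """Apply all activities, or none if any dry run fails."""
    queue = deque()
    for activity in activities:
        warehouse.dry_run(activity)  # raises before anything is written
        queue.append(activity)
    # Commit phase: reached only when every dry run passed.
    while queue:
        warehouse.execute(queue.popleft())
```

Because nothing is written until every operation has passed its dry run, a failure in any activity leaves the tables unchanged, which is what "rollback" amounts to in this scheme. An error during the commit phase itself would not be undone by this sketch.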
