add more documentations
toluaina committed Sep 23, 2023
1 parent 61b04f2 commit 9511670
Showing 25 changed files with 535 additions and 120 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-build.yml
@@ -25,7 +25,7 @@ jobs:
        ports:
          - 6379:6379
      elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:7.17.9
        image: docker.elastic.co/elasticsearch/elasticsearch:7.17.13
        ports:
          - 9200:9200
          - 9300:9300
5 changes: 5 additions & 0 deletions .gitignore
@@ -113,3 +113,8 @@ venv.bak/
.rental*
.social*
.shakespeare*
old
examples/tiers/

# vscode
.vscode/
92 changes: 54 additions & 38 deletions bin/parallel_sync
@@ -1,44 +1,43 @@
#!/usr/bin/env python

"""
Parallel sync is an experimental feature that leverages the available
CPU's/Threads to increase throughput.
This can be useful for environments that have high network latency.
In this scenario, your PG database, Elasticsearch/OpenSearch, and PGSync
servers are on different networks with a delay between request/response time.
The main bottleneck, in this case, is usually the roundtrip of the database
query.
Even with server-side cursors, we are still only able to fetch
a limited number of records at a time from the cursor.
The delay in the next cursor fetch can slow down the overall sync
considerably.
The solution here is to perform an initial fast/parallel sync
to populate Elasticsearch/OpenSearch in a single iteration.
When this is complete, we can then continue to run the normal `pgsync`
as a daemon.
This approach uses the Tuple identifier record of the table columns.
Each table contains a system column - "ctid" of type "tid" that
identifies the page record and row number in each block.
We can use this to paginate the sync process.
Pagination here technically implies that we are splitting each paged record
between CPU's/Threads.
This allows us to perform Elasticsearch/OpenSearch bulk inserts in parallel.
The "ctid" is a tuple of (page, row-number) e.g (1, 5) that identifies the
row in a disk page.
This method allows us to fetch all paged row records upfront and split them
into work units amongst the workers(threads/cpus).
Each chunk of work is defined by the BLOCK_SIZE and corresponds to the number
of root node records each worker needs to process.
The workers query for each chunk of work, filtering by the page number
and row numbers.
Parallel sync is an innovative, experimental feature designed to optimize
throughput by utilizing available CPUs/threads, particularly beneficial
in environments experiencing high network latency.
Scenario & Challenge:
In instances where your PG database, Elasticsearch/OpenSearch, and PGSync
servers operate on divergent networks, a delay in request/response time is
noticeable. The primary constraint emerges from the database query's roundtrip,
which even server-side cursors can address only to a limited extent by fetching
a certain number of records at a time. The consequent delay in fetching the
next cursor significantly hampers the overall synchronization speed.
Solution:
To mitigate this, the strategy is to conduct an initial fast/parallel sync,
thereby populating Elasticsearch/OpenSearch in a single iteration.
Post this, the regular pgsync can continue running as a daemon.
Approach and Technical Implementation:
The approach centers around utilizing the Tuple identifier record of the table
columns. Every table incorporates a system column – "ctid" of type "tid,"
which helps identify the page record and the row number in each block.
This element facilitates the pagination of the sync process.
Technically, pagination implies dividing each paged record amongst the
available CPUs/threads. This division enables the parallel execution of
Elasticsearch/OpenSearch bulk inserts. The "ctid" serves as a tuple
(for instance, (1, 5)), pinpointing the row in a disk page.
By leveraging this method, all paged row records are retrieved upfront and
allocated as work units across the worker threads/CPUs.
Each work unit, defined by the BLOCK_SIZE, denotes the number of root node
records assigned for each worker to process.
Subsequently, the workers execute queries for each assigned chunk of work,
filtered based on the page number and row numbers.
This systematic and parallel approach optimizes the synchronization process,
especially in environments challenged by network latency.
"""

import asyncio
@@ -67,12 +66,29 @@ from pgsync.utils import (


def save_ctid(page: int, row: int, name: str) -> None:
    """
    Save the checkpoint for a given page and row in a file with the given name.
    Args:
        page (int): The page number to save.
        row (int): The row number to save.
        name (str): The name of the file to save the checkpoint in.
    """
    checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{name}.ctid")
    with open(checkpoint_file, "w+") as fp:
        fp.write(f"{page},{row}\n")


def read_ctid(name: str) -> None:
    """
    Reads the checkpoint file for the given name and returns the page and row numbers.
    Args:
        name (str): The name of the checkpoint file.
    Returns:
        tuple: A tuple containing the page and row numbers. If the checkpoint file does not exist, returns (None, None).
    """
    checkpoint_file: str = os.path.join(CHECKPOINT_PATH, f".{name}.ctid")
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file, "r") as fp:
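For reference, the checkpoint format these helpers agree on is a single "page,row" line in a hidden file under CHECKPOINT_PATH. A small, hypothetical round-trip is sketched below; the parsing side is assumed, since the tail of read_ctid is not visible in this hunk.

```python
import os

CHECKPOINT_PATH = "./"  # assumed for illustration; PGSync reads this from its settings


def parse_ctid_checkpoint(name: str):
    """Hypothetical reader for the '.{name}.ctid' file written by save_ctid."""
    checkpoint_file = os.path.join(CHECKPOINT_PATH, f".{name}.ctid")
    if not os.path.exists(checkpoint_file):
        return None, None
    with open(checkpoint_file, "r") as fp:
        page, row = fp.read().strip().split(",")  # file body is "page,row\n"
    return int(page), int(row)


# save_ctid(1, 5, "book") writes "1,5\n"; parse_ctid_checkpoint("book") -> (1, 5)
```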
37 changes: 9 additions & 28 deletions docker-compose-opensearch.yml
@@ -2,16 +2,13 @@ version: '3.8'

services:
  postgres:
    image: debezium/postgres:16
    ports:
      - "15432:5432"
    environment:
      - POSTGRES_USER=pgsync
      - POSTGRES_PASSWORD=PLEASE_CHANGE_ME
      - POSTGRES_DB=postgres
    extends:
      file: docker-compose.yml
      service: postgres
  redis:
    image: redis
    command: redis-server --requirepass PLEASE_CHANGE_ME
    extends:
      file: docker-compose.yml
      service: redis
  opensearch:
    image: opensearchproject/opensearch:latest
    ports:
@@ -23,14 +20,9 @@ services:
- "DISABLE_INSTALL_DEMO_CONFIG=true"
- "DISABLE_SECURITY_PLUGIN=true"
pgsync:
build:
context: .
dockerfile: Dockerfile
command: ./runserver.sh
sysctls:
- net.ipv4.tcp_keepalive_time=200
- net.ipv4.tcp_keepalive_intvl=200
- net.ipv4.tcp_keepalive_probes=5
extends:
file: docker-compose.yml
service: pgsync
labels:
org.label-schema.name: "pgsync"
org.label-schema.description: "Postgres to OpenSearch sync"
@@ -40,16 +32,5 @@ services:
      - redis
      - opensearch
    environment:
      - PG_USER=pgsync
      - PG_HOST=postgres
      - PG_PORT=5432
      - PG_PASSWORD=PLEASE_CHANGE_ME
      - LOG_LEVEL=INFO
      - ELASTICSEARCH_PORT=9200
      - ELASTICSEARCH_SCHEME=http
      - ELASTICSEARCH_HOST=opensearch
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - REDIS_AUTH=PLEASE_CHANGE_ME
      - ELASTICSEARCH=false
      - OPENSEARCH=true
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -13,7 +13,7 @@ services:
    image: redis
    command: redis-server --requirepass PLEASE_CHANGE_ME
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.9
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.13
    ports:
      - "9201:9200"
      - "9301:9300"
2 changes: 1 addition & 1 deletion docker/runserver.sh
@@ -14,4 +14,4 @@ python $EXAMPLE_DIR/data.py --config $EXAMPLE_DIR/schema.json

bootstrap --config $EXAMPLE_DIR/schema.json

pgsync --config $EXAMPLE_DIR//schema.json --daemon
pgsync --config $EXAMPLE_DIR/schema.json --daemon
29 changes: 29 additions & 0 deletions pgsync/base.py
@@ -50,6 +50,19 @@


class Payload(object):
    """
    Represents a payload object that contains information about a database change event.
    Attributes:
        tg_op (str): The type of operation that triggered the event (e.g. INSERT, UPDATE, DELETE).
        table (str): The name of the table that was affected by the event.
        schema (str): The name of the schema that contains the table.
        old (dict): The old values of the row that was affected by the event (for UPDATE and DELETE operations).
        new (dict): The new values of the row that was affected by the event (for INSERT and UPDATE operations).
        xmin (int): The transaction ID of the event.
        indices (List[str]): The indices of the affected rows (for UPDATE and DELETE operations).
    """

    __slots__ = ("tg_op", "table", "schema", "old", "new", "xmin", "indices")

    def __init__(
@@ -409,6 +422,22 @@ def _logical_slot_changes(
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> sa.sql.Select:
    """
    Returns a SQLAlchemy Select statement that selects changes from a logical replication slot.
    Args:
        slot_name (str): The name of the logical replication slot to read from.
        func (sa.sql.functions._FunctionGenerator): The function to use to read from the slot.
        txmin (Optional[int], optional): The minimum transaction ID to read from. Defaults to None.
        txmax (Optional[int], optional): The maximum transaction ID to read from. Defaults to None.
        upto_lsn (Optional[int], optional): The maximum LSN to read up to. Defaults to None.
        upto_nchanges (Optional[int], optional): The maximum number of changes to read. Defaults to None.
        limit (Optional[int], optional): The maximum number of rows to return. Defaults to None.
        offset (Optional[int], optional): The number of rows to skip before returning. Defaults to None.
    Returns:
        sa.sql.Select: A SQLAlchemy Select statement that selects changes from the logical replication slot.
    """
    filters: list = []
    statement: sa.sql.Select = sa.select(
        [sa.column("xid"), sa.column("data")]
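The statement this docstring describes can be sketched roughly as follows, mirroring the SQLAlchemy 1.x-style select([...]) visible above. The peek function, the xid casting, and the filter details are assumptions for illustration, not PGSync's exact code.

```python
import sqlalchemy as sa
from typing import Optional


def slot_changes_sketch(
    slot_name: str,
    txmin: Optional[int] = None,
    txmax: Optional[int] = None,
    upto_lsn: Optional[int] = None,
    upto_nchanges: Optional[int] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> sa.sql.Select:
    """Build a SELECT over pg_logical_slot_peek_changes filtered by transaction id."""
    statement = sa.select(
        [sa.column("xid"), sa.column("data")]
    ).select_from(
        sa.func.pg_logical_slot_peek_changes(
            slot_name, upto_lsn, upto_nchanges
        ).alias("changes")
    )
    filters = []
    # xid has no direct cast to bigint, so go via text before comparing.
    xid = sa.cast(sa.cast(sa.column("xid"), sa.Text), sa.BigInteger)
    if txmin is not None:
        filters.append(xid >= txmin)
    if txmax is not None:
        filters.append(xid < txmax)
    if filters:
        statement = statement.where(sa.and_(*filters))
    if limit is not None:
        statement = statement.limit(limit)
    if offset is not None:
        statement = statement.offset(offset)
    return statement
```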
11 changes: 10 additions & 1 deletion pgsync/constants.py
@@ -1,4 +1,13 @@
"""PGSync Constants."""
"""
PGSync Constants.
This module contains constants used in PGSync.
It includes constants for relationship types, relationship variants, node attributes, relationship attributes,
relationship foreign keys, tg_op, JSONB operators, Elasticsearch types, Elasticsearch mapping parameters,
transform types, default postgres schema, built-in schemas, primary key identifier, logical decoding output plugin,
trigger function, materialized views, primary key delimiter, and replication slot patterns.
"""

import re

# Relationship types
13 changes: 12 additions & 1 deletion pgsync/helper.py
@@ -21,7 +21,18 @@ def teardown(
    config: Optional[str] = None,
    validate: bool = False,
) -> None:
    """Teardown helper."""
    """
    Teardown helper.
    Args:
        drop_db (bool, optional): Whether to drop the database. Defaults to True.
        truncate_db (bool, optional): Whether to truncate the database. Defaults to True.
        delete_redis (bool, optional): Whether to delete Redis. Defaults to True.
        drop_index (bool, optional): Whether to drop the index. Defaults to True.
        delete_checkpoint (bool, optional): Whether to delete the checkpoint. Defaults to True.
        config (Optional[str], optional): The configuration file path. Defaults to None.
        validate (bool, optional): Whether to validate the configuration. Defaults to False.
    """
    config: str = get_config(config)

    for document in config_loader(config):
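As a usage note, a call such as the following (the schema path is only an example) would remove the search index, Redis state and checkpoint for every document in the config while leaving the Postgres database in place:

```python
from pgsync.helper import teardown

# Keep the database, but drop the index, Redis state and checkpoint files.
teardown(
    drop_db=False,
    truncate_db=False,
    config="examples/airbnb/schema.json",  # example path
)
```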
17 changes: 16 additions & 1 deletion pgsync/node.py
@@ -32,10 +32,25 @@

@dataclass
class ForeignKey:
    """
    A class representing a foreign key relationship between two tables.
    Attributes:
        foreign_key (Optional[dict]): A dictionary containing the parent and child table names.
        parent (str): The name of the parent table.
        child (str): The name of the child table.
    """

    foreign_key: Optional[dict] = None

    def __post_init__(self):
        """Foreignkey constructor."""
        """Initialize the ForeignKey object.
        Sets the parent and child attributes based on the values in the foreign_key dictionary.
        If the foreign_key dictionary is not provided, it is set to an empty dictionary.
        Raises a RelationshipForeignKeyError if the foreign_key dictionary does not contain
        both a parent and child key.
        """
        self.foreign_key: str = self.foreign_key or dict()
        self.parent: str = self.foreign_key.get("parent")
        self.child: str = self.foreign_key.get("child")
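A tiny illustration of the documented contract (the table names are hypothetical):

```python
from pgsync.node import ForeignKey

# Both "parent" and "child" keys are expected; per the docstring above,
# a missing key raises RelationshipForeignKeyError.
fk = ForeignKey(foreign_key={"parent": "book", "child": "author"})
print(fk.parent, fk.child)  # -> book author
```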
12 changes: 12 additions & 0 deletions pgsync/plugin.py
@@ -20,6 +20,18 @@ def transform(self, doc: dict, **kwargs) -> dict:


class Plugins(object):
    """
    A class representing a plugin.
    Args:
        package (str): The name of the package.
        names (list, optional): A list of names. Defaults to None.
    Attributes:
        package (str): The name of the package.
        names (list): A list of names.
    """

    def __init__(self, package: str, names: Optional[list] = None):
        self.package: str = package
        self.names: list = names or []
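For context, plugins subclass the transform hook visible in the hunk header above. A hedged sketch of a custom plugin follows; the name attribute and the computed field are assumptions for illustration.

```python
from pgsync import plugin


class VillainPlugin(plugin.Plugin):
    """Hypothetical plugin that adds a computed field to each document."""

    name = "Villain"  # assumed: the Plugins loader matches plugins by name

    def transform(self, doc: dict, **kwargs) -> dict:
        doc["is_villain"] = doc.get("name") == "Moriarty"
        return doc
```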
14 changes: 14 additions & 0 deletions pgsync/search_client.py
@@ -348,6 +348,20 @@ def get_search_client(
        elasticsearch.RequestsHttpConnection,
    ],
) -> Union[opensearchpy.OpenSearch, elasticsearch.Elasticsearch]:
    """
    Returns a search client based on the specified parameters.
    Args:
        url (str): The URL of the search client.
        client (Union[opensearchpy.OpenSearch, elasticsearch.Elasticsearch]): The search client to use.
        connection_class (Union[opensearchpy.RequestsHttpConnection, elasticsearch.RequestsHttpConnection]): The connection class to use.
    Returns:
        Union[opensearchpy.OpenSearch, elasticsearch.Elasticsearch]: The search client.
    Raises:
        None
    """
    if settings.OPENSEARCH_AWS_HOSTED or settings.ELASTICSEARCH_AWS_HOSTED:
        credentials = boto3.Session().get_credentials()
        service: str = "aoss" if settings.OPENSEARCH_AWS_SERVERLESS else "es"
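Outside the AWS-hosted branch shown above, the decision reduces to instantiating one client class or the other. A simplified, non-authoritative sketch:

```python
import elasticsearch
import opensearchpy


def pick_client(url: str, use_opensearch: bool):
    """Return an OpenSearch or Elasticsearch client for the given URL."""
    if use_opensearch:
        return opensearchpy.OpenSearch(hosts=[url])
    return elasticsearch.Elasticsearch(hosts=[url])
```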
7 changes: 6 additions & 1 deletion pgsync/settings.py
@@ -1,4 +1,9 @@
"""PGSync settings."""
"""PGSync settings
This module contains the settings for PGSync.
It reads environment variables from a .env file and sets default values for each variable.
The variables are used to configure various parameters such as block size, checkpoint path, polling interval, etc.
"""
import logging
import logging.config
import os
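The pattern the docstring describes, environment variables with sensible defaults, optionally loaded from a .env file, amounts to something like the following. The names and default values here are illustrative, and the module may well use a helper library rather than raw os.environ.

```python
import os

# Illustrative only: each setting falls back to a default when the
# corresponding environment variable is not set.
BLOCK_SIZE: int = int(os.environ.get("BLOCK_SIZE", 2048 * 10))
CHECKPOINT_PATH: str = os.environ.get("CHECKPOINT_PATH", "./")
POLL_TIMEOUT: float = float(os.environ.get("POLL_TIMEOUT", 0.1))
LOG_LEVEL: str = os.environ.get("LOG_LEVEL", "INFO")
```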
20 changes: 20 additions & 0 deletions pgsync/singleton.py
@@ -4,9 +4,29 @@


class Singleton(type):
    """
    A metaclass that allows a class to have only one instance.
    Usage:
        class MyClass(metaclass=Singleton):
            pass
    """

    _instances: dict = {}

    def __call__(cls, *args, **kwargs):
        """
        If an instance of the class has already been created with the same arguments,
        return that instance. Otherwise, create a new instance and return it.
        Args:
            cls: The class object.
            *args: Positional arguments to be passed to the class constructor.
            **kwargs: Keyword arguments to be passed to the class constructor.
        Returns:
            An instance of the class.
        """
        if not args:
            return super(Singleton, cls).__call__(*args, **kwargs)
        database: str = args[0]["database"]
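A short usage sketch, inferred from the docstring and the database key extracted above (the class and arguments are invented): two constructions against the same database name should yield the same cached instance.

```python
from pgsync.singleton import Singleton


class SyncStub(metaclass=Singleton):
    """Hypothetical class whose instances are cached per database."""

    def __init__(self, doc: dict):
        self.database = doc["database"]


a = SyncStub({"database": "bookstore"})
b = SyncStub({"database": "bookstore"})
c = SyncStub({"database": "library"})

assert a is b      # same database -> same cached instance (per the docstring)
assert a is not c  # different database -> a separate instance
```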

