Skip to content

Commit

Permalink
fix multiple schema usage #282
Browse files Browse the repository at this point in the history
  • Loading branch information
toluaina committed Jun 29, 2022
1 parent e4acd7d commit c970251
Show file tree
Hide file tree
Showing 16 changed files with 247 additions and 90 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/psf/black
rev: 22.3.0
rev: 22.6.0
hooks:
- id: black
- repo: https://github.com/pycqa/isort
Expand Down
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -71,7 +71,7 @@ PGSync is written in Python (supporting version 3.7 onwards) and the stack is co
PGSync leverages the [logical decoding](https://www.postgresql.org/docs/current/logicaldecoding.html) feature of [Postgres](https://www.postgresql.org) (introduced in PostgreSQL 9.4) to capture a continuous stream of change events.
This feature needs to be enabled in your [Postgres](https://www.postgresql.org) configuration file by setting in the postgresql.conf file:
```
> wal_level = logical
wal_level = logical
```

You can select any pivot table to be the root of your document.
Expand Down
2 changes: 1 addition & 1 deletion bin/parallel_sync
Expand Up @@ -95,7 +95,7 @@ def fetch_tasks(doc: dict, block_size: Optional[int] = None) -> Generator:
pages: dict = {}
sync: Sync = Sync(doc)
root: Node = sync.tree.build(sync.nodes)
statement: sa.sql.selectable.Select = sa.select(
statement: sa.sql.Select = sa.select(
[
sa.literal_column("1").label("x"),
sa.literal_column("1").label("y"),
Expand Down
3 changes: 3 additions & 0 deletions examples/node/README
@@ -0,0 +1,3 @@
Demonstrates Adjacency List Relationships

- https://docs.sqlalchemy.org/en/14/orm/self_referential.html
48 changes: 48 additions & 0 deletions examples/node/data.py
@@ -0,0 +1,48 @@
import json
from typing import List

import click
from schema import Node
from sqlalchemy.orm import sessionmaker

from pgsync.base import pg_engine, subtransactions
from pgsync.helper import teardown
from pgsync.utils import get_config


@click.command()
@click.option(
"--config",
"-c",
help="Schema config",
type=click.Path(exists=True),
)
def main(config):

config: str = get_config(config)
teardown(drop_db=False, config=config)
documents: List[dict] = json.load(open(config))
engine: sa.engine.Engine = pg_engine(
database=documents[0].get("database", documents[0]["index"])
)

Session = sessionmaker(bind=engine, autoflush=True)
session = Session()

nodes: List[Node] = [
Node(id=1, name="Node A"),
Node(id=2, name="Node B"),
Node(id=3, name="Node C"),
Node(id=4, name="Node A_A", node_id=1),
Node(id=5, name="Node B_B", node_id=2),
Node(id=6, name="Node C_C", node_id=3),
Node(id=7, name="Node A_A_A", node_id=4),
Node(id=8, name="Node B_B_B", node_id=5),
Node(id=9, name="Node C_C_C", node_id=6),
]
with subtransactions(session):
session.add_all(nodes)


if __name__ == "__main__":
main()
30 changes: 30 additions & 0 deletions examples/node/schema.json
@@ -0,0 +1,30 @@
[
{
"database": "node",
"index": "node",
"nodes": {
"table": "node",
"label": "parent",
"children": [
{
"table": "node",
"label": "child",
"children": [
{
"table": "node",
"label": "grand_child",
"relationship": {
"variant": "object",
"type": "one_to_one"
}
}
],
"relationship": {
"variant": "object",
"type": "one_to_one"
}
}
]
}
}
]
46 changes: 46 additions & 0 deletions examples/node/schema.py
@@ -0,0 +1,46 @@
import json

import click
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base

from pgsync.base import create_database, pg_engine
from pgsync.helper import teardown
from pgsync.utils import get_config

Base = declarative_base()


class Node(Base):
__tablename__ = "node"
id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
name = sa.Column(sa.String)
node_id = sa.Column(sa.Integer, sa.ForeignKey("node.id"))
children = sa.orm.relationship("Node", lazy="joined", join_depth=2)


def setup(config=None):
for document in json.load(open(config)):
database: str = document.get("database", document["index"])
create_database(database)
engine: sa.engine.Engine = pg_engine(database=database)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)


@click.command()
@click.option(
"--config",
"-c",
help="Schema config",
type=click.Path(exists=True),
)
def main(config):

config = get_config(config)
teardown(config=config)
setup(config)


if __name__ == "__main__":
main()
11 changes: 11 additions & 0 deletions examples/poc/schema.json
@@ -0,0 +1,11 @@
[
{
"database": "testdb",
"index": "test",
"nodes": {
"table": "user",
"schema": "user_profile",
"columns": []
}
}
]

0 comments on commit c970251

Please sign in to comment.