Skip to content

Commit

Permalink
code cleanup and prep for release
Browse files Browse the repository at this point in the history
  • Loading branch information
toluaina committed Jan 3, 2024
1 parent a479afc commit 1e4c3c7
Show file tree
Hide file tree
Showing 32 changed files with 90 additions and 101 deletions.
4 changes: 2 additions & 2 deletions bin/bootstrap
Expand Up @@ -56,9 +56,9 @@ def main(teardown, config, user, password, host, port, verbose):

validate: bool = False if teardown else True

for document in config_loader(config):
for doc in config_loader(config):
sync: Sync = Sync(
document,
doc,
verbose=verbose,
validate=validate,
repl_slots=False,
Expand Down
4 changes: 1 addition & 3 deletions bin/es_mapping
Expand Up @@ -114,9 +114,7 @@ def main(config):
"""Create custom NGram analyzer for the default mapping."""

config: str = get_config(config)
for index in set(
[document["index"] for document in config_loader(config)]
):
for index in set([doc["index"] for doc in config_loader(config)]):
create_es_mapping(index)


Expand Down
20 changes: 7 additions & 13 deletions bin/parallel_sync
Expand Up @@ -114,7 +114,6 @@ class Task:
sync: Sync = Sync(
self.doc, verbose=self.verbose, validate=self.validate
)
sync.tree.build(sync.nodes)
txmin: int = sync.checkpoint
txmax: int = sync.txid_current
sync.search_client.bulk(
Expand Down Expand Up @@ -214,7 +213,6 @@ def synchronous(
) -> None:
sys.stdout.write("Synchronous\n")
sync: Sync = Sync(doc, verbose=verbose, validate=validate)
sync.tree.build(sync.nodes)
txmin: int = sync.checkpoint
txmax: int = sync.txid_current
index: str = sync.index
Expand Down Expand Up @@ -250,7 +248,6 @@ def multithreaded(
nthreads: int = nthreads or 1
queue: Queue = Queue()
sync: Sync = Sync(doc, verbose=verbose, validate=validate)
sync.tree.build(sync.nodes)

for _ in range(nthreads):
thread: Thread = Thread(
Expand Down Expand Up @@ -358,7 +355,6 @@ def run_task(
) -> int:
if sync is None:
sync: Sync = Sync(doc, verbose=verbose, validate=validate)
sync.tree.build(sync.nodes)
txmin: int = sync.checkpoint
txmax: int = sync.txid_current
sync.search_client.bulk(
Expand Down Expand Up @@ -423,20 +419,18 @@ def main(config, nprocs, mode, verbose):
show_settings()
config: str = get_config(config)

for document in config_loader(config):
tasks: t.Generator = fetch_tasks(document)
for doc in config_loader(config):
tasks: t.Generator = fetch_tasks(doc)
if mode == "synchronous":
synchronous(tasks, document, verbose=verbose)
synchronous(tasks, doc, verbose=verbose)
elif mode == "multithreaded":
multithreaded(tasks, document, nthreads=nprocs, verbose=verbose)
multithreaded(tasks, doc, nthreads=nprocs, verbose=verbose)
elif mode == "multiprocess":
multiprocess(tasks, document, ncpus=nprocs, verbose=verbose)
multiprocess(tasks, doc, ncpus=nprocs, verbose=verbose)
elif mode == "multithreaded_async":
multithreaded_async(
tasks, document, nthreads=nprocs, verbose=verbose
)
multithreaded_async(tasks, doc, nthreads=nprocs, verbose=verbose)
elif mode == "multiprocess_async":
multiprocess_async(tasks, document, ncpus=nprocs, verbose=verbose)
multiprocess_async(tasks, doc, ncpus=nprocs, verbose=verbose)


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions examples/airbnb/data.py
Expand Up @@ -20,8 +20,8 @@
def main(config):
config: str = get_config(config)
teardown(drop_db=False, config=config)
document: dict = next(config_loader(config))
database: str = document.get("database", document["index"])
doc: dict = next(config_loader(config))
database: str = doc.get("database", doc["index"])
with pg_engine(database) as engine:
Session = sessionmaker(bind=engine, autoflush=True)
session = Session()
Expand Down
4 changes: 2 additions & 2 deletions examples/airbnb/schema.py
Expand Up @@ -103,8 +103,8 @@ class Review(Base):


def setup(config: str) -> None:
for document in config_loader(config):
database: str = document.get("database", document["index"])
for doc in config_loader(config):
database: str = doc.get("database", doc["index"])
create_database(database)
with pg_engine(database) as engine:
Base.metadata.drop_all(engine)
Expand Down
4 changes: 2 additions & 2 deletions examples/ancestry/data.py
Expand Up @@ -17,8 +17,8 @@
def main(config):
config: str = get_config(config)
teardown(drop_db=False, config=config)
document: dict = next(config_loader(config))
database: str = document.get("database", document["index"])
doc: dict = next(config_loader(config))
database: str = doc.get("database", doc["index"])
with pg_engine(database) as engine:
Session = sessionmaker(bind=engine, autoflush=True)
session = Session()
Expand Down
4 changes: 2 additions & 2 deletions examples/ancestry/schema.py
Expand Up @@ -67,8 +67,8 @@ class GreatGrandChild(Base):


def setup(config: str) -> None:
for document in config_loader(config):
database: str = document.get("database", document["index"])
for doc in config_loader(config):
database: str = doc.get("database", doc["index"])
create_database(database)
with pg_engine(database) as engine:
Base.metadata.drop_all(engine)
Expand Down
4 changes: 2 additions & 2 deletions examples/book/benchmark.py
Expand Up @@ -138,8 +138,8 @@ def main(config, nsize, daemon, tg_op):
show_settings()

config: str = get_config(config)
document: dict = next(config_loader(config))
database: str = document.get("database", document["index"])
doc: dict = next(config_loader(config))
database: str = doc.get("database", doc["index"])
with pg_engine(database) as engine:
Session = sessionmaker(bind=engine, autoflush=False, autocommit=False)
session = Session()
Expand Down
6 changes: 3 additions & 3 deletions examples/book/data.py
Expand Up @@ -40,10 +40,10 @@ def main(config, nsize):
config: str = get_config(config)
teardown(drop_db=False, config=config)

for document in config_loader(config):
database: str = document.get("database", document["index"])
for doc in config_loader(config):
database: str = doc.get("database", doc["index"])
with pg_engine(database) as engine:
schema: str = document.get("schema", DEFAULT_SCHEMA)
schema: str = doc.get("schema", DEFAULT_SCHEMA)
connection = engine.connect().execution_options(
schema_translate_map={None: schema}
)
Expand Down
6 changes: 3 additions & 3 deletions examples/book/schema.py
Expand Up @@ -241,9 +241,9 @@ class BookShelf(Base):


def setup(config: str) -> None:
for document in config_loader(config):
database: str = document.get("database", document["index"])
schema: str = document.get("schema", DEFAULT_SCHEMA)
for doc in config_loader(config):
database: str = doc.get("database", doc["index"])
schema: str = doc.get("schema", DEFAULT_SCHEMA)
create_database(database)
create_schema(database, schema)
with pg_engine(database) as engine:
Expand Down
4 changes: 2 additions & 2 deletions examples/book_view/benchmark.py
Expand Up @@ -131,8 +131,8 @@ def main(config, nsize, daemon, tg_op):
show_settings()

config: str = get_config(config)
document: dict = next(config_loader(config))
database: str = document.get("database", document["index"])
doc: dict = next(config_loader(config))
database: str = doc.get("database", doc["index"])
with pg_engine(database) as engine:
Session = sessionmaker(bind=engine, autoflush=False, autocommit=False)
session = Session()
Expand Down
10 changes: 4 additions & 6 deletions examples/book_view/data.py
Expand Up @@ -20,10 +20,10 @@ def main(config):
config: str = get_config(config)
teardown(drop_db=False, config=config)

for document in config_loader(config):
database: str = document.get("database", document["index"])
for doc in config_loader(config):
database: str = doc.get("database", doc["index"])
with pg_engine(database) as engine:
schema: str = document.get("schema", DEFAULT_SCHEMA)
schema: str = doc.get("schema", DEFAULT_SCHEMA)
connection = engine.connect().execution_options(
schema_translate_map={None: schema}
)
Expand Down Expand Up @@ -106,9 +106,7 @@ def main(config):
with subtransactions(session):
session.add_all(books.values())

sync: Sync = Sync(document, validate=False)

sync.tree.build(sync.nodes)
sync: Sync = Sync(doc, validate=False)

sync.refresh_views()

Expand Down
6 changes: 3 additions & 3 deletions examples/book_view/schema.py
Expand Up @@ -44,9 +44,9 @@ class Book(Base):


def setup(config: str) -> None:
for document in config_loader(config):
database: str = document.get("database", document["index"])
schema: str = document.get("schema", DEFAULT_SCHEMA)
for doc in config_loader(config):
database: str = doc.get("database", doc["index"])
schema: str = doc.get("schema", DEFAULT_SCHEMA)
create_database(database)
create_schema(database, schema)
with pg_engine(database) as engine:
Expand Down
4 changes: 2 additions & 2 deletions examples/node/data.py
Expand Up @@ -19,8 +19,8 @@
def main(config):
config: str = get_config(config)
teardown(drop_db=False, config=config)
document = next(config_loader(config))
database: str = document.get("database", document["index"])
doc = next(config_loader(config))
database: str = doc.get("database", doc["index"])
with pg_engine(database) as engine:
Session = sessionmaker(bind=engine, autoflush=True)
session = Session()
Expand Down
4 changes: 2 additions & 2 deletions examples/node/schema.py
Expand Up @@ -24,8 +24,8 @@ class Node(Base):


def setup(config: str) -> None:
for document in config_loader(config):
database: str = document.get("database", document["index"])
for doc in config_loader(config):
database: str = doc.get("database", doc["index"])
create_database(database)
with pg_engine(database) as engine:
Base.metadata.drop_all(engine)
Expand Down
4 changes: 2 additions & 2 deletions examples/quiz/data.py
Expand Up @@ -17,8 +17,8 @@
def main(config):
config: str = get_config(config)
teardown(drop_db=False, config=config)
document = next(config_loader(config))
database: str = document.get("database", document["index"])
doc = next(config_loader(config))
database: str = doc.get("database", doc["index"])
with pg_engine(database) as engine:
Session = sessionmaker(bind=engine, autoflush=True)
session = Session()
Expand Down
4 changes: 2 additions & 2 deletions examples/quiz/schema.py
Expand Up @@ -107,8 +107,8 @@ class RealAnswer(Base):


def setup(config: str) -> None:
for document in config_loader(config):
database: str = document.get("database", document["index"])
for doc in config_loader(config):
database: str = doc.get("database", doc["index"])
create_database(database)
with pg_engine(database) as engine:
Base.metadata.drop_all(engine)
Expand Down
4 changes: 2 additions & 2 deletions examples/schemas/data.py
Expand Up @@ -17,8 +17,8 @@
def main(config):
config: str = get_config(config)
teardown(drop_db=False, config=config)
document = next(config_loader(config))
database: str = document.get("database", document["index"])
doc = next(config_loader(config))
database: str = doc.get("database", doc["index"])
with pg_engine(database) as engine:
Session = sessionmaker(bind=engine, autoflush=True)
session = Session()
Expand Down
4 changes: 2 additions & 2 deletions examples/schemas/schema.py
Expand Up @@ -33,8 +33,8 @@ class Child(Base):


def setup(config: str) -> None:
for document in config_loader(config):
database: str = document.get("database", document["index"])
for doc in config_loader(config):
database: str = doc.get("database", doc["index"])
create_database(database)
for schema in ("parent", "child"):
create_schema(database, schema)
Expand Down
4 changes: 2 additions & 2 deletions examples/social/data.py
Expand Up @@ -17,8 +17,8 @@
def main(config):
config: str = get_config(config)
teardown(drop_db=False, config=config)
document: dict = next(config_loader(config))
database: str = document.get("database", document["index"])
doc: dict = next(config_loader(config))
database: str = doc.get("database", doc["index"])
with pg_engine(database) as engine:
Session = sessionmaker(bind=engine, autoflush=True)
session = Session()
Expand Down
4 changes: 2 additions & 2 deletions examples/social/schema.py
Expand Up @@ -94,8 +94,8 @@ class UserTag(Base):


def setup(config: str) -> None:
for document in config_loader(config):
database: str = document.get("database", document["index"])
for doc in config_loader(config):
database: str = doc.get("database", doc["index"])
create_database(database)
with pg_engine(database) as engine:
Base.metadata.drop_all(engine)
Expand Down
4 changes: 2 additions & 2 deletions examples/starcraft/data.py
Expand Up @@ -17,8 +17,8 @@
def main(config):
config: str = get_config(config)
teardown(drop_db=False, config=config)
document = next(config_loader(config))
database: str = document.get("database", document["index"])
doc = next(config_loader(config))
database: str = doc.get("database", doc["index"])
with pg_engine(database) as engine:
Session = sessionmaker(bind=engine, autoflush=True)
session = Session()
Expand Down
4 changes: 2 additions & 2 deletions examples/starcraft/schema.py
Expand Up @@ -49,8 +49,8 @@ class Structure(Base):


def setup(config: str) -> None:
for document in config_loader(config):
database: str = document.get("database", document["index"])
for doc in config_loader(config):
database: str = doc.get("database", doc["index"])
create_database(database)
with pg_engine(database) as engine:
Base.metadata.drop_all(engine)
Expand Down
6 changes: 3 additions & 3 deletions examples/through/data.py
Expand Up @@ -19,10 +19,10 @@ def main(config):
config: str = get_config(config)
teardown(drop_db=False, config=config)

for document in config_loader(config):
database: str = document.get("database", document["index"])
for doc in config_loader(config):
database: str = doc.get("database", doc["index"])
with pg_engine(database) as engine:
schema: str = document.get("schema", DEFAULT_SCHEMA)
schema: str = doc.get("schema", DEFAULT_SCHEMA)
connection = engine.connect().execution_options(
schema_translate_map={None: schema}
)
Expand Down
6 changes: 3 additions & 3 deletions examples/through/schema.py
Expand Up @@ -65,9 +65,9 @@ class CustomerGroup(Base):


def setup(config: str) -> None:
for document in config_loader(config):
database: str = document.get("database", document["index"])
schema: str = document.get("schema", DEFAULT_SCHEMA)
for doc in config_loader(config):
database: str = doc.get("database", doc["index"])
schema: str = doc.get("schema", DEFAULT_SCHEMA)
create_database(database)
create_schema(database, schema)
with pg_engine(database) as engine:
Expand Down
1 change: 0 additions & 1 deletion pgsync/base.py
Expand Up @@ -1071,7 +1071,6 @@ def drop_database(database: str, echo: bool = False) -> None:
sa.text(f'DROP DATABASE IF EXISTS "{database}"'),
options={"isolation_level": "AUTOCOMMIT"},
)

logger.debug(f"Dropped database: {database}")


Expand Down
2 changes: 1 addition & 1 deletion plugins/character/groot.py
Expand Up @@ -7,7 +7,7 @@ class GrootPlugin(plugin.Plugin):
name: str = "Groot"

def transform(self, doc: dict, **kwargs) -> dict:
"""Demonstrates how to modify a document."""
"""Demonstrates how to modify a doc."""
doc_id: str = kwargs["_id"]
doc_index: str = kwargs["_index"]

Expand Down
6 changes: 3 additions & 3 deletions plugins/sample.py
Expand Up @@ -27,7 +27,7 @@ class VillainPlugin(plugin.Plugin):
name: str = "Villain"

def transform(self, doc: dict, **kwargs) -> dict:
"""Demonstrates how to modify a document."""
"""Demonstrates how to modify a doc."""
doc_id: str = kwargs["_id"]
doc_index: str = kwargs["_index"]

Expand All @@ -48,7 +48,7 @@ class HeroPlugin(plugin.Plugin):
name: str = "Hero"

def transform(self, doc: dict, **kwargs) -> dict:
"""Demonstrates how to modify a document."""
"""Demonstrates how to modify a doc."""
doc_id: str = kwargs["_id"]
doc_index: str = kwargs["_index"]

Expand All @@ -70,7 +70,7 @@ class GeometryPlugin(plugin.Plugin):
name: str = "Geometry"

def transform(self, doc: dict, **kwargs) -> dict:
"""Demonstrates how to modify a document."""
"""Demonstrates how to modify a doc."""
doc_index: str = kwargs["_index"]

if doc_index == "book":
Expand Down

0 comments on commit 1e4c3c7

Please sign in to comment.