diff --git a/bin/bootstrap b/bin/bootstrap index cd9bd288..a71af38d 100755 --- a/bin/bootstrap +++ b/bin/bootstrap @@ -56,9 +56,9 @@ def main(teardown, config, user, password, host, port, verbose): validate: bool = False if teardown else True - for document in config_loader(config): + for doc in config_loader(config): sync: Sync = Sync( - document, + doc, verbose=verbose, validate=validate, repl_slots=False, diff --git a/bin/es_mapping b/bin/es_mapping index fb290bff..d11fadac 100755 --- a/bin/es_mapping +++ b/bin/es_mapping @@ -114,9 +114,7 @@ def main(config): """Create custom NGram analyzer for the default mapping.""" config: str = get_config(config) - for index in set( - [document["index"] for document in config_loader(config)] - ): + for index in set([doc["index"] for doc in config_loader(config)]): create_es_mapping(index) diff --git a/bin/parallel_sync b/bin/parallel_sync index 0a9236fe..7d5fdb70 100755 --- a/bin/parallel_sync +++ b/bin/parallel_sync @@ -114,7 +114,6 @@ class Task: sync: Sync = Sync( self.doc, verbose=self.verbose, validate=self.validate ) - sync.tree.build(sync.nodes) txmin: int = sync.checkpoint txmax: int = sync.txid_current sync.search_client.bulk( @@ -214,7 +213,6 @@ def synchronous( ) -> None: sys.stdout.write("Synchronous\n") sync: Sync = Sync(doc, verbose=verbose, validate=validate) - sync.tree.build(sync.nodes) txmin: int = sync.checkpoint txmax: int = sync.txid_current index: str = sync.index @@ -250,7 +248,6 @@ def multithreaded( nthreads: int = nthreads or 1 queue: Queue = Queue() sync: Sync = Sync(doc, verbose=verbose, validate=validate) - sync.tree.build(sync.nodes) for _ in range(nthreads): thread: Thread = Thread( @@ -358,7 +355,6 @@ def run_task( ) -> int: if sync is None: sync: Sync = Sync(doc, verbose=verbose, validate=validate) - sync.tree.build(sync.nodes) txmin: int = sync.checkpoint txmax: int = sync.txid_current sync.search_client.bulk( @@ -423,20 +419,18 @@ def main(config, nprocs, mode, verbose): show_settings() config: str = get_config(config) - for document in config_loader(config): - tasks: t.Generator = fetch_tasks(document) + for doc in config_loader(config): + tasks: t.Generator = fetch_tasks(doc) if mode == "synchronous": - synchronous(tasks, document, verbose=verbose) + synchronous(tasks, doc, verbose=verbose) elif mode == "multithreaded": - multithreaded(tasks, document, nthreads=nprocs, verbose=verbose) + multithreaded(tasks, doc, nthreads=nprocs, verbose=verbose) elif mode == "multiprocess": - multiprocess(tasks, document, ncpus=nprocs, verbose=verbose) + multiprocess(tasks, doc, ncpus=nprocs, verbose=verbose) elif mode == "multithreaded_async": - multithreaded_async( - tasks, document, nthreads=nprocs, verbose=verbose - ) + multithreaded_async(tasks, doc, nthreads=nprocs, verbose=verbose) elif mode == "multiprocess_async": - multiprocess_async(tasks, document, ncpus=nprocs, verbose=verbose) + multiprocess_async(tasks, doc, ncpus=nprocs, verbose=verbose) if __name__ == "__main__": diff --git a/examples/airbnb/data.py b/examples/airbnb/data.py index 880d263a..57bb6fd4 100644 --- a/examples/airbnb/data.py +++ b/examples/airbnb/data.py @@ -20,8 +20,8 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - document: dict = next(config_loader(config)) - database: str = document.get("database", document["index"]) + doc: dict = next(config_loader(config)) + database: str = doc.get("database", doc["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=True) session = Session() diff --git a/examples/airbnb/schema.py b/examples/airbnb/schema.py index bb320e8d..fb492718 100644 --- a/examples/airbnb/schema.py +++ b/examples/airbnb/schema.py @@ -103,8 +103,8 @@ class Review(Base): def setup(config: str) -> None: - for document in config_loader(config): - database: str = document.get("database", document["index"]) + for doc in config_loader(config): + database: str = doc.get("database", doc["index"]) create_database(database) with pg_engine(database) as engine: Base.metadata.drop_all(engine) diff --git a/examples/ancestry/data.py b/examples/ancestry/data.py index 77f23ab9..f38683e8 100644 --- a/examples/ancestry/data.py +++ b/examples/ancestry/data.py @@ -17,8 +17,8 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - document: dict = next(config_loader(config)) - database: str = document.get("database", document["index"]) + doc: dict = next(config_loader(config)) + database: str = doc.get("database", doc["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=True) session = Session() diff --git a/examples/ancestry/schema.py b/examples/ancestry/schema.py index 4d25f8fe..e4d348f1 100644 --- a/examples/ancestry/schema.py +++ b/examples/ancestry/schema.py @@ -67,8 +67,8 @@ class GreatGrandChild(Base): def setup(config: str) -> None: - for document in config_loader(config): - database: str = document.get("database", document["index"]) + for doc in config_loader(config): + database: str = doc.get("database", doc["index"]) create_database(database) with pg_engine(database) as engine: Base.metadata.drop_all(engine) diff --git a/examples/book/benchmark.py b/examples/book/benchmark.py index 19862215..95109550 100644 --- a/examples/book/benchmark.py +++ b/examples/book/benchmark.py @@ -138,8 +138,8 @@ def main(config, nsize, daemon, tg_op): show_settings() config: str = get_config(config) - document: dict = next(config_loader(config)) - database: str = document.get("database", document["index"]) + doc: dict = next(config_loader(config)) + database: str = doc.get("database", doc["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=False, autocommit=False) session = Session() diff --git a/examples/book/data.py b/examples/book/data.py index 85fdae8e..64264f98 100644 --- a/examples/book/data.py +++ b/examples/book/data.py @@ -40,10 +40,10 @@ def main(config, nsize): config: str = get_config(config) teardown(drop_db=False, config=config) - for document in config_loader(config): - database: str = document.get("database", document["index"]) + for doc in config_loader(config): + database: str = doc.get("database", doc["index"]) with pg_engine(database) as engine: - schema: str = document.get("schema", DEFAULT_SCHEMA) + schema: str = doc.get("schema", DEFAULT_SCHEMA) connection = engine.connect().execution_options( schema_translate_map={None: schema} ) diff --git a/examples/book/schema.py b/examples/book/schema.py index 3b49c0d3..db47672a 100644 --- a/examples/book/schema.py +++ b/examples/book/schema.py @@ -241,9 +241,9 @@ class BookShelf(Base): def setup(config: str) -> None: - for document in config_loader(config): - database: str = document.get("database", document["index"]) - schema: str = document.get("schema", DEFAULT_SCHEMA) + for doc in config_loader(config): + database: str = doc.get("database", doc["index"]) + schema: str = doc.get("schema", DEFAULT_SCHEMA) create_database(database) create_schema(database, schema) with pg_engine(database) as engine: diff --git a/examples/book_view/benchmark.py b/examples/book_view/benchmark.py index 1f80aba7..0336f55e 100644 --- a/examples/book_view/benchmark.py +++ b/examples/book_view/benchmark.py @@ -131,8 +131,8 @@ def main(config, nsize, daemon, tg_op): show_settings() config: str = get_config(config) - document: dict = next(config_loader(config)) - database: str = document.get("database", document["index"]) + doc: dict = next(config_loader(config)) + database: str = doc.get("database", doc["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=False, autocommit=False) session = Session() diff --git a/examples/book_view/data.py b/examples/book_view/data.py index 51da00be..7aaa251d 100644 --- a/examples/book_view/data.py +++ b/examples/book_view/data.py @@ -20,10 +20,10 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - for document in config_loader(config): - database: str = document.get("database", document["index"]) + for doc in config_loader(config): + database: str = doc.get("database", doc["index"]) with pg_engine(database) as engine: - schema: str = document.get("schema", DEFAULT_SCHEMA) + schema: str = doc.get("schema", DEFAULT_SCHEMA) connection = engine.connect().execution_options( schema_translate_map={None: schema} ) @@ -106,9 +106,7 @@ def main(config): with subtransactions(session): session.add_all(books.values()) - sync: Sync = Sync(document, validate=False) - - sync.tree.build(sync.nodes) + sync: Sync = Sync(doc, validate=False) sync.refresh_views() diff --git a/examples/book_view/schema.py b/examples/book_view/schema.py index bce00a8f..47e938ad 100644 --- a/examples/book_view/schema.py +++ b/examples/book_view/schema.py @@ -44,9 +44,9 @@ class Book(Base): def setup(config: str) -> None: - for document in config_loader(config): - database: str = document.get("database", document["index"]) - schema: str = document.get("schema", DEFAULT_SCHEMA) + for doc in config_loader(config): + database: str = doc.get("database", doc["index"]) + schema: str = doc.get("schema", DEFAULT_SCHEMA) create_database(database) create_schema(database, schema) with pg_engine(database) as engine: diff --git a/examples/node/data.py b/examples/node/data.py index 00ed60b3..cb55fab4 100644 --- a/examples/node/data.py +++ b/examples/node/data.py @@ -19,8 +19,8 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - document = next(config_loader(config)) - database: str = document.get("database", document["index"]) + doc = next(config_loader(config)) + database: str = doc.get("database", doc["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=True) session = Session() diff --git a/examples/node/schema.py b/examples/node/schema.py index d6662fb4..c21b753b 100644 --- a/examples/node/schema.py +++ b/examples/node/schema.py @@ -24,8 +24,8 @@ class Node(Base): def setup(config: str) -> None: - for document in config_loader(config): - database: str = document.get("database", document["index"]) + for doc in config_loader(config): + database: str = doc.get("database", doc["index"]) create_database(database) with pg_engine(database) as engine: Base.metadata.drop_all(engine) diff --git a/examples/quiz/data.py b/examples/quiz/data.py index 7c97c93f..83ab4f67 100644 --- a/examples/quiz/data.py +++ b/examples/quiz/data.py @@ -17,8 +17,8 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - document = next(config_loader(config)) - database: str = document.get("database", document["index"]) + doc = next(config_loader(config)) + database: str = doc.get("database", doc["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=True) session = Session() diff --git a/examples/quiz/schema.py b/examples/quiz/schema.py index efdf43ee..3aefb7c3 100644 --- a/examples/quiz/schema.py +++ b/examples/quiz/schema.py @@ -107,8 +107,8 @@ class RealAnswer(Base): def setup(config: str) -> None: - for document in config_loader(config): - database: str = document.get("database", document["index"]) + for doc in config_loader(config): + database: str = doc.get("database", doc["index"]) create_database(database) with pg_engine(database) as engine: Base.metadata.drop_all(engine) diff --git a/examples/schemas/data.py b/examples/schemas/data.py index c768f409..61b770e6 100644 --- a/examples/schemas/data.py +++ b/examples/schemas/data.py @@ -17,8 +17,8 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - document = next(config_loader(config)) - database: str = document.get("database", document["index"]) + doc = next(config_loader(config)) + database: str = doc.get("database", doc["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=True) session = Session() diff --git a/examples/schemas/schema.py b/examples/schemas/schema.py index 9731fdf3..cde2551c 100644 --- a/examples/schemas/schema.py +++ b/examples/schemas/schema.py @@ -33,8 +33,8 @@ class Child(Base): def setup(config: str) -> None: - for document in config_loader(config): - database: str = document.get("database", document["index"]) + for doc in config_loader(config): + database: str = doc.get("database", doc["index"]) create_database(database) for schema in ("parent", "child"): create_schema(database, schema) diff --git a/examples/social/data.py b/examples/social/data.py index b0385cba..5f444bdf 100644 --- a/examples/social/data.py +++ b/examples/social/data.py @@ -17,8 +17,8 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - document: dict = next(config_loader(config)) - database: str = document.get("database", document["index"]) + doc: dict = next(config_loader(config)) + database: str = doc.get("database", doc["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=True) session = Session() diff --git a/examples/social/schema.py b/examples/social/schema.py index ca26db59..ecc82ea2 100644 --- a/examples/social/schema.py +++ b/examples/social/schema.py @@ -94,8 +94,8 @@ class UserTag(Base): def setup(config: str) -> None: - for document in config_loader(config): - database: str = document.get("database", document["index"]) + for doc in config_loader(config): + database: str = doc.get("database", doc["index"]) create_database(database) with pg_engine(database) as engine: Base.metadata.drop_all(engine) diff --git a/examples/starcraft/data.py b/examples/starcraft/data.py index a72921bf..bf86a75a 100644 --- a/examples/starcraft/data.py +++ b/examples/starcraft/data.py @@ -17,8 +17,8 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - document = next(config_loader(config)) - database: str = document.get("database", document["index"]) + doc = next(config_loader(config)) + database: str = doc.get("database", doc["index"]) with pg_engine(database) as engine: Session = sessionmaker(bind=engine, autoflush=True) session = Session() diff --git a/examples/starcraft/schema.py b/examples/starcraft/schema.py index 3f9972af..5a45bfb8 100644 --- a/examples/starcraft/schema.py +++ b/examples/starcraft/schema.py @@ -49,8 +49,8 @@ class Structure(Base): def setup(config: str) -> None: - for document in config_loader(config): - database: str = document.get("database", document["index"]) + for doc in config_loader(config): + database: str = doc.get("database", doc["index"]) create_database(database) with pg_engine(database) as engine: Base.metadata.drop_all(engine) diff --git a/examples/through/data.py b/examples/through/data.py index 8a7deca2..e8e3784c 100644 --- a/examples/through/data.py +++ b/examples/through/data.py @@ -19,10 +19,10 @@ def main(config): config: str = get_config(config) teardown(drop_db=False, config=config) - for document in config_loader(config): - database: str = document.get("database", document["index"]) + for doc in config_loader(config): + database: str = doc.get("database", doc["index"]) with pg_engine(database) as engine: - schema: str = document.get("schema", DEFAULT_SCHEMA) + schema: str = doc.get("schema", DEFAULT_SCHEMA) connection = engine.connect().execution_options( schema_translate_map={None: schema} ) diff --git a/examples/through/schema.py b/examples/through/schema.py index 098e21c4..238cbdea 100644 --- a/examples/through/schema.py +++ b/examples/through/schema.py @@ -65,9 +65,9 @@ class CustomerGroup(Base): def setup(config: str) -> None: - for document in config_loader(config): - database: str = document.get("database", document["index"]) - schema: str = document.get("schema", DEFAULT_SCHEMA) + for doc in config_loader(config): + database: str = doc.get("database", doc["index"]) + schema: str = doc.get("schema", DEFAULT_SCHEMA) create_database(database) create_schema(database, schema) with pg_engine(database) as engine: diff --git a/pgsync/base.py b/pgsync/base.py index c9c50f08..d5486db6 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -1071,7 +1071,6 @@ def drop_database(database: str, echo: bool = False) -> None: sa.text(f'DROP DATABASE IF EXISTS "{database}"'), options={"isolation_level": "AUTOCOMMIT"}, ) - logger.debug(f"Dropped database: {database}") diff --git a/plugins/character/groot.py b/plugins/character/groot.py index c1f5ee76..fe1aac86 100644 --- a/plugins/character/groot.py +++ b/plugins/character/groot.py @@ -7,7 +7,7 @@ class GrootPlugin(plugin.Plugin): name: str = "Groot" def transform(self, doc: dict, **kwargs) -> dict: - """Demonstrates how to modify a document.""" + """Demonstrates how to modify a doc.""" doc_id: str = kwargs["_id"] doc_index: str = kwargs["_index"] diff --git a/plugins/sample.py b/plugins/sample.py index abbfe77e..dbfdfb20 100644 --- a/plugins/sample.py +++ b/plugins/sample.py @@ -27,7 +27,7 @@ class VillainPlugin(plugin.Plugin): name: str = "Villain" def transform(self, doc: dict, **kwargs) -> dict: - """Demonstrates how to modify a document.""" + """Demonstrates how to modify a doc.""" doc_id: str = kwargs["_id"] doc_index: str = kwargs["_index"] @@ -48,7 +48,7 @@ class HeroPlugin(plugin.Plugin): name: str = "Hero" def transform(self, doc: dict, **kwargs) -> dict: - """Demonstrates how to modify a document.""" + """Demonstrates how to modify a doc.""" doc_id: str = kwargs["_id"] doc_index: str = kwargs["_index"] @@ -70,7 +70,7 @@ class GeometryPlugin(plugin.Plugin): name: str = "Geometry" def transform(self, doc: dict, **kwargs) -> dict: - """Demonstrates how to modify a document.""" + """Demonstrates how to modify a doc.""" doc_index: str = kwargs["_index"] if doc_index == "book": diff --git a/requirements/base.txt b/requirements/base.txt index 2698639f..4cfaa6c6 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -75,7 +75,7 @@ six==1.16.0 # opensearch-py # python-dateutil # requests-aws4auth -sqlalchemy==2.0.24 +sqlalchemy==2.0.25 # via -r requirements/base.in sqlparse==0.4.4 # via -r requirements/base.in diff --git a/requirements/dev.txt b/requirements/dev.txt index 2d636a10..04ced052 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -141,7 +141,7 @@ six==1.16.0 # opensearch-py # python-dateutil # requests-aws4auth -sqlalchemy==2.0.24 +sqlalchemy==2.0.25 # via -r requirements/base.in sqlparse==0.4.4 # via -r requirements/base.in diff --git a/scripts/migrate_schema_v2 b/scripts/migrate_schema_v2 index 54d65039..bc326740 100755 --- a/scripts/migrate_schema_v2 +++ b/scripts/migrate_schema_v2 @@ -30,20 +30,20 @@ def main(config): changed: bool = False - documents: list = [] - for document in json.load(open(config)): + docs: list = [] + for doc in json.load(open(config)): new_doc: dict = {} - for key, value in document.items(): + for key, value in doc.items(): if key == "nodes" and isinstance(value, list): new_doc["nodes"] = value[0] changed = True continue new_doc[key] = value - documents.append(new_doc) + docs.append(new_doc) if changed: with open(config, "w") as fp: - fp.write(json.dumps(documents, indent=4)) + fp.write(json.dumps(docs, indent=4)) else: os.unlink(backup) diff --git a/tests/test_sync_single_child_fk_on_parent.py b/tests/test_sync_single_child_fk_on_parent.py index 84a6c583..6490183d 100644 --- a/tests/test_sync_single_child_fk_on_parent.py +++ b/tests/test_sync_single_child_fk_on_parent.py @@ -582,7 +582,7 @@ def test_update_primary_key_non_concurrent( self, data, book_cls, publisher_cls, engine ): """Test sync updates primary_key then sync in non-concurrent mode.""" - document = { + doc = { "index": "testdb", "database": "testdb", "nodes": { @@ -600,7 +600,7 @@ def test_update_primary_key_non_concurrent( ], }, } - sync = Sync(document) + sync = Sync(doc) sync.search_client.bulk(sync.index, sync.sync()) sync.search_client.refresh("testdb") @@ -669,7 +669,7 @@ def test_update_primary_key_non_concurrent( "title": "The Rabbit Club", }, ] - assert_resync_empty(sync, document.get("node", {})) + assert_resync_empty(sync, doc.get("node", {})) sync.search_client.close() # TODO: Add another test like this and change @@ -678,7 +678,7 @@ def test_update_primary_key_concurrent( self, data, book_cls, publisher_cls ): """Test sync updates primary_key and then sync in concurrent mode.""" - document = { + doc = { "index": "testdb", "database": "testdb", "nodes": { @@ -696,7 +696,7 @@ def test_update_primary_key_concurrent( ], }, } - sync = Sync(document) + sync = Sync(doc) sync.search_client.bulk(sync.index, sync.sync()) sync.search_client.refresh("testdb") @@ -786,12 +786,12 @@ def poll_db(): "title": "The Rabbit Club", }, ] - assert_resync_empty(sync, document.get("node", {})) + assert_resync_empty(sync, doc.get("node", {})) sync.search_client.close() def test_insert_non_concurrent(self, data, book_cls, publisher_cls): """Test sync insert and then sync in non-concurrent mode.""" - document = { + doc = { "index": "testdb", "database": "testdb", "nodes": { @@ -809,7 +809,7 @@ def test_insert_non_concurrent(self, data, book_cls, publisher_cls): ], }, } - sync = Sync(document) + sync = Sync(doc) sync.search_client.bulk(sync.index, sync.sync()) sync.search_client.refresh("testdb") @@ -884,14 +884,14 @@ def test_insert_non_concurrent(self, data, book_cls, publisher_cls): "title": "Encyclopedia", }, ] - assert_resync_empty(sync, document.get("node", {})) + assert_resync_empty(sync, doc.get("node", {})) sync.search_client.close() def test_update_non_primary_key_non_concurrent( self, data, book_cls, publisher_cls ): """Test sync update and then sync in non-concurrent mode.""" - document = { + doc = { "index": "testdb", "database": "testdb", "nodes": { @@ -909,7 +909,7 @@ def test_update_non_primary_key_non_concurrent( ], }, } - sync = Sync(document) + sync = Sync(doc) sync.search_client.bulk(sync.index, sync.sync()) sync.search_client.refresh("testdb") @@ -974,14 +974,14 @@ def test_update_non_primary_key_non_concurrent( "title": "The Rabbit Club", }, ] - assert_resync_empty(sync, document.get("node", {})) + assert_resync_empty(sync, doc.get("node", {})) sync.search_client.close() def test_update_non_primary_key_concurrent( self, data, book_cls, publisher_cls ): """Test sync update and then sync in concurrent mode.""" - document = { + doc = { "index": "testdb", "database": "testdb", "nodes": { @@ -999,7 +999,7 @@ def test_update_non_primary_key_concurrent( ], }, } - sync = Sync(document) + sync = Sync(doc) sync.search_client.bulk(sync.index, sync.sync()) sync.search_client.refresh("testdb") @@ -1081,12 +1081,12 @@ def poll_db(): "title": "The Rabbit Club", }, ] - assert_resync_empty(sync, document.get("node", {})) + assert_resync_empty(sync, doc.get("node", {})) sync.search_client.close() def test_delete_concurrent(self, data, book_cls, publisher_cls): """Test sync delete and then sync in concurrent mode.""" - document = { + doc = { "index": "testdb", "database": "testdb", "nodes": { @@ -1105,7 +1105,7 @@ def test_delete_concurrent(self, data, book_cls, publisher_cls): }, } - sync = Sync(document) + sync = Sync(doc) sync.search_client.bulk(sync.index, sync.sync()) sync.search_client.refresh("testdb") @@ -1200,5 +1200,5 @@ def poll_db(): "title": "The Rabbit Club", }, ] - assert_resync_empty(sync, document.get("node", {})) + assert_resync_empty(sync, doc.get("node", {})) sync.search_client.close()