diff --git a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py index 9a3b83c..829e3dd 100644 --- a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py +++ b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py @@ -65,7 +65,7 @@ def dump_target_objects_to_clickhouse( if len(objects_to_submit) % batch_size == 0: sink.dump(objects_to_submit, many=True) objects_to_submit = [] - log.info(f"Last IDs: {obj.pk}") + log.info(f"Last ID: {obj.pk}") time.sleep(sleep_time) if limit and count == limit: @@ -77,6 +77,7 @@ def dump_target_objects_to_clickhouse( if objects_to_submit: sink.dump(objects_to_submit, many=True) count += len(objects_to_submit) + log.info(f"Last ID: {objects_to_submit[-1].pk}") log.info(f"Dumped {count} objects to ClickHouse") @@ -152,13 +153,13 @@ def add_arguments(self, parser): parser.add_argument( "--batch_size", type=int, - default=1000, + default=10000, help="number of objects to dump in a single batch", ) parser.add_argument( "--sleep_time", type=int, - default=10, + default=1, help="number of seconds to sleep between batches", ) @@ -193,7 +194,7 @@ def handle(self, *args, **options): log.error(message) raise CommandError(message) - Sink = ModelBaseSink.get_sink_by_model(options["object"]) + Sink = ModelBaseSink.get_sink_by_model_name(options["object"]) sink = Sink(connection_overrides, log) dump_target_objects_to_clickhouse( sink, diff --git a/event_sink_clickhouse/sinks/base_sink.py b/event_sink_clickhouse/sinks/base_sink.py index 2fbdb06..94cf717 100644 --- a/event_sink_clickhouse/sinks/base_sink.py +++ b/event_sink_clickhouse/sinks/base_sink.py @@ -368,7 +368,7 @@ def is_enabled(cls): return enabled or waffle_flag.is_enabled() @classmethod - def get_sink_by_model(cls, model): + def get_sink_by_model_name(cls, model): """ Return the sink instance for the given model """