
chore: Use server-side (named) cursors instead of paginating locally
https://www.psycopg.org/docs/usage.html#server-side-cursors. chore: Rename plugin -> check like in DB.
jpmckinney committed Aug 9, 2023
1 parent ea77bff commit a9dab16
Showing 4 changed files with 125 additions and 184 deletions.
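The change swaps client-side pagination (repeated `LIMIT` queries, each page buffered by `fetchall()`) for psycopg2's server-side cursors: passing `name` to `connection.cursor()` declares a cursor in PostgreSQL, and iterating over it streams rows in batches of `itersize` (2000 by default) instead of loading the whole result set. A minimal sketch of the difference, assuming a `data_item` table like the one in this repository and a `DATABASE_URL` environment variable:

```python
import os

import psycopg2
import psycopg2.extras

conn = psycopg2.connect(os.environ["DATABASE_URL"])

# Client-side (the old approach): every page is fully transferred and
# buffered by the client, so large tables need a loop of LIMIT-ed queries.
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
    cursor.execute("SELECT id, data FROM data_item WHERE dataset_id = %s LIMIT 1000", (1,))
    rows = cursor.fetchall()  # 1000 rows now held in client memory

# Server-side (the new approach): a named cursor is DECLAREd in PostgreSQL,
# and iteration fetches itersize rows per round trip (default 2000).
with conn.cursor(name="example", cursor_factory=psycopg2.extras.DictCursor) as cursor:
    cursor.execute("SELECT id, data FROM data_item WHERE dataset_id = %s", (1,))
    for row in cursor:  # memory use stays bounded regardless of table size
        print(row["id"])
```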
35 changes: 7 additions & 28 deletions contracting_process/field_level/report_examples.py
@@ -9,7 +9,6 @@
 logger = logging.getLogger("pelican.contracting_process.field_level.report_examples")
 
 examples_cap = 20
-page_size = 2000
 
 
 def create(dataset_id):
@@ -68,30 +67,13 @@ def create(dataset_id):

     logger.info("Starting processing pages.")
 
-    processed_count = page_size
-    id = -1
-    pager = 0
-
-    while processed_count == page_size:
-        processed_count = 0
-        cursor = get_cursor()
-
-        cursor.execute(
-            """\
-            SELECT id, result
-            FROM field_level_check
-            WHERE
-                id > %(id)s
-                AND dataset_id = %(dataset_id)s
-            ORDER BY id
-            LIMIT %(limit)s
-            """,
-            {"id": id, "dataset_id": dataset_id, "limit": page_size},
+    with get_cursor(name="field_level_report_examples") as named_cursor:
+        named_cursor.execute(
+            "SELECT result FROM field_level_check WHERE dataset_id = %(dataset_id)s",
+            {"dataset_id": dataset_id},
         )
 
-        rows = cursor.fetchall()
-
-        for row in rows:
+        for i, row in enumerate(named_cursor, 1):
             result = row["result"]
             meta = result["meta"]

@@ -120,11 +102,8 @@ def create(dataset_id):
             if not path_check["coverage"]["overall_result"] or not path_check["quality"]["check_results"]:
                 break
 
-            processed_count += 1
-            id = row["id"]
-
-        pager += 1
-        logger.info("Processed page %s", pager)
+            if i % 1000 == 0:
+                logger.info("Processed %s field-level check results", i)
 
     logger.info("Storing field level check report for dataset_id %s", dataset_id)
     cursor.execute(
…
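If the default batch size does not suit a workload, psycopg2 exposes it as the cursor's `itersize` attribute; set it after creating the named cursor and before iterating. A sketch reusing `get_cursor` and `logger` from the modules in this commit, with a stand-in dataset id:

```python
with get_cursor(name="field_level_report_examples") as named_cursor:
    # Rows travel from the server in batches of itersize (default 2000);
    # raising it trades client memory for fewer network round trips.
    named_cursor.itersize = 5000
    named_cursor.execute(
        "SELECT result FROM field_level_check WHERE dataset_id = %(dataset_id)s",
        {"dataset_id": 1},  # stand-in dataset id
    )
    for i, row in enumerate(named_cursor, 1):
        if i % 1000 == 0:
            logger.info("Processed %s field-level check results", i)
```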
95 changes: 37 additions & 58 deletions dataset/processor.py
@@ -9,86 +9,65 @@

 logger = logging.getLogger("pelican.dataset.processor")
 
-page_size = 1000
-
 
 def do_work(dataset_id):
-    processed_count = 1000
-    id = -1
-    no_item_processed = True
-    pager = 0
-    scope = {}
+    check_scope = {}
     meta_data_aggregator_scope = {}
 
-    while processed_count == page_size:
-        processed_count = 0
-        cursor = get_cursor()
-
-        cursor.execute(
-            "SELECT * FROM data_item WHERE id > %(id)s AND dataset_id = %(dataset_id)s ORDER BY id LIMIT %(limit)s",
-            {"id": id, "dataset_id": dataset_id, "limit": page_size},
+    with get_cursor(name="dataset") as named_cursor:
+        named_cursor.execute(
+            "SELECT id, data FROM data_item WHERE dataset_id = %(dataset_id)s",
+            {"dataset_id": dataset_id},
        )
 
-        items = cursor.fetchall()
-
-        for item in items:
-            for plugin_name, plugin in definitions.items():
+        for i, item in enumerate(named_cursor, 1):
+            for check_name, check in definitions.items():
                 logger.log(
                     settings.CustomLogLevels.CHECK_TRACE,
                     "Computing %s check for item_id %s.",
-                    plugin_name,
+                    check_name,
                     item["id"],
                 )
 
-                scope.setdefault(plugin_name, {})
-                scope[plugin_name] = plugin.add_item(scope[plugin_name], item["data"], item["id"])
-
-                processed_count += 1
-                id = item["id"]
-                no_item_processed = False
+                check_scope.setdefault(check_name, {})
+                check_scope[check_name] = check.add_item(check_scope[check_name], item["data"], item["id"])
 
             meta_data_aggregator_scope = meta_data_aggregator.add_item(
                 meta_data_aggregator_scope, item["data"], item["id"]
             )

-        pager += 1
-        logger.info("Processed page %s", pager)
+            if i % 1000 == 0:
+                logger.info("Processed %s data items", i)

-        cursor.close()
-
-    if no_item_processed:
+    if "i" not in locals():
         logger.info("No item with dataset_id %s found. Skipping dataset checks computation.", dataset_id)
         return
 
-    for plugin_name, plugin in definitions.items():
-        logger.info("Getting result for %s dataset check.", plugin_name)
-        result = plugin.get_result(scope[plugin_name])
-        save_dataset_level_check(plugin_name, result, dataset_id)
+    with get_cursor() as cursor:
+        for check_name, check in definitions.items():
+            logger.info("Getting result for %s dataset check.", check_name)
+            result = check.get_result(check_scope[check_name])
+
+            if "meta" not in result or result["meta"] is None:
+                result["meta"] = {}
+
+            result["meta"]["version"] = result["version"]
+            meta = json.dumps(result["meta"])
+
+            cursor.execute(
+                """\
+                INSERT INTO dataset_level_check (check_name, result, value, meta, dataset_id)
+                VALUES (%(check_name)s, %(result)s, %(value)s, %(meta)s, %(dataset_id)s)
+                """,
+                {
+                    "check_name": check_name,
+                    "result": result["result"],
+                    "value": result["value"],
+                    "meta": meta,
+                    "dataset_id": dataset_id,
+                },
+            )
 
     logger.info("Saving meta data for dataset_id %s", dataset_id)
     meta_data = meta_data_aggregator.get_result(meta_data_aggregator_scope)
     meta_data_aggregator.update_meta_data(meta_data, dataset_id)


-def save_dataset_level_check(check_name, result, dataset_id):
-    if "meta" not in result or result["meta"] is None:
-        result["meta"] = {}
-
-    result["meta"]["version"] = result["version"]
-    meta = json.dumps(result["meta"])
-
-    with get_cursor() as cursor:
-        cursor.execute(
-            """\
-            INSERT INTO dataset_level_check (check_name, result, value, meta, dataset_id)
-            VALUES (%(check_name)s, %(result)s, %(value)s, %(meta)s, %(dataset_id)s)
-            """,
-            {
-                "check_name": check_name,
-                "result": result["result"],
-                "value": result["value"],
-                "meta": meta,
-                "dataset_id": dataset_id,
-            },
-        )
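The `"i" not in locals()` guard works because `i` is only bound if `enumerate` yields at least one row, but it is easy to misread. An equivalent, more explicit pattern (a sketch, not what the commit does) initializes a counter before the loop:

```python
def do_work(dataset_id):
    count = 0  # survives the loop; stays 0 if the cursor yields no rows
    with get_cursor(name="dataset") as named_cursor:
        named_cursor.execute(
            "SELECT id, data FROM data_item WHERE dataset_id = %(dataset_id)s",
            {"dataset_id": dataset_id},
        )
        for count, item in enumerate(named_cursor, 1):
            ...  # per-item work as above
    if count == 0:
        logger.info("No item with dataset_id %s found. Skipping dataset checks computation.", dataset_id)
        return
```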
14 changes: 11 additions & 3 deletions pelican/util/services.py
@@ -1,4 +1,5 @@
 import logging
+import threading
 from typing import Any
 
 import psycopg2.extensions
@@ -11,6 +12,7 @@
 global db_connected, db_connection
 db_connected = False
 db_connection = None
+db_cursor_idx = 0
 
 logger = logging.getLogger(__name__)

@@ -69,16 +71,22 @@ def publish(*args: Any, **kwargs: Any) -> None:
 # PostgreSQL
 
 
-def get_cursor() -> psycopg2.extensions.cursor:
+def get_cursor(name="") -> psycopg2.extensions.cursor:
     """
     Connect to the database, if needed, and return a database cursor.
     """
-    global db_connected, db_connection
+    global db_connected, db_connection, db_cursor_idx
     if not db_connected:
         db_connection = psycopg2.connect(settings.DATABASE_URL)
         db_connected = True
 
-    return db_connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
+    kwargs = {}
+    if name:
+        db_cursor_idx += 1
+        # https://github.com/django/django/blob/4.2.x/django/db/backends/postgresql/base.py#L469
+        kwargs["name"] = f"{name}-{threading.current_thread().ident}-{db_cursor_idx}"
+
+    return db_connection.cursor(cursor_factory=psycopg2.extras.DictCursor, **kwargs)
 
 
 def commit() -> None:
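The generated cursor name combines the caller's label, the current thread's ident, and a module-level counter, so concurrent callers sharing the connection cannot collide (the linked Django source uses the same scheme). Two psycopg2 constraints on named cursors are worth noting: each can `execute()` only a single statement, and it is usable only within the transaction that created it. A hypothetical caller, with stand-in queries:

```python
from pelican.util.services import commit, get_cursor

# Server-side cursor for a large read: execute one SELECT, then stream.
with get_cursor(name="example") as named_cursor:
    named_cursor.execute("SELECT id, data FROM data_item WHERE dataset_id = %s", (1,))
    for row in named_cursor:
        ...  # the cursor (and its server-side resources) is released on exit

# Regular client-side cursor for small statements.
with get_cursor() as cursor:
    cursor.execute("SELECT count(*) FROM data_item WHERE dataset_id = %s", (1,))
    print(cursor.fetchone()[0])
commit()
```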
