Skip to content

Commit

Permalink
Drops mod_inventory indices, runs ingestion, and adds indices back
Browse files Browse the repository at this point in the history
  • Loading branch information
jermnelson committed Jul 6, 2022
1 parent eec1cd6 commit 4868877
Showing 1 changed file with 59 additions and 3 deletions.
62 changes: 59 additions & 3 deletions dags/srs_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
from pathlib import Path

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.models import Variable
from airflow.operators.python import get_current_context
from airflow.providers.postgres.operators.postgres import PostgresOperator


from filesplit.split import Split
from folio_migration_tools.library_configuration import LibraryConfiguration
Expand Down Expand Up @@ -43,10 +45,50 @@ def add_marc_to_srs():
batch POSTS to the Okapi endpoint
"""

# Lists the name and CREATE statement of every index on the instance,
# holdings and item tables in the sul_mod_inventory_storage schema.
# NOTE(review): downstream tasks read these rows from this task's XCom;
# confirm the installed Postgres provider version returns the query rows
# from PostgresOperator.execute.
get_inventory_indices = PostgresOperator(
    task_id="get-inventory-indices",
    # The holdings table is named ``holdings_record`` (singular), matching
    # the VACUUM ANALYZE statement written by save_inventory_storage_sql;
    # the plural form would match no table and skip its indices.
    sql="""
    SELECT indexname, indexdef FROM pg_indexes
    WHERE schemaname = 'sul_mod_inventory_storage' AND
    tablename IN ('instance', 'holdings_record', 'item')
    """,
    postgres_conn_id="postgres_folio",
    database="okapi",
)

@task
def drop_indexes():
    """
    ### Drops mod-inventory-storage indices

    Reads the (indexname, indexdef) rows produced by the
    `get-inventory-indices` task and issues one `DROP INDEX` per index,
    skipping any index whose name ends in `pkey`.
    """
    context = get_current_context()
    # ``operator.output`` is only a placeholder while the DAG is parsed, so
    # the actual rows are pulled from XCom at run time instead.
    index_rows = context["ti"].xcom_pull(task_ids="get-inventory-indices") or []
    # One statement per index; passing a list keeps the statements separate
    # instead of concatenating them into a single unparsable string.
    statements = [
        f"DROP INDEX sul_mod_inventory_storage.{row[0]}"
        for row in index_rows
        if not row[0].endswith("pkey")
    ]
    if not statements:
        # Running an empty query would raise; nothing to drop is not an error.
        logger.info("No mod-inventory-storage indices to drop")
        return
    # Returning an operator from a task never runs it, so execute it here.
    PostgresOperator(
        task_id="remove-srs-indices",
        sql=statements,
        postgres_conn_id="postgres_folio",
    ).execute(context)

@task
def save_inventory_storage_sql():
    """
    ### Saves mod-inventory-storage indices to file

    Writes the `CREATE INDEX` definition of every index returned by the
    `get-inventory-indices` task to
    `sql/sul_mod_inventory_storage_indexes.sql`, followed by a
    `VACUUM ANALYZE` for each of the three tables. Indices whose name ends
    in `pkey` are left out because `drop_indexes` never drops them.
    """
    context = get_current_context()
    # ``operator.output`` is only a placeholder while the DAG is parsed, so
    # the actual rows are pulled from XCom at run time instead.
    index_rows = context["ti"].xcom_pull(task_ids="get-inventory-indices") or []
    with open("sql/sul_mod_inventory_storage_indexes.sql", "w+") as fo:
        for row in index_rows:
            index_name, index_def = row[0], row[1]
            if index_name.endswith("pkey"):
                # Still present in the database; re-creating it would fail.
                continue
            # Terminate each definition so the file parses as separate
            # statements (tolerating a definition that already ends in ';').
            fo.write(f"{index_def.rstrip().rstrip(';')};\n")
        fo.write("VACUUM ANALYZE sul_mod_inventory_storage.instance;\n")
        fo.write("VACUUM ANALYZE sul_mod_inventory_storage.holdings_record;\n")
        fo.write("VACUUM ANALYZE sul_mod_inventory_storage.item;\n")

@task
def ingestion_marc():
"""
### Ingests
### Ingests SRS record as JSON
"""
context = get_current_context()
srs_filename = context.get("params").get("srs_filename")
Expand All @@ -67,6 +109,12 @@ def ingestion_marc():
MAX_ENTITIES=Variable.get("MAX_SRS_ENTITIES", 500),
)

# Re-creates the saved indices by running the SQL file written by
# save_inventory_storage_sql.
# NOTE(review): that task writes the file relative to the worker's current
# directory, while this path is resolved as a template file — confirm both
# refer to the same location.
restore_indices = PostgresOperator(
    task_id="restore-inventory-indices",
    sql="sql/sul_mod_inventory_storage_indexes.sql",
    postgres_conn_id="postgres_folio",
    # The file ends with VACUUM ANALYZE statements, which PostgreSQL refuses
    # to run inside a transaction block, so each statement must autocommit.
    # NOTE(review): if the provider sends the file as one multi-command
    # string, VACUUM may still be rejected — verify on the target version.
    autocommit=True,
)

@task
def cleanup():
context = get_current_context()
Expand All @@ -80,7 +128,15 @@ def finish():
srs_filename = context.get("params").get("srs_filename")
logger.info(f"Finished migration {srs_filename}")

ingestion_marc() >> cleanup() >> finish()
# Task order: list the indices, then save their definitions and drop them
# (both branches fan out from the listing), ingest, restore the indices,
# clean up and finish.
ingestion_marc_task = ingestion_marc()
pre_ingestion_tasks = [save_inventory_storage_sql(), drop_indexes()]
get_inventory_indices >> pre_ingestion_tasks
pre_ingestion_tasks >> ingestion_marc_task
ingestion_marc_task >> restore_indices >> cleanup() >> finish()


# Calls add_marc_to_srs at module level and keeps the result in a module
# global (presumably the @dag-decorated factory, so that Airflow registers
# the DAG on import — its decorator is not visible here; confirm).
ingest_marc_to_srs = add_marc_to_srs()

0 comments on commit 4868877

Please sign in to comment.