Commit 16676b5

Merge 699c810 into 9cc0f18
2 parents 9cc0f18 + 699c810
jgreben committed May 7, 2024

Showing 19 changed files with 177 additions and 100 deletions.
7 changes: 4 additions & 3 deletions libsys_airflow/dags/data_exports/gobi_selections.py
@@ -16,7 +16,7 @@
 from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
 from libsys_airflow.plugins.data_exports.marc.transforms import (
     add_holdings_items_to_marc_files,
-    remove_marc_fields,
+    remove_fields_from_marc_files,
 )

 default_args = {
@@ -71,13 +71,14 @@
         task_id="transform_folio_marc_record",
         python_callable=add_holdings_items_to_marc_files,
         op_kwargs={
-            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
+            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}",
+            "full_dump": False,
         },
     )

     transform_marc_fields = PythonOperator(
         task_id="transform_folio_remove_marc_fields",
-        python_callable=remove_marc_fields,
+        python_callable=remove_fields_from_marc_files,
         op_kwargs={
             "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
         },
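All seven vendor selection DAGs receive the same two adjustments: the transform task now calls the renamed remove_fields_from_marc_files instead of remove_marc_fields, and add_holdings_items_to_marc_files is passed an explicit "full_dump": False. Below is a minimal sketch of the callable signatures these DAGs now assume; the real implementations live in libsys_airflow/plugins/data_exports/marc/transforms.py and may differ.

# Sketch only: assumed shape of the transform callables referenced by the
# selection DAGs after this change; bodies are placeholders, not the real code.
def add_holdings_items_to_marc_files(marc_file_list: str, full_dump: bool = False) -> None:
    # The DAGs pass a Jinja-templated XCom value for marc_file_list;
    # full_dump=False marks the incremental vendor-selection path as opposed
    # to the full-dump export path.
    ...


def remove_fields_from_marc_files(marc_file_list: str) -> None:
    # Renamed from remove_marc_fields; strips unwanted fields from each MARC
    # file named in marc_file_list.
    ...

The same sketch applies to the google, hathi, nielsen, pod, sharevde, and west selection DAGs shown below.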
7 changes: 4 additions & 3 deletions libsys_airflow/dags/data_exports/google_selections.py
@@ -14,7 +14,7 @@
 from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
 from libsys_airflow.plugins.data_exports.marc.transforms import (
     add_holdings_items_to_marc_files,
-    remove_marc_fields,
+    remove_fields_from_marc_files,
 )

 default_args = {
@@ -69,13 +69,14 @@
         task_id="transform_folio_marc_record",
         python_callable=add_holdings_items_to_marc_files,
         op_kwargs={
-            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
+            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}",
+            "full_dump": False,
        },
     )

     transform_marc_fields = PythonOperator(
         task_id="transform_folio_remove_marc_fields",
-        python_callable=remove_marc_fields,
+        python_callable=remove_fields_from_marc_files,
         op_kwargs={
             "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
         },
7 changes: 4 additions & 3 deletions libsys_airflow/dags/data_exports/hathi_selections.py
@@ -14,7 +14,7 @@
 from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
 from libsys_airflow.plugins.data_exports.marc.transforms import (
     add_holdings_items_to_marc_files,
-    remove_marc_fields,
+    remove_fields_from_marc_files,
 )

 default_args = {
@@ -69,13 +69,14 @@
         task_id="transform_folio_marc_record",
         python_callable=add_holdings_items_to_marc_files,
         op_kwargs={
-            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
+            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}",
+            "full_dump": False,
         },
     )

     transform_marc_fields = PythonOperator(
         task_id="transform_folio_remove_marc_fields",
-        python_callable=remove_marc_fields,
+        python_callable=remove_fields_from_marc_files,
         op_kwargs={
             "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
         },
7 changes: 4 additions & 3 deletions libsys_airflow/dags/data_exports/nielsen_selections.py
@@ -14,7 +14,7 @@
 from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
 from libsys_airflow.plugins.data_exports.marc.transforms import (
     add_holdings_items_to_marc_files,
-    remove_marc_fields,
+    remove_fields_from_marc_files,
 )

 default_args = {
@@ -69,13 +69,14 @@
         task_id="transform_folio_marc_record",
         python_callable=add_holdings_items_to_marc_files,
         op_kwargs={
-            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
+            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}",
+            "full_dump": False,
         },
     )

     transform_marc_fields = PythonOperator(
         task_id="transform_folio_remove_marc_fields",
-        python_callable=remove_marc_fields,
+        python_callable=remove_fields_from_marc_files,
         op_kwargs={
             "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
         },
7 changes: 4 additions & 3 deletions libsys_airflow/dags/data_exports/pod_selections.py
@@ -14,7 +14,7 @@
 from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
 from libsys_airflow.plugins.data_exports.marc.transforms import (
     add_holdings_items_to_marc_files,
-    remove_marc_fields,
+    remove_fields_from_marc_files,
 )

 default_args = {
@@ -71,13 +71,14 @@
         task_id="transform_folio_marc_record",
         python_callable=add_holdings_items_to_marc_files,
         op_kwargs={
-            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
+            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}",
+            "full_dump": False,
         },
     )

     transform_marc_fields = PythonOperator(
         task_id="transform_folio_remove_marc_fields",
-        python_callable=remove_marc_fields,
+        python_callable=remove_fields_from_marc_files,
         op_kwargs={
             "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
         },
7 changes: 4 additions & 3 deletions libsys_airflow/dags/data_exports/sharevde_selections.py
@@ -14,7 +14,7 @@
 from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
 from libsys_airflow.plugins.data_exports.marc.transforms import (
     add_holdings_items_to_marc_files,
-    remove_marc_fields,
+    remove_fields_from_marc_files,
 )

 default_args = {
@@ -69,13 +69,14 @@
         task_id="transform_folio_marc_record",
         python_callable=add_holdings_items_to_marc_files,
         op_kwargs={
-            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
+            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}",
+            "full_dump": False,
         },
     )

     transform_marc_fields = PythonOperator(
         task_id="transform_folio_remove_marc_fields",
-        python_callable=remove_marc_fields,
+        python_callable=remove_fields_from_marc_files,
         op_kwargs={
             "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
         },
7 changes: 4 additions & 3 deletions libsys_airflow/dags/data_exports/west_selections.py
@@ -14,7 +14,7 @@
 from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
 from libsys_airflow.plugins.data_exports.marc.transforms import (
     add_holdings_items_to_marc_files,
-    remove_marc_fields,
+    remove_fields_from_marc_files,
 )

 default_args = {
@@ -69,13 +69,14 @@
         task_id="transform_folio_marc_record",
         python_callable=add_holdings_items_to_marc_files,
         op_kwargs={
-            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
+            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}",
+            "full_dump": False,
         },
     )

     transform_marc_fields = PythonOperator(
         task_id="transform_folio_remove_marc_fields",
-        python_callable=remove_marc_fields,
+        python_callable=remove_fields_from_marc_files,
         op_kwargs={
             "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
         },
@@ -22,7 +22,12 @@ class DataExportDownloadView(AppBuilderBaseView):
     def data_export_download_home(self):
         content = []
         for vendor in vendors['vendors']:
-            for path in pathlib.Path(f"data-export-files/{vendor}/marc-files").glob(
+            for path in pathlib.Path(f"data-export-files/{vendor}/marc-files/updates").glob(
                 "*"
             ):
                 content.append({vendor: [path_parent(path), path.name]})
+
+            for path in pathlib.Path(f"data-export-files/{vendor}/marc-files/deletes").glob(
+                "*"
+            ):
+                content.append({vendor: [path_parent(path), path.name]})
@@ -37,6 +42,6 @@ def data_export_download_home(self):
     @expose("/downloads/<vendor>/<folder>/<filename>")
     def vendor_marc_record(self, vendor, folder, filename):
         file_bytes = pathlib.Path(
-            f"data-export-files/{vendor}/{folder}/{filename}"
+            f"data-export-files/{vendor}/marc-files/{folder}/{filename}"
         ).read_bytes()  # noqa
         return file_bytes
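The listing view now globs both the marc-files/updates and marc-files/deletes subdirectories for each vendor, and the download route inserts the marc-files/ segment ahead of the folder. A small sketch of the path resolution this implies; the vendor, folder, and filename values are illustrative only.

import pathlib

# Illustrative values; real vendor names and filenames come from the vendors
# config and the export runs.
vendor, folder, filename = "pod", "updates", "20240507.mrc"

# Listing: both per-kind subdirectories are globbed for each vendor.
for kind in ("updates", "deletes"):
    for path in pathlib.Path(f"data-export-files/{vendor}/marc-files/{kind}").glob("*"):
        print(path)

# Download: /downloads/<vendor>/<folder>/<filename> now resolves to
# data-export-files/<vendor>/marc-files/<folder>/<filename>
print(pathlib.Path(f"data-export-files/{vendor}/marc-files/{folder}/{filename}"))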
67 changes: 39 additions & 28 deletions libsys_airflow/plugins/data_exports/instance_ids.py
@@ -6,49 +6,58 @@
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


-def fetch_record_ids(**kwargs) -> list:
+def fetch_record_ids(**kwargs) -> dict:
     context = get_current_context()
     params = context.get("params", {})  # type: ignore
     airflow = kwargs.get("airflow", "/opt/airflow/libsys_airflow")
-    sql_list = sql_files(params=params, airflow=airflow)
-    results = []
-
-    for idx, sqlfile in enumerate(sql_list):
-        task_id = f"postgres_query_{idx}"
-        with open(sqlfile) as sqf:
-            query = sqf.read()
-
-        from_date = params.get("from_date", datetime.now().strftime('%Y-%m-%d'))
-        to_date = (datetime.now() + timedelta(1)).strftime('%Y-%m-%d')
-
-        results.extend(
-            SQLExecuteQueryOperator(
-                task_id=task_id,
-                conn_id="postgres_folio",
-                database=kwargs.get("database", "okapi"),
-                sql=query,
-                parameters={
-                    "from_date": from_date,
-                    "to_date": to_date,
-                },
-            ).execute(context)
-        )
+    results = {"updates": [], "deletes": []}  # type: dict
+
+    for type in ["updates", "deletes"]:
+        sql_list = sql_files(params=params, airflow=airflow, type=type)
+
+        for idx, sqlfile in enumerate(sql_list):
+            task_id = f"postgres_query_{idx}"
+            with open(sqlfile) as sqf:
+                query = sqf.read()
+
+            from_date = params.get("from_date", datetime.now().strftime('%Y-%m-%d'))
+            to_date = (datetime.now() + timedelta(1)).strftime('%Y-%m-%d')
+
+            results[type].extend(
+                SQLExecuteQueryOperator(
+                    task_id=task_id,
+                    conn_id="postgres_folio",
+                    database=kwargs.get("database", "okapi"),
+                    sql=query,
+                    parameters={
+                        "from_date": from_date,
+                        "to_date": to_date,
+                    },
+                ).execute(context)
+            )

     return results


 def sql_files(**kwargs) -> list:
-    sql_path = Path(kwargs.get("airflow", "/opt/airflow")) / "plugins/data_exports/sql"
+    type = kwargs.get("type")
+    sql_path = (
+        Path(kwargs.get("airflow", "/opt/airflow")) / f"plugins/data_exports/sql/{type}"
+    )

     return list(sql_path.glob("*.sql"))


-def save_ids_to_fs(**kwargs) -> str:
+def save_ids_to_fs(**kwargs) -> list[str]:
+    ids_path = []
     airflow = kwargs.get("airflow", "/opt/airflow")
     task_instance = kwargs["task_instance"]
     vendor = kwargs["vendor"]
     data = task_instance.xcom_pull(task_ids="fetch_record_ids_from_folio")
-    ids_path = save_ids(airflow=airflow, data=data, vendor=vendor)
+
+    for type in ["updates", "deletes"]:
+        ids = save_ids(airflow=airflow, data=data[type], type=type, vendor=vendor)
+        ids_path.append(ids)

     return ids_path

@@ -58,8 +67,10 @@ def save_ids(**kwargs) -> str:
     airflow = kwargs.get("airflow", "/opt/airflow")
     vendor = kwargs.get("vendor")
     data = kwargs.get("data", "")
+    type = kwargs.get("type")

     data_path = (
-        Path(airflow) / f"data-export-files/{vendor}/instanceids/{filestamp}.csv"
+        Path(airflow) / f"data-export-files/{vendor}/instanceids/{type}/{filestamp}.csv"
     )
     data_path.parent.mkdir(parents=True, exist_ok=True)
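fetch_record_ids now returns a dict keyed by export type ("updates" and "deletes") instead of a flat list, sql_files reads its queries from per-type subfolders of plugins/data_exports/sql, and save_ids_to_fs writes one CSV of instance ids per type. A rough sketch of the data flow under those assumptions; the id values are invented placeholders.

# Rough sketch of the updates/deletes split; the tuples stand in for rows
# returned by the SQL queries and the ids are invented.
results = {
    "updates": [("instance-uuid-1",), ("instance-uuid-2",)],  # from sql/updates/*.sql
    "deletes": [("instance-uuid-3",)],                        # from sql/deletes/*.sql
}

# save_ids_to_fs then writes one file per type, e.g.
# data-export-files/<vendor>/instanceids/updates/<filestamp>.csv
# data-export-files/<vendor>/instanceids/deletes/<filestamp>.csv
for kind in ("updates", "deletes"):
    print(kind, len(results[kind]), "instance ids")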
23 changes: 14 additions & 9 deletions libsys_airflow/plugins/data_exports/marc/exporter.py
@@ -74,7 +74,9 @@ def exclude_marc_by_vendor(self, marc_record: marcRecord, vendor: str):

         return exclude

-    def retrieve_marc_for_instances(self, instance_file: pathlib.Path) -> str:
+    def retrieve_marc_for_instances(
+        self, instance_file: pathlib.Path, kind: str
+    ) -> str:
         """
         Called for each instanceid file in vendor or full-dump directory
         For each ID row, writes and returns converted MARC from SRS
@@ -85,7 +87,7 @@ def retrieve_marc_for_instances(self, instance_file: pathlib.Path) -> str:
                 "Instance file does not exist for retrieve_marc_for_instances"
             )

-        vendor_name = instance_file.parent.parent.name
+        vendor_name = instance_file.parent.parent.parent.name

         marc_file = ""
         with instance_file.open() as fo:
@@ -102,8 +104,9 @@
                     logger.info(f"Excluding {vendor_name}")
                     continue

+                marc_directory = instance_file.parent.parent.parent
                 marc_file = self.write_marc(
-                    instance_file, instance_file.parent.parent, marc_record
+                    instance_file, marc_directory, marc_record, kind
                 )

         return marc_file
@@ -132,7 +135,7 @@ def retrieve_marc_for_full_dump(self, marc_filename: str, instance_ids: str) ->
             f"Saving {len(instance_ids)} marc records to {marc_filename} in bucket."
         )
         marc_file = self.write_marc(
-            pathlib.Path(marc_filename), S3Path(full_dump_files), marc
+            pathlib.Path(marc_filename), S3Path(full_dump_files), marc, "."
         )

         return marc_file
@@ -158,22 +161,24 @@ def write_marc(
         instance_file: pathlib.Path,
         marc_directory: Union[pathlib.Path, S3Path],
         marc: Union[list[marcRecord], marcRecord],
+        kind: str,
     ) -> str:
         """
         Writes marc record to a file system (local or S3)
         """
         marc_file_name = instance_file.stem
         directory = marc_directory / "marc-files"
-        logger.info(f"Writing to directory: {directory}")
-        directory.mkdir(parents=True, exist_ok=True)
-        marc_file = directory / f"{marc_file_name}.mrc"
-        marc_file.touch()
-
         mode = "wb"

         if type(marc_directory).__name__ == 'PosixPath':
             mode = "ab"
             marc = [marc]  # type: ignore
+            directory = directory / kind
+
+        logger.info(f"Writing to directory: {directory}")
+        directory.mkdir(parents=True, exist_ok=True)
+        marc_file = directory / f"{marc_file_name}.mrc"
+        marc_file.touch()

         with marc_file.open(mode) as fo:
             marc_writer = marcWriter(fo)
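write_marc gains a kind argument: for local (PosixPath) vendor exports the records are appended under marc-files/<kind>/, while the S3 full-dump path passes "." and keeps writing directly under marc-files/. A small sketch of the resulting layout; the vendor and filenames are illustrative, and the helper below is hypothetical, not part of the exporter.

import pathlib

# Hypothetical helper mirroring the path logic in write_marc; illustration only.
def marc_output_path(marc_directory, instance_file: pathlib.Path, kind: str) -> pathlib.Path:
    directory = marc_directory / "marc-files"
    if type(marc_directory).__name__ == 'PosixPath':  # local vendor export
        directory = directory / kind                  # e.g. "updates" or "deletes"
    return directory / f"{instance_file.stem}.mrc"

# An updates instance-id file sits three levels below the vendor directory.
instance_file = pathlib.Path("data-export-files/pod/instanceids/updates/20240507.csv")
marc_directory = instance_file.parent.parent.parent  # data-export-files/pod
print(marc_output_path(marc_directory, instance_file, "updates"))
# -> data-export-files/pod/marc-files/updates/20240507.mrc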