Commit

Merge 58b8d76 into 9cc0f18
jgreben committed May 7, 2024
2 parents 9cc0f18 + 58b8d76 commit 7bc2a8d
Showing 22 changed files with 222 additions and 125 deletions.
7 changes: 4 additions & 3 deletions libsys_airflow/dags/data_exports/gobi_selections.py
@@ -16,7 +16,7 @@
 from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
 from libsys_airflow.plugins.data_exports.marc.transforms import (
     add_holdings_items_to_marc_files,
-    remove_marc_fields,
+    remove_fields_from_marc_files,
 )
 
 default_args = {
@@ -71,13 +71,14 @@
         task_id="transform_folio_marc_record",
         python_callable=add_holdings_items_to_marc_files,
         op_kwargs={
-            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
+            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}",
+            "full_dump": False,
         },
     )
 
     transform_marc_fields = PythonOperator(
         task_id="transform_folio_remove_marc_fields",
-        python_callable=remove_marc_fields,
+        python_callable=remove_fields_from_marc_files,
         op_kwargs={
             "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
         },
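
The same two-line change repeats in each selection DAG in this commit: the field-removal task now calls remove_fields_from_marc_files, and add_holdings_items_to_marc_files additionally receives full_dump=False. As a rough illustration of what a remove-fields transform of this shape can look like with pymarc; the tag list, the exact signature, and the in-place rewrite below are assumptions for the sketch, not the plugin's actual implementation:

    import pymarc

    # Tags to strip are invented for this sketch; the real plugin defines its own list.
    TAGS_TO_REMOVE = ["598", "699"]

    def remove_fields_from_marc_files(marc_file_list: list[str]) -> None:
        """Rewrite each binary MARC file with the unwanted tags removed (illustrative only)."""
        for marc_file in marc_file_list:
            records = []
            with open(marc_file, "rb") as fo:
                for record in pymarc.MARCReader(fo):
                    if record is None:  # skip records pymarc could not parse
                        continue
                    record.remove_fields(*TAGS_TO_REMOVE)
                    records.append(record)
            with open(marc_file, "wb") as fo:
                writer = pymarc.MARCWriter(fo)
                for record in records:
                    writer.write(record)
                writer.close()
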
7 changes: 4 additions & 3 deletions libsys_airflow/dags/data_exports/google_selections.py
@@ -14,7 +14,7 @@
 from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
 from libsys_airflow.plugins.data_exports.marc.transforms import (
     add_holdings_items_to_marc_files,
-    remove_marc_fields,
+    remove_fields_from_marc_files,
 )
 
 default_args = {
@@ -69,13 +69,14 @@
         task_id="transform_folio_marc_record",
         python_callable=add_holdings_items_to_marc_files,
         op_kwargs={
-            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
+            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}",
+            "full_dump": False,
         },
     )
 
     transform_marc_fields = PythonOperator(
         task_id="transform_folio_remove_marc_fields",
-        python_callable=remove_marc_fields,
+        python_callable=remove_fields_from_marc_files,
        op_kwargs={
             "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
         },
7 changes: 4 additions & 3 deletions libsys_airflow/dags/data_exports/hathi_selections.py
@@ -14,7 +14,7 @@
 from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
 from libsys_airflow.plugins.data_exports.marc.transforms import (
     add_holdings_items_to_marc_files,
-    remove_marc_fields,
+    remove_fields_from_marc_files,
 )
 
 default_args = {
@@ -69,13 +69,14 @@
         task_id="transform_folio_marc_record",
         python_callable=add_holdings_items_to_marc_files,
         op_kwargs={
-            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
+            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}",
+            "full_dump": False,
         },
     )
 
     transform_marc_fields = PythonOperator(
         task_id="transform_folio_remove_marc_fields",
-        python_callable=remove_marc_fields,
+        python_callable=remove_fields_from_marc_files,
         op_kwargs={
             "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
         },
7 changes: 4 additions & 3 deletions libsys_airflow/dags/data_exports/nielsen_selections.py
@@ -14,7 +14,7 @@
 from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
 from libsys_airflow.plugins.data_exports.marc.transforms import (
     add_holdings_items_to_marc_files,
-    remove_marc_fields,
+    remove_fields_from_marc_files,
 )
 
 default_args = {
@@ -69,13 +69,14 @@
         task_id="transform_folio_marc_record",
         python_callable=add_holdings_items_to_marc_files,
         op_kwargs={
-            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
+            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}",
+            "full_dump": False,
         },
     )
 
     transform_marc_fields = PythonOperator(
         task_id="transform_folio_remove_marc_fields",
-        python_callable=remove_marc_fields,
+        python_callable=remove_fields_from_marc_files,
         op_kwargs={
             "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
         },
7 changes: 4 additions & 3 deletions libsys_airflow/dags/data_exports/pod_selections.py
@@ -14,7 +14,7 @@
 from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
 from libsys_airflow.plugins.data_exports.marc.transforms import (
     add_holdings_items_to_marc_files,
-    remove_marc_fields,
+    remove_fields_from_marc_files,
 )
 
 default_args = {
@@ -71,13 +71,14 @@
         task_id="transform_folio_marc_record",
         python_callable=add_holdings_items_to_marc_files,
         op_kwargs={
-            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
+            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}",
+            "full_dump": False,
         },
     )
 
     transform_marc_fields = PythonOperator(
         task_id="transform_folio_remove_marc_fields",
-        python_callable=remove_marc_fields,
+        python_callable=remove_fields_from_marc_files,
         op_kwargs={
             "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
         },
7 changes: 4 additions & 3 deletions libsys_airflow/dags/data_exports/sharevde_selections.py
@@ -14,7 +14,7 @@
 from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
 from libsys_airflow.plugins.data_exports.marc.transforms import (
     add_holdings_items_to_marc_files,
-    remove_marc_fields,
+    remove_fields_from_marc_files,
 )
 
 default_args = {
@@ -69,13 +69,14 @@
         task_id="transform_folio_marc_record",
         python_callable=add_holdings_items_to_marc_files,
         op_kwargs={
-            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
+            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}",
+            "full_dump": False,
         },
     )
 
     transform_marc_fields = PythonOperator(
         task_id="transform_folio_remove_marc_fields",
-        python_callable=remove_marc_fields,
+        python_callable=remove_fields_from_marc_files,
         op_kwargs={
             "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
         },
7 changes: 4 additions & 3 deletions libsys_airflow/dags/data_exports/west_selections.py
@@ -14,7 +14,7 @@
 from libsys_airflow.plugins.data_exports.marc.exports import marc_for_instances
 from libsys_airflow.plugins.data_exports.marc.transforms import (
     add_holdings_items_to_marc_files,
-    remove_marc_fields,
+    remove_fields_from_marc_files,
 )
 
 default_args = {
@@ -69,13 +69,14 @@
         task_id="transform_folio_marc_record",
         python_callable=add_holdings_items_to_marc_files,
         op_kwargs={
-            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
+            "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}",
+            "full_dump": False,
         },
     )
 
     transform_marc_fields = PythonOperator(
         task_id="transform_folio_remove_marc_fields",
-        python_callable=remove_marc_fields,
+        python_callable=remove_fields_from_marc_files,
         op_kwargs={
             "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
         },
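
A recurring detail in these DAG diffs is the Jinja-templated op_kwargs value "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}". Whether libsys_airflow resolves that template to a native list (for example via render_template_as_native_obj) or parses the rendered string inside the callables is not visible in this diff; the sketch below only illustrates the general Airflow mechanism, with stand-in callables and a made-up file path:

    import pendulum
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    def fetch_marc_records_from_folio() -> list[str]:
        # Stand-in for the real fetch task; the return value is pushed to XCom.
        return ["/opt/airflow/data-export-files/gobi/marc-files/updates/example.mrc"]

    def remove_fields_from_marc_files(marc_file_list) -> None:
        print(f"would transform {marc_file_list!r}")

    with DAG(
        dag_id="xcom_template_sketch",
        start_date=pendulum.datetime(2024, 5, 1, tz="UTC"),
        schedule=None,
        catchup=False,
        # With this flag the template below resolves to the pulled list itself;
        # without it, the callable receives the rendered string representation.
        render_template_as_native_obj=True,
    ) as dag:
        fetch = PythonOperator(
            task_id="fetch_marc_records_from_folio",
            python_callable=fetch_marc_records_from_folio,
        )
        transform = PythonOperator(
            task_id="transform_folio_remove_marc_fields",
            python_callable=remove_fields_from_marc_files,
            op_kwargs={
                "marc_file_list": "{{ ti.xcom_pull('fetch_marc_records_from_folio') }}"
            },
        )
        fetch >> transform
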
@@ -22,9 +22,14 @@ class DataExportDownloadView(AppBuilderBaseView):
     def data_export_download_home(self):
         content = []
         for vendor in vendors['vendors']:
-            for path in pathlib.Path(f"data-export-files/{vendor}/marc-files").glob(
-                "*"
-            ):
+            for path in pathlib.Path(
+                f"data-export-files/{vendor}/marc-files/updates"
+            ).glob("*"):
+                content.append({vendor: [path_parent(path), path.name]})
+
+            for path in pathlib.Path(
+                f"data-export-files/{vendor}/marc-files/deletes"
+            ).glob("*"):
                 content.append({vendor: [path_parent(path), path.name]})
 
             for path in pathlib.Path(f"data-export-files/{vendor}/transmitted").glob(
@@ -37,6 +42,6 @@ def data_export_download_home(self):
     @expose("/downloads/<vendor>/<folder>/<filename>")
     def vendor_marc_record(self, vendor, folder, filename):
         file_bytes = pathlib.Path(
-            f"data-export-files/{vendor}/{folder}/{filename}"
+            f"data-export-files/{vendor}/marc-files/{folder}/{filename}"
         ).read_bytes()  # noqa
         return file_bytes
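
The download view now globs the updates and deletes subdirectories separately, and the per-file route prefixes marc-files before the folder segment. A small sketch of the directory layout this implies (the vendor name and contents are invented; path_parent is the view's own helper and is not reproduced here):

    import pathlib

    # data-export-files/
    #   gobi/
    #     marc-files/
    #       updates/   <- MARC files for new and changed records
    #       deletes/   <- MARC files for deleted records
    #     transmitted/ ...
    vendor = "gobi"
    for kind in ("updates", "deletes"):
        for path in pathlib.Path(f"data-export-files/{vendor}/marc-files/{kind}").glob("*"):
            # e.g. served by vendor_marc_record() at /downloads/gobi/updates/<path.name>
            print(kind, path.name)
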
@@ -14,7 +14,7 @@
     vendors = json.load(vendor_file)
 
 
-def upload_data_export_ids(ids_df: pd.DataFrame, vendor: str) -> str:
+def upload_data_export_ids(ids_df: pd.DataFrame, vendor: str, kind: str) -> str:
     if len(ids_df.columns) > 1:
         raise ValueError("ID file has more than one column.")
     tuples = list(ids_df.itertuples(index=False, name=None))
@@ -24,7 +24,7 @@ def upload_data_export_ids(ids_df: pd.DataFrame, vendor: str) -> str:
         ):
             raise ValueError(f"{id[0]} is not a UUID.")
 
-    ids_path = save_ids(airflow="/opt/airflow", vendor=vendor, data=tuples)
+    ids_path = save_ids(airflow="/opt/airflow", vendor=vendor, data=tuples, kind=kind)
 
     return ids_path
 
@@ -47,11 +47,12 @@ def run_data_export_upload(self):
         try:
             raw_csv = request.files["upload-data-export-ids"]
             vendor = request.form.get("vendor")
+            kind = request.form.get("kind")
             ids_df = pd.read_csv(raw_csv)
             if not vendor:
                 raise Exception("You must choose a vendor!")
             else:
-                upload_data_export_ids(ids_df, vendor)
+                upload_data_export_ids(ids_df, vendor, kind)
             flash("Sucessfully uploaded ID file.")
         except pd.errors.EmptyDataError:
             flash("Warning! Empty UUID file.")
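
upload_data_export_ids now threads a kind value, taken from the upload form, through to save_ids. A rough sketch of the validation the helper performs on the uploaded CSV; the header handling and the exact UUID check are assumptions, only the one-column rule and the UUID requirement come from the diff:

    import io
    import uuid

    import pandas as pd

    # One-column CSV of FOLIO instance UUIDs; the values here are invented.
    raw_csv = io.StringIO(
        "8f1e6a6e-6c3f-4a1b-9d2e-0b5c4d3e2f10\n"
        "2b7d9c1a-4e5f-4a6b-8c9d-1e2f3a4b5c6d\n"
    )
    ids_df = pd.read_csv(raw_csv, header=None)

    if len(ids_df.columns) > 1:
        raise ValueError("ID file has more than one column.")

    for row in ids_df.itertuples(index=False, name=None):
        uuid.UUID(str(row[0]))  # raises ValueError when a value is not a UUID

    # With the new signature the caller also chooses which export the IDs feed:
    # upload_data_export_ids(ids_df, vendor="gobi", kind="updates")
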
67 changes: 39 additions & 28 deletions libsys_airflow/plugins/data_exports/instance_ids.py
@@ -6,49 +6,58 @@
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 
 
-def fetch_record_ids(**kwargs) -> list:
+def fetch_record_ids(**kwargs) -> dict:
     context = get_current_context()
     params = context.get("params", {})  # type: ignore
     airflow = kwargs.get("airflow", "/opt/airflow/libsys_airflow")
-    sql_list = sql_files(params=params, airflow=airflow)
-    results = []
-
-    for idx, sqlfile in enumerate(sql_list):
-        task_id = f"postgres_query_{idx}"
-        with open(sqlfile) as sqf:
-            query = sqf.read()
-
-        from_date = params.get("from_date", datetime.now().strftime('%Y-%m-%d'))
-        to_date = (datetime.now() + timedelta(1)).strftime('%Y-%m-%d')
-
-        results.extend(
-            SQLExecuteQueryOperator(
-                task_id=task_id,
-                conn_id="postgres_folio",
-                database=kwargs.get("database", "okapi"),
-                sql=query,
-                parameters={
-                    "from_date": from_date,
-                    "to_date": to_date,
-                },
-            ).execute(context)
-        )
+    results = {"updates": [], "deletes": []}  # type: dict
+
+    for kind in ["updates", "deletes"]:
+        sql_list = sql_files(params=params, airflow=airflow, kind=kind)
+
+        for idx, sqlfile in enumerate(sql_list):
+            task_id = f"postgres_query_{idx}"
+            with open(sqlfile) as sqf:
+                query = sqf.read()
+
+            from_date = params.get("from_date", datetime.now().strftime('%Y-%m-%d'))
+            to_date = (datetime.now() + timedelta(1)).strftime('%Y-%m-%d')
+
+            results[kind].extend(
+                SQLExecuteQueryOperator(
+                    task_id=task_id,
+                    conn_id="postgres_folio",
+                    database=kwargs.get("database", "okapi"),
+                    sql=query,
+                    parameters={
+                        "from_date": from_date,
+                        "to_date": to_date,
+                    },
+                ).execute(context)
+            )
 
     return results
 
 
 def sql_files(**kwargs) -> list:
-    sql_path = Path(kwargs.get("airflow", "/opt/airflow")) / "plugins/data_exports/sql"
+    kind = kwargs.get("kind")
+    sql_path = (
+        Path(kwargs.get("airflow", "/opt/airflow")) / f"plugins/data_exports/sql/{kind}"
+    )
 
     return list(sql_path.glob("*.sql"))
 
 
-def save_ids_to_fs(**kwargs) -> str:
+def save_ids_to_fs(**kwargs) -> list[str]:
+    ids_path = []
     airflow = kwargs.get("airflow", "/opt/airflow")
     task_instance = kwargs["task_instance"]
     vendor = kwargs["vendor"]
     data = task_instance.xcom_pull(task_ids="fetch_record_ids_from_folio")
-    ids_path = save_ids(airflow=airflow, data=data, vendor=vendor)
+
+    for kind in ["updates", "deletes"]:
+        ids = save_ids(airflow=airflow, data=data[kind], kind=kind, vendor=vendor)
+        ids_path.append(ids)
 
     return ids_path
 
@@ -58,8 +67,10 @@ def save_ids(**kwargs) -> str:
     airflow = kwargs.get("airflow", "/opt/airflow")
     vendor = kwargs.get("vendor")
     data = kwargs.get("data", "")
+    kind = kwargs.get("kind")
 
     data_path = (
-        Path(airflow) / f"data-export-files/{vendor}/instanceids/{filestamp}.csv"
+        Path(airflow) / f"data-export-files/{vendor}/instanceids/{kind}/{filestamp}.csv"
     )
     data_path.parent.mkdir(parents=True, exist_ok=True)
+
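
Taken together, fetch_record_ids now returns a dict keyed by "updates" and "deletes", and save_ids_to_fs writes one CSV of instance IDs per kind under instanceids/<kind>/. A toy version of that per-kind flow; the row shape, timestamp format, and CSV writing are assumptions, only the path pattern comes from the diff:

    from datetime import datetime
    from pathlib import Path

    # Shape of the payload the new fetch_record_ids passes along (IDs invented).
    record_ids = {
        "updates": [["6f1b2c3d-8a9e-4d2b-b1c0-5a4e3f2d1c0b"]],
        "deletes": [["1a2b3c4d-5e6f-4a8b-9c0d-1e2f3a4b5c6d"]],
    }

    def save_ids_sketch(airflow: str, vendor: str, kind: str, data: list) -> str:
        filestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        data_path = (
            Path(airflow) / f"data-export-files/{vendor}/instanceids/{kind}/{filestamp}.csv"
        )
        data_path.parent.mkdir(parents=True, exist_ok=True)
        data_path.write_text("\n".join(row[0] for row in data))
        return str(data_path)

    paths = [
        save_ids_sketch("/tmp/airflow", "gobi", kind, record_ids[kind])
        for kind in ("updates", "deletes")
    ]
    print(paths)  # one CSV per kind: .../instanceids/updates/<stamp>.csv and .../deletes/<stamp>.csv
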