
Commit

Merge pull request #866 from sul-dlss/852-sql-selections
Select Ids using SQLExecuteQueryOperator connector
shelleydoljack committed Mar 1, 2024
2 parents 2dbfcea + 165b75e commit 092c95f
Showing 19 changed files with 265 additions and 113 deletions.
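
The pull request title refers to selecting instance ids with Airflow's SQLExecuteQueryOperator. The fetch_record_ids and save_ids_to_fs callables imported by the DAGs below come from libsys_airflow/plugins/data_exports/instance_ids.py, a module that is not rendered on this page. As a rough orientation only, a module like that could wrap the operator along these lines; the "postgres_folio" connection id, the inline SQL, and the data-export-files layout are assumptions made for illustration, not details taken from the commit.

import pathlib
from datetime import datetime

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


def fetch_record_ids(**kwargs) -> list:
    """Run the id-selection query against the FOLIO database and return the rows."""
    query = SQLExecuteQueryOperator(
        task_id="postgres_query",
        conn_id="postgres_folio",  # assumed Airflow connection id
        sql="SELECT id FROM sul_mod_inventory_storage.instance",  # placeholder SQL
    )
    return query.execute(context=kwargs)


def save_ids_to_fs(**kwargs) -> None:
    """Write the ids returned by the upstream task to a per-vendor file."""
    vendor = kwargs["vendor"]
    task_instance = kwargs["ti"]
    rows = task_instance.xcom_pull(task_ids="fetch_record_ids_from_folio") or []
    ids_dir = pathlib.Path("/opt/airflow/data-export-files") / vendor / "instance-ids"
    ids_dir.mkdir(parents=True, exist_ok=True)
    ids_file = ids_dir / f"{datetime.now().strftime('%Y%m%d%H%M')}.csv"
    ids_file.write_text("\n".join(str(row[0]) for row in rows))
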
2 changes: 1 addition & 1 deletion config/deploy.rb
@@ -17,7 +17,7 @@

# Default value for linked_dirs is []
# set :linked_dirs, %w[]
set :linked_dirs, %w[config vendor-data vendor-keys]
set :linked_dirs, %w[config vendor-data vendor-keys data-export-files]

# Default value for keep_releases is 5
set :keep_releases, 2
1 change: 1 addition & 0 deletions docker-compose.prod.yaml
@@ -108,6 +108,7 @@ x-airflow-common:
- ${AIRFLOW_PROJ_DIR:-.}/circ:/opt/airflow/circ
- ${AIRFLOW_PROJ_DIR:-.}/vendor-data:/opt/airflow/vendor-data
- ${AIRFLOW_PROJ_DIR:-.}/vendor-keys:/opt/airflow/vendor-keys
- ${AIRFLOW_PROJ_DIR:-.}/data-export-files:/opt/airflow/data-export-files
- ${AIRFLOW_PROJ_DIR:-.}/orafin-files:/opt/airflow/orafin-files
user: "${AIRFLOW_UID:-50000}:0"
extra_hosts:
1 change: 1 addition & 0 deletions docker-compose.yaml
@@ -97,6 +97,7 @@ x-airflow-common:
- ${AIRFLOW_PROJ_DIR:-.}/circ:/opt/airflow/circ
- ${AIRFLOW_PROJ_DIR:-.}/vendor-data:/opt/airflow/vendor-data
- ${AIRFLOW_PROJ_DIR:-.}/vendor-keys:/opt/airflow/vendor-keys
- ${AIRFLOW_PROJ_DIR:-.}/data-export-files:/opt/airflow/data-export-files
- ${AIRFLOW_PROJ_DIR:-.}/orafin-files:/opt/airflow/orafin-files
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
30 changes: 19 additions & 11 deletions libsys_airflow/dags/data_exports/gobi_selections.py
@@ -1,12 +1,17 @@
from datetime import datetime, timedelta

from airflow import DAG

from airflow.operators.empty import EmptyOperator

from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

from libsys_airflow.plugins.data_exports.instance_ids import (
fetch_record_ids,
save_ids_to_fs,
)

default_args = {
"owner": "libsys",
"depends_on_past": False,
@@ -21,13 +26,11 @@
default_args=default_args,
schedule=timedelta(days=int(Variable.get("schedule_gobi_days", 7))),
start_date=datetime(2024, 2, 26),
template_searchpath="/opt/airflow/libsys_airflow/plugins/data-export/sql",
catchup=False,
tags=["data_exports"],
tags=["data export"],
) as dag:
# Sample methods to be removed and replaced by real methods, along with imports when they are coded.
def fetch_marc_record_ids():
"Replace this with method from record selection module"

def folio_marc_records_for_id():
"Replace this with method from marc module"

Expand All @@ -37,10 +40,15 @@ def sample_marc_transform_1():
def save_transformed_marc():
"Replace this with method from marc writing module"

fetch_record_ids = PythonOperator(
fetch_folio_record_ids = PythonOperator(
task_id="fetch_record_ids_from_folio",
python_callable=fetch_marc_record_ids,
op_kwargs={},
python_callable=fetch_record_ids,
)

save_ids_to_file = PythonOperator(
task_id="save_ids_to_file",
python_callable=save_ids_to_fs,
op_kwargs={"vendor": "gobi"},
)

fetch_marc_records = PythonOperator(
@@ -72,6 +80,6 @@ def save_transformed_marc():
)


fetch_record_ids >> fetch_marc_records >> transform_marc_record
transform_marc_record >> write_marc_to_fs >> finish_fetching_marc
finish_fetching_marc >> send_to_vendor
fetch_folio_record_ids >> save_ids_to_file >> fetch_marc_records
fetch_marc_records >> transform_marc_record >> write_marc_to_fs
write_marc_to_fs >> finish_fetching_marc >> send_to_vendor
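
The gobi DAG also gains a template_searchpath pointing at a sql/ directory for the data-export plugins (the new line above the catchup setting). When a search path is set on a DAG, operators whose template extensions include .sql, such as SQLExecuteQueryOperator, can take a bare file name and have it resolved against that directory and Jinja-rendered before the query runs. A minimal sketch of that pattern, assuming a hypothetical gobi_selection.sql file and connection id; the commit's plugin may wire this up differently:

from datetime import datetime

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
    "sketch_sql_selection",  # illustrative dag_id only
    start_date=datetime(2024, 2, 26),
    schedule=None,
    template_searchpath="/opt/airflow/libsys_airflow/plugins/data-export/sql",
    catchup=False,
) as dag:
    # The bare file name is looked up under template_searchpath and
    # rendered with Jinja before the query is executed.
    select_ids = SQLExecuteQueryOperator(
        task_id="select_instance_ids",
        conn_id="postgres_folio",  # assumed connection id
        sql="gobi_selection.sql",  # assumed file under the search path
    )
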
28 changes: 17 additions & 11 deletions libsys_airflow/dags/data_exports/google_selections.py
@@ -1,12 +1,16 @@
from datetime import datetime, timedelta

from airflow import DAG

from airflow.operators.empty import EmptyOperator
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

from libsys_airflow.plugins.data_exports.instance_ids import (
fetch_record_ids,
save_ids_to_fs,
)

default_args = {
"owner": "libsys",
"depends_on_past": False,
@@ -22,12 +26,9 @@
schedule=timedelta(days=int(Variable.get("schedule_google_days", 1))),
start_date=datetime(2024, 2, 26),
catchup=False,
tags=["data_exports"],
tags=["data export"],
) as dag:
# Sample methods to be removed and replaced by real methods, along with imports when they are coded.
def fetch_marc_record_ids():
"Replace this with method from record selection module"

def folio_marc_records_for_id():
"Replace this with method from marc module"

@@ -37,10 +38,15 @@ def sample_marc_transform_1():
def save_transformed_marc():
"Replace this with method from marc writing module"

fetch_record_ids = PythonOperator(
fetch_folio_record_ids = PythonOperator(
task_id="fetch_record_ids_from_folio",
python_callable=fetch_marc_record_ids,
op_kwargs={},
python_callable=fetch_record_ids,
)

save_ids_to_file = PythonOperator(
task_id="save_ids_to_file",
python_callable=save_ids_to_fs,
op_kwargs={"vendor": "google"},
)

fetch_marc_records = PythonOperator(
@@ -72,6 +78,6 @@ def save_transformed_marc():
)


fetch_record_ids >> fetch_marc_records >> transform_marc_record
transform_marc_record >> write_marc_to_fs >> finish_fetching_marc
finish_fetching_marc >> send_to_vendor
fetch_folio_record_ids >> save_ids_to_file >> fetch_marc_records
fetch_marc_records >> transform_marc_record >> write_marc_to_fs
write_marc_to_fs >> finish_fetching_marc >> send_to_vendor
28 changes: 17 additions & 11 deletions libsys_airflow/dags/data_exports/hathi_selections.py
@@ -1,12 +1,16 @@
from datetime import datetime, timedelta

from airflow import DAG

from airflow.operators.empty import EmptyOperator
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

from libsys_airflow.plugins.data_exports.instance_ids import (
fetch_record_ids,
save_ids_to_fs,
)

default_args = {
"owner": "libsys",
"depends_on_past": False,
@@ -22,12 +26,9 @@
schedule=timedelta(days=int(Variable.get("schedule_hathi_days", 1))),
start_date=datetime(2024, 2, 26),
catchup=False,
tags=["data_exports"],
tags=["data export"],
) as dag:
# Sample methods to be removed and replaced by real methods, along with imports when they are coded.
def fetch_marc_record_ids():
"Replace this with method from record selection module"

def folio_marc_records_for_id():
"Replace this with method from marc module"

@@ -37,10 +38,15 @@ def sample_marc_transform_1():
def save_transformed_marc():
"Replace this with method from marc writing module"

fetch_record_ids = PythonOperator(
fetch_folio_record_ids = PythonOperator(
task_id="fetch_record_ids_from_folio",
python_callable=fetch_marc_record_ids,
op_kwargs={},
python_callable=fetch_record_ids,
)

save_ids_to_file = PythonOperator(
task_id="save_ids_to_file",
python_callable=save_ids_to_fs,
op_kwargs={"vendor": "hathi"},
)

fetch_marc_records = PythonOperator(
@@ -72,6 +78,6 @@ def save_transformed_marc():
)


fetch_record_ids >> fetch_marc_records >> transform_marc_record
transform_marc_record >> write_marc_to_fs >> finish_fetching_marc
finish_fetching_marc >> send_to_vendor
fetch_folio_record_ids >> save_ids_to_file >> fetch_marc_records
fetch_marc_records >> transform_marc_record >> write_marc_to_fs
write_marc_to_fs >> finish_fetching_marc >> send_to_vendor
28 changes: 17 additions & 11 deletions libsys_airflow/dags/data_exports/nielsen_selections.py
@@ -1,12 +1,16 @@
from datetime import datetime, timedelta

from airflow import DAG

from airflow.operators.empty import EmptyOperator
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

from libsys_airflow.plugins.data_exports.instance_ids import (
fetch_record_ids,
save_ids_to_fs,
)

default_args = {
"owner": "libsys",
"depends_on_past": False,
@@ -22,12 +26,9 @@
schedule=timedelta(days=int(Variable.get("schedule_nielsen_days", 1))),
start_date=datetime(2024, 2, 26),
catchup=False,
tags=["data_exports"],
tags=["data export"],
) as dag:
# Sample methods to be removed and replaced by real methods, along with imports when they are coded.
def fetch_marc_record_ids():
"Replace this with method from record selection module"

def folio_marc_records_for_id():
"Replace this with method from marc module"

@@ -37,10 +38,15 @@ def sample_marc_transform_1():
def save_transformed_marc():
"Replace this with method from marc writing module"

fetch_record_ids = PythonOperator(
fetch_folio_record_ids = PythonOperator(
task_id="fetch_record_ids_from_folio",
python_callable=fetch_marc_record_ids,
op_kwargs={},
python_callable=fetch_record_ids,
)

save_ids_to_file = PythonOperator(
task_id="save_ids_to_file",
python_callable=save_ids_to_fs,
op_kwargs={"vendor": "nielson"},
)

fetch_marc_records = PythonOperator(
@@ -72,6 +78,6 @@ def save_transformed_marc():
)


fetch_record_ids >> fetch_marc_records >> transform_marc_record
transform_marc_record >> write_marc_to_fs >> finish_fetching_marc
finish_fetching_marc >> send_to_vendor
fetch_folio_record_ids >> save_ids_to_file >> fetch_marc_records
fetch_marc_records >> transform_marc_record >> write_marc_to_fs
write_marc_to_fs >> finish_fetching_marc >> send_to_vendor
37 changes: 19 additions & 18 deletions libsys_airflow/dags/data_exports/oclc_selections.py
@@ -1,15 +1,15 @@
from datetime import datetime, timedelta

from airflow import DAG

from airflow.operators.empty import EmptyOperator
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

import logging

logger = logging.getLogger(__name__)
from libsys_airflow.plugins.data_exports.instance_ids import (
fetch_record_ids,
save_ids_to_fs,
)

default_args = {
"owner": "libsys",
@@ -20,32 +20,33 @@
"retry_delay": timedelta(minutes=1),
}


with DAG(
"select_oclc_records",
default_args=default_args,
schedule=timedelta(days=int(Variable.get("schedule_oclc_days", 7))),
start_date=datetime(2024, 2, 25),
catchup=False,
tags=["data_exports"],
tags=["data export"],
) as dag:
# Sample methods to be removed and replaced by real methods, along with imports when they are coded.
def fetch_marc_record_ids():
logger.info("Replace this with method from record selection module")

def folio_marc_records_for_id():
logger.info("Replace this with method from marc module")
"Replace this with method from marc module"

def sample_marc_transform_1():
logger.info("Replace this with method from marc processing module")
"Replace this with method from marc processing module"

def save_transformed_marc():
logger.info("Replace this with method from marc writing module")
"Replace this with method from marc writing module"

fetch_record_ids = PythonOperator(
fetch_folio_record_ids = PythonOperator(
task_id="fetch_record_ids_from_folio",
python_callable=fetch_marc_record_ids,
op_kwargs={},
python_callable=fetch_record_ids,
)

save_ids_to_file = PythonOperator(
task_id="save_ids_to_file",
python_callable=save_ids_to_fs,
op_kwargs={"vendor": "oclc"},
)

fetch_marc_records = PythonOperator(
@@ -77,6 +78,6 @@ def save_transformed_marc():
)


fetch_record_ids >> fetch_marc_records >> transform_marc_record
transform_marc_record >> write_marc_to_fs >> finish_fetching_marc
finish_fetching_marc >> send_to_vendor
fetch_folio_record_ids >> save_ids_to_file >> fetch_marc_records
fetch_marc_records >> transform_marc_record >> write_marc_to_fs
write_marc_to_fs >> finish_fetching_marc >> send_to_vendor
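
All five selection DAGs now share the same front end, fetch_folio_record_ids >> save_ids_to_file >> fetch_marc_records, ahead of the unchanged transform-and-write chain. One way to confirm the new wiring, assuming an environment in which these DAG files parse (the dag_id is the one declared in the OCLC file above; the folder path is the repository layout):

from airflow.models import DagBag

# Parse the data-export DAGs and check that the save step sits between
# id selection and MARC fetching.
dag_bag = DagBag(dag_folder="libsys_airflow/dags/data_exports", include_examples=False)
assert not dag_bag.import_errors

oclc_dag = dag_bag.get_dag("select_oclc_records")
fetch_ids = oclc_dag.get_task("fetch_record_ids_from_folio")
assert "save_ids_to_file" in fetch_ids.downstream_task_ids
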