
Commit

Merge c689b1c into 12be2a0
jermnelson committed Jun 28, 2022
2 parents 12be2a0 + c689b1c commit ae473c7
Showing 10 changed files with 311 additions and 122 deletions.
7 changes: 3 additions & 4 deletions README.md
@@ -16,12 +16,11 @@ Based on the documentation, [Running Airflow in Docker](https://airflow.apache.o
2. Run `git submodule init` and then `git submodule update` to populate the submodules.
4. If it's commented out, uncomment the line `- ./dags:/opt/airflow/dags` in docker-compose.yaml (under `volumes`, under `x-airflow-common`).
5. Start up docker locally.
6. Build the docker image with `Docker build .`
7. Create a `.env` file with the `AIRFLOW_UID` and `AIRFLOW_GROUP` values.
8. Run `docker-compose build` to build the customized airflow image. (Note: the `usermod` command may take a while to complete when running the build.)
9. Run `docker compose up airflow-init` to initialize Airflow the first time you deploy it.
10. Bring up Airflow with `docker compose up` to run the containers in the
foreground, or use `docker compose up -d` to run them as a daemon.
9. Run `docker-compose up airflow-init` to initialize Airflow the first time you deploy it.
10. Bring up Airflow with `docker-compose up` to run the containers in the
foreground, or use `docker-compose up -d` to run them as a daemon.
1. Access Airflow locally at http://localhost
1. Log into the worker container using `docker exec -it libsys-airflow_airflow-worker-1 /bin/bash` to view the raw work files.

39 changes: 16 additions & 23 deletions dags/bib_records.py
@@ -32,8 +32,6 @@

from plugins.folio.items import run_items_transformer, post_folio_items_records

from plugins.folio.marc import post_marc_to_srs

logger = logging.getLogger(__name__)

sul_config = LibraryConfiguration(
@@ -48,7 +46,7 @@
iteration_identifier="",
)


max_entities = Variable.get("MAX_ENTITIES", 500)
parallel_posts = Variable.get("parallel_posts", 3)

default_args = {
@@ -237,10 +235,7 @@ def marc_only(*args, **kwargs):
>> convert_instances_valid_json
>> finish_conversion
)
(
convert_marc_to_folio_instances
>> finish_conversion
) # noqa
(convert_marc_to_folio_instances >> finish_conversion) # noqa
marc_only_convert_check >> [
convert_tsv_to_folio_holdings,
finish_conversion,
@@ -273,21 +268,11 @@ def marc_only(*args, **kwargs):
post_instances = PythonOperator(
task_id=f"post_to_folio_instances_{i}",
python_callable=post_folio_instance_records,
op_kwargs={"job": i, "MAX_ENTITIES": 25},
op_kwargs={"job": i, "MAX_ENTITIES": max_entities},
)

login >> post_instances >> finish_instances

marc_to_srs = PythonOperator(
task_id="marc-to-srs",
python_callable=post_marc_to_srs,
op_kwargs={
"library_config": sul_config,
},
)

finish_instances >> marc_to_srs >> finished_all_posts

marc_only_post_check = BranchPythonOperator(
task_id="marc-only-post-check",
python_callable=marc_only,
@@ -311,7 +296,7 @@ def marc_only(*args, **kwargs):
post_holdings = PythonOperator(
task_id=f"post_to_folio_holdings_{i}",
python_callable=post_folio_holding_records,
op_kwargs={"job": i, "MAX_ENTITIES": 25},
op_kwargs={"job": i, "MAX_ENTITIES": max_entities},
)

start_holdings >> post_holdings >> finish_holdings
@@ -322,20 +307,27 @@ def marc_only(*args, **kwargs):
post_items = PythonOperator(
task_id=f"post_to_folio_items_{i}",
python_callable=post_folio_items_records,
op_kwargs={"job": i, "MAX_ENTITIES": 25},
op_kwargs={"job": i, "MAX_ENTITIES": max_entities},
)

finish_holdings >> post_items >> finish_items >> finished_all_posts

archive_instances_holdings_items = PythonOperator(
task_id="archive_converted_files",
python_callable=archive_artifacts
task_id="archive_converted_files", python_callable=archive_artifacts
)

ingest_srs_records = TriggerDagRunOperator(
task_id="ingest-srs-records",
trigger_dag_id="add_marc_to_srs",
conf={
"srs_filename": "folio_srs_instances_{{ dag_run.run_id }}_bibs-transformer.json"
},
)

remediate_errors = TriggerDagRunOperator(
task_id="remediate-errors",
trigger_dag_id="fix_failed_record_loads",
trigger_run_id="{{ dag_run.run_id }}"
trigger_run_id="{{ dag_run.run_id }}",
)

finish_loading = DummyOperator(
@@ -345,4 +337,5 @@ def marc_only(*args, **kwargs):
monitor_file_mount >> move_transform_process >> marc_to_folio
marc_to_folio >> post_to_folio
post_to_folio >> archive_instances_holdings_items >> finish_loading
finish_loading >> ingest_srs_records
finish_loading >> remediate_errors
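
Taken together, the `bib_records.py` changes swap the hard-coded batch size of 25 for the `MAX_ENTITIES` Airflow Variable and hand SRS ingestion off to the new `add_marc_to_srs` DAG through a `TriggerDagRunOperator`, passing the SRS filename in the templated `conf`. A minimal sketch of that pattern, with a hypothetical DAG id and a placeholder callable standing in for `post_folio_instance_records`:

```python
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Batch size now comes from an Airflow Variable, falling back to 500.
max_entities = Variable.get("MAX_ENTITIES", 500)

with DAG(
    "example_bib_load",  # hypothetical DAG id, for illustration only
    start_date=datetime(2022, 6, 23),
    schedule_interval=None,
    catchup=False,
) as dag:
    post_instances = PythonOperator(
        task_id="post_to_folio_instances_0",
        # stand-in callable; the real DAG uses post_folio_instance_records
        python_callable=lambda **kwargs: None,
        op_kwargs={"job": 0, "MAX_ENTITIES": max_entities},
    )

    ingest_srs_records = TriggerDagRunOperator(
        task_id="ingest-srs-records",
        trigger_dag_id="add_marc_to_srs",
        # conf is Jinja-templated; the downstream DAG reads it from params
        conf={
            "srs_filename": "folio_srs_instances_{{ dag_run.run_id }}_bibs-transformer.json"
        },
    )

    post_instances >> ingest_srs_records
```
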
74 changes: 74 additions & 0 deletions dags/srs_ingestion.py
@@ -0,0 +1,74 @@
import logging

from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.models import Variable

from folio_migration_tools.library_configuration import LibraryConfiguration

from plugins.folio.marc import post_marc_to_srs, remove_srs_json

logger = logging.getLogger(__name__)

sul_config = LibraryConfiguration(
okapi_url=Variable.get("OKAPI_URL"),
tenant_id="sul",
okapi_username=Variable.get("FOLIO_USER"),
okapi_password=Variable.get("FOLIO_PASSWORD"),
library_name="Stanford University Libraries",
base_folder="/opt/airflow/migration",
log_level_debug=True,
folio_release="lotus",
iteration_identifier="",
)


@dag(
schedule_interval=None,
start_date=datetime(2022, 6, 23),
catchup=False,
tags=["folio", "bib_import"],
)
def add_marc_to_srs():
"""
## Adds MARC JSON to Source Record Storage
After a successful symphony_marc_import DAG run, takes the
folio_srs_instances_{dag-run}_bibs-transformer.json file and batch
POSTs it to the Okapi endpoint
"""

@task
def ingestion_marc():
"""
### Ingests
"""
context = get_current_context()
srs_filename = context.get("params").get("srs_filename")
logger.info(f"Starting ingestion of {srs_filename}")

post_marc_to_srs(
dag_run=context.get("dag_run"),
library_config=sul_config,
srs_file=srs_filename,
MAX_ENTITIES=Variable.get("MAX_SRS_ENTITIES", 500),
)

@task
def cleanup():
context = get_current_context()
srs_filename = context.get("params").get("srs_filename")
logger.info(f"Removing SRS JSON {srs_filename}")
remove_srs_json(srs_filename=srs_filename)

@task
def finish():
context = get_current_context()
srs_filename = context.get("params").get("srs_filename")
logger.info(f"Finished migration {srs_filename}")

ingestion_marc() >> cleanup() >> finish()


ingest_marc_to_srs = add_marc_to_srs()
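
The new DAG pulls `srs_filename` out of `params`, which Airflow populates from the triggering run's `conf`. A possible smoke test (not part of this commit) that the DAG parses and chains its three tasks; it assumes the Airflow Variables read at import time (`OKAPI_URL`, `FOLIO_USER`, `FOLIO_PASSWORD`) exist in the test environment and that tests run from the repository root:

```python
from airflow.models import DagBag


def test_add_marc_to_srs_structure():
    # Parse only the project's dags/ folder (assumes repo-root working dir).
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert dag_bag.import_errors == {}

    dag = dag_bag.get_dag("add_marc_to_srs")
    assert dag is not None

    # TaskFlow task ids default to the decorated function names.
    ingest = dag.get_task("ingestion_marc")
    cleanup = dag.get_task("cleanup")

    # ingestion_marc() >> cleanup() >> finish()
    assert "cleanup" in ingest.downstream_task_ids
    assert "finish" in cleanup.downstream_task_ids
```
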
28 changes: 18 additions & 10 deletions plugins/folio/marc.py
@@ -1,32 +1,40 @@
import logging
import pathlib

from folio_migration_tools.migration_tasks.batch_poster import BatchPoster

logger = logging.getLogger(__name__)


def post_marc_to_srs(*args, **kwargs):
dag = kwargs["dag_run"]
dag = kwargs.get("dag_run")
srs_filepath = kwargs.get("srs_file")

task_config = BatchPoster.TaskConfiguration(
name="marc-to-srs-batch-poster",
migration_task_type="BatchPoster",
object_type="SRS",
files=[{
"file_name": f"folio_srs_instances_{dag.run_id}_bibs-transformer.json" # noqa
}],
files=[{"file_name": srs_filepath}],
batch_size=kwargs.get("MAX_ENTITIES", 1000),
)

library_config = kwargs["library_config"]
library_config.iteration_identifier = dag.run_id

srs_batch_poster = BatchPoster(
task_config,
library_config,
use_logging=False
)
srs_batch_poster = BatchPoster(task_config, library_config, use_logging=False)

srs_batch_poster.do_work()

srs_batch_poster.wrap_up()

logging.info("Finished posting MARC json to SRS")
logger.info("Finished posting MARC json to SRS")


def remove_srs_json(*args, **kwargs):
airflow = kwargs.get("airflow", "/opt/airflow")
srs_filename = kwargs["srs_filename"]

srs_filepath = pathlib.Path(airflow) / f"migration/results/{srs_filename}"

srs_filepath.unlink()
logger.info(f"Removed {srs_filepath}")
112 changes: 112 additions & 0 deletions plugins/tests/mocks.py
@@ -0,0 +1,112 @@
import logging

import pydantic
import pytest
import requests

from airflow.models import Variable
from pytest_mock import MockerFixture

logger = logging.getLogger(__name__)


@pytest.fixture
def mock_okapi_success(monkeypatch, mocker: MockerFixture):
def mock_get(*args, **kwargs):
get_response = mocker.stub(name="get_result")
get_response.status_code = 200
logger.info(f"Args:\n{args}")
logger.info(f"Kwargs:\n{kwargs}")
return get_response

def mock_post(*args, **kwargs):
post_response = mocker.stub(name="post_result")
post_response.status_code = 201
post_response.headers = {"x-okapi-token": "jwtOkapi0"}
post_response.raise_for_status = lambda: None
post_response_elapsed = mocker.stub(name="post_elapsed")
post_response_elapsed.total_seconds = lambda: 30
post_response.elapsed = post_response_elapsed
return post_response

def mock_put(*args, **kwargs):
put_response = mocker.stub(name="put_result")
put_response.status_code = 200
put_response.headers = {"x-okapi-token": "jwtOkapi0"}
put_response.raise_for_status = lambda: None
return put_response

monkeypatch.setattr(requests, "get", mock_get)
monkeypatch.setattr(requests, "post", mock_post)
monkeypatch.setattr(requests, "put", mock_put)


@pytest.fixture
def mock_dag_run(mocker: MockerFixture):
dag_run = mocker.stub(name="dag_run")
dag_run.run_id = "manual_2022-03-05"
return dag_run


@pytest.fixture
def mock_okapi_variable(monkeypatch):
def mock_get(key):
return "https://okapi-folio.dev.edu"

monkeypatch.setattr(Variable, "get", mock_get)


@pytest.fixture
def mock_file_system(tmp_path):
airflow_path = tmp_path / "opt/airflow/"

# Mock source and target dirs
source_dir = airflow_path / "symphony"
source_dir.mkdir(parents=True)

sample_marc = source_dir / "sample.mrc"
sample_marc.write_text("sample")

target_dir = airflow_path / "migration/data/instances/"
target_dir.mkdir(parents=True)
# Makes directories for different types of data
(airflow_path / "migration/data/holdings").mkdir(parents=True)
(airflow_path / "migration/data/items").mkdir(parents=True)
(airflow_path / "migration/data/users").mkdir(parents=True)

# Mock Results, Reports, Mapping Files, and Archive Directories
results_dir = airflow_path / "migration/results"
results_dir.mkdir(parents=True)
(airflow_path / "migration/reports").mkdir(parents=True)
(airflow_path / "migration/mapping_files").mkdir(parents=True)
archive_dir = airflow_path / "migration/archive"
archive_dir.mkdir(parents=True)

# Mock .gitignore
gitignore = airflow_path / "migration/.gitignore"
gitignore.write_text("results\nreports")

# mock tmp dir
tmp = tmp_path / "tmp/"
tmp.mkdir(parents=True)

return [airflow_path, source_dir, target_dir, results_dir, archive_dir, tmp]


class MockFOLIOClient(pydantic.BaseModel):
okapi_url: str = "https://okapi.edu"


class MockTaskInstance(pydantic.BaseModel):
xcom_pull = lambda *args, **kwargs: "a0token" # noqa


class MockLibraryConfig(pydantic.BaseModel):
add_time_stamp_to_file_names: bool = False
iteration_identifier: str = "a-library-config"
log_level_debug: bool = False
base_folder: str = "/opt/airflow/migration"
okapi_password: str = "43nmSUL"
okapi_url: str = "https://okapi.edu"
okapi_username: str = "admin"
tenant_id: str = "a_lib"
2 changes: 1 addition & 1 deletion plugins/tests/test_delete_archived_data.py
@@ -5,7 +5,7 @@
from dags.delete_archived_data import delete_archived_data
from plugins.folio.helpers import archive_artifacts

from plugins.tests.test_helpers import mock_dag_run, mock_file_system # noqa
from plugins.tests.mocks import mock_dag_run, mock_file_system # noqa


def test_delete_archived_data(mock_dag_run, mock_file_system): # noqa