Skip to content

Commit

Permalink
Fix linting issues
Browse files Browse the repository at this point in the history
  • Loading branch information
jgreben committed Sep 13, 2022
1 parent f1410d5 commit a76fc57
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 60 deletions.
84 changes: 43 additions & 41 deletions dags/preceding_succeding_titles.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,48 +11,55 @@
from plugins.folio.login import folio_login
from plugins.folio.helpers import post_to_okapi, put_to_okapi


logger = logging.getLogger(__name__)


"""
## Posts the json created by folio migration tools create_preceding_succeeding_titles method.
After a successful completion of the convert_instances_valid_json in
the symphony_marc_import DAG run, takes the json created by folio migration tools
and POSTS to the Okapi /preceeding-succeding-titles endpoint
"""


def post_to_folio(*args, **kwargs):
pattern = "preceding_succeding_titles*.json"
results = kwargs["results_dir"]
task_instance = kwargs["task_instance"]
jwt = task_instance.xcom_pull(task_ids="folio_login")

for file in Path(results).glob(pattern):
logger.info(f"opening {file}")
with open(file) as fo:
for obj in fo.readlines():
obj = json.loads(obj)

logger.info(f"Posting {obj}")
result = post_to_okapi(
token=jwt,
records=obj,
endpoint="/preceding-succeeding-titles",
payload_key=None,
**kwargs,
)

if "errors" in result:
logger.warn(f"{result['errors'][0]['message']} -- trying a PUT instead")
put_to_okapi(
token=jwt,
records=obj,
endpoint=f"/preceding-succeeding-titles/{obj['id']}",
payload_key=None,
)
pattern = "preceding_succeding_titles*.json"
results = kwargs["results_dir"]
task_instance = kwargs["task_instance"]
jwt = task_instance.xcom_pull(task_ids="folio_login")

for file in Path(results).glob(pattern):
logger.info(f"opening {file}")
with open(file) as fo:
for obj in fo.readlines():
obj = json.loads(obj)

logger.info(f"Posting {obj}")
result = post_to_okapi(
token=jwt,
records=obj,
endpoint="/preceding-succeeding-titles",
payload_key=None,
**kwargs,
)

if "errors" in result:
logger.warn(
f"{result['errors'][0]['message']} -- trying a PUT instead"
)
put_to_okapi(
token=jwt,
records=obj,
endpoint=f"/preceding-succeeding-titles/{obj['id']}",
payload_key=None,
)


def finish():
context = get_current_context()
_filename = context.get("params").get("filename_preceding_succeeding_titles")
logger.info(f"Finished migration {_filename}")
context = get_current_context()
_filename = context.get("params").get("filename_preceding_succeeding_titles")
logger.info(f"Finished migration {_filename}")


with DAG(
Expand All @@ -64,20 +71,15 @@ def finish():
max_active_runs=1,
) as dag:

login = PythonOperator(
task_id="folio_login", python_callable=folio_login
) # noqa
login = PythonOperator(task_id="folio_login", python_callable=folio_login) # noqa

preceding_succeeding_titles = PythonOperator(
task_id="load_preceding_succeeding_titles", python_callable=post_to_folio,
op_kwargs={
"results_dir": "/opt/airflow/migration/results" # noqa
},
task_id="load_preceding_succeeding_titles",
python_callable=post_to_folio,
op_kwargs={"results_dir": "/opt/airflow/migration/results"}, # noqa
)

wrap_up = PythonOperator(
task_id="log_finished", python_callable=finish
)
wrap_up = PythonOperator(task_id="log_finished", python_callable=finish)


login >> preceding_succeeding_titles >> wrap_up
6 changes: 2 additions & 4 deletions plugins/folio/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,7 @@ def put_to_okapi(**kwargs):
json=payload,
)

logger.info(
f"PUT Result status code {update_record_result.status_code}" # noqa
)
logger.info(f"PUT Result status code {update_record_result.status_code}") # noqa


def post_to_okapi(**kwargs) -> bool:
Expand Down Expand Up @@ -422,7 +420,7 @@ def _processes_tsv(tsv_base: str, tsv_notes: list, airflow, column_transforms):

# Add note columns to tsv_base_df if notes do not exist
if len(tsv_notes) < 1:
for note in ['CIRCNOTE', 'CIRCNOTE', 'TECHSTAFF', 'PUBLIC']:
for note in ["CIRCNOTE", "CIRCNOTE", "TECHSTAFF", "PUBLIC"]:
tsv_base_df[note] = np.NaN

tsv_notes_name_parts = tsv_base.name.split(".")
Expand Down
5 changes: 1 addition & 4 deletions plugins/tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
from airflow.models import Connection


from plugins.folio.db import (
add_inventory_triggers,
drop_inventory_triggers
)
from plugins.folio.db import add_inventory_triggers, drop_inventory_triggers


class MockCursor(pydantic.BaseModel):
Expand Down
8 changes: 5 additions & 3 deletions plugins/tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,11 @@ def mock_post(*args, **kwargs):
} ]
}""" # noqa
post_response.json = lambda: {
"errors" : [ {
"message" : "value already exists in table holdings_record: hld100000000027"
} ]
"errors": [
{
"message": "value already exists in table holdings_record: hld100000000027"
}
]
}

return post_response
Expand Down
21 changes: 13 additions & 8 deletions plugins/tests/test_preceding_succeeding_titles.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@

from pytest_mock import MockerFixture
from dags.preceding_succeding_titles import post_to_folio
from plugins.folio import helpers
from plugins.tests.mocks import ( # noqa
MockTaskInstance,
mock_file_system,
mock_dag_run,
mock_okapi_variable
mock_okapi_variable,
)


@pytest.fixture
def mock_okapi_post_error(monkeypatch, mocker: MockerFixture):
def mock_post(*args, **kwargs):
post_response = mocker.stub(name="post_result")
post_response.status_code = 400
post_response.json = lambda: { "errors" : [ {"message": "somme error"} ]}
post_response.json = lambda: {"errors": [{"message": "somme error"}]}
post_response.text = "error"
return post_response

Expand All @@ -33,19 +35,22 @@ def mock_put(*args, **kwargs):


def test_preceding_succeeding_titles(
mock_dag_run, mock_file_system, mock_okapi_post_error, mock_okapi_variable, caplog
):
mock_file_system, mock_okapi_post_error, mock_okapi_variable, caplog # noqa
):
results_dir = mock_file_system[3]

from plugins.folio import helpers
helpers._save_error_record_ids = MagicMock(airflow=results_dir)

# Create mock JSON file
titles_filename = f"preceding_succeding_titles_x.json"
titles_filename = "preceding_succeding_titles_x.json"
titles_file = results_dir / titles_filename
titles_file.write_text("""{ "id": "11111111-1111-1111-1111-111111111111", "title": "Preceding Title", "identifiers": [], "succeedingInstanceId": "22222222-2222-2222-2222-222222222222" }""")
titles_file.write_text(
"""{ "id": "11111111-1111-1111-1111-111111111111", "title": "Preceding Title", "identifiers": [], "succeedingInstanceId": "22222222-2222-2222-2222-222222222222" }"""
)

mock_task_instance = MockTaskInstance
post_to_folio(dag_run=mock_dag_run, task_instance=mock_task_instance, results_dir=results_dir)
post_to_folio(
dag_run=mock_dag_run, task_instance=mock_task_instance, results_dir=results_dir
)

assert "trying a PUT instead" in caplog.text

0 comments on commit a76fc57

Please sign in to comment.