Skip to content

Commit

Permalink
Merge pull request #976 from sul-dlss/t966-vma-ftp-download-fix
Browse files Browse the repository at this point in the history
Uses Vendor and Vendor Interface to query Database
  • Loading branch information
jermnelson committed May 16, 2024
2 parents b0f8067 + 5eff04e commit 376910f
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 93 deletions.
10 changes: 7 additions & 3 deletions libsys_airflow/dags/vendor/data_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,17 @@
def setup():
    """Prepare dag_run params: resolve the vendor download directory,
    ensure it exists, and record the runtime environment.

    Returns the params dict with ``download_path`` set to the directory
    (as a string) and ``environment`` taken from HONEYBADGER_ENVIRONMENT.
    """
    context = get_current_context()
    params = context["params"]
    dag_run_download_path = download_path(
        params["vendor_uuid"], params["vendor_interface_uuid"]
    )
    params["environment"] = os.getenv('HONEYBADGER_ENVIRONMENT', 'development')

    # parents=True: the per-vendor/per-interface subdirectories may not exist
    # yet (the os.makedirs() this replaced created intermediates); a bare
    # mkdir(exist_ok=True) would raise FileNotFoundError on a first run.
    dag_run_download_path.mkdir(parents=True, exist_ok=True)

    # XCOM cannot serialize pathlib Path object
    params["download_path"] = str(dag_run_download_path)

    logger.info(f"Params are {params}")

    return params

Expand All @@ -80,6 +83,7 @@ def setup():
params["remote_path"],
params["download_path"],
params["filename_regex"],
params["vendor_uuid"],
params["vendor_interface_uuid"],
)

Expand Down
4 changes: 3 additions & 1 deletion libsys_airflow/dags/vendor/default_data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ def setup():
params["archive_regex"] = processing_options.get("archive_regex")

logger.info(f"Params are {params}")
assert os.path.exists(os.path.join(params["download_path"], params["filename"]))
assert (params["download_path"] / params["filename"]).exists()
# XCOM cannot serialize pathlib Paths
params["download_path"] = str(params["download_path"])

return params

Expand Down
30 changes: 16 additions & 14 deletions libsys_airflow/plugins/vendor/archive.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import logging
import os
import shutil

from datetime import date
from pathlib import Path

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
Expand All @@ -18,19 +19,23 @@
def archive_task(
    downloaded_files: list[str],
    download_path: str,
    vendor_uuid: str,
    vendor_interface_uuid: str,
):
    """Airflow task wrapper: archive the downloaded files for one vendor
    interface, recording the archive in the vendor_loads database.

    Args:
        downloaded_files: filenames (relative to download_path) to archive.
        download_path: directory the files were fetched into; arrives as a
            str because XCOM cannot serialize pathlib Paths.
        vendor_uuid: the vendor's FOLIO organization UUID.
        vendor_interface_uuid: UUID of the vendor interface.
    """
    pg_hook = PostgresHook("vendor_loads")
    with Session(pg_hook.get_sqlalchemy_engine()) as session:
        archive(
            downloaded_files,
            Path(download_path),  # archive() works on Path objects
            vendor_uuid,
            vendor_interface_uuid,
            session,
        )


def archive(
downloaded_files: list[str],
download_path: str,
download_path: Path,
vendor_uuid: str,
vendor_interface_uuid: str,
session: Session,
Expand All @@ -39,7 +44,9 @@ def archive(
logger.info("No files to archive")
return

vendor_interface = VendorInterface.load(vendor_interface_uuid, session)
vendor_interface = VendorInterface.load_with_vendor(
vendor_uuid, vendor_interface_uuid, session
)
for filename in downloaded_files:
vendor_file = VendorFile.load_with_vendor_interface(
vendor_interface, filename, session
Expand All @@ -48,24 +55,19 @@ def archive(


def archive_file(
    download_path: Path,
    vendor_file: VendorFile,
    session: Session,
):
    """Copy one downloaded file into the dated archive tree and stamp the
    VendorFile row with today's archive date.

    Args:
        download_path: directory the file was downloaded into.
        vendor_file: ORM row for the file; its archive_date is set and
            committed here.
        session: open SQLAlchemy session used for the commit.
    """
    # Capture the date once so the archive directory name and the recorded
    # archive_date cannot disagree if this runs across midnight.
    today = date.today()
    download_filepath = download_path / vendor_file.vendor_filename
    archive_path = get_archive_path(
        vendor_file.vendor_interface.vendor.folio_organization_uuid,
        vendor_file.vendor_interface.interface_uuid,
        today,
    )
    archive_filepath = archive_path / vendor_file.vendor_filename
    archive_path.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(download_filepath, archive_filepath)
    vendor_file.archive_date = today
    session.commit()
    logger.info(f"Archived {vendor_file.vendor_filename} to {archive_filepath}")


def _filepath(path: str, filename: str) -> str:
return os.path.join(path, filename)
13 changes: 12 additions & 1 deletion libsys_airflow/plugins/vendor/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def ftp_download_task(
remote_path: str,
download_path: str,
filename_regex: str,
vendor_uuid: str,
vendor_interface_uuid: str,
) -> list[str]:
logger.info(
Expand All @@ -111,6 +112,7 @@ def ftp_download_task(
remote_path or "",
download_path,
filter_strategy,
vendor_uuid,
vendor_interface_uuid,
mod_date_after,
)
Expand All @@ -136,6 +138,7 @@ def download(
remote_path: str,
download_path: str,
filter_strategy: Callable,
vendor_uuid: str,
vendor_interface_uuid: str,
mod_date_after: Optional[datetime],
) -> list[str]:
Expand All @@ -158,6 +161,7 @@ def download(
filtered_filenames,
adapter,
mod_date_after,
vendor_uuid,
vendor_interface_uuid,
engine,
)
Expand All @@ -179,6 +183,7 @@ def download(
filename,
adapter.get_size(filename),
"fetching_error",
vendor_uuid,
vendor_interface_uuid,
mod_time,
engine,
Expand All @@ -190,6 +195,7 @@ def download(
filename,
adapter.get_size(filename),
"fetched",
vendor_uuid,
vendor_interface_uuid,
mod_time,
engine,
Expand All @@ -211,12 +217,15 @@ def _record_vendor_file(
filename: str,
filesize: int | str | None,
status: str,
vendor_uuid: str,
vendor_interface_uuid: str,
vendor_timestamp: datetime,
engine: Engine,
):
with Session(engine) as session:
vendor_interface = VendorInterface.load(vendor_interface_uuid, session)
vendor_interface = VendorInterface.load_with_vendor(
vendor_uuid, vendor_interface_uuid, session
)
existing_vendor_file = VendorFile.load_with_vendor_interface(
vendor_interface, filename, session
)
Expand Down Expand Up @@ -317,6 +326,7 @@ def _filter_mod_date(
filenames: list[str],
adapter: Union[FTPAdapter, SFTPAdapter],
mod_date_after: Optional[datetime],
vendor_uuid: str,
vendor_interface_uuid: str,
engine: Engine,
) -> list[str]:
Expand All @@ -333,6 +343,7 @@ def _filter_mod_date(
filename,
adapter.get_size(filename),
"skipped",
vendor_uuid,
vendor_interface_uuid,
mod_time,
engine,
Expand Down
23 changes: 22 additions & 1 deletion libsys_airflow/plugins/vendor/models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from datetime import datetime
import enum
import re
Expand All @@ -18,6 +19,9 @@
from sqlalchemy.sql.expression import true
from typing import List, Any

logger = logging.getLogger(__name__)

UPLOAD_FILE_REGEX = re.compile(r'^upload_only-(\d+)$')

Model = declarative_base()

Expand Down Expand Up @@ -161,7 +165,7 @@ def upload_only(self):

@classmethod
def load(cls, interface_uuid: str, session: Session) -> 'VendorInterface':
match = re.match(r'^upload_only-(\d+)$', interface_uuid)
match = UPLOAD_FILE_REGEX.match(interface_uuid)
if match:
id = int(match.group(1))
return session.get(cls, id)
Expand All @@ -170,6 +174,23 @@ def load(cls, interface_uuid: str, session: Session) -> 'VendorInterface':
select(cls).where(cls.folio_interface_uuid == interface_uuid)
).first()

@classmethod
def load_with_vendor(
    cls, vendor_uuid: str, interface_uuid: str, session: Session
) -> 'VendorInterface':
    """Load a VendorInterface scoped to a specific vendor.

    An interface_uuid of the form ``upload_only-<pk>`` is resolved directly
    by primary key; otherwise the Vendor is looked up by its
    folio_organization_uuid and the interface is matched on both
    folio_interface_uuid and vendor_id.
    """
    match = UPLOAD_FILE_REGEX.match(interface_uuid)
    if match:
        # renamed from `id` to avoid shadowing the builtin
        interface_id = int(match.group(1))
        return session.get(cls, interface_id)
    vendor = session.scalars(
        select(Vendor).where(Vendor.folio_organization_uuid == vendor_uuid)
    ).first()
    # NOTE(review): if no Vendor matches vendor_uuid this raises
    # AttributeError on vendor.id — confirm callers always pass a known
    # vendor UUID.
    return session.scalars(
        select(cls)
        .where(cls.folio_interface_uuid == interface_uuid)
        .where(cls.vendor_id == vendor.id)
    ).first()


class FileStatus(enum.Enum):
not_fetched = "not_fetched"
Expand Down
28 changes: 14 additions & 14 deletions libsys_airflow/plugins/vendor/paths.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
import os
from datetime import date
from pathlib import Path


def vendor_data_basepath() -> Path:
    """Return the root directory for all vendor-managed data."""
    return Path("/opt/airflow/vendor-data")


def downloads_basepath() -> Path:
    """Return the directory that holds all vendor download folders."""
    return vendor_data_basepath() / "downloads"


def download_path(vendor_uuid: str, vendor_interface_uuid: str) -> Path:
    """Return the download directory for one vendor interface.

    Args:
        vendor_uuid: the vendor's UUID (used as a directory name).
        vendor_interface_uuid: the interface's UUID (nested below it).
    """
    return downloads_basepath() / vendor_uuid / vendor_interface_uuid


def archive_basepath() -> Path:
    """Return the root directory of the vendor-data archive tree."""
    return vendor_data_basepath() / "archive"


def archive_path(
    vendor_uuid: str, vendor_interface_uuid: str, archive_date: date
) -> Path:
    """Return the dated archive directory for one vendor interface.

    The layout is ``<archive root>/<YYYYMMDD>/<vendor>/<interface>``.

    Args:
        vendor_uuid: the vendor's UUID (used as a directory name).
        vendor_interface_uuid: the interface's UUID (nested below it).
        archive_date: date used for the YYYYMMDD directory component.
    """
    return (
        archive_basepath()
        / archive_date.strftime("%Y%m%d")
        / vendor_uuid
        / vendor_interface_uuid
    )
10 changes: 4 additions & 6 deletions libsys_airflow/plugins/vendor/purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def _extract_uuids(directory: str):
return output


def find_directories(archive_directory: str) -> list[str]:
def find_directories(archive_directory: pathlib.Path) -> list[str]:
"""
Iterates through archives to determine what vendor management
directories to delete based on age
Expand All @@ -79,24 +79,22 @@ def find_directories(archive_directory: str) -> list[str]:
prior_datestamp = (datetime.utcnow() - timedelta(days=PRIOR_DAYS)).strftime(
"%Y%m%d"
)
archive_directory_path = pathlib.Path(archive_directory)
for directory in sorted(archive_directory_path.iterdir()):
for directory in sorted(archive_directory.iterdir()):
if directory.stem <= prior_datestamp:
target_dirs.append(str(directory))
if len(target_dirs) < 1:
logger.info("No directories available for purging")
return target_dirs


def find_files(downloads_directory: str):
def find_files(downloads_directory: pathlib.Path):
"""
Iterates through downloads directory determing what files to
delete based on the file's age
"""
prior_timestamp = (datetime.utcnow() - timedelta(days=PRIOR_DAYS)).timestamp()
downloads_path = pathlib.Path(downloads_directory)
files = []
for file_path in downloads_path.glob("**/*"):
for file_path in downloads_directory.glob("**/*"):
if file_path.is_file() and file_path.stat().st_mtime <= prior_timestamp:
logger.info(f"Found {file_path}")
files.append(str(file_path.absolute()))
Expand Down

0 comments on commit 376910f

Please sign in to comment.