Merge 1b5a05b into 6035c6b

jgreben committed Apr 18, 2024
2 parents 6035c6b + 1b5a05b commit 3ccaa31
Showing 21 changed files with 651 additions and 536 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
log*
!log_config.py
.env
__pycache__/
.coveralls.yml
2 changes: 1 addition & 1 deletion Dockerfile
@@ -10,7 +10,7 @@ ENV SLUGIFY_USES_TEXT_UNIDECODE "yes"
USER airflow

COPY airflow.cfg requirements.txt pyproject.toml qa.sql poetry.lock ./

COPY log_config.py ./config/log_config.py
COPY libsys_airflow ./libsys_airflow
COPY bin ./bin

2 changes: 2 additions & 0 deletions Gemfile
@@ -6,4 +6,6 @@ group :deployment do
gem 'capistrano'
gem 'capistrano-bundler'
gem 'dlss-capistrano'
gem 'ed25519'
gem 'bcrypt_pbkdf'
end
39 changes: 25 additions & 14 deletions Gemfile.lock
@@ -1,12 +1,14 @@
GEM
remote: https://rubygems.org/
specs:
airbrussh (1.4.0)
airbrussh (1.5.1)
sshkit (>= 1.6.1, != 1.7.0)
bundler-audit (0.9.0.1)
base64 (0.2.0)
bcrypt_pbkdf (1.1.0)
bundler-audit (0.9.1)
bundler (>= 1.2.0, < 3)
thor (~> 1.0)
capistrano (3.17.0)
capistrano (3.18.1)
airbrussh (>= 1.0.0)
i18n
rake (>= 10.0.0)
@@ -15,36 +17,45 @@ GEM
bundler-audit (~> 0.5)
capistrano (~> 3.0)
capistrano-bundler (>= 1.4)
capistrano-bundler (2.0.1)
capistrano-bundler (2.1.0)
capistrano (~> 3.1)
capistrano-one_time_key (0.1.0)
capistrano (~> 3.0)
capistrano-shared_configs (0.2.2)
concurrent-ruby (1.1.10)
dlss-capistrano (4.0.0)
concurrent-ruby (1.2.3)
dlss-capistrano (4.4.0)
capistrano (~> 3.0)
capistrano-bundle_audit (>= 0.3.0)
capistrano-one_time_key
capistrano-shared_configs
i18n (1.10.0)
ed25519 (1.3.0)
i18n (1.14.4)
concurrent-ruby (~> 1.0)
net-scp (3.0.0)
net-ssh (>= 2.6.5, < 7.0.0)
net-ssh (6.1.0)
rake (13.0.6)
sshkit (1.21.2)
mutex_m (0.2.0)
net-scp (4.0.0)
net-ssh (>= 2.6.5, < 8.0.0)
net-sftp (4.0.0)
net-ssh (>= 5.0.0, < 8.0.0)
net-ssh (7.2.3)
rake (13.2.0)
sshkit (1.22.1)
base64
mutex_m
net-scp (>= 1.1.2)
net-sftp (>= 2.1.2)
net-ssh (>= 2.8.0)
thor (1.2.1)
thor (1.3.1)

PLATFORMS
ruby
x86_64-darwin-20
x86_64-darwin-22

DEPENDENCIES
bcrypt_pbkdf
capistrano
capistrano-bundler
dlss-capistrano
ed25519

BUNDLED WITH
2.4.13
2 changes: 1 addition & 1 deletion README.md
@@ -60,7 +60,7 @@ Based on the documentation, [Running Airflow in Docker](https://airflow.apache.o
6. Run `docker compose build` to build the customized Airflow image. (Note: the `usermod` command may take a while to complete when running the build.)
7. Run `docker compose up airflow-init` to initialize the Airflow database and create a user the first time you deploy Airflow.
8. Bring up Airflow, `docker compose up` to run the containers in the foreground. Use `docker compose up -d` to run as a daemon.
9. Access Airflow locally at http://localhost:3000. The default username and password are both `airflow`.
9. Access Airflow locally at http://localhost:8080. The default username and password are both `airflow`.
10. Log into the worker container using `docker exec -it libsys-airflow-airflow-worker-1 /bin/bash` to view the raw work files.

### For FOLIO migration loads
3 changes: 3 additions & 0 deletions airflow.cfg
@@ -4,3 +4,6 @@ plugins_folder = /opt/airflow/libsys_airflow/plugins

[webserver]
warn_deployment_exposure = False

[logging]
logging_config_class = log_config.LOGGING_CONFIG
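
The contents of the referenced config/log_config.py are not shown in this commit. As a minimal sketch, assuming the module follows Airflow's documented pattern of deep-copying DEFAULT_LOGGING_CONFIG and overriding selected handlers (the console-level override here is purely illustrative):

from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

# Start from Airflow's default logging configuration and customize it.
LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

# Illustrative override (hypothetical): pin the console handler's level.
LOGGING_CONFIG["handlers"]["console"]["level"] = "INFO"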
2 changes: 1 addition & 1 deletion config/deploy.rb
@@ -17,7 +17,7 @@

# Default value for linked_dirs is []
# set :linked_dirs, %w[]
set :linked_dirs, %w[config vendor-data vendor-keys data-export-files]
set :linked_dirs, %w[.aws config vendor-data vendor-keys data-export-files]

# Default value for keep_releases is 5
set :keep_releases, 2
9 changes: 6 additions & 3 deletions docker-compose.prod.yaml
@@ -68,6 +68,7 @@ x-airflow-common:
# Use simple http server on scheduler for health checks
# See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
# yamllint enable rule:line-length
AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS: config.log_config.LOGGING_CONFIG
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
AIRFLOW_VAR_AEON_URL: ${AIRFLOW_VAR_AEON_URL}
AIRFLOW_VAR_AEON_KEY: ${AIRFLOW_VAR_AEON_KEY}
@@ -100,19 +101,21 @@ x-airflow-common:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
DATABASE_USERNAME: ${DATABASE_USERNAME}
DATABASE_MAX_OVERFLOW: ${DATABASE_MAX_OVERFLOW:-20}
# HB Python library uses HONEYBADGER_ENVIRONMENT, which is different from Ruby library.
# HB library uses HONEYBADGER_ENVIRONMENT, which is different from Ruby library.
HONEYBADGER_ENVIRONMENT: ${HONEYBADGER_ENV}
HONEYBADGER_API_KEY: ${HONEYBADGER_API_KEY}
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/libsys_airflow:/opt/airflow/libsys_airflow
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/symphony:/opt/airflow/symphony
- ${AIRFLOW_PROJ_DIR:-.}/migration:/opt/airflow/migration
- ${AIRFLOW_PROJ_DIR:-.}/circ:/opt/airflow/circ
- ${AIRFLOW_PROJ_DIR:-.}/vendor-data:/opt/airflow/vendor-data
- ${AIRFLOW_PROJ_DIR:-.}/vendor-keys:/opt/airflow/vendor-keys
- ${AIRFLOW_PROJ_DIR:-.}/data-export-files:/opt/airflow/data-export-files
- ${AIRFLOW_PROJ_DIR:-.}/orafin-files:/opt/airflow/orafin-files
- ${AIRFLOW_PROJ_DIR:-.}/.aws:/home/airflow/.aws
user: "${AIRFLOW_UID:-50000}:0"
extra_hosts:
- host.docker.internal:host-gateway
@@ -250,8 +253,8 @@ services:
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/libsys_airflow/dags /sources/libsys_airflow/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,libsys_airflow/{dags,plugins}}
mkdir -p /sources/logs /sources/config /sources/libsys_airflow/dags /sources/libsys_airflow/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,config,libsys_airflow/{dags,plugins}}
exec /entrypoint airflow version
# yamllint enable rule:line-length
environment:
6 changes: 4 additions & 2 deletions docker-compose.yaml
@@ -53,6 +53,7 @@ x-airflow-common:
build: .
environment:
&airflow-common-env
AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS: config.log_config.LOGGING_CONFIG
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
@@ -100,6 +101,7 @@ x-airflow-common:
- ${AIRFLOW_PROJ_DIR:-.}/vendor-keys:/opt/airflow/vendor-keys
- ${AIRFLOW_PROJ_DIR:-.}/data-export-files:/opt/airflow/data-export-files
- ${AIRFLOW_PROJ_DIR:-.}/orafin-files:/opt/airflow/orafin-files
- ${AIRFLOW_PROJ_DIR:-.}/.aws:/opt/airflow/.aws
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
@@ -258,8 +260,8 @@ services:
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
mkdir -p /sources/logs /sources/dags /sources/plugins /sources/config
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins,config}
exec /entrypoint airflow version
# yamllint enable rule:line-length
environment:
141 changes: 141 additions & 0 deletions libsys_airflow/dags/data_exports/full_dump_retrieval.py
@@ -0,0 +1,141 @@
import logging
import math

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import get_current_context
from airflow.decorators import task, task_group

from libsys_airflow.plugins.data_exports.full_dump_marc import (
fetch_number_of_records,
fetch_full_dump_marc,
refresh_view,
)
from libsys_airflow.plugins.data_exports.marc.transformer import Transformer
from libsys_airflow.plugins.data_exports.marc.transforms import remove_marc_fields
from sqlalchemy import exc

logger = logging.getLogger(__name__)


default_args = {
"owner": "libsys",
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=1),
}


with DAG(
"select_all_records",
default_args=default_args,
start_date=datetime(2024, 1, 1),
schedule=None,
catchup=False,
tags=["data export"],
) as dag:

start = EmptyOperator(task_id='start')

@task
def refresh_materialized_view():
refresh_view()

@task
def number_of_records():
return fetch_number_of_records()

@task
def do_batch_size() -> int:
context = get_current_context()
params = context.get("params", {}) # type: ignore
batch = params.get("batch_size", 50000)

return int(batch)

@task
def do_concurrency() -> list[int]:
context = get_current_context()
params = context.get("params", {}) # type: ignore
concurrency = params.get("concurrent_jobs", 10)

return [i for i in range(int(concurrency))]

@task
def calculate_div(**kwargs):
total = kwargs["number_of_records"]
batch_size = kwargs["number_in_batch"]
concurrent_jobs = kwargs["concurrent_jobs"]
shard = batch_size * 10

return math.ceil((total / len(concurrent_jobs)) / shard) * shard

@task(multiple_outputs=True)
def calculate_start_stop(div, job):
output = {"start": int(div * job), "stop": int((job + 1) * div)}
logger.info(f"Output in calculate_start_stop {output}")
return output

@task
def fetch_folio_records(batch_size, start, stop):
marc_file_list = []
for offset in range(start, stop, batch_size):
logger.info(f"fetch_folio_records: from {offset}")
try:
marc = fetch_full_dump_marc(offset=offset, batch_size=batch_size)
marc_file_list.append(marc)
except exc.OperationalError as err:
logger.warning(f"{err} for offset {offset}")
continue

return marc_file_list

@task_group(group_id="transform_marc")
def marc_transformations(marc_files: list):
@task
def transform_marc_records_add_holdings(marc_files: list):
transformer = Transformer()
for marc_file in marc_files:
transformer.add_holdings_items(marc_file=marc_file, full_dump=True)

@task
def transform_marc_records_remove_fields(marc_files: list):
for marc_file in marc_files:
remove_marc_fields(marc_file, full_dump=True)

transform_marc_records_add_holdings(
marc_files
) >> transform_marc_records_remove_fields(marc_files)

number_of_jobs = do_concurrency()

batch_size = do_batch_size()

total_records = number_of_records()

record_div = calculate_div(
number_of_records=total_records,
concurrent_jobs=number_of_jobs,
number_in_batch=batch_size,
)

update_view = refresh_materialized_view()

start_stop = calculate_start_stop.partial(div=record_div).expand(job=number_of_jobs)

marc_file_list = fetch_folio_records.partial(batch_size=batch_size).expand_kwargs(
start_stop
)

finish_transforms = marc_transformations.expand(marc_files=marc_file_list)

finish_processing_marc = EmptyOperator(
task_id="finish_marc",
)

start >> update_view >> total_records
finish_transforms >> finish_processing_marc
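
As a rough illustration of how the mapped tasks above partition the record range, here is the arithmetic from calculate_div and calculate_start_stop worked through with the DAG defaults (batch_size=50000, ten concurrent jobs) and a hypothetical total of 5,000,000 records:

import math

# Defaults from the DAG above; the total is a made-up example value.
total = 5_000_000
batch_size = 50_000
concurrent_jobs = list(range(10))

shard = batch_size * 10                                          # 500,000
div = math.ceil((total / len(concurrent_jobs)) / shard) * shard  # 500,000

# Each mapped fetch_folio_records job then walks [start, stop) in batch_size steps.
for job in concurrent_jobs:
    start, stop = int(div * job), int((job + 1) * div)
    print(job, start, stop)  # job 0 -> 0..500000, job 1 -> 500000..1000000, ...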
