Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log clean up. #32

Merged
merged 6 commits into from
Feb 14, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 45 additions & 18 deletions log-cleanup/airflow-log-cleanup.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,42 @@
"""
A maintenance workflow that you can deploy into Airflow to periodically clean out the task logs to avoid those getting too big.

airflow trigger_dag --conf '{"maxLogAgeInDays":30}' airflow-log-cleanup

--conf options:
maxLogAgeInDays:<INT> - Optional

"""
from airflow.models import DAG, Variable
from airflow.configuration import conf
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import os
import logging

try:
    # Prefer Airflow's timezone-aware clock; airflow.utils.timezone is
    # available from v1.10 onwards.
    from airflow.utils import timezone
    now = timezone.utcnow
except ImportError:
    # Older Airflow (< 1.10): fall back to naive UTC timestamps.
    now = datetime.utcnow


DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "") # airflow-log-cleanup
START_DATE = now() - timedelta(minutes=1)
BASE_LOG_FOLDER = conf.get("core", "BASE_LOG_FOLDER")
SCHEDULE_INTERVAL = "@daily" # How often to Run. @daily - Once a day at Midnight
DAG_OWNER_NAME = "operations" # Who is listed as the owner of this DAG in the Airflow Web Server
ALERT_EMAIL_ADDRESSES = [] # List of email address to send email alerts to if this job fails
DEFAULT_MAX_LOG_AGE_IN_DAYS = Variable.get("max_log_age_in_days", 30) # Length to retain the log files if not already provided in the conf. If this is set to 30, the job will remove those files that are 30 days old or odler
DEFAULT_MAX_LOG_AGE_IN_DAYS = Variable.get("max_log_age_in_days", 30) # Length to retain the log files if not already provided in the conf. If this is set to 30, the job will remove those files that are 30 days old or odler
ENABLE_DELETE = True # Whether the job should delete the logs or not. Included if you want to temporarily avoid deleting the logs
NUMBER_OF_WORKERS = 1 # The number of worker nodes you have in Airflow. Will attempt to run this process for however many workers there are so that each worker gets its logs cleared.
DIRECTORIES_TO_DELETE = [BASE_LOG_FOLDER]
ENABLE_DELETE_CHILD_LOG = Variable.get("enable_delete_child_log", "False")
logging.info("ENABLE_DELETE_CHILD_LOG " + ENABLE_DELETE_CHILD_LOG)

if ENABLE_DELETE_CHILD_LOG.lower() == "true":
    try:
        CHILD_PROCESS_LOG_DIRECTORY = conf.get(
            "scheduler", "CHILD_PROCESS_LOG_DIRECTORY")
        # Bug fix: the original tested `is not ' '` — an *identity* comparison
        # against a one-space literal, which is effectively always True.
        # Only append the directory when the configured value is non-blank.
        if CHILD_PROCESS_LOG_DIRECTORY.strip() != "":
            DIRECTORIES_TO_DELETE.append(CHILD_PROCESS_LOG_DIRECTORY)
    except Exception:
        # Best-effort: log (with traceback) and continue with BASE_LOG_FOLDER
        # only. ("Cloud not" typo fixed.)
        logging.exception(
            "Could not obtain CHILD_PROCESS_LOG_DIRECTORY "
            "from Airflow Configurations")

default_args = {
'owner': DAG_OWNER_NAME,
Expand All @@ -44,7 +53,8 @@

# NOTE(review): this bash template is rendered from a diff view — lines from
# the pre-merge version appear duplicated next to their replacements (e.g. the
# two BASE_LOG_FOLDER assignments below), and two collapsed-hunk markers
# ("Expand All @@ ... @@") are embedded in the string, hiding part of its true
# content. Recover the merged file from the repository before running this;
# the text is preserved byte-for-byte here.
# The script deletes files (type=file) or empty directories (type=directory)
# older than MAX_LOG_AGE_IN_DAYS under params.directory, honoring
# dag_run.conf.maxLogAgeInDays when supplied.
log_cleanup = """
echo "Getting Configurations..."
BASE_LOG_FOLDER='""" + BASE_LOG_FOLDER + """'
BASE_LOG_FOLDER="{{params.directory}}"
TYPE="{{params.type}}"
MAX_LOG_AGE_IN_DAYS="{{dag_run.conf.maxLogAgeInDays}}"
if [ "${MAX_LOG_AGE_IN_DAYS}" == "" ]; then
echo "maxLogAgeInDays conf variable isn't included. Using Default '""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """'."
Expand All @@ -58,17 +68,22 @@
echo "BASE_LOG_FOLDER: '${BASE_LOG_FOLDER}'"
echo "MAX_LOG_AGE_IN_DAYS: '${MAX_LOG_AGE_IN_DAYS}'"
echo "ENABLE_DELETE: '${ENABLE_DELETE}'"
echo ""
echo "TYPE: '${TYPE}'"

echo ""
echo "Running Cleanup Process..."
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type f -mtime +${MAX_LOG_AGE_IN_DAYS}"
if [ $TYPE == file ];
then
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type f -mtime +${MAX_LOG_AGE_IN_DAYS}"
else
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type d -empty -mtime +${MAX_LOG_AGE_IN_DAYS}"
fi
echo "Executing Find Statement: ${FIND_STATEMENT}"
FILES_MARKED_FOR_DELETE=`eval ${FIND_STATEMENT}`
echo "Process will be Deleting the following directories:"
echo "Process will be Deleting the following File/directory:"
echo "${FILES_MARKED_FOR_DELETE}"
echo "Process will be Deleting `echo "${FILES_MARKED_FOR_DELETE}" | grep -v '^$' | wc -l ` file(s)" # "grep -v '^$'" - removes empty lines. "wc -l" - Counts the number of lines
echo "Process will be Deleting `echo "${FILES_MARKED_FOR_DELETE}" | grep -v '^$' | wc -l ` file/directory(s)" # "grep -v '^$'" - removes empty lines. "wc -l" - Counts the number of lines
echo ""

if [ "${ENABLE_DELETE}" == "true" ];
then
DELETE_STMT="${FIND_STATEMENT} -delete"
Expand All @@ -80,15 +95,27 @@
exit ${DELETE_STMT_EXIT_CODE}
fi
else
echo "WARN: You're opted to skip deleting the files!!!"
echo "WARN: You're opted to skip deleting the file(s)/directory(s)!!!"
fi
echo "Finished Running Cleanup Process"
"""

# Create one (file-cleanup -> directory-cleanup) task pair per worker and per
# log directory. The diff view had the pre-merge single-operator version
# duplicated alongside this loop; only the merged version is kept, with the
# scrape-stripped indentation restored.
i = 0
for log_cleanup_id in range(1, NUMBER_OF_WORKERS + 1):
    for directory in DIRECTORIES_TO_DELETE:
        # Removes log *files* older than the retention window.
        log_cleanup_file_op = BashOperator(
            task_id='log_cleanup_file_' + str(i),
            bash_command=log_cleanup,
            provide_context=True,
            params={"directory": str(directory), "type": "file"},
            dag=dag)

        # Prunes directories left empty once the files are gone.
        log_cleanup_dir_op = BashOperator(
            task_id='log_cleanup_directory_' + str(i),
            bash_command=log_cleanup,
            provide_context=True,
            params={"directory": str(directory), "type": "directory"},
            dag=dag)
        i = i + 1

        # Delete files first, then the now-empty directories.
        log_cleanup_file_op.set_downstream(log_cleanup_dir_op)