-
Notifications
You must be signed in to change notification settings - Fork 394
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Log clean up. #32
Log clean up. #32
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,22 +12,35 @@ | |
from airflow.operators.bash_operator import BashOperator | ||
from datetime import datetime, timedelta | ||
import os | ||
import logging | ||
|
||
try: | ||
from airflow.utils import timezone #airflow.utils.timezone is available from v1.10 onwards | ||
now = timezone.utcnow | ||
except ImportError: | ||
now = datetime.utcnow | ||
|
||
|
||
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "") # airflow-log-cleanup | ||
START_DATE = now() - timedelta(minutes=1) | ||
BASE_LOG_FOLDER = conf.get("core", "BASE_LOG_FOLDER") | ||
SCHEDULE_INTERVAL = "@daily" # How often to Run. @daily - Once a day at Midnight | ||
DAG_OWNER_NAME = "operations" # Who is listed as the owner of this DAG in the Airflow Web Server | ||
ALERT_EMAIL_ADDRESSES = [] # List of email address to send email alerts to if this job fails | ||
DEFAULT_MAX_LOG_AGE_IN_DAYS = Variable.get("max_log_age_in_days", 30) # Length to retain the log files if not already provided in the conf. If this is set to 30, the job will remove those files that are 30 days old or odler | ||
DEFAULT_MAX_LOG_AGE_IN_DAYS = Variable.get("max_log_age_in_days", 30) # Length to retain the log files if not already provided in the conf. If this is set to 30, the job will remove those files that are 30 days old or odler | ||
ENABLE_DELETE = True # Whether the job should delete the logs or not. Included if you want to temporarily avoid deleting the logs | ||
NUMBER_OF_WORKERS = 1 # The number of worker nodes you have in Airflow. Will attempt to run this process for however many workers there are so that each worker gets its logs cleared. | ||
DIRECTORIES_TO_DELETE = [BASE_LOG_FOLDER] | ||
ENABLE_DELETE_CHILD_LOG = Variable.get("enable_delete_child_log", "False") | ||
logging.info("ENABLE_DELETE_CHILD_LOG " + ENABLE_DELETE_CHILD_LOG) | ||
|
||
if ENABLE_DELETE_CHILD_LOG == "True": | ||
try: | ||
CHILD_PROCESS_LOG_DIRECTORY=conf.get("scheduler", "CHILD_PROCESS_LOG_DIRECTORY") | ||
if CHILD_PROCESS_LOG_DIRECTORY is not ' ': | ||
DIRECTORIES_TO_DELETE.append( CHILD_PROCESS_LOG_DIRECTORY ) | ||
except Exception: | ||
logging.info("CHILD process directory path not available!!") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use logging.error() and print exception stacktrace |
||
|
||
|
||
default_args = { | ||
'owner': DAG_OWNER_NAME, | ||
|
@@ -44,7 +57,9 @@ | |
|
||
log_cleanup = """ | ||
echo "Getting Configurations..." | ||
BASE_LOG_FOLDER='""" + BASE_LOG_FOLDER + """' | ||
BASE_LOG_FOLDER=""{{params.directory}}"" | ||
TYPES=""{{params.type}}"" | ||
DELETE_TYPE=""{{params.deleteType}}"" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is never set based off the task definitions |
||
MAX_LOG_AGE_IN_DAYS="{{dag_run.conf.maxLogAgeInDays}}" | ||
if [ "${MAX_LOG_AGE_IN_DAYS}" == "" ]; then | ||
echo "maxLogAgeInDays conf variable isn't included. Using Default '""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """'." | ||
|
@@ -58,17 +73,22 @@ | |
echo "BASE_LOG_FOLDER: '${BASE_LOG_FOLDER}'" | ||
echo "MAX_LOG_AGE_IN_DAYS: '${MAX_LOG_AGE_IN_DAYS}'" | ||
echo "ENABLE_DELETE: '${ENABLE_DELETE}'" | ||
echo "" | ||
echo "TYPE: '${TYPE}'" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add an extra space so ${TYPE} lines up with the print statements above it |
||
|
||
echo "" | ||
echo "Running Cleanup Process..." | ||
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type f -mtime +${MAX_LOG_AGE_IN_DAYS}" | ||
if [ $TYPE == file ]; | ||
then | ||
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type f -mtime +${MAX_LOG_AGE_IN_DAYS}" | ||
else | ||
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type d -empty -mtime +${MAX_LOG_AGE_IN_DAYS}" | ||
fi | ||
echo "Executing Find Statement: ${FIND_STATEMENT}" | ||
FILES_MARKED_FOR_DELETE=`eval ${FIND_STATEMENT}` | ||
echo "Process will be Deleting the following directories:" | ||
echo "Process will be Deleting the following File/directory:" | ||
echo "${FILES_MARKED_FOR_DELETE}" | ||
echo "Process will be Deleting `echo "${FILES_MARKED_FOR_DELETE}" | grep -v '^$' | wc -l ` file(s)" # "grep -v '^$'" - removes empty lines. "wc -l" - Counts the number of lines | ||
echo "Process will be Deleting `echo "${FILES_MARKED_FOR_DELETE}" | grep -v '^$' | wc -l ` ${DELETE_TYPE}(s)" # "grep -v '^$'" - removes empty lines. "wc -l" - Counts the number of lines | ||
echo "" | ||
|
||
if [ "${ENABLE_DELETE}" == "true" ]; | ||
then | ||
DELETE_STMT="${FIND_STATEMENT} -delete" | ||
|
@@ -80,15 +100,27 @@ | |
exit ${DELETE_STMT_EXIT_CODE} | ||
fi | ||
else | ||
echo "WARN: You're opted to skip deleting the files!!!" | ||
echo "WARN: You're opted to skip deleting the file(s)/directory(s)!!!" | ||
fi | ||
echo "Finished Running Cleanup Process" | ||
""" | ||
|
||
i=0 | ||
for log_cleanup_id in range(1, NUMBER_OF_WORKERS + 1): | ||
|
||
log_cleanup_op = BashOperator( | ||
task_id='log_cleanup_' + str(log_cleanup_id), | ||
bash_command=log_cleanup, | ||
provide_context=True, | ||
dag=dag) | ||
for directory in DIRECTORIES_TO_DELETE: | ||
log_cleanup_file_op = BashOperator( | ||
task_id='log_cleanup_file' + str(i), | ||
bash_command=log_cleanup, | ||
provide_context=True, | ||
params={"directory": str(directory), "type": "file"}, | ||
dag=dag) | ||
|
||
log_cleanup_dir_op = BashOperator( | ||
task_id='log_cleanup_dir' + str( i ), | ||
bash_command=log_cleanup, | ||
provide_context=True, | ||
params={"directory": str(directory),"type":"directory"}, | ||
dag=dag ) | ||
i = i + 1 | ||
|
||
log_cleanup_file_op.set_downstream(log_cleanup_dir_op) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See if you can ignore case because people might enter in all lower case