
[AIRFLOW-1611] Customize logging
Change the logging configuration to use the standard Python logging framework and make it easily configurable. Some settings have been removed since they can now easily be implemented in the logging config file.

Closes apache#2631 from Fokko/AIRFLOW-1611-customize-logging-in-airflow
Fokko Driesprong authored and Rares Mirica committed Dec 5, 2017
1 parent 0fbdffa commit e7d0d74
Showing 8 changed files with 25 additions and 185 deletions.
44 changes: 14 additions & 30 deletions UPDATING.md
@@ -6,29 +6,18 @@ assists people when migrating to a new version.
## Airflow 1.9

### SSH Hook updates, along with new SSH Operator & SFTP Operator

SSH Hook now uses the Paramiko library to create an ssh client connection, instead of the sub-process based ssh command execution used previously (<1.9.0), so this is backward incompatible.
- update the SSHHook constructor
- use the SSHOperator class in place of SSHExecuteOperator, which has been removed. Refer to test_ssh_operator.py for usage info (see also the sketch after this list).
- SFTPOperator has been added to perform secure file transfer from server A to server B. Refer to test_sftp_operator.py for usage info.
- No updates are required if you are using ftpHook; it will continue to work as is.
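
Not part of this diff, but as a hedged illustration of the new operators — the `contrib` import paths, the `ssh_conn_id`, and the `operation='put'` parameter are assumptions based on the 1.9 contrib operators, so treat the test files above as the authoritative reference — a DAG might wire them up roughly like this:

```
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.sftp_operator import SFTPOperator
from airflow.contrib.operators.ssh_operator import SSHOperator

dag = DAG('ssh_sftp_example', start_date=datetime(2017, 9, 1), schedule_interval=None)

# Run a command on the host behind the (hypothetical) 'ssh_default' connection.
run_remote = SSHOperator(
    task_id='run_remote',
    ssh_conn_id='ssh_default',
    command='uptime',
    dag=dag,
)

# Copy a local file to the remote host over SFTP.
upload = SFTPOperator(
    task_id='upload_report',
    ssh_conn_id='ssh_default',
    local_filepath='/tmp/report.csv',
    remote_filepath='/tmp/report.csv',
    operation='put',
    dag=dag,
)

run_remote >> upload
```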

### S3Hook switched to use Boto3

The airflow.hooks.S3_hook.S3Hook has been switched to use boto3 instead of the older boto (a.k.a. boto2). This results in a few backwards-incompatible changes to the S3Hook:
- the constructor no longer accepts `s3_conn_id`; it is now called `aws_conn_id`
- the default connection is now "aws_default" instead of "s3_default"
- the return type of objects returned by `get_bucket` is now boto3.s3.Bucket
- the return type of `get_key` and `get_wildcard_key` is now a boto3.S3.Object

If you are using any of these in your DAGs and specify a connection ID, you will need to update the parameter name for the connection to "aws_conn_id": S3ToHiveTransfer, S3PrefixSensor, S3KeySensor, RedshiftToS3Transfer. A brief usage sketch follows below.
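
As a rough sketch only — the `bucket_name` parameter and the boto3 `Object` attributes shown here are assumptions for illustration, not something spelled out in this commit — the renamed connection argument would be used like this:

```
from airflow.hooks.S3_hook import S3Hook

# The constructor now takes aws_conn_id; "aws_default" is the new default connection.
hook = S3Hook(aws_conn_id='aws_default')

# get_key now returns a boto3.S3.Object instead of a boto2 Key.
obj = hook.get_key('data/part-0000.csv', bucket_name='my-bucket')
print(obj.key, obj.content_length)
```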

### Logging update

The logging structure of Airflow has been rewritten to make configuration easier and the logging system more transparent.

#### A quick recap about logging

A logger is the entry point into the logging system. Each logger is a named bucket to which messages can be written for processing. A logger is configured to have a log level. This log level describes the severity of the messages that the logger will handle. Python defines the following log levels: DEBUG, INFO, WARNING, ERROR or CRITICAL.

Each message that is written to the logger is a Log Record. Each log record also has a log level indicating the severity of that specific message. A log record can also contain useful metadata that describes the event that is being logged. This can include details such as a stack trace or an error code.
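
For illustration only (this snippet is not part of the change itself), the standard library maps directly onto these concepts:

```
import logging

logging.basicConfig(level=logging.INFO)

# A named logger ("bucket") whose effective log level is INFO.
logger = logging.getLogger("example")

logger.debug("dropped: below the logger's level")  # log record at DEBUG
logger.info("processed 42 rows")                   # log record at INFO
try:
    1 / 0
except ZeroDivisionError:
    # exc_info attaches the stack trace as metadata on the log record
    logger.error("division failed", exc_info=True)
```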
@@ -50,9 +39,7 @@ The main benefit is easier configuration of the logging by setting a single cent
logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
```

The logging configuration file needs to be on the `PYTHONPATH`, for example `$AIRFLOW_HOME/config`. This directory is loaded by default. Of course you are free to add any directory to the `PYTHONPATH`, this might be handy when you have the config in another directory or you mount a volume in case of Docker.

You can take the config from `airflow/config_templates/airflow_local_settings.py` as a starting point. Copy the contents to `${AIRFLOW_HOME}/config/airflow_local_settings.py`, and alter the config as you like.
The logging configuration file needs to be on the `PYTHONPATH`, for example in `~/airflow/dags` or `~/airflow/plugins`. These directories are loaded by default; of course, you are free to add a directory to the `PYTHONPATH`, which might be handy when you keep the config in another directory or mount a volume in the case of Docker. As an example, you can start from `airflow.config_templates.airflow_local_settings.LOGGING_CONFIG`:

```
LOGGING_CONFIG = {
@@ -119,19 +106,19 @@ Furthermore, this change also simplifies logging within the DAG itself:

```
root@ae1bc863e815:/airflow# python
Python 3.6.2 (default, Sep 13 2017, 14:26:54)
[GCC 4.9.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from airflow.settings import *
>>>
>>> from datetime import datetime
>>> from airflow import DAG
>>> from airflow.operators.dummy_operator import DummyOperator
>>>
>>> dag = DAG('simple_dag', start_date=datetime(2017, 9, 1))
>>>
>>> task = DummyOperator(task_id='task_1', dag=dag)
>>>
>>> task.log.error('I want to say something..')
[2017-09-25 20:17:04,927] {<stdin>:1} ERROR - I want to say something..
```
@@ -142,13 +129,13 @@ The `file_task_handler` logger is more flexible. You can change the default format,
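
For example — a minimal sketch, assuming you point `logging_config_class` at a module like the one below — the task log format can be changed by copying the shipped defaults and editing the `airflow.task` formatter:

```
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

# Only the format used by the file.task handler / airflow.task logger changes.
LOGGING_CONFIG['formatters']['airflow.task']['format'] = (
    '[%(asctime)s] %(levelname)s %(name)s - %(message)s'
)
```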

#### I'm using S3Log or GCSLogs, what do I do!?

If you are logging to Google cloud storage, please see the [Google cloud platform documentation](https://airflow.incubator.apache.org/integration.html#gcp-google-cloud-platform) for logging instructions.

If you are using S3, the instructions should be largely the same as the Google cloud platform instructions above: you will need a custom logging config. The `REMOTE_BASE_LOG_FOLDER` configuration key in your airflow config has been removed, therefore you will need to take the following steps:
- Copy the logging configuration from [`airflow/config_templates/airflow_local_settings.py`](https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py).
- Place it in a directory on the Python import path (`PYTHONPATH`). If you are using Python 2.7, ensure that any `__init__.py` files exist so that it is importable.
- Update the config by setting the path of `REMOTE_BASE_LOG_FOLDER` explicitly in the logging config, since the `REMOTE_BASE_LOG_FOLDER` key in the airflow config is not used anymore.
- Set `logging_config_class` to the module path and dict name. For example, if you place `custom_logging_config.py` on the base of your `PYTHONPATH`, you will need to set `logging_config_class = custom_logging_config.LOGGING_CONFIG` in your config, as in Airflow 1.8 (see the sketch below).

If you have no custom config, you don't need to change anything: the airflow config loader will still default to the same config.
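
To make the steps above concrete, here is a hedged sketch of what such a `custom_logging_config.py` could look like. The `S3TaskHandler` class path, its arguments, and the bucket are assumptions for illustration — adapt them to the handler you actually use:

```
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

# Set explicitly now that the airflow.cfg key has been removed (hypothetical bucket).
REMOTE_BASE_LOG_FOLDER = 's3://my-bucket/airflow/logs'

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

# Swap the local file.task handler for an S3-backed one (assumed class path).
LOGGING_CONFIG['handlers']['s3.task'] = {
    'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
    'formatter': 'airflow.task',
    'base_log_folder': DEFAULT_LOGGING_CONFIG['handlers']['file.task']['base_log_folder'],
    's3_log_folder': REMOTE_BASE_LOG_FOLDER,
    'filename_template': DEFAULT_LOGGING_CONFIG['handlers']['file.task']['filename_template'],
}
LOGGING_CONFIG['loggers']['airflow.task']['handlers'] = ['s3.task']
```

With the module on the `PYTHONPATH`, setting `logging_config_class = custom_logging_config.LOGGING_CONFIG` in your airflow config activates it.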

### New Features

@@ -159,7 +146,6 @@ A new DaskExecutor allows Airflow tasks to be run in Dask Distributed clusters.
### Deprecated Features
These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer
supported and will be removed entirely in Airflow 2.0
- If you're using the `google_cloud_conn_id` or `dataproc_cluster` argument names explicitly in `contrib.operators.Dataproc{*}Operator`(s), be sure to rename them to `gcp_conn_id` or `cluster_name`, respectively. We've renamed these arguments for consistency. (AIRFLOW-1323)

- `post_execute()` hooks now take two arguments, `context` and `result`
(AIRFLOW-886)
@@ -171,7 +157,7 @@ supported and will be removed entirely in Airflow 2.0
- The pickle type for XCom messages has been replaced by json to prevent RCE attacks.
Note that JSON serialization is stricter than pickling, so if you want to e.g. pass
raw bytes through XCom you must encode them using an encoding like base64.
  By default pickling is still enabled until Airflow 2.0. To disable it, set `enable_xcom_pickling = False` in your Airflow config (see the sketch below).
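
As a small illustrative sketch (the task id and key are hypothetical, and the callables would typically be wired to `PythonOperator` with `provide_context=True`), raw bytes can be passed through JSON-serialized XCom by base64-encoding them:

```
import base64

def producer(**context):
    payload = b'\x00\x01binary-blob'
    # JSON cannot carry raw bytes, so push an ASCII-safe string instead.
    context['ti'].xcom_push(key='blob', value=base64.b64encode(payload).decode('ascii'))

def consumer(**context):
    encoded = context['ti'].xcom_pull(task_ids='producer_task', key='blob')
    payload = base64.b64decode(encoded)
    print(len(payload))
```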

## Airflow 1.8.1
@@ -283,8 +269,6 @@ supported and will be removed entirely in Airflow 2.0
Previously, `Operator.__init__()` accepted any arguments (either positional `*args` or keyword `**kwargs`) without
complaint. Now, invalid arguments will be rejected. (https://github.com/apache/incubator-airflow/pull/1285)

- The config value secure_mode will default to True, which will disable some insecure endpoints/features

### Known Issues
There is a report that the default of "-1" for num_runs creates an issue where errors are reported while parsing tasks.
It was not confirmed, but a workaround was found by changing the default back to `None`.
30 changes: 5 additions & 25 deletions airflow/config_templates/airflow_local_settings.py
@@ -25,10 +25,8 @@
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

DEFAULT_LOGGING_CONFIG = {
'version': 1,
@@ -37,9 +35,6 @@
'airflow.task': {
'format': LOG_FORMAT,
},
'airflow.processor': {
'format': LOG_FORMAT,
},
},
'handlers': {
'console': {
@@ -52,12 +47,6 @@
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'filename_template': FILENAME_TEMPLATE,
},
'file.processor': {
'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
'formatter': 'airflow.processor',
'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
'filename_template': PROCESSOR_FILENAME_TEMPLATE,
}
# When using s3 or gcs, provide a customized LOGGING_CONFIG
# in airflow_local_settings within your PYTHONPATH, see UPDATING.md
@@ -78,20 +67,6 @@
# },
},
'loggers': {
'': {
'handlers': ['console'],
'level': LOG_LEVEL
},
'airflow': {
'handlers': ['console'],
'level': LOG_LEVEL,
'propagate': False,
},
'airflow.processor': {
'handlers': ['file.processor'],
'level': LOG_LEVEL,
'propagate': True,
},
'airflow.task': {
'handlers': ['file.task'],
'level': LOG_LEVEL,
@@ -102,5 +77,10 @@
'level': LOG_LEVEL,
'propagate': True,
},
'airflow': {
'handlers': ['console'],
'level': LOG_LEVEL,
'propagate': False,
},
}
}
5 changes: 2 additions & 3 deletions airflow/jobs.py
@@ -40,6 +40,7 @@
from airflow import configuration as conf
from airflow import executors, models, settings
from airflow.exceptions import AirflowException
from airflow.logging_config import configure_logging
from airflow.models import DAG, DagRun
from airflow.settings import Stats
from airflow.task_runner import get_task_runner
@@ -372,9 +373,7 @@ def helper():
sys.stderr = f

try:
# Re-configure logging to use the new output streams
log_format = settings.LOG_FORMAT_WITH_THREAD_NAME
settings.configure_logging(log_format=log_format)
configure_logging()
# Re-configure the ORM engine as there are issues with multiple processes
settings.configure_orm()

13 changes: 0 additions & 13 deletions airflow/logging_config.py
@@ -13,7 +13,6 @@
# limitations under the License.
#
import logging
import os
import sys
from logging.config import dictConfig

@@ -24,21 +23,9 @@
log = logging.getLogger(__name__)


def prepare_classpath():
config_path = os.path.join(conf.get('core', 'airflow_home'), 'config')
config_path = os.path.expanduser(config_path)

if config_path not in sys.path:
sys.path.append(config_path)


def configure_logging():
logging_class_path = ''
try:
# Prepare the classpath so we are sure that the config folder
# is on the python classpath and it is reachable
prepare_classpath()

logging_class_path = conf.get('core', 'logging_config_class')
except AirflowConfigException:
log.debug('Could not find key logging_config_class in config')
56 changes: 1 addition & 55 deletions airflow/settings.py
@@ -17,34 +17,18 @@
from __future__ import print_function
from __future__ import unicode_literals

import atexit
import logging
import os
import pendulum

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

from airflow import configuration as conf
from airflow.logging_config import configure_logging
from airflow.utils.sqlalchemy import setup_event_handlers

log = logging.getLogger(__name__)


TIMEZONE = pendulum.timezone('UTC')
try:
tz = conf.get("core", "default_timezone")
if tz == "system":
TIMEZONE = pendulum.local_timezone()
else:
TIMEZONE = pendulum.timezone(tz)
except:
pass
log.info("Configured default timezone %s" % TIMEZONE)


class DummyStatsLogger(object):
@classmethod
def incr(cls, stat, count=1, rate=1):
@@ -138,13 +122,10 @@ def configure_vars():


def configure_orm(disable_connection_pool=False):
log.debug("Setting up DB connection pool (PID %s)" % os.getpid())
global engine
global Session
engine_args = {}

pool_connections = conf.getboolean('core', 'SQL_ALCHEMY_POOL_ENABLED')
if disable_connection_pool or not pool_connections:
if disable_connection_pool:
engine_args['poolclass'] = NullPool
elif 'sqlite' not in SQL_ALCHEMY_CONN:
# Engine args not supported by sqlite
@@ -153,41 +134,10 @@ def configure_orm(disable_connection_pool=False):
'SQL_ALCHEMY_POOL_RECYCLE')

engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
reconnect_timeout = conf.getint('core', 'SQL_ALCHEMY_RECONNECT_TIMEOUT')
setup_event_handlers(engine, reconnect_timeout)

Session = scoped_session(
sessionmaker(autocommit=False, autoflush=False, bind=engine))


def dispose_orm():
""" Properly close pooled database connections """
log.debug("Disposing DB connection pool (PID %s)", os.getpid())
global engine
global Session

if Session:
Session.remove()
Session = None
if engine:
engine.dispose()
engine = None


def configure_adapters():
from pendulum import Pendulum
try:
from sqlite3 import register_adapter
register_adapter(Pendulum, lambda val: val.isoformat(' '))
except ImportError:
pass
try:
import MySQLdb.converters
MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
except ImportError:
pass


try:
from airflow_local_settings import *

@@ -197,12 +147,8 @@ def configure_adapters():

configure_logging()
configure_vars()
configure_adapters()
configure_orm()

# Ensure we close DB connections at scheduler and gunicon worker terminations
atexit.register(dispose_orm)

# Const stuff

KILOBYTE = 1024
48 changes: 0 additions & 48 deletions tests/core.py
@@ -2546,54 +2546,6 @@ def test_send_mime_dryrun(self, mock_smtp, mock_smtp_ssl):
self.assertFalse(mock_smtp.called)
self.assertFalse(mock_smtp_ssl.called)

class LogTest(unittest.TestCase):
def setUp(self):
configuration.load_test_config()

def _log(self):
settings.configure_logging()

sio = six.StringIO()
handler = logging.StreamHandler(sio)
logger = logging.getLogger()
logger.addHandler(handler)

logging.debug("debug")
logging.info("info")
logging.warn("warn")

sio.flush()
return sio.getvalue()

def test_default_log_level(self):
s = self._log()
self.assertFalse("debug" in s)
self.assertTrue("info" in s)
self.assertTrue("warn" in s)

def test_change_log_level_to_debug(self):
configuration.set("core", "LOGGING_LEVEL", "DEBUG")
s = self._log()
self.assertTrue("debug" in s)
self.assertTrue("info" in s)
self.assertTrue("warn" in s)

def test_change_log_level_to_info(self):
configuration.set("core", "LOGGING_LEVEL", "INFO")
s = self._log()
self.assertFalse("debug" in s)
self.assertTrue("info" in s)
self.assertTrue("warn" in s)

def test_change_log_level_to_warn(self):
configuration.set("core", "LOGGING_LEVEL", "WARNING")
s = self._log()
self.assertFalse("debug" in s)
self.assertFalse("info" in s)
self.assertTrue("warn" in s)

def tearDown(self):
configuration.set("core", "LOGGING_LEVEL", "INFO")

if __name__ == '__main__':
unittest.main()
