Skip to content

Commit

Permalink
Merge pull request #1182 from Tarrasch/big-parameters-refactoring
Browse files Browse the repository at this point in the history
Major command line parsing overhaul (internal)
  • Loading branch information
Tarrasch committed Sep 28, 2015
2 parents c88dc56 + bba1ef2 commit 154c283
Show file tree
Hide file tree
Showing 12 changed files with 379 additions and 380 deletions.
46 changes: 39 additions & 7 deletions doc/configuration.rst
@@ -1,7 +1,7 @@
Configuration
=============

All configuration can be done by adding a configuration files. They are looked for in:
All configuration can be done by adding configuration files. They are looked for in:

* ``/etc/luigi/client.cfg``
* ``luigi.cfg`` (or its legacy name ``client.cfg``) in your current working directory
Expand All @@ -23,25 +23,57 @@ The config file is broken into sections, each controlling a different part of th
default-scheduler-host: luigi-host.mycompany.foo
error-email: foo@bar.baz

By default, all parameters will be overridden by matching values in the
configuration file. For instance if you have a Task definition:

.. _ParamConfigIngestion:

Parameters from config Ingestion
--------------------------------

All parameters can be overridden from configuration files. For instance if you
have a Task definition:

.. code:: python
class DailyReport(luigi.contrib.hadoop.JobTask):
date = luigi.DateParameter(default=datetime.date.today())
# ...
Then you can override the default value for date by providing it in the
configuration:
Then you can override the default value for ``DailyReport().date`` by providing
it in the configuration:

::

[DailyReport]
date: 2012-01-01

You can also use ``config_path`` as an argument to the ``Parameter`` if
you want to use a specific section in the config.
.. _ConfigClasses:

Configuration classes
*********************

Using the :ref:`ParamConfigIngestion` method, we derive the
conventional way to do global configuration. Imagine this configuration:

::

[mysection]
option: hello
intoption: 123


We can create a :py:class:`~luigi.Config` class:

.. code:: python
import luigi
# Config classes should be camel cased
class mysection(luigi.Config):
option = luigi.Parameter(default='world')
intoption = luigi.IntParameter(default=555)
mysection().option
mysection().intoption
Configurable options
Expand Down
137 changes: 137 additions & 0 deletions luigi/cmdline_parser.py
@@ -0,0 +1,137 @@
# -*- coding: utf-8 -*-
#
# Copyright 2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains luigi internal parsing logic. Things exposed here should
be considered internal to luigi.
"""

import argparse
import functools
from contextlib import contextmanager
from luigi.task_register import Register
import cached_property


class CmdlineParser(object):
    """
    Helper for parsing command line arguments and used as part of the
    context when instantiating task objects.

    Normal luigi users should just use :py:func:`luigi.run`.
    """
    # The parser installed by ``global_instance``; None when no context is active.
    _instance = None

    @classmethod
    def get_instance(cls):
        """ Singleton getter """
        return cls._instance

    @classmethod
    @contextmanager
    def global_instance(cls, cmdline_args, allow_override=False):
        """
        Meant to be used as a context manager.

        Builds a parser for ``cmdline_args``, installs it as the instance
        returned by :py:meth:`get_instance` for the duration of the ``with``
        body, and restores the previously installed instance on exit.

        :param cmdline_args: list of command line arguments to parse.
        :param allow_override: when False (the default), it is an error to
                               enter this context while another instance is
                               already installed.
        :raises AssertionError: if an instance is already installed and
                                ``allow_override`` is False.
        :raises SystemExit: propagated from the constructor (``--help`` was
                            passed, or no task was specified).
        """
        orig_value = cls._instance
        assert (orig_value is None) or allow_override
        # Construct *before* entering the try block: if the constructor raises
        # nothing has been installed yet, so there is nothing to restore and
        # the sanity check below must not run (it would mask the real error
        # whenever a previous instance is still installed).
        new_value = CmdlineParser(cmdline_args)
        cls._instance = new_value
        try:
            yield new_value
        finally:
            # Contexts must be exited in the reverse order they were entered.
            assert cls._instance is new_value
            cls._instance = orig_value

    def __init__(self, cmdline_args):
        """
        Initialize cmd line args

        Parsing happens in two passes: the first pass only exists to pick up
        ``--module`` so it can be imported; the second pass uses a parser
        that also knows the active task's parameters.

        :param cmdline_args: list of command line arguments to parse.
        :raises SystemExit: if ``--help`` was passed or no task was specified.
        """
        self.cmdline_args = cmdline_args
        known_args, _ = self._build_parser().parse_known_args(args=cmdline_args)
        self._attempt_load_module(known_args)
        parser = self._build_parser(active_tasks=self._active_tasks())
        # TODO: Use parse_args instead of parse_known_args, but can't be
        # done just yet.  Once `--task` is forced, it should be possible
        known_args, _ = parser.parse_known_args(args=cmdline_args)
        self._possibly_exit_with_help(parser, known_args)
        if not self._active_tasks():
            raise SystemExit('No task specified')
        self.known_args = known_args  # Also publicly expose parsed arguments

    @staticmethod
    def _build_parser(active_tasks=None):
        """
        Build an argparse parser from every parameter known to ``Register``.

        Each parameter is added once with ``glob=True``; parameters whose
        task name is in ``active_tasks`` are added a second time with
        ``glob=False``.

        :param active_tasks: collection of task names whose parameters should
                             also be added with ``glob=False``. Defaults to
                             no tasks.
        :return: the configured :py:class:`argparse.ArgumentParser`.
        """
        if active_tasks is None:
            active_tasks = frozenset()
        # add_help=False: --help is handled by _possibly_exit_with_help.
        parser = argparse.ArgumentParser(add_help=False)

        for task_name, is_without_section, param_name, param_obj in Register.get_all_params():
            add = functools.partial(param_obj._add_to_cmdline_parser, parser,
                                    param_name, task_name, is_without_section=is_without_section)
            add(glob=True)
            if task_name in active_tasks:
                add(glob=False)

        return parser

    @cached_property.cached_property
    def _task_name(self):
        """
        Get the task name

        The task name is the first command line argument that the parser
        built from the globally registered parameters does not recognize.

        :return: the task name, or None if there is no such argument.
        """
        parser = self._build_parser()
        _, unknown = parser.parse_known_args(args=self.cmdline_args)
        if unknown:
            return unknown[0]
        return None

    def get_task_cls(self):
        """
        Get the task class
        """
        return Register.get_task_cls(self._task_name)

    def is_local_task(self, task_name):
        """
        Used to see if unqualified command line parameters should be added too.
        """
        return task_name in self._active_tasks()

    def _active_tasks(self):
        """
        Set of task families that should expose their parameters unqualified.

        Contains the root task name, or is empty when no task was given.
        """
        root_task = self._task_name
        return {root_task} if root_task else set()

    @staticmethod
    def _attempt_load_module(known_args):
        """
        Load the --module parameter

        Imports the module named by ``known_args.module`` when it is set.
        """
        module = known_args.module
        if module:
            __import__(module)

    @staticmethod
    def _possibly_exit_with_help(parser, known_args):
        """
        Check if the user passed --help, if so, print a message and exit.

        :raises SystemExit: if ``known_args.help`` is truthy.
        """
        if known_args.help:
            parser.print_help()
            raise SystemExit('Exiting due to --help was passed')
132 changes: 10 additions & 122 deletions luigi/interface.py
Expand Up @@ -18,7 +18,6 @@
This module contains the bindings for command line integration and dynamic loading of tasks
"""

import argparse
import logging
import logging.config
import os
Expand All @@ -35,7 +34,7 @@
from luigi import task
from luigi import worker
from luigi import execution_summary
from luigi.task_register import Register
from luigi.cmdline_parser import CmdlineParser


def setup_interface_logging(conf_file=None):
Expand Down Expand Up @@ -107,13 +106,16 @@ class core(task.Config):
description='Configuration file for logging')
module = parameter.Parameter(
default=None,
description='Used for dynamic loading of modules') # see _DynamicArgParseInterface
description='Used for dynamic loading of modules')
parallel_scheduling = parameter.BoolParameter(
default=False,
description='Use multiprocessing to do scheduling in parallel.')
assistant = parameter.BoolParameter(
default=False,
description='Run any task from the scheduler.')
help = parameter.BoolParameter(
default=False,
description='Help option flag, for --help')


class _WorkerSchedulerFactory(object):
Expand Down Expand Up @@ -187,121 +189,6 @@ def _schedule_and_run(tasks, worker_scheduler_factory=None, override_defaults=No
return success


def _add_task_parameters(parser, task_cls):
    """
    Register every parameter of ``task_cls`` on ``parser`` with ``glob=False``.
    """
    for name, param_obj in task_cls.get_params():
        param_obj.add_to_cmdline_parser(
            parser,
            name,
            task_cls.task_family,
            glob=False,
        )


def _get_global_parameters():
    """
    Yield ``(task_name, is_without_section, param_name, param)`` for every
    entry of ``Register.get_all_params()``, skipping any entry whose
    parameter object has already been yielded.
    """
    already_yielded = set()
    for task_name, is_without_section, param_name, param in Register.get_all_params():
        if param not in already_yielded:
            already_yielded.add(param)
            yield task_name, is_without_section, param_name, param


def _add_global_parameters(parser):
    """
    Register every parameter from ``_get_global_parameters()`` on ``parser``
    with ``glob=True``.
    """
    for family, unsectioned, name, param_obj in _get_global_parameters():
        param_obj.add_to_cmdline_parser(
            parser,
            name,
            family,
            glob=True,
            is_without_section=unsectioned,
        )


def _get_task_parameters(task_cls, args):
    """
    Collect the parameter values of ``task_cls`` from the parsed ``args``.

    Each parameter object fills in its own entry through
    ``parse_from_args``; the resulting dict is returned.
    """
    # Parse a str->str dict to the correct types
    parsed = {}
    for name, param_obj in task_cls.get_params():
        param_obj.parse_from_args(name, task_cls.task_family, args, parsed)
    return parsed


def _set_global_parameters(args):
    """
    Let every parameter from ``_get_global_parameters()`` set its global
    value from the parsed ``args``.
    """
    # Note that this is not side effect free
    for family, unsectioned, name, param_obj in _get_global_parameters():
        param_obj.set_global_from_args(
            name,
            family,
            args,
            is_without_section=unsectioned,
        )


class _ArgParseInterface(object):
    """
    Takes the task as the command, with parameters specific to it.
    """

    def parse_task(self, cmdline_args=None):
        """
        Parse the command line and instantiate the task it names.

        :param cmdline_args: list of arguments; defaults to ``sys.argv[1:]``.
        :return: a one-element list holding the instantiated task.
        :raises SystemExit: with 'No task specified' when the first pass
                            leaves no unrecognized argument to use as the
                            task name.
        """
        if cmdline_args is None:
            cmdline_args = sys.argv[1:]

        parser = argparse.ArgumentParser()

        _add_global_parameters(parser)

        # Parse global arguments and pull out the task name.
        # We used to do this using subparsers+command, but some issues with
        # argparse across different versions of Python (2.7.9) made it hard.
        # '--help' is filtered out for this first pass.
        args, unknown = parser.parse_known_args(args=[a for a in cmdline_args if a != '--help'])
        if len(unknown) == 0:
            # In case it included a --help argument, run again
            parser.parse_known_args(args=cmdline_args)
            raise SystemExit('No task specified')

        # The first argument the global parser did not recognize is the task name.
        task_name = unknown[0]
        task_cls = Register.get_task_cls(task_name)

        # Add a subparser to parse task-specific arguments
        subparsers = parser.add_subparsers(dest='command')
        subparser = subparsers.add_parser(task_name)

        # Add both task and global params here so that we can support both:
        # test.py --global-param xyz Test --n 42
        # test.py Test --n 42 --global-param xyz
        _add_global_parameters(subparser)
        _add_task_parameters(subparser, task_cls)

        # Workaround for bug in argparse for Python 2.7.9
        # See https://mail.python.org/pipermail/python-dev/2015-January/137699.html
        subargs = parser.parse_args(args=cmdline_args)
        # Copy only truthy values from the second parse onto the first-pass
        # namespace; falsy values from the second parse never overwrite it.
        for key, value in vars(subargs).items():
            if value:  # Either True (for boolean args) or non-None (everything else)
                setattr(args, key, value)

        # Notice that this is not side effect free because it might set global params
        _set_global_parameters(args)
        task_params = _get_task_parameters(task_cls, args)

        return [task_cls(**task_params)]

    def parse(self, cmdline_args=None):
        """
        Entry point; delegates directly to :py:meth:`parse_task`.
        """
        return self.parse_task(cmdline_args)


class _DynamicArgParseInterface(_ArgParseInterface):
    """
    Uses --module as a way to load modules dynamically

    Usage:

    .. code-block:: console

        python whatever.py --module foo_module FooTask --blah xyz --x 123

    This will dynamically import foo_module and then try to create FooTask from this.
    """

    def parse(self, cmdline_args=None):
        """
        Import the module named by ``--module`` (if any), then hand the
        same arguments to ``parse_task``.
        """
        if cmdline_args is None:
            cmdline_args = sys.argv[1:]

        global_parser = argparse.ArgumentParser()
        _add_global_parameters(global_parser)

        # '--help' is filtered out for this preliminary pass.
        without_help = [arg for arg in cmdline_args if arg != '--help']
        parsed, _ = global_parser.parse_known_args(args=without_help)

        module_name = parsed.module
        if module_name:
            __import__(module_name)

        return self.parse_task(cmdline_args)


def run(cmdline_args=None, main_task_cls=None,
worker_scheduler_factory=None, use_dynamic_argparse=None, local_scheduler=False):
"""
Expand All @@ -321,14 +208,15 @@ def run(cmdline_args=None, main_task_cls=None,
if cmdline_args is None:
cmdline_args = sys.argv[1:]

interface = _DynamicArgParseInterface()

if main_task_cls:
cmdline_args.insert(0, main_task_cls.task_family)
if local_scheduler:
cmdline_args.insert(0, '--local-scheduler')
tasks = interface.parse(cmdline_args)
return _schedule_and_run(tasks, worker_scheduler_factory)

with CmdlineParser.global_instance(cmdline_args) as cp:
task_cls = cp.get_task_cls()
task = task_cls()
return _schedule_and_run([task], worker_scheduler_factory)


def build(tasks, worker_scheduler_factory=None, **env_params):
Expand Down

0 comments on commit 154c283

Please sign in to comment.