Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LocalExecutor - Unable to make custom plugin works #443

Open
gfelot opened this issue Oct 6, 2019 · 2 comments
Open

LocalExecutor - Unable to make custom plugin works #443

gfelot opened this issue Oct 6, 2019 · 2 comments

Comments

@gfelot
Copy link

gfelot commented Oct 6, 2019

Wanted to learn airflow and using it through docker.
I used the LocalExecutor compose file with uncommented volume for the plugin folder.
Here my project structure :

├── dags
│   └── dag.py
├── docker-compose.yml
└── plugins
    ├── __init__.py
    ├── custom_plugins.py
    └── operators
        ├── __init__.py
        ├── facts_calculator.py
        ├── has_rows.py
        └── s3_to_redshift.py

I try then to follow the official documentation but I cannot import my custom operator.

Here the content of my custom_plugin.py (I tried to put it in the init file too). I cannot import from operators only without an error.

from airflow.plugins_manager import AirflowPlugin

from plugins.operators.facts_calculator import FactsCalculatorOperator
from plugins.operators.has_rows import HasRowsOperator
from plugins.operators.s3_to_redshift import S3ToRedshiftOperator


# Defining the plugin class
class CustomPlugin(AirflowPlugin):
    name = "custom_plugin"
    # A list of class(es) derived from BaseOperator
    operators = [
        FactsCalculatorOperator,
        HasRowsOperator,
        S3ToRedshiftOperator
    ]
    # A list of class(es) derived from BaseHook
    hooks = []
    # A list of class(es) derived from BaseExecutor
    executors = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of objects created from a class derived
    # from flask_admin.BaseView
    admin_views = []
    # A list of Blueprint object created from flask.Blueprint
    flask_blueprints = []
    # A list of menu links (flask_admin.base.MenuLink)
    menu_links = []

And in my /dags folder I do :

from airflow.operators import HasRowsOperator
from airflow.operators import S3ToRedshiftOperator
...

But I got an error from my IDE, if I try to run it I got this error:

[2019-10-06 15:34:03,054] {{dagbag.py:205}} ERROR - Failed to import: /usr/local/airflow/dags/dag.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 202, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/local/lib/python3.7/imp.py", line 171, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 696, in _load
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/usr/local/airflow/dags/dag.py", line 5, in <module>
    from airflow.operators import HasRowsOperator
ImportError: cannot import name 'HasRowsOperator' from 'airflow.operators' (unknown location)

It has been 2 days that I'm block here. I tried different solution found here and there but I cannot make it work. Any idea how to fix it ? Any exemple, even with a dummy operator ?

@gfelot
Copy link
Author

gfelot commented Oct 6, 2019

EDIT :
I tried to find online working exemple and I found this one : https://github.com/blue-yonder/airflow-plugin-demo

I added the compose file without touching anything else and I also have the same issue. So I don't know where this come from but it's hard to me to belive that there is not something wrong with the docker image.

@nathairtras
Copy link

I had a lot of issues getting plugins to work recently. The directory structure that did the trick was:

├── dags
│   └── dag.py
├── docker-compose.yml
└── plugins
    └── something_plugin
        ├── __init__.py
        └── operators
            ├── __init__.py
            └── something_operator.py

Empty plugins/something_plugin/operators/__init__.py file.
For the plugins/something_plugin/__init__.py, something like:

from airflow.plugins_manager import AirflowPlugin
from plugin_folder.operators.something_operator import SomethingOperator

# Defining the plugin class
class SomethingPlugin(AirflowPlugin):
    name = "something_plugin"
    # A list of class(es) derived from BaseOperator
    operators = [
        SomethingOperator
    ]
    # A list of class(es) derived from BaseHook
    hooks = []
    # A list of class(es) derived from BaseExecutor
    executors = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of objects created from a class derived
    # from flask_admin.BaseView
    admin_views = []
    # A list of Blueprint object created from flask.Blueprint
    flask_blueprints = []
    # A list of menu links (flask_admin.base.MenuLink)
    menu_links = []

And for importing in a DAG,

from airflow.operators.something_plugin import SomethingOperator

Current version of Airflow should warn if you try to import from airflow.operators instead of airflow.operators.thing_operator for builtin operators (as well as hooks, etc.) In this case, from your plugin instead of from the relevant file. Hooks would be the same.

from airflow.hooks.something_plugin import SomethingHook

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants