connection as a target does not work in MWAA #79

Closed
liongitusr opened this issue Nov 2, 2022 · 2 comments

liongitusr commented Nov 2, 2022

Running the example code produces the following error on AWS MWAA:

session = settings.Session()  # type: ignore
existing = session.query(Connection).filter_by(conn_id="my_db_connection").first()

if existing is None:
    # For illustration purposes, and to keep the example self-contained, we create
    # a Connection using Airflow's ORM. However, any method of loading connections would
    # work, like Airflow's UI, Airflow's CLI, or in deployment scripts.

    my_conn = Connection(
        conn_id="my_db_connection",
        conn_type="redshift",
        description="redshift connection",
        host="abc.ch0n7gct0zxb.us-west-2.redshift.amazonaws.com",
        login="dbt_process_user",
        port=5439,
        schema="stage",
        password=get_secret("dev/dbt_process_user")["password"],  # pragma: allowlist secret
        # Other dbt parameters can be added as extras
        extra=json.dumps(dict(threads=4, sslmode="require")),
    )
    session.add(my_conn)
    session.commit()
with DAG(
    dag_id="dbt_tomasfarias",
    catchup=False,
    default_args=default_args,
    tags=["dbt", "loan tape"],
    schedule_interval="0 11 * * *",
) as dag:
    dbt_run = DbtRunOperator(
        task_id="dbt_run",
        target="my_db_connection",
        # dbt_bin="/usr/local/airflow/.local/bin/dbt",
        profiles_dir=None,
        project_dir="/usr/local/airflow/dags/dbt/etl/",
    )
[2022-11-02, 16:11:29 UTC] {{taskinstance.py:1703}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 140, in execute
    config = self.get_dbt_config()
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 185, in get_dbt_config
    return factory.create_config(**config_kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 374, in create_config
    initialize_config_values(config)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/main.py", line 170, in initialize_config_values
    cfg = read_user_config(parsed.profiles_dir)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/profile.py", line 74, in read_user_config
    profile = read_profile(directory)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/profile.py", line 50, in read_profile
    path = os.path.join(profiles_dir, 'profiles.yml')
  File "/usr/lib64/python3.7/posixpath.py", line 80, in join
    a = os.fspath(a)
TypeError: expected str, bytes or os.PathLike object, not NoneType
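
The last two frames of the traceback can be reproduced in isolation, which may help confirm that the failure comes from a `None` profiles directory reaching `os.path.join` rather than from the connection itself. This is a minimal sketch: `profile_path` and `try_profile_path` are hypothetical stand-ins that only mirror the `os.path.join(profiles_dir, 'profiles.yml')` call shown in the traceback, not dbt's actual `read_profile` implementation.

```python
import os


def profile_path(profiles_dir):
    # Hypothetical stand-in: mirrors the os.path.join call from the traceback.
    return os.path.join(profiles_dir, "profiles.yml")


def try_profile_path(profiles_dir):
    # Return the joined path, or a description of the TypeError if one is raised.
    try:
        return profile_path(profiles_dir)
    except TypeError as exc:
        return f"TypeError: {exc}"


if __name__ == "__main__":
    # A string directory joins normally.
    print(try_profile_path("/usr/local/airflow/dags/dbt/etl"))
    # None triggers the same class of error seen in the task log.
    print(try_profile_path(None))
```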

tomasfarias commented Nov 13, 2022

Hi @liongitusr!

Thanks for opening an issue.

Looking at these lines of the traceback:

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 140, in execute
    config = self.get_dbt_config()
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 185, in get_dbt_config
    return factory.create_config(**config_kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 374, in create_config
    initialize_config_values(config)

It would appear you are running an outdated version of airflow-dbt-python: for example, we don't use the initialize_config_values function anymore, and the call to get_dbt_config is now in line 174, not 185.

Could you please let me know which versions of Apache Airflow (MWAA), airflow-dbt-python, and dbt-core you are running?
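
One possible way to collect those versions is to query the installed package metadata from Python, for instance from inside a task so the output lands in the task log. This is a sketch under assumptions: `installed_versions` is a hypothetical helper, the distribution names `apache-airflow`, `airflow-dbt-python`, and `dbt-core` are the PyPI names, and on Python 3.7 (as in the traceback above) the `importlib_metadata` backport is assumed to be installed because `importlib.metadata` only exists from Python 3.8 onwards.

```python
try:
    from importlib import metadata  # Standard library on Python 3.8+
except ImportError:
    import importlib_metadata as metadata  # Backport, assumed installed on 3.7


def installed_versions(packages):
    # Map each distribution name to its installed version, or None if missing.
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    names = ["apache-airflow", "airflow-dbt-python", "dbt-core"]
    for name, version in installed_versions(names).items():
        print(f"{name}: {version or 'not installed'}")
```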

In the meantime, I've dug into the issue a little bit and added an integration test DAG to our testing pipeline in #82 that mimics the creation of an Airflow connection and uses it as target. The test is passing, but if you give me some more details I may be able to extend it to replicate your issue.

Thank you!

tomasfarias added the "question" and "awaiting response" labels, removed the "question" label, and self-assigned this issue on Nov 13, 2022.

tomasfarias commented Mar 12, 2023

We haven't gotten more information to investigate further, and it's likely that with the v1.0.0 release this issue is no longer relevant, at least in its current form. For these reasons, I'm closing this issue, but feel free to open a new one if the issue persists after upgrading airflow-dbt-python.
