diff --git a/cob_datapipeline/prod_az_reindex_dag.py b/cob_datapipeline/prod_az_reindex_dag.py index 9bd2ef70..c94feb76 100644 --- a/cob_datapipeline/prod_az_reindex_dag.py +++ b/cob_datapipeline/prod_az_reindex_dag.py @@ -36,8 +36,6 @@ AZ_CLIENT_SECRET = Variable.get("AZ_CLIENT_SECRET") AZ_BRANCH = Variable.get("AZ_PROD_BRANCH") -EXECUTION_DATE = '-{{ execution_date.strftime("%Y-%m-%d_%H-%M-%S") }}' - # CREATE DAG DEFAULT_ARGS = { 'owner': 'airflow', @@ -64,6 +62,13 @@ Tasks with custom logic are relegated to individual Python files. """ +SET_COLLECTION_NAME = PythonOperator( + task_id='set_collection_name', + python_callable=datetime.now().strftime, + op_args=["%Y-%m-%d_%H-%M-%S"], + dag=DAG +) + GET_NUM_SOLR_DOCS_PRE = task_solrgetnumdocs( DAG, ALIAS, @@ -74,7 +79,7 @@ CREATE_COLLECTION = tasks.create_sc_collection( DAG, SOLR_CONN.conn_id, - CONFIGSET + EXECUTION_DATE, + CONFIGSET + "-{{ ti.xcom_pull(task_ids='set_collection_name') }}", REPLICATION_FACTOR, CONFIGSET ) @@ -84,7 +89,7 @@ bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_databases.sh ", env={**os.environ, **{ "HOME": AIRFLOW_USER_HOME, - "SOLR_AZ_URL": tasks.get_solr_url(SOLR_CONN, CONFIGSET + EXECUTION_DATE), + "SOLR_AZ_URL": tasks.get_solr_url(SOLR_CONN, CONFIGSET + "-{{ ti.xcom_pull(task_ids='set_collection_name') }}"), "AZ_CLIENT_ID": AZ_CLIENT_ID, "AZ_CLIENT_SECRET": AZ_CLIENT_SECRET, "AZ_BRANCH": AZ_BRANCH, @@ -96,14 +101,14 @@ GET_NUM_SOLR_DOCS_POST = task_solrgetnumdocs( DAG, - CONFIGSET + EXECUTION_DATE, + CONFIGSET +"-{{ ti.xcom_pull(task_ids='set_collection_name') }}", 'get_num_solr_docs_post', conn_id=SOLR_CONN.conn_id) SOLR_ALIAS_SWAP = tasks.swap_sc_alias( DAG, SOLR_CONN.conn_id, - CONFIGSET + EXECUTION_DATE, + CONFIGSET +"-{{ ti.xcom_pull(task_ids='set_collection_name') }}", ALIAS ) @@ -124,7 +129,7 @@ PUSH_COLLECTION = PushVariable( task_id="push_collection", name="AZ_PROD_COLLECTIONS", - value=CONFIGSET + EXECUTION_DATE, + value=CONFIGSET +"-{{ ti.xcom_pull(task_ids='set_collection_name') }}", dag=DAG) DELETE_COLLECTIONS = DeleteCollectionListVariable( @@ -132,7 +137,7 @@ solr_conn_id='SOLRCLOUD', list_variable="AZ_PROD_COLLECTIONS", skip_from_last=2, - skip_included=[CONFIGSET + EXECUTION_DATE], + skip_included=[CONFIGSET +"-{{ ti.xcom_pull(task_ids='set_collection_name') }}"], dag=DAG) POST_SLACK = PythonOperator( @@ -143,7 +148,8 @@ ) # SET UP TASK DEPENDENCIES -CREATE_COLLECTION.set_upstream(GET_NUM_SOLR_DOCS_PRE) +SET_COLLECTION_NAME.set_upstream(GET_NUM_SOLR_DOCS_PRE) +CREATE_COLLECTION.set_upstream(SET_COLLECTION_NAME) INDEX_DATABASES.set_upstream(CREATE_COLLECTION) GET_NUM_SOLR_DOCS_POST.set_upstream(INDEX_DATABASES) SOLR_ALIAS_SWAP.set_upstream(GET_NUM_SOLR_DOCS_POST) diff --git a/cob_datapipeline/qa_az_reindex_dag.py b/cob_datapipeline/qa_az_reindex_dag.py index 5c4db020..8079e3c5 100644 --- a/cob_datapipeline/qa_az_reindex_dag.py +++ b/cob_datapipeline/qa_az_reindex_dag.py @@ -36,8 +36,6 @@ AZ_CLIENT_SECRET = Variable.get("AZ_CLIENT_SECRET") AZ_BRANCH = Variable.get("AZ_QA_BRANCH") -EXECUTION_DATE = '-{{ execution_date.strftime("%Y-%m-%d_%H-%M-%S") }}' - # CREATE DAG DEFAULT_ARGS = { "owner": "cob-qa", @@ -64,6 +62,12 @@ Tasks with custom logic are relegated to individual Python files. """ +SET_COLLECTION_NAME = PythonOperator( + task_id="set_collection_name", + python_callable=datetime.now().strftime, + op_args=["%Y-%m-%d_%H-%M-%S"], + dag=DAG +) GET_NUM_SOLR_DOCS_PRE = task_solrgetnumdocs( DAG, @@ -75,7 +79,7 @@ CREATE_COLLECTION = tasks.create_sc_collection( DAG, SOLR_CONN.conn_id, - CONFIGSET + EXECUTION_DATE, + CONFIGSET + "-{{ ti.xcom_pull(task_ids='set_collection_name') }}", REPLICATION_FACTOR, CONFIGSET ) @@ -85,7 +89,7 @@ bash_command=AIRFLOW_HOME + "/dags/cob_datapipeline/scripts/ingest_databases.sh ", env={**os.environ, **{ "HOME": AIRFLOW_USER_HOME, - "SOLR_AZ_URL": tasks.get_solr_url(SOLR_CONN, CONFIGSET + EXECUTION_DATE), + "SOLR_AZ_URL": tasks.get_solr_url(SOLR_CONN, CONFIGSET + "-{{ ti.xcom_pull(task_ids='set_collection_name') }}"), "AZ_CLIENT_ID": AZ_CLIENT_ID, "AZ_CLIENT_SECRET": AZ_CLIENT_SECRET, "AZ_BRANCH": AZ_BRANCH, @@ -97,14 +101,14 @@ GET_NUM_SOLR_DOCS_POST = task_solrgetnumdocs( DAG, - CONFIGSET + EXECUTION_DATE, + CONFIGSET +"-{{ ti.xcom_pull(task_ids='set_collection_name') }}", "get_num_solr_docs_post", conn_id=SOLR_CONN.conn_id) SOLR_ALIAS_SWAP = tasks.swap_sc_alias( DAG, SOLR_CONN.conn_id, - CONFIGSET + EXECUTION_DATE, + CONFIGSET +"-{{ ti.xcom_pull(task_ids='set_collection_name') }}", ALIAS ) @@ -125,7 +129,7 @@ PUSH_COLLECTION = PushVariable( task_id="push_collection", name="AZ_QA_COLLECTIONS", - value=CONFIGSET + EXECUTION_DATE, + value=CONFIGSET +"-{{ ti.xcom_pull(task_ids='set_collection_name') }}", dag=DAG) DELETE_COLLECTIONS = DeleteCollectionListVariable( @@ -133,7 +137,7 @@ solr_conn_id='SOLRCLOUD', list_variable="AZ_QA_COLLECTIONS", skip_from_last=2, - skip_included=[CONFIGSET + EXECUTION_DATE], + skip_included=[CONFIGSET +"-{{ ti.xcom_pull(task_ids='set_collection_name') }}"], dag=DAG) POST_SLACK = PythonOperator( @@ -144,7 +148,8 @@ ) # SET UP TASK DEPENDENCIES -CREATE_COLLECTION.set_upstream(GET_NUM_SOLR_DOCS_PRE) +SET_COLLECTION_NAME.set_upstream(GET_NUM_SOLR_DOCS_PRE) +CREATE_COLLECTION.set_upstream(SET_COLLECTION_NAME) INDEX_DATABASES.set_upstream(CREATE_COLLECTION) GET_NUM_SOLR_DOCS_POST.set_upstream(INDEX_DATABASES) SOLR_ALIAS_SWAP.set_upstream(GET_NUM_SOLR_DOCS_POST) diff --git a/tests/prod_az_reindex_dag_test.py b/tests/prod_az_reindex_dag_test.py index c8ea3fb2..0d6d5856 100644 --- a/tests/prod_az_reindex_dag_test.py +++ b/tests/prod_az_reindex_dag_test.py @@ -22,6 +22,7 @@ def test_dag_interval_is_variable(self): def test_dag_tasks_present(self): """Unit test that the DAG instance contains the expected tasks.""" self.assertEqual(self.tasks, [ + "set_collection_name", "get_num_solr_docs_pre", "create_collection", "index_az", @@ -37,7 +38,8 @@ def test_dag_tasks_present(self): def test_dag_task_order(self): """Unit test that the DAG instance contains the expected dependencies.""" expected_task_deps = { - "create_collection": "get_num_solr_docs_pre", + "set_collection_name": "get_num_solr_docs_pre", + "create_collection": "set_collection_name", "index_az": "create_collection", "get_num_solr_docs_post": "index_az", "solr_alias_swap": "get_num_solr_docs_post", @@ -62,4 +64,4 @@ def test_index_az_task(self): self.assertEqual(task.env["AZ_BRANCH"], "AZ_BRANCH") self.assertEqual(task.env["AZ_CLIENT_ID"], "AZ_CLIENT_ID") self.assertEqual(task.env["AZ_CLIENT_SECRET"], "AZ_CLIENT_SECRET") - self.assertEqual(task.env["SOLR_AZ_URL"], "http://127.0.0.1:8983/solr/tul_cob-az-0-{{ execution_date.strftime(\"%Y-%m-%d_%H-%M-%S\") }}") + self.assertEqual(task.env["SOLR_AZ_URL"], "http://127.0.0.1:8983/solr/tul_cob-az-0-{{ ti.xcom_pull(task_ids='set_collection_name') }}") diff --git a/tests/qa_az_reindex_dag_test.py b/tests/qa_az_reindex_dag_test.py index a30c9f29..6862abdd 100644 --- a/tests/qa_az_reindex_dag_test.py +++ b/tests/qa_az_reindex_dag_test.py @@ -22,6 +22,7 @@ def test_dag_interval_is_variable(self): def test_dag_tasks_present(self): """Unit test that the DAG instance contains the expected tasks.""" self.assertEqual(self.tasks, [ + "set_collection_name", "get_num_solr_docs_pre", "create_collection", "index_az", @@ -37,7 +38,8 @@ def test_dag_tasks_present(self): def test_dag_task_order(self): """Unit test that the DAG instance contains the expected dependencies.""" expected_task_deps = { - "create_collection": "get_num_solr_docs_pre", + "set_collection_name": "get_num_solr_docs_pre", + "create_collection": "set_collection_name", "index_az": "create_collection", "get_num_solr_docs_post": "index_az", "solr_alias_swap": "get_num_solr_docs_post", @@ -62,4 +64,4 @@ def test_index_databases_task(self): self.assertEqual(task.env["AZ_BRANCH"], "AZ_BRANCH") self.assertEqual(task.env["AZ_CLIENT_ID"], "AZ_CLIENT_ID") self.assertEqual(task.env["AZ_CLIENT_SECRET"], "AZ_CLIENT_SECRET") - self.assertEqual(task.env["SOLR_AZ_URL"], "http://127.0.0.1:8983/solr/tul_cob-az-0-{{ execution_date.strftime(\"%Y-%m-%d_%H-%M-%S\") }}") + self.assertEqual(task.env["SOLR_AZ_URL"], "http://127.0.0.1:8983/solr/tul_cob-az-0-{{ ti.xcom_pull(task_ids='set_collection_name') }}")