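## Migrating Oozie Workflows to Airflow CDE DAGs

This notebook converts the Oozie workflow in `input/` (`workflow.xml` plus `job.properties`) into a Cloudera Data Engineering (CDE) deployment. It generates an Airflow DAG, creates a CDE resource, uploads the Spark job file and the DAG to that resource, and then registers the CDE Spark jobs and the CDE Airflow job that orchestrates them.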

In [1]:
# Reload edited modules (e.g. migration_utility.*) before each cell runs, so
# changes to the utility are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
!pip install pandas
!pip install requests_toolbelt
!pip install xmltodict



In [3]:
import getpass
import os

import migration_utility.cdejob as cdejob
import migration_utility.cderesource as cderesource
import migration_utility.workflow as workflow

In [7]:
# --- Local project layout ----------------------------------------------------
# Oozie sources (workflow.xml, job.properties, job files) are read from the
# input directory; the generated DAG file is written to the output directory.
project_input_dir, project_output_dir = 'input', 'output'

# --- Generated Airflow DAG ---------------------------------------------------
dag_name = 'combined'
dag_file = f'{dag_name}.py'  # 'combined.py'

# --- CDE object naming -------------------------------------------------------
# The migration utility prefixes CDE names with cde_prefix,
# e.g. 'jmontenaro_combined_resource' and 'jmontenaro_combined_job'.
cde_prefix = 'jmontenaro_combined'
cde_resource_name, cde_job_name = 'resource', 'job'

# Airflow connection id used by the generated CDWOperator (Hive) tasks.
hive_connection = 'default-hive-aws'

In [8]:
# Connection settings for the CDE Jobs API. Values already exported in the
# environment take precedence, so nothing secret has to live in the notebook.
os.environ.setdefault("WORKLOAD_USER", "jmontenaro")
os.environ.setdefault(
    "JOBS_API_URL",
    "https://zph56zmm.cde-ntvvr5hx.go01-dem.ylcu-atmi.cloudera.site/dex/api/v1",
)

# Never hardcode the workload password: prompt for it (input is not echoed)
# unless it was provided through the environment before the kernel started.
# NOTE(review): a password was previously committed in this cell — rotate it.
if not os.environ.get("WORKLOAD_PASSWORD"):
    os.environ["WORKLOAD_PASSWORD"] = getpass.getpass("CDP workload password: ")

In [9]:
# Create the directory the generated DAG file is written to (no error if it
# already exists). Pure Python instead of `!mkdir -p` so it is portable and the
# path is never interpolated into a shell command.
os.makedirs(project_output_dir, exist_ok=True)

In [10]:
# Wrap the Oozie project in project_input_dir; the calls below look for
# workflow.xml and job.properties there (per their printed messages).
ow = workflow.OozieWorkflow(project_input_dir)

In [11]:
# Parse workflow.xml into a nested dict (XML attributes appear as '@'-prefixed
# keys) and display it. ${...} placeholders are still unresolved at this point.
workflow_dict = ow.create_workflow_dict()
workflow_dict

Oozie workflow file workflow.xml found


{'workflow-app': {'@xmlns': 'uri:oozie:workflow:0.4',
  '@name': 'combined-workflow',
  'start': {'@to': 'spark_pi'},
  'action': [{'@name': 'spark_pi',
    'spark': {'@xmlns': 'uri:oozie:spark-action:0.1',
     'job-tracker': '${jobTracker}',
     'name-node': '${nameNode}',
     'prepare': {'delete': {'@path': '${nameNode}/user/jmontenaro/spark_pi/output-data'}},
     'master': '${master}',
     'mode': '${mode}',
     'name': 'spark_pi',
     'class': 'org.apache.spark.examples.SparkPi',
     'jar': 'example_spark_jobs/jobs/pi.scala',
     'spark-opts': '--executor-memory 2G --num-executors 5',
     'arg': 'value=10'},
    'ok': {'@to': 'show_databases'},
    'error': {'@to': 'kill_job'}},
   {'@name': 'show_databases',
    'hive': {'@xmlns': 'uri:oozie:hive-action:0.4',
     'job-tracker': '${jobTracker}',
     'name-node': '${nameNode}',
     'script': '${script_show_databases}'},
    'ok': {'@to': 'end'},
    'error': {'@to': 'kill_job'}}],
  'kill': {'@name': 'kill_job', 'messag

In [12]:
# Read job.properties into a {name: value} dict; these values are meant to
# fill the ${name} placeholders used in workflow.xml.
workflow_props = ow.create_workflow_props()
workflow_props

Properties file job.properties found


{'nameNode': 'hdfs://mynameservice',
 'master': 'local',
 'mode': 'client',
 'jobTracker': 'myjobtracker:8088',
 'script_show_databases': 'input/show_databases.hive'}

In [13]:
# Substitute the ${...} placeholders with the values from job.properties.
# Re-parse workflow.xml instead of feeding `workflow_dict` back into itself, so
# re-running this cell never applies the substitution to already-resolved data.
# NOTE(review): the recorded output of this cell shows malformed substitutions
# (e.g. 'hdfs://mynameservice}/user/...', and the 'master'/'mode' keys turned
# into 'local'/'client'). replace_workflow_props in migration_utility.workflow
# looks broken — TODO fix it there before trusting the migrated workflow.
workflow_dict = ow.replace_workflow_props(ow.create_workflow_dict(), workflow_props)
workflow_dict

{'workflow-app': {'@xmlns': 'uri:oozie:workflow:0.4',
  '@name': 'combined-workflow',
  'start': {'@to': 'spark_pi'},
  'action': [{'@name': 'spark_pi',
    'spark': {'@xmlns': 'uri:oozie:spark-action:0.1',
     'job-tracker': 'myjobtracker:8088',
     'name-node': 'hdfs://mynameservice',
     'prepare': {'delete': {'@path': 'hdfs://mynameservice}/user/jmontenaro/spark_pi/output-data'}},
     'local': '${local',
     'client': 'client',
     'name': 'spark_pi',
     'class': 'org.apache.spark.examples.SparkPi',
     'jar': 'example_spark_jobs/jobs/pi.scala',
     'spark-opts': '--executor-memory 2G --num-executors 5',
     'arg': 'value=10'},
    'ok': {'@to': 'show_databases'},
    'error': {'@to': 'kill_job'}},
   {'@name': 'show_databases',
    'hive': {'@xmlns': 'uri:oozie:hive-action:0.4',
     'job-tracker': 'myjobtracker:8088',
     'name-node': 'hdfs://mynameservice',
     'script': 'input/show_databases.hive'},
    'ok': {'@to': 'end'},
    'error': {'@to': 'kill_job'}}],
  'k

In [14]:
# Converter from the property-resolved Oozie workflow to CDE job payloads and
# Airflow DAG code. hive_connection ends up as the CDWOperator cli_conn_id,
# cde_prefix/cde_resource_name build the CDE names, and dag_name appears to be
# used as the DAG variable name in the generated file.
cj = cdejob.CDEJob(workflow_dict, hive_connection, cde_prefix, cde_resource_name, dag_name)

In [15]:
# Start the generated DAG file in project_output_dir: judging by the printed
# file, these write the header, the Airflow/Cloudera imports, and the
# default_args + DAG declaration. The DAG owner comes from WORKLOAD_USER so it
# stays consistent with the identity used against the CDE API instead of
# repeating a hardcoded user name.
cj.initialize_dag(project_output_dir, dag_file)
cj.dag_imports(project_output_dir, dag_file)
cj.dag_declaration(os.environ["WORKLOAD_USER"], project_output_dir, dag_file)

In [16]:
# Append one Airflow task per Oozie action to the DAG file (Spark action ->
# CDEJobRunOperator, Hive action -> CDWOperator) and return the CDE Spark job
# payloads for the Spark actions; those jobs must be created in CDE for the
# DAG's CDEJobRunOperator tasks to run them.
spark_payloads = cj.parse_oozie_workflow(project_output_dir, dag_file, workflow_dict)
spark_payloads

Extracted Job Name: spark_pi
Working on Spark CDE Job: jmontenaro_combined_spark_pi
Converted Spark Oozie Action into Spark CDE Payload


[{'name': 'jmontenaro_combined_spark_pi',
  'type': 'spark',
  'retentionPolicy': 'keep_indefinitely',
  'mounts': [{'dirPrefix': '/',
    'resourceName': 'jmontenaro_combined_resource'}],
  'spark': {'file': 'pi.scala',
   'conf': {'spark.pyspark.python': 'python3'},
   'executorMemory': '2G',
   'numExecutors': 5},
  'schedule': {'enabled': False}}]

In [17]:
# Print the generated DAG source so it can be reviewed before it is uploaded.
dag_path = f"{project_output_dir}/{dag_file}"
with open(dag_path) as dag_source:
    print(dag_source.read())

### Airflow DAG ###

import pendulum

from airflow import DAG
from cloudera.cdp.airflow.operators.cdw_operator import CDWOperator
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator
from dateutil import parser
from datetime import datetime, timedelta

default_args = {
    'owner': 'jmontenaro',
    'retry_delay': timedelta(seconds=5),
    'depends_on_past': False,
    'start_date': pendulum.datetime(2016, 1, 1, tz="Europe/Amsterdam")
}

combined = DAG (
    'combined-pipeline-demo',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    is_paused_upon_creation=False
)

spark_pi_step = CDEJobRunOperator (
    task_id='spark_pi',
    dag=combined,
    job_name='jmontenaro_combined_spark_pi'
)

cdw_query = """show databases;"""

show_databases_step = CDWOperator (
    task_id="show_databases",
    dag=combined,
    cli_conn_id="default-hive-aws",
    hql=cdw_query,
    schema='default',
    use_proxy_user=False,
    query_isolation=True

In [16]:
# Build the payload for the CDE Airflow job that runs dag_file, mounted from
# the CDE resource.
airflow_cde_payload = cj.oozie_to_cde_airflow_payload(dag_file, cde_resource_name, cde_job_name)
airflow_cde_payload

Working on Airflow CDE Job: jmontenaro_combined_job
Converted DAG into Airflow CDE Payload


{'type': 'airflow',
 'airflow': {'dagFile': 'combined.py'},
 'identity': {'disableRoleProxy': True},
 'mounts': [{'dirPrefix': '/',
   'resourceName': 'jmontenaro_combined_resource'}],
 'name': 'jmontenaro_combined_job',
 'retentionPolicy': 'keep_indefinitely'}

In [17]:
# Client for the CDE Jobs API at JOBS_API_URL, acting as WORKLOAD_USER.
cr = cderesource.CDEResource(os.environ["JOBS_API_URL"], os.environ["WORKLOAD_USER"], cde_prefix, cde_resource_name)

In [18]:
# Obtain an API token (presumably exchanged for the workload password) that is
# passed to every CDE call below. Do not display `token` — treat it as a secret.
token = cr.set_cde_token(os.environ["WORKLOAD_PASSWORD"])

In [19]:
# Create the CDE resource that holds the job files (HTTP 201 = created).
# NOTE(review): presumably fails on re-run if the resource already exists — verify.
cr.create_cde_resource(token, cde_resource_name)

201



In [20]:
# Upload every Spark job's main file, as named in its converted CDE payload,
# from the input directory into the CDE resource. Deriving the names from the
# payloads (instead of hardcoding "pi.scala") keeps the upload in sync with
# the Spark actions in workflow.xml.
for spark_payload in spark_payloads:
    cr.upload_file(cde_resource_name, project_input_dir, spark_payload['spark']['file'], token)

Working on Job: pi.scala
Response Status Code 201



In [21]:
# Register one CDE Spark job per converted Oozie Spark action; each payload
# mounts the CDE resource created earlier, so its job file is available.
for job_payload in spark_payloads:
    cr.create_job_from_resource(token, job_payload)

Working on Job: jmontenaro_combined_spark_pi
Response Status Code 201





In [22]:
# Upload the generated DAG file from the output directory into the same resource.
cr.upload_file(cde_resource_name, project_output_dir, dag_file, token)

Working on Job: combined.py
Response Status Code 201



In [23]:
# Create the CDE Airflow job that runs the uploaded DAG. Done after the Spark
# jobs are created, since the DAG's CDEJobRunOperator tasks reference them by name.
cr.create_job_from_resource(token, airflow_cde_payload)

Working on Job: jmontenaro_combined_job
Response Status Code 201



