## Using the Oozie2CDE Module

In [1]:
import oozie2cde.workflow as wf
import oozie2cde.cdejob as cj
import oozie2cde.cderesource as rs
import os
from importlib import reload 

In [2]:
reload(wf)

<module 'oozie2cde.workflow' from '/home/cdsw/oozie2cde/workflow.py'>

In [3]:
project_dir = "oozie_workflows/oozie_hive_workflow_with_properties"
cde_resource_name = "hadoop2CDE_migration"

In [4]:
ow = wf.OozieWorkflow(project_dir)

In [5]:
workflow_d = ow.ooziexml_to_dict()
workflow_d

Oozie workflow file hive_properties_workflow.xml found


{'workflow-app': {'@xmlns': 'uri:oozie:workflow:0.4',
  '@name': 'simple-Workflow',
  'start': {'@to': 'Create_External_Table'},
  'action': [{'@name': 'Create_External_Table',
    'hive': {'@xmlns': 'uri:oozie:hive-action:0.4',
     'job-tracker': '${jobTracker}',
     'name-node': '${nameNode}',
     'script': '${script_name_external}'},
    'ok': {'@to': 'Create_orc_Table'},
    'error': {'@to': 'kill_job'}},
   {'@name': 'Create_orc_Table',
    'hive': {'@xmlns': 'uri:oozie:hive-action:0.4',
     'job-tracker': '${jobTracker}',
     'name-node': '${nameNode}',
     'script': '${script_name_orc}'},
    'ok': {'@to': 'Insert_into_Table'},
    'error': {'@to': 'kill_job'}},
   {'@name': 'Insert_into_Table',
    'hive': {'@xmlns': 'uri:oozie:hive-action:0.4',
     'job-tracker': '${jobTracker}',
     'name-node': '${nameNode}',
     'script': '${script_name_copy}',
     'param': '${database}'},
    'ok': {'@to': 'end'},
    'error': {'@to': 'kill_job'}}],
  'kill': {'@name': 'kill_job'

In [6]:
properties_dict = ow.parse_workflow_properties()
properties_dict

Properties file job1.properties found


{'nameNode': 'hdfs://rootname',
 'jobTracker': 'xyz.com:8088',
 'script_name_external': 'oozie_workflows/oozie_hive_workflow_with_properties/external.hive',
 'script_name_orc': 'oozie_workflows/oozie_hive_workflow_with_properties/orc.hive',
 'script_name_copy': 'oozie_workflows/oozie_hive_workflow_with_properties/Copydata.hive',
 'database_name': 'default'}

In [7]:
workflow_d_props = ow.workflow_properties_lookup(workflow_d, properties_dict)
workflow_d_props

{'workflow-app': {'@xmlns': 'uri:oozie:workflow:0.4',
  '@name': 'simple-Workflow',
  'start': {'@to': 'Create_External_Table'},
  'action': [{'@name': 'Create_External_Table',
    'hive': {'@xmlns': 'uri:oozie:hive-action:0.4',
     'job-tracker': 'xyz.com:8088',
     'name-node': 'hdfs://rootname',
     'script': 'oozie_workflows/oozie_hive_workflow_with_properties/external.hive'},
    'ok': {'@to': 'Create_orc_Table'},
    'error': {'@to': 'kill_job'}},
   {'@name': 'Create_orc_Table',
    'hive': {'@xmlns': 'uri:oozie:hive-action:0.4',
     'job-tracker': 'xyz.com:8088',
     'name-node': 'hdfs://rootname',
     'script': 'oozie_workflows/oozie_hive_workflow_with_properties/orc.hive'},
    'ok': {'@to': 'Insert_into_Table'},
    'error': {'@to': 'kill_job'}},
   {'@name': 'Insert_into_Table',
    'hive': {'@xmlns': 'uri:oozie:hive-action:0.4',
     'job-tracker': 'xyz.com:8088',
     'name-node': 'hdfs://rootname',
     'script': 'oozie_workflows/oozie_hive_workflow_with_properties

In [8]:
ow.query_properties_lookup(properties_dict)


Editing Hive Query File: orc.hive

The input Hive query is: 

Create Table orc_table(
name string, -- Concate value of first name and last name with space as seperator
yearofbirth int,
age int, -- Current year minus year of birth
address string,
zip int
)
STORED AS ORC
;

The output Hive query is: 

Create Table orc_table(
name string, -- Concate value of first name and last name with space as seperator
yearofbirth int,
age int, -- Current year minus year of birth
address string,
zip int
)
STORED AS ORC
;

Editing Hive Query File: external.hive

The input Hive query is: 

Create external table external_table(
name string,
age int,
address string,
zip int
)
row format delimited
fields terminated by ','
stored as textfile
location '/test/abc';

The output Hive query is: 

Create external table external_table(
name string,
age int,
address string,
zip int
)
row format delimited
fields terminated by ','
stored as textfile
location '/test/abc';

Editing Hive Query File: Copydata.hive

The 

### Converting Oozie Workflows to CDE Payloads

In [192]:
reload(cj)

<module 'oozie2cde.cdejob' from '/home/cdsw/oozie2cde/cdejob.py'>

In [193]:
#!mkdir "airflow_dags"
dag_dir = "airflow_dags"
dag_file_name = "my_dag.py"

In [194]:
spark_cde_job = cj.CdeJob(workflow_d_props, "oozie_migration")

In [195]:
spark_cde_job.initialize_dag(dag_dir, dag_file_name)

In [196]:
spark_cde_job.dag_imports(dag_dir, dag_file_name)

In [197]:
spark_cde_job.dag_declaration("pauldefusco", dag_dir, dag_file_name)

In [198]:
cde_payloads = spark_cde_job.parse_oozie_workflow(dag_dir, dag_file_name, workflow_d_props)

{'@name': 'Create_External_Table', 'hive': {'@xmlns': 'uri:oozie:hive-action:0.4', 'job-tracker': 'xyz.com:8088', 'name-node': 'hdfs://rootname', 'script': 'oozie_workflows/oozie_hive_workflow_with_properties/external.hive'}, 'ok': {'@to': 'Create_orc_Table'}, 'error': {'@to': 'kill_job'}}
{'@name': 'Create_orc_Table', 'hive': {'@xmlns': 'uri:oozie:hive-action:0.4', 'job-tracker': 'xyz.com:8088', 'name-node': 'hdfs://rootname', 'script': 'oozie_workflows/oozie_hive_workflow_with_properties/orc.hive'}, 'ok': {'@to': 'Insert_into_Table'}, 'error': {'@to': 'kill_job'}}
{'@name': 'Insert_into_Table', 'hive': {'@xmlns': 'uri:oozie:hive-action:0.4', 'job-tracker': 'xyz.com:8088', 'name-node': 'hdfs://rootname', 'script': 'oozie_workflows/oozie_hive_workflow_with_properties/Copydata.hive', 'param': 'database'}, 'ok': {'@to': 'end'}, 'error': {'@to': 'kill_job'}}


In [199]:
cde_payloads

[]

##### No Spark CDE Job Payloads were created because the workflow.xml file does not contain spark actions. Let's try again with a workflow containing a spark action.

In [200]:
project_dir = "oozie_workflows/spark_oozie_workflow"

In [201]:
ow = wf.OozieWorkflow(project_dir)

In [202]:
workflow_d = ow.ooziexml_to_dict()
workflow_d

Oozie workflow file spark_action_workflow.xml found


{'workflow-app': {'@xmlns': 'uri:oozie:workflow:0.5',
  '@name': 'SparkWordCount',
  'start': {'@to': 'spark-node'},
  'action': {'@name': 'spark-node',
   'spark': {'@xmlns': 'uri:oozie:spark-action:0.1',
    'job-tracker': '${jobTracker}',
    'name-node': '${nameNode}',
    'prepare': {'delete': {'@path': '${nameNode}/user/${wf:user()}/${examplesRoot}/output-data'}},
    'master': '${master}',
    'name': 'SparkPi',
    'class': 'org.apache.spark.examples.SparkPi',
    'jar': 'example_spark_jobs/jobs/pi.scala',
    'spark-opts': '--executor-memory 2G --num-executors 5',
    'arg': 'value=10'},
   'ok': {'@to': 'end'},
   'error': {'@to': 'fail'}},
  'kill': {'@name': 'fail',
   'message': 'Workflow failed, error\n            message[${wf:errorMessage(wf:lastErrorNode())}]'},
  'end': {'@name': 'end'}}}

In [203]:
properties_dict = ow.parse_workflow_properties()
properties_dict

No properties file found.

If properties file is expected, please ensure it is in the workflow directory.

If properties file is not expected, please ignore this message.



##### No ancilliary properties file found. We can proceed with the conversion.

In [204]:
#!mkdir "airflow_dags"
dag_dir = "airflow_dags"
dag_file_name = "my_spark_dag.py"
dag_name = "oozie_2_airflow_dag"

In [205]:
cde_airflow_job = cj.CdeJob(workflow_d_props, "oozie_migration", dag_name)

In [206]:
#Removes previously existing DAGs with this file name.
cde_airflow_job.initialize_dag(dag_dir, dag_file_name)

In [207]:
#Appends DAG import statememts. Run once. If run twice by mistake, rerun dag initialization method. 
cde_airflow_job.dag_imports(dag_dir, dag_file_name)

In [208]:
cde_airflow_job.dag_declaration("pauldefusco", dag_dir, dag_file_name)

In [209]:
spark_cde_payloads = cde_airflow_job.parse_oozie_workflow(dag_dir, dag_file_name, workflow_d)

Extracted Job Name: SparkPi
Working on Spark CDE Job: SparkPi
Converted Spark Oozie Action into Spark CDE Payload


In [210]:
spark_cde_payloads

[{'name': 'SparkPi',
  'type': 'spark',
  'retentionPolicy': 'keep_indefinitely',
  'mounts': [{'dirPrefix': '/', 'resourceName': 'oozie_migration'}],
  'spark': {'file': 'pi.scala',
   'conf': {'spark.pyspark.python': 'python3'},
   'executorMemory': '2G',
   'numExecutors': 5},
  'schedule': {'enabled': False}}]

##### We should also validate that the Airflow DAG has been created

In [211]:
with open(dag_dir+"/"+dag_file_name, 'r') as f:
    print(f.read())

# The new Airflow DAG
from dateutil import parser
    
from datetime import datetime, timedelta
    
import pendulum
    
from airflow import DAG
    
from airflow.operators.email import EmailOperator
    
from airflow.operators.python_operator import PythonOperator
    
from cloudera.cdp.airflow.operators.cdw_operator import CDWOperator
    
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator

default_args = {
        'owner': 'pauldefusco',
    'retry_delay': timedelta(seconds=5),
    'depends_on_past': False,
    'start_date': pendulum.datetime(2016, 1, 1, tz="Europe/Amsterdam")
    }

oozie_2_airflow_dag = DAG(
    'airflow-pipeline-demo',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    is_paused_upon_creation=False
    )

SparkPi_Step = CDEJobRunOperator(
        task_id='SparkPi',
        dag=oozie_2_airflow_dag,
        job_name='SparkPi'
        )




##### One last thing before we can implement in CDE. We need to create the Payload for the Airflow DAG.

In [212]:
airflow_cde_payload = cde_airflow_job.oozie_to_cde_airflow_payload(dag_file_name, cde_resource_name, "oozie2airflow_dag")
airflow_cde_payload

Working on Airflow CDE Job: oozie2airflow_dag
Converted DAG into Airflow CDE Payload


{'type': 'airflow',
 'airflow': {'dagFile': 'my_spark_dag.py'},
 'identity': {'disableRoleProxy': True},
 'mounts': [{'dirPrefix': '/', 'resourceName': 'hadoop2CDE_migration'}],
 'name': 'oozie2airflow_dag',
 'retentionPolicy': 'keep_indefinitely'}

### Instantiating the Jobs in the CDE Virtual Cluster

In [213]:
reload(rs)

<module 'oozie2cde.cderesource' from '/home/cdsw/oozie2cde/cderesource.py'>

In [214]:
os.environ["WORKLOAD_USER"] = "pauldefusco"
os.environ["JOBS_API_URL"] = "https://tk5p4pn9.cde-6fr6l74r.go01-dem.ylcu-atmi.cloudera.site/dex/api/v1"

In [215]:
resource = rs.CdeResource(os.environ["JOBS_API_URL"], os.environ["WORKLOAD_USER"], cde_resource_name)

In [216]:
token = resource.set_cde_token(os.environ["WORKLOAD_PASSWORD"])

In [217]:
resource.create_cde_resource(token, cde_resource_name)

409
{"status":"error","message":"resource with name already exists"}


In [218]:
resource.upload_file(cde_resource_name, "example_spark_jobs/jobs", "pi.scala", token)

Working on Job: pi.scala
Response Status Code 201



In [219]:
resource.create_job_from_resource(token, spark_cde_payloads[0])

Working on Job: SparkPi
Response Status Code 500
{"status":"error","message":"job with name already exists"}




In [220]:
airflow_cde_payload

{'type': 'airflow',
 'airflow': {'dagFile': 'my_spark_dag.py'},
 'identity': {'disableRoleProxy': True},
 'mounts': [{'dirPrefix': '/', 'resourceName': 'hadoop2CDE_migration'}],
 'name': 'oozie2airflow_dag',
 'retentionPolicy': 'keep_indefinitely'}

In [221]:
resource.upload_file(cde_resource_name, "airflow_dags", "my_spark_dag.py", token)

Working on Job: my_spark_dag.py
Response Status Code 201



In [222]:
resource.create_job_from_resource(token, airflow_cde_payload)

Working on Job: oozie2airflow_dag
Response Status Code 201



