# In [None]:  (notebook cell marker left over from export; commented out so the module parses)
from airflow import DAG
from datetime import datetime, timedelta
from airflow.models import Variable
from airflow.sensors.named_hive_partition_sensor import NamedHivePartitionSensor
from airflow.operators import LivyBatchGroupOperator
from airflow.operators.python_operator import PythonOperator
from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.hooks.hive_hooks import HiveMetastoreHook
from airflow.operators.papermill_plugins import PapermillOperator
import logging

# Module-level logger named after this module.
log = logging.getLogger(__name__)

# Shared task settings; handed to the DAG constructor as ``default_args``.
default_args = dict(
    owner='ritesh',
    depends_on_past=False,
    start_date=datetime(2018, 12, 21),
    email=['ritesh.ratti@grab.com'],
    email_on_failure=True,
    email_on_retry=False,
    retries=5,
    retry_delay=timedelta(seconds=60),
)


# DAG for the food wait-times Livy batch job.
# The cron expression '40 * * * *' fires at minute 40 of every hour.
dag = DAG(
    dag_id='food.batch_food_wait_times_calc_livy',
    schedule_interval='40 * * * *',
    catchup=True,
    default_args=default_args,
    # Limits on simultaneously active DAG runs / task instances.
    max_active_runs=2,
    concurrency=6,
)


# Job tuning knobs. Core counts and driver memory are kept as strings;
# the hour offset is an integer.
params = dict(
    spark_cores_wait_times_max="160",
    spark_cores_calc_metrics_max="40",
    spark_cores_export_metrics_max="4",
    spark_driver_memory="4g",
    hour_offset_from_execution_date=3,
)

def generate_livy_details(params, execution_date):
    """Build the Livy batch-submission details for one run.

    Spark sizing and the target-hour offset are taken from ``params`` when
    the matching keys are present. When a key is missing (or ``params`` is
    ``None``/empty) the previously hard-coded values are used, so callers
    that supply no overrides get exactly the same result as before.

    Args:
        params: Optional mapping of overrides. Recognised keys:
            ``spark_cores_wait_times_max`` (default ``"160"``),
            ``spark_driver_memory`` (default ``"4g"``) and
            ``hour_offset_from_execution_date`` (default ``3``).
        execution_date: Datetime the target hour is derived from.
            Presumably the Airflow execution date supplied by the
            operator -- confirm against LivyBatchGroupOperator.

    Returns:
        dict with ``base_url`` (Airflow Variable ``livy_base_url``) and
        ``data`` holding ``file`` (Airflow Variable
        ``food_wait_time_file_location``), ``conf``, ``driverMemory`` and
        ``args``.
    """
    overrides = params or {}
    max_cores = str(overrides.get('spark_cores_wait_times_max', "160"))
    driver_memory = overrides.get('spark_driver_memory', "4g")
    # int() so an offset supplied as a string (e.g. "3") still works.
    hour_offset = int(overrides.get('hour_offset_from_execution_date', 3))

    # The job processes the hour `hour_offset` hours before the execution
    # date, passed to the job formatted as YYYYMMDDHH.
    target_hour = execution_date - timedelta(hours=hour_offset)
    target_date_arg = '--target-date={}'.format(target_hour.strftime('%Y%m%d%H'))

    livy_details = {
        'base_url': Variable.get('livy_base_url'),
        'data': {
            'file': Variable.get('food_wait_time_file_location'),
            'conf': {
                'spark.cores.max': max_cores
            },
            'driverMemory': driver_memory,
            'args': [target_date_arg]
        }
    }

    return livy_details



# Task that runs the batch job through LivyBatchGroupOperator.
# generate_livy_details is registered as the callback that supplies the
# submission parameters (how the operator invokes it is defined in the plugin).
run_batch_job = LivyBatchGroupOperator(
    dag=dag,
    task_id='run_batch_job',
    get_parameters=generate_livy_details,
    provide_context=True,
    poke_interval=5,
    depends_on_past=False,
    # Task-level retry settings; retries here (10) differs from the
    # value in default_args (5).
    retries=10,
    retry_delay=timedelta(seconds=60),
)


# JOB SEQUENCE
# Only one task is defined in the visible part of this file, so there are no
# upstream/downstream dependencies to declare here. The bare name below is an
# expression statement with no effect.

run_batch_job
