# In [None]:
from datetime import timedelta
from airflow import DAG 
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# Task 1.1: default arguments applied to every task in the DAG.
# One retry after a 5-minute pause; email notifications on failure and retry.
default_args = dict(
    owner='Surabhi',
    start_date=days_ago(0),
    email='dummyemail@example.com',
    email_on_failure=True,
    email_on_retry=True,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Task 1.2: DAG definition. Scheduled once per day and built from the shared
# default_args above.
dag = DAG(
    dag_id='ETL_toll_data',
    description='Apache Airflow Final Assignment',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

# Task 1.3: unpack tolldata.tgz into the staging directory.
# BashOperator runs each command in its own temporary working directory by
# default. The command therefore cd's into the staging directory first, so the
# unpacked files outlive this task and later tasks can read them.
unzip_data = BashOperator(
    task_id='unzip_data',
    bash_command=(
        'cd /home/project/airflow/dags/finalassignment'
        ' && tar -xvf tolldata.tgz'
    ),
    dag=dag,
)

# Task 1.4: keep the first four comma-separated fields of vehicle-data.csv
# and write them to csv_data.csv.
# The command cd's into the staging directory because BashOperator does not
# run inside it by default, so relative file names would not resolve there.
extract_data_from_csv = BashOperator(
    task_id='extract_csv',
    bash_command=(
        'cd /home/project/airflow/dags/finalassignment'
        ' && cut -d"," -f1-4 vehicle-data.csv > csv_data.csv'
    ),
    dag=dag,
)

# Task 1.5: keep fields 5-7 of tollplaza-data.tsv and write them to
# tsv_data.csv.
# `cut -f` splits on TAB and keeps TAB between the fields it outputs.
# Translating tabs to commas makes tsv_data.csv a real CSV that matches the
# other extracts once they are pasted together.
# The command cd's into the staging directory because BashOperator does not
# run inside it by default.
extract_data_from_tsv = BashOperator(
    task_id='extract_tsv',
    bash_command=(
        r'cd /home/project/airflow/dags/finalassignment'
        r' && cut -f5-7 tollplaza-data.tsv | tr "\t" "," > tsv_data.csv'
    ),
    dag=dag,
)

# Task 1.6: from every non-empty line of payment-data.txt, keep the last two
# whitespace-separated fields and write them comma-separated to
# fixed_width_data.csv.
# The awk program must be single-quoted. Inside double quotes, bash would
# expand $(NF-1) as a command substitution and $NF as a shell variable before
# awk ever ran. The output separator is set with -v OFS=, rather than passing
# "," as an extra argument, which awk would treat as an input file name.
# The command cd's into the staging directory because BashOperator does not
# run inside it by default.
extract_data_from_fixed_width = BashOperator(
    task_id='extract_fixed_width',
    bash_command=(
        'cd /home/project/airflow/dags/finalassignment'
        " && awk -v OFS=, 'NF {print $(NF-1), $NF}'"
        ' payment-data.txt > fixed_width_data.csv'
    ),
    dag=dag,
)

# Task 1.7: join the three extracts side by side, line by line, into
# extracted_data.csv.
# paste separates its inputs with TAB by default. -d "," keeps the combined
# file comma-separated, which transform_data relies on.
# The command cd's into the staging directory because BashOperator does not
# run inside it by default.
consolidate_data = BashOperator(
    task_id='consolidate_data',
    bash_command=(
        'cd /home/project/airflow/dags/finalassignment'
        ' && paste -d "," csv_data.csv tsv_data.csv fixed_width_data.csv'
        ' > extracted_data.csv'
    ),
    dag=dag,
)

# Task 1.8: transform and load. Upper-case column 5 of extracted_data.csv and
# write every line to transformed_data.csv.
# The awk program is single-quoted so bash does not expand $5 itself.
# -F, and OFS=, make awk split and re-join on commas. The upper-casing is an
# action followed by the always-true pattern `1`, so every line is printed,
# including lines whose column 5 is empty.
# NOTE(review): column 5 is the first field taken from tollplaza-data.tsv
# (csv_data.csv contributes columns 1-4). If the intent is to upper-case a
# field from vehicle-data.csv, the index must change. Confirm against the data.
# The command cd's into the staging directory because BashOperator does not
# run inside it by default.
transform_data = BashOperator(
    task_id='transform_data',
    bash_command=(
        'cd /home/project/airflow/dags/finalassignment'
        " && awk -F, -v OFS=, '{$5 = toupper($5)} 1'"
        ' extracted_data.csv > transformed_data.csv'
    ),
    dag=dag,
)

# Task 1.9: wire the tasks into one linear chain. The steps run in order:
# unzip, the three extracts, consolidate, then transform.
(
    unzip_data
    >> extract_data_from_csv
    >> extract_data_from_tsv
    >> extract_data_from_fixed_width
    >> consolidate_data
    >> transform_data
)