In [None]:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.mysql_operator import MySqlOperator
from gcloud import storage

from datetime import datetime, date, timedelta
from dateutil.relativedelta import relativedelta, SU

In [None]:
# Arguments handed to the DAG through its ``default_args`` parameter.
default_args = dict(
    owner='sy',
    depends_on_past=False,
    start_date=datetime(2018, 1, 8),
    # Notification address, used for both failure and retry e-mails.
    email=['snugyun01@gmail.com'],
    email_on_failure=True,
    email_on_retry=True,
    # Retry settings: one retry, with a five-minute delay.
    retries=1,
    retry_delay=timedelta(minutes=5),
)

In [None]:
# The DAG object every task below is attached to.
# It is scheduled with the '@once' preset and has catchup disabled.
dag = DAG(
    dag_id='first_dag',
    description='dag',
    default_args=default_args,
    schedule_interval='@once',
    catchup=False,
)


# Reference date: counting backwards from today, the second Sunday
# (per dateutil, today itself counts when it is a Sunday).
# NOTE(review): this is evaluated whenever the file is parsed, so the task ids
# below change as the date moves on -- confirm that is intended.
two_sundays_back = relativedelta(weekday=SU(-2))
t = date.today() + two_sundays_back
# Compact YYYYMMDD form of the same date, used to build the storage prefix.
suffix = t.strftime("%Y%m%d")

# Both shell steps receive the platform name ('ios') and the reference date as arguments.
# NOTE(review): the scripts are referenced by relative path; presumably they must be
# resolvable from wherever BashOperator runs the command -- confirm.
first_task = BashOperator(
    dag=dag,
    task_id='first_task_ios_{}'.format(t),
    bash_command='bash script1.sh ios {t}'.format(t=t),
)

second_task = BashOperator(
    dag=dag,
    task_id='second_task_ios_{}'.format(t),
    bash_command='bash script2.sh ios {t}'.format(t=t),
)

# List the files in the Google Cloud Storage bucket whose names start with the
# dated prefix; one bulk-insert task is created for each file found.
client = storage.Client(project='project-name')
bucket = client.bucket("bucket-name")
blobs_file_ios = bucket.list_blobs(prefix='prefix_file_name_{}'.format(suffix))

# Destination table for the bulk load.
# TODO: 'table_name' is a placeholder (like 'project-name' / 'bucket-name' above);
# replace it with the real table before deploying.
target_table = 'table_name'

# SQL template with two fields: a positional one for the file name and a named
# one ({table}) for the destination table. Both must be passed to str.format(),
# otherwise formatting raises KeyError('table').
# NOTE(review): the path starts with '~'; verify the MySQL client expands it.
query = "LOAD DATA LOCAL INFILE '~/airflow/download/{}' INTO TABLE {table} CHARACTER SET utf8 " \
        "FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' IGNORE 1 LINES "

bulk_insert_tasks = []
for i, blob in enumerate(blobs_file_ios):
    blob_name = blob.name
    # Third '/'-separated component of the blob name is used as the local file name.
    # NOTE(review): assumes names shaped like '<a>/<b>/<file>'; a shallower name
    # raises IndexError -- confirm against the bucket layout.
    file_name = blob_name.split("/")[2]
    task = MySqlOperator(task_id='insert_{}_{}'.format(t, i),
                         sql=query.format(file_name, table=target_table),
                         mysql_conn_id='mysql_default', dag=dag)
    bulk_insert_tasks.append(task)


# Execution order: first_task, then second_task, then every bulk-insert task.
first_task >> second_task >> bulk_insert_tasks
