In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook
import datetime as dt
import requests
from airflow.exceptions import AirflowException
import tempfile
import pandas as pd
import os

# download wikipedia pageviews data
def get_data(s3_conn_id, bucket_name, object_key_prefix, **context):
    """Download one hour of Wikipedia pageviews and stage the archive in S3.

    The hour to fetch is taken from ``context['execution_date']``. The gzip
    archive is uploaded unchanged, and the resulting object key is pushed to
    XCom under the key ``"object_key"`` so a downstream task can pull it.

    Args:
        s3_conn_id: Airflow connection id used to build the ``S3Hook``.
        bucket_name: Bucket the archive is uploaded to.
        object_key_prefix: Leading path segment of the uploaded object key.
        **context: Airflow task context; ``execution_date`` and
            ``task_instance`` are read from it.

    Raises:
        AirflowException: If the server does not answer with HTTP 200, or if
            writing/uploading the archive fails.

    Connection errors and timeouts raised by ``requests`` are not caught here
    and propagate unchanged.
    """
    year, month, day, hour, *_ = context['execution_date'].timetuple()
    url = (
        "http://10.34.124.114:7080/other/pageviews/"
        f"{year}/{year}-{month:0>2}/"
        f"pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    print(url)  # log the URL so it can be checked against expectations
    # download wiki pageviews; the (connect, read) timeouts keep an
    # unresponsive server from hanging the task forever
    resp = requests.get(url, timeout=(10, 60))
    if resp.status_code != 200:
        raise AirflowException(f"wikipedia pageviews data download fail, the resp status[{resp.status_code}]")
    # obtain the S3Hook instance through the Airflow connection
    s3_hook = S3Hook(aws_conn_id=s3_conn_id)
    # use the same file hierarchy pattern as the source URL
    object_key = (
        f"{object_key_prefix}/task:get_data/"
        f"{year}/{year}-{month:0>2}/"
        f"pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    # write the pageviews archive to s3 via a temp file, because load_file
    # uploads from a filename
    with tempfile.NamedTemporaryFile('wb+') as fp:
        temp_filename = fp.name  # temp file name
        try:
            fp.write(resp.content)
            # flush so the bytes are in the file before load_file reads it by name
            fp.flush()
            s3_hook.load_file(filename=temp_filename,
                              bucket_name=bucket_name,
                              key=object_key,
                              replace=True)
        except Exception as e:
            # keep the original exception as the cause for easier debugging
            raise AirflowException(f"put wikipedia pageviews data to s3 fail:{e}") from e
    print(f'put wikipedia pageviews data to s3: [{bucket_name}] -> {object_key}, success!')
    # push key value via xcoms (outside the try so an XCom failure is not
    # misreported as an S3 upload failure)
    context["task_instance"].xcom_push(key="object_key", value=object_key)

# task to unzip and extract data
def extract_data(s3_conn_id, bucket_name, object_key_prefix, **context):
    """Filter the staged pageviews archive and upload the result as CSV.

    The object key of the raw archive is pulled from XCom (task ``get_data``,
    key ``"object_key"``). The archive is downloaded, parsed as a
    space-delimited gzip file, reduced to the rows whose ``domain`` is ``en``
    and whose ``title`` is one of a fixed list of company names, and the
    result is uploaded as CSV. The CSV's object key is pushed to XCom under
    ``"object_key"``.

    Args:
        s3_conn_id: Airflow connection id used to build the ``S3Hook``.
        bucket_name: Bucket holding the source archive and receiving the CSV.
        object_key_prefix: Leading path segment of the uploaded object key.
        **context: Airflow task context; ``execution_date`` and
            ``task_instance`` are read from it.

    Raises:
        AirflowException: If downloading/parsing the archive fails, or if
            writing/uploading the CSV fails. Each stage reports its own
            message instead of being re-wrapped by the other.
    """
    # datetime elements for data partition
    year, month, day, hour, *_ = context['execution_date'].timetuple()
    # create s3 connection
    s3_hook = S3Hook(aws_conn_id=s3_conn_id)
    # retrieve object key from xcoms
    source_object_key = context["task_instance"].xcom_pull(task_ids="get_data", key="object_key")

    # --- stage 1: download s3 object to a local temp file and filter it ---
    source_temp_filename = None  # temp file name, set once the download succeeds
    try:
        source_temp_filename = s3_hook.download_file(bucket_name=bucket_name, key=source_object_key)
        print(f'download wikipedia pageviews data to local: {source_temp_filename}, success!')
        # read wiki pageviews to dataframe
        print(f'read and extract:{source_temp_filename}!')
        df = pd.read_csv(source_temp_filename,
                         names=['domain', 'title', 'view_count', 'response_size'],
                         compression='gzip',
                         delimiter=' ')
        df2 = df[
            (df['domain'] == 'en') &
            (df['title'].isin(["Google", "Amazon", "Apple", "Microsoft", "Facebook"]))
            ]
        # DataFrame.info() prints its report itself and returns None
        df2.info()
        print(df2.head())
    except Exception as e:
        raise AirflowException(f"download wikipedia pageviews data fail:{e}") from e
    finally:
        # remove temp file even when parsing fails, so it does not leak
        if source_temp_filename is not None:
            os.remove(source_temp_filename)

    # --- stage 2: write the filtered dataframe as csv and upload it to s3 ---
    # use the same file hierarchy pattern
    target_object_key = (
        f"{object_key_prefix}/task:extract_data/"
        f"{year}/{year}-{month:0>2}/"
        f"pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.csv"
    )
    with tempfile.NamedTemporaryFile('w+') as fp2:
        target_temp_filename = fp2.name  # temp file name
        try:
            df2.to_csv(target_temp_filename, index=False)
            s3_hook.load_file(filename=target_temp_filename,
                              bucket_name=bucket_name,
                              key=target_object_key,
                              replace=True)
        except Exception as e:
            raise AirflowException(f"upload wikipedia pageviews data to s3 fail:{e}") from e
    print(f'upload wikipedia pageviews extract data to s3: {target_temp_filename}, success!')
    # push key value via xcoms
    context["task_instance"].xcom_push(key="object_key", value=target_object_key)

# task to load the data to database
def load_data(s3_conn_id, bucket_name, object_key_prefix, pg_conn_id, table_name, **context):
    """Load the extracted pageview counts from S3 into a Postgres table.

    The object key of the extracted CSV is pulled from XCom (task
    ``extract_data``, key ``"object_key"``). The CSV is downloaded, reduced to
    the ``title`` and ``view_count`` columns (renamed to ``pagename`` and
    ``pageviewcount``), and tagged with ``datetime`` (the execution date) and
    ``batch_id`` (``YYYYMMDD-HH``). Rows already stored for the same
    ``batch_id`` are deleted before the new rows are appended, so the task
    can be re-run for the same hour.

    Args:
        s3_conn_id: Airflow connection id used to build the ``S3Hook``.
        bucket_name: Bucket holding the extracted CSV.
        object_key_prefix: Accepted for signature parity with the other
            tasks; not used by this function.
        pg_conn_id: Airflow connection id used to build the ``PostgresHook``.
        table_name: Destination table name.
        **context: Airflow task context; ``execution_date`` and
            ``task_instance`` are read from it.

    Raises:
        AirflowException: If downloading/reading the CSV fails, or if the
            delete/insert against the database fails. Each stage reports its
            own message instead of being re-wrapped by the other.
    """
    # datetime elements for data partition
    year, month, day, hour, *_ = context['execution_date'].timetuple()
    # create s3 connection
    s3_hook = S3Hook(aws_conn_id=s3_conn_id)
    # retrieve object key from xcoms
    source_object_key = context["task_instance"].xcom_pull(task_ids="extract_data", key="object_key")

    # --- stage 1: download s3 object to a local temp file and shape it ---
    source_temp_filename = None  # temp file name, set once the download succeeds
    try:
        source_temp_filename = s3_hook.download_file(bucket_name=bucket_name, key=source_object_key)
        print(f'download wikipedia pageviews count data to local: {source_temp_filename}, success!')
        # read wiki pageviews to dataframe
        print(f'read and extract:{source_temp_filename}!')
        # there are 4 columns: domain, title, view_count, response_size
        df = pd.read_csv(source_temp_filename)
        # keep columns: title & view_count
        df_selected = df[["title", "view_count"]]
        # rename columns: title -> pagename, view_count -> pageviewcount
        df_renamed = df_selected.rename(columns={'title': 'pagename', 'view_count': 'pageviewcount'})
        # add extra two columns: datetime, batch_id
        df_renamed['datetime'] = context['execution_date']
        batch_id = f'{year}{month:0>2}{day:0>2}-{hour:0>2}'
        print(f'batch_id: {batch_id}')
        df_renamed['batch_id'] = batch_id
        # DataFrame.info() prints its report itself and returns None
        df_renamed.info()
        print(df_renamed.head())
    except Exception as e:
        raise AirflowException(f"download wikipedia pageviews count data fail:{e}") from e
    finally:
        # remove the downloaded temp file so repeated runs do not leak files
        if source_temp_filename is not None:
            os.remove(source_temp_filename)

    # --- stage 2: replace this batch's rows in the database ---
    try:
        # create postgres connection
        pg_hook = PostgresHook(postgres_conn_id=pg_conn_id)
        # delete records for the same batch_id (so we can run this task again and again)
        # NOTE(review): table_name and batch_id are interpolated into the SQL text;
        # here they come from the task arguments and the execution date.
        pg_hook.run(f"DELETE FROM {table_name} WHERE batch_id='{batch_id}';")
        # get sqlalchemy engine instance
        sqlalchemy_engine = pg_hook.get_sqlalchemy_engine()
        # dump dataframe to sql table via pandas.to_sql() method
        df_renamed.to_sql(table_name, sqlalchemy_engine,
                          if_exists='append', index=False, chunksize=5000)
    except Exception as e:
        raise AirflowException(f"dump dataframe to table:{table_name} fail:{e}") from e



# Arguments handed to the DAG as its default_args.
default_args = dict(
    owner='EMPLOYEE_ID',  # owner is the DAG's developer, e.g. employee 8703147
)

# Hourly DAG limited to the window 2019-07-01 00:00 .. 03:00, with catchup
# enabled and at most one active run at a time.
dag = DAG(
    dag_id="deXX_05_pageviews_etl",  # prefix must be the tenant id, e.g. de00
    description="dag to download wikipedia pageviews",
    default_args=default_args,
    schedule_interval="@hourly",
    start_date=dt.datetime(year=2019, month=7, day=1, hour=0),
    end_date=dt.datetime(year=2019, month=7, day=1, hour=3),
    catchup=True,
    max_active_runs=1,
    tags=['de08'],
    access_control={
        # which team [tenant id] the DAG belongs to, and its permissions
        'deXX': {'can_read', 'can_edit'},
    },
)

# Step 1: task that runs get_data to download the wikipedia pageviews data.
task_get_data = PythonOperator(
    dag=dag,
    task_id='get_data',
    python_callable=get_data,
    op_kwargs=dict(
        s3_conn_id='deXX_minio',
        bucket_name='EMPLOYEE_ID',  # any letters must be converted to lower case
        object_key_prefix='de08',
    ),
)

# Step 2: task that runs extract_data to unzip and extract the data.
task_extract_data = PythonOperator(
    dag=dag,
    task_id='extract_data',
    python_callable=extract_data,
    op_kwargs=dict(
        s3_conn_id='deXX_minio',
        bucket_name='EMPLOYEE_ID',  # any letters must be converted to lower case
        object_key_prefix='de08',
    ),
)

# Step 3: task that runs load_data to load the pageview data into postgres.
task_load_data = PythonOperator(
    dag=dag,
    task_id='load_data',
    python_callable=load_data,
    op_kwargs=dict(
        s3_conn_id='deXX_minio',
        bucket_name='EMPLOYEE_ID',
        object_key_prefix='de08',
        pg_conn_id='deXX_postgres',
        table_name='pageview_counts_staging',
    ),
)


# Dependencies between all tasks: get_data -> extract_data -> load_data
task_get_data.set_downstream(task_extract_data)
task_extract_data.set_downstream(task_load_data)