From ee016dd57a015382342ef54e94f5e481cde1071b Mon Sep 17 00:00:00 2001 From: Sung Yun Byeon Date: Mon, 5 Feb 2024 15:36:05 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20Airflow=20=EC=A3=BC=EC=84=9D=20?= =?UTF-8?q?=ED=8F=AC=ED=95=A8=ED=95=9C=20=ED=8C=8C=EC=9D=BC=20(#183)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dags/01-bash-operator.py | 36 ++++--- .../dags/02-python-operator.py | 28 +++--- .../dags/03-python-operator-with-context.py | 70 +++++++------- .../dags/04-python-operator-with-jinja.py | 46 ++++----- .../05-python-operator-with-slack-noti.py | 37 +++----- .../dags/06-simple_elt.py | 94 +++++++++---------- 01-batch-serving(airflow)/dags/hello_world.py | 36 ++----- .../data/bike_schema.json | 52 ++++++++++ 8 files changed, 208 insertions(+), 191 deletions(-) create mode 100644 01-batch-serving(airflow)/data/bike_schema.json diff --git a/01-batch-serving(airflow)/dags/01-bash-operator.py b/01-batch-serving(airflow)/dags/01-bash-operator.py index 41138054..5687b525 100644 --- a/01-batch-serving(airflow)/dags/01-bash-operator.py +++ b/01-batch-serving(airflow)/dags/01-bash-operator.py @@ -3,32 +3,27 @@ from datetime import datetime, timedelta default_args = { - 'owner': 'kyle', - 'depends_on_past': False, # 이전 DAG의 Task가 성공, 실패 여부에 따라 현재 DAG 실행 여부가 결정. False는 과거의 실행 결과 상관없이 매일 실행한다 - 'start_date': datetime(2024, 1, 1), - 'retires': 1, # 실패시 재시도 횟수 - 'retry_delay': timedelta(minutes=5) # 만약 실패하면 5분 뒤 재실행 - # 'priority_weight': 10 # DAG의 우선 순위를 설정할 수 있음 - # 'execution_timeout': timedelta(seconds=300), # 실행 타임아웃 : 300초 넘게 실행되면 종료 + "owner": "kyle", + "depends_on_past": False, # 이전 DAG의 Task 성공 여부에 따라서 현재 Task를 실행할지 말지가 결정. False는 과거 Task의 성공 여부와 상관없이 실행 + "start_date": datetime(2024, 1, 1) } -# with 구문으로 DAG 정의 with DAG( - dag_id='bash_dag', - default_args=default_args, - schedule_interval='@once', # 1번 실행 - tags=['my_dags'] + dag_id="bash_dag", + default_args=default_args, + schedule_interval="@once", + tags=["my_dags"] ) as dag: - # BashOperator 사용 + task1 = BashOperator( - task_id='print_date', # task의 id - bash_command='date' # 실행할 bash command + task_id="print_date", # task의 id + bash_command="date" # 실행할 bash command를 저장 ) task2 = BashOperator( - task_id='sleep', - bash_command='sleep 5', - retries=2 # 만약 bash command가 실패하면 2번 재시도한다 + task_id="sleep", + bash_command="sleep 5", + retries=2 # 만약 bash command가 실패하면 2번 재시도 ) task3 = BashOperator( @@ -36,5 +31,6 @@ bash_command='pwd' ) - task1 >> task2 # task1 이후에 task2 실행 - task1 >> task3 # task1 이후에 task3 실행(2와 3을 병렬로 실행) \ No newline at end of file + task1 >> task2 # task1이 완료되면, task2를 실행 + task1 >> task3 # task1이 완료되면, task3을 실행 + # task1 >> [task2, task3] \ No newline at end of file diff --git a/01-batch-serving(airflow)/dags/02-python-operator.py b/01-batch-serving(airflow)/dags/02-python-operator.py index 2e5d9419..c8447fc2 100644 --- a/01-batch-serving(airflow)/dags/02-python-operator.py +++ b/01-batch-serving(airflow)/dags/02-python-operator.py @@ -3,16 +3,14 @@ from datetime import datetime, timedelta default_args = { - 'owner': 'kyle', - 'depends_on_past': False, # 이전 DAG의 Task가 성공, 실패 여부에 따라 현재 DAG 실행 여부가 결정. False는 과거의 실행 결과 상관없이 매일 실행한다 - 'start_date': datetime(2024, 1, 1), - 'end_date': datetime(2024, 1, 4), - 'retires': 1, # 실패시 재시도 횟수 + "owner": "kyle", + "depends_on_past": False, # 이전 DAG의 Task 성공 여부에 따라서 현재 Task를 실행할지 말지가 결정. False는 과거 Task의 성공 여부와 상관없이 실행 + "start_date": datetime(2024, 1, 1), + "end_date": datetime(2024, 1, 4), + 'retries': 1, # 실패시 재시도 횟수 'retry_delay': timedelta(minutes=5) # 만약 실패하면 5분 뒤 재실행 } - -# 사용할 함수 정의 def print_current_date(): date_kor = ["월", "화", "수", "목", "금", "토", "일"] date_now = datetime.now().date() @@ -20,16 +18,18 @@ def print_current_date(): print(f"{date_now}는 {date_kor[datetime_weeknum]}요일입니다") -# with 구문으로 DAG 정의 with DAG( - dag_id='python_dag1', - default_args=default_args, - schedule_interval='30 0 * * *', # UTC 시간 기준 0시 30분에 Daily로 실행하겠다! 한국 시간 기준 오전 9시 30분 - tags=['my_dags'] + dag_id="python_dag1", + default_args=default_args, + schedule_interval="30 0 * * *", # UTC 시간 기준으로 매일 0시 30분에 실행하겠다. 한국 시간으로 9시 30분에 실행! + tags=['my_dags'], + catchup=True ) as dag: + python_task = PythonOperator( - task_id='print_current_date', - python_callable=print_current_date # 실행할 python 함수 + task_id="print_current_date", + python_callable=print_current_date ) python_task + \ No newline at end of file diff --git a/01-batch-serving(airflow)/dags/03-python-operator-with-context.py b/01-batch-serving(airflow)/dags/03-python-operator-with-context.py index ecb8af06..cade890e 100644 --- a/01-batch-serving(airflow)/dags/03-python-operator-with-context.py +++ b/01-batch-serving(airflow)/dags/03-python-operator-with-context.py @@ -2,39 +2,34 @@ from airflow.operators.python import PythonOperator from datetime import datetime, timedelta -# 앞선 02-python-operator.py는 "date_now = datetime.now().date()"를 사용하기 때문에 -# 언제 실행해도 실행하는 시간 기준으로 실행됨 -# Airflow는 Batch 성으로 특정 시간대로 실행하는 도구인데, 위와 같이 now 등을 사용하면 실행하는 시점 기준으로 실행이 됩니다(원래 기대했던 실행 시점이 아닌, 동작 시점) -# Airflow는 항상 현재 최신 작업만 실행하는 것은 아니고, 과거 날짜를 실행해야 하는 경우도 있음(Backfill이란 용어 사용) -# 따라서 코드 상에서 now(), SQL 상에서 current_date() 등을 사용하지 않고, Airflow에서 실행하기로 했던 시간을 넣어줘야 합니다 -# execution_date라고 부르다가 logical_date로 수정함 +# 앞선 02-python-operator.py는 "date_now = datetime.now().date()"를 사용했기 때문에 +# 언제 실행해도 우리가 실행하는 시간 기준으로 실행됨 +# Airflow Batch성으로 특정 시간대로 실행하는 도구. now 등을 잘 쓰지 않음. 의도한 시간, 날짜 주입해서 사용 +# Airflow로 과거 날짜로 실행해야 하는 경우도 존재. 과거 데이터 마이그레이션 +# 코드 상에서 now(), SQL 상에서도 current_date() 사용하지 않고, Airflow에서 실행하기로 했던 시간을 넣어줘야 함 +# execution_date, logical_date +# 멱등성 : 연산을 여러 번 적용하더라도 결과가 달라지지 않는 성질 default_args = { - 'owner': 'kyle', - 'depends_on_past': False, # 이전 DAG의 Task가 성공, 실패 여부에 따라 현재 DAG 실행 여부가 결정. False는 과거의 실행 결과 상관없이 매일 실행한다 - 'start_date': datetime(2024, 1, 1), - 'end_date': datetime(2024, 1, 4), - 'retires': 1, # 실패시 재시도 횟수 - 'retry_delay': timedelta(minutes=5) # 만약 실패하면 5분 뒤 재실행 + "owner": "kyle", + "depends_on_past": False, # 이전 DAG의 Task 성공 여부에 따라서 현재 Task를 실행할지 말지가 결정. False는 과거 Task의 성공 여부와 상관없이 실행 + "start_date": datetime(2024, 1, 1), + "end_date": datetime(2024, 1, 4) } - -def print_current_date_with_context_variable(*args, **kwargs): +def print_current_date_with_context(*args, **kwargs): """ - {'conf': , - 'dag': , - 'dag_run': , - 'data_interval_end': DateTime(2024, 1, 1, 0, 30, 0, tzinfo=Timezone('UTC')), - 'data_interval_start': DateTime(2024, 1, 2, 0, 30, 0, tzinfo=Timezone('UTC')), - 'ds': '2024-01-01', - 'ds_nodash': '20240101', - 'next_execution_date': .deprecated_proxy..deprecated_func at 0x108654b70>> - } + kwargs: {'conf': , + 'dag': , + 'dag_run': , + 'data_interval_end': DateTime(2024, 1, 2, 0, 30, 0, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2024, 1, 1, 0, 30, 0, tzinfo=Timezone('UTC')), + 'ds': '2024-01-01', 'ds_nodash': '20240101', 'execution_date': ._deprecated_proxy_factory at 0x110a40670>, 'execution_date', DateTime(2024, 1, 1, 0, 30, 0, tzinfo=Timezone('UTC')))>, 'expanded_ti_count': None, 'inlets': [], 'logical_date': DateTime(2024, 1, 1, 0, 30, 0, tzinfo=Timezone('UTC')), 'macros': , 'next_ds': ._deprecated_proxy_factory at 0x110a40670>, 'next_ds', '2024-01-02')>, 'next_ds_nodash': ._deprecated_proxy_factory at 0x110a40670>, 'next_ds_nodash', '20240102')>, 'next_execution_date': ._deprecated_proxy_factory at 0x110a40670>, 'next_execution_date', DateTime(2024, 1, 2, 0, 30, 0, tzinfo=Timezone('UTC')))>, 'outlets': [], 'params': {}, 'prev_data_interval_start_success': None, 'prev_data_interval_end_success': None, 'prev_ds': ._deprecated_proxy_factory at 0x110a40670>, 'prev_ds', '2023-12-31')>, 'prev_ds_nodash': ._deprecated_proxy_factory at 0x110a40670>, 'prev_ds_nodash', '20231231')>, 'prev_execution_date': ._deprecated_proxy_factory at 0x110a40670>, 'prev_execution_date', DateTime(2023, 12, 31, 0, 30, 0, tzinfo=Timezone('UTC')))>, 'prev_execution_date_success': ._deprecated_proxy_factory at 0x110a40670>, 'prev_execution_date_success', None)>, 'prev_start_date_success': None, 'run_id': 'scheduled__2024-01-01T00:30:00+00:00', 'task': , 'task_instance': , 'task_instance_key_str': 'python_dag_with_context__print_current_date_with_context__20240101', 'test_mode': False, 'ti': , 'tomorrow_ds': ._deprecated_proxy_factory at 0x110a40670>, 'tomorrow_ds', '2024-01-02')>, 'tomorrow_ds_nodash': ._deprecated_proxy_factory at 0x110a40670>, 'tomorrow_ds_nodash', '20240102')>, 'triggering_dataset_events': .get_triggering_events at 0x110a8fca0>>, 'ts': '2024-01-01T00:30:00+00:00', 'ts_nodash': '20240101T003000', 'ts_nodash_with_tz': '20240101T003000+0000', 'var': {'json': None, 'value': None}, 'conn': None, 'yesterday_ds': ._deprecated_proxy_factory at 0x110a40670>, 'yesterday_ds', '2023-12-31')>, 'yesterday_ds_nodash': ._deprecated_proxy_factory at 0x110a40670>, 'yesterday_ds_nodash', '20231231')>, 'templates_dict': None} """ - print(f"kwargs : {kwargs}") - execution_date = kwargs['ds'] - execution_date_nodash = kwargs['ds_nodash'] + print(f"kwargs: {kwargs}") + execution_date = kwargs["ds"] + execution_date_nodash = kwargs["ds_nodash"] print(f"execution_date_nodash : {execution_date_nodash}") execution_date = datetime.strptime(execution_date, "%Y-%m-%d").date() date_kor = ["월", "화", "수", "목", "금", "토", "일"] @@ -42,17 +37,16 @@ def print_current_date_with_context_variable(*args, **kwargs): print(f"{execution_date}는 {date_kor[datetime_weeknum]}요일입니다") -# with 구문으로 DAG 정의 + with DAG( - dag_id='python_dag_with_context', - default_args=default_args, - schedule_interval='30 0 * * *', - tags=['my_dags'] + dag_id="python_dag_with_context", + default_args=default_args, + schedule_interval="30 0 * * *", + tags=['my_dags'], + catchup=True ) as dag: + PythonOperator( - task_id='print_current_date_with_context_variable', - python_callable=print_current_date_with_context_variable, - provide_context=True # True일 경우에 Airflow Task Instance의 Attribute를 Keyword Argument로 Python 함수에서 사용할 수 있음 - ) - - # task가 1개일 경우엔 순서를 명시하지 않아도 실행 + task_id="print_current_date_with_context", + python_callable=print_current_date_with_context + ) \ No newline at end of file diff --git a/01-batch-serving(airflow)/dags/04-python-operator-with-jinja.py b/01-batch-serving(airflow)/dags/04-python-operator-with-jinja.py index b56d3113..704c3d2d 100644 --- a/01-batch-serving(airflow)/dags/04-python-operator-with-jinja.py +++ b/01-batch-serving(airflow)/dags/04-python-operator-with-jinja.py @@ -2,45 +2,45 @@ from airflow.operators.python import PythonOperator from datetime import datetime, timedelta -# 앞의 03-python-operator-with-context는 provide_context=True 옵션을 주고 Attribute에 접근 -# 이 방식이 아닌 Airflow의 Template 방식을 사용. Jinja Template => Flask 자주 활용되는 템플릿 -# Python에서는 Template이랑 provide_context=True와 큰 차이를 못 느낄 수도 있지만, SQL Opearator나 다른 오퍼레이터에선 유용하게 사용됨(템플릿) -# 쿼리문(WHERE절)에 Airflow의 execution_date를 인자로 넣고 실행 -# Jinja Template : Airflow의 미리 정의된 템플릿. {{ ds }}, {{ ds_nodash }} 라고 정의 -# Airflow Operator에 넘겨주면 실행 과정에서 템플릿 기반으로 값이 업데이트됨 - +# 앞의 03-python-operator-with-context는 kwargs로 여러 정보를 같이 주입. ds, ds_nodash +# Jinja Template 사용하면 ds를 kwargs['ds']. {{ ds }} +# Flask, FastAPI에서도 자주 사용 +# Python에서는 kwargs로 접근하시면 빠르게 가능. SQL. 쿼리문에서 WHERE 조건에 exeuction_date="2024-01-01" +# {{ ds }}, {{ ds_nodash }} +# Airflow의 Operator에 template_fields 여기에 있는 값들은 Airflow가 실행 과정에서 {{ ds }} => "2024-01-01" 변환 default_args = { - 'owner': 'kyle', - 'depends_on_past': False, # 이전 DAG의 Task가 성공, 실패 여부에 따라 현재 DAG 실행 여부가 결정. False는 과거의 실행 결과 상관없이 매일 실행한다 - 'start_date': datetime(2024, 1, 1), - 'end_date': datetime(2024, 1, 4), - 'retires': 1, # 실패시 재시도 횟수 - 'retry_delay': timedelta(minutes=5) # 만약 실패하면 5분 뒤 재실행 + "owner": "kyle", + "depends_on_past": False, # 이전 DAG의 Task 성공 여부에 따라서 현재 Task를 실행할지 말지가 결정. False는 과거 Task의 성공 여부와 상관없이 실행 + "start_date": datetime(2024, 1, 1), + "end_date": datetime(2024, 1, 4) } -# 사용할 함수 정의 def print_current_date_with_jinja(date): execution_date = datetime.strptime(date, "%Y-%m-%d").date() date_kor = ["월", "화", "수", "목", "금", "토", "일"] datetime_weeknum = execution_date.weekday() print(f"{execution_date}는 {date_kor[datetime_weeknum]}요일입니다") + return execution_date -# with 구문으로 DAG 정의 with DAG( - dag_id='python_dag_with_jinja', - default_args=default_args, - schedule_interval='30 0 * * *', - tags=['my_dags'] + dag_id="python_dag_with_jinja", + default_args=default_args, + schedule_interval="30 0 * * *", + tags=['my_dags'], + catchup=True ) as dag: - execution_date = "{{ ds }}" # Template 정의 + execution_date = "{{ ds }}" python_task_jinja = PythonOperator( - task_id='print_current_date_with_jinja', + task_id="print_current_date_with_jinja", python_callable=print_current_date_with_jinja, - op_args=[execution_date] + # op_args=[execution_date] + op_kwargs = { + "date": execution_date + } ) - python_task_jinja + python_task_jinja \ No newline at end of file diff --git a/01-batch-serving(airflow)/dags/05-python-operator-with-slack-noti.py b/01-batch-serving(airflow)/dags/05-python-operator-with-slack-noti.py index 8d15799b..571594ba 100644 --- a/01-batch-serving(airflow)/dags/05-python-operator-with-slack-noti.py +++ b/01-batch-serving(airflow)/dags/05-python-operator-with-slack-noti.py @@ -1,41 +1,34 @@ -# slack_notifier에 선언한 webhook 전송 함수를 활용하여 slack 알림을 제공하는 예제 +# slack_notifier에 선언한 webhook 전송 함수를 활용해 slack 알림을 제공하는 예제 from airflow import DAG from airflow.operators.python import PythonOperator -from datetime import datetime, timedelta +from datetime import datetime from airflow.exceptions import AirflowFailException from utils.slack_notifier import task_fail_slack_alert, task_succ_slack_alert default_args = { - 'owner': 'kyle', - 'depends_on_past': False, - 'start_date': datetime(2024, 1, 1), - 'end_date': datetime(2024, 1, 4), - 'retires': 1, - 'retry_delay': timedelta(minutes=5), + "owner": "kyle", + "depends_on_past": False, # 이전 DAG의 Task 성공 여부에 따라서 현재 Task를 실행할지 말지가 결정. False는 과거 Task의 성공 여부와 상관없이 실행 + "start_date": datetime(2024, 1, 1), + "end_date": datetime(2024, 1, 4) } - def _handle_job_error() -> None: raise AirflowFailException("Raise Exception.") - with DAG( - dag_id='python_dag_with_slack_webhook', + dag_id="python_dag_with_slack_webhook", default_args=default_args, - schedule_interval='30 0 * * *', - tags=['my_dags'], - catchup=False, - on_failure_callback=task_fail_slack_alert, - # on_success_callback=task_succ_slack_alert # 성공 알림 필요 시 추가 + schedule_interval="30 0 * * * ", + tags=["my_dags"], + catchup=True, + on_failure_callback=task_fail_slack_alert ) as dag: - execution_date = "{{ ds }}" - + send_slack_noti = PythonOperator( - task_id='raise_exception_and_send_slack_noti', - python_callable=_handle_job_error, - op_args=[execution_date] + task_id="raise_exception_and_send_slack_noti", + python_callable=_handle_job_error ) - send_slack_noti \ No newline at end of file + send_slack_noti diff --git a/01-batch-serving(airflow)/dags/06-simple_elt.py b/01-batch-serving(airflow)/dags/06-simple_elt.py index c9ba6f6c..fe72c8d2 100644 --- a/01-batch-serving(airflow)/dags/06-simple_elt.py +++ b/01-batch-serving(airflow)/dags/06-simple_elt.py @@ -1,36 +1,35 @@ -# ELT 파이프라인을 개발합니다(Extract - Load - Transfer) +# ELT 파이프라인을 개발합니다(Extract - Load - Trasnfer). 데이터 엔지니어링을 진행할 때, ELT / ETL # 1) Extract : 데이터를 추출하는 과정. 여기선 Cloud Storage에 업로드(현업에선 Database에서 추출) -# 2) Load : Cloud Storage의 Bucket에 Data 저장된 것을 데이터 웨어하우스인 BigQuery로 저장 +# 2) Load : Cloud Stroage의 Bucket에 Data가 저장. 저장된 Data를 데이터 웨어하우스인 BigQuery로 저장 # 3) Transform : Load로 저장된 데이터를 Transform. BigQuery 쿼리로 데이터를 처리 -# pip install apache-airflow-providers-google==10.14.0 +# pip install apache-airflow-providers-google from airflow import DAG from datetime import datetime, timedelta from pathlib import Path -from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator -from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator -from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator # GCS 데이터를 BigQuery로 옮김 +from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator # BigQuery에서 Query를 실행 +from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator # Local에서 GCS로 데이터를 옮김 from utils.slack_notifier import task_fail_slack_alert, task_succ_slack_alert -execution_date = '{{ ds_nodash }}' #20240101 -execution_date_with_slash = "{{ macros.ds_format(ds, '%Y-%m-%d', '%Y/%m/%d') }}" # 2024/01/01 +execution_date = "{{ ds_nodash }}" # 20240101 -# 아래 2개는 여러분들의 Google Cloud Project, Bucket 입력 PROJECT_ID = "boostcamp-ai-tech-serving" BUCKET_NAME = "boostcamp-ai-tech-gcs" -FILE_NAME = f"bike_data_{execution_date}.csv" -LOCAL_FILE_PATH = str(Path(__file__).parent.parent / "data" / FILE_NAME) # 파일의 할아버지(dags -> 01-batch-serving(airflow))의 data 폴더 +FILE_NAME = f"bike_data_{execution_date}.csv" # bike_data_20240101.csv +LOCAL_FILE_PATH = str(Path(__file__).parent.parent / "data" / FILE_NAME) -GCS_PATH = f"{execution_date_with_slash}/bike_data.csv" +GCS_PATH = f"{execution_date}/bike_data.csv" +# 현업에서 데이터를 저장할 때, 날짜로 구분을 많이 함 +# 2024/01/01/bike_data.csv +# 2024/01/02/bike_data.csv default_args = { - 'owner': 'kyle', - 'depends_on_past': False, - 'start_date': datetime(2024, 1, 1), - 'end_date': datetime(2024, 1, 4), - 'retires': 1, - 'retry_delay': timedelta(minutes=5), + "owner": "kyle", + "depends_on_past": False, + "start_date": datetime(2024, 1, 1), + "end_date": datetime(2024, 1, 4) } schema_fields = [ @@ -86,58 +85,57 @@ } ] - with DAG( - dag_id='simple_elt_pipeline', + dag_id="simpel_elt_pipeline", default_args=default_args, - schedule_interval='30 0 * * *', - tags=['my_dags'], - catchup=False, - on_failure_callback=task_fail_slack_alert, - # on_success_callback=task_succ_slack_alert # 성공 알림 필요 시 추가 + schedule_interval="30 0 * * *", + tags=["my_dags"], + catchup=True ) as dag: - + + # 1) Extract : Local To GCS extract_data = LocalFilesystemToGCSOperator( task_id="extract_data", src=LOCAL_FILE_PATH, - bucket=BUCKET_NAME, - dst=GCS_PATH + dst=GCS_PATH, + bucket=BUCKET_NAME ) - - + + # 2) Load : GCS To BigQuery load_csv = GCSToBigQueryOperator( task_id="gcs_to_bigquery", - gcp_conn_id="google_cloud_default", - bucket=f'{BUCKET_NAME}', - source_objects=[GCS_PATH], + bucket=BUCKET_NAME, + source_objects=[GCS_PATH], + destination_project_dataset_table=f"{PROJECT_ID}.temp.bike_{execution_date}", # temp.bike_20240101 schema_fields=schema_fields, source_format='CSV', skip_leading_rows=1, - create_disposition='CREATE_IF_NEEDED', - destination_project_dataset_table=f'{PROJECT_ID}.temp.bike_{execution_date}', # temp dataset 생성 필요 - write_disposition='WRITE_TRUNCATE' + create_disposition="CREATE_IF_NEEDED", + write_disposition="WRITE_TRUNCATE", + location="US" ) + # 3) Transform : BigQuery에서 Query 실행해서 다시 BigQuery에 저장 # dummy_date 별 COUNT sql_query = f""" - SELECT - dummy_date, - start_station_id, - end_station_id, - COUNT(bikeid) as cnt + SELECT + dummy_date, + start_station_id, + end_station_id, + COUNT(bikeid) AS cnt FROM `{PROJECT_ID}.temp.bike_{execution_date}` - GROUP BY - dummy_date, - start_station_id, + GROUP BY + dummy_date, + start_station_id, end_station_id """ - transform = BigQueryExecuteQueryOperator( - task_id='run_query', + task_id="run_query", sql=sql_query, use_legacy_sql=False, - write_disposition='WRITE_TRUNCATE', - destination_dataset_table=f"temp.bike_agg_{execution_date}" + allow_large_results=True, + write_disposition="WRITE_TRUNCATE", + destination_dataset_table=f"{PROJECT_ID}.temp.bike_agg" ) extract_data >> load_csv >> transform \ No newline at end of file diff --git a/01-batch-serving(airflow)/dags/hello_world.py b/01-batch-serving(airflow)/dags/hello_world.py index f29cbad9..1aa48841 100644 --- a/01-batch-serving(airflow)/dags/hello_world.py +++ b/01-batch-serving(airflow)/dags/hello_world.py @@ -1,5 +1,3 @@ -# hello_world.py - from datetime import timedelta from airflow import DAG @@ -8,40 +6,26 @@ from airflow.operators.python import PythonOperator -def print_world() -> None: - print("world") - +def print_world(): + print("World") -# with 구문으로 DAG 정의를 시작합니다. with DAG( - dag_id="hello_world", # DAG의 식별자용 아이디입니다. - description="My First DAG", # DAG에 대해 설명합니다. - start_date=days_ago(2), # DAG 정의 기준 2일 전부터 시작합니다. - schedule_interval="0 6 * * *", # 매일 06:00에 실행합니다. - tags=["my_dags"], # 태그 목록을 정의합니다. 추후에 DAG을 검색하는데 용이합니다. + dag_id="Hello_world", + description="My First DAG", + start_date=days_ago(2), + schedule_interval="0 6 * * *", # cron 표현식. 매일 오전 6시 0분에 실행하겠다 (UTC). 한국은 UTC+9. 한국 시간으로는 6+9=15 + tags=["my_dags"] ) as dag: - - # 테스크를 정의합니다. - # bash 커맨드로 echo hello 를 실행합니다. + t1 = BashOperator( task_id="print_hello", bash_command="echo Hello", - owner="heumsi", # 이 작업의 오너입니다. 보통 작업을 담당하는 사람 이름을 넣습니다. - retries=3, # 이 테스크가 실패한 경우, 3번 재시도 합니다. - retry_delay=timedelta(minutes=5), # 재시도하는 시간 간격은 5분입니다. + owner="heumsi" ) - # 테스크를 정의합니다. - # python 함수인 print_world를 실행합니다. t2 = PythonOperator( task_id="print_world", - python_callable=print_world, - depends_on_past=True, - owner="heumsi", - retries=3, - retry_delay=timedelta(minutes=5), + python_callable=print_world ) - # 테스크 순서를 정합니다. - # t1 실행 후 t2를 실행합니다. t1 >> t2 \ No newline at end of file diff --git a/01-batch-serving(airflow)/data/bike_schema.json b/01-batch-serving(airflow)/data/bike_schema.json new file mode 100644 index 00000000..bf7675f1 --- /dev/null +++ b/01-batch-serving(airflow)/data/bike_schema.json @@ -0,0 +1,52 @@ +[ + { + "mode": "NULLABLE", + "name": "trip_id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "subscriber_type", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "bikeid", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "start_time", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "start_station_id", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "start_station_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "end_station_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "end_station_name", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "duration_minutes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "dummy_date", + "type": "DATE" + } +]