In [None]:
#pip3 install apache-airflow 
#설치한 후 airflow 폴더 위치를 기억해둔다.

#dag파일을 만들고 난 후 아래 명령어로 실행하여 airflow 브라우저에 접속한다.
'''
1. airflow db init
2. airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email addmin@exeample.org 
3. cp download_rocket_launches.py ~/airflow/dags/ # 여기서 중요한 건 해당 실행 명령어가 airflow 폴더 path어야 한다는 것!
4. airflow webserver # 웹서버와 / 스케줄 서버는 모두 각 다른 터미널에서 실행해야 함
5. airflow scheduler # 웹서버와 / 스케줄 서버는 모두 각 다른 터미널에서 실행해야 함
--> 최종 : http://localhost:8080 브라우저에서 이동 후 암호는 상기 username과 password 입력값을 입력하여 로그인한다.
https://github.com/K9Ns/data-pipelines-with-apache-airflow/blob/main/chapter02/dags/download_rocket_launches.py
'''

In [7]:
#pip3 install apache-airflow
import json 
import pathlib
import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

In [8]:
'''
클래스 = 개체 = 인스턴스 (성형=틀 같은 것)

DAG는 모든 워크플로의 시작점입니다. 
워크플로 내의 모든 태스크는 DAG 개체를 참조하므로 Airflow는 어떤 태스크가 어떤 DAG에 속하는지 확인할 수 있다.
태스크 = DAG개체 
'''

'\nDAG는 모든 워크플로의 시작점입니다. \n워크플로 내의 모든 태스크는 DAG 개체를 참조하므로 Airflow는 어떤 태스크가 어떤 DAG에 속하는지 확인할 수 있다.\n태스크 = DAG개체 \n'

In [11]:
def _get_pictures(): #파이썬 함수는 결괏값을 파싱하고 모든 로켓 사진을 다운로드
    # 경로가 존재하는지 확인
    pathlib.path("/tmp/images").mkdir(parents=True, exist_ok=True) #경로가 없으면 디렉터리 생성

    # launches.json파일에 있는 모든 그림 파일을 다운로드
    with open("/tmp/launches.json") as f: #이전 단계의 태스크 결과 확인
        launches = json.load(f) #데이터를 섞을 수 있도록 딕셔너리로 읽기
        image_urls = [launch["image"] for launch in launches["results"]] #모든 발사에 대한 'image'의 URL값 읽기
        print(image_urls)
        for image_url in image_urls: #모든 이미지 url을 얻기 위한 루프
            try:
                response = requests.get(image_url) #각각의 이미 다운로드
                image_filename = image_url.split("/")[-1] #마지막 파일 이름만 가져온다. 예: https://host/rocket/elec.jpg --> elec.jpg
                target_file = f"/tmp/images/{image_filename}" #타켓 파일 저장 경로 구성
                with open(target_file, "wb") as f: #타켓파일 핸들 열기
                    f.write(response.content) #각각의 이미지 저장 = 파일경로에 이미지 쓰기
                print(f"Downloaded {image_url} to {target_file}") # 결과출력 - Airflow 로그에 저장하기 위해 stdout으로 출력
            #잠재적인 에러 포착 및 처리
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")

In [9]:
'''
[1] DAG 객체 선언
#객체의 인스턴스 생성(구체화) - 모든 워크플로의 시작점
dag는 DAG클래스를 구체화한 인스턴스의이름입니다.
인스턴스 이름은 임의로 지정하면 됩니다. 예를 들어 rocket_dag 또는 whatever_name_you_like로 지정할 수 있다.
모든 오퍼레이터는 변수(소문자dag)를 참조하여 인스턴스가 어떤 DAG에 속한 것인지 Airflow에게 알려 줍니다.
'''
dag = DAG(  
    dag_id = "download_rocket_launches", #Airflow UI에 표시되는 DAG 이름 
    start_date = airflow.utils.dates.days_ago(14), #워크플로가 처음 실행되는 날짜/시간
    schedule_interval = None, #자동으로 실행되지 않음을 의미 --> Airflow UI를 통해 수동으로 실행, 2.4절에서 예약으로 실행하는 방법
)

'\ndag는 DAG클래스를 구체화한 인스턴스의이름입니다.\n인스턴스 이름은 임의로 지정하면 됩니다. 예를 들어 rocket_dag 또는 whatever_name_you_like로 지정할 수 있다.\n모든 오퍼레이터는 변수(소문자dag)를 참조하여 인스턴스가 어떤 DAG에 속한 것인지 Airflow에게 알려 줍니다.\n'

In [None]:
'''
[2] 태스크와 오퍼레이터 
각 오퍼레이터는 하나의 태스크를 수행하고 여러 개의 오퍼레이터가 Airflow의 워크플로 또는 DAG를 구성합니다. 
오퍼레이터는 서로 독립적으로 실행할 수 있지만, 순서를 정의해 실행 할수 있다.
'''

In [None]:
'''
태스크 목적 : 목적에 따라 bash/python/email/http를 구분하기 위함 (예: 데이터를 가져오는 작업)

태스크 name : 임의지정
실행  type : BashOperator
task_id = 태스크 명칭
bash_command = bash 명령어
dag : dag 객체명
'''

In [None]:
'''
태스크 목적 : 목적에 따라 bash/python/email/http를 구분하기 위함 (예: 데이터를 가져오는 작업)

태스크 name : 임의지정
실행  type : PythonOperator
task_id = 태스크 명칭
python_callable = 실행할 파이썬 함수명
dag : dag 객체명
'''

In [10]:
download_launches = BashOperator(
    task_id = "download_launches", #태스크 이름
    bash_command = "curl -o /tmp/launches.json 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'", #실행할 배시 커맨드
    dag = dag, #DAG 변수에 대한 참조
)

In [13]:
get_pictures = PythonOperator(
    task_id = "get_pictures",
    python_callable = _get_pictures,
    dag = dag,
)

In [14]:
notify = BashOperator(
    task_id = "notify",
    bash_command = 'echo "there are no $(ls /tmp/images/ | wc -l) images."',
    dag = dag,
)

In [None]:
download_launches >> get_pictures >> notify # 태스크 실행 순서 설정