In [1]:
import json
import requests
import logging
from datetime import datetime

from twilio.rest import Client

from airflow.models import DAG, Variable
from airflow.operators.python import PythonOperator



In [2]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [6]:
def fetch_weather_details(ti):
    app_id = "1c87f33f697c742522907b3afe815451"
    city = "Palghar"
    url = Variable.get('OPEN_WEATHER_MAP_PATH') + "?q=" + city + "&appid=" + app_id

    try:
        logger.info(f"Doing an API call to end_point {url}")
        response_text = requests.request("GET", url).text
        logger_info(f"Fetched API response {response_text}")
        response_dict = json.loads(response_text)
        weather_details = response_dict['main']
        logger.info(f"Data successfully pushed to airflow xcom {weather_details}")
    except Exception as e:
        logger.info(f"Error while fetching weather details with Exception {e}")

# ti.xcom_push(key = 'weather_data', value = weather_details)

def send_alert(ti):
    logger.info(f"Pulling airflow xcom")
    weather_details = ti.xcom_pull(tasks_id = 'fetch_weather_data', key = 'weather_data')
    logger.info(f"Successfully oulled airflow xcom {weather_details}")
    custom_message = f"Today's temperature is {int(weather_details['temp']-273)}"
    account_sid = "AC8770c12e4ea01bcc15a1580ebc5f5e8f"
    auth_key = "9161a52914b6a8bb2c809a5ea91bf688"

    try:
        logger.info(f"Doing twilio API call with custom message {custom_message}")
        client = client.message.create(
            body = custom_message,
            from_ = "+19793008098",
            to = "+917378427998"
        )
        logger.info(f"Successfully made API call")

    except Exception as e:
        logger.info(f"Failed to send weather alert")


with DAG(dag_id="weather_alert",
    default_args = {},
    schedule_interval = "0 10 * * *",
    start_date = datetime(2022, 12, 28, 10),
    catchup = False
    ) as dag:
    
    fetch_weather_data = PythonOperator(
        task_id = "fetch_weather_data",
        python_callable = fetch_weather_details,
        retries = 3
    )

    send_alert = PythonOperator(
        task_id = "send_alert",
        python_callable = send_alert,
        retries = 3
    )

In [None]:
# DAG sequence
# fetch_weather_data >> send_alert

In [None]:
# weather_details = ti.xcom_pull(tasks_id = 'fetch_weather_data', key = 'weather_data')