# Air Quality 

> We create a `{dagster}` job that loads air quality data from a public source and then turns on the air purifiers as necessary.

In [2]:
# | default_exp dagster.air_quality
# | export
import json
import requests

from urllib.parse import urljoin
from dagster import job, op, ScheduleDefinition
from os import environ

from tjhome.devices import TuyaAirPurifier

In [None]:
from nbdev.showdoc import show_doc

## Air Quality Source 

We fetch air quality data for the nearest location from the  [aqicn.org API](https://aqicn.org/json-api/doc/).

In [None]:
# | export
class AirQualityAPIClient:
    """
    Air Quality Index API Client

    [Documentation](https://aqicn.org/json-api/doc/)
    """

    base_url = "http://api.waqi.info/"

    def __init__(self, token: str):
        self.token = token

    def fetch_aqi_in_latlng(self, lat: float, lng: float):
        resp = requests.get(
            urljoin(self.base_url, f"/feed/geo:{lat};{lng}/"),
            params={"token": self.token},
        )
        return int(resp.json()["data"]["aqi"])

## Dagster Job 

We create a dagster job that performs the automation. 

In [None]:
# | export
@op
def current_aqi(context) -> int:
    "Fetch the current AQI from AQICN.org"
    client = AirQualityAPIClient(environ["AQI_API_TOKEN"])
    aqi = client.fetch_aqi_in_latlng(*json.loads(environ["HOME_LOCATION"]))
    context.log.info(f"AQI Index: {aqi}")
    return aqi

In [None]:
show_doc(current_aqi)

---

### current_aqi

>      current_aqi (context)

Fetch the current AQI from AQICN.org

In [None]:
# | export
@op
def air_purifier_control(
    context,
    current_aqi,  # Current AQI reading
    threshold=50,  # AQI Threshold above which to turn on air purifier
) -> None:
    """
    If the current AQI exceeds the defined threshold, we turn on the air purifier.
    Otherwise, we turn it off.
    """
    for device_id in json.loads(environ["AIR_PURIFIER_DEVICE_IDS"]):
        device = TuyaAirPurifier(
            device_id,
            environ["TUYA_CLIENT_ID"],
            environ["TUYA_CLIENT_SECRET"],
            "63",
            environ["TUYA_USERNAME"],
            environ["TUYA_PASSWORD"],
        )
        if current_aqi > threshold:
            context.log.info(f"AQI {threshold} exceeded. Turning on purifier...")
            device.turn_on()
        else:
            context.log.info(f"AQI {threshold} reached. Turning off purifier...")
            device.turn_off()
        return

In [None]:
show_doc(air_purifier_control)

---

### air_purifier_control

>      air_purifier_control (context, current_aqi, threshold=50)

If the current AQI exceeds the defined threshold, we turn on the air purifier.
Otherwise, we turn it off.

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| context |  |  |  |
| current_aqi |  |  | Current AQI reading |
| threshold | int | 50 | AQI Threshold above which to turn on air purifier |
| **Returns** | **None** |  |  |

In [None]:
# |export
@job
def regulate_air_quality():
    "Dagster Job regulates air quality"
    air_purifier_control(current_aqi())


regulate_air_quality_schedule = ScheduleDefinition(
    job=regulate_air_quality, cron_schedule="*/1 * * * *"
)

In [None]:
show_doc(regulate_air_quality)

---

### regulate_air_quality

>      regulate_air_quality ()

Dagster Job regulates air quality