In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, greatest
from pyspark.sql.types import IntegerType

In [2]:
spark = SparkSession.builder \
    .appName("Write AQI JSON from MinIO to Postgres") \
    .getOrCreate()

In [8]:
ow_raw = spark.read.json("data/openweather.json")
ow = ow_raw.select(
    col("coord.lat").alias("lat"),
    col("coord.lon").alias("lon"),
    col("list")[0]["components"]["pm2_5"].alias("pm25"),
    col("list")[0]["components"]["pm10"].alias("pm10"),
    col("list")[0]["components"]["co"].alias("co"),
    col("list")[0]["components"]["no2"].alias("no2"),
    col("list")[0]["components"]["o3"].alias("o3"),
    col("list")[0]["components"]["so2"].alias("so2")
)

In [9]:
# CO in ppm
ow = ow.withColumn("co", col("co") * 24.45 / (28.01 * 1000))
# NO2 in ppb
ow = ow.withColumn("no2", col("no2") * 24.45 / 46.01)
# SO2 in ppb
ow = ow.withColumn("so2", col("so2") * 24.45 / 64.07)
# O3 in ppb
ow = ow.withColumn("o3", col("o3") * 24.45 / 48.00)


In [10]:
def aqi_pm25(c): return (
    when((c >= 0) & (c <= 12.0),   ((c - 0)    * (50 - 0)    / (12.0 - 0)    + 0))  .
    when((c > 12.0) & (c <= 35.4), ((c - 12.1) * (100 - 51)  / (35.4 - 12.1) + 51)) .
    when((c > 35.4) & (c <= 55.4), ((c - 35.5) * (150 - 101) / (55.4 - 35.5) + 101)).
    when((c > 55.4) & (c <= 150.4),((c - 55.5) * (200 - 151) / (150.4 - 55.5)+ 151)).
    when((c > 150.4) & (c <= 250.4),((c - 150.5)*(300 - 201)/(250.4 - 150.5)+ 201)).
    when((c > 250.4) & (c <= 350.4),((c - 250.5)*(400 - 301)/(350.4 - 250.5)+ 301)).
    when((c > 350.4) & (c <= 500.4),((c - 350.5)*(500 - 401)/(500.4 - 350.5)+ 401))
)

def aqi_pm10(c): return (
    when((c >= 0) & (c <= 54),     ((c - 0)    * (50 - 0)    / (54 - 0)      + 0))  .
    when((c > 54) & (c <= 154),    ((c - 55)   * (100 - 51)  / (154 - 55)    + 51)) .
    when((c > 154) & (c <= 254),   ((c - 155)  * (150 - 101) / (254 - 155)   + 101)).
    when((c > 254) & (c <= 354),   ((c - 255)  * (200 - 151) / (354 - 255)   + 151)).
    when((c > 354) & (c <= 424),   ((c - 355)  * (300 - 201) / (424 - 355)   + 201)).
    when((c > 424) & (c <= 504),   ((c - 425)  * (400 - 301) / (504 - 425)   + 301)).
    when((c > 504) & (c <= 604),   ((c - 505)  * (500 - 401) / (604 - 505)   + 401))
)

def aqi_co(c): return (
    when((c >= 0.0) & (c <= 4.4),    ((c - 0.0)   * (50 - 0)   / (4.4 - 0.0)   + 0)).
    when((c > 4.4) & (c <= 9.4),     ((c - 4.5)   * (100 - 51) / (9.4 - 4.5)   + 51)).
    when((c > 9.4) & (c <= 12.4),    ((c - 9.5)   * (150 - 101)/ (12.4 - 9.5)  + 101)).
    when((c > 12.4) & (c <= 15.4),   ((c - 12.5)  * (200 - 151)/ (15.4 - 12.5) + 151)).
    when((c > 15.4) & (c <= 30.4),   ((c - 15.5)  * (300 - 201)/ (30.4 - 15.5) + 201)).
    when((c > 30.4) & (c <= 40.4),   ((c - 30.5)  * (400 - 301)/ (40.4 - 30.5) + 301)).
    when((c > 40.4) & (c <= 50.4),   ((c - 40.5)  * (500 - 401)/ (50.4 - 40.5) + 401))
)

def aqi_no2(c): return (
    when((c >= 0) & (c <= 53),     ((c - 0)    * (50 - 0)    / (53 - 0)     + 0))  .
    when((c > 53) & (c <= 100),    ((c - 54)   * (100 - 51)  / (100 - 54)   + 51)) .
    when((c > 100) & (c <= 360),   ((c - 101)  * (150 - 101) / (360 - 101)  + 101)).
    when((c > 360) & (c <= 649),   ((c - 361)  * (200 - 151) / (649 - 361)  + 151)).
    when((c > 649) & (c <= 1249),  ((c - 650)  * (300 - 201) / (1249 - 650) + 201)).
    when((c > 1249) & (c <= 1649), ((c - 1250) * (400 - 301) / (1649 - 1250)+ 301)).
    when((c > 1649) & (c <= 2049), ((c - 1650) * (500 - 401) / (2049 - 1650)+ 401))
)

def aqi_o3(c): return (
    when((c >= 0) & (c <= 54),     ((c - 0)    * (50 - 0)    / (54 - 0)     + 0))  .
    when((c > 54) & (c <= 70),     ((c - 55)   * (100 - 51)  / (70 - 55)    + 51)) .
    when((c > 70) & (c <= 85),     ((c - 71)   * (150 - 101) / (85 - 71)    + 101)).
    when((c > 85) & (c <= 105),    ((c - 86)   * (200 - 151) / (105 - 86)   + 151)).
    when((c > 105) & (c <= 200),   ((c - 106)  * (300 - 201) / (200 - 106)  + 201))
)

def aqi_so2(c): return (
    when((c >= 0) & (c <= 35),     ((c - 0)    * (50 - 0)    / (35 - 0)     + 0))  .
    when((c > 35) & (c <= 75),     ((c - 36)   * (100 - 51)  / (75 - 36)    + 51)) .
    when((c > 75) & (c <= 185),    ((c - 76)   * (150 - 101) / (185 - 76)   + 101)).
    when((c > 185) & (c <= 304),   ((c - 186)  * (200 - 151) / (304 - 186)  + 151)).
    when((c > 304) & (c <= 604),   ((c - 305)  * (300 - 201) / (604 - 305)  + 201)).
    when((c > 604) & (c <= 804),   ((c - 605)  * (400 - 301) / (804 - 605)  + 301)).
    when((c > 804) & (c <= 1004),  ((c - 805)  * (500 - 401) / (1004 - 805) + 401))
)


In [11]:
ow = ow.withColumn("aqi_pm25", aqi_pm25(col("pm25")).cast(IntegerType()))
ow = ow.withColumn("aqi_pm10", aqi_pm10(col("pm10")).cast(IntegerType()))
ow = ow.withColumn("aqi_co",   aqi_co(col("co")).cast(IntegerType()))
ow = ow.withColumn("aqi_no2",  aqi_no2(col("no2")).cast(IntegerType()))
ow = ow.withColumn("aqi_o3",   aqi_o3(col("o3")).cast(IntegerType()))
ow = ow.withColumn("aqi_so2",  aqi_so2(col("so2")).cast(IntegerType()))

ow = ow.withColumn("ow_aqi", greatest(
    "aqi_pm25", "aqi_pm10", "aqi_co", "aqi_no2", "aqi_o3", "aqi_so2"
))

In [1]:
import json

# Input/output file paths
input_path = "diagioi.geojson"
output_path = "vietnam_provinces.geojson"

# Load the original GeoJSON file
with open(input_path, "r", encoding="utf-8") as infile:
    data = json.load(infile)

# Filter features with "Loại" == "Dia gioi tinh"
filtered_features = [
    f for f in data.get("features", [])
    if f.get("properties", {}).get("Loai") in {"Dia gioi tinh", "Dia gioi quoc gia"}
]

# Create new FeatureCollection
filtered_geojson = {
    "type": "FeatureCollection",
    "crs": data.get("crs"),  # keep CRS if present
    "features": filtered_features
}

# Save to a new GeoJSON file
with open(output_path, "w", encoding="utf-8") as outfile:
    json.dump(filtered_geojson, outfile, ensure_ascii=False, indent=2)

print(f"✅ Done! Saved {len(filtered_features)} features to {output_path}")


✅ Done! Saved 176 features to vietnam_provinces.geojson


In [20]:
df.drop("aqi_pm25", "aqi_pm10", "aqi_co", "aqi_no2", "aqi_o3", "aqi_so2","lat","lon")

DataFrame[pm25: double, pm10: double, co: double, no2: double, o3: double, so2: double, aqi: int]

In [20]:
from datetime import datetime

# Replace with your desired date and time
date_str = "2024-01-06 00:00:00"

# Parse the string to a datetime object (assuming UTC)
dt = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")

# Convert to Unix timestamp
unix_timestamp = int(dt.timestamp())

print(f"Unix timestamp for {date_str} is {unix_timestamp}")

Unix timestamp for 2024-01-06 00:00:00 is 1704474000


In [None]:
import requests
import time
import csv
from datetime import datetime, timedelta
import os

# === Configuration ===
API_KEY = "3fce65f5a9ca73174e97bfb1a62d2f1a"  # Replace with your OpenWeather API key
OUTPUT_FILE = "adata.csv"
START_DATE = 1609434000
END_DATE = 1748707200
STEP_DAYS = 5
LAT = 21.0811211
LON = 105.8180306
BASE_URL = "http://api.openweathermap.org/data/2.5/air_pollution/history"

# === Helper Functions ===
def daterange_chunks(start_date, end_date, step_days):
    current = start_date
    while current < end_date:
        end_chunk = min(current + timedelta(days=step_days), end_date)
        yield current, end_chunk
        current = end_chunk

def fetch_aqi_data(lat, lon, start, end):
    start_ts = int(start.timestamp())
    end_ts = int(end.timestamp())
    params = {
        "lat": lat,
        "lon": lon,
        "start": start_ts,
        "end": end_ts,
        "appid": API_KEY
    }
    try:
        response = requests.get(BASE_URL, params=params)
        if response.status_code == 200:
            return response.json().get("list", [])
        else:
            print(f"Failed ({response.status_code}) lat={lat}, lon={lon}, {start} -> {end}")
            return []
    except Exception as e:
        print(f"Exception: {e} for lat={lat}, lon={lon}, {start}->{end}")
        return []

with open(OUTPUT_FILE, mode='a', newline='', encoding='utf-8') as csvfile:
    writer = csv.writer(csvfile)

    lat = LAT
    lon = LON
    lat_lon_key = f"{lat},{lon}"
    print(f"Fetching data for lat={lat}, lon={lon}")
    for start, end in daterange_chunks(START_DATE, END_DATE, STEP_DAYS):
        data = fetch_aqi_data(lat, lon, start, end)
        for entry in data:
            dt = datetime.utcfromtimestamp(entry["dt"]).strftime("%Y-%m-%d %H:%M:%S")
            components = entry["components"]
            row = [dt, lat, lon, entry.get("main", {}).get("aqi")] + [
                components.get("co"), components.get("no"), components.get("no2"),
                components.get("o3"), components.get("so2"),
                components.get("pm2_5"), components.get("pm10"),
                components.get("nh3")
            ]
            writer.writerow(row)
        time.sleep(1)

print("✅ Done. Data saved to", OUTPUT_FILE)


In [2]:
import requests

response = requests.get("http://api.waqi.info/feed/quangninh/?token=a95a9b0f19706b4fc9a630602240443d111e63a1")
response.json()

{'status': 'error', 'data': 'Unknown station'}

In [4]:
import requests
import time
import csv
import os
from datetime import datetime, timedelta

# === Configuration ===
API_KEY = "3fce65f5a9ca73174e97bfb1a62d2f1a"
OUTPUT_FILE = "historical-data.csv"
START_DATE = datetime(2025, 6, 1)
END_DATE = datetime(2025, 7, 2)
STEP_DAYS = 5
BASE_URL = "http://api.openweathermap.org/data/2.5/air_pollution/history"

# === List of Coordinates ===
COORDINATES = [
    (21.0811211, 105.8180306),
    (21.01525, 105.80013),
    (21.0491, 105.8831),
    (21.035584, 105.852771),
    (21.0215063, 105.8188748),
    (20.947866, 105.849405),
    (21.020424, 105.806926),
    (21.040195, 105.851938),
    (20.994886, 105.816753),
    (20.98832, 105.854897),
    (21.02022, 105.81232),
    (21.0072, 105.83572),
    (21.03982, 105.84726),
    (21.011785, 105.865184),
    (21.053722, 105.798063),
    (21.064117, 105.833387),
    (21.031934, 105.851439),
    (21.02629, 105.85109),
    (20.972052, 105.785624),
    (21.002383, 105.718038),
    (16.074, 108.217),
    (16.043252, 108.206826),
    (10.782978, 106.700711),
    (10.026977, 105.768249),
    (10.06934, 105.820391),
    (22.6782, 106.245),
    (22.67953, 106.215361),
    (20.805798, 106.629752),
    (21.593151, 105.8431043),
    (21.33847, 105.3673),
    (21.930528, 106.681389),
    (21.843325, 106.7506),
    (12.284358, 109.192524),
    (14.017737, 108.035181),
    (13.968848, 108.017617),
    (10.367976, 107.0844),
    (16.46226, 107.596351),
    (21.04975, 105.74187),
    (21.0390377, 107.0284572),
    (21.1194449, 105.9896021),
    (10.589853, 107.131743),
    (10.502431, 107.169408),
    (13.998599, 107.996482),
    (21.1873402, 106.0742958),
    (21.1527453, 106.111371),
    (21.1518069, 106.1518796),
    (20.661683, 106.058761),
    (20.927, 106.314),
    (20.938207, 106.249564),
    (21.0357507, 106.7641068),
    (21.020397, 106.700332),
    (21.006153, 106.859097),
    (21.0201158, 106.7004633),
    (21.032483, 105.832844),
    (18.675369, 105.690945),
    (21.0353984, 106.1025755),
    (10.65961, 106.727916),
    (21.028519, 105.855409),
    (21.05121, 105.78232),
    (21.0377269, 106.1468067),
    (20.9454412, 107.1306708),
    (21.0832037, 106.2807861),
    (21.0243474, 106.0172877),
    (21.1277395, 105.8895444),
    (13.953878, 108.656162),
    (21.028519, 105.855409),
    (21.148273, 105.913306),
    (21.110533, 105.760445),
    (20.786189, 105.779694),
    (20.648485, 105.82045),
    (21.016327, 105.648676),
    (21.03976, 105.765216),
    (20.919981, 105.712157),
    (20.858022, 105.765582),
    (21.056142, 105.573491),
    (21.257377, 105.851108),
    (20.89934, 105.577276),
    (21.228789, 105.75791)
]


UIDS = [
    8688,13026,1583,13427,8641,13431,13685,13686,13421,13018,13017,13014,13015,
    13442,13420,13423,13426,13016,13429,13439,1584,13658,8767,13687,14929,13252,
    13688,13672,13027,5506,13667,13668,1585,13417,13762,14642,12488,13251,11781,
    13414,14643,14644,13012,12961,12956,12963,13683,13675,13678,11778,14926,13444,
    11779,13433,13666,12975,
13756,
13419,
13013,
12962,
11780,
13253,
13415,
13416,
13418,
13419,
13422,
13424,
13425,
13428,
13430,
13432,
13434,
13435,
13436,
13437,
13438,
13440
]

# === Validation ===
if len(COORDINATES) != len(UIDS):
    raise ValueError(f"❌ Mismatch: {len(COORDINATES)} coordinates vs {len(UIDS)} UIDs")

# === Helper Functions ===
def daterange_chunks(start_date, end_date, step_days):
    current = start_date
    while current < end_date:
        end_chunk = min(current + timedelta(days=step_days), end_date)
        yield current, end_chunk
        current = end_chunk

def fetch_aqi_data(lat, lon, start, end):
    start_ts = int(start.timestamp())
    end_ts = int(end.timestamp())
    params = {
        "lat": lat,
        "lon": lon,
        "start": start_ts,
        "end": end_ts,
        "appid": API_KEY
    }
    try:
        response = requests.get(BASE_URL, params=params)
        if response.status_code == 200:
            return response.json().get("list", [])
        else:
            print(f"❌ Failed ({response.status_code}) lat={lat}, lon={lon}, {start} -> {end}")
            return []
    except Exception as e:
        print(f"❌ Exception: {e} for lat={lat}, lon={lon}, {start}->{end}")
        return []

# === Main Processing ===
file_exists = os.path.isfile(OUTPUT_FILE)

with open(OUTPUT_FILE, mode='a', newline='', encoding='utf-8') as csvfile:
    writer = csv.writer(csvfile)

    # Write headers only if file doesn't exist or is empty
    if not file_exists or os.path.getsize(OUTPUT_FILE) == 0:
        writer.writerow([
            "datetime", "lat", "lon", "aqi",
            "co", "no", "no2", "o3", "so2", "pm2_5", "pm10", "nh3", "uid"
        ])

    for idx, (lat, lon) in enumerate(COORDINATES):
        uid = UIDS[idx]
        print(f"📍 Fetching data for lat={lat}, lon={lon}, uid={uid}")
        for start, end in daterange_chunks(START_DATE, END_DATE, STEP_DAYS):
            data = fetch_aqi_data(lat, lon, start, end)
            for entry in data:
                dt = datetime.utcfromtimestamp(entry["dt"]).strftime("%Y-%m-%d %H:%M:%S")
                components = entry["components"]
                row = [
                    dt, lat, lon, entry.get("main", {}).get("aqi"),
                    components.get("co"), components.get("no"), components.get("no2"),
                    components.get("o3"), components.get("so2"),
                    components.get("pm2_5"), components.get("pm10"),
                    components.get("nh3"), uid
                ]
                writer.writerow(row)
            time.sleep(1)  # Respect API rate limits

print("✅ Done. Data saved to", OUTPUT_FILE)


📍 Fetching data for lat=21.0811211, lon=105.8180306, uid=8688


  dt = datetime.utcfromtimestamp(entry["dt"]).strftime("%Y-%m-%d %H:%M:%S")


📍 Fetching data for lat=21.01525, lon=105.80013, uid=13026
📍 Fetching data for lat=21.0491, lon=105.8831, uid=1583
📍 Fetching data for lat=21.035584, lon=105.852771, uid=13427
📍 Fetching data for lat=21.0215063, lon=105.8188748, uid=8641
📍 Fetching data for lat=20.947866, lon=105.849405, uid=13431
📍 Fetching data for lat=21.020424, lon=105.806926, uid=13685
📍 Fetching data for lat=21.040195, lon=105.851938, uid=13686
📍 Fetching data for lat=20.994886, lon=105.816753, uid=13421
📍 Fetching data for lat=20.98832, lon=105.854897, uid=13018
📍 Fetching data for lat=21.02022, lon=105.81232, uid=13017
📍 Fetching data for lat=21.0072, lon=105.83572, uid=13014
📍 Fetching data for lat=21.03982, lon=105.84726, uid=13015
📍 Fetching data for lat=21.011785, lon=105.865184, uid=13442
📍 Fetching data for lat=21.053722, lon=105.798063, uid=13420
📍 Fetching data for lat=21.064117, lon=105.833387, uid=13423
📍 Fetching data for lat=21.031934, lon=105.851439, uid=13426
📍 Fetching data for lat=21.02629, lon=

In [1]:
COORDINATES = [
    (21.0811211, 105.8180306),
    (21.01525, 105.80013),
    (21.0491, 105.8831),
    (21.035584, 105.852771),
    (21.0215063, 105.8188748),
    (20.947866, 105.849405),
    (21.020424, 105.806926),
    (21.040195, 105.851938),
    (20.994886, 105.816753),
    (20.98832, 105.854897),
    (21.02022, 105.81232),
    (21.0072, 105.83572),
    (21.03982, 105.84726),
    (21.011785, 105.865184),
    (21.053722, 105.798063),
    (21.064117, 105.833387),
    (21.031934, 105.851439),
    (21.02629, 105.85109),
    (20.972052, 105.785624),
    (21.002383, 105.718038),
    (16.074, 108.217),
    (16.043252, 108.206826),
    (10.782978, 106.700711),
    (10.026977, 105.768249),
    (10.06934, 105.820391),
    (22.6782, 106.245),
    (22.67953, 106.215361),
    (20.805798, 106.629752),
    (21.593151, 105.8431043),
    (21.33847, 105.3673),
    (21.930528, 106.681389),
    (21.843325, 106.7506),
    (12.284358, 109.192524),
    (14.017737, 108.035181),
    (13.968848, 108.017617),
    (10.367976, 107.0844),
    (16.46226, 107.596351),
    (21.04975, 105.74187),
    (21.0390377, 107.0284572),
    (21.1194449, 105.9896021),
    (10.589853, 107.131743),
    (10.502431, 107.169408),
    (13.998599, 107.996482),
    (21.1873402, 106.0742958),
    (21.1527453, 106.111371),
    (21.1518069, 106.1518796),
    (20.661683, 106.058761),
    (20.927, 106.314),
    (20.938207, 106.249564),
    (21.0357507, 106.7641068),
    (21.020397, 106.700332),
    (21.006153, 106.859097),
    (21.0201158, 106.7004633),
    (21.032483, 105.832844),
    (18.675369, 105.690945),
    (21.0353984, 106.1025755),
    (10.65961, 106.727916),
    (21.028519, 105.855409),
    (21.05121, 105.78232),
    (21.0377269, 106.1468067),
    (20.9454412, 107.1306708),
    (21.0832037, 106.2807861),
    (21.0243474, 106.0172877),
    (21.1277395, 105.8895444),
    (13.953878, 108.656162),
    (21.028519, 105.855409),
    (21.148273, 105.913306),
    (21.110533, 105.760445),
    (20.786189, 105.779694),
    (20.648485, 105.82045),
    (21.016327, 105.648676),
    (21.03976, 105.765216),
    (20.919981, 105.712157),
    (20.858022, 105.765582),
    (21.056142, 105.573491),
    (21.257377, 105.851108),
    (20.89934, 105.577276),
    (21.228789, 105.75791)
]


UIDS = [
    8688,13026,1583,13427,8641,13431,13685,13686,13421,13018,13017,13014,13015,
    13442,13420,13423,13426,13016,13429,13439,1584,13658,8767,13687,14929,13252,
    13688,13672,13027,5506,13667,13668,1585,13417,13762,14642,12488,13251,11781,
    13414,14643,14644,13012,12961,12956,12963,13683,13675,13678,11778,14926,13444,
    11779,13433,13666,12975,
13756,
13419,
13013,
12962,
11780,
13253,
13415,
13416,
13418,
13419,
13422,
13424,
13425,
13428,
13430,
13432,
13434,
13435,
13436,
13437,
13438,
13440
]

print(len(UIDS), len(COORDINATES))

78 78


In [7]:
import requests

response = requests.get("https://api.waqi.info/feed/@11779/?token=a95a9b0f19706b4fc9a630602240443d111e63a1")
response.json()

{'status': 'ok',
 'data': {'aqi': '-',
  'idx': 11779,
  'attributions': [{'url': 'http://cem.gov.vn/',
    'name': 'Vietnam Center For Environmental Monitoring Portal (cổng thông tin quan trắc môi trường)',
    'logo': 'Vietnam-CEM.png'},
   {'url': 'https://waqi.info/', 'name': 'World Air Quality Index Project'}],
  'city': {'geo': [21.0201158, 106.7004633],
   'name': 'Quảng Ninh/Phương Nam , Vietnam',
   'url': 'https://aqicn.org/city/vietnam/quang-ninh/phuong-nam-',
   'location': ''},
  'dominentpol': '',
  'iaqi': {'dew': {'v': 9},
   'h': {'v': 50.5},
   'p': {'v': 1022.5},
   't': {'v': 19.5},
   'w': {'v': 5.1}},
  'time': {'tz': '+07:00', 'v': 0},
  'forecast': {'daily': {'o3': [{'avg': 24,
      'day': '2025-01-07',
      'max': 28,
      'min': 21},
     {'avg': 22, 'day': '2025-01-08', 'max': 28, 'min': 17},
     {'avg': 13, 'day': '2025-01-09', 'max': 25, 'min': 9},
     {'avg': 13, 'day': '2025-01-10', 'max': 23, 'min': 9},
     {'avg': 16, 'day': '2025-01-11', 'max': 2

In [3]:
ow_raw = spark.read.json("data/openweather.json")
ow = ow_raw.select(
    col("coord.lat").alias("lat"),
    col("coord.lon").alias("lon"),
    col("list")[0]["components"]["pm2_5"].alias("pm25"),
    col("list")[0]["components"]["pm10"].alias("pm10"),
    col("list")[0]["components"]["co"].alias("co"),
    col("list")[0]["components"]["no2"].alias("no2"),
    col("list")[0]["components"]["o3"].alias("o3"),
    col("list")[0]["components"]["so2"].alias("so2"),
    col("list")[0]["dt"].alias("timestamp")
)
ow.show()

+-------+--------+-----+-----+------+----+-----+----+----------+
|    lat|     lon| pm25| pm10|    co| no2|   o3| so2| timestamp|
+-------+--------+-----+-----+------+----+-----+----+----------+
|21.0811| 105.818|37.25|42.45| 362.6|3.61|57.68|2.31|1747192712|
|21.0153|105.8001|37.25|42.45| 362.6|3.61|57.68|2.31|1747192748|
|21.0491|105.8831|37.25|42.45| 362.6|3.61|57.68|2.31|1747192748|
|21.0356|105.8528|37.25|42.45| 362.6|3.61|57.68|2.31|1747192748|
|21.0215|105.8189|37.25|42.45| 362.6|3.61|57.68|2.31|1747192748|
|20.9479|105.8494|38.03|43.61|368.91|3.79|53.32|2.54|1747192749|
|21.0204|105.8069|37.25|42.45| 362.6|3.61|57.68|2.31|1747192749|
|21.0356|105.8528|37.25|42.45| 362.6|3.61|57.68|2.31|1747192748|
|20.9949|105.8168|38.03|43.61|368.91|3.79|53.32|2.54|1747192749|
|20.9883|105.8549|38.03|43.61|368.91|3.79|53.32|2.54|1747192749|
|21.0204|105.8069|37.25|42.45| 362.6|3.61|57.68|2.31|1747192749|
|21.0072|105.8357|37.25|42.45| 362.6|3.61|57.68|2.31|1747192750|
|21.0356|105.8528|37.25|4

In [4]:
ow = ow.withColumn("datetime", to_timestamp(from_unixtime("timestamp")))
ow.show()

+-------+--------+-----+-----+------+----+-----+----+----------+-------------------+
|    lat|     lon| pm25| pm10|    co| no2|   o3| so2| timestamp|           datetime|
+-------+--------+-----+-----+------+----+-----+----+----------+-------------------+
|21.0811| 105.818|37.25|42.45| 362.6|3.61|57.68|2.31|1747192712|2025-05-14 10:18:32|
|21.0153|105.8001|37.25|42.45| 362.6|3.61|57.68|2.31|1747192748|2025-05-14 10:19:08|
|21.0491|105.8831|37.25|42.45| 362.6|3.61|57.68|2.31|1747192748|2025-05-14 10:19:08|
|21.0356|105.8528|37.25|42.45| 362.6|3.61|57.68|2.31|1747192748|2025-05-14 10:19:08|
|21.0215|105.8189|37.25|42.45| 362.6|3.61|57.68|2.31|1747192748|2025-05-14 10:19:08|
|20.9479|105.8494|38.03|43.61|368.91|3.79|53.32|2.54|1747192749|2025-05-14 10:19:09|
|21.0204|105.8069|37.25|42.45| 362.6|3.61|57.68|2.31|1747192749|2025-05-14 10:19:09|
|21.0356|105.8528|37.25|42.45| 362.6|3.61|57.68|2.31|1747192748|2025-05-14 10:19:08|
|20.9949|105.8168|38.03|43.61|368.91|3.79|53.32|2.54|1747192749|2

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, radians, sin, cos, sqrt, atan2, row_number, greatest, from_unixtime, to_timestamp
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, FloatType

# Start Spark session
spark = SparkSession.builder \
    .appName("Write AQI JSON from MinIO to Postgres") \
    .getOrCreate()

ow_raw = spark.read.json("data/openweather.json")
ow = ow_raw.select(
    col("coord.lat").alias("lat"),
    col("coord.lon").alias("lon"),
    col("list")[0]["components"]["pm2_5"].alias("pm25"),
    col("list")[0]["components"]["pm10"].alias("pm10"),
    col("list")[0]["components"]["co"].alias("co"),
    col("list")[0]["components"]["no2"].alias("no2"),
    col("list")[0]["components"]["o3"].alias("o3"),
    col("list")[0]["components"]["so2"].alias("so2"),
    col("list")[0]["dt"].alias("timestamp")
)

ow = ow.withColumn("co", col("co") * 24.45 / (28.01 * 1000))
ow = ow.withColumn("no2", col("no2") * 24.45 / 46.01)
ow = ow.withColumn("so2", col("so2") * 24.45 / 64.07)
ow = ow.withColumn("o3", col("o3") * 24.45 / 48.00)

def aqi_pm25(c): return (
    when((c >= 0) & (c <= 12.0),   ((c - 0)    * (50 - 0)    / (12.0 - 0)    + 0))  .
    when((c > 12.0) & (c <= 35.4), ((c - 12.1) * (100 - 51)  / (35.4 - 12.1) + 51)) .
    when((c > 35.4) & (c <= 55.4), ((c - 35.5) * (150 - 101) / (55.4 - 35.5) + 101)).
    when((c > 55.4) & (c <= 150.4),((c - 55.5) * (200 - 151) / (150.4 - 55.5)+ 151)).
    when((c > 150.4) & (c <= 250.4),((c - 150.5)*(300 - 201)/(250.4 - 150.5)+ 201)).
    when((c > 250.4) & (c <= 350.4),((c - 250.5)*(400 - 301)/(350.4 - 250.5)+ 301)).
    when((c > 350.4) & (c <= 500.4),((c - 350.5)*(500 - 401)/(500.4 - 350.5)+ 401))
)

def aqi_pm10(c): return (
    when((c >= 0) & (c <= 54),     ((c - 0)    * (50 - 0)    / (54 - 0)      + 0))  .
    when((c > 54) & (c <= 154),    ((c - 55)   * (100 - 51)  / (154 - 55)    + 51)) .
    when((c > 154) & (c <= 254),   ((c - 155)  * (150 - 101) / (254 - 155)   + 101)).
    when((c > 254) & (c <= 354),   ((c - 255)  * (200 - 151) / (354 - 255)   + 151)).
    when((c > 354) & (c <= 424),   ((c - 355)  * (300 - 201) / (424 - 355)   + 201)).
    when((c > 424) & (c <= 504),   ((c - 425)  * (400 - 301) / (504 - 425)   + 301)).
    when((c > 504) & (c <= 604),   ((c - 505)  * (500 - 401) / (604 - 505)   + 401))
)

def aqi_co(c): return (
    when((c >= 0.0) & (c <= 4.4),    ((c - 0.0)   * (50 - 0)   / (4.4 - 0.0)   + 0)).
    when((c > 4.4) & (c <= 9.4),     ((c - 4.5)   * (100 - 51) / (9.4 - 4.5)   + 51)).
    when((c > 9.4) & (c <= 12.4),    ((c - 9.5)   * (150 - 101)/ (12.4 - 9.5)  + 101)).
    when((c > 12.4) & (c <= 15.4),   ((c - 12.5)  * (200 - 151)/ (15.4 - 12.5) + 151)).
    when((c > 15.4) & (c <= 30.4),   ((c - 15.5)  * (300 - 201)/ (30.4 - 15.5) + 201)).
    when((c > 30.4) & (c <= 40.4),   ((c - 30.5)  * (400 - 301)/ (40.4 - 30.5) + 301)).
    when((c > 40.4) & (c <= 50.4),   ((c - 40.5)  * (500 - 401)/ (50.4 - 40.5) + 401))
)

def aqi_no2(c): return (
    when((c >= 0) & (c <= 53),     ((c - 0)    * (50 - 0)    / (53 - 0)     + 0))  .
    when((c > 53) & (c <= 100),    ((c - 54)   * (100 - 51)  / (100 - 54)   + 51)) .
    when((c > 100) & (c <= 360),   ((c - 101)  * (150 - 101) / (360 - 101)  + 101)).
    when((c > 360) & (c <= 649),   ((c - 361)  * (200 - 151) / (649 - 361)  + 151)).
    when((c > 649) & (c <= 1249),  ((c - 650)  * (300 - 201) / (1249 - 650) + 201)).
    when((c > 1249) & (c <= 1649), ((c - 1250) * (400 - 301) / (1649 - 1250)+ 301)).
    when((c > 1649) & (c <= 2049), ((c - 1650) * (500 - 401) / (2049 - 1650)+ 401))
)

def aqi_o3(c): return (
    when((c >= 0) & (c <= 54),     ((c - 0)    * (50 - 0)    / (54 - 0)     + 0))  .
    when((c > 54) & (c <= 70),     ((c - 55)   * (100 - 51)  / (70 - 55)    + 51)) .
    when((c > 70) & (c <= 85),     ((c - 71)   * (150 - 101) / (85 - 71)    + 101)).
    when((c > 85) & (c <= 105),    ((c - 86)   * (200 - 151) / (105 - 86)   + 151)).
    when((c > 105) & (c <= 200),   ((c - 106)  * (300 - 201) / (200 - 106)  + 201))
)

def aqi_so2(c): return (
    when((c >= 0) & (c <= 35),     ((c - 0)    * (50 - 0)    / (35 - 0)     + 0))  .
    when((c > 35) & (c <= 75),     ((c - 36)   * (100 - 51)  / (75 - 36)    + 51)) .
    when((c > 75) & (c <= 185),    ((c - 76)   * (150 - 101) / (185 - 76)   + 101)).
    when((c > 185) & (c <= 304),   ((c - 186)  * (200 - 151) / (304 - 186)  + 151)).
    when((c > 304) & (c <= 604),   ((c - 305)  * (300 - 201) / (604 - 305)  + 201)).
    when((c > 604) & (c <= 804),   ((c - 605)  * (400 - 301) / (804 - 605)  + 301)).
    when((c > 804) & (c <= 1004),  ((c - 805)  * (500 - 401) / (1004 - 805) + 401))
)

ow = ow.withColumn("aqi_pm25", aqi_pm25(col("pm25")).cast(IntegerType()))
ow = ow.withColumn("aqi_pm10", aqi_pm10(col("pm10")).cast(IntegerType()))
ow = ow.withColumn("aqi_co",   aqi_co(col("co")).cast(IntegerType()))
ow = ow.withColumn("aqi_no2",  aqi_no2(col("no2")).cast(IntegerType()))
ow = ow.withColumn("aqi_o3",   aqi_o3(col("o3")).cast(IntegerType()))
ow = ow.withColumn("aqi_so2",  aqi_so2(col("so2")).cast(IntegerType()))

ow = ow.withColumn("ow_aqi", greatest(
    "aqi_pm25", "aqi_pm10", "aqi_co", "aqi_no2", "aqi_o3", "aqi_so2"
))

aqicn_raw = spark.read.json("data/aqicn.json")
aqicn = aqicn_raw.selectExpr(
    "data.idx as uid",
    "data.city.name as station_name",
    "data.city.geo[0] as lat",
    "data.city.geo[1] as lon",
    "data.iaqi.h.v as humidity",
    "data.iaqi.t.v as temperature",
    "data.iaqi.w.v as wind",
    "data.iaqi.p.v as pressure",
    "data.aqi as aqicn_aqi"
)

# Add radians
aqicn = aqicn.withColumn("lat_rad", radians(col("lat"))).withColumn("lon_rad", radians(col("lon")))
ow = ow \
    .withColumnRenamed("lat", "ow_lat") \
    .withColumnRenamed("lon", "ow_lon") \
    .withColumn("ow_lat_rad", radians(col("ow_lat"))) \
    .withColumn("ow_lon_rad", radians(col("ow_lon")))

# Cross join and compute Haversine distance
joined = aqicn.crossJoin(ow)
joined = joined.withColumn("distance_km", 6371 * 2 * atan2(
    sqrt(
        sin((col("lat_rad") - col("ow_lat_rad")) / 2) ** 2 +
        cos(col("lat_rad")) * cos(col("ow_lat_rad")) *
        sin((col("lon_rad") - col("ow_lon_rad")) / 2) ** 2
    ),
    sqrt(
        1 - (
            sin((col("lat_rad") - col("ow_lat_rad")) / 2) ** 2 +
            cos(col("lat_rad")) * cos(col("ow_lat_rad")) *
            sin((col("lon_rad") - col("ow_lon_rad")) / 2) ** 2
        )
    )
))

# Create a window partitioned by UID, ordered by distance
window_spec = Window.partitionBy("uid").orderBy(col("distance_km").asc())

# Assign row numbers based on distance within each UID group
ranked = joined.withColumn("row_num", row_number().over(window_spec))

# Keep only the closest match (row_num == 1)
filtered = ranked.filter(col("row_num") == 1).drop("row_num")

filtered = filtered.withColumn("aqi",
    when((col("aqicn_aqi").isNotNull()) & (col("aqicn_aqi") != "-"), col("aqicn_aqi").cast("int"))
    .otherwise(col("ow_aqi"))
)

# Convert to timestamp type
filtered = filtered.withColumn("datetime", to_timestamp(from_unixtime("timestamp")))

# Select final columns
final = filtered.select(
    "uid", "station_name", "lat", "lon",
    "humidity", "temperature", "wind", "pressure", "datetime",
    "co", "no2", "o3", "so2", "pm25", "pm10", "aqi"
)

final.show()

+-----+--------------------+----------+-----------+--------+-----------+----+--------+-------------------+-------------------+------------------+------------------+-------------------+-----+-----+---+
|  uid|        station_name|       lat|        lon|humidity|temperature|wind|pressure|           datetime|                 co|               no2|                o3|                so2| pm25| pm10|aqi|
+-----+--------------------+----------+-----------+--------+-----------+----+--------+-------------------+-------------------+------------------+------------------+-------------------+-----+-----+---+
| 1583|Hanoi, Vietnam (H...|   21.0491|   105.8831|    84.0|       31.0| 3.6|  1002.0|2025-06-07 16:31:10|0.23198257765083896|0.9459030645511846|        32.9565625| 0.5113625721866708|20.32|21.78| 61|
| 1584|Da Nang, Vietnam ...|    16.074|    108.217|    70.0|       32.0| 4.1|  1003.0|2025-06-07 16:31:14|0.20735799714387718|2.4232123451423604|10.834406249999999| 0.3625331668487592| 6.12| 6.63|

In [2]:
import pandas as pd
df = pd.read_csv("historical-data.csv", header = True)
df.describe()

TypeError: Passing a bool to header is invalid. Use header=None for no header or header=int or list-like of ints to specify the row(s) making up the column names

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, greatest
from pyspark.sql.types import IntegerType

# Start Spark session
spark = SparkSession.builder \
    .appName("Write AQI JSON from MinIO to Postgres") \
    .getOrCreate()

df = spark.read.csv("historical-data.csv", header = True)

df.show()

+-------------------+----------+-----------+---+-------+----+-----+-----+-----+-----+-----+-----+----+
|           datetime|       lat|        lon|aqi|     co|  no|  no2|   o3|  so2|pm2_5| pm10|  nh3| uid|
+-------------------+----------+-----------+---+-------+----+-----+-----+-----+-----+-----+-----+----+
|2024-04-30 17:00:00|21.0811211|105.8180306|  4| 747.68|   0| 25.7|85.12|28.85|57.28|68.29|10.77|8688|
|2024-04-30 18:00:00|21.0811211|105.8180306|  4| 707.63|   0|21.08|79.39| 23.6|58.47|70.18| 9.88|8688|
|2024-04-30 19:00:00|21.0811211|105.8180306|  4| 694.27|   0|17.82|76.53|20.03|54.66|64.18| 9.75|8688|
|2024-04-30 20:00:00|21.0811211|105.8180306|  3| 680.92|   0|17.65|76.53|18.84|39.99|44.83| 9.37|8688|
|2024-04-30 21:00:00|21.0811211|105.8180306|  3|  687.6|   0|17.65|80.82|18.12|28.49|31.57| 8.99|8688|
|2024-04-30 22:00:00|21.0811211|105.8180306|  2|  714.3|   0|19.36|81.54|19.55|23.97|26.84| 9.25|8688|
|2024-04-30 23:00:00|21.0811211|105.8180306|  2| 774.38|   0|24.33|68.66|

In [None]:
df = df.withColumn("co", col("co") * 24.45 / (28.01 * 1000))
df = df.withColumn("no2", col("no2") * 24.45 / 46.01)
df = df.withColumn("so2", col("so2") * 24.45 / 64.07)
df = df.withColumn("o3", col("o3") * 24.45 / 48.00)

def aqi_pm25(c): return (
    when((c >= 0) & (c <= 12.0),   ((c - 0)    * (50 - 0)    / (12.0 - 0)    + 0))  .
    when((c > 12.0) & (c <= 35.4), ((c - 12.1) * (100 - 51)  / (35.4 - 12.1) + 51)) .
    when((c > 35.4) & (c <= 55.4), ((c - 35.5) * (150 - 101) / (55.4 - 35.5) + 101)).
    when((c > 55.4) & (c <= 150.4),((c - 55.5) * (200 - 151) / (150.4 - 55.5)+ 151)).
    when((c > 150.4) & (c <= 250.4),((c - 150.5)*(300 - 201)/(250.4 - 150.5)+ 201)).
    when((c > 250.4) & (c <= 350.4),((c - 250.5)*(400 - 301)/(350.4 - 250.5)+ 301)).
    when((c > 350.4) & (c <= 500.4),((c - 350.5)*(500 - 401)/(500.4 - 350.5)+ 401))
)

def aqi_pm10(c): return (
    when((c >= 0) & (c <= 54),     ((c - 0)    * (50 - 0)    / (54 - 0)      + 0))  .
    when((c > 54) & (c <= 154),    ((c - 55)   * (100 - 51)  / (154 - 55)    + 51)) .
    when((c > 154) & (c <= 254),   ((c - 155)  * (150 - 101) / (254 - 155)   + 101)).
    when((c > 254) & (c <= 354),   ((c - 255)  * (200 - 151) / (354 - 255)   + 151)).
    when((c > 354) & (c <= 424),   ((c - 355)  * (300 - 201) / (424 - 355)   + 201)).
    when((c > 424) & (c <= 504),   ((c - 425)  * (400 - 301) / (504 - 425)   + 301)).
    when((c > 504) & (c <= 604),   ((c - 505)  * (500 - 401) / (604 - 505)   + 401))
)

def aqi_co(c): return (
    when((c >= 0.0) & (c <= 4.4),    ((c - 0.0)   * (50 - 0)   / (4.4 - 0.0)   + 0)).
    when((c > 4.4) & (c <= 9.4),     ((c - 4.5)   * (100 - 51) / (9.4 - 4.5)   + 51)).
    when((c > 9.4) & (c <= 12.4),    ((c - 9.5)   * (150 - 101)/ (12.4 - 9.5)  + 101)).
    when((c > 12.4) & (c <= 15.4),   ((c - 12.5)  * (200 - 151)/ (15.4 - 12.5) + 151)).
    when((c > 15.4) & (c <= 30.4),   ((c - 15.5)  * (300 - 201)/ (30.4 - 15.5) + 201)).
    when((c > 30.4) & (c <= 40.4),   ((c - 30.5)  * (400 - 301)/ (40.4 - 30.5) + 301)).
    when((c > 40.4) & (c <= 50.4),   ((c - 40.5)  * (500 - 401)/ (50.4 - 40.5) + 401))
)

def aqi_no2(c): return (
    when((c >= 0) & (c <= 53),     ((c - 0)    * (50 - 0)    / (53 - 0)     + 0))  .
    when((c > 53) & (c <= 100),    ((c - 54)   * (100 - 51)  / (100 - 54)   + 51)) .
    when((c > 100) & (c <= 360),   ((c - 101)  * (150 - 101) / (360 - 101)  + 101)).
    when((c > 360) & (c <= 649),   ((c - 361)  * (200 - 151) / (649 - 361)  + 151)).
    when((c > 649) & (c <= 1249),  ((c - 650)  * (300 - 201) / (1249 - 650) + 201)).
    when((c > 1249) & (c <= 1649), ((c - 1250) * (400 - 301) / (1649 - 1250)+ 301)).
    when((c > 1649) & (c <= 2049), ((c - 1650) * (500 - 401) / (2049 - 1650)+ 401))
)

def aqi_o3(c): return (
    when((c >= 0) & (c <= 54),     ((c - 0)    * (50 - 0)    / (54 - 0)     + 0))  .
    when((c > 54) & (c <= 70),     ((c - 55)   * (100 - 51)  / (70 - 55)    + 51)) .
    when((c > 70) & (c <= 85),     ((c - 71)   * (150 - 101) / (85 - 71)    + 101)).
    when((c > 85) & (c <= 105),    ((c - 86)   * (200 - 151) / (105 - 86)   + 151)).
    when((c > 105) & (c <= 200),   ((c - 106)  * (300 - 201) / (200 - 106)  + 201))
)

def aqi_so2(c): return (
    when((c >= 0) & (c <= 35),     ((c - 0)    * (50 - 0)    / (35 - 0)     + 0))  .
    when((c > 35) & (c <= 75),     ((c - 36)   * (100 - 51)  / (75 - 36)    + 51)) .
    when((c > 75) & (c <= 185),    ((c - 76)   * (150 - 101) / (185 - 76)   + 101)).
    when((c > 185) & (c <= 304),   ((c - 186)  * (200 - 151) / (304 - 186)  + 151)).
    when((c > 304) & (c <= 604),   ((c - 305)  * (300 - 201) / (604 - 305)  + 201)).
    when((c > 604) & (c <= 804),   ((c - 605)  * (400 - 301) / (804 - 605)  + 301)).
    when((c > 804) & (c <= 1004),  ((c - 805)  * (500 - 401) / (1004 - 805) + 401))
)

df = df.withColumn("aqi_pm25", aqi_pm25(col("pm2_5")).cast(IntegerType()))
df = df.withColumn("aqi_pm10", aqi_pm10(col("pm10")).cast(IntegerType()))
df = df.withColumn("aqi_co",   aqi_co(col("co")).cast(IntegerType()))
df = df.withColumn("aqi_no2",  aqi_no2(col("no2")).cast(IntegerType()))
df = df.withColumn("aqi_o3",   aqi_o3(col("o3")).cast(IntegerType()))
df = df.withColumn("aqi_so2",  aqi_so2(col("so2")).cast(IntegerType()))

df = df.withColumn("aqi", greatest(
    "aqi_pm25", "aqi_pm10", "aqi_co", "aqi_no2", "aqi_o3", "aqi_so2"
))


TypeError: DataFrame.drop() got an unexpected keyword argument 'axis'

In [7]:
df = df.drop("aqi_pm25", "aqi_pm10", "aqi_co", "aqi_no2", "aqi_o3", "aqi_so2")
df.show()

+-------------------+----------+-----------+---+--------------------+----+------------------+------------------+------------------+-----+-----+-----+----+
|           datetime|       lat|        lon|aqi|                  co|  no|               no2|                o3|               so2|pm2_5| pm10|  nh3| uid|
+-------------------+----------+-----------+---+--------------------+----+------------------+------------------+------------------+-----+-----+-----+----+
|2024-04-30 17:00:00|21.0811211|105.8180306|151|5.697013819591385E-4|   0| 7.257488957660708|       22.08548125| 4.201400631951811|57.28|68.29|10.77|8688|
|2024-04-30 18:00:00|21.0811211|105.8180306|152|5.391849306063359E-4|   0| 5.952835300680455|20.598758886718752| 3.436847657333197|58.47|70.18| 9.88|8688|
|2024-04-30 19:00:00|21.0811211|105.8180306|148| 5.29005160567049E-4|   0| 5.032235534066686| 19.85669501953125|  2.91695163459254|54.66|64.18| 9.75|8688|
|2024-04-30 20:00:00|21.0811211|105.8180306|112|5.188330101161147E-4| 

In [1]:
!pip install graphviz

Collecting graphviz
  Downloading graphviz-0.21-py3-none-any.whl.metadata (12 kB)
Downloading graphviz-0.21-py3-none-any.whl (47 kB)
Installing collected packages: graphviz
Successfully installed graphviz-0.21


In [1]:
from graphviz import Digraph

# Create a new directed graph
db_diagram = Digraph(format='png')
db_diagram.attr(rankdir='LR', size='10')

# Define table: visual
db_diagram.node('visual', '''<<TABLE BORDER="1" CELLBORDER="1" CELLSPACING="0">
  <TR><TD COLSPAN="2"><B>visual</B></TD></TR>
  <TR><TD><B>uid</B></TD><TD>integer (PK)</TD></TR>
  <TR><TD>aqi</TD><TD>integer</TD></TR>
  <TR><TD>lat</TD><TD>double</TD></TR>
  <TR><TD>lon</TD><TD>double</TD></TR>
  <TR><TD>datetime</TD><TD>timestamp</TD></TR>
  <TR><TD>station_name</TD><TD>text</TD></TR>
  <TR><TD>wind</TD><TD>double</TD></TR>
  <TR><TD>humidity</TD><TD>double</TD></TR>
  <TR><TD>temperature</TD><TD>double</TD></TR>
  <TR><TD>pressure</TD><TD>double</TD></TR>
  <TR><TD>co</TD><TD>double</TD></TR>
  <TR><TD>no2</TD><TD>double</TD></TR>
  <TR><TD>o3</TD><TD>double</TD></TR>
  <TR><TD>so2</TD><TD>double</TD></TR>
  <TR><TD>pm25</TD><TD>double</TD></TR>
  <TR><TD>pm10</TD><TD>double</TD></TR>
</TABLE>>''')

# Define table: province
db_diagram.node('province', '''<<TABLE BORDER="1" CELLBORDER="1" CELLSPACING="0">
  <TR><TD COLSPAN="2"><B>province</B></TD></TR>
  <TR><TD>province</TD><TD>text</TD></TR>
  <TR><TD>datetime</TD><TD>timestamp</TD></TR>
  <TR><TD>co</TD><TD>double</TD></TR>
  <TR><TD>no2</TD><TD>double</TD></TR>
  <TR><TD>o3</TD><TD>double</TD></TR>
  <TR><TD>so2</TD><TD>double</TD></TR>
  <TR><TD>pm25</TD><TD>double</TD></TR>
  <TR><TD>pm10</TD><TD>double</TD></TR>
  <TR><TD>aqi</TD><TD>integer</TD></TR>
</TABLE>>''')

# Define table: historical
db_diagram.node('historical', '''<<TABLE BORDER="1" CELLBORDER="1" CELLSPACING="0">
  <TR><TD COLSPAN="2"><B>historical</B></TD></TR>
  <TR><TD><B>uid</B></TD><TD>integer (FK)</TD></TR>
  <TR><TD>aqi</TD><TD>integer</TD></TR>
  <TR><TD>lat</TD><TD>double</TD></TR>
  <TR><TD>lon</TD><TD>double</TD></TR>
  <TR><TD>datetime</TD><TD>timestamp</TD></TR>
  <TR><TD>co</TD><TD>double</TD></TR>
  <TR><TD>no2</TD><TD>double</TD></TR>
  <TR><TD>o3</TD><TD>double</TD></TR>
  <TR><TD>so2</TD><TD>double</TD></TR>
  <TR><TD>pm25</TD><TD>double</TD></TR>
  <TR><TD>pm10</TD><TD>double</TD></TR>
</TABLE>>''')

# Add foreign key relationship
db_diagram.edge('historical', 'visual', label='uid (FK)', arrowhead='normal')

# Render the diagram
db_diagram.render('/mnt/data/aqi_db_diagram', cleanup=False)
'/mnt/data/aqi_db_diagram.png'


ExecutableNotFound: failed to execute WindowsPath('dot'), make sure the Graphviz executables are on your systems' PATH