In [1]:
import requests
from requests import Response
def get_response() -> Response | None:
    aqi_url = 'https://data.moenv.gov.tw/api/v2/aqx_p_488?api_key=e8dd42e6-9b8b-43f8-991e-b3dee723a52d&limit=1000&sort=datacreationdate desc&format=CSV'
    try:
        response:Response =requests.get(aqi_url)
        response.raise_for_status()
        if response.status_code == 200:
            print("下載成功")
            return response
        else:
            print("下載失敗")
            return None
    except Exception as e:
        print(e)
        print("連線失敗")
        return None

In [3]:
response:Response | None = get_response()
if response:
    rawString:str = response.text
from io import StringIO
import csv
from csv import DictReader
from pprint import pprint
file:StringIO = StringIO(rawString)
reader:DictReader = csv.DictReader(file)
data:list[dict] = list(reader)
pprint(data)

下載成功
[{'aqi': '37',
  'co': '0.16',
  'co_8hr': '0.1',
  'county': '屏東縣',
  'datacreationdate': '2024-03-28 18:00',
  'latitude': '22.260899',
  'longitude': '120.651472',
  'no': '0.3',
  'no2': '3.1',
  'nox': '3.4',
  'o3': '37.6',
  'o3_8hr': '40.7',
  'pm10': '32',
  'pm10_avg': '28',
  'pm2.5': '13',
  'pm2.5_avg': '11',
  'pollutant': '',
  'siteid': '313',
  'sitename': '屏東(枋山)',
  'so2': '0.5',
  'so2_avg': '0',
  'status': '良好',
  'unit': '',
  'winddirec': '293',
  'windspeed': '2.3'},
 {'aqi': '70',
  'co': '0.38',
  'co_8hr': '0.3',
  'county': '新北市',
  'datacreationdate': '2024-03-28 18:00',
  'latitude': '24.94902778',
  'longitude': '121.38352778',
  'no': '1.1',
  'no2': '14.6',
  'nox': '15.8',
  'o3': '32.2',
  'o3_8hr': '33.6',
  'pm10': '34',
  'pm10_avg': '39',
  'pm2.5': '22',
  'pm2.5_avg': '23.2',
  'pollutant': '細懸浮微粒',
  'siteid': '311',
  'sitename': '新北(樹林)',
  'so2': '1.2',
  'so2_avg': '0',
  'status': '普通',
  'unit': '',
  'winddirec': '47',
  'windspeed

In [10]:
from pydantic import BaseModel,Field,RootModel,field_validator

class Site(BaseModel):
    站點名稱:str = Field(alias='sitename')
    縣市:str = Field(alias='county')
    aqi:float
    時間:str = Field(alias='datacreationdate')
    pm25:float = Field(alias='pm2.5')

    @field_validator("pm25",mode='before')
    @classmethod
    def empty_to_zero(cls,value):
        if value=='':
            return '0.0'
        else:
            return value
        
class Sites(RootModel):
    root:list[Site]

    def __iter__(self):
        return iter(self.root)

    def __getitem__(self, item):
        return self.root[item]
    
sites:Sites = Sites.model_validate(data)
    
for item in sites:
    print(item)


站點名稱='屏東(枋山)' 縣市='屏東縣' aqi=37.0 時間='2024-03-28 18:00' pm25=13.0
站點名稱='新北(樹林)' 縣市='新北市' aqi=70.0 時間='2024-03-28 18:00' pm25=22.0
站點名稱='屏東(琉球)' 縣市='屏東縣' aqi=46.0 時間='2024-03-28 18:00' pm25=8.0
站點名稱='臺南(麻豆)' 縣市='臺南市' aqi=64.0 時間='2024-03-28 18:00' pm25=14.0
站點名稱='高雄(湖內)' 縣市='高雄市' aqi=47.0 時間='2024-03-28 18:00' pm25=8.0
站點名稱='彰化(員林)' 縣市='彰化縣' aqi=74.0 時間='2024-03-28 18:00' pm25=23.0
站點名稱='大城' 縣市='彰化縣' aqi=71.0 時間='2024-03-28 18:00' pm25=16.0
站點名稱='富貴角' 縣市='新北市' aqi=67.0 時間='2024-03-28 18:00' pm25=10.0
站點名稱='麥寮' 縣市='雲林縣' aqi=74.0 時間='2024-03-28 18:00' pm25=14.0
站點名稱='關山' 縣市='臺東縣' aqi=29.0 時間='2024-03-28 18:00' pm25=12.0
站點名稱='馬公' 縣市='澎湖縣' aqi=47.0 時間='2024-03-28 18:00' pm25=13.0
站點名稱='金門' 縣市='金門縣' aqi=84.0 時間='2024-03-28 18:00' pm25=28.0
站點名稱='馬祖' 縣市='連江縣' aqi=122.0 時間='2024-03-28 18:00' pm25=43.0
站點名稱='埔里' 縣市='南投縣' aqi=63.0 時間='2024-03-28 18:00' pm25=20.0
站點名稱='復興' 縣市='高雄市' aqi=66.0 時間='2024-03-28 18:00' pm25=23.0
站點名稱='永和' 縣市='新北市' aqi=52.0 時間='2024-03-28 18:00' pm25=18.0
站點名稱='竹山' 縣市='南投