# CRAWL DATA AIR QUALITY TỪ CÁC TRẠM Ở WEB OPENAQ, 6 THÁNG LƯU 1 LẦN VÀO FILE CSV

# CELL 1: Cài đặt và Khởi tạo

In [1]:
import requests
import pandas as pd
import boto3
import io
import os
import glob
from datetime import datetime, timezone
from botocore import UNSIGNED
from botocore.config import Config

# Load secret from .env
from dotenv import load_dotenv
load_dotenv()  # loads .env in project root

API_KEY = os.getenv("OPENAQ_API_KEY")
if not API_KEY:
    raise RuntimeError("OPENAQ_API_KEY not set in environment (.env missing or not loaded)")

BASE_URL = "https://api.openaq.org/v3"
headers = {"X-API-Key": API_KEY}

print("Cell 1: Các thư viện và biến đã được khởi tạo.")

Cell 1: Các thư viện và biến đã được khởi tạo.


In [2]:
# CELL 2: Lấy thông tin các trạm quan trắc (Trinh sát)

# 1. Lấy danh sách các trạm trong khu vực Hà Nội
bbox = "105.7,20.9,106.0,21.2" # Bounding box cho Hà Nội
print("Đang lấy thông tin các trạm từ API OpenAQ...")
resp = requests.get(f"{BASE_URL}/locations?bbox={bbox}&limit=1000", headers=headers).json()
locations_in_bbox = resp.get("results", [])

# 2. Tạo một "sổ tay tra cứu" (lookup dictionary) chứa thông tin của các trạm
stations_info = {}
earliest_overall_date = datetime.now(timezone.utc)

if not locations_in_bbox:
    print("Không tìm thấy trạm nào trong khu vực đã chọn.")
else:
    print(f"Tìm thấy {len(locations_in_bbox)} trạm. Bắt đầu xử lý thông tin từng trạm...")
    for loc in locations_in_bbox:
        loc_id = loc['id']
        
        # Xử lý an toàn cho datetimeFirst
        dt_first_obj = loc.get("datetimeFirst")
        if dt_first_obj and dt_first_obj.get("utc"):
            first_dt_str = dt_first_obj["utc"]
            start_date = datetime.fromisoformat(first_dt_str.replace("Z", "+00:00"))
            if start_date < earliest_overall_date:
                earliest_overall_date = start_date
        else:
            print(f"  - Trạm ID {loc_id} ({loc.get('name', 'N/A')}) bị bỏ qua do thiếu thông tin 'datetimeFirst'.")
            continue
            
        # Xử lý an toàn cho datetimeLast
        dt_last_obj = loc.get("datetimeLast")
        if dt_last_obj and dt_last_obj.get("utc"):
            last_dt_str = dt_last_obj["utc"]
            end_date = datetime.fromisoformat(last_dt_str.replace("Z", "+00:00"))
        else:
            end_date = datetime.now(timezone.utc)
            
        stations_info[loc_id] = {
            "name": loc.get('name', 'Unknown'),
            "start_date": start_date,
            "end_date": end_date
        }
    
    print("-" * 20)
    print(f"Cell 2: Đã thu thập thông tin của {len(stations_info)} trạm hợp lệ.")
    if stations_info:
        print(f"Toàn bộ dữ liệu sẽ được quét từ tháng: {earliest_overall_date.strftime('%Y-%m')}")

Đang lấy thông tin các trạm từ API OpenAQ...
Tìm thấy 33 trạm. Bắt đầu xử lý thông tin từng trạm...
  - Trạm ID 18 (SPARTAN - Vietnam Acad. Sci.) bị bỏ qua do thiếu thông tin 'datetimeFirst'.
  - Trạm ID 307169 (nồng độ pm) bị bỏ qua do thiếu thông tin 'datetimeFirst'.
--------------------
Cell 2: Đã thu thập thông tin của 31 trạm hợp lệ.
Toàn bộ dữ liệu sẽ được quét từ tháng: 2016-01


In [3]:
# CELL 3: Tải dữ liệu và lưu riêng lẻ (Tải hàng loạt)

# Kiểm tra xem các biến từ Cell 2 đã tồn tại chưa
if 'stations_info' not in locals() or not stations_info:
    print("Vui lòng chạy Cell 2 trước để lấy thông tin các trạm.")
else:
    # 1. Tạo thư mục chính
    main_output_dir = "hanoi_air_quality_data_raw"
    if not os.path.exists(main_output_dir):
        os.makedirs(main_output_dir)
        print(f"Đã tạo thư mục chính: {main_output_dir}")

    # 2. Thiết lập kết nối S3
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED, region_name='us-east-1'))
    bucket_name = 'openaq-data-archive'
    
    total_files_downloaded = 0
    print(f"Bắt đầu quá trình tải dữ liệu cho {len(stations_info)} trạm...")

    # 3. Lặp qua từng trạm
    for loc_id, info in stations_info.items():
        print(f"\n--- Đang xử lý trạm ID: {loc_id} ({info['name']}) ---")

        station_dir = os.path.join(main_output_dir, str(loc_id))
        if not os.path.exists(station_dir):
            os.makedirs(station_dir)

        # 4. Xác định các năm mà trạm này có dữ liệu
        years_to_scan = range(info['start_date'].year, info['end_date'].year + 1)
        
        for year in years_to_scan:
            # 5. Xử lý thành 2 nửa: Nửa đầu năm (H1) và Nửa cuối năm (H2)
            for semester in [1, 2]:
                months_in_semester = range(1, 7) if semester == 1 else range(7, 13)
                semester_label = "H1" if semester == 1 else "H2"
                
                semester_data_frames = []
                
                for month in months_in_semester:
                    prefix = (f"records/csv.gz/locationid={loc_id}/"
                              f"year={year}/"
                              f"month={month:02d}/")
                    
                    try:
                        paginator = s3.get_paginator('list_objects_v2')
                        pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

                        for page in pages:
                            if 'Contents' not in page: continue
                            
                            for obj in page['Contents']:
                                file_key = obj['Key']
                                s3_object = s3.get_object(Bucket=bucket_name, Key=file_key)
                                bytes_buffer = io.BytesIO(s3_object['Body'].read())
                                df = pd.read_csv(bytes_buffer, compression='gzip', header=0)
                                semester_data_frames.append(df)
                                total_files_downloaded += 1
                                print(f"  -> Đã tải file: {file_key.split('/')[-1]} ({len(df)} dòng)")

                    except Exception as e:
                        if 'NoSuchKey' not in str(e) and 'AccessDenied' not in str(e):
                             print(f"    -> Lỗi khi quét {prefix}: {e}")

                # 6. Gộp và lưu file cho cả nửa năm
                if semester_data_frames:
                    semester_df = pd.concat(semester_data_frames, ignore_index=True)
                    semester_df['datetime'] = pd.to_datetime(semester_df['datetime'], utc=True)
                    
                    semester_df = semester_df[semester_df['datetime'].between(info['start_date'], info['end_date'])]

                    if not semester_df.empty:
                        output_filename = os.path.join(station_dir, f"{loc_id}_{year}_{semester_label}.csv")
                        semester_df.to_csv(output_filename, index=False, encoding='utf-8-sig')
                        print(f"  ==> ĐÃ LƯU: {len(semester_df)} dòng dữ liệu cho {semester_label}/{year} vào file {output_filename}")

    print(f"\n---> HOÀN TẤT QUÁ TRÌNH TẢI <---")
    print(f"Cell 3: Tổng cộng đã tải và xử lý {total_files_downloaded} file.")
    print(f"Dữ liệu đã được lưu vào các thư mục con trong '{main_output_dir}'")

Đã tạo thư mục chính: hanoi_air_quality_data_raw
Bắt đầu quá trình tải dữ liệu cho 31 trạm...

--- Đang xử lý trạm ID: 2539 (US Diplomatic Post: Hanoi) ---
  -> Đã tải file: location-2539-20160130.csv.gz (18 dòng)
  -> Đã tải file: location-2539-20160131.csv.gz (24 dòng)
  -> Đã tải file: location-2539-20160201.csv.gz (24 dòng)
  -> Đã tải file: location-2539-20160202.csv.gz (24 dòng)
  -> Đã tải file: location-2539-20160203.csv.gz (24 dòng)
  -> Đã tải file: location-2539-20160204.csv.gz (24 dòng)
  -> Đã tải file: location-2539-20160205.csv.gz (24 dòng)
  -> Đã tải file: location-2539-20160206.csv.gz (9 dòng)
  -> Đã tải file: location-2539-20160208.csv.gz (11 dòng)
  -> Đã tải file: location-2539-20160209.csv.gz (24 dòng)
  -> Đã tải file: location-2539-20160210.csv.gz (24 dòng)
  -> Đã tải file: location-2539-20160211.csv.gz (24 dòng)
  -> Đã tải file: location-2539-20160212.csv.gz (24 dòng)
  -> Đã tải file: location-2539-20160213.csv.gz (24 dòng)
  -> Đã tải file: location-2539-2

In [4]:
# CELL 4: Gộp các file dữ liệu (Tổng hợp)

main_output_dir = "hanoi_air_quality_data_raw"

if not os.path.exists(main_output_dir):
    print(f"Thư mục '{main_output_dir}' không tồn tại. Vui lòng chạy Cell 3 trước.")
else:
    all_csv_files = glob.glob(os.path.join(main_output_dir, '**', '*.csv'), recursive=True)
    
    if not all_csv_files:
        print("Không tìm thấy file CSV nào để gộp.")
    else:
        print(f"Tìm thấy {len(all_csv_files)} file CSV. Bắt đầu gộp...")
        
        all_data_frames = [pd.read_csv(f) for f in all_csv_files]
        final_df_long = pd.concat(all_data_frames, ignore_index=True)
        
        final_df_long['datetime'] = pd.to_datetime(final_df_long['datetime'])
        final_df_long = final_df_long.sort_values(by=['location_id', 'datetime'])
        
        final_output_filename = "hanoi_air_quality_history_COMBINED_LONG.csv"
        final_df_long.to_csv(final_output_filename, index=False, encoding='utf-8-sig')
        
        print("\n---> QUÁ TRÌNH GỘP HOÀN TẤT <---")
        print(f"Cell 4: Dữ liệu dạng dài đã được gộp và lưu vào file '{final_output_filename}'")

Tìm thấy 70 file CSV. Bắt đầu gộp...

---> QUÁ TRÌNH GỘP HOÀN TẤT <---
Cell 4: Dữ liệu dạng dài đã được gộp và lưu vào file 'hanoi_air_quality_history_COMBINED_LONG.csv'


In [1]:
# CELL 5 (Cập nhật): Làm sạch và Biến đổi

import pandas as pd
import numpy as np # Thêm thư viện numpy để sử dụng np.nan
import os

long_format_file = "hanoi_air_quality_history_COMBINED_LONG.csv"

if not os.path.exists(long_format_file):
    print(f"File '{long_format_file}' không tồn tại. Vui lòng chạy Cell 4 trước.")
else:
    print(f"Đang đọc file '{long_format_file}'...")
    df_long = pd.read_csv(long_format_file)
    print(f"Đã đọc xong. Tổng số dòng ban đầu: {len(df_long)}")

    # --- BƯỚC 1: LÀM SẠCH DỮ LIỆU ---
    print("\nBắt đầu quá trình làm sạch dữ liệu...")
    # Đếm số lượng giá trị không hợp lệ trước khi xử lý
    invalid_values_count = (df_long['value'] < 0).sum()
    print(f"Tìm thấy {invalid_values_count} dòng có giá trị 'value' âm (bao gồm -999).")

    # Thay thế tất cả các giá trị 'value' âm bằng NaN
    # Điều này sẽ xử lý cả -999.0 và các giá trị âm khác nếu có
    df_long['value'] = df_long['value'].apply(lambda x: np.nan if x < 0 else x)
    print("Đã thay thế các giá trị không hợp lệ bằng NaN.")
    
    # Bỏ các dòng không còn giá trị 'value' nào (đã bị thay thế bằng NaN)
    df_long.dropna(subset=['value'], inplace=True)
    print(f"Tổng số dòng còn lại sau khi loại bỏ các giá trị không hợp lệ: {len(df_long)}")
    # ------------------------------------

    print("\nBắt đầu chuyển đổi sang dạng rộng...")
    # Loại bỏ các cột không cần thiết trước khi pivot
    columns_to_pivot = ['location_id', 'datetime', 'parameter', 'value']
    df_wide = df_long[columns_to_pivot].pivot_table(
        index=['location_id', 'datetime'], 
        columns='parameter', 
        values='value'
    ).reset_index()
    
    wide_output_filename = "hanoi_air_quality_history_COMBINED_WIDE_CLEANED.csv"
    df_wide.to_csv(wide_output_filename, index=False, encoding='utf-8-sig')
    
    print("\n---> QUÁ TRÌNH BIẾN ĐỔI HOÀN TẤT <---")
    print(f"Cell 5: Dữ liệu đã được làm sạch, chuyển đổi và lưu vào file '{wide_output_filename}'")
    print("\nXem trước 15 dòng đầu của dữ liệu dạng rộng đã được làm sạch:")
    print(df_wide.head(15))

Đang đọc file 'hanoi_air_quality_history_COMBINED_LONG.csv'...
Đã đọc xong. Tổng số dòng ban đầu: 663848

Bắt đầu quá trình làm sạch dữ liệu...
Tìm thấy 10650 dòng có giá trị 'value' âm (bao gồm -999).
Đã thay thế các giá trị không hợp lệ bằng NaN.
Tổng số dòng còn lại sau khi loại bỏ các giá trị không hợp lệ: 653198

Bắt đầu chuyển đổi sang dạng rộng...

---> QUÁ TRÌNH BIẾN ĐỔI HOÀN TẤT <---
Cell 5: Dữ liệu đã được làm sạch, chuyển đổi và lưu vào file 'hanoi_air_quality_history_COMBINED_WIDE_CLEANED.csv'

Xem trước 15 dòng đầu của dữ liệu dạng rộng đã được làm sạch:
parameter  location_id                   datetime  co  no2  o3  pm10  pm25  \
0                 2539  2016-01-30 01:00:00+00:00 NaN  NaN NaN   NaN  42.0   
1                 2539  2016-01-30 02:00:00+00:00 NaN  NaN NaN   NaN  42.2   
2                 2539  2016-01-30 03:00:00+00:00 NaN  NaN NaN   NaN  48.7   
3                 2539  2016-01-30 04:00:00+00:00 NaN  NaN NaN   NaN  41.6   
4                 2539  2016-01-30 0