In [None]:
# 导入必要的库
import os
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from datetime import datetime, timedelta





In [None]:
# 配置
DATA_DIR = r"data\Air pollution data\FI"
URLS_CSV = r"data\Air pollution data\FI\ParquetFilesUrlse1a.csv"

# 设置固定日期范围
START_DATE = "2020-10-01"
END_DATE = "2025-09-30"
# 预测开始日期
PREDICTION_START_DATE = "2025-10-01"

print(f'Fetching data from {START_DATE} to {END_DATE}')
print(f'Prediction will start from {PREDICTION_START_DATE} for one year')

# 下载parquet文件的函数
def download_parquet_files(urls_csv, data_dir):
    # 从CSV文件读取URLs
    urls = pd.read_csv(urls_csv)["ParquetFileUrl"]
    # 下载每个文件
    for url in urls:
        file_name = url.split('/')[-1]
        file_path = os.path.join(data_dir, file_name)
        # 只在文件不存在时下载
        if not os.path.exists(file_path):
            print(f'Downloading {file_name}...')
            try:
                response = requests.get(url)
                response.raise_for_status()
                with open(file_path, 'wb') as f:
                    f.write(response.content)
            except Exception as e:
                print(f'Error downloading {file_name}: {e}')
        else:
            print(f'{file_name} already exists, skipping download.')


In [None]:
# 加载和预处理空气污染数据的函数
def load_air_pollution_data(data_dir, start_date, end_date):
    # 定义要读取的列
    columns_air = ["Samplingpoint", "Pollutant", "Start", "Value", "Unit", "Validity"]
    
    # 初始化空DataFrame
    df_air = pd.DataFrame()
    
    # 遍历目录并读取所有parquet文件
    for root, _, files in os.walk(data_dir):
        for filename in files:
            if filename.endswith('.parquet'):
                file_path = os.path.join(root, filename)
                try:
                    # 读取parquet文件
                    data = pd.read_parquet(file_path, columns=columns_air)
                    # 合并到主DataFrame
                    df_air = pd.concat([df_air, data])
                except Exception as e:
                    print(f'Error reading {filename}: {e}')
    
    # 数据清洗和预处理
    if not df_air.empty:
        # 将Start列转换为datetime类型
        df_air['Start'] = pd.to_datetime(df_air['Start'])
        
        # 按日期范围过滤数据
        df_air = df_air[(df_air['Start'] >= start_date) & (df_air['Start'] <= end_date)]
        
        # 只保留有效数据
        df_air = df_air[df_air['Validity'] == 1]
        
        # 移除Validity列，因为不再需要
        df_air = df_air.drop(['Validity'], axis=1)
        
        # 从Samplingpoint中移除国家代码前缀
        if 'Samplingpoint' in df_air.columns:
            df_air['Samplingpoint'] = df_air['Samplingpoint'].str[3:]
        
        # 将Value转换为float并处理负值
        df_air['Value'] = pd.to_numeric(df_air['Value'], errors='coerce')
        df_air['Value'] = df_air['Value'].where(lambda x: x > 0, np.nan)
        
        # 前向填充缺失值
        df_air = df_air.ffill()
        
        # 删除任何剩余的NaN值
        df_air = df_air.dropna()
    
    return df_air

# 执行数据获取
print('Downloading parquet files...')
download_parquet_files(URLS_CSV, DATA_DIR)

print('\nLoading and preprocessing air pollution data...')
df_air = load_air_pollution_data(DATA_DIR, START_DATE, END_DATE)

print(f'\nLoaded {len(df_air)} records')
if not df_air.empty:
    print('\nSample data:')
    print(df_air.head())
    print('\nData summary:')
    print(df_air.describe())
    
    # 获取所有唯一站点
    stations = df_air['Samplingpoint'].unique()
    print(f'\nFound {len(stations)} unique sampling stations:')
    print(stations)

# 为GRU准备多污染物时间序列数据的函数
def prepare_multi_pollutant_data(df, pollutant_ids, station_id, sequence_length=24):
    ""
    # 按站点过滤数据
    df_station = df[df['Samplingpoint'] == station_id].copy()
    
    # 确保数据按时间排序
    df_station = df_station.sort_values('Start')
    
    # 获取唯一的日期时间点
    timestamps = df_station['Start'].unique()
    timestamps.sort()
    
    # 创建一个时间点索引的数据框，用于合并不同污染物的数据
    multi_pollutant_df = pd.DataFrame({'Start': timestamps})
    
    # 为每个污染物创建一列
    for pollutant_id in pollutant_ids:
        pollutant_data = df_station[df_station['Pollutant'] == pollutant_id][['Start', 'Value']]
        pollutant_data = pollutant_data.rename(columns={'Value': f'Value_{pollutant_id}'})
        multi_pollutant_df = pd.merge(multi_pollutant_df, pollutant_data, on='Start', how='left')
    
    # 填充缺失值
    multi_pollutant_df = multi_pollutant_df.ffill().bfill()
    multi_pollutant_df = multi_pollutant_df.dropna()
    
    # 准备数据用于时间序列预测
    X, y, dates = [], [], []
    feature_cols = [f'Value_{pid}' for pid in pollutant_ids]
    num_features = len(pollutant_ids)
    
    # 为每个特征创建单独的缩放器
    scalers = {pid: MinMaxScaler(feature_range=(0, 1)) for pid in pollutant_ids}
    
    # 对每个污染物进行归一化
    scaled_values = np.zeros((len(multi_pollutant_df), num_features))
    for i, pid in enumerate(pollutant_ids):
        values_col = multi_pollutant_df[f'Value_{pid}'].values.reshape(-1, 1)
        scaled_values[:, i] = scalers[pid].fit_transform(values_col).flatten()
    
    # 创建时间序列序列
    for i in range(len(scaled_values) - sequence_length):
        X.append(scaled_values[i:i+sequence_length])
        y.append(scaled_values[i+sequence_length])
        dates.append(multi_pollutant_df.iloc[i+sequence_length]['Start'])
    
    # 转换为numpy数组
    X = np.array(X)
    y = np.array(y)
    
    return X, y, scalers, dates

# 自定义PyTorch数据集类
class MultiPollutantDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# GRU模型定义 - 支持多输出
class MultiOutputGRUModel(nn.Module):
    def __init__(self, input_size, hidden_size=64, num_layers=2, output_size=4):
        super(MultiOutputGRUModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # GRU层
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        
        # 全连接层 - 输出多个污染物的值
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        # 初始化为零的隐藏状态
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # 前向传播GRU
        out, _ = self.gru(x, h0)
        
        # 将最后一个时间步的输出传递给全连接层
        out = self.fc(out[:, -1, :])
        
        return out

# 训练函数
def train_model(model, train_loader, test_loader, num_epochs=50, learning_rate=0.001):
    # 设置设备（可用GPU时使用GPU，否则使用CPU）
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    
    # 损失函数和优化器
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    # 训练历史
    train_losses = []
    test_losses = []
    
    # 训练循环
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            
            # 前向传播
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            
            # 反向传播和优化
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item() * X_batch.size(0)
        
        # 计算平均训练损失
        train_loss /= len(train_loader.dataset)
        train_losses.append(train_loss)
        
        # 在测试集上评估
        model.eval()
        test_loss = 0.0
        
        with torch.no_grad():
            for X_batch, y_batch in test_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                outputs = model(X_batch)
                loss = criterion(outputs, y_batch)
                test_loss += loss.item() * X_batch.size(0)
        
        # 计算平均测试损失
        test_loss /= len(test_loader.dataset)
        test_losses.append(test_loss)
        
        # 打印进度
        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')
    
    return model, train_losses, test_losses

# 如果有数据则训练模型
if 'X_train' in locals():
    print('Initializing and training GRU model...')
    
    # 创建模型实例
    model = GRUModel(input_size=1, hidden_size=64, num_layers=2, output_size=1)
    
    # 训练模型
    model, train_losses, test_losses = train_model(
        model, train_loader, test_loader, num_epochs=50, learning_rate=0.001
    )
    
    # 绘制训练和测试损失
    plt.figure(figsize=(10, 6))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(test_losses, label='Test Loss')
    plt.title('GRU Model Training Progress')
    plt.xlabel('Epochs')
    plt.ylabel('Loss (MSE)')
    plt.legend()
    plt.grid(True)
    plt.show()



In [None]:
# 评估模型并进行预测的函数
def evaluate_model(model, test_loader, scalers, pollutant_ids):
    # 设置设备
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()
    
    # 存储预测和实际值的字典
    all_predictions = {pid: [] for pid in pollutant_ids}
    all_actuals = {pid: [] for pid in pollutant_ids}
    
    # 在测试集上进行预测
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch = X_batch.to(device)
            predictions = model(X_batch)
            
            # 转换为numpy并反向转换
            predictions_np = predictions.cpu().numpy()
            y_batch_np = y_batch.numpy()
            
            # 反向转换以获取实际值
            predictions_inv = scaler.inverse_transform(predictions_np.reshape(-1, 1)).flatten()
            actuals_inv = scaler.inverse_transform(y_batch_np.reshape(-1, 1)).flatten()
            
            all_predictions.extend(predictions_inv)
            all_actuals.extend(actuals_inv)
    
    # 计算评估指标
    mse = mean_squared_error(all_actuals, all_predictions)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(all_actuals, all_predictions)
    
    print(f'Model Evaluation Metrics:')
    print(f'RMSE: {rmse:.4f}')
    print(f'MAE: {mae:.4f}')
    
    return all_predictions, all_actuals, rmse, mae


In [None]:
# 如果模型存在则评估
if 'model' in locals():
    print('Evaluating model performance...')
    predictions, actuals, rmse, mae = evaluate_model(model, test_loader, scaler)
    
    # 绘制预测值与实际值对比
    plt.figure(figsize=(14, 7))
    plt.plot(test_dates[:len(predictions)], actuals, label='Actual Values', color='blue')
    plt.plot(test_dates[:len(predictions)], predictions, label='Predictions', color='red', linestyle='--')
    plt.title('GRU Model Predictions vs Actual Values')
    plt.xlabel('Date')
    plt.ylabel('Pollutant Concentration')
    plt.legend()
    plt.grid(True)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()





In [None]:
# 预测未来值的函数
def predict_future(model, last_sequence, scaler, pollutant_ids, prediction_days=365):
    # 设置设备
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()
    
    # 将last_sequence转换为tensor
    current_sequence = torch.tensor(last_sequence, dtype=torch.float32).to(device)
    current_sequence = current_sequence.unsqueeze(0)  # 添加批次维度
    
    # 存储预测的列表
    future_predictions = []
    
    # 为指定天数生成预测
    for _ in range(prediction_days * 24):  # 假设每小时预测
        with torch.no_grad():
            # 预测下一个值
            next_pred = model(current_sequence)
            
            # 转换为numpy并存储
            next_pred_np = next_pred.cpu().numpy()[0][0]
            future_predictions.append(next_pred_np)
            
            # 更新序列以进行下一次预测
            new_sequence = torch.cat([
                current_sequence[:, 1:, :],
                next_pred.unsqueeze(2)
            ], dim=1)
            current_sequence = new_sequence
    
    # 反向转换以获取实际值
    future_predictions = np.array(future_predictions).reshape(-1, 1)
    future_predictions_inv = scaler.inverse_transform(future_predictions).flatten()
    
    return future_predictions_inv

In [None]:
# 如果模型存在则进行未来预测
if 'model' in locals() and 'X_test' in locals() and 'scaler' in locals():
    print('Making predictions for the current year...')
    
    # 获取测试数据中的最后一个序列
    last_sequence = X_test[-1]
    
    # 预测今年（简化为365天）
    current_year_predictions = predict_future(model, last_sequence, scaler, prediction_days=300)  # 演示使用30天
    
    prediction_start = datetime.strptime(PREDICTION_START_DATE, '%Y-%m-%d')
    future_dates = [prediction_start + timedelta(hours=i) for i in range(len(current_year_predictions))]
    
    # 绘制未来预测
    plt.figure(figsize=(14, 7))
    plt.plot(future_dates, current_year_predictions, color='green', label='Predicted Values for Current Year')
    plt.title('GRU Model Predictions for Current Year')
    plt.xlabel('Date')
    plt.ylabel('Pollutant Concentration')
    plt.legend()
    plt.grid(True)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
        # 保存预测到CSV
    pred_df = pd.DataFrame({
        'Date': future_dates,
        'Predicted_Pollutant_Level': current_year_predictions
    })
    
    pred_csv_path = 'current_year_predictionsv0.5.csv'
    pred_df.to_csv(pred_csv_path, index=False)
    print(f'Predictions saved to {pred_csv_path}')
    
    # 显示预测的汇总统计
    print('\nSummary of Current Year Predictions:')
    print(f'Minimum predicted level: {current_year_predictions.min():.2f}')
    print(f'Maximum predicted level: {current_year_predictions.max():.2f}')
    print(f'Average predicted level: {current_year_predictions.mean():.2f}')
    print(f'Standard deviation: {current_year_predictions.std():.2f}')

    