In [14]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.layers import Dropout,Dense,LSTM,GRU,Conv2D,Activation,BatchNormalization
import matplotlib.pyplot as plt
from sklearn.preprocessing import OrdinalEncoder,StandardScaler
import joblib
import os
from math import sqrt
from sklearn.metrics import mean_squared_error,mean_absolute_error,r2_score
from tensorflow.keras.models import load_model
import matplotlib.dates as mdates
from time import time
import multiprocessing as mp
from tensorflow.keras import Model
import itertools
from functools import partial
import gc

In [15]:
# Restrict TensorFlow to GPU 1.
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
# TensorFlow C++ log level: '2' hides INFO and WARNING messages, keeping ERROR.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Global settings shared by the cells below.
freq = 'D'      # pandas frequency string of the time series
region = 'NA'

if freq == 'D':
    # NO2 column names; list_col[0] ('NO2+d') is presumably the forecast target — confirm.
    list_col = ['NO2+d', 'NO2-d', 'NO2-2d', 'NO2-3d']
    file_name = 'demo_data.csv'
else:
    # Fail here instead of with a NameError on file_name/list_col further down.
    raise ValueError(f"Unsupported freq {freq!r}: only 'D' is configured")

file_path = 'data/'

In [17]:
# Load the raw station-level dataset.
data = pd.read_csv(f'{file_path}{file_name}')

In [18]:
# Chronological split: every year before 2020 trains the model, 2020 is held out as the test year.
train_set = data.loc[data['year'] < 2020].reset_index(drop=True)
test_set = data.loc[data['year'] == 2020].reset_index(drop=True)

In [19]:
# Feature columns, in model-input order, that are standardized and fed to the network.
# Row identifiers (station, index) and 'NO2+d' are intentionally not part of this list.
cols = ['SCTC', 'PM2.5', 'Dew Point Temp', 'O3', 'Sea Level Pres', 'AQI', 'dist', 'PM10',
        'NO2-d', 'NO2-2d', 'NO2-3d', 'Wind Dire', 'Elev', 'CO', 'Wind Speed', 'NO2', 'SO2',
        'Air Temp', 'year', 'month', 'day', 'hour', 'day_of_week', 'week', 'day_of_year',
        'season', 'is_weekend', 'lon_m', 'lon', 'lat_m', 'lat', 'dis_pop', 'people_density',
        'road_density', 'PBLH', 'HLML', 'QSH']
# Placeholder only: data_std() overwrites this global with the real position of 'NO2' in cols.
index_std_NO2 = 1

In [20]:
# Function: standardize the feature columns.
def data_std(train_set, test_set, num_col=38, feature_cols=None):
    """Standardize the feature columns of the train and test frames.

    A StandardScaler is fitted on the training rows only and then applied to
    the test rows, so no test statistics leak into the scaling.

    Args:
        train_set: training DataFrame containing the feature columns plus
            'index' and 'station'.
        test_set: test DataFrame with the same columns.
        num_col: unused; kept so existing calls keep working.
        feature_cols: columns to standardize, in output order. Defaults to the
            notebook-level ``cols`` list.

    Returns:
        (train_, test_): new DataFrames holding the standardized feature
        columns followed by the original 'index' and 'station' columns.

    Side effects:
        Sets the globals ``std`` (the fitted scaler) and ``index_std_NO2``
        (position of 'NO2' among the standardized columns).
    """
    global std, index_std_NO2
    feature_cols = list(cols if feature_cols is None else feature_cols)

    std = StandardScaler()
    # Fit on the training rows only; reuse the same statistics for the test rows.
    train_ = pd.DataFrame(std.fit_transform(train_set[feature_cols]), columns=feature_cols)
    test_ = pd.DataFrame(std.transform(test_set[feature_cols]), columns=feature_cols)
    index_std_NO2 = feature_cols.index('NO2')

    # Re-attach the row identifiers. The new frames have a default RangeIndex,
    # so reset the source index first to align rows by position even when the
    # inputs carry a non-default index.
    train_['index'] = train_set['index'].reset_index(drop=True)
    train_['station'] = train_set['station'].reset_index(drop=True)
    test_['index'] = test_set['index'].reset_index(drop=True)
    test_['station'] = test_set['station'].reset_index(drop=True)
    return train_, test_

In [21]:
# Function: reindex one station's series onto a complete, regular time grid.
def impute_time(group, start, end, sta, lists, locks):
    """Reindex one station's rows onto a complete time grid and collect the result.

    Grid timestamps (global ``freq``) that are absent from ``group`` become
    rows whose data columns are NaN; only 'station' is filled in. This lets
    the later windowing step detect gaps.

    Args:
        group: rows of a single station with an 'index' column of timestamps.
        start, end: bounds passed to pd.date_range; the last grid point
            (``end`` itself when it falls on the grid) is dropped.
        sta: station name written into every output row.
        lists: shared list (e.g. a Manager list proxy) that receives the frame.
        locks: lock guarding ``lists``.
    """
    group = group.set_index('index')
    group.index = pd.to_datetime(group.index)
    full_grid = pd.date_range(start, end, freq=freq)[:-1]
    group = group.reindex(full_grid)
    group['station'] = sta

    # `with` releases the lock even if append() raises; otherwise every other
    # worker waiting on this lock would block forever.
    with locks:
        lists.append(group)

def impute_time_multi(df, start, end, targets=impute_time, processes=None):
    """Run ``targets`` (default: impute_time) for every station in parallel.

    Args:
        df: long-format frame with 'station' and 'index' columns.
        start, end: time-grid bounds forwarded to ``targets``.
        targets: worker with impute_time's signature
            ``(group, start, end, sta, lists, locks)``.
        processes: number of pool worker processes (None = pool default).

    Returns:
        A single DataFrame with every station's reindexed rows, stations in
        worker completion order; reset_index() turns the time grid back into
        a column named 'index'.

    Raises:
        Any exception raised inside a worker is re-raised here.
        ValueError: if ``df`` contains no station (nothing to concatenate).
    """
    # One Manager serves both the shared list and its lock; leaving the `with`
    # shuts its server process down after the results have been copied out.
    with mp.Manager() as manager:
        lists = manager.list()
        locks = manager.Lock()
        with mp.Pool(processes) as pool:
            pending = [pool.apply_async(targets, args=(group, start, end, sta, lists, locks))
                       for sta, group in df.groupby('station')]
            pool.close()
            # .get() re-raises worker exceptions instead of silently dropping stations.
            for result in pending:
                result.get()
            pool.join()
        groups = list(lists)

    data_1 = pd.concat(groups)
    return data_1.reset_index()

In [22]:
# Function: turn one station's series into 3-D sliding windows and extract the labels.
def data_transform_s(step, group):
    """Build look-back windows and NO2 labels for a single station.

    A sample is produced for row ``i`` (``i >= step``) when the ``step`` rows
    before it contain no missing value in any column and row ``i`` itself has
    a non-missing 'NO2'.

    Args:
        step: window length (number of preceding rows per sample).
        group: one station's rows: feature columns plus 'index' and 'station'.

    Returns:
        xx: float32 array of shape (n_samples, step, n_features), features in
            ``group`` column order without 'index'/'station'; shape (0,) when
            no sample qualifies.
        yy: array of shape (n_samples, 3) holding row ``i``'s
            ('NO2', 'index', 'station') values; shape (0,) when empty.
    """
    columns = group.columns.tolist()
    label_positions = [columns.index('NO2'), columns.index('index'), columns.index('station')]

    features = group.drop(columns=['index', 'station']).to_numpy()
    no2 = group.iloc[:, label_positions[0]].to_numpy()
    # incomplete_before[k] = number of rows with any missing value among the
    # first k rows, so a window's gap count is a difference of two entries.
    row_has_nan = group.isnull().to_numpy().any(axis=1)
    incomplete_before = np.concatenate(([0], np.cumsum(row_has_nan)))

    xx = []
    yy = []
    for i in range(step, len(group)):
        window_complete = incomplete_before[i] - incomplete_before[i - step] == 0
        if window_complete and not np.isnan(no2[i]):
            xx.append(features[i - step:i])
            yy.append(np.array(group.iloc[i, label_positions]))

    xx = np.array(xx).astype('float32')
    yy = np.array(yy)
    return xx, yy

def concat_array(df):
    """Concatenate per-station (x, y) results into single arrays.

    Args:
        df: iterable of ``(x, y)`` pairs as returned by data_transform_s.
            Pairs whose ``x`` holds no samples are skipped.

    Returns:
        (x_sum, y_sum): the non-empty ``x`` and ``y`` arrays concatenated
        along the sample axis.

    Raises:
        ValueError: if no pair contains any sample.
    """
    # Iterate the pairs directly instead of packing them with np.array(): the
    # per-station arrays have different shapes, and NumPy >= 1.24 refuses to
    # build an array from such ragged input.
    non_empty = [(x_np, y_np) for x_np, y_np in df if x_np.shape[0] != 0]
    if not non_empty:
        raise ValueError('concat_array: no station produced any sample')
    x_parts, y_parts = zip(*non_empty)
    x_sum = np.concatenate(x_parts)
    y_sum = np.concatenate(y_parts)
    return x_sum, y_sum

            
def data_transform_multi(df, step, chunk_size=500):
    """Window every station of ``df`` in parallel with data_transform_s.

    Stations are handed to a fresh process pool in chunks of ``chunk_size``,
    presumably to bound worker memory (TODO confirm). Results keep the
    groupby station order, so the output does not depend on the chunking.

    Args:
        df: imputed long-format frame with 'station' and 'index' columns.
        step: window length forwarded to data_transform_s.
        chunk_size: number of stations processed per pool.

    Returns:
        (x, y) as produced by concat_array over all stations.

    Raises:
        ValueError: if ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError(f'chunk_size must be >= 1, got {chunk_size}')

    station_groups = [group for _, group in df.groupby('station')]
    # partial binds `step` so pool.map only has to ship each station's frame.
    windowing = partial(data_transform_s, step)

    res = []
    for begin in range(0, len(station_groups), chunk_size):
        with mp.Pool() as pool:
            res.extend(pool.map(windowing, station_groups[begin:begin + chunk_size]))

    del station_groups
    gc.collect()

    x, y = concat_array(res)
    return x, y

In [23]:
# Function: build shuffled 3-D training arrays and ordered 3-D test arrays.
def train_test_to_3D(train_, test_, step,
                     train_range=('2014-05-13', '2020-01-01'),
                     test_range=('2020-01-01', '2021-01-01'),
                     seed=7):
    """Impute, window and shuffle the standardized frames into model inputs.

    Args:
        train_, test_: standardized frames from data_std (feature columns plus
            'index' and 'station').
        step: look-back window length.
        train_range, test_range: (start, end) bounds of the imputed time grids.
        seed: global NumPy seed used for the training-set shuffle.

    Returns:
        xtrain, ytrain, xtest, ytest, ytrain_, ytest_:
        x* are float32 arrays of shape (n, step, n_features); y* are float
        NO2 labels; y*_ are the per-sample ('NO2', 'index', 'station') rows.
        Only the training arrays are shuffled.
    """
    # Fill each station's time grid so gaps become NaN rows the windowing skips.
    train = impute_time_multi(train_, *train_range)
    test = impute_time_multi(test_, *test_range)

    xtrain, ytrain_ = data_transform_multi(train, step)
    xtest, ytest_ = data_transform_multi(test, step)

    del train, test
    gc.collect()

    if len(xtrain) != len(ytrain_):
        raise ValueError(f'{len(xtrain)} training windows but {len(ytrain_)} labels')

    # Shuffle samples and labels with one shared permutation so they stay
    # aligned. np.random.permutation makes the same random draws as
    # np.random.shuffle, so this matches shuffling each array after re-seeding.
    np.random.seed(seed)
    order = np.random.permutation(len(xtrain))
    xtrain = xtrain[order]
    ytrain_ = ytrain_[order]

    # Every column except 'index' and 'station' is a model feature.
    n_features = train_.shape[1] - 2
    xtrain = np.reshape(xtrain, (xtrain.shape[0], step, n_features))
    xtest = np.reshape(xtest, (xtest.shape[0], step, n_features))

    # Column 0 of the label rows is the NO2 value.
    ytrain = ytrain_[:, 0].astype(np.float64)
    ytest = ytest_[:, 0].astype(np.float64)

    return xtrain, ytrain, xtest, ytest, ytrain_, ytest_

In [24]:
# Standardize the train and test features (scaler statistics come from the training set only).
train_, test_ = data_std(train_set=train_set, test_set=test_set)

In [25]:
# Drop the raw frames; only the standardized copies are used from here on.
del data
del train_set, test_set
gc.collect()

69

In [26]:
if __name__ == '__main__':
    # The guard protects the multiprocessing pools started below; 14 is the look-back window length.
    (xtrain, ytrain, xtest, ytest,
     ytrain_, ytest_) = train_test_to_3D(train_, test_, step=14)



In [27]:
# Show the column position of NO2 among the standardized features and the fitted scaler.
# NOTE(review): the saved output shows 0, but 'NO2' is at position 15 of the current
# cols list, so that output looks stale — re-run the notebook to refresh it.
print(f'{index_std_NO2} {std}')

0 StandardScaler()


In [28]:
# Sanity-check the windowed arrays before they are saved.
assert xtrain.dtype == np.float32, xtrain.dtype
assert xtrain.shape[0] == ytrain.shape[0], (xtrain.shape, ytrain.shape)
assert xtest.shape[0] == ytest.shape[0], (xtest.shape, ytest.shape)
xtrain.dtype

dtype('float32')

In [30]:
# Save the data and labels. ytrain_/ytest_ (keys e/f) are object arrays holding
# timestamps and station names, so np.load needs allow_pickle=True to read them.
os.makedirs(file_path + 'numpy', exist_ok=True)
np.savez(file_path + 'numpy/NA_d_array_14.npz',
         a=xtrain, b=ytrain, c=xtest, d=ytest, e=ytrain_, f=ytest_)

In [31]:
# Display the data directory used for the reads and writes in this notebook.
file_path

'data/'

In [21]:
# Save the feature names (one per row, in model-input order).
os.makedirs(file_path + 'features_name', exist_ok=True)
pd.DataFrame(cols).to_csv(file_path + 'features_name/features_name_' + freq, index=False)

In [22]:
# Save the scaler's per-feature parameters, one row per feature in `cols` order.
# StandardScaler.scale_ is the per-feature standard deviation used by transform()
# (var_ is the variance), which matches the 'std' label. The file name records
# the row of NO2 (index_std_NO2) rather than a hard-coded position.
os.makedirs(file_path + 'normalization_params', exist_ok=True)
norma_param = pd.DataFrame([std.scale_, std.mean_], index=['std', 'mean']).T
norma_param.to_csv(file_path + 'normalization_params/normalization_params_' + freq
                   + f'_no2-idx-{index_std_NO2}', index=False)