In [1]:
import numpy as np
import pandas as pd
from tqdm import tqdm as tqdm
from multiprocessing import Pool
import time,gc

In [2]:
# f10:上一天该时刻数据的可信度
# f25:上一天该时刻最多进站站台
    
# f11:该站台进站口个数+出站口个数
# f13:站台的邻居个数
    
# f14:上一天乘客该时刻刷卡类型
# f15:上一天乘客该时间段刷卡类型
# f16:上一天乘客当天刷卡类型(刷卡类型归一化)

In [3]:
# 统计进出站每天的变化情况
def get_daily_activity(i):
    if i<10:
        s = '0'+str(i)
    else:
        s = str(i)
    day_activ = pd.read_csv('data/Metro_train/record_2019-01-%s.csv'%s)
    day_activ['time'] = pd.to_datetime(day_activ.time)
    day_activ['stationID'] = day_activ['stationID']
    day_activ['deviceID'] = day_activ['deviceID']
    day_activ['status'] = day_activ['status']
    day_activ['payType'] = day_activ['payType']
    day_activ['lineID'] = day_activ['lineID'].apply(lambda x: {'A':0,'B':2,'C':1}[x])
    return day_activ

def construct_df():
    final = pd.DataFrame()
    for i in range(81):
        temp = pd.DataFrame()
        temp['minute'] = list(range(1440))
        temp['stationID'] = i
        final = final.append(temp)
    final['minute'] = final['minute']/10
    return final

def get_cum_feature(function,n):
    pool = Pool(7)  #创建拥有5个进程数量的进程池
    timestep = [(x,n) for x in range(1440)]
    count_result =pool.map(function, timestep) 
    pool.close()#关闭进程池，不再接受新的进程
    pool.join()#主进程阻塞等待子进程的退出
    return count_result

# 分站点统计
def median_cum_bystation(params):
    i,n = params
    time_range = [i/10+x for x in range(0-n,1+n)]
    time_range = [143+1+x if x<0 else x for x in time_range]
    station_num = []
    for s in range(80):
        count = []
        for t in time_range:
            try:
                count.append(come_bystation.loc[(s,t),'count_10min'])
            except:
                count.append(0)
        station_num.append([s,i/10,np.median(count)])
    return station_num

In [4]:
# 构建邻接矩阵
roadmap = pd.read_csv('data/Metro_roadMap.csv',index_col=0)
near_node = {}
for index,row in roadmap.iterrows():
    for i in range(80):
        if row[i]==1:
            if index not in near_node:
                near_node[index] = []
            near_node[index].append(i)

In [5]:
t1 = time.time()
all_df = pd.DataFrame()
#for date in range(7,26):
for date in [12,13,14,19,20,21,26,28]:
    final = construct_df()
    day_activ = get_daily_activity(date)[['time','lineID','stationID','deviceID','status','payType']]
    df = pd.DataFrame()
    for i in range(10):
        day_activ['minute'] = (day_activ['time'].dt.minute-i+day_activ['time'].dt.hour*60)//10+0.1*i
        day_activ['minute'] = day_activ['minute'].apply(lambda x:143+1+x if x<0 else x)
        df = df.append(day_activ)
    df.sort_values(by='minute',inplace=True)
    come = df[df.status==0]
    # 周围半小时中位数
    come_bystation = come.groupby(['stationID','minute']).agg({'status':'count'}).rename(columns={'status':'count_10min'})
    count_result = get_cum_feature(median_cum_bystation,1)
    result = []
    for x in count_result:
        for i in x:
            result.append(i)
    f1 = pd.DataFrame(result)
    f1.rename(columns={0:'stationID',1:'minute',2:'median_30min_bystation'},inplace=True)
    final = final.merge(f1,on=['stationID','minute'],how='left')
    # 上一天该时刻最多进站展台
    come_bystation = come.groupby(['minute','stationID'],as_index=False).agg({'status':'count'}).rename(columns={'status':'count_10min'})
    f2 = come_bystation.sort_values(['minute','count_10min'],ascending=False).\
    drop_duplicates(['minute']).rename(columns={'count_10min':'count_10_max','stationID':'max_count_ID'})
    final = final.merge(f2,on='minute',how='left')
    # 该站台进站口个数+出站口个数
    f3 = come.groupby(['stationID'],as_index=False).agg({'deviceID':'nunique'}).rename(columns={'deviceID':'device_num'})    # f13:站台的邻居个数
    final = final.merge(f3,on='stationID',how='left')
    # 该站台的邻居个数
    final['near_num'] = final['stationID'].apply(lambda x:len(near_node[x]))
    # 刷卡类型处理，该时刻，全天，分站台处理
    count_type = come.groupby(['stationID','minute','payType'],as_index=False).agg({'status':'count'})
    come_bystation = come.groupby(['stationID','minute']).agg({'status':'count'}).rename(columns={'status':'count_10min'})
    rate = []
    for index,row in tqdm(count_type.iterrows()):
        rate.append(row['status']/come_bystation.loc[(row['stationID'],row['minute']),'count_10min'])
    count_type['rate'] = rate
    f4 = count_type[count_type.payType==0].rename(columns={'rate':'type_0_bystatiommin'})[['stationID','minute','type_0_bystatiommin']]
    final = final.merge(f4,on=['stationID','minute'],how='left')
    f5 = count_type[count_type.payType==1].rename(columns={'rate':'type_1_bystatiommin'})[['stationID','minute','type_1_bystatiommin']]
    final = final.merge(f5,on=['stationID','minute'],how='left')
    f6 = count_type[count_type.payType==2].rename(columns={'rate':'type_2_bystatiommin'})[['stationID','minute','type_2_bystatiommin']]
    final = final.merge(f6,on=['stationID','minute'],how='left')
    f7 = count_type[count_type.payType==3].rename(columns={'rate':'type_3_bystatiommin'})[['stationID','minute','type_3_bystatiommin']]
    final = final.merge(f7,on=['stationID','minute'],how='left')
    # 站台，全天
    count_type = come.groupby(['stationID','payType'],as_index=False).agg({'status':'count'})
    come_bystation = come.groupby(['stationID']).agg({'status':'count'}).rename(columns={'status':'count_10min'})
    rate = []
    for index,row in tqdm(count_type.iterrows()):
        rate.append(row['status']/come_bystation.loc[row['stationID'],'count_10min'])
    count_type['rate'] = rate
    for i in range(4):
        f = count_type[count_type.payType==i].rename(columns={'rate':'type_%s_bystatiom'%i})[['stationID','type_%s_bystatiom'%i]]
        final = final.merge(f,on=['stationID'],how='left')
    # 分钟，全天
    count_type = come.groupby(['minute','payType'],as_index=False).agg({'status':'count'})
    come_bystation = come.groupby(['minute']).agg({'status':'count'}).rename(columns={'status':'count_10min'})
    rate = []
    for index,row in tqdm(count_type.iterrows()):
        rate.append(row['status']/come_bystation.loc[row['minute'],'count_10min'])
    count_type['rate'] = rate
    for i in range(4):
        f = count_type[count_type.payType==i].rename(columns={'rate':'type_%s_bymin'%i})[['minute','type_%s_bymin'%i]]
        final = final.merge(f,on=['minute'],how='left')
    t2 = time.time()
    # 数据格式减少内存
    final['date'] = date
    final.fillna(0,inplace=True)
    for c in final.dtypes.index:
        if final.dtypes[c] == 'int64':
            final[c] = final[c].astype('int32')
    all_df = all_df.append(final)
    print('epoch %d, now f12 %f 秒'%(date,t2-t1))
    #def final
    gc.collect()
                                                     
# progress2缺少时间-所有站台这个维度

331601it [02:03, 2683.94it/s]
320it [00:00, 4618.16it/s]
4742it [00:00, 6407.91it/s]


epoch 12, now f12 174.193657 秒


330207it [01:57, 2809.86it/s]
320it [00:00, 8562.75it/s]
4724it [00:00, 6263.27it/s]


epoch 13, now f12 357.600338 秒


331162it [01:52, 2937.29it/s]
320it [00:00, 5491.93it/s]
4707it [00:00, 6891.75it/s]


epoch 14, now f12 525.806422 秒


331425it [01:51, 2961.14it/s]
320it [00:00, 5382.01it/s]
4748it [00:00, 6913.08it/s]


epoch 19, now f12 689.214768 秒


329562it [01:52, 2935.97it/s]
320it [00:00, 10309.93it/s]
4731it [00:00, 6817.01it/s]


epoch 20, now f12 851.171928 秒


333832it [02:04, 2680.02it/s]
320it [00:00, 5346.21it/s]
4736it [00:00, 6777.31it/s]


epoch 21, now f12 1036.835681 秒


335733it [02:02, 2744.16it/s]
320it [00:00, 5355.30it/s]
4817it [00:00, 6664.90it/s]


epoch 26, now f12 1219.299785 秒


334525it [02:03, 2699.73it/s]
320it [00:00, 10701.55it/s]
4789it [00:00, 13614.07it/s]


epoch 28, now f12 1403.145296 秒


In [6]:
all_df.to_pickle('features/progress3_out.pkl')

In [7]:
all_df.shape

(933120, 20)