In [1]:
import os

import dask.array as da
import dask.dataframe as dd
import lightgbm as lgb
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error,explained_variance_score
from sklearn.model_selection import KFold
from tqdm import tqdm

#from dask_ml.metrics import mean_squared_error
#from dask_ml.model_selection import KFold

import warnings
warnings.filterwarnings('ignore')
from dask.distributed import Client

# Start a local Dask cluster with 8 workers; the trailing bare `client` expression
# renders its summary (scheduler address, dashboard link) as the cell output.
N_DASK_WORKERS = 8
client = Client(n_workers=N_DASK_WORKERS)
client

0,1
Client  Scheduler: tcp://127.0.0.1:40169  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 16  Memory: 16.63 GB


In [2]:
# 这是一个数据预处理函数
def get_data(data, mode='train'): # 针对训练数据 和 测试数据 分别将部分关于时间的列转换为 datetime 时间格式
    
    assert mode=='train' or mode=='test'
    
    if mode=='train':
        data['vesselNextportETA'] = pd.to_datetime(data['vesselNextportETA'], infer_datetime_format=True) # 使用自动识别推理出的时间格式，把这'vesselNextportETA'转化为时间格式
    elif mode=='test':
        data['temp_timestamp'] = data['timestamp'] # 拷贝'timestamp'列
        data['onboardDate'] = pd.to_datetime(data['onboardDate'], infer_datetime_format=True)
    data['timestamp'] = pd.to_datetime(data['timestamp'], infer_datetime_format=True)
    data['longitude'] = data['longitude'].astype(float)
    data['loadingOrder'] = data['loadingOrder'].astype(str)
    data['latitude'] = data['latitude'].astype(float)
    data['speed'] = data['speed'].astype(float)
    data['direction'] = data['direction'].astype(float) # 把特征都变成浮点数类型
    
    #data = data.compute()
    return data

In [3]:
# 做特征工程的函数
def get_feature(df, mode='train'):
    
    assert mode=='train' or mode=='test'
    
    df.sort_values(['loadingOrder', 'timestamp'], inplace=True) # 将数据按 订单号 和 时间戳 两列进行 升序排列，排列结果替换元数据
    # 特征只选择经纬度、速度\方向
    df['lat_diff'] = df.groupby('loadingOrder')['latitude'].diff(1) # 返回按 订单号 分类以后 维度的差分(变化)
    df['lon_diff'] = df.groupby('loadingOrder')['longitude'].diff(1) # 返回按 订单号 分类以后 经度的差分(变化)
    df['speed_diff'] = df.groupby('loadingOrder')['speed'].diff(1) # 返回按 订单号 分类以后 速度的差分(变化)
    df['diff_minutes'] = df.groupby('loadingOrder')['timestamp'].diff(1).dt.total_seconds() // 60 # 返回按 订单号 分类以后 时间的差分，差分化成秒，除以60转化为时间
    
    df = dd.from_pandas(df, npartitions=12)
    df['anchor'] = df.apply(lambda x: 1 if x['lat_diff'] <= 0.03 and x['lon_diff'] <= 0.03
                            and x['speed_diff'] <= 0.3 and x['diff_minutes'] <= 10 else 0, axis=1) # 对每一列应用函数，判断船是否抛锚停下。
    df = df.compute()
    
    if mode=='train':
        group_df = df.groupby('loadingOrder')['timestamp'].agg(mmax='max', count='count', mmin='min').reset_index() #对订单号进行分组，然后对一组内时间戳合计，统计一个订单号时间戳下所有行，
        # 每个属性的最大值、计数、最小值，并且重新设置索引index，将原来的index加入到df中成为一列
        # 读取数据的最大值-最小值，即确认时间间隔为label            # 即将时间间隔作为一个特征
        group_df['label'] = (group_df['mmax'] - group_df['mmin']).dt.total_seconds()
    elif mode=='test':
        group_df = df.groupby('loadingOrder')['timestamp'].agg(count='count').reset_index()
        
    anchor_df = df.groupby('loadingOrder')['anchor'].agg('sum').reset_index()
    anchor_df.columns = ['loadingOrder', 'anchor_cnt']
    group_df = group_df.merge(anchor_df, on='loadingOrder', how='left')
    group_df['anchor_ratio'] = group_df['anchor_cnt'] / group_df['count']

    agg_function = ['min', 'max', 'mean', 'median']
    agg_col = ['latitude', 'longitude', 'speed', 'direction']

    group = df.groupby('loadingOrder')[agg_col].agg(agg_function).reset_index()
    group.columns = ['loadingOrder'] + ['{}_{}'.format(i, j) for i in agg_col for j in agg_function]
    group_df = group_df.merge(group, on='loadingOrder', how='left')
    
    return group_df

In [4]:
def mse_score_eval(preds, valid):
    labels = valid.get_label()
    scores = mean_squared_error(y_true=labels, y_pred=preds)
    return 'mse_score', scores, True

In [5]:
# baseline只用到gps定位数据，即train_gps_path
train_gps_path = '/run/media/liweikang/OS/Users/Li Weikang/Desktop/train0523.csv'
test_data_path = '/run/media/liweikang/OS/Users/Li Weikang/Desktop/A_testData0531.csv'
order_data_path = '/run/media/liweikang/OS/Users/Li Weikang/Desktop/loadingOrderEvent.csv'
port_data_path = '/run/media/liweikang/OS/Users/Li Weikang/Desktop/port.csv'

In [6]:
NDATA = 5000000 # 一次读NDATA行
Thresold = 10000000 # 总共要读多少行
weight = float(NDATA)/Thresold

def set_data_columns(data):
    # 按照官网上对运单GPS数据的说明，重命名训练数据的列名
    data.columns = ['loadingOrder','carrierName','timestamp','longitude',
                  'latitude','vesselMMSI','speed','direction','vesselNextport',
                  'vesselNextportETA','vesselStatus','vesselDatasource','TRANSPORT_TRACE']
    return data

# params
params = {
    'learning_rate': 0.01,
    'boosting_type': 'gbdt',
    'objective': 'regression',
    'num_leaves': 36,
    'feature_fraction': 0.6,
    'bagging_fraction': 0.7,
    'bagging_freq': 6,
    'seed': 8,
    'bagging_seed': 1,
    'feature_fraction_seed': 7,
    'min_data_in_leaf': 20,
    'nthread': 8,
    'verbose': 1,
}
sum = 0
clf = None
label = 'label'
seed=1080 
is_shuffle=True

In [7]:
test_data = pd.read_csv(test_data_path)
test_data = get_data(test_data, mode='test') # 预处理测试数据
test = get_feature(test_data, mode='test')
test_pred = da.zeros((test.shape[0], )) 

In [8]:
for batch_data in pd.read_csv(train_gps_path, chunksize = NDATA):
    sum += NDATA
    batch_data = set_data_columns(batch_data)
    batch_data = get_data(batch_data, mode = 'train')
    batch_train = get_feature(batch_data, mode='train')
    features = [c for c in batch_train.columns if c not in ['loadingOrder', 'label', 'mmin', 'mmax', 'count']]
    pred = features

    train_pred = da.zeros((batch_train.shape[0], ))
    n_splits = 5
    # Kfold
    fold = KFold(n_splits=n_splits, shuffle=is_shuffle, random_state=seed)
    kf_way = fold.split(batch_train[pred])
    
    # train
    for n_fold, (train_idx, valid_idx) in enumerate(kf_way, start=1):
        train_x, train_y = batch_train[pred].iloc[train_idx], batch_train[label].iloc[train_idx]
        valid_x, valid_y = batch_train[pred].iloc[valid_idx], batch_train[label].iloc[valid_idx]
        # 数据加载
        n_train = lgb.Dataset(train_x, label=train_y)
        n_valid = lgb.Dataset(valid_x, label=valid_y)

        clf = lgb.train(
            params=params,
            train_set=n_train,
            num_boost_round=3000,
            valid_sets=[n_valid],
            early_stopping_rounds=100,
            verbose_eval=100,
            feval=mse_score_eval,
            init_model=clf,
            keep_training_booster = True,
        )
        test_pred += clf.predict(test[pred], num_iteration=clf.best_iteration)*weight/fold.n_splits
    if sum >= Thresold:
        break

Training until validation scores don't improve for 100 rounds
[100]	valid_0's l2: 1.33182e+11	valid_0's mse_score: 1.33182e+11
Early stopping, best iteration is:
[1]	valid_0's l2: 4.82078e+11	valid_0's mse_score: 4.82078e+11
Training until validation scores don't improve for 100 rounds
[200]	valid_0's l2: 4.89644e+10	valid_0's mse_score: 4.89644e+10
Early stopping, best iteration is:
[102]	valid_0's l2: 1.20675e+11	valid_0's mse_score: 1.20675e+11
Training until validation scores don't improve for 100 rounds
[300]	valid_0's l2: 2.96339e+10	valid_0's mse_score: 2.96339e+10
Early stopping, best iteration is:
[203]	valid_0's l2: 4.20532e+10	valid_0's mse_score: 4.20532e+10
Training until validation scores don't improve for 100 rounds
[400]	valid_0's l2: 1.84784e+10	valid_0's mse_score: 1.84784e+10
Early stopping, best iteration is:
[304]	valid_0's l2: 1.99161e+10	valid_0's mse_score: 1.99161e+10
Training until validation scores don't improve for 100 rounds
[500]	valid_0's l2: 9.61045e+09	

In [9]:
test['label'] = test_pred
result = test[['loadingOrder', 'label']]

In [10]:
test_data = test_data.merge(result, on='loadingOrder', how='left')
test_data['ETA'] = (test_data['onboardDate'] + test_data['label'].apply(lambda x:pd.Timedelta(seconds=x))).apply(lambda x:x.strftime('%Y/%m/%d  %H:%M:%S'))
test_data.drop(['direction','TRANSPORT_TRACE'],axis=1,inplace=True)
test_data['onboardDate'] = test_data['onboardDate'].apply(lambda x:x.strftime('%Y/%m/%d  %H:%M:%S'))
test_data['creatDate'] = pd.datetime.now().strftime('%Y/%m/%d  %H:%M:%S')
test_data['timestamp'] = test_data['temp_timestamp']
# 整理columns顺序
result = test_data[['loadingOrder', 'timestamp', 'longitude', 'latitude', 'carrierName', 'vesselMMSI', 'onboardDate', 'ETA', 'creatDate']]

In [11]:
result.to_csv('result.csv', index=False)

In [12]:
result  # display the final submission frame (long frames are abbreviated in the rendered output)

Unnamed: 0,loadingOrder,timestamp,longitude,latitude,carrierName,vesselMMSI,onboardDate,ETA,creatDate
0,CF946210847851,2019-04-02T02:42:28.000Z,138.471062,40.278787,OIEQNT,R5480015614,2019/04/02 02:42:28,2019/04/06 06:21:29,2020/06/08 21:38:53
1,CF946210847851,2019-04-02T02:59:28.000Z,138.552168,40.327785,OIEQNT,R5480015614,2019/04/02 02:42:28,2019/04/06 06:21:29,2020/06/08 21:38:53
2,CF946210847851,2019-04-02T03:07:28.000Z,138.588250,40.352542,OIEQNT,R5480015614,2019/04/02 02:42:28,2019/04/06 06:21:29,2020/06/08 21:38:53
3,CF946210847851,2019-04-02T03:43:28.000Z,138.751325,40.459447,OIEQNT,R5480015614,2019/04/02 02:42:28,2019/04/06 06:21:29,2020/06/08 21:38:53
4,CF946210847851,2019-04-02T04:29:28.000Z,138.969782,40.581485,OIEQNT,R5480015614,2019/04/02 02:42:28,2019/04/06 06:21:29,2020/06/08 21:38:53
...,...,...,...,...,...,...,...,...,...
45451,XG479584941731,2019-01-13T03:56:08.000Z,104.633357,1.630708,JCMFTA,U2218600548,2019/01/10 00:27:58,2019/01/14 15:18:04,2020/06/08 21:38:53
45452,XG479584941731,2019-01-13T03:57:08.000Z,104.631958,1.626713,JCMFTA,U2218600548,2019/01/10 00:27:58,2019/01/14 15:18:04,2020/06/08 21:38:53
45453,XG479584941731,2019-01-13T03:57:38.000Z,104.631258,1.624615,JCMFTA,U2218600548,2019/01/10 00:27:58,2019/01/14 15:18:04,2020/06/08 21:38:53
45454,XG479584941731,2019-01-13T03:58:08.000Z,104.630597,1.622682,JCMFTA,U2218600548,2019/01/10 00:27:58,2019/01/14 15:18:04,2020/06/08 21:38:53
