In [2]:
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor
from IPython.core.debugger import set_trace
import re
import ujson
import pickle
import msgpack
import numpy as np
import pandas as pd
from datetime import datetime
import os
from functools import partial
from tqdm.notebook import tqdm
from collections import Counter
import yaml

from common_utils import *

In [3]:
# Read the directory layout from config.yml. The file is opened in a `with`
# block so the handle is closed deterministically instead of being left to
# the garbage collector.
with open('./config.yml') as config_file:
    paths = yaml.safe_load(config_file)

data_dir = Path(paths['data_dir'])  # raw inputs (weather.csv, train/, test file)
pkl_dir = Path(paths['pkl_dir'])    # pickle outputs
msg_dir = Path(paths['msg_dir'])    # msgpack outputs

## weather

In [16]:
# Load the weather table; the statements below in this cell read its `date`,
# `weather`, `lowtemp` and `hightemp` columns.
weather = pd.read_csv(data_dir/'weather.csv')

def lamb(x):
    """Return the weekday index of a date given in ``YYYYMMDD`` form.

    ``x`` is converted with ``str`` first, so an int such as ``20200801``
    works. The result follows ``datetime.weekday``: Monday is 0, Sunday is 6.
    """
    digits = str(x)
    year, month, day = int(digits[:4]), int(digits[4:6]), int(digits[6:8])
    return datetime(year, month, day).weekday()

# Add the weekday and an integer code for the weather category, then keep
# only the per-day feature columns, indexed by date.
weather['weekday'] = weather.date.apply(lamb)
weather['weather_code'] = pd.Categorical(weather.weather).codes
weather = weather.set_index("date")[["lowtemp", "hightemp", "weekday", "weather_code"]]

# Map each date (stringified index value) to its feature row (a pandas Series).
weather_dct = {str(date): row for date, row in weather.iterrows()}

dump_pickle(weather_dct, pkl_dir/'weather_dct.pkl')

## all lines

In [14]:
# Last day of August 2020 to process, inclusive: the loops in this notebook
# iterate over range(1, end_date+1) and build file names of the form 202008DD.
end_date = 31

In [6]:
# Raw lines per day, keyed by the day string (e.g. '20200801').
lines_dct = {}

# train lines: one file per August day; a day without a file is reported and skipped
for i in range(1, end_date+1):
    day = f'202008{i:02d}'
    print(f"loading {day}.txt")
    try:
        with open(data_dir/f'train/{day}.txt', 'r') as f:
            lines_dct[day] = f.readlines()
    except FileNotFoundError:
        print(f"not found {day}.txt")

# test lines: stored under the key '20200901'
with open(data_dir/'20200901_test.txt', 'r') as f:
    print("loading 20200901_test.txt")
    lines_dct['20200901'] = f.readlines()

dump_pickle(lines_dct, pkl_dir/"lines_dct.pkl")

loading 20200801.txt
loading 20200802.txt
loading 20200901_test.txt


## link_freq_pkl

In [7]:
%%time
# Read lines_dct back from the same pickle path the loading cell wrote with
# dump_pickle. NOTE(review): load_pickle comes from common_utils and is
# presumably the inverse of dump_pickle — confirm.
lines_dct = load_pickle(pkl_dir/'lines_dct.pkl')

CPU times: user 318 ms, sys: 669 ms, total: 987 ms
Wall time: 985 ms


In [8]:
def line2link(line):
    """Return the link ids found in the second ``;;``-separated section of ``line``.

    The section is split on whitespace and each token contributes the text
    before its first ``:``. Ids are returned as strings, in order of appearance.
    """
    link_section = line.split(';;')[1]
    return [token.partition(':')[0] for token in link_section.split()]

In [9]:
# Collect every link id occurrence across all loaded days (one progress bar
# per day), then persist the per-link occurrence counts.
all_links = []
for lines in lines_dct.values():
    for line in tqdm(lines):
        all_links.extend(line2link(line))
dump_pickle(Counter(all_links), pkl_dir/'link_freq.pkl')

  0%|          | 0/74328 [00:00<?, ?it/s]

  0%|          | 0/74129 [00:00<?, ?it/s]

  0%|          | 0/288076 [00:00<?, ?it/s]

## driver pkl

In [10]:
def line2head(line):
    """Return the whitespace-separated fields that precede the first ``;`` in ``line``.

    Fields are returned as strings; an empty prefix yields an empty list.
    """
    head_text = line.partition(';')[0]
    return head_text.split()

In [11]:
# Flatten every loaded day into a single list of raw lines.
lines = []
for day_lines in lines_dct.values():
    lines.extend(day_lines)

# The driver id is the second-to-last head field. Sort the unique ids
# numerically so the id -> index assignment is reproducible: iterating a bare
# set of strings depends on per-process hash randomization, which would give
# a different mapping on every kernel restart.
all_drivers = sorted({line2head(line)[-2] for line in lines}, key=int)

# Map each driver id (as int) to a dense index in sorted order.
driver_dct = {int(driver): idx for idx, driver in enumerate(all_drivers)}

In [12]:
# Persist the driver id (int) -> index mapping built in the previous cell.
dump_pickle(driver_dct, pkl_dir/"driver2id_dct.pkl")

## create train

In [13]:
def get_lines(path):
    """Read the text file at ``path`` and return its lines as a list.

    Line terminators are kept, exactly as ``readlines`` would return them.
    """
    with open(path, 'r') as handle:
        return list(handle)
    
def line2part(line):
    """Split ``line`` on ``;;`` into its ``(head, link, cross)`` sections.

    Raises ``ValueError`` when the line does not contain exactly three sections.
    """
    head_part, link_part, cross_part = line.split(';;')
    return (head_part, link_part, cross_part)

def link2npary(link):
    """Parse the link section into a float array with one row per link token.

    Tokens are whitespace-separated; each token is split on ``:`` and ``,``
    and its pieces become the columns of that row.
    """
    rows = [re.split(':|,', token) for token in link.split()]
    return np.asarray(rows, dtype=float)
def cross2npary(cross):
    """Parse the cross section into a float array with one row per cross token.

    Tokens are whitespace-separated; each token is split on ``_`` and ``:``
    and its pieces become the columns of that row.
    """
    rows = [re.split('_|:', token) for token in cross.split()]
    return np.asarray(rows, dtype='float')

def line2dict(line, weather):
    """Turn one raw line plus its day's weather record into a flat dict.

    Args:
        line: raw text line made of three ``;;``-separated sections
            (head, link, cross), as split by ``line2part``.
        weather: mapping-like record indexed by ``lowtemp``, ``hightemp``,
            ``weekday`` and ``weather_code``; each value must offer ``.tolist()``.

    Returns:
        A dict whose keys are, in insertion order: the head fields, the five
        link columns, the three cross columns, then the weather fields. The
        ``weather_code`` value is stored under the key ``"weather"``.
    """
    # weather: read before parsing the line; output keys pair up with source fields
    weather_keys = ["lowtemp", "hightemp", "weekday", "weather"]
    weather_fields = ["lowtemp", "hightemp", "weekday", "weather_code"]
    weather_dict = {
        key: weather[field].tolist()
        for key, field in zip(weather_keys, weather_fields)
    }

    head, link, cross = line2part(line)

    # head: whitespace-separated numbers parsed as float and zipped with their names
    head_keys = ["order_id", "eta", "dist", "simple_eta", "driver_id", "slice_id"]
    head_dict = dict(zip(head_keys, np.asarray(head.split(), dtype=float)))

    # link: column i of the parsed array becomes the list stored under link_keys[i]
    link_keys = ["link_id", "link_time", "link_ratio", "link_current_status", "link_arrival_status"]
    link_ary = link2npary(link)
    link_dict = {key: link_ary[:, col].tolist() for col, key in enumerate(link_keys)}

    # cross: a blank section yields three empty lists, otherwise one list per column
    cross_keys = ["cross_start", "cross_end", "cross_time"]
    if cross.strip():
        cross_ary = cross2npary(cross)
        cross_dict = {key: cross_ary[:, col].tolist() for col, key in enumerate(cross_keys)}
    else:
        cross_dict = {key: [] for key in cross_keys}

    # combine all, preserving the head -> link -> cross -> weather key order
    return {**head_dict, **link_dict, **cross_dict, **weather_dict}

def raw2msgpack(path, day_weather, num_workers=40, chunksize=1024):
    """Convert one raw text file into a msgpack file of per-line dicts.

    Args:
        path: ``pathlib.Path`` of the raw file to process. The output name is
            ``path.name`` up to its first ``.``.
        day_weather: weather record for the file's day, forwarded to
            ``line2dict`` as its ``weather`` argument.
        num_workers: number of worker processes used to parse lines.
        chunksize: number of lines handed to a worker per task. The executor
            default of 1 pickles every line as its own task, which is slow for
            large files.

    Side effects:
        Writes ``msg_dir/<stem>.msgpack``, creating parent directories as needed.
    """
    lines = get_lines(path)

    line2dict_weather = partial(line2dict, weather=day_weather)

    # The context manager shuts the pool down when parsing finishes; without
    # it every call leaves up to `num_workers` processes behind.
    with ProcessPoolExecutor(max_workers=num_workers) as pool:
        lines_dict = list(pool.map(line2dict_weather, lines, chunksize=chunksize))

    stem = path.name.split('.')[0]

    msg_path = Path(msg_dir/f'{stem}.msgpack')
    msg_path.parent.mkdir(parents=True, exist_ok=True)

    with open(msg_path, 'wb') as f:
        f.write(msgpack.packb(lines_dict))

In [21]:
# generate all train data
for i in range(1, end_date+1):
    day = f"202008{i:02d}"
    file_name = f"{day}.txt"
    train_path = data_dir/f'train/{file_name}'

    # Same policy as the raw-line loading cell: a day without a train file is
    # reported and skipped instead of aborting the whole loop.
    if not train_path.exists():
        print(f"not found {file_name}")
        continue

    print(f"Processing {file_name}")

    day_weather = weather_dct[day]

    raw2msgpack(train_path, day_weather)

Processing 20200801.txt
Processing 20200802.txt


In [17]:
# generate test data: convert the 2020-09-01 test file using that day's weather record
file_name = "20200901_test.txt"
day_weather = weather_dct['20200901']

raw2msgpack(data_dir/file_name, day_weather)