"""
Date: 2024/10/08

用Dask库读取全部交通赛GPS数据，高效处理：
1. 格式转换（包括日期），
2. 定义栅格化函数，对离散GPS点进行集计处理，统计每个网格的数据点数量，选择前13个最多出租车数量的栅格
3. 为这13个栅格指定POI编号，将POI信息合并回原始数据，使用suffixes参数避免重复列名
4. 计算每两个相邻POI的耗时，输出格式：出租车ID  起点（POI）  终点（POI） 耗时（分钟）
5. 将结果保存为CSV文件
"""

In [2]:
# 系统
import os
from dotenv import load_dotenv
# from dotenv import load_dotenv
# load_dotenv()
import platform
import warnings
# warnings.filterwarnings('ignore')

# 进度条
from tqdm.auto import tqdm

# 数学计算
import numpy as np
import math
import random

# GPS数据读取&处理
import pandas as pd
import geopandas as gpd
import transbigdata as tbd
# tbd.set_mapboxtoken(os.getenv('MAPBOX_TOKEN'))
from shapely.geometry import Polygon
import pickle
import datetime

# 并行计算
import dask
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
ProgressBar().register()

# 绘图
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
%config InlineBackend.figure_format = 'svg'
# sns.set_context('notebook')
# sns.set_theme(style="ticks", palette="pastel")
# plt.style.use(['grid'])

# 设置字体（兼容Windows和Mac OS）
if platform.system() == 'Windows':
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False
else:
    plt.rcParams['font.family'] = ['Arial Unicode MS']

In [1]:
# 文件路径列表（不止一个文件）
directory = r"E:\taxi_data\gps_data\gps_1"
file_list = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.txt')]

# 读取文件
ddf = dd.read_csv(file_list, header=None,
                   names=['出租车ID', '纬度', '经度', '载客状态', '时间点'])

# 格式转换
ddf['出租车ID'] = ddf['出租车ID'].astype(int)
ddf['纬度'] = ddf['纬度'].astype(float)
ddf['经度'] = ddf['经度'].astype(float)
ddf['时间点'] = dd.to_datetime(ddf['时间点'])

# 栅格化
grid_size = 1000
earth_radius = 6371004
min_lat, max_lat, min_lon, max_lon = (30.290675, 31.032468, 103.269638, 104.609693)
delta_lat = grid_size * 360 / (2 * math.pi * earth_radius)
delta_lon = grid_size * 360 / (2 * math.pi * earth_radius * math.cos((min_lat + max_lat) * math.pi / 360))
ddf['经度索引'] = ((ddf['经度'] - (min_lon - delta_lon / 2)) / delta_lon).astype('int')
ddf['纬度索引'] = ((ddf['纬度'] - (min_lat - delta_lat / 2)) / delta_lat).astype('int')
ddf.head()

[########################################] | 100% Completed | 3.40 ss


Unnamed: 0,出租车ID,纬度,经度,载客状态,时间点,经度索引,纬度索引
0,1,30.624806,104.136604,1,2014-08-03 21:18:46,83,37
1,1,30.624809,104.136612,1,2014-08-03 21:18:15,83,37
2,1,30.624811,104.136587,1,2014-08-03 21:20:17,83,37
3,1,30.624811,104.136596,1,2014-08-03 21:19:16,83,37
4,1,30.624811,104.136619,1,2014-08-03 21:17:44,83,37


In [2]:
# 读取栅格化后的数据并降序排列13个POI
df_grid = pd.read_csv('df_grid_computed.csv')
pois = df_grid.sort_values(by='count', ascending=False).nlargest(n=13, columns=['count']).reset_index(drop=True)
pois['辅助列'] = pois['经度索引'].astype(str) + '_' + pois['纬度索引'].astype(str)
pois

Unnamed: 0,经度索引,纬度索引,count,辅助列
0,76,40,24451946,76_40
1,77,40,18683562,77_40
2,77,41,18039878,77_41
3,76,42,15756466,76_42
4,66,31,14886638,66_31
5,78,41,14834008,78_41
6,72,40,14637176,72_40
7,76,41,14120298,76_41
8,79,37,12692081,79_37
9,78,40,12496950,78_40


删除POI-Free数据，转换为分钟

对于每个出租车ID, 执行计算逻辑

如果下一个POI和上一个POI相异，添加一条记录，计算每两个相邻POI的耗时，输出格式：[出租车ID  起点（POI）  终点（POI） 耗时（分钟）]
创建辅助列以标识POI

In [3]:
# 创建辅助列
# 列出原始数据出租车辅助列编号，方便后续与13个POI编号进行对比
ddf['辅助列'] = ddf['经度索引'].astype(str) + '_' + ddf['纬度索引'].astype(str)
ddf.head()

[########################################] | 100% Completed | 994.93 ms


Unnamed: 0,出租车ID,纬度,经度,载客状态,时间点,经度索引,纬度索引,辅助列
0,1,30.624806,104.136604,1,2014-08-03 21:18:46,83,37,83_37
1,1,30.624809,104.136612,1,2014-08-03 21:18:15,83,37,83_37
2,1,30.624811,104.136587,1,2014-08-03 21:20:17,83,37,83_37
3,1,30.624811,104.136596,1,2014-08-03 21:19:16,83,37,83_37
4,1,30.624811,104.136619,1,2014-08-03 21:17:44,83,37,83_37


In [4]:
# 读取13个最热门POI
poi_df = pd.read_csv('poi.csv')

# 创建POI列表并计数
POI = poi_df.sort_values(by='count', ascending=False).reset_index(drop=True)
POI['辅助列'] = POI['经度索引'].astype(str) + '_' + POI['纬度索引'].astype(str)
POI.head()


Unnamed: 0,POI编号,经度索引,纬度索引,count,辅助列
0,1,76,40,24451946,76_40
1,2,77,40,18683562,77_40
2,3,77,41,18039878,77_41
3,4,76,42,15756466,76_42
4,5,66,31,14886638,66_31


In [5]:
# 过滤掉原始数据出租车不在13个POI中的数据
filtered_ddf = ddf[ddf['辅助列'].isin(POI['辅助列'])]
filtered_ddf.head()

[########################################] | 100% Completed | 2.40 ss


Unnamed: 0,出租车ID,纬度,经度,载客状态,时间点,经度索引,纬度索引,辅助列
2871,2,30.619336,104.067111,1,2014-08-03 21:57:18,76,37,76_37
2878,2,30.619658,104.066771,1,2014-08-03 20:52:34,76,37,76_37
2880,2,30.619698,104.067447,1,2014-08-03 21:57:28,76,37,76_37
2881,2,30.619743,104.068535,1,2014-08-03 21:57:38,76,37,76_37
2898,2,30.620093,104.066735,1,2014-08-03 20:52:24,76,37,76_37


In [6]:
# # 统计在13个POI中出租车的ID
# unique_ids = filtered_ddf['出租车ID'].unique().compute()
# unique_ids

# # 13个POI中有多少个出租车
# len(unique_ids)

# # 统计出租车在13个POI中出现的数量
# ids = filtered_ddf['出租车ID'].value_counts().compute()
# ids

# # 通过ids选出出现次数最多的196个出租车
# top_196_ids = ids.nlargest(196)
# top_196_ids.to_csv('top_196_ids.csv')

In [7]:
# 读取top_196_ids.csv
top_196_ids = pd.read_csv('top_196_ids.csv')

# 过滤掉不在top_196_ids中的出租车数据
filtered_ddf = ddf[ddf['出租车ID'].isin(top_196_ids['出租车ID'])]

# 按出租车ID和时间排序
filtered_ddf = filtered_ddf.sort_values(['出租车ID', '时间点'])

# 转换时间格式，确保'时间点'是datetime类型
filtered_ddf['时间点'] = dd.to_datetime(filtered_ddf['时间点'], format='%Y-%m-%d %H:%M:%S')

# 计算每辆出租车的前一个POI，显式指定meta参数
filtered_ddf['起点POI'] = filtered_ddf.groupby('出租车ID')['辅助列'].shift(1, meta=('辅助列', 'object'))

# 计算每辆出租车的前一个时间，显式指定meta参数
filtered_ddf['前一时间'] = filtered_ddf['时间点'].groupby(filtered_ddf['出租车ID']).shift(1, meta=('时间点', 'datetime64[ns]'))

# 筛选出POI变化的记录
transitions = filtered_ddf[filtered_ddf['辅助列'] != filtered_ddf['起点POI']].copy()

# 计算转换之间的耗时（分钟）
transitions['耗时（分钟）'] = (transitions['时间点'] - transitions['前一时间']).dt.total_seconds() / 60.0

# 构建结果数据框
results_df = transitions[['出租车ID', '起点POI', '辅助列', '耗时（分钟）']].rename(columns={'辅助列': '终点POI'})

# 显示结果
print(results_df.head())


[################                        ] | 41% Completed | 10m 14ss

In [13]:
top_196_ids = pd.read_csv('top_196_ids.csv')
# 过滤掉原始数据出租车不在13个POI中的数据
filtered_ddf = ddf[ddf['出租车ID'].isin(top_196_ids['出租车ID'])]
filtered_ddf.head()

# 按出租车ID和时间排序
filtered_ddf = filtered_ddf.sort_values(['出租车ID', '时间点'])

# 计算前一个POI和时间
filtered_ddf['起点POI'] = filtered_ddf.groupby('出租车ID')['辅助列'].shift(1, meta=('辅助列', 'object'))

filtered_ddf['前一时间'] = dd.to_datetime(filtered_ddf['时间点'], format='%Y-%m-%d %H:%M:%S').groupby(filtered_ddf['出租车ID']).shift(1)


# 筛选POI变化的记录
transitions = filtered_ddf[filtered_ddf['辅助列'] != filtered_ddf['起点POI']].copy()

# 计算耗时（分钟）
transitions['耗时（分钟）'] = (pd.to_datetime(transitions['时间点']) - transitions['前一时间']).dt.total_seconds() / 60.0

# 构建结果DataFrame
results_df = transitions[['出租车ID', '起点POI', '辅助列', '耗时（分钟）']].rename(columns={'辅助列': '终点POI'})

# 显示结果
print(results_df.head())


[########################################] | 100% Completed | 2.67 ss
[########################################] | 100% Completed | 2.76 s


  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  filtered_ddf['前一时间'] = dd.to_datetime(filtered_ddf['时间点'], format='%Y-%m-%d %H:%M:%S').groupby(filtered_ddf['出租车ID']).shift(1)


[                                        ] | 1% Completed | 266.45 ss
[                                        ] | 1% Completed | 266.83 s


KeyboardInterrupt: 

In [15]:
# 读取top_196_ids.csv
top_196_ids = pd.read_csv('top_196_ids.csv')

# 过滤掉不在top_196_ids中的出租车数据
filtered_ddf = ddf[ddf['出租车ID'].isin(top_196_ids['出租车ID'])]

# 转换时间格式，明确指定日期格式，并处理无效日期（errors='coerce' 会将无效日期设置为 NaT）
filtered_ddf['时间点'] = dd.to_datetime(filtered_ddf['时间点'], format='%Y-%m-%d %H:%M:%S', errors='coerce')

# 删除包含无效日期（NaT）的行
filtered_ddf = filtered_ddf.dropna(subset=['时间点'])

# 按出租车ID和时间排序
filtered_ddf = filtered_ddf.sort_values(['出租车ID', '时间点'])

# 计算每辆出租车的前一个POI，显式指定meta参数
filtered_ddf['起点POI'] = filtered_ddf.groupby('出租车ID')['辅助列'].shift(1, meta=('辅助列', 'object'))

# 计算每辆出租车的前一个时间，显式指定meta参数
filtered_ddf['前一时间'] = filtered_ddf['时间点'].groupby(filtered_ddf['出租车ID']).shift(1, meta=('时间点', 'datetime64[ns]'))

# 筛选出POI变化的记录
transitions = filtered_ddf[filtered_ddf['辅助列'] != filtered_ddf['起点POI']].copy()

# 计算转换之间的耗时（分钟）
transitions['耗时（分钟）'] = (transitions['时间点'] - transitions['前一时间']).dt.total_seconds() / 60.0

# 构建结果数据框
results_df = transitions[['出租车ID', '起点POI', '辅助列', '耗时（分钟）']].rename(columns={'辅助列': '终点POI'})

# 显示结果
print(results_df.head())


[###############################         ] | 79% Completed | 23m 46ss

  return get_meta_library(args[0]).to_datetime(*args, **kwargs)


[################################        ] | 80% Completed | 23m 55s
[################################        ] | 80% Completed | 23m 55s


KeyboardInterrupt: 

In [25]:
# 增加起点POI和终点POI相同的数据的次数 count
results_df['count'] = results_df.groupby(['起点POI', '终点POI'])['出租车ID'].transform('count')

# 显示增加 count 列后的结果
print(results_df.head())

   出租车ID  起点POI  终点POI      耗时（分钟）  count
0      2  76_37  79_37   70.216667    140
1      2  76_37  79_37   81.933333    140
2      2  79_37  73_38  422.200000      8
3      2  72_40  77_40   57.183333    158
4      2  77_40  72_40  323.616667    152


In [20]:
# 只保留刚刚耗时小于等于60分钟的数据
filtered_results_df = results_df[results_df['耗时（分钟）'] <= 60]

# 显示过滤后的结果
print(filtered_results_df.head())


    出租车ID  起点POI  终点POI     耗时（分钟）
3       2  72_40  77_40  57.183333
5       2  77_40  76_40  38.133333
7       2  76_40  77_40  19.366667
8       2  72_40  77_40  51.316667
12      2  77_40  76_40  46.816667


In [28]:
# 增加起点POI和终点POI相同的数据的次数 count
filtered_results_df.loc[:, 'count'] = filtered_results_df.groupby(['起点POI', '终点POI'])['出租车ID'].transform('count')

# 显示增加 count 列后的结果
print(filtered_results_df.head())


    出租车ID  起点POI  终点POI     耗时（分钟）  count
3       2  72_40  77_40  57.183333     23
5       2  77_40  76_40  38.133333     52
7       2  76_40  77_40  19.366667     56
8       2  72_40  77_40  51.316667     23
12      2  77_40  76_40  46.816667     52


In [29]:
# 定义起点POI为i，终点POI为j，耗时阈值T为60分钟
i = '起点POI'
j = '终点POI'
T = 60

# 计算(i和j之间耗时小于60的数据)的数量
filtered_count = filtered_results_df.groupby([i, j])['出租车ID'].count().reset_index(name='filtered_count')

# 计算(i和j之间全部的数据)的数量
total_count = results_df.groupby([i, j])['出租车ID'].count().reset_index(name='total_count')

# 合并两个数据框以便计算概率
merged_df = pd.merge(filtered_count, total_count, on=[i, j])

# 计算概率Z(i,j,T)
merged_df['Z(i,j,T)'] = merged_df['filtered_count'] / merged_df['total_count']

# 显示结果
print(merged_df.head())


   起点POI  终点POI  filtered_count  total_count  Z(i,j,T)
0  66_31  76_37               2            7  0.285714
1  72_40  76_40              11          127  0.086614
2  72_40  77_40              23          158  0.145570
3  72_40  78_40               4           68  0.058824
4  76_37  73_38               1            5  0.200000
