In [1]:
import numpy as np


class ATP:
    def __init__(self, tra):
        """初始化一个轨迹，该轨迹包含n个轨迹点，每个轨迹点为一个array，那么tra的形状为n×2
        """
        self.tra = tra

    def cal_mo(self, point):
        """计算该向量的模"""
        return np.sqrt((point**2).sum())

    def dist_ptol(self, a, b, c):
        """计算点c到直线ab的距离，点为numpy array形式"""
        ca = a-c
        cb = b-c
        ab = b-a
        dist = self.cal_mo(np.cross(ca, cb)) / (self.cal_mo(ab) + np.spacing(1))
        #     cos_t = np.dot(ca, cb)/(cal_mo(ca)*cal_mo(cb)+np.spacing(1))
        #     sin_t = np.sqrt(1-cos_t**2 + np.spacing(1))
        #     dist = cal_mo(ca)*cal_mo(cb)*sin_t/cal_mo(ab)
        if dist == 0:
            return np.spacing(1)
        return dist

    def dist_perp(self, si, ei, sj, ej):
        """根据定义计算两线间的垂直距离，点为array形式"""
        l1_perp = self.dist_ptol(si, ei, sj)
        l2_perp = self.dist_ptol(si, ei, ej)
        dist = (l1_perp**2+l2_perp**2)/(l1_perp+l2_perp+np.spacing(1))
        return dist

    # def dist_parr(si, ei, sj, ej):
    #     """根据定义计算两线间的平行距离，点为array形式"""
    #     u1 = np.dot(sj-si, ei-si)/(cal_mo(ei-si)**2)
    #     u2 = np.dot(ej-sj, ei-si)/(cal_mo(ei-si)**2)
    #     ps = si + u1*(ei-si)
    #     pe = si + u2*(ei-si)
    #     l1_parr = min(cal_mo(si-ps), cal_mo(ei-pe))
    #     l2_parr = min(cal_mo(si-pe), cal_mo(ei-pe))
    #     dist = min(l1_parr, l2_parr)
    #     return dist

    def dist_angle(self, si, ei, sj, ej):
        """计算两线间的角度距离"""
        li = ei-si
        lj = ej-sj
        cos_t = np.dot(li, lj)/(self.cal_mo(li)*self.cal_mo(lj)+np.spacing(1))
        if li[0] == lj[0] and li[1] == lj[1]:
            return 0
        if cos_t > 0:
            sin_t = np.sqrt(1-cos_t**2+np.spacing(1))
            return self.cal_mo(lj)*sin_t
        else:
            return self.cal_mo(lj)

    def dist_func(self, si, ei, sj, ej):
        """距离函数，辅助求解MDL"""
        dist = self.dist_perp(si, ei, sj, ej) + self.dist_angle(si, ei, sj, ej)
        return dist

    def mdl_part(self, startIndex, currIndex):
        """
        计算出若在此点分割的MDL代价的L(D|H)部分
        :param startIndex: 起点的索引
        :param currIndex: 当前的索引
        :return: L(D|H)部分
        """
        cost = 0
        for j in range(startIndex, currIndex):
            if list(self.tra[startIndex]) == list(self.tra[j]) and list(self.tra[currIndex]) == list(self.tra[j+1]):
                continue
            if self.cal_mo(self.tra[currIndex] - self.tra[startIndex]) > self.cal_mo(self.tra[j] - self.tra[j+1]):
                # 这里判断的原因是算距离的时候要区分哪条是长的，哪条是短的
                # 算法算的时候默认li是长的，lj是短的，那我们改变传参位置即可
                dist = self.dist_func(self.tra[startIndex], self.tra[currIndex], self.tra[j], self.tra[j+1])
            else:
                dist = self.dist_func(self.tra[j], self.tra[j+1], self.tra[startIndex], self.tra[currIndex])
            cost += np.log2(dist)
        return cost

    def MDL(self):
        """
        一个轨迹的MDL近似解，Approximate Trajectory Partitioning
        :return: 该轨迹的特征点集合CP
        """
        tra_len = len(self.tra)
        if tra_len == 0:
            return []
        CP = []  # CP: characteristic points
        CP.append(self.tra[0])
        startIndex = 0
        length = 1
        while startIndex + length < tra_len:
            currIndex = startIndex + length
            if self.mdl_part(startIndex, currIndex) < 0:
                CP.append(self.tra[currIndex - 1])
                startIndex = currIndex - 1
                length = 1
            else:
                length += 1
        CP.append(self.tra[-1])  # 把轨迹终点加进去
        return np.array(CP)


def array2Segment(CP):
    """将CP的array形式转变成后续算法好处理的Segment类型"""
    line_num = len(CP)-1  # n个特征点则形成n-1条线段
    if line_num <= 0:
        return []
    CP_segments = [Segment(Point(CP[i][0], CP[i][1]), Point(CP[i+1][0], CP[i+1][1])) for i in range(line_num)]
    print('共有' + str(len(CP_segments)) + '条线段')
    return CP_segments

In [2]:
import math
import numpy as np


class Point(object):
    """对轨迹中的点进行封装, 可以进行比较, 一些常用的计算, 如距离计算, dot计算等, 本point主要对2维的point进行封装
    method
    ------
        str: 返回point的字符串格式, ‘2.56557800, 1.00000000’
        +: 加号操作符, 实现两个point类型的相加操作
        -: 减号操作符, 实现两个point类型的减法运算
        distance(other): 实现两个Point类型的距离(欧式距离)计算
        dot(other): 实现两个Point对象的dot运算, 得到两个点的值: x**2 + y**2
    """
    def __init__(self, x, y, traj_id=None):
        self.trajectory_id = traj_id
        self.x = x
        self.y = y

    def __repr__(self):
        return "{0:.8f},{1:.8f}".format(self.x, self.y)

    def get_point(self):
        return self.x, self.y

    def __add__(self, other: 'Point'):
        if not isinstance(other, Point):
            raise TypeError("The other type is not 'Point' type.")
        _add_x = self.x + other.x
        _add_y = self.y + other.y
        return Point(_add_x, _add_y, traj_id=self.trajectory_id)

    def __sub__(self, other: 'Point'):
        if not isinstance(other, Point):
            raise TypeError("The other type is not 'Point' type.")
        _sub_x = self.x - other.x
        _sub_y = self.y - other.y
        return Point(_sub_x, _sub_y, traj_id=self.trajectory_id)

    def __mul__(self, x: float):
        if isinstance(x, float):
            return Point(self.x*x, self.y*x, traj_id=self.trajectory_id)
        else:
            raise TypeError("The other object must 'float' type.")

    def __truediv__(self, x: float):
        if isinstance(x, float):
            return Point(self.x / x, self.y / x, traj_id=self.trajectory_id)
        else:
            raise TypeError("The other object must 'float' type.")

    def distance(self, other: 'Point'):
        """计算两个point之间的距离"""
        return math.sqrt(math.pow(self.x-other.x, 2) + math.pow(self.y-other.y, 2))

    def dot(self, other: 'Point'):
        return self.x * other.x + self.y * other.y

    def as_array(self):
        return np.array((self.x, self.y))


def _point2line_distance(point, start, end):
    """计算point到line的垂直距离通过向量的方式: distance = |es x ps| / |es|, es为起始点的项量表示, ps为point到start点的向量
    parameter
    ---------
        point: np.ndarray, a point, 2-dim point or 3-dim point.
        start and end: 同point的格式一致, 都为numpy的array格式
    return
    ------
        float, point点到start, end两点连线的垂直距离, 欧式距离
    """
    if np.all(np.equal(start, end)):
        return np.linalg.norm(point - start)
    return np.divide(np.abs(np.linalg.norm(np.cross(end - start, start - point))),
                     np.linalg.norm(end - start))

In [3]:
from typing import Tuple
import math


class Segment(object):
    """将一个segment进行封装, 进行距离(垂直距离, 长度距离, 角度距离)的计算, 设置segment的cluster的ID等, 在使用时需区分长短segment,两者的调用方式不同
    method
    ------
        perpendicular_distance: 计算垂直距离, longer_segment.perpendicular_distance(short_segment)
        parallel_distance: 计算segment长度的相似度, longer_segment.parallel_distance(short_segment)
        angle_distance: 计算两个segment的角度相似性, longer_segment.angle_distance(short_segment)
    """
    eps = 1e-12

    def __init__(self, start_point: Point, end_point: Point, traj_id: int = None, cluster_id: int = -1):
        self.start = start_point
        self.end = end_point
        self.traj_id = traj_id
        self.cluster_id = cluster_id

    def set_cluster(self, cluster_id: int):
        self.cluster_id = cluster_id

    def pair(self) -> Tuple[Point, Point]:
        return self.start, self.end

    @property
    def length(self):
        return self.end.distance(self.start)

    def perpendicular_distance(self, other: 'Segment'):
        """计算两个segment之间起始点的垂直距离距离, 参考论文中的公式Formula(1); 必须Segment为short的line segment."""
        l1 = other.start.distance(self._projection_point(other, typed="start"))
        l2 = other.end.distance(self._projection_point(other, typed="end"))
        if l1 < self.eps and l2 < self.eps:
            return 0
        else:
            return (math.pow(l1, 2) + math.pow(l2, 2)) / (l1 + l2)

    def parallel_distance(self, other: 'Segment'):
        """计算两个segment之间的长度距离, 参考论文中的公式Formula(2),Segment必须为short的line segment."""
        l1 = self.start.distance(self._projection_point(other, typed='start'))
        l2 = self.end.distance(self._projection_point(other, typed='end'))
        return min(l1, l2)

    def angle_distance(self, other: 'Segment'):
        """计算两个segment之间的角度距离, 参考论文中的公式Formula(3),Segment必须为short的line segment."""
        self_vector = self.end - self.start
        self_dist, other_dist = self.end.distance(self.start), other.end.distance(other.start)

        # 当两个点重合时, 计算点到直线的距离即可
        if self_dist < self.eps:
            return _point2line_distance(self.start.as_array(), other.start.as_array(), other.end.as_array())
        elif other_dist < self.eps:
            return _point2line_distance(other.start.as_array(), self.start.as_array(), self.end.as_array())

        cos_theta = self_vector.dot(other.end - other.start) / (self.end.distance(self.start) * other.end.distance(other.start))
        if cos_theta > self.eps:
            if cos_theta >= 1:
                cos_theta = 1.0
            return other.length * math.sqrt(1 - math.pow(cos_theta, 2))
        else:
            return other.length

    def _projection_point(self, other: 'Segment', typed="e"):
        if typed == 's' or typed == 'start':
            tmp = other.start - self.start
        else:
            tmp = other.end - self.start
        u = tmp.dot(self.end-self.start) / math.pow(self.end.distance(self.start), 2)
        return self.start + (self.end-self.start) * u

    def get_all_distance(self, seg: 'Segment'):
        res = self.angle_distance(seg)
        # 起始点不能为同一个点
        if str(self.start) != str(self.end):
            res += self.parallel_distance(seg)
        # 不能为同一轨迹
        if self.traj_id != seg.traj_id:
            res += self.perpendicular_distance(seg)
        return res


def compare(segment_a: Segment, segment_b: Segment) -> Tuple[Segment, Segment]:
    """对两个segment进行对比, 返回:(longer_segment, shorter_segment)"""
    return (segment_a, segment_b) if segment_a.length > segment_b.length else (segment_b, segment_a)

In [4]:
import math
from collections import deque, defaultdict

# min_traj_cluster = 2  # 定义聚类的簇中至少需要的trajectory数量


class LSC:
    def __init__(self, epsilon: float = 2.0, min_lines: int = 5):
        """
        线段密度聚类，基于DBSCAN更改
        :param epsilon:
        :param min_lines: 核心线段要求的最小线段数
        """
        self.epsilon = epsilon
        self.min_lines = min_lines

    def neighborhood(self, seg, segs):
        """计算一个segment在距离epsilon范围内的所有segment集合, 计算的时间复杂度为O(n). n为所有segment的数量
        parameter
        ---------
            seg: Segment instance, 需要计算的segment对象
            segs: List[Segment, ...], 所有的segment集合, 为所有集合的partition分段结果集合
        return
        ------
            List[segment, ...], 返回seg在距离epsilon内的所有Segment集合.
        """
        segment_set = []
        for segment_tmp in segs:
            seg_long, seg_short = compare(seg, segment_tmp)  # get long segment by compare segment
            if seg_long.get_all_distance(seg_short) <= self.epsilon:
                segment_set.append(segment_tmp)
        return segment_set

    def expand_cluster(self, segs, queue: deque, cluster_id: int):
        """
        扩展簇
        :param segs: List[Segment, ...], 所有的segment集合, 为所有集合的partition分段结果集合
        :param queue: 一个双向列表
        :param cluster_id: 该簇所属的类
        """
        while len(queue) != 0:
            curr_seg = queue.popleft()
            curr_num_neighborhood = self.neighborhood(curr_seg, segs)
            if len(curr_num_neighborhood) >= self.min_lines:
                for m in curr_num_neighborhood:
                    if m.cluster_id == -1:
                        queue.append(m)
                        m.cluster_id = cluster_id
            else:
                pass

    def line_segment_clustering(self, traj_segments):
        """线段segment聚类, 采用dbscan的聚类算法, 参考论文中的伪代码来实现聚类, 论文中的part4.2部分中的伪代码及相关定义
        parameter
        ---------
            traj_segments: List[Segment, ...], 所有轨迹的partition划分后的segment集合.
        return
        ------
            Tuple[Dict[int, List[Segment, ...]], ...], 返回聚类的集合和不属于聚类的集合, 通过dict表示, key为cluster_id, value为segment集合
        """
        cluster_id = 0
        cluster_dict = defaultdict(list)
        for seg in traj_segments:
            _queue = deque(list(), maxlen=50)
            if seg.cluster_id == -1:
                seg_num_neighbor_set = self.neighborhood(seg, traj_segments)
                if len(seg_num_neighbor_set) >= self.min_lines:
                    seg.cluster_id = cluster_id
                    for sub_seg in seg_num_neighbor_set:
                        sub_seg.cluster_id = cluster_id  # assign clusterId to segment in neighborhood(seg)
                        _queue.append(sub_seg)  # insert sub segment into queue
                    self.expand_cluster(traj_segments, _queue, cluster_id)
                    cluster_id += 1
                else:
                    seg.cluster_id = -1
            # print(seg.cluster_id, seg.traj_id)
            if seg.cluster_id != -1:
                cluster_dict[seg.cluster_id].append(seg)  # 将轨迹放入到聚类的集合中, 按dict进行存放

#         remove_cluster = dict()
#         cluster_number = len(cluster_dict)
#         for i in range(0, cluster_number):
#             traj_num = len(set(map(lambda s: s.traj_id, cluster_dict[i])))  # 计算每个簇下的轨迹数量
#             print("the %d cluster lines:" % i, traj_num)
#             if traj_num < min_traj_cluster:
#                 remove_cluster[i] = cluster_dict.pop(i)
#         return cluster_dict, remove_cluster
        return cluster_dict


def representative_trajectory_generation(cluster_segment: dict, min_lines: int = 3, min_dist: float = 2.0):
    """通过论文中的算法对轨迹进行变换, 提取代表性路径, 在实际应用中必须和当地的路网结合起来, 提取代表性路径, 该方法就是通过算法生成代表性轨迹
    parameter
    ---------
        cluster_segment: Dict[int, List[Segment, ...], ...], 轨迹聚类的结果存储字典, key为聚类ID, value为类簇下的segment列表
        min_lines: int, 满足segment数的最小值
        min_dist: float, 生成的轨迹点之间的最小距离, 生成的轨迹点之间的距离不能太近的控制参数
    return
    ------
        Dict[int, List[Point, ...], ...], 每个类别下的代表性轨迹结果
    """
    representive_point = defaultdict(list)
    for i in cluster_segment.keys():
        cluster_size = len(cluster_segment.get(i))
        sort_point = []  # [Point, ...], size = cluster_size*2
        rep_point, zero_point = Point(0, 0, -1), Point(1, 0, -1)

        # 对某个i类别下的segment进行循环, 计算类别下的平均方向向量: average direction vector
        for j in range(cluster_size):
            rep_point = rep_point + (cluster_segment[i][j].end - cluster_segment[i][j].start)
        rep_point = rep_point / float(cluster_size)  # 对所有点的x, y求平均值

        cos_theta = rep_point.dot(zero_point) / rep_point.distance(Point(0, 0, -1))  # cos(theta)
        sin_theta = math.sqrt(1 - math.pow(cos_theta, 2))  # sin(theta)

        # 对某个i类别下的所有segment进行循环, 每个点进行坐标变换: X' = A * X => X = A^(-1) * X'
        #   |x'|      | cos(theta)   sin(theta) |    | x |
        #   |  |  =   |                         | *  |   |
        #   |y'|      |-sin(theta)   cos(theta) |    | y |
        for j in range(cluster_size):
            s, e = cluster_segment[i][j].start, cluster_segment[i][j].end
            # 坐标轴变换后进行原有的segment修改
            cluster_segment[i][j] = Segment(Point(s.x * cos_theta + s.y * sin_theta, s.y * cos_theta - s.x * sin_theta, -1),
                                            Point(e.x * cos_theta + e.y * sin_theta, e.y * cos_theta - e.x * sin_theta, -1),
                                            traj_id=cluster_segment[i][j].traj_id,
                                            cluster_id=cluster_segment[i][j].cluster_id)
            sort_point.extend([cluster_segment[i][j].start, cluster_segment[i][j].end])

        # 对所有点进行排序, 按照横轴的X进行排序, 排序后的point列表应用在后面的计算中
        sort_point = sorted(sort_point, key=lambda _p: _p.x)
        for p in range(len(sort_point)):
            intersect_cnt = 0.0
            start_y = Point(0, 0, -1)
            for q in range(cluster_size):
                s, e = cluster_segment[i][q].start, cluster_segment[i][q].end
                # 如果点在segment内则进行下一步的操作:
                if (sort_point[p].x <= e.x) and (sort_point[p].x >= s.x):
                    if s.x == e.x:
                        continue
                    elif s.y == e.y:
                        intersect_cnt += 1
                        start_y = start_y + Point(sort_point[p].x, s.y, -1)
                    else:
                        intersect_cnt += 1
                        start_y = start_y + Point(sort_point[p].x, (e.y-s.y)/(e.x-s.x)*(sort_point[p].x-s.x)+s.y, -1)
            # 计算the average coordinate: avg_p and dist >= min_dist
            if intersect_cnt >= min_lines:
                tmp_point: Point = start_y / intersect_cnt
                # 坐标转换到原始的坐标系, 通过逆矩阵的方式进行矩阵的计算:https://www.shuxuele.com/algebra/matrix-inverse.html
                tmp = Point(tmp_point.x*cos_theta-sin_theta*tmp_point.y,
                            sin_theta*tmp_point.x+cos_theta*tmp_point.y, -1)
                _size = len(representive_point[i]) - 1
                if _size < 0 or (_size >= 0 and tmp.distance(representive_point[i][_size]) > min_dist):
                    representive_point[i].append(tmp)
    return representive_point

In [5]:
import time
import json
import random

In [6]:
file_name = 'mainOD/'+'day_20170203.json'
save_file_name = 'res/' + 'day_20170203_eps0.05_res.json'
content = json.load(open(file_name, encoding='gbk'))  # 导入一个json文件，存放一千辆车的轨迹点
random.sample(list(content.keys()), 5)

['粤AN0Z41', '粤A130RF', '粤A3499E', '粤AN4M41', '粤A0JW53']

In [7]:
I = []
for car in random.sample(list(content.keys()), 50):
    TR = np.array(content.get(car))
    I.append(TR)

In [8]:
# 第一部分：将轨迹分成线段
total_segments = []
for tra in I:
    model = ATP(tra)
    tra_ATP = model.MDL()
    format_segment = array2Segment(tra_ATP)
    total_segments = total_segments + format_segment

共有898条线段
共有1143条线段
共有1055条线段
共有471条线段
共有952条线段
共有466条线段
共有1条线段
共有4条线段
共有656条线段
共有959条线段
共有1146条线段
共有587条线段
共有1025条线段
共有557条线段
共有541条线段
共有652条线段
共有624条线段
共有1116条线段
共有1144条线段
共有624条线段
共有769条线段
共有693条线段
共有899条线段
共有735条线段
共有1110条线段
共有800条线段
共有666条线段
共有890条线段
共有949条线段
共有425条线段
共有1028条线段
共有694条线段
共有1085条线段
共有428条线段
共有702条线段
共有1077条线段
共有656条线段
共有790条线段
共有420条线段
共有1040条线段
共有667条线段
共有1211条线段
共有729条线段
共有1201条线段
共有924条线段
共有756条线段
共有770条线段
共有642条线段
共有1135条线段
共有799条线段


In [None]:
I

In [10]:
len(total_segments)

39311

In [11]:
# 第二部分：线段聚类
start = time.time()
LSC_model = LSC(epsilon=0.05, min_lines=5)
norm_cluster = LSC_model.line_segment_clustering(total_segments)
end = time.time()
print('线段聚类部分用时:'+str(end-start))

线段聚类部分用时:903.0005311965942


In [12]:
norm_cluster.keys()

dict_keys([0, 1, 2, 3, 4, 5])

In [13]:
for i in norm_cluster.keys():
    print('第' + str(i) + '个类的线段条数为:' + str(len(norm_cluster[i])))

第0个类的线段条数为:35522
第1个类的线段条数为:591
第2个类的线段条数为:161
第3个类的线段条数为:3015
第4个类的线段条数为:8
第5个类的线段条数为:14


In [14]:
# 第三部分：获得每个簇的代表性轨迹
start = time.time()
RTR_dict = representative_trajectory_generation(norm_cluster, min_lines=2, min_dist=0.0005)
end = time.time()
print('提取代表性轨迹用时:'+ str(end-start))

提取代表性轨迹用时:832.0400011539459


In [15]:
RTR_dict.keys()

dict_keys([0, 1, 2, 3, 5])

In [16]:
# 输出每个簇相应的数量
for i in list(RTR_dict.keys()):
    print('第'+ str(i) + '个簇的点数为:' + str(len(RTR_dict[i])))

第0个簇的点数为:3424
第1个簇的点数为:235
第2个簇的点数为:118
第3个簇的点数为:410
第5个簇的点数为:11


In [17]:
for i in list(RTR_dict.keys()):
    if len(RTR_dict[i]) < 50:
        RTR_dict.pop(i)
        print('删除了第'+str(i)+'个簇')

删除了第5个簇


In [18]:
# 输出每个簇相应的数量
for i in list(RTR_dict.keys()):
    print('第'+ str(i) + '个簇的点数为:' + str(len(RTR_dict[i])))

第0个簇的点数为:3424
第1个簇的点数为:235
第2个簇的点数为:118
第3个簇的点数为:410


In [19]:
# 将default_dict的
for j in list(RTR_dict.keys()):
    for i in range(len(RTR_dict[j])):
        RTR_dict[j][i] = [RTR_dict[j][i].x, RTR_dict[j][i].y]

In [20]:
a = dict(RTR_dict)
a

{0: [[113.25785222753865, 22.98689331738406],
  [113.25764433208968, 22.987517844223383],
  [113.25756894716511, 22.98931573098899],
  [113.25815002774046, 22.989863122242568],
  [113.25948091622718, 22.98932501222575],
  [113.26005950104467, 22.988884810682272],
  [113.26049379719262, 22.988553655818478],
  [113.26105325144583, 22.98821422927424],
  [113.26166662047515, 22.987812039370404],
  [113.26227897649466, 22.987805937545986],
  [113.26285179318373, 22.987893914857267],
  [113.26276012773833, 22.988417562680944],
  [113.26254440008248, 22.988880302207683],
  [113.26303923993426, 22.98866647350622],
  [113.26351949785429, 22.98840460462528],
  [113.26401240020772, 22.98813726471085],
  [113.26375253880428, 22.98868349588669],
  [113.26369614440429, 22.989280985875062],
  [113.2641854733576, 22.989453067029082],
  [113.26467212539092, 22.98965683144951],
  [113.26486574889526, 22.990186749009204],
  [113.26529731018908, 22.990475804565598],
  [113.26582558962474, 22.9913579881957

In [21]:
with open(save_file_name, 'w') as f:
    b = json.dumps(a)
    f.write(b)

In [None]:
# source_x = [draw_point[i][0] for i in range(len(draw_point))]
# source_y = [draw_point[i][1] for i in range(len(draw_point))]

In [None]:
# from matplotlib import pyplot as plt
# fig = plt.figure(figsize=(9, 6))
# ax = fig.add_subplot(111)
# ax.plot(source_x[:], source_y[:], 'g--', lw=2.0, label="2000 chosen points")
# ax.scatter(source_x[:], source_y[:], c='g', alpha=0.5)