In [170]:
from pyspark.sql import Row
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.types import IntegerType,LongType,DateType,DoubleType,StringType,TimestampType,ArrayType, StructField,StructType
from pyspark.sql.functions import col,to_date,unix_timestamp
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from sklearn.cluster import DBSCAN,KMeans,OPTICS
import numpy as np
from scipy.spatial.distance import pdist
from scipy.spatial.distance import squareform

In [156]:
file_path = 'pos_zf_mt2_20220201120000_20220228115959_464.csv'

In [202]:
spark = SparkSession \
    .builder \
    .appName("Split by ports") \
    .getOrCreate()

df = spark.read.csv(file_path,header=True)
df = df.withColumnRenamed("航速(节)","航速")

df = df.withColumn("经度",df.经度.cast(DoubleType()))
df = df.withColumn("纬度",df.纬度.cast(DoubleType()))
df = df.withColumn("航向",df.航向.cast(DoubleType()))
df = df.withColumn("航速",df.航速.cast(DoubleType()))
df = df.withColumn("航艏向",df.航艏向.cast(DoubleType()))
df = df.withColumn('time', 
                   unix_timestamp(col('时间'), 'yyyy-MM-dd HH:mm:ss'))

In [203]:
df = df.orderBy(['MMSI','time'])

In [223]:
def dbscan_x1(coords):
    kms_per_radian = 6371.0086
    epsilon = 0.2/kms_per_radian
    data = [[i[0],i[1]] for i in coords]
    db = OPTICS(eps = epsilon,min_samples = 20,
                algorithm='ball_tree',metric = 'haversine',cluster_method='dbscan'
                ).fit(np.radians(data))
    cluster_labels = db.labels_.reshape(-1,1)
    coords_np = np.array(coords)
    return np.concatenate([coords_np,cluster_labels],axis=1).tolist()


def compute_squared_EDM(X):
    return squareform(pdist(X,metric='euclidean'))

# 判断pt点是否为核心点
# ptIndex为点pt的索引
# disMat距离矩阵
# eps为pt的半径（每个点都不一样）
# minPts为pt的圆中的点个数（每个点都不一样）
def isCorePoint(ptIndex,disMat,eps,minPts):
    if np.sum(np.where(disMat[ptIndex] <= eps, 1, 0)) >= minPts:
        return True
    return False

# Adaptive_DBSCAN算法核心过程
def Adaptive_DBSCAN(data,epsArr,minPtsArr):
    # 获得距离矩阵
    disMat = compute_squared_EDM(data)
    # 获得数据的行和列(一共有n条数据)
    n, m = data.shape
    # 初始化类别，-1代表未分类。
    labels = np.full((n,), -1)
    clusterId = 0
    # 遍历所有轨迹点寻找簇集
    for ptIndex in range(n):
        # 如果某轨迹点已经有类别，直接continue
        if labels[ptIndex] != -1:
            continue
        # 如果某轨迹点不是核心点，直接continue(因为簇集的产生是由核心点控制)
        if not isCorePoint(ptIndex,disMat,epsArr[ptIndex],minPtsArr[ptIndex]):
            continue
        # 首先将点pointId标记为当前类别(即标识为已操作)
        labels[ptIndex] = clusterId
        # 然后寻找种子点的eps邻域且没有被分类的点，将其放入种子集合
        neighbour = np.where((disMat[:, ptIndex] <= epsArr[ptIndex]) & (labels == -1))[0]
        seeds = set(neighbour)
        while len(seeds) > 0:
            # 弹出一个新种子点
            newPoint = seeds.pop()
            # 将newPoint标记为当前类
            labels[newPoint] = clusterId
            # 寻找newPoint种子点eps邻域（包含自己）
            queryResults = np.where(disMat[:,newPoint] <= epsArr[newPoint])[0]
            if len(queryResults) >= minPtsArr[newPoint]:
                # 将邻域内且没有被分类的点压入种子集合
                for resultPoint in queryResults:
                    if labels[resultPoint] == -1:
                        seeds.add(resultPoint)
        # 簇集生长完毕，寻找到一个类别
        clusterId = clusterId + 1
    return labels


def addbscam(coords):
    data = np.array([[i[0],i[1]] for i in coords])
    n = data.shape[0]
    # 构造每一个轨迹点的半径及包含的轨迹点个数
    epsArr = np.full((n),0.01)
    minPtsArr = np.full((n),15)   
    cluster_labels = np.array(Adaptive_DBSCAN(data,epsArr,minPtsArr)).reshape(-1,1)
    coords_np = np.array(coords)
    return np.concatenate([coords_np,cluster_labels],axis=1).tolist()    




output_schema = ArrayType(StructType(
            [
                StructField('经度', DoubleType(),False),
                StructField('纬度', DoubleType(),False),
                StructField('航速', DoubleType(),False),
                StructField('time', DoubleType(),False),                
                StructField('clusterid', DoubleType(),False)
             ]
    ))

In [224]:
udf_dbscan = F.udf(lambda x: addbscam(x),output_schema)
trajectories = df.withColumn('point',F.array(df.经度,df.纬度,df.航速,df.time)).groupby('MMSI').agg(F.collect_list('point').alias('point_list')).withColumn('cluster',udf_dbscan(F.col('point_list')))
#trajectories= trajectories


In [226]:
resultDF = trajectories.withColumn('centers',F.explode('cluster'))\
    .select('MMSI',F.col('centers').getItem('经度').alias('经度'),
           F.col('centers').getItem('纬度').alias('纬度'),
           F.col('centers').getItem('航速').alias('航速'),
           F.col('centers').getItem('time').alias('time'),            
           F.col('centers').getItem('clusterid').alias('clusterid'))

In [227]:
resultDF.show(20)

+---------+-------------+-----------+----+-------------+---------+
|     MMSI|         经度|       纬度|航速|         time|clusterid|
+---------+-------------+-----------+----+-------------+---------+
|311000894|-148.60327333|52.40270167|11.3| 1.64367371E9|     -1.0|
|311000894|   -148.61896|52.40559833|11.4|  1.6436739E9|     -1.0|
|311000894|-148.63296333|   52.40805|11.3| 1.64367407E9|     -1.0|
|311000894|-148.65608167|52.41196667|11.3| 1.64367435E9|     -1.0|
|311000894|-148.66839667|52.41412833|11.3|  1.6436745E9|     -1.0|
|311000894|   -148.68319|  52.416625|11.3| 1.64367468E9|     -1.0|
|311000894|   -148.69721|   52.41908|11.2|1.643674851E9|     -1.0|
|311000894|-148.70854833|52.42117167|11.2| 1.64367499E9|     -1.0|
|311000894|-148.72482667|52.42409833|11.2| 1.64367519E9|     -1.0|
|311000894|-148.73800333|52.42640833|11.2|1.643675351E9|     -1.0|
|311000894|-148.76268667|   52.43059|11.2|1.643675651E9|     -1.0|
|311000894|-148.77484667|  52.432765|11.2|  1.6436758E9|     -1.0|
|

In [228]:
df_stop = resultDF.filter('clusterid > -1')

In [232]:
df_stop.coalesce(1).write.option('header',True).csv('pos_zf_mt2_20220201120000_20220228115959_464_dbscan1.csv')

In [229]:
df_stop.show(100)

+---------+------------+-----------+----+-------------+---------+
|     MMSI|        经度|       纬度|航速|         time|clusterid|
+---------+------------+-----------+----+-------------+---------+
|311000894|  120.051635|35.33574167| 3.5|1.644983455E9|      0.0|
|311000894|120.05446333|35.33736833| 3.2|1.644983635E9|      0.0|
|311000894|120.05705167|35.33884667| 2.9|1.644983815E9|      0.0|
|311000894|120.05949167|35.34009833| 2.6|1.644983995E9|      0.0|
|311000894|120.06140833|35.34161667| 2.5|1.644984176E9|      0.0|
|311000894|  120.062965|   35.34336| 2.4|1.644984356E9|      0.0|
|311000894|   120.06431|35.34501667| 2.2|1.644984536E9|      0.0|
|311000894|  120.065515|35.34653167| 2.0|1.644984716E9|      0.0|
|311000894|120.06646833|   35.34777| 1.2|1.644984896E9|      0.0|
|311000894|120.06614833|35.34837167| 0.5|1.644985235E9|      0.0|
|311000894|120.06569333|  35.348225| 0.4|1.644985437E9|      0.0|
|311000894|  120.065425|35.34673333| 0.0|1.644987237E9|      0.0|
|311000894|  120