In [9]:
import math
import numpy as np
import pandas as pd

# ===============================================================
# 1. Lempel–Ziv 熵率计算
# ===============================================================
def lz_lambdas(seq):
    """计算每个位置的 Lambda_i（最短未在前缀出现的子串长度）"""
    N = len(seq)
    seq_t = tuple(seq)
    lambdas = []
    for i in range(N):
        max_L = N - i
        L = 1
        while L <= max_L:
            subseq = seq_t[i:i+L]
            if L > i:
                lambdas.append(L)
                break
            occurred = False
            for j in range(0, i - L + 1):
                if seq_t[j:j+L] == subseq:
                    occurred = True
                    break
            if not occurred:
                lambdas.append(L)
                break
            L += 1
        else:
            lambdas.append(max_L)
    return lambdas


def lz_entropy_rate(seq):
    """计算 Lempel–Ziv 熵率（bits/符号）"""
    N = len(seq)
    if N == 0:
        return 0.0, []
    lambdas = lz_lambdas(seq)
    sum_lambda = sum(lambdas)
    if sum_lambda == 0:
        return 0.0, lambdas
    H_hat = (N * math.log2(N)) / sum_lambda
    return H_hat, lambdas


# ===============================================================
# 2. 修正版 Fano 方程 — 理论最大可预测性 Πmax
# ===============================================================
def predictability_upper_bound(S_bits_per_symbol, alphabet_size, tol=1e-9, max_iter=200):
    """根据 Song et al. (2010) 公式计算理论最大可预测性 Πmax"""
    if S_bits_per_symbol < 1e-12:
        return 1.0
    H_max = math.log2(alphabet_size)
    if abs(S_bits_per_symbol - H_max) < 1e-12:
        return 1.0 / alphabet_size

    def f(Pi):
        Pi = np.clip(Pi, 1e-12, 1 - 1e-12)
        return -Pi * math.log2(Pi) - (1 - Pi) * math.log2((1 - Pi) / (alphabet_size - 1)) - S_bits_per_symbol

    low, high = 0.0, 1.0
    for _ in range(max_iter):
        mid = 0.5 * (low + high)
        if f(mid) < 0:  # 修正方向：f(Pi) 单调递减
            high = mid
        else:
            low = mid
        if abs(high - low) < tol:
            break
    return (low + high) / 2


# ===============================================================
# 3. 停留时长分箱 + 联合符号
# ===============================================================
def duration_to_bin(duration_sec, bins=None):
    if bins is None:
        # 默认分箱：0–10min,10–30min,30–60min,1–3h,>3h
        bins = [0, 600, 1800, 3600, 10800, float('inf')]
    for i in range(len(bins)-1):
        if bins[i] <= duration_sec < bins[i+1]:
            return i
    return len(bins)-2


def build_stay_sequence(df, grid_col='grid', duration_col='duration', bins=None):
    """构建按停留点粒度的联合符号序列"""
    if bins is None:
        bins = [0, 600, 1800, 3600, 10800, float('inf')]
    grids = sorted(df[grid_col].unique())
    grid_to_idx = {g: i for i, g in enumerate(grids)}
    B = len(bins) - 1
    seq = []
    for _, row in df.iterrows():
        g = grid_to_idx[int(row[grid_col])]
        d = duration_to_bin(float(row[duration_col]), bins)
        symbol = g * B + d
        seq.append(symbol)
    return seq, len(grids) * B


# ===============================================================
# 4. 按小时展开轨迹序列
# ===============================================================
def build_hourly_sequence(df, stime_col='stime', etime_col='etime', grid_col='grid', fill_missing=-1):
    """将轨迹展开为每小时一个 grid"""
    df = df.copy()
    df[stime_col] = pd.to_datetime(df[stime_col])
    df[etime_col] = pd.to_datetime(df[etime_col])
    t0 = df[stime_col].min().floor('H')
    t1 = df[etime_col].max().ceil('H')
    hours = pd.date_range(t0, t1, freq='H')
    hour_grid = {h: fill_missing for h in hours}
    for _, row in df.iterrows():
        s, e, g = row[stime_col], row[etime_col], int(row[grid_col])
        h = s.floor('H')
        while h < e:
            if h in hour_grid:
                hour_grid[h] = g
            h += pd.Timedelta(hours=1)
    seq = [hour_grid[h] for h in hours]
    return seq, len(set(seq))


# ===============================================================
# 5. 主函数：计算两种粒度的可预测性
# ===============================================================
def compute_predictability(data, mode='stay'):
    """
    mode: 'stay' (按停留点计算) 或 'hourly' (按小时展开)
    data:
      - 若为 list，则直接作为序列（如 grid 序列）
      - 若为 DataFrame，则从中构建序列
    返回: dict 包含 H_hat, Pi_max, alphabet_size, seq_length
    """
    # 直接传入序列
    if isinstance(data, (list, np.ndarray)):
        seq = [int(x) for x in data]
        alphabet_size = len(set(seq))
    # 传入 DataFrame
    elif isinstance(data, pd.DataFrame):
        if mode == 'stay':
            seq, alphabet_size = build_stay_sequence(data)
        elif mode == 'hourly':
            seq, alphabet_size = build_hourly_sequence(data)
        else:
            raise ValueError("mode must be 'stay' or 'hourly'")
        seq = [int(s) for s in seq if s != -1]
    else:
        raise TypeError("data must be a list or pandas.DataFrame")
    
    alphabet_size = 21000
    H_hat, _ = lz_entropy_rate(seq)
    Pi_max = predictability_upper_bound(H_hat, alphabet_size)
    return {
        "mode": mode,
        "H_hat(bits/symbol)": H_hat,
        "Pi_max": Pi_max,
        "alphabet_size": alphabet_size,
        "seq_length": len(seq)
    }


# ===============================================================
# 示例
# ===============================================================
if __name__ == "__main__":
    # ==== 示例1：规律性很高的序列 ====
    seq = [123, 4567, 4432, 2124, 4521] * 10
    res_seq = compute_predictability(seq)
    print("=== 周期性序列 ===")
    for k, v in res_seq.items():
        print(f"{k}: {v}")

    # ==== 示例2：停留点轨迹数据 ====
    data = [
        [1,'001','2008-10-23 05:59:59','2008-10-23 10:32:53',116.325033,39.979672,16374.0,14806],
        [1,'001','2008-10-23 11:03:36','2008-10-23 23:49:59',116.306654,40.016016,45983.0,14621],
        [1,'001','2008-10-24 00:14:57','2008-10-24 01:48:27',116.325005,39.978826,5610.0,14806],
        [1,'001','2008-10-24 01:54:17','2008-10-24 03:21:04',116.310839,39.980082,5207.0,14617],
        [1,'001','2008-10-24 03:54:19','2008-10-24 05:30:43',116.325061,39.979697,5784.0,14806],
    ]
    df = pd.DataFrame(data, columns=['userID','col2','stime','etime','lon','lat','duration','grid'])

    print("\n=== 按停留点粒度 ===")
    res_stay = compute_predictability(df, mode='stay')
    for k, v in res_stay.items():
        print(f"{k}: {v}")

    print("\n=== 按小时粒度 ===")
    res_hourly = compute_predictability(df, mode='hourly')
    for k, v in res_hourly.items():
        print(f"{k}: {v}")

=== 周期性序列 ===
mode: stay
H_hat(bits/symbol): 0.4703213491478937
Pi_max: 0.9779033088125288
alphabet_size: 21000
seq_length: 50

=== 按停留点粒度 ===
mode: stay
H_hat(bits/symbol): 2.321928094887362
Pi_max: 0.8759545055218041
alphabet_size: 21000
seq_length: 5

=== 按小时粒度 ===
mode: hourly
H_hat(bits/symbol): 1.4332889474613348
Pi_max: 0.9265487245284021
alphabet_size: 21000
seq_length: 25


  t0 = df[stime_col].min().floor('H')
  t1 = df[etime_col].max().ceil('H')
  hours = pd.date_range(t0, t1, freq='H')
  h = s.floor('H')


In [8]:
# 实际停留点数据。
traj = pd.read_csv('./Data/Output/Stays/000.csv', index_col=0)
traj.head(3)

res_stay = compute_predictability(traj, mode='stay')
for k, v in res_stay.items():
    print(f"{k}: {v}")

mode: stay
H_hat(bits/symbol): 3.046651980015867
Pi_max: 0.8331160484813154
alphabet_size: 21000
seq_length: 499


# 实际停留点数据。
traj = pd.read_csv('./Data/Output/Stays/000.csv', index_col=0)
traj.head(3)

res_stay = compute_predictability(traj, mode='stay')
for k, v in res_stay.items():
    print(f"{k}: {v}")