In [1]:
from __future__ import annotations
import os, sys
import pandas as pd
import numpy as np
from tqdm import tqdm
from itertools import repeat
from multiprocessing import cpu_count, freeze_support
from concurrent.futures import ProcessPoolExecutor
import json


In [None]:
# Batch processing of lighting data reading and index setting 
wide_data_avrg_2018 = pd.read_csv(
    r'E:\National University of Singapore\Yang Yang - flooding\Process Data\h09v06_Florida\dbf\2018xy\summary_wide.csv',
    index_col=False
)
wide_data_avrg_2018 = wide_data_avrg_2018.set_index('xy_id')

wide_data_avrg_2017 = pd.read_csv(
    r'E:\National University of Singapore\Yang Yang - flooding\Process Data\h09v06_Florida\dbf\2017xy\summary_wide.csv',
    index_col=False
)
wide_data_avrg_2017 = wide_data_avrg_2017.set_index('xy_id')

# Merge the datasets

wide_data_avrg = pd.concat([wide_data_avrg_2017, wide_data_avrg_2018], axis=1)
del wide_data_avrg_2017, wide_data_avrg_2018

In [None]:
# Adjust the lighting levels
wide_data_avrg.iloc[:, 1:] = wide_data_avrg.iloc[:, 1:] * 0.1

In [None]:
#  Delete the data rows where the mean value is less than 0.5.
row_means = wide_data_avrg.mean(axis=1, skipna=True)
cleaned_wide_data_avrg = wide_data_avrg[row_means >= 0.5]

In [None]:
#Remove outliers
def remove_outliers_mad(
    df, 
    k=5, 
    min_valid=3, 
    max_offset=3,
    verbose=False
):


    df_cleaned = df.copy()

    for i, row_idx in enumerate(df.index):
        series = df.loc[row_idx]
        for t_idx in range(len(series)):
            neighbors = []
            offset = 1

            while offset <= max_offset and len(neighbors) < min_valid:
                for delta in [-offset, offset]:
                    neighbor_idx = t_idx + delta
                    if 0 <= neighbor_idx < len(series):
                        val = series.iloc[neighbor_idx]
                        if pd.notna(val):
                            neighbors.append(val)
                offset += 1

            if len(neighbors) >= min_valid and pd.notna(series.iloc[t_idx]):
                median = np.median(neighbors)
                mad = np.median([abs(x - median) for x in neighbors])
                if mad > 0 and abs(series.iloc[t_idx] - median) > k * mad:
                    df_cleaned.iat[i, t_idx] = np.nan
                    if verbose:
                        print(f"Row {row_idx}, Column {df.columns[t_idx]} marked as outlier: {series.iloc[t_idx]}")

    return df_cleaned

cleaned_wide_data_avrg = remove_outliers_mad(cleaned_wide_data_avrg, k=5, min_valid=3, max_offset=3)

In [None]:
#cleaned_wide_data_avrg.to_csv(r'E:\National University of Singapore\Yang Yang - flooding\Process Data\h09v06_Florida\0520test\cleaneddata.csv')

In [None]:
cleaned_wide_data_avrg = pd.read_csv(r'E:\National University of Singapore\Yang Yang - flooding\Process Data\h09v06_Florida\0520test\cleaneddata.csv',index_col=0)

In [None]:

#  1. back up data
if 'cleaned_wide_data_avrg' not in globals():
    print("！错误：在当前环境未找到 cleaned_wide_data_avrg，请先读取")
    print("cleaned_wide_data_avrg = pd.read_csv(..., index_col=0)")
    sys.exit(1)

df = cleaned_wide_data_avrg
df.columns = pd.to_datetime(df.columns)

df_smooth = df.rolling(7, axis=1, min_periods=1).mean()
df = df_smooth

# —— A. Prior hyperparameters —— 
prior = dict(
    trend = dict(
        maxK = 5,           
        minSepDistTrend = 10, 
        maxOrder = 1         
    ),
    alpha1 = 3,
    alpha2 = 3
)


dates = df.columns
freq  = 1
mask_num = 0.1
season_num = 4
formula_type = 'y ~ trend+season' # or 'y ~ trend + season'

json_out_path = r"E:\National University of Singapore\Yang Yang - flooding\Process Data\h09v06_Florida\0520test\beast_model_0521.json"
out_trend_path = r"E:\National University of Singapore\Yang Yang - flooding\Process Data\h09v06_Florida\0520test\beast_trends_0521.csv"
out_cp_path    = r"E:\National University of Singapore\Yang Yang - flooding\Process Data\h09v06_Florida\0520test\beast_change_points_0521.csv"

results = []
cp_records = []

# —— 2. BEAST Single Sequence Function —— 

def extract_cp_info(res):
    import numpy as np

    try:
        cp_raw = getattr(res.trend, "cp", None)
        print(f"[DEBUG] raw cp = {cp_raw} ({type(cp_raw)})")

        # convert to ndarray
        cp_array = np.asarray(cp_raw).ravel()
        print(f"[DEBUG] cp_array (raveled) = {cp_array}")

        # delete NaN
        cp_array_clean = cp_array[~np.isnan(cp_array)]
        print(f"[DEBUG] cp_array_clean = {cp_array_clean}")

        # convert to int index
        cp_index = cp_array_clean.astype(int)
        print(f"[DEBUG] cp_index = {cp_index}")

        cp_prob_full = getattr(res.trend, "cpOccPr", None)
        cp_prob_array = np.asarray(cp_prob_full).ravel()
        print(f"[DEBUG] cpOccPr full = {cp_prob_array} (len={len(cp_prob_array)})")

        if cp_index.size > 0 and cp_index.max() < len(cp_prob_array):
            cp_prob = cp_prob_array[cp_index]
        else:
            cp_prob = np.array([], dtype=float)

        print(f"[OK] extracted cp_index = {cp_index}, cp_prob = {cp_prob}")
        return cp_index, cp_prob

    except Exception as e:
        print("[ERROR in extract_cp_info]:", e)
        return np.array([], dtype=int), np.array([], dtype=float)



def run_beast_on_series(h3_id, values, dates, freq=1):
    import Rbeast, numpy as np, pandas as pd
    


    ts = pd.Series(values.astype(float), index=dates)

    res = Rbeast.beast(
        ts,
        freq=freq,
        season=season_num,
        what='all',
        formula=formula_type,
        prior=prior,
        extra={
            "computeCredible": True,
            "computeTrendChngpt": True,
            "computeTrendSlope": True,
            "computeSeasonOrder": True,
            "computeTrendOrder": True,
            "computeSeasonChngpt": True,
            "quiet": True,
            "printProgress": False,
            "dumpInputData": False
        }
    )


    def _to_list(comp):
        import numpy as np, collections.abc as cabc
        if isinstance(comp, np.ndarray):
            return comp.tolist()
        for attr in ("est", "value", "mean", "Y", "Yhat"):  
            if hasattr(comp, attr):
                return np.asarray(getattr(comp, attr)).squeeze().tolist()
        if isinstance(comp, (list, tuple, cabc.Iterable)):
            return list(comp)
        raise TypeError(f"Unrecognized BEAST component: {type(comp)}")

    trend_vals  = _to_list(res.trend)
    fitted_vals = _to_list(res.fitted) if hasattr(res, "fitted") else None

    # Filter out significant mutation points
    cp_index, cp_prob = extract_cp_info(res)        # 1-based
    # ---- get slope  ----
    slope_arr = getattr(res.trend, "slope", None) or getattr(res.trend, "slp", None)
    if slope_arr is None:   
        slope_arr = np.diff(trend_vals, prepend=trend_vals[0])
    slope = np.asarray(slope_arr).ravel()       # 0-based

    if cp_index.size:
        # ① 1-based → 0-based
        cp_index0 = cp_index - 1

        # ② Exclude the illegal actions
        valid     = cp_index0 > 0
        cp_index0 = cp_index0[valid]
        cp_prob   = cp_prob[valid]

        # ③ Determine the decline by using "the average slope on the right side of the win".
        win = 3
        right_mean = [
            slope[i : min(i + win, len(slope))].mean()
            for i in cp_index0
        ]
        right_mean = np.asarray(right_mean)

        # ④ Jump downward and "right_mean" is not nan
        down_mask  = (~np.isnan(right_mean)) & (right_mean < 0)
        cp_index0  = cp_index0[down_mask]
        cp_prob    = cp_prob[down_mask]
    else:
        cp_index0 = np.array([], dtype=int)
        cp_prob   = np.array([], dtype=float)

    # ⑤ Confidence level filtering
    mask          = cp_prob >= mask_num           
    cp_index_keep = cp_index0[mask].tolist()
    cp_prob_keep  = cp_prob[mask].tolist()
    cp_date_keep  = [dates[i] for i in cp_index_keep]

    print(f"[{h3_id}] cp_date_keep: {cp_date_keep}")

    def _py(v):
        import numpy as np
        if isinstance(v, np.generic):
            return v.item()
        if isinstance(v, np.ndarray):
            return v.tolist()
        return v
    
    return {k: _py(v) for k, v in {
        "id": h3_id,
        "trend": trend_vals,
        "fitted": fitted_vals,
        "cp_index": cp_index_keep,
        "cp_date":  [d.strftime("%Y-%m-%d") for d in cp_date_keep],
        "cp_prob":  cp_prob_keep,
        "r2": getattr(res, "R2", None),
        "rmse": getattr(res, "RMSE", None),
        "mape": getattr(res, "MAPE", None),
        "n_cp": len(cp_index_keep),
        "season_order": None,
        "freq": freq,
    }.items()}


#3. Serial processing, fault tolerance, and interim storage


# Continuation after interruption
completed = set()
if os.path.exists(out_trend_path):
    completed = set(pd.read_csv(out_trend_path, index_col=0).index)

for idx in tqdm(df.index, desc="Serial BEAST"):
    if idx in completed:
        continue  
    values = df.loc[idx].to_numpy()
    try:
        r = run_beast_on_series(idx, values, dates, freq)
        results.append(r)
        
        trend_series = pd.Series(
            data=r['trend'], 
            index=dates, 
            name=r['id']
        )
        trend_series.to_frame().T.to_csv(
            out_trend_path,
            mode='a',
            header=not os.path.exists(out_trend_path)
        )

        if r["n_cp"] > 0:
            pd.DataFrame(
                [{"H3_id": r["id"], "cp_date": d, "cp_prob": p}
                for d, p in zip(r["cp_date"], r["cp_prob"])]
            ).to_csv(
                out_cp_path,
                mode='a',
                index=False,
                header=not os.path.exists(out_cp_path)
            )
            
        with open(json_out_path, "a", encoding="utf-8") as jf:
            jf.write(json.dumps(r, ensure_ascii=False) + "\n")        
    except Exception as e:
        print(f"!! H3 {idx} failed, skip: {e}")

# 4.Write the remaining cp_records into
if cp_records:
    pd.DataFrame(cp_records).to_csv(out_cp_path, mode='a', index=False,
                                   header=not os.path.exists(out_cp_path))
print("Done")
print(f"  trends: {out_trend_path}")
print(f"  mutational site: {out_cp_path}")


print(f"full model: {json_out_path}")
