In [None]:
# Import necessary libraries
import pandas as pd               # Data manipulation and analysis
import numpy as np                # Numerical operations on arrays
import matplotlib.pyplot as plt   # Plotting library

from matplotlib.lines import Line2D                        # Custom legend creation
from tslearn.metrics import cdist_dtw                      # Compute DTW distances
from sktime.clustering.k_means import TimeSeriesKMeans     # K-Means clustering for time series
from sklearn.preprocessing import MinMaxScaler             # Normalize data
from itertools import product                              # Create combinations of parameters
from tslearn.clustering import silhouette_score            # Compute silhouette score for clustering
from matplotlib.lines import Line2D                        # Custom legend creation



In [None]:
#--- Data import ---
# Read the comma-separated table that holds the index values
data = pd.read_csv('path/to/your/data.csv', sep=',')

#--- Data exploration ---
# Report how many distinct values the 'ID' column contains
print('Number of areas/polygons:', data['ID'].nunique())

#--- Date extraction and conversion ---
# The first 9 characters of 'system:index' are parsed as the observation date.
# NOTE(review): confirm that 9 (rather than 8, i.e. YYYYMMDD) is the intended prefix length.
data = data.assign(date=lambda df: pd.to_datetime(df['system:index'].str[:9]))

In [4]:
#--- Filtering ---
# Keep only rows with NDSI below 0.4; rows at or above it are treated as likely clouds or snow
filtered_data = data.loc[data['NDSI'].lt(0.4)]

In [None]:
#--- Prepare ndarray for clustering ---
unique_ids = filtered_data['ID'].unique()
# Sort the dates so that axis 1 of the array is in chronological order even when
# the CSV rows are not; the interpolation, the DTW clustering and the plots all
# walk along this axis and rely on that order.
unique_dates = np.sort(filtered_data['date'].unique())
features = ['NDMI']   # Features to include in clustering

# Array dimensions: one row per ID, one column per date, one layer per feature
n_samples = len(unique_ids)
seq_length = len(unique_dates)
n_features = len(features)

# Initialize array with NaNs so that (ID, date) pairs without a record stay marked as missing
data_ndarray = np.full((n_samples, seq_length, n_features), np.nan)

In [16]:
# Lookup tables: ID -> position on axis 0, date -> position on axis 1
id_to_idx = dict(zip(unique_ids, range(len(unique_ids))))
date_to_idx = dict(zip(unique_dates, range(len(unique_dates))))

In [17]:
# Fill the ndarray with observed values in one vectorised assignment
# (replaces a row-by-row iterrows() loop).

# If an (ID, date) pair occurs more than once, keep its last row so the result
# matches what sequential overwriting would have produced.
observed = filtered_data.drop_duplicates(subset=['ID', 'date'], keep='last')

# Translate IDs and dates to positions along axis 0 and axis 1. Series.map
# matches keys by value, and the cast to int raises if any value is missing
# from the lookup tables instead of silently writing to a wrong cell.
row_pos = observed['ID'].map(id_to_idx).to_numpy(dtype=int)
col_pos = observed['date'].map(date_to_idx).to_numpy(dtype=int)

# Columns of `features` line up with the last axis of data_ndarray
data_ndarray[row_pos, col_pos, :] = observed[features].to_numpy(dtype=float)

In [None]:
#--- Interpolate missing data ---
# Use linear interpolation per sample and feature
def interpolate_ts(ts):
    """Fill the NaN gaps of a one-dimensional series by linear interpolation.

    Interior gaps are interpolated linearly between their neighbours; because
    ``limit_direction='both'`` is used, leading and trailing gaps are filled
    as well. A series without any valid value is returned unchanged (all NaN).

    Parameters
    ----------
    ts : array-like, 1-D
        Values of one time series, with NaN marking missing observations.

    Returns
    -------
    numpy.ndarray
        The values of the series after interpolation.
    """
    series = pd.Series(ts, name='val')
    filled = series.interpolate(method='linear', limit_direction='both')
    return filled.values

# Interpolate every (sample, feature) series along the time axis, in place
for sample_idx, feature_idx in product(range(n_samples), range(n_features)):
    series = data_ndarray[sample_idx, :, feature_idx]
    data_ndarray[sample_idx, :, feature_idx] = interpolate_ts(series)

# Sanity report: remaining gaps and the resulting array shape
print('NaN count after interpolation:', np.isnan(data_ndarray).sum())
print('Data shape:', data_ndarray.shape)

In [None]:
# --- Min-Max normalization to the range [-1, 1] for each feature separately ---
# Stack all samples and time steps as rows so that every feature is one column
data_reshaped = data_ndarray.reshape(n_samples * seq_length, n_features)

# Fit the per-feature minimum/maximum, then rescale
scaler = MinMaxScaler(feature_range=(-1, 1))
scaler.fit(data_reshaped)
data_scaled = scaler.transform(data_reshaped)

# Restore the (samples, time steps, features) layout
data_ndarray = data_scaled.reshape(n_samples, seq_length, n_features)

print("Min-Max normalization completed.")
print("New data range:", data_ndarray.min(), data_ndarray.max())

In [None]:
# --- Parameter grid definition for sktime TimeSeriesKMeans ---
# Each entry lists the candidate values for one constructor argument
param_grid = dict(
    n_clusters=[7],
    init_algorithm=["kmeans++", "random", "forgy"],
    n_init=[10, 100, 250, 500, 750, 1000],
    max_iter=[100, 200, 300, 500],
    tol=[1e-3],
)

# Expand the grid into one keyword dictionary per parameter combination
keys = tuple(param_grid.keys())
values = tuple(param_grid.values())
grid = [{name: value for name, value in zip(keys, combo)} for combo in product(*values)]

# Trackers for the best result; they are updated by the grid search
best_score, best_cfg, best_model = -np.inf, None, None

In [None]:
# Grid search + silhouette evaluation

# The pairwise DTW distance matrix depends only on the data, not on the model
# configuration. Compute it once here and hand it to silhouette_score as a
# precomputed matrix, instead of letting silhouette_score recompute it for
# every grid entry.
dtw_distances = cdist_dtw(data_ndarray)

for params in grid:
    # Fixed seed so that each configuration is reproducible
    model = TimeSeriesKMeans(
        random_state=42,
        metric="dtw",
        **params
    )
    labels = model.fit_predict(data_ndarray)
    score = silhouette_score(
        dtw_distances,
        labels,
        metric="precomputed",
    )
    print(f"Testing {params} → silhouette score: {score:.4f}")
    # Keep the configuration (and its fitted model) with the highest score so far
    if score > best_score:
        best_score = score
        best_cfg   = params
        best_model = model

print(f"\n🏆 Best configuration: {best_cfg} with silhouette score = {best_score:.4f}")


In [None]:
# --- Refit a model with the best configuration found by the grid search ---
# Same fixed seed and DTW metric as used during the search
best_model = TimeSeriesKMeans(random_state=42, metric="dtw", **best_cfg)
best_model = best_model.fit(data_ndarray)


In [None]:
# Predict cluster labels for each time series.
# best_model is already fitted at this point, so predict() is sufficient;
# fit_predict() would run the whole DTW k-means fit again before predicting.
clusters = best_model.predict(data_ndarray)

# --- Visualize clusters ---
# Take the cluster count from the model so the figure follows the chosen
# configuration instead of a hard-coded number.
n_clusters = best_model.n_clusters

# Prepare a colormap and assign one colour per feature
cmap = plt.get_cmap('tab10')
feature_colors = [cmap(i / n_features) for i in range(n_features)]

# Legend entries mapping colours to feature names; identical for every subplot
legend_handles = [
    Line2D([0], [0], color=feature_colors[f_idx], lw=2, label=features[f_idx])
    for f_idx in range(n_features)
]

# One subplot per cluster, arranged vertically.
# squeeze=False keeps `axes` an array even when there is a single cluster.
fig, axes = plt.subplots(
    n_clusters, 1, figsize=(10, 2 * n_clusters), sharex=True, squeeze=False
)
axes = axes[:, 0]

for cluster_id, ax in enumerate(axes):
    series_indices = np.where(clusters == cluster_id)[0]
    print(f'Cluster {cluster_id + 1}: {len(series_indices)} time series')

    # Plot every member series of the cluster, one line per feature
    for idx in series_indices:
        for f_idx in range(n_features):
            ax.plot(
                unique_dates,
                data_ndarray[idx, :, f_idx],
                color=feature_colors[f_idx],
                alpha=0.5,
                linewidth=1
            )

    ax.set_title(f'Cluster {cluster_id + 1}')
    ax.grid(True)
    ax.legend(handles=legend_handles, loc='upper right')

# Shared axis label for all subplots
axes[-1].set_xlabel('Date')
# fig.text(0.06, 0.5, 'Value', va='center', rotation='vertical')

plt.tight_layout()
plt.show()


In [None]:
#--- Export cluster assignments ---
# One row per ID with its cluster label; `clusters` is aligned with `unique_ids`
cluster_map = pd.DataFrame({'ID': unique_ids, 'Cluster': clusters})

# Attach the label to every distinct (ID, phase) pair and write the table to disk
export_data = pd.merge(
    filtered_data.loc[:, ['ID', 'phase']].drop_duplicates(),
    cluster_map,
    on='ID',
)
export_data.to_csv('output_file.csv', index=False)
print('Export completed: output_file.csv')
