In [1]:
# Load the Kedro IPython extension (presumably what provides the `catalog`
# object used in the next cell — TODO confirm).
%load_ext kedro

In [None]:
# Load the 'clean_data' dataset through `catalog`; `catalog` is not defined in
# the visible cells (presumably injected by the kedro extension — TODO confirm).
df = catalog.load('clean_data')
# Print the frame's info() summary as a quick look at what was loaded.
df.info()

In [9]:
from scipy.cluster.hierarchy import dendrogram, linkage, fcluster
from statsmodels.tsa.seasonal import seasonal_decompose
from scipy.signal import periodogram
from sklearn.preprocessing import StandardScaler
import numpy as np

In [10]:
class CommodityItem:
    """Price history of one commodity plus time-series features for clustering.

    Attributes set by ``__init__``:
        name: The label passed in by the caller.
        df: The input rows sorted by the 'date' column.
        series: The 'high_price' column of ``df`` indexed by 'date'.
    """

    # Seasonal period (in observations) passed to seasonal_decompose.
    SEASONAL_PERIOD = 365
    # Exclusive upper bound on the lags used by the Hurst exponent estimate.
    MAX_HURST_LAG = 100

    def __init__(self, name, df):
        """Sort ``df`` by 'date' and build the 'high_price' series.

        Args:
            name: Label for this commodity.
            df: DataFrame with at least 'date' and 'high_price' columns.
        """
        self.name = name
        self.df = df.sort_values('date')
        self.series = self.df.set_index('date')['high_price']

    def extract_features(self):
        """Extract time series features for clustering.

        Returns:
            dict mapping feature name to a scalar, in a fixed insertion order:
            trend_strength, trend_slope, seasonality_strength, seasonal_period,
            volatility, hurst_exponent, skewness, kurtosis, cv.
            Individual values may be NaN/Inf (e.g. too few observations or a
            zero mean); callers are expected to filter those.
        """
        features = {}
        ts = self.series

        features['trend_strength'] = self._get_trend_strength(ts)
        features['trend_slope'] = self._get_trend_slope(ts)

        features['seasonality_strength'] = self._get_seasonality_strength(ts)
        features['seasonal_period'] = self._detect_seasonal_period(ts)

        features['volatility'] = ts.pct_change().std()
        features['hurst_exponent'] = self._get_hurst_exponent(ts)

        features['skewness'] = ts.skew()
        features['kurtosis'] = ts.kurtosis()
        features['cv'] = ts.std() / ts.mean()

        return features

    def _decompose(self, ts):
        """Run seasonal_decompose on the non-null values of ``ts``.

        Returns:
            The decomposition result, or None when the decomposition fails.
        """
        try:
            return seasonal_decompose(
                ts.dropna(),
                period=self.SEASONAL_PERIOD,
                extrapolate_trend='freq',
            )
        except Exception:
            # Best effort: a failed decomposition is reported as "no result"
            # so the callers can fall back to 0. `Exception` (rather than a
            # bare except) lets KeyboardInterrupt/SystemExit propagate.
            return None

    def _get_trend_strength(self, ts):
        """Measure trend strength using variance decomposition.

        Returns:
            var(trend) / (var(trend) + var(resid)), or 0 if the series could
            not be decomposed.
        """
        result = self._decompose(ts)
        if result is None:
            return 0
        trend_var = np.var(result.trend)
        resid_var = np.var(result.resid)
        return trend_var / (trend_var + resid_var)

    def _get_seasonality_strength(self, ts):
        """Measure seasonality strength.

        Returns:
            var(seasonal) / (var(trend) + var(seasonal) + var(resid)), or 0 if
            the series could not be decomposed.
        """
        result = self._decompose(ts)
        if result is None:
            return 0
        seasonal_var = np.var(result.seasonal)
        total_var = np.var(result.trend) + seasonal_var + np.var(result.resid)
        return seasonal_var / total_var

    def _get_trend_slope(self, ts):
        """Linear trend slope per observation step.

        Non-finite observations are left out of the fit but keep their
        position on the x axis, so gaps do not compress the time scale.

        Returns:
            Slope of the degree-1 least-squares fit, or NaN when fewer than
            two finite observations are available.
        """
        values = np.asarray(ts, dtype=float)
        positions = np.arange(len(values))
        usable = np.isfinite(values)
        if usable.sum() < 2:
            return np.nan
        return np.polyfit(positions[usable], values[usable], 1)[0]

    def _detect_seasonal_period(self, ts):
        """Detect dominant seasonal period using the periodogram.

        Returns:
            1 / f at the strongest non-zero frequency bin, or 0 when the
            series is too short to have such a bin.
        """
        values = ts.dropna().values
        if len(values) < 2:
            return 0
        freqs, power = periodogram(values)
        # Bin 0 is the zero frequency and is skipped, so at least two bins
        # are needed before argmax has anything to look at.
        if len(power) < 2:
            return 0
        peak = np.argmax(power[1:]) + 1
        return 1 / freqs[peak]

    def _get_hurst_exponent(self, ts):
        """Hurst exponent for long-term memory/persistence.

        Fits log(std of lag-differences) against log(lag) for lags from 2 up
        to MAX_HURST_LAG - 1, capped by the series length.

        Returns:
            The fitted slope, or NaN when fewer than two lags give a positive,
            finite spread (short or constant series).
        """
        values = np.asarray(ts.dropna(), dtype=float)
        lags = np.arange(2, min(self.MAX_HURST_LAG, len(values)))
        if len(lags) < 2:
            return np.nan
        tau = np.array([np.std(values[lag:] - values[:-lag]) for lag in lags])
        # log(0) is -inf and would poison the fit, so zero spreads are dropped.
        usable = np.isfinite(tau) & (tau > 0)
        if usable.sum() < 2:
            return np.nan
        return np.polyfit(np.log(lags[usable]), np.log(tau[usable]), 1)[0]

In [13]:
class CommodityClusterAnalyzer:
    """Cluster commodities by the time-series features of their price history.

    Intended call order: ``create_items()`` -> ``extract_all_features()`` ->
    ``perform_clustering()``.

    Attributes:
        df: The source DataFrame (must have an 'item' column).
        items: dict of item name -> CommodityItem, filled by create_items().
        feature_matrix: 2-D array (items x features) from
            extract_all_features(), or None before it has run.
        feature_names: Column names of ``feature_matrix``.
        item_names: Row names of ``feature_matrix``.
        clusters: Cluster label per row, set by perform_clustering().
    """

    def __init__(self, df):
        """Store ``df``; nothing is computed until the methods are called."""
        self.df = df
        self.items = {}
        self.feature_matrix = None
        # Initialised here so they can be read before extraction has run.
        self.feature_names = []
        self.item_names = []
        self.clusters = None

    def create_items(self):
        """Create CommodityItem objects for each unique item"""
        for item_name in self.df['item'].unique():
            item_df = self.df[self.df['item'] == item_name]
            self.items[item_name] = CommodityItem(item_name, item_df)

    def extract_all_features(self):
        """Extract features for all items with NaN handling.

        Items whose feature extraction raises, or whose features contain a
        NaN/Inf value, are reported via print() and left out.

        Returns:
            The feature matrix (also stored on ``self.feature_matrix``); its
            rows line up with ``self.item_names`` and its columns with
            ``self.feature_names``.
        """
        features_list = []
        item_names = []
        feature_names = []

        for name, item in self.items.items():
            try:
                features = item.extract_features()
                # Coercing to a float array makes the finiteness check cover
                # every numeric type, not only Python int/float instances.
                feature_values = np.asarray(list(features.values()), dtype=float)
            except Exception as e:
                print(f"Error processing {name}: {e}")
                continue

            if not np.all(np.isfinite(feature_values)):
                print(f"Skipping {name} due to NaN/Inf values")
                continue

            if not feature_names:
                # Taken from the first accepted item instead of relying on a
                # variable left over from the last loop iteration.
                feature_names = list(features.keys())
            features_list.append(feature_values.tolist())
            item_names.append(name)

        self.feature_matrix = np.array(features_list)
        self.feature_names = feature_names
        self.item_names = item_names

        print(f"Successfully processed {len(features_list)} items out of {len(self.items)}")
        return self.feature_matrix

    def perform_clustering(self, n_clusters=5):
        """Perform hierarchical (Ward) clustering on the standardized features.

        Args:
            n_clusters: Maximum number of flat clusters to cut the tree into.

        Returns:
            Tuple ``(clusters, Z)``: the cluster label per item and the
            linkage matrix.

        Raises:
            ValueError: If no features were extracted, or fewer than two
                items are available to cluster.
        """
        if self.feature_matrix is None or len(self.feature_matrix) == 0:
            raise ValueError("No valid features extracted for clustering")
        if len(self.feature_matrix) < 2:
            # linkage cannot build a tree from one observation; fail with a
            # message that names the real cause.
            raise ValueError("At least two items are required for clustering")

        # Standardize features and handle any remaining NaN/Inf
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(self.feature_matrix)

        # Final check for NaN/Inf
        if np.any(~np.isfinite(X_scaled)):
            print("Warning: NaN/Inf values found after scaling. Replacing with 0.")
            X_scaled = np.nan_to_num(X_scaled)

        # Hierarchical clustering
        Z = linkage(X_scaled, method='ward')
        self.clusters = fcluster(Z, n_clusters, criterion='maxclust')

        return self.clusters, Z

In [15]:

analyzer = CommodityClusterAnalyzer(df)
analyzer.create_items()
analyzer.extract_all_features()

clusters, linkage_matrix = analyzer.perform_clustering(n_clusters=6)

# perform_clustering() returns a (labels, linkage) tuple, not a summary dict,
# so the per-cluster summary is built here from the 6-cluster labels above
# (calling it again without arguments would also re-cluster into 5 groups).
# NOTE(review): "Avg Trend" is taken to mean the 'trend_strength' feature,
# mirroring 'seasonality_strength' — confirm it should not be 'trend_slope'.
trend_col = analyzer.feature_names.index('trend_strength')
seasonality_col = analyzer.feature_names.index('seasonality_strength')

cluster_summary = {}
for cluster_id in np.unique(clusters):
    in_cluster = clusters == cluster_id
    cluster_summary[int(cluster_id)] = {
        'items': [name for name, keep in zip(analyzer.item_names, in_cluster) if keep],
        'avg_trend': float(analyzer.feature_matrix[in_cluster, trend_col].mean()),
        'avg_seasonality': float(analyzer.feature_matrix[in_cluster, seasonality_col].mean()),
        'size': int(in_cluster.sum()),
    }

for cluster_id, summary in cluster_summary.items():
    print(f"Cluster {cluster_id}:")
    print(f"  Items: {summary['items']}")
    print(f"  Avg Trend: {summary['avg_trend']:.3f}")
    print(f"  Avg Seasonality: {summary['avg_seasonality']:.3f}")
    print(f"  Size: {summary['size']}")
    print()

Skipping আদা (দেশী) due to NaN/Inf values
Skipping ইলিশ due to NaN/Inf values
Skipping খাসী due to NaN/Inf values
Skipping গরু due to NaN/Inf values
Skipping পাম অয়েল (সুপার) due to NaN/Inf values
Skipping পিঁয়াজ (আমদানি) due to NaN/Inf values
Successfully processed 45 items out of 51
