In [48]:
import numpy as np
import pandas as pd
from scipy import stats
from scipy.stats import chi2_contingency
from statsmodels.stats.multitest import multipletests
from typing import Dict, List, Tuple, Optional
import warnings

# Try to import validation module
try:
    from validation import ExperimentValidator
    VALIDATION_AVAILABLE = True
except ImportError:
    VALIDATION_AVAILABLE = False
    warnings.warn("Validation module not available. Skipping validation checks.")


class ABTestAnalyzer:
    
    def __init__(self, alpha: float = 0.05):
        self.alpha = alpha
        if VALIDATION_AVAILABLE:
            self.validator = ExperimentValidator(srm_threshold=0.001)  # Stricter for SRM
        else:
            self.validator = None
    
    def calculate_sample_size(self,
                            baseline_rate: float,
                            mde: float,
                            alpha: float = 0.05,
                            power: float = 0.80,
                            two_tailed: bool = True) -> int:
        
        if two_tailed:
            z_alpha = stats.norm.ppf(1 - alpha/2)
        else:
            z_alpha = stats.norm.ppf(1 - alpha)
        
        z_beta = stats.norm.ppf(power)
  
        p1 = baseline_rate
        p2 = baseline_rate * (1 + mde)
        
        
        p2 = min(p2, 0.999)
        
        numerator = (z_alpha + z_beta) ** 2 * (p1 * (1 - p1) + p2 * (1 - p2))
        denominator = (p2 - p1) ** 2
        
        n = numerator / denominator
        
        return int(np.ceil(n))
    
    def two_sample_ttest(self,
                        control: np.ndarray,
                        treatment: np.ndarray,
                        metric_name: str,
                        equal_var: bool = False) -> Dict:
        
        control = control[~np.isnan(control)]
        treatment = treatment[~np.isnan(treatment)]
        
        control_mean = control.mean()
        treatment_mean = treatment.mean()
        control_std = control.std(ddof=1)
        treatment_std = treatment.std(ddof=1)
        n_control = len(control)
        n_treatment = len(treatment)
        
        statistic, pvalue = stats.ttest_ind(treatment, control, equal_var=equal_var)
        
        pooled_std = np.sqrt((control_std**2 + treatment_std**2) / 2)
        cohens_d = (treatment_mean - control_mean) / pooled_std if pooled_std > 0 else 0
        
        se_diff = np.sqrt(control_std**2/n_control + treatment_std**2/n_treatment)
        
        if not equal_var:
            num = (control_std**2/n_control + treatment_std**2/n_treatment)**2
            denom = ((control_std**2/n_control)**2/(n_control-1) + 
                    (treatment_std**2/n_treatment)**2/(n_treatment-1))
            df = num / denom if denom > 0 else n_control + n_treatment - 2
        else:
            df = n_control + n_treatment - 2
        
        t_crit = stats.t.ppf(1 - self.alpha/2, df)
        diff = treatment_mean - control_mean
        ci_lower = diff - t_crit * se_diff
        ci_upper = diff + t_crit * se_diff
        
        relative_lift_pct = (diff / control_mean * 100) if control_mean != 0 else 0
        
        return {
            'metric': metric_name,
            'test_type': 't-test',
            'statistic': statistic,
            'pvalue': pvalue,
            'significant': pvalue < self.alpha,
            'control_mean': control_mean,
            'treatment_mean': treatment_mean,
            'control_std': control_std,
            'treatment_std': treatment_std,
            'absolute_diff': diff,
            'relative_lift_pct': relative_lift_pct,
            'cohens_d': cohens_d,
            'effect_interpretation': self._interpret_cohens_d(cohens_d),
            'ci_lower': ci_lower,
            'ci_upper': ci_upper,
            'n_control': n_control,
            'n_treatment': n_treatment,
            'degrees_of_freedom': df
        }
    
    def proportion_test(self,
                       control_successes: int,
                       control_total: int,
                       treatment_successes: int,
                       treatment_total: int,
                       metric_name: str) -> Dict:
        
        p_control = control_successes / control_total
        p_treatment = treatment_successes / treatment_total
        
        p_pooled = (control_successes + treatment_successes) / (control_total + treatment_total)
        
        se = np.sqrt(p_pooled * (1 - p_pooled) * (1/control_total + 1/treatment_total))
        
        z_stat = (p_treatment - p_control) / se if se > 0 else 0
        
        pvalue = 2 * (1 - stats.norm.cdf(abs(z_stat)))
        
        se_diff = np.sqrt(p_control*(1-p_control)/control_total + 
                         p_treatment*(1-p_treatment)/treatment_total)
        z_crit = stats.norm.ppf(1 - self.alpha/2)
        diff = p_treatment - p_control
        ci_lower = diff - z_crit * se_diff
        ci_upper = diff + z_crit * se_diff
        
        relative_lift_pct = (diff / p_control * 100) if p_control > 0 else 0
        
        return {
            'metric': metric_name,
            'test_type': 'proportion_test',
            'statistic': z_stat,
            'pvalue': pvalue,
            'significant': pvalue < self.alpha,
            'control_rate': p_control,
            'treatment_rate': p_treatment,
            'absolute_diff': diff,
            'relative_lift_pct': relative_lift_pct,
            'ci_lower': ci_lower,
            'ci_upper': ci_upper,
            'n_control': control_total,
            'n_treatment': treatment_total
        }
    
    def chi_square_test(self,
                       control: np.ndarray,
                       treatment: np.ndarray,
                       metric_name: str) -> Dict:
        
        combined = np.concatenate([control, treatment])
        labels = np.concatenate([np.zeros(len(control)), np.ones(len(treatment))])
        
        contingency_table = pd.crosstab(combined, labels)
        
        chi2, pvalue, dof, expected = chi2_contingency(contingency_table)

        n = len(combined)
        min_dim = min(contingency_table.shape[0], contingency_table.shape[1]) - 1
        cramers_v = np.sqrt(chi2 / (n * min_dim)) if min_dim > 0 else 0
        
        return {
            'metric': metric_name,
            'test_type': 'chi_square',
            'statistic': chi2,
            'pvalue': pvalue,
            'significant': pvalue < self.alpha,
            'degrees_of_freedom': dof,
            'cramers_v': cramers_v,
            'effect_interpretation': self._interpret_cramers_v(cramers_v),
            'n_control': len(control),
            'n_treatment': len(treatment)
        }
    
    def mann_whitney_u_test(self,
                           control: np.ndarray,
                           treatment: np.ndarray,
                           metric_name: str) -> Dict:

        

        control = control[~np.isnan(control)]
        treatment = treatment[~np.isnan(treatment)]
        

        statistic, pvalue = stats.mannwhitneyu(treatment, control, alternative='two-sided')
        

        n1 = len(control)
        n2 = len(treatment)
        rank_biserial = 1 - (2*statistic) / (n1 * n2)
        

        control_median = np.median(control)
        treatment_median = np.median(treatment)
        
        return {
            'metric': metric_name,
            'test_type': 'mann_whitney',
            'statistic': statistic,
            'pvalue': pvalue,
            'significant': pvalue < self.alpha,
            'control_median': control_median,
            'treatment_median': treatment_median,
            'rank_biserial': rank_biserial,
            'n_control': n1,
            'n_treatment': n2
        }
    
    def bootstrap_confidence_interval(self,
                                     control: np.ndarray,
                                     treatment: np.ndarray,
                                     metric_name: str,
                                     n_bootstrap: int = 10000,
                                     confidence_level: float = 0.95) -> Dict:
        
        np.random.seed(42)
        

        control = control[~np.isnan(control)]
        treatment = treatment[~np.isnan(treatment)]
        
 
        boot_diffs = []
        for _ in range(n_bootstrap):
            control_boot = np.random.choice(control, size=len(control), replace=True)
            treatment_boot = np.random.choice(treatment, size=len(treatment), replace=True)
            boot_diffs.append(treatment_boot.mean() - control_boot.mean())
        
        boot_diffs = np.array(boot_diffs)
        

        alpha_bootstrap = 1 - confidence_level
        ci_lower = np.percentile(boot_diffs, alpha_bootstrap/2 * 100)
        ci_upper = np.percentile(boot_diffs, (1 - alpha_bootstrap/2) * 100)
        
 
        observed_diff = treatment.mean() - control.mean()

        significant = not (ci_lower <= 0 <= ci_upper)
        
        return {
            'metric': metric_name,
            'test_type': 'bootstrap',
            'observed_diff': observed_diff,
            'ci_lower': ci_lower,
            'ci_upper': ci_upper,
            'significant': significant,
            'confidence_level': confidence_level,
            'n_bootstrap': n_bootstrap
        }
    
    def multiple_testing_correction(self,
                                   p_values: List[float],
                                   method: str = 'holm') -> Dict:


        reject, pvals_corrected, alphacSidak, alphacBonf = multipletests(
            p_values, 
            alpha=self.alpha, 
            method=method
        )
        
        fwer_uncorrected = 1 - (1 - self.alpha) ** len(p_values)
        
        return {
            'method': method,
            'original_pvalues': p_values,
            'corrected_pvalues': pvals_corrected.tolist(),
            'reject': reject.tolist(),
            'fwer_uncorrected': fwer_uncorrected,
            'num_tests': len(p_values),
            'num_significant_uncorrected': sum(p < self.alpha for p in p_values),
            'num_significant_corrected': sum(reject)
        }

    # ==========================
    # NORMALITY CHECK
    # ==========================
    def _check_normality(self,
                         control: np.ndarray,
                         treatment: np.ndarray,
                         alpha: float = 0.05) -> bool:

        control = control[~np.isnan(control)]
        treatment = treatment[~np.isnan(treatment)]

        if len(control) < 3 or len(treatment) < 3:
            return False

        _, p_c = stats.shapiro(control)
        _, p_t = stats.shapiro(treatment)

        return (p_c > alpha) and (p_t > alpha)

    # ==========================
    # MAIN PIPELINE
    # ==========================
    def run(self,
            df: pd.DataFrame,
            group_col: str,
            metric_col: str,
            metric_type: str = "continuous") -> Dict:

        results = {}

        # ======================
        # 1️⃣ SRM VALIDATION
        # ======================
        if self.validator is not None:
            srm_result = self.validator.sample_ratio_mismatch_test(
                df,
                variant_col=group_col
            )

            results["srm"] = srm_result

            if srm_result["has_srm"]:
                return {
                    "error": "Experiment failed SRM check",
                    "srm_details": srm_result
                }

        # ======================
        # 2️⃣ SPLIT DATA
        # ======================
        from itertools import combinations

        variants = df[group_col].dropna().unique()

        if len(variants) < 2:
            raise ValueError("Experiment must have at least 2 variants.")

        # =========================================
        # CASE 1 → NORMAL A/B
        # =========================================
        if len(variants) == 2:

            control = df[df[group_col] == variants[0]][metric_col].values
            treatment = df[df[group_col] == variants[1]][metric_col].values

            if len(control) == 0 or len(treatment) == 0:
                raise ValueError("Control or Treatment group is empty.")

            # lanjut logic test lama lo di bawah sini


        # =========================================
        # CASE 2 → MULTI VARIANT (A/B/C/D...)
        # =========================================
        else:

            pairwise_results = []

            for v1, v2 in combinations(variants, 2):

                df_pair = df[df[group_col].isin([v1, v2])]

                control = df_pair[df_pair[group_col] == v1][metric_col].values
                treatment = df_pair[df_pair[group_col] == v2][metric_col].values

                if len(control) == 0 or len(treatment) == 0:
                    continue

                # ==========================
                # PANGGIL TEST LO YANG LAMA
                # ==========================
                test_result = {}

                if metric_type == "continuous":
                    from scipy.stats import mannwhitneyu
                    stat, pval = mannwhitneyu(control, treatment, alternative="two-sided")
                    test_result["test_type"] = "mann_whitney"

                else:
                    from statsmodels.stats.proportion import proportions_ztest
                    count = [control.sum(), treatment.sum()]
                    nobs = [len(control), len(treatment)]
                    stat, pval = proportions_ztest(count, nobs)
                    test_result["test_type"] = "proportion_test"

                test_result["statistic"] = stat
                test_result["pvalue"] = pval
                test_result["comparison"] = f"{v1} vs {v2}"

                pairwise_results.append(test_result)

            return {
                "multi_variant": True,
                "pairwise_results": pairwise_results
            }

        # ======================
        # 3️⃣ DECISION TREE
        # ======================
        if metric_type == "binary":

            test_result = self.proportion_test(
                np.sum(control),
                len(control),
                np.sum(treatment),
                len(treatment),
                metric_col
            )

        elif metric_type == "continuous":

            is_normal = self._check_normality(control, treatment)

            if is_normal:
                test_result = self.two_sample_ttest(
                    control,
                    treatment,
                    metric_col
                )
            else:
                test_result = self.mann_whitney_u_test(
                    control,
                    treatment,
                    metric_col
                )

            test_result["normality_passed"] = is_normal

        # ======================
        # 4️⃣ MULTIPLE TESTING CORRECTION
        # ======================
        correction = self.multiple_testing_correction(
            [test_result["pvalue"]],
            method="holm"
        )

        test_result["corrected_pvalue"] = correction["corrected_pvalues"][0]
        test_result["significant_corrected"] = correction["reject"][0]

        results["analysis"] = test_result

        return results


In [43]:
def print_ab_result(result):
    if "error" in result:
        print("❌ ERROR:", result["error"])
        return
    
    analysis = result["analysis"] 

    
    print("===== A/B TEST RESULT =====")
    print("Test Type:", analysis["test_type"])
    print("P-value:", round(analysis["pvalue"], 5))
    print("Significant:", analysis["significant"])
    
    if "corrected_pvalue" in analysis:
        print("Corrected P-value:", round(analysis["corrected_pvalue"], 5))
        print("Significant (corrected):", analysis["significant_corrected"])
    
    if "relative_lift_pct" in analysis:
        print("Lift (%):", round(analysis["relative_lift_pct"], 2))
    
    if "ci_lower" in analysis:
        print("Confidence Interval:", 
              round(analysis["ci_lower"], 4), 
              "to", 
              round(analysis["ci_upper"], 4))
    
    print("============================")


In [44]:
df1 = pd.read_csv('test1_menu.csv')
df2 = pd.read_csv('test2_novelty_slider.csv')
df3 = pd.read_csv('test3_product_sliders.csv')
df4 = pd.read_csv('test4_reviews.csv')
df5 = pd.read_csv('test5_search_engine.csv')

In [7]:
df1["variant"].unique()
df1["variant"].value_counts(dropna=False)

variant
A_horizontal_menu    3500
B_dropdown_menu      3500
Name: count, dtype: int64

In [29]:
analyzer = ABTestAnalyzer(alpha=0.05)

result1 = analyzer.run(
    df=df1,
    group_col="variant",
    metric_col="revenue",
    metric_type="continuous"
)
print("Menu Design Test Result:")
print_ab_result(result1)

result2 = analyzer.run(
    df=df1,
    group_col="variant",
    metric_col="pages_viewed",
    metric_type="continuous"
)
print_ab_result(result2)

Menu Design Test Result:
===== A/B TEST RESULT =====
Test Type: mann_whitney
P-value: 0.0
Significant: True
Corrected P-value: 0.0
Significant (corrected): True
===== A/B TEST RESULT =====
Test Type: mann_whitney
P-value: 0.06748
Significant: False
Corrected P-value: 0.06748
Significant (corrected): False


In [35]:
metrics1 = df1.select_dtypes(include=np.number).columns.tolist()
metrics2 = df2.select_dtypes(include=np.number).columns.tolist()

In [46]:
datasets = {
    "Menu Design Test": df1,
    "Novelty Slider Test": df2,
    "Product Sliders Test": df3,
    "Customer Review Test": df4,
    "Search Engine Test": df5
}

In [51]:
analyzer = ABTestAnalyzer(alpha=0.05)
all_summaries = {}

for exp_name, df in datasets.items():
    
    all_results = []  # reset setiap experiment
    
    metrics = df.select_dtypes(include="number").columns.tolist()
    
    if "variant" in metrics:
        metrics.remove("variant")
    
    for metric in metrics:
        
        unique_values = df[metric].dropna().unique()
        
        if set(unique_values).issubset({0, 1}):
            metric_type = "binary"
        else:
            metric_type = "continuous"
        
        result = analyzer.run(
            df=df,
            group_col="variant",
            metric_col=metric,
            metric_type=metric_type
        )
        
        # CASE A/B
        if "analysis" in result:
            analysis = result["analysis"]
            analysis["metric"] = metric
            all_results.append(analysis)

        # CASE MULTI VARIANT
        elif "pairwise_results" in result:
            for res in result["pairwise_results"]:
                res["metric"] = metric
                all_results.append(res)

    summary_df = pd.DataFrame(all_results)
    
    correction = analyzer.multiple_testing_correction(
        summary_df["pvalue"].tolist(),
        method="holm"
    )
    
    summary_df["corrected_pvalue"] = correction["corrected_pvalues"]
    summary_df["significant_corrected"] = correction["reject"]
    
    all_summaries[exp_name] = summary_df


  res = hypotest_fun_out(*samples, **kwds)
  res = hypotest_fun_out(*samples, **kwds)


In [52]:
for exp_name, summary_df in all_summaries.items():
    
    print("="*70)
    print("A/B TEST SUMMARY REPORT")
    print(f"Experiment : {exp_name}")
    print(f"Alpha      : {analyzer.alpha}")
    print("Correction : Holm (Multiple Testing)")
    print("="*70)
    
    display(summary_df)

A/B TEST SUMMARY REPORT
Experiment : Menu Design Test
Alpha      : 0.05
Correction : Holm (Multiple Testing)


Unnamed: 0,metric,test_type,statistic,pvalue,significant,control_median,treatment_median,rank_biserial,n_control,n_treatment,normality_passed,corrected_pvalue,significant_corrected,control_rate,treatment_rate,absolute_diff,relative_lift_pct,ci_lower,ci_upper
0,pages_viewed,mann_whitney,5970453.0,0.06748254,False,2.171813,2.129282,0.025232,3500,3500,False,0.1349651,False,,,,,,
1,added_to_cart,proportion_test,-14.68221,0.0,True,,,,3500,3500,,0.0,True,0.961714,0.862286,-0.099429,-10.338681,-0.112496,-0.086362
2,bounced,proportion_test,0.9632098,0.3354422,False,,,,3500,3500,,0.3354422,False,0.434,0.445429,0.011429,2.633311,-0.011825,0.034682
3,revenue,mann_whitney,5653101.0,2.377455e-08,True,2.862354,2.602078,0.077045,3500,3500,False,7.132364e-08,True,,,,,,


A/B TEST SUMMARY REPORT
Experiment : Novelty Slider Test
Alpha      : 0.05
Correction : Holm (Multiple Testing)


Unnamed: 0,metric,test_type,statistic,pvalue,significant,control_rate,treatment_rate,absolute_diff,relative_lift_pct,ci_lower,ci_upper,n_control,n_treatment,corrected_pvalue,significant_corrected,control_median,treatment_median,rank_biserial,normality_passed
0,is_registered,proportion_test,-0.1271251,0.8988414,False,0.450625,0.449625,-0.001,-0.221914,-0.016418,0.014418,8000,8000,0.8988414,False,,,,
1,novelty_revenue,mann_whitney,33612110.0,3.418849e-08,True,,,,,,,8000,8000,1.025655e-07,True,3.773003,3.981853,-0.050378,False
2,products_added_from_novelties,proportion_test,4.472533,7.729863e-06,True,0.0015,0.00575,0.00425,283.333333,0.002389,0.006111,8000,8000,1.545973e-05,True,,,,


A/B TEST SUMMARY REPORT
Experiment : Product Sliders Test
Alpha      : 0.05
Correction : Holm (Multiple Testing)


Unnamed: 0,test_type,statistic,pvalue,comparison,metric,corrected_pvalue,significant_corrected
0,proportion_test,0.0,1.0,A_selected_by_others_only vs B_similar_product...,add_to_cart_rate,1.0,False
1,proportion_test,0.1298812,0.8966604,A_selected_by_others_only vs C_selected_by_oth...,add_to_cart_rate,1.0,False
2,proportion_test,0.1298812,0.8966604,B_similar_products_top vs C_selected_by_others...,add_to_cart_rate,1.0,False
3,mann_whitney,17675900.0,0.08053754,A_selected_by_others_only vs B_similar_product...,slider_interactions,0.4832253,False
4,mann_whitney,17715630.0,0.1250252,A_selected_by_others_only vs C_selected_by_oth...,slider_interactions,0.6251259,False
5,mann_whitney,18042470.0,0.8189313,B_similar_products_top vs C_selected_by_others...,slider_interactions,1.0,False
6,mann_whitney,15166190.0,1.9542269999999997e-50,A_selected_by_others_only vs B_similar_product...,revenue_from_recommendations,2.735917e-49,True
7,mann_whitney,17247860.0,7.371883e-05,A_selected_by_others_only vs C_selected_by_oth...,revenue_from_recommendations,0.0006634695,True
8,mann_whitney,20099620.0,1.8437710000000002e-28,B_similar_products_top vs C_selected_by_others...,revenue_from_recommendations,2.396902e-27,True
9,mann_whitney,19239580.0,6.451671e-11,A_selected_by_others_only vs B_similar_product...,products_per_order,7.096838e-10,True


A/B TEST SUMMARY REPORT
Experiment : Customer Review Test
Alpha      : 0.05
Correction : Holm (Multiple Testing)


Unnamed: 0,metric,test_type,statistic,pvalue,significant,control_rate,treatment_rate,absolute_diff,relative_lift_pct,ci_lower,ci_upper,n_control,n_treatment,corrected_pvalue,significant_corrected
0,converted,proportion_test,0.284027,0.776389,False,0.106667,0.107524,0.000857,0.803571,-0.005058,0.006772,21000,21000,0.776389,False
1,added_to_cart,proportion_test,1.192175,0.233193,False,0.826762,0.831143,0.004381,0.529893,-0.002821,0.011583,21000,21000,0.466386,False


A/B TEST SUMMARY REPORT
Experiment : Search Engine Test
Alpha      : 0.05
Correction : Holm (Multiple Testing)


Unnamed: 0,metric,test_type,statistic,pvalue,significant,control_median,treatment_median,rank_biserial,n_control,n_treatment,normality_passed,corrected_pvalue,significant_corrected,control_rate,treatment_rate,absolute_diff,relative_lift_pct,ci_lower,ci_upper
0,avg_revenue_per_visitor,mann_whitney,45481190.0,0.346066,False,0.692879,0.69305,-0.007893,9500,9500,False,1.0,False,,,,,,
1,added_to_cart,proportion_test,3.199683,0.001376,True,,,,9500,9500,,0.005503,True,0.898737,0.912316,0.013579,1.510892,0.005263,0.021894
2,converted,proportion_test,0.894315,0.371153,False,,,,9500,9500,,1.0,False,0.066211,0.069474,0.003263,4.928458,-0.003888,0.010414
3,interacted_with_search,proportion_test,-0.7468953,0.455127,False,,,,9500,9500,,1.0,False,0.349368,0.344211,-0.005158,-1.476348,-0.018693,0.008377
