
# Quantium Virtual Internship – Task 2 (Python Version)

This notebook mirrors the **R Markdown template** for Task 2, but implements the analysis in **Python**.

You will:
- Load the **QVI_data.csv** dataset
- Build **monthly metrics** for each store
- **Select control stores** for trial stores 77, 86, and 88 using a combined metric (correlation + magnitude distance)
- **Assess the trial impact** during Feb–Apr 2019 for sales and customers
- **Visualize** the results and **export** charts for your client deck

> **Note:** Keep the file `QVI_data.csv` in the same folder as this notebook, or update the path in the config cell below.


## 0. Configuration

In [None]:

# Paths & settings
DATA_PATH = "QVI_data.csv"   # change if needed
OUTPUT_DIR = "outputs"
TRIAL_STORES = [77, 86, 88]

# Time windows
# Pre-trial: May 2018 → Jan 2019 (inclusive). If you want exactly 8 months, adjust the start to Jun 2018.
PRETRIAL_START = "2018-05"
PRETRIAL_END   = "2019-01"
TRIAL_START    = "2019-02"
TRIAL_END      = "2019-04"

# Weights for control-store scoring
CORR_WEIGHT = 0.5  # weight on correlation within (0,1); magnitude weight is (1 - CORR_WEIGHT)

# Ensure output directory exists
import os
os.makedirs(OUTPUT_DIR, exist_ok=True)
print("Output dir:", os.path.abspath(OUTPUT_DIR))


## 1. Imports

In [None]:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# IMPORTANT: Follow the project rule — only matplotlib, single-plot figs, no explicit colors.
# Do not import seaborn.

from typing import Tuple, Dict, List


## 2. Load Data

In [None]:

# Load data; adjust parse_dates if your file uses a different date column name
df = pd.read_csv(DATA_PATH)

# Common Quantium columns: DATE, STORE_NBR, LYLTY_CARD_NBR, TXN_ID, TOT_SALES, etc.
# Make sure DATE is parsed to datetime
if 'DATE' in df.columns:
    df['DATE'] = pd.to_datetime(df['DATE'])
else:
    raise ValueError("Expected a 'DATE' column in QVI_data.csv")

# Minimal validation
expected_cols = {'STORE_NBR', 'DATE', 'TOT_SALES', 'LYLTY_CARD_NBR', 'TXN_ID'}
missing = expected_cols - set(df.columns)
if missing:
    raise ValueError(f"Missing expected columns in QVI_data.csv: {missing}")

print(df.head())
print("Rows:", len(df), "Unique stores:", df['STORE_NBR'].nunique())


## 3. Build Monthly Metrics

In [None]:

# Create MONTH period and derive monthly metrics
df['MONTH'] = df['DATE'].dt.to_period('M')

monthly = (
    df.groupby(['STORE_NBR', 'MONTH'])
      .agg(
          total_sales=('TOT_SALES', 'sum'),
          total_customers=('LYLTY_CARD_NBR', pd.Series.nunique),
          total_transactions=('TXN_ID', 'nunique')
      )
      .reset_index()
)

monthly['avg_txn_per_customer'] = (
    monthly['total_transactions'] / monthly['total_customers']
).replace([np.inf, -np.inf], np.nan)

# Keep only stores available throughout pre-trial (full observation)
pre_mask = (monthly['MONTH'] >= PRETRIAL_START) & (monthly['MONTH'] <= PRETRIAL_END)
pre_counts = (
    monthly.loc[pre_mask]
           .groupby('STORE_NBR')['MONTH']
           .nunique()
           .reset_index(name='n_months')
)

# Require full pre-trial coverage
full_pre_stores = pre_counts.loc[pre_counts['n_months'] == pre_counts['n_months'].max(), 'STORE_NBR']
monthly_full = monthly[monthly['STORE_NBR'].isin(full_pre_stores)]

print("Monthly head:")
print(monthly_full.head())
print("Stores with full pre-trial coverage:", len(full_pre_stores))


## 4. Control Store Selection Functions

In [None]:

def _align_series_for_period(m: pd.DataFrame, store: int, metric: str,
                             start: str, end: str) -> pd.Series:
    """Return a metric series indexed by MONTH (string) for a store, clipped to [start, end]."""
    sel = m[(m['STORE_NBR'] == store) & (m['MONTH'] >= start) & (m['MONTH'] <= end)][['MONTH', metric]].copy()
    sel['MONTH'] = sel['MONTH'].astype(str)
    return sel.set_index('MONTH')[metric].sort_index()

def correlation_score(m: pd.DataFrame, trial_store: int, candidate_store: int,
                      metric: str, start: str, end: str) -> float:
    s_trial = _align_series_for_period(m, trial_store, metric, start, end)
    s_cand  = _align_series_for_period(m, candidate_store, metric, start, end)
    common = s_trial.index.intersection(s_cand.index)
    if len(common) < 2:
        return np.nan
    return float(pd.Series(s_trial.loc[common]).corr(pd.Series(s_cand.loc[common])))

def magnitude_distance_score(m: pd.DataFrame, trial_store: int,
                             metric: str, start: str, end: str) -> pd.DataFrame:
    """Compute standardized magnitude distance per month then average.
    Returns DataFrame with columns [Store2, mag_score]."""
    trial_series = _align_series_for_period(m, trial_store, metric, start, end)
    months = trial_series.index.tolist()
    cand_stores = sorted(set(m['STORE_NBR']) - {trial_store})

    rows = []
    for s in cand_stores:
        s_series = _align_series_for_period(m, s, metric, start, end)
        common = [mo for mo in months if mo in s_series.index]
        if len(common) == 0:
            continue
        diffs = (trial_series.loc[common] - s_series.loc[common]).abs()
        for mo, val in diffs.items():
            rows.append({'Store1': trial_store, 'Store2': s, 'MONTH': mo, 'measure': float(val)})
    if not rows:
        return pd.DataFrame(columns=['Store2', 'mag_score'])

    diffs_df = pd.DataFrame(rows)
    minmax = diffs_df.groupby(['Store1', 'MONTH'])['measure'].agg(['min','max']).reset_index()
    merged = diffs_df.merge(minmax, on=['Store1','MONTH'], how='left')
    denom = (merged['max'] - merged['min']).replace(0, np.nan)
    merged['magnitudeMeasure'] = 1 - (merged['measure'] - merged['min']) / denom
    merged['magnitudeMeasure'] = merged['magnitudeMeasure'].fillna(1.0)

    mag = merged.groupby(['Store1','Store2'])['magnitudeMeasure'].mean().reset_index()
    mag = mag.rename(columns={'magnitudeMeasure':'mag_score'})
    return mag[['Store2','mag_score']]

def rank_control_candidates(m: pd.DataFrame, trial_store: int,
                            metric: str, start: str, end: str,
                            corr_weight: float = 0.5) -> pd.DataFrame:
    cand_stores = sorted(set(m['STORE_NBR']) - {trial_store})
    corr_rows = []
    for s in cand_stores:
        c = correlation_score(m, trial_store, s, metric, start, end)
        corr_rows.append({'Store2': s, 'corr': c})
    corr_df = pd.DataFrame(corr_rows)

    mag_df = magnitude_distance_score(m, trial_store, metric, start, end)

    out = corr_df.merge(mag_df, on='Store2', how='left')
    out['mag_score'] = out['mag_score'].fillna(0)
    out['corr'] = out['corr'].fillna(0)
    out['score'] = corr_weight * out['corr'] + (1 - corr_weight) * out['mag_score']
    out = out.sort_values('score', ascending=False).reset_index(drop=True)
    return out


## 5. Composite Scoring (Sales + Customers)

In [None]:

def select_control_store(m: pd.DataFrame, trial_store: int,
                         start: str, end: str,
                         corr_weight: float = CORR_WEIGHT) -> pd.DataFrame:
    sales_rank = rank_control_candidates(m, trial_store, 'total_sales', start, end, corr_weight)
    cust_rank  = rank_control_candidates(m, trial_store, 'total_customers', start, end, corr_weight)
    merged = sales_rank.merge(cust_rank, on='Store2', suffixes=('_sales','_cust'))
    merged['final_score'] = 0.5 * merged['score_sales'] + 0.5 * merged['score_cust']
    merged = merged.sort_values('final_score', ascending=False).reset_index(drop=True)
    return merged


## 6. Trial Assessment Helpers

In [None]:

def scale_control_to_trial(m: pd.DataFrame, trial_store: int, control_store: int,
                           metric: str, pre_start: str, pre_end: str) -> float:
    trial_sum = m[(m['STORE_NBR']==trial_store) & (m['MONTH']>=pre_start) & (m['MONTH']<=pre_end)][metric].sum()
    control_sum = m[(m['STORE_NBR']==control_store) & (m['MONTH']>=pre_start) & (m['MONTH']<=pre_end)][metric].sum()
    if control_sum == 0:
        return np.nan
    return float(trial_sum / control_sum)

def monthly_percentage_diff(m: pd.DataFrame, trial_store: int, control_store: int,
                            metric: str, start: str, end: str, scale_factor: float) -> pd.DataFrame:
    t = m[(m['STORE_NBR']==trial_store) & (m['MONTH']>=start) & (m['MONTH']<=end)][['MONTH', metric]].copy()
    c = m[(m['STORE_NBR']==control_store) & (m['MONTH']>=start) & (m['MONTH']<=end)][['MONTH', metric]].copy()
    t['MONTH'] = t['MONTH'].astype(str)
    c['MONTH'] = c['MONTH'].astype(str)
    merged = t.merge(c, on='MONTH', suffixes=('_trial','_control'))
    merged['scaled_control'] = merged[f'{metric}_control'] * scale_factor
    merged['pct_diff'] = (merged[f'{metric}_trial'] - merged['scaled_control']).abs() / merged['scaled_control']
    return merged[['MONTH','pct_diff', f'{metric}_trial','scaled_control']].sort_values('MONTH')

def control_bands_from_pretrial(pdiff_pretrial: pd.DataFrame) -> float:
    return float(pdiff_pretrial['pct_diff'].std(ddof=1))

def plot_trial_with_bands(m: pd.DataFrame, trial_store: int, control_store: int,
                          metric: str, pre_start: str, pre_end: str,
                          trial_start: str, trial_end: str, figpath: str = None):
    sf = scale_control_to_trial(m, trial_store, control_store, metric, pre_start, pre_end)
    if np.isnan(sf):
        print("Skipping plot: invalid scaling factor")
        return
    pre = monthly_percentage_diff(m, trial_store, control_store, metric, pre_start, pre_end, sf)
    std = control_bands_from_pretrial(pre)

    full_start, full_end = pre_start, trial_end
    t = m[(m['STORE_NBR']==trial_store) & (m['MONTH']>=full_start) & (m['MONTH']<=full_end)][['MONTH', metric]].copy()
    c = m[(m['STORE_NBR']==control_store) & (m['MONTH']>=full_start) & (m['MONTH']<=full_end)][['MONTH', metric]].copy()
    t['MONTH'] = t['MONTH'].astype(str); c['MONTH'] = c['MONTH'].astype(str)
    full = t.merge(c, on='MONTH', suffixes=('_trial','_control')).sort_values('MONTH')
    full['scaled_control'] = full[f'{metric}_control'] * sf

    full['upper'] = full['scaled_control'] * (1 + 2*std)
    full['lower'] = full['scaled_control'] * (1 - 2*std)

    plt.figure(figsize=(10, 5))
    plt.plot(full['MONTH'], full[f'{metric}_trial'], label=f'Trial {trial_store}')
    plt.plot(full['MONTH'], full['scaled_control'], label=f'Control {control_store} (scaled)')
    plt.plot(full['MONTH'], full['upper'], label='Control 95% band')
    plt.plot(full['MONTH'], full['lower'], label='Control 5% band')
    plt.title(f"{metric.replace('_',' ').title()} — Store {trial_store} vs Control {control_store}")
    plt.xlabel("Month")
    plt.ylabel(metric.replace('_',' ').title())
    plt.xticks(rotation=45)
    plt.legend()
    plt.tight_layout()
    if figpath:
        plt.savefig(figpath, dpi=150, bbox_inches='tight')
    plt.show()

    return std, full


## 7. Control Selection for Each Trial Store

In [None]:

control_choices = {}
for ts in TRIAL_STORES:
    table = select_control_store(monthly_full, ts, PRETRIAL_START, PRETRIAL_END, CORR_WEIGHT)
    control_choices[ts] = table
    print(f"Top candidates for trial store {ts}:")
    display(table.head(5))


## 8. Trial Assessment (Sales & Customers)

In [None]:

summary_rows = []

for ts in TRIAL_STORES:
    cand_table = control_choices[ts]
    if cand_table.empty:
        print(f"No candidates for store {ts}. Skipping.")
        continue
    control = int(cand_table.iloc[0]['Store2'])

    # SALES
    fig_sales = f"{OUTPUT_DIR}/store_{ts}_vs_{control}_sales.png"
    std_sales, sales_frame = plot_trial_with_bands(
        monthly_full, ts, control, 'total_sales',
        PRETRIAL_START, PRETRIAL_END, TRIAL_START, TRIAL_END, figpath=fig_sales
    )

    # CUSTOMERS
    fig_cust = f"{OUTPUT_DIR}/store_{ts}_vs_{control}_customers.png"
    std_cust, cust_frame = plot_trial_with_bands(
        monthly_full, ts, control, 'total_customers',
        PRETRIAL_START, PRETRIAL_END, TRIAL_START, TRIAL_END, figpath=fig_cust
    )

    # Significance: count months where trial is outside bands (≥2/3 indicates success)
    def count_outside(frame, metric_name):
        tw = frame[(frame['MONTH']>=TRIAL_START) & (frame['MONTH']<=TRIAL_END)]
        above = (tw[f'{metric_name}_trial'] > tw['upper']).sum()
        below = (tw[f'{metric_name}_trial'] < tw['lower']).sum()
        return int(above + below)

    sales_outside = count_outside(sales_frame, 'total_sales')
    cust_outside  = count_outside(cust_frame,  'total_customers')

    # Uplift during trial (%)
    def mean_uplift(frame, metric_label):
        tw = frame[(frame['MONTH']>=TRIAL_START) & (frame['MONTH']<=TRIAL_END)]
        return float( (tw[f'{metric_label}_trial'].mean() - tw['scaled_control'].mean()) / tw['scaled_control'].mean() * 100 )

    uplift_sales = mean_uplift(sales_frame, 'total_sales')
    uplift_cust  = mean_uplift(cust_frame,  'total_customers')

    rec = "Recommend rollout" if (sales_outside >= 2 or cust_outside >= 2) and uplift_sales > 0 else "No rollout (insufficient evidence)"

    summary_rows.append({
        'trial_store': ts,
        'control_store': control,
        'sales_outside_band_months': sales_outside,
        'customers_outside_band_months': cust_outside,
        'avg_sales_uplift_%': round(uplift_sales, 2),
        'avg_customers_uplift_%': round(uplift_cust, 2),
        'std_sales_pctdiff_pretrial': round(std_sales, 4),
        'std_customers_pctdiff_pretrial': round(std_cust, 4),
        'recommendation': rec,
        'sales_fig': fig_sales,
        'customers_fig': fig_cust
    })

summary = pd.DataFrame(summary_rows)
print("Summary:")
display(summary)
summary_path = f"{OUTPUT_DIR}/trial_summary.csv"
summary.to_csv(summary_path, index=False)
print("Saved summary to:", summary_path)


## 9. Exporting to PDF


To export this notebook to **PDF**:
1. Run all cells (Kernel → Restart & Run All).
2. Use **File → Download as → PDF** (or **Print to PDF** from your browser).
3. Include the images saved in the `outputs/` folder in your client report.
