In [1]:
# Install necessary libraries
import json
import pandas as pd
# from google.colab import drive
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from functools import partial
from matplotlib.colors import rgb_to_hsv
from scipy import stats
from sklearn.metrics import mean_squared_error
from scipy.optimize import minimize

In [2]:
# Mount:
# drive.mount('/content/drive', force_remount = True)

In [3]:
# Read json file
def readJsonData(JSON_FILE_PATH):
  data = None
  try:
    with open(JSON_FILE_PATH, 'r') as f:
        data = json.load(f)
    print("✅ JSON file loaded successfully.")

    metadata = data.get('metadata', {})
    print(f"\n--- Session Metadata ---")
    for key, value in metadata.items():
        print(f"{key.ljust(20)}: {value}")

  except FileNotFoundError:
      print(f"❌ ERROR: File not found at path: {JSON_FILE_PATH}")
      print("Please check the file path and ensure Google Drive is correctly mounted.")
  except json.JSONDecodeError:
      print("❌ ERROR: Could not decode JSON. Ensure the file is valid.")
  return data

# Preprocessing
def preprocessData(data):
  flat_data = []

  # Extract metadata for easy merging later
  session_metadata = data.get('metadata', {})
  session_name = session_metadata.get('sessionName', 'Unknown Session')

  # Iterate through each sample (color card)
  for sample in data.get('data', []):
      sample_number = sample.get('sampleNumber')

      # Iterate through each measurement (1 to 10) within that sample
      for capture_index, measurement in enumerate(sample.get('measurements', [])):

          # Create a dictionary for the current row
          row = {
              'session_name': session_name,
              'sample_number': sample_number,
              'capture_index': capture_index, # 0 to 9
              'lighting_condition': session_metadata.get('lightingCondition'),
              'reflective_surface': session_metadata.get('useReflectiveSurface'),

              # Sensor Data
              'pitch': measurement['angles']['pitch'],
              'roll': measurement['angles']['roll'],
          }

          # Extract Color Data (White and Color reticles, three radii each)

          # White Reticle Captures
          for radius in [0, 2, 4]:
              capture_key = f'r{radius}'
              color_data = measurement['white'].get(capture_key, {'r': 0, 'g': 0, 'b': 0})
              row[f'white_r{radius}_R'] = color_data['r']
              row[f'white_r{radius}_G'] = color_data['g']
              row[f'white_r{radius}_B'] = color_data['b']

          # Color Reticle Captures
          for radius in [0, 2, 4]:
              capture_key = f'r{radius}'
              color_data = measurement['color'].get(capture_key, {'r': 0, 'g': 0, 'b': 0})
              row[f'color_r{radius}_R'] = color_data['r']
              row[f'color_r{radius}_G'] = color_data['g']
              row[f'color_r{radius}_B'] = color_data['b']

          flat_data.append(row)

  # Convert the list of dictionaries to a Pandas DataFrame
  df = pd.DataFrame(flat_data)

  print(f"\n✅ Data flattened into DataFrame with {len(df)} rows (Total captures: 24 samples * 10 captures = 240 rows).")
  return df

# ## 3. Initial Review and Visualization

# Display the first few rows and the column types for verification.
def displayDataFrameInfo(df):
  print("\n--- DataFrame Head (First 5 Rows) ---")
  print(df.head())

  print("\n--- DataFrame Information ---")
  print(df.info())

  # ### 3.1 Check Sensor Variability

  # This checks the pitch/roll stability across all 240 measurements.
  print("\n--- Sensor Angle Statistics ---")
  print(df[['pitch', 'roll']].describe())

def merge_datasets_for_model(dfs):
    """
    Vertically concatenate multiple datasets.
    All rows are kept. Column names must be consistent.

    Parameters
    ----------
    dfs : list of pd.DataFrame
        List of datasets with identical column structure

    Returns
    -------
    pd.DataFrame
        Combined dataset
    """
    merged_df = pd.concat(dfs, axis=0, ignore_index=True)
    return merged_df



In [4]:
def combineGroundTruth(df):
  ground_truth_data = [
      {'sample_number': 1,  'label': 'Dark Skin',      'gt__R': 115, 'gt__G': 82,  'gt__B': 69},
      {'sample_number': 2,  'label': 'Light Skin',     'gt__R': 204, 'gt__G': 161, 'gt__B': 141},
      {'sample_number': 3,  'label': 'Blue Sky',       'gt__R': 101, 'gt__G': 134, 'gt__B': 179},
      {'sample_number': 4,  'label': 'Foliage',        'gt__R': 89,  'gt__G': 109, 'gt__B': 61},
      {'sample_number': 5,  'label': 'Blue Flower',    'gt__R': 141, 'gt__G': 137, 'gt__B': 194},
      {'sample_number': 6,  'label': 'Bluish Green',   'gt__R': 132, 'gt__G': 228, 'gt__B': 208},
      {'sample_number': 7,  'label': 'Orange',         'gt__R': 249, 'gt__G': 118, 'gt__B': 35},
      {'sample_number': 8,  'label': 'Purplish Blue',  'gt__R': 80,  'gt__G': 91,  'gt__B': 182},
      {'sample_number': 9,  'label': 'Moderate Red',   'gt__R': 222, 'gt__G': 91,  'gt__B': 125},
      {'sample_number': 10, 'label': 'Purple',         'gt__R': 91,  'gt__G': 63,  'gt__B': 123},
      {'sample_number': 11, 'label': 'Yellow Green',   'gt__R': 173, 'gt__G': 232, 'gt__B': 91},
      {'sample_number': 12, 'label': 'Orange Yellow',  'gt__R': 255, 'gt__G': 164, 'gt__B': 26},
      {'sample_number': 13, 'label': 'Blue',           'gt__R': 44,  'gt__G': 56,  'gt__B': 142},
      {'sample_number': 14, 'label': 'Green',          'gt__R': 74,  'gt__G': 148, 'gt__B': 81},
      {'sample_number': 15, 'label': 'Red',            'gt__R': 179, 'gt__G': 42,  'gt__B': 50},
      {'sample_number': 16, 'label': 'Yellow',         'gt__R': 250, 'gt__G': 226, 'gt__B': 21},
      {'sample_number': 17, 'label': 'Magenta',        'gt__R': 191, 'gt__G': 81,  'gt__B': 160},
      {'sample_number': 18, 'label': 'Cyan',           'gt__R': 6,   'gt__G': 142, 'gt__B': 172},
      {'sample_number': 19, 'label': 'White',          'gt__R': 252, 'gt__G': 252, 'gt__B': 252},
      {'sample_number': 20, 'label': 'Neutral 8',      'gt__R': 230, 'gt__G': 230, 'gt__B': 230},
      {'sample_number': 21, 'label': 'Neutral 6.5',    'gt__R': 200, 'gt__G': 200, 'gt__B': 200},
      {'sample_number': 22, 'label': 'Neutral 5',      'gt__R': 143, 'gt__G': 143, 'gt__B': 142},
      {'sample_number': 23, 'label': 'Neutral 3.5',    'gt__R': 100, 'gt__G': 100, 'gt__B': 100},
      {'sample_number': 24, 'label': 'Black',          'gt__R': 50,  'gt__G': 50,  'gt__B': 50},
  ]
  df_gt = pd.DataFrame(ground_truth_data)
  df = pd.merge(df, df_gt, on='sample_number', how='outer')

  return df, df_gt

def generateFinalDataFrame(df_with_gt_columns):
  # Calculate average color and corrected color values per sample_number
  avg_cols_to_compute = [
      'color_r0_R', 'color_r0_G', 'color_r0_B',
      'correction_r0_R', 'correction_r0_G', 'correction_r0_B',
      'color_r2_R', 'color_r2_G', 'color_r2_B',
      'correction_r2_R', 'correction_r2_G', 'correction_r2_B',
      'color_r4_R', 'color_r4_G', 'color_r4_B',
      'correction_r4_R', 'correction_r4_G', 'correction_r4_B'
  ]
  df_avg = df_with_gt_columns.groupby('sample_number')[avg_cols_to_compute].mean().reset_index()

  # Rename columns to 'avg_...' to clearly distinguish them
  new_avg_columns_map = {col: 'avg_' + col for col in avg_cols_to_compute}
  df_avg = df_avg.rename(columns=new_avg_columns_map)

  # Merge the df (which now has ground truth) with the averaged color data
  df_final_comparison = pd.merge(df_with_gt_columns, df_avg, on='sample_number', how='left')

  return df_final_comparison

In [5]:
# Preprocessing testing data
jsonFilePath = '../Data/Baisu1.json'
data = readJsonData(jsonFilePath)
df = preprocessData(data)
df, _ = combineGroundTruth(df)

✅ JSON file loaded successfully.

--- Session Metadata ---
sessionName         : baisu1
lightingCondition   : 0
useReflectiveSurface: False
dateTime            : 2025-11-19T15:01:46.558942

✅ Data flattened into DataFrame with 240 rows (Total captures: 24 samples * 10 captures = 240 rows).


## Method: Using Polynomial Correction:  

The correction method uses three-channel polynomial regression:

$$
\hat{L} = f_L(L_m, A_m, B_m), \quad
\hat{A} = f_A(L_m, A_m, B_m), \quad
\hat{B} = f_B(L_m, A_m, B_m)
$$

where $L_m, A_m, B_m$ are the measured HSV values, and $f_L, f_A, f_B$ are polynomial functions.  

In matrix form:

$$
\hat{C} = X \cdot \Theta, \quad
X = [1, L_m^1, \dots, L_m^d, \; A_m^1, \dots, A_m^d, \; B_m^1, \dots, B_m^d]
$$

where $d$ is the polynomial degree, $\Theta$ is the coefficient vector, and $\hat{C} = [\hat{L}, \hat{A}, \hat{B}]$.

In [6]:
# ============================================================
# Design matrix: polynomial terms
# ============================================================
def build_design_matrix(measured_h, measured_s, measured_v, degree):
    """
    Build design matrix using all HSV channels.
    Each channel's polynomial terms are included.
    
    Returns:
        X: (N, 3*degree + 1)
           [1, l^1..l^degree, a^1..a^degree, b^1..b^degree]
    """
    N = len(measured_h)
    
    bias = np.ones((N, 1))
    # polynomial terms for each channel
    X_l = np.vstack([measured_h**k for k in range(1, degree+1)]).T
    X_a = np.vstack([measured_s**k for k in range(1, degree+1)]).T
    X_b = np.vstack([measured_v**k for k in range(1, degree+1)]).T
    # concatenate all
    X = np.hstack([bias, X_l, X_a, X_b])
    return X



# ============================================================
# Loss function (MSE + boundary penalty + L2 regularization)
# ============================================================
def regression_loss(theta, X, y, reg_lambda=1e-2, lower_bound=0.0, upper_bound=1.0):
    """
    theta: regression coefficients
    X: design matrix
    y: ground truth
    """
    y_pred = X @ theta
    mse = mean_squared_error(y, y_pred)
    # Penalize predictions outside valid intensity range

    penalty_low = np.mean(np.maximum(lower_bound - y_pred, 0.0)**2)
    penalty_high = np.mean(np.maximum(y_pred - upper_bound, 0.0)**2)

    boundary_penalty = penalty_low + penalty_high
    reg = reg_lambda * np.sum(theta**2)
    return mse + reg + boundary_penalty


# ============================================================
# Optimize one color channel
# ============================================================
def fit_single_channel(measured_l, measured_a, measured_b, gt, degree, channel):
    """
    Fit polynomial model for one LAB channel using all channels.
    """
    X = build_design_matrix(measured_l, measured_a, measured_b, degree)
    # initial guess
    theta0 = np.zeros(X.shape[1])
    # roughly identity mapping for self channel
    theta0[1] = 1.0

    # Set bounds based on channel
    if channel == 'l':
        lower_bound = 0.0
        upper_bound = 100.0
    elif channel in ['a', 'b']:
        lower_bound = -128.0
        upper_bound = 127.0

    result = minimize(
        regression_loss,
        theta0,
        args=(X, gt, 1e-2, lower_bound, upper_bound),
        method="L-BFGS-B"
    )
    return {
        "theta": result.x,
        "success": result.success,
        "final_mse": mean_squared_error(gt, X @ result.x)
    }

def fit_hsv_polynomial(
    df,
    max_degree=5,
    meas_prefix="color_r4_",
    gt_prefix="gt__"
):
    """
    Fit polynomial regression for LAB channels using all channels as input.
    """
    measured_l = df[f"{meas_prefix}l"].values
    measured_a = df[f"{meas_prefix}a"].values
    measured_b = df[f"{meas_prefix}b"].values

    results = {}

    for ch, gt_col in zip(["l", "a", "b"], ["gt__l", "gt__a", "gt__b"]):
        gt = df[gt_col].values
        best_mse = np.inf
        best_result = None
        best_degree = None
        all_results = {}

        for degree in range(1, max_degree+1):
            res = fit_single_channel(measured_l, measured_a, measured_b, gt, degree, ch)
            all_results[degree] = res

            print("degree",degree,"mse",res["final_mse"])
            if res["success"] and res["final_mse"] < best_mse:
                best_mse = res["final_mse"]
                best_result = res
                best_degree = degree

        results[ch] = {
            "best_degree": best_degree,
            "theta": best_result["theta"],
            "final_mse": best_mse,
            "success": best_result["success"],
            "all_results": all_results
        }
    return results

In [7]:
# ============================================================
# Polynomial correction (all channels)
# ============================================================
def correctByPolynomial(meas_l, meas_a, meas_b, coeffs):
    """
    Correct LAB values using a polynomial model.
    coeffs: array of shape (num_features,) learned from training
    """
    coeffs = np.asarray(coeffs, dtype=np.float64)

    # ensure all inputs are arrays
    meas_l = np.atleast_1d(np.asarray(meas_l, dtype=np.float64))
    meas_a = np.atleast_1d(np.asarray(meas_a, dtype=np.float64))
    meas_b = np.atleast_1d(np.asarray(meas_b, dtype=np.float64))

    # automatically infer degree from coeffs
    # number of features per channel = degree
    # total columns = 3*degree + 1
    num_features = len(coeffs)
    degree = (num_features - 1) // 3
    if degree < 0:
        raise ValueError(f"Invalid number of coefficients: {len(coeffs)}")

    # build design matrix
    N = len(meas_l)
    bias = np.ones((N, 1))
    X_l = np.vstack([meas_l**k for k in range(1, degree+1)]).T
    X_a = np.vstack([meas_a**k for k in range(1, degree+1)]).T
    X_b = np.vstack([meas_b**k for k in range(1, degree+1)]).T
    X = np.hstack([bias, X_l, X_a, X_b])  # shape (N, 3*degree + 1)

    corr = X @ coeffs

    # return scalar if input was scalar
    return corr[0] if corr.size == 1 else corr


# ============================================================
# Dispatcher for row-wise correction
# ============================================================
from functools import partial

def apply_correction_dispatcher(row, color_prefix, radius,
                                correction_type,
                                coeffs_L=None, coeffs_A=None, coeffs_B=None):
    meas_l = row[f'{color_prefix}_l']
    meas_a = row[f'{color_prefix}_a']
    meas_b = row[f'{color_prefix}_b']

    # Polynomial correction automatically matches coeffs
    corr_l = correctByPolynomial(meas_l, meas_a, meas_b, coeffs_L)
    corr_a = correctByPolynomial(meas_l, meas_a, meas_b, coeffs_A)
    corr_b = correctByPolynomial(meas_l, meas_a, meas_b, coeffs_B)

    # Clip and convert to integer
    corr_l = np.clip(corr_l, 0, 100)
    corr_a = np.clip(corr_a, -128, 127)
    corr_b = np.clip(corr_b, -128, 127)

    return pd.Series([corr_l, corr_a, corr_b])
# ============================================================
# Main function to apply corrections to DataFrame
# ============================================================
def correctLAB(df, correction_type, coeffs_L=None, coeffs_A=None, coeffs_B=None):
    """
    Apply correction to all r0, r2, r4 columns in DataFrame.
    Supports white_scaling or polynomial (all channels)
    """

    partial_apply = partial(
        apply_correction_dispatcher,
        correction_type=correction_type,
        coeffs_L=coeffs_L,
        coeffs_A=coeffs_A,
        coeffs_B=coeffs_B
    )

    for radius in [0, 2, 4]:
        color_prefix = f'color_r{radius}'
        df[[f'correction_r{radius}_l', f'correction_r{radius}_a', f'correction_r{radius}_b']] = df.apply(
            lambda row: partial_apply(row, color_prefix, radius), axis=1
        )


    return df

In [8]:
import sys
sys.path.append('..')
import color_conversion
color_conversion.convert_rgb_cols(df, prefix='gt__', to='lab')
color_conversion.convert_rgb_cols(df, prefix='color_r0_', to='lab')
color_conversion.convert_rgb_cols(df, prefix='color_r2_', to='lab')
color_conversion.convert_rgb_cols(df, prefix='color_r4_', to='lab')
best_result = fit_hsv_polynomial(df)


best_coeffs_L = best_result["l"]["theta"]
best_coeffs_A = best_result["a"]["theta"]
best_coeffs_B = best_result["b"]["theta"]

df = correctLAB(
    df,
    correction_type='polynomial',
    coeffs_L=best_coeffs_L,
    coeffs_A=best_coeffs_A,
    coeffs_B=best_coeffs_B
)
def rgb_to_hsv_wrapper(r, g, b):
    # Normalize RGB values to [0, 1]
    rgb_normalized = np.array([r, g, b]) / 255.0
    h, s, v = rgb_to_hsv(rgb_normalized)
    return pd.Series([h, s, v])

# Convert original RGB values to HSV and add to DataFrame
rgb_column_sets = [
    ('white', 'r0'), ('white', 'r2'), ('white', 'r4'),
    ('color', 'r0'), ('color', 'r2'), ('color', 'r4'),
    ('gt', '') # This entry needs to be fixed to match the new gt__R naming
]

for prefix, radius in rgb_column_sets:
    # Special handling for ground truth to match the double underscore naming
    if prefix == 'gt':
        r_col, g_col, b_col = 'gt__R', 'gt__G', 'gt__B'
        h_col, s_col, v_col = 'gt__H', 'gt__S', 'gt__V'
    else:
        r_col, g_col, b_col = f'{prefix}_{radius}_R', f'{prefix}_{radius}_G', f'{prefix}_{radius}_B'
        h_col, s_col, v_col = f'{prefix}_{radius}_H', f'{prefix}_{radius}_S', f'{prefix}_{radius}_V'

    df[[h_col, s_col, v_col]] = df.apply(
        lambda row: rgb_to_hsv_wrapper(row[r_col], row[g_col], row[b_col]),
        axis=1, result_type='expand'
    )
displayDataFrameInfo(df)

degree 1 mse 24.33579056147413
degree 2 mse 22.808989239476457
degree 3 mse 27.395389259957973
degree 4 mse 36.1809427142855
degree 5 mse 150.92315486533462
degree 1 mse 18.243319612087017
degree 2 mse 17.83093556486562
degree 3 mse 17.082075022707045
degree 4 mse 148.38553010735134
degree 5 mse 979.5973049179613
degree 1 mse 35.70144326701417
degree 2 mse 24.000578546527176
degree 3 mse 33.312280480041
degree 4 mse 208.7997831564345
degree 5 mse 1354.3960543436806

--- DataFrame Head (First 5 Rows) ---
  session_name  sample_number  capture_index  lighting_condition  \
0       baisu1              1              0                   0   
1       baisu1              1              1                   0   
2       baisu1              1              2                   0   
3       baisu1              1              3                   0   
4       baisu1              1              4                   0   

   reflective_surface      pitch      roll  white_r0_R  white_r0_G  \
0           

In [9]:
mse_l = mean_squared_error(df['gt__l'], df['correction_r4_l'])
mse_a = mean_squared_error(df['gt__a'], df['correction_r4_a'])
mse_b = mean_squared_error(df['gt__b'], df['correction_r4_b'])
avg_mse = (mse_l + mse_a + mse_b) / 3.0
print("\nNew Polynomial correction MSE (r4):")
print(f"  L channel MSE: {mse_l:.2f}")
print(f"  A channel MSE: {mse_a:.2f}")
print(f"  B channel MSE: {mse_b:.2f}")
print(f"  Average MSE : {avg_mse:.2f}")


New Polynomial correction MSE (r4):
  L channel MSE: 22.79
  A channel MSE: 17.08
  B channel MSE: 24.00
  Average MSE : 21.29
