In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
from scipy import stats
import seaborn as sns
from transformers import AutoModelForCausalLM

In [None]:
# Value passed as `labelpad` to every set_xlabel / set_ylabel call in the
# plotting helpers defined in this cell.
LABEL_PAD = 2

def qqplot_points(data, dist, *params):
    """Return matching sample and theoretical quantiles for a QQ plot.

    Quantiles are evaluated on a fixed grid of 0%, 1%, ..., 100%.

    Args:
        data: Array-like sample; it is handed to ``np.percentile``.
        dist: Distribution object exposing ``ppf`` (e.g. ``scipy.stats.norm``).
        *params: Distribution parameters forwarded positionally to ``dist.ppf``.

    Returns:
        Tuple ``(qn_sample, qn_dist)`` of equally long 1-D arrays. Grid points
        whose theoretical quantile is not finite are dropped from both arrays.
    """
    percs = np.linspace(0, 100, 101)
    qn_sample = np.asarray(np.percentile(data, percs))
    qn_dist = np.asarray(dist.ppf(percs / 100.0, *params))

    # For unbounded distributions ppf(0) and ppf(1) are -inf / +inf. Pairing
    # the sample min / max with an infinite value is meaningless and cannot be
    # drawn, so those grid points are removed instead of being returned.
    finite = np.isfinite(qn_dist)
    return qn_sample[finite], qn_dist[finite]

def plot_histogram(ax, data, bins, absmax_x, name='Layer'):
    """Draw a density-normalised histogram of ``data`` on ``ax``.

    Args:
        ax: Axes-like object to draw on.
        data: Values to histogram.
        bins: Bin specification forwarded to ``ax.hist``.
        absmax_x: Half-width of the x-range; the axis spans
            ``[-absmax_x, absmax_x]``.
        name: Prefix used in the axes title.
    """
    hist_style = dict(bins=bins, density=True, alpha=0.6, color='g')
    ax.hist(data, **hist_style)

    # Symmetric x-range around zero.
    ax.set_xlim(-absmax_x, absmax_x)
    ax.set_title(f'{name} - Histogram with Distribution Fits')

    axis_labels = ((ax.set_xlabel, 'Value'), (ax.set_ylabel, 'Density'))
    for set_label, text in axis_labels:
        set_label(text, labelpad=LABEL_PAD)

    ax.grid()

def plot_distribution_fit(ax, x, data, dist, color, label, subset_ratio, std_factor=1):
    """Fit ``dist`` to ``data`` and draw the fitted PDF on ``ax``.

    Args:
        ax: Axes-like object to draw on.
        x: Points at which the fitted PDF is evaluated and plotted.
        data: Sample to fit; must support ``len`` and slicing.
        dist: Distribution object exposing ``fit`` and ``pdf``.
        color: Line colour.
        label: Legend label for the curve.
        subset_ratio: If not ``None`` and below 1.0, only the leading
            ``int(len(data) * subset_ratio)`` elements are used for the fit.
        std_factor: Multiplier applied to the fitted standard deviation; only
            used when ``dist`` is ``stats.norm``.

    Returns:
        The parameters used for the plotted curve. For ``stats.norm`` this is
        ``(mean, std * std_factor)``, i.e. the scale is already multiplied.
    """
    use_subset = subset_ratio is not None and subset_ratio < 1.0
    sample = data[:int(len(data) * subset_ratio)] if use_subset else data

    fitted = dist.fit(sample)
    if dist == stats.norm:
        # Widen / narrow the Gaussian by the requested factor.
        mean, std = fitted[0], fitted[1]
        fitted = (mean, std * std_factor)

    ax.plot(x, dist.pdf(x, *fitted), color=color, linewidth=2, label=label)
    return fitted

def plot_qq(ax, data, dist, params, color, label, xy_lim, line_color='r', std_factor=1, name='Layer'):
    """Draw a QQ plot of ``data`` against ``dist`` on ``ax``.

    Args:
        ax: Axes-like object to draw on.
        data: Sample whose quantiles are plotted on the y-axis.
        dist: Distribution providing the theoretical quantiles (x-axis).
        params: Distribution parameters forwarded to ``qqplot_points``.
        color: Marker colour for the quantile points.
        label: Legend label for the quantile points.
        xy_lim: Both axes are limited to ``[-xy_lim, xy_lim]``.
        line_color: Colour of the identity reference line.
        std_factor: Multiplier applied to ``params[1]``; only used when
            ``dist`` is ``stats.norm``.
        name: Prefix used in the axes title.
    """
    if dist == stats.norm:
        # Rescale the Gaussian's standard deviation before computing quantiles.
        loc, scale = params[0], params[1]
        params = (loc, scale * std_factor)

    sample_q, theory_q = qqplot_points(data, dist, *params)

    # Quantile pairs as markers, plus the y = x reference line.
    ax.plot(theory_q, sample_q, 'o', color=color, label=label, markersize=5)
    ax.plot(theory_q, theory_q, color=line_color)

    ax.set_title(f'{name} - QQ Plot')
    axis_labels = ((ax.set_xlabel, 'Theoretical Quantiles'), (ax.set_ylabel, 'Sample Quantiles'))
    for set_label, text in axis_labels:
        set_label(text, labelpad=LABEL_PAD)

    for set_limits in (ax.set_xlim, ax.set_ylim):
        set_limits([-xy_lim, xy_lim])

    ax.grid()

def plot_distributions(values_dict, bins=150, xlim_percentage=0.5, subset_ratio=None,
                       std_factors=(1,), save_path='profiled_values.png', show_params=True):
    """Plot a histogram with distribution fits and a QQ plot for every entry.

    One figure row is produced per entry of ``values_dict``: the left panel
    shows the histogram with fitted Normal and Student's t PDFs, the right
    panel the corresponding QQ plots. The figure is saved and then shown.

    Args:
        values_dict: Mapping of name -> torch tensor. Each tensor is flattened
            and converted to a float32 NumPy array before plotting.
        bins: Histogram bin specification.
        xlim_percentage: Fraction of the largest absolute value used as the
            half-width of the histogram x-range and of both QQ axes.
        subset_ratio: Forwarded to ``plot_distribution_fit``; when below 1.0
            only a leading slice of the flattened values is used for fitting.
        std_factors: Iterable of multipliers; one Gaussian curve is drawn per
            factor with its fitted standard deviation scaled by that factor.
        save_path: Path handed to ``plt.savefig``.
        show_params: If true, the fitted parameters are written into the QQ
            panel.

    Raises:
        ValueError: If ``values_dict`` is empty.
    """
    num_layers = len(values_dict)
    if num_layers == 0:
        raise ValueError("values_dict is empty; there is nothing to plot")

    std_factors = list(std_factors)

    fig = plt.figure(figsize=(6, 5 * num_layers))
    gs = GridSpec(num_layers, 2, figure=fig)
    colors = sns.color_palette('muted')

    for i, (name, value) in enumerate(values_dict.items()):
        # Give NumPy / SciPy / matplotlib a plain float32 array instead of
        # relying on implicit tensor conversion (which fails for tensors that
        # require grad or live on an accelerator).
        value = value.detach().flatten().to(torch.float32).cpu().numpy()

        absmax_x = np.abs(value).max() * xlim_percentage

        # Histogram and distribution fits
        ax_hist = fig.add_subplot(gs[i, 0])
        plot_histogram(ax_hist, value, bins, absmax_x, name=name)

        x = np.linspace(-absmax_x, absmax_x, 1000)

        # Combined QQ plot
        ax_qq = fig.add_subplot(gs[i, 1])

        # Fit and plot one Gaussian per requested std factor
        gaussian_params_list = []
        for idx, std_factor in enumerate(std_factors):
            color = colors[idx % len(colors)]  # Cycle through colors
            label = f'Normal ({std_factor} $\\times\\,\\sigma$)'
            gaussian_params = plot_distribution_fit(ax_hist, x, value, stats.norm, color, label,
                                                    subset_ratio, std_factor)
            # `gaussian_params` already carries std * std_factor. Passing
            # std_factor to plot_qq as well would scale the std a second time,
            # so plot_qq is left at its default factor of 1.
            plot_qq(ax_qq, value, stats.norm, gaussian_params, color, label,
                    xy_lim=absmax_x, name=name)
            gaussian_params_list.append(gaussian_params)

        # Fit and plot Student's t distribution
        student_params = plot_distribution_fit(ax_hist, x, value, stats.t, 'g', "Student's t", subset_ratio)
        plot_qq(ax_qq, value, stats.t, student_params, 'g', "Student's t", xy_lim=absmax_x, name=name)

        # Add text for parameters
        if show_params:
            text_str = "Params:\n"
            for std_factor, gaussian_params in zip(std_factors, gaussian_params_list):
                text_str += (f"Normal: (std x {std_factor}): mean={gaussian_params[0]:.2f}, "
                             f"std={gaussian_params[1]:.2f}\n")
            text_str += (f"Student's t: df={student_params[0]:.2f}, loc={student_params[1]:.2f}, "
                         f"scale={student_params[2]:.2f}")
            ax_qq.text(0.95, 0.05, text_str, transform=ax_qq.transAxes, fontsize=10,
                       verticalalignment='bottom', horizontalalignment='right')

        # These fixed titles replace the per-layer titles set by the helpers.
        ax_hist.set_title('Weight Distribution')
        ax_qq.set_title('QQ Plot')
        ax_hist.legend()
        ax_qq.legend()

    plt.tight_layout()
    plt.subplots_adjust(top=0.95, wspace=0.34)
    plt.savefig(save_path, dpi=300)
    plt.show()

In [None]:
def calculate_distribution_fits(values_dict, subset_ratio=None, max_dof=15):
    """Fit Normal and Student's t distributions to every tensor and score them.

    Args:
        values_dict: Mapping of name -> torch tensor. Each tensor is flattened
            and converted to a float32 NumPy array.
        subset_ratio: Forwarded to ``fit_distribution``; when below 1.0 only a
            leading slice of the values is used for fitting. The KS statistics
            are always computed on the full set of values.
        max_dof: Upper bound applied to the fitted Student's t degrees of
            freedom stored under ``'dof_student'``. ``None`` disables the cap.
            NOTE(review): the default of 15 presumably keeps near-Gaussian
            layers (very large fitted dof) from dominating averages - confirm.

    Returns:
        Dict mapping each name to a dict with keys ``'norm_params'``,
        ``'student_params'``, ``'dof_student'``, ``'ks_norm'`` and
        ``'ks_student'``.
    """
    fit_dict = {}  # Fit results per layer name

    for name, value in values_dict.items():
        # Explicit conversion so SciPy receives a plain array even when the
        # tensor requires grad or lives on an accelerator.
        value = value.detach().flatten().to(torch.float32).cpu().numpy()

        # Fit Normal and Student's t distributions
        norm_params = fit_distribution(value, stats.norm, subset_ratio)
        student_params = fit_distribution(value, stats.t, subset_ratio)

        # Kolmogorov-Smirnov statistic of the data against each fitted distribution
        ks_stat_norm, _ = stats.kstest(value, 'norm', args=norm_params)
        ks_stat_student, _ = stats.kstest(value, 't', args=student_params)

        dof_student = student_params[0]
        if max_dof is not None:
            dof_student = min(dof_student, max_dof)

        fit_dict[name] = {
            'norm_params': norm_params,
            'student_params': student_params,
            'dof_student': dof_student,
            'ks_norm': ks_stat_norm,
            'ks_student': ks_stat_student
        }

    return fit_dict

def aggregate_and_analyze(fit_dict):
    """Summarise per-layer fit results by layer type and overall.

    The layer type is the last dot-separated component of each layer name.

    Args:
        fit_dict: Mapping of layer name -> dict containing at least the keys
            ``'dof_student'``, ``'ks_norm'`` and ``'ks_student'``.

    Returns:
        Dict mapping each layer type, plus the extra key ``'total'`` covering
        all layers, to a dict with ``'mean_dof'``, ``'std_dof'``,
        ``'mean_ks_norm'`` and ``'mean_ks_student'``.
    """
    metric_keys = ('dof_student', 'ks_norm', 'ks_student')

    def summarize(dofs, ks_norm, ks_student):
        # Mean / std of the dofs and mean of each KS statistic for one group.
        return {
            'mean_dof': np.mean(dofs), 'std_dof': np.std(dofs),
            'mean_ks_norm': np.mean(ks_norm), 'mean_ks_student': np.mean(ks_student)
        }

    # layer type -> metric key -> list of per-layer values
    grouped = {}
    for layer_name, layer_fit in fit_dict.items():
        layer_type = layer_name.split('.')[-1]
        bucket = grouped.setdefault(layer_type, {key: [] for key in metric_keys})
        for key in metric_keys:
            bucket[key].append(layer_fit[key])

    aggregated_stats = {
        layer_type: summarize(*(bucket[key] for key in metric_keys))
        for layer_type, bucket in grouped.items()
    }

    # Overall statistics across every layer, stored under the "total" group.
    aggregated_stats['total'] = summarize(
        *([layer_fit[key] for layer_fit in fit_dict.values()] for key in metric_keys)
    )

    return aggregated_stats


def fit_distribution(data, distribution, subset_ratio=None):
    """Fit ``distribution`` to ``data`` and return the fitted parameters.

    Args:
        data: Sample to fit; must support ``len`` and slicing.
        distribution: Object exposing ``fit`` (e.g. ``scipy.stats.norm``).
        subset_ratio: If not ``None`` and below 1.0, only the leading
            ``int(len(data) * subset_ratio)`` elements are fitted.

    Returns:
        Whatever ``distribution.fit`` returns for the selected sample.
    """
    use_subset = subset_ratio is not None and subset_ratio < 1.0
    # The subset is the leading slice of the data, not a random sample.
    sample = data[:int(len(data) * subset_ratio)] if use_subset else data
    return distribution.fit(sample)

def print_dict_in_lines(d, indent=0):
    """Print a (possibly nested) dict, one entry per line.

    Nested dicts are printed as a ``key:`` header followed by their entries
    indented two further spaces; all other values as ``key -> value``.

    Args:
        d: Mapping to print.
        indent: Number of leading spaces for this nesting level.
    """
    pad = ' ' * indent
    for key, entry in d.items():
        if not isinstance(entry, dict):
            print(pad + f"{key} -> {entry:}")
            continue
        print(pad + f"{key}:")
        print_dict_in_lines(entry, indent + 2)

def format_values_single_string(s: str):
    """Turn printed aggregate statistics into one LaTeX-style table cell.

    The input is expected to contain ``mean_dof -> <number>``,
    ``std_dof -> <number>``, ``mean_ks_norm -> <number>`` and
    ``mean_ks_student -> <number>``; the first occurrence of each is used.

    Args:
        s: Text containing the four ``key -> value`` entries.

    Returns:
        ``"<mean_dof>\\(_{<std_dof>}\\) & <ks_diff>"`` with the dof values
        formatted to two decimals and ``ks_diff`` (``mean_ks_norm`` minus
        ``mean_ks_student``) to three decimals.

    Raises:
        ValueError: If one of the four entries cannot be found.
    """
    import re

    # Signed decimals, scientific notation (e.g. 1.5e-02) and nan / inf.
    # A plain digits-and-dots pattern would silently truncate "1.5e-02" to 1.5.
    number = r"[-+]?(?:nan|inf|(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)"

    def extract(key):
        # Return the first numeric value printed after "<key> -> ".
        match = re.search(rf"{key} -> ({number})", s)
        if match is None:
            raise ValueError(f"could not find a numeric '{key}' entry in the input string")
        return float(match.group(1))

    mean_dof = extract("mean_dof")
    std_dof = extract("std_dof")
    mean_ks_norm = extract("mean_ks_norm")
    mean_ks_student = extract("mean_ks_student")

    # Positive when the Student's t fit has the lower mean KS statistic.
    ks_difference = mean_ks_norm - mean_ks_student

    # Raw f-string: the backslashes are literal LaTeX delimiters, not escapes.
    single_string_output = rf"{mean_dof:.2f}\(_{{{std_dof:.2f}}}\) & {ks_difference:.3f}"

    return single_string_output

In [None]:
model_name = 'facebook/opt-125m'

# Alternative model sources tried during development (kept for reference):
# model = timm.create_model(model_name, pretrained=True)
# model = AutoModel.from_pretrained(model_name, trust_remote_code=True)
# model = models.resnet18(pretrained=True)

# `trust_remote_code` is deliberately left at its default (off): enabling it
# lets code from the Hub repository run locally, and this checkpoint should not
# need it since OPT is implemented inside transformers itself. Re-enable it
# only for a repository you have reviewed.
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype="auto")

# Module classes whose weights / outputs are analysed in the cells below.
module_types = (torch.nn.Linear, torch.nn.Conv1d, torch.nn.Conv2d)

In [None]:
# Number of layers to analyse (kept small for quick test runs)
MAX_LAYERS = 1
# Fraction of each tensor's values used when fitting (kept small for quick test runs)
SUBSET_RATIO = 0.1

# Collect the weight tensor of every module of an analysed type, skipping
# modules whose name mentions an embedding or a layer norm.
weights = {}
for name, module in model.named_modules():
    if not isinstance(module, module_types):
        continue
    if 'embed' in name or 'layer_norm' in name:
        continue
    weights[name] = module.weight.data

# Keep only the first MAX_LAYERS entries, in module order.
first_layers = {layer_name: weights[layer_name] for layer_name in list(weights)[:MAX_LAYERS]}
print(first_layers.keys())

plot_distributions(
    first_layers,
    bins=100,
    subset_ratio=SUBSET_RATIO,
    std_factors=[1.0],
    xlim_percentage=0.2,
    show_params=True,
)

In [None]:
# Fit Normal / Student's t to the selected weight tensors. The subset ratio
# comes from SUBSET_RATIO (set in the weights cell) rather than a second
# hard-coded 0.1, so the fits and the plots always use the same fraction.
weight_raw_dict = calculate_distribution_fits(first_layers, subset_ratio=SUBSET_RATIO)
print_dict_in_lines(weight_raw_dict)

# Aggregate the per-layer fit results by layer type and overall
aggregated_stats = aggregate_and_analyze(weight_raw_dict)

print("Aggregated Stats:")
print_dict_in_lines(aggregated_stats)

In [None]:
# Filled by the forward hooks, in the order the hooked modules actually run.
# The two lists are appended to together, so index i of one always belongs to
# index i of the other.
activation_list = []
activation_names = []
counter = 0

# Number of modules to hook
MAX_LAYERS = 10
# Subset ratio for reducing the number of samples for testing
SUBSET_RATIO = 0.1
# Number of tokens in the random input sequence
INPUT_LENGTH = 64
# Modules whose name contains any of these substrings are not hooked
FILTERS = []
# Seed for the random token ids so that repeated runs use the same input
INPUT_SEED = 0

def collect_activations(module, input, output, name=None):
    """Forward hook that records a module's output together with its name.

    Args:
        module: The module that produced ``output`` (unused).
        input: The module's inputs (unused).
        output: Output tensor; stored detached, as float32, on the CPU.
        name: Name recorded alongside the activation (``None`` if not given).
    """
    activation_list.append(output.detach().to(torch.float32).cpu())
    activation_names.append(name)

# Attach hooks to the first MAX_LAYERS modules of interest, skipping modules
# whose name matches a filter term. Handles are kept so the hooks can be removed.
hook_handles = []
for name, module in model.named_modules():
    if isinstance(module, module_types) and not any(filter_term in name for filter_term in FILTERS):
        # `name=name` binds the current name at definition time. Recording the
        # name inside the hook matters because modules do not necessarily run
        # in the order named_modules() lists them.
        handle = module.register_forward_hook(
            lambda m, i, o, name=name: collect_activations(m, i, o, name)
        )
        hook_handles.append(handle)
        counter += 1
        if counter >= MAX_LAYERS:
            break

# Random token ids drawn from [0, 1024); this assumes the model's vocabulary
# has at least 1024 entries. A dedicated generator keeps the draw reproducible
# without touching the global RNG state.
input_generator = torch.Generator().manual_seed(INPUT_SEED)
input_data = torch.randint(low=0, high=1024, size=(1, INPUT_LENGTH), generator=input_generator)

# Forward pass to collect activations. The hooks are always removed afterwards
# so that re-running this cell does not stack duplicate hooks on the model.
try:
    with torch.no_grad():
        model(input_data)
finally:
    for handle in hook_handles:
        handle.remove()

# Module name -> recorded activation
activation_dict = dict(zip(activation_names, activation_list))

# Fit Normal / Student's t distributions to the collected activations
act_dist_dict = calculate_distribution_fits(activation_dict, subset_ratio=SUBSET_RATIO)
print_dict_in_lines(act_dist_dict)

# Aggregate the per-layer fit results by layer type and overall
aggregated_stats = aggregate_and_analyze(act_dist_dict)

print("Aggregated Stats:")
print_dict_in_lines(aggregated_stats)

In [None]:
# Degrees of freedom to compare
dfs = [1, 2, 3, 4, 5, 10, 100]

# Evaluation grid for the densities
x = np.linspace(-4, 4, 1000)

# One standard (loc=0, scale=1) Student's t density per degree of freedom
fig, ax = plt.subplots(figsize=(6, 4))
for df in dfs:
    density = stats.t(df, loc=0, scale=1).pdf(x)
    ax.plot(x, density, label=r"$\nu={}$".format(df))  # LaTeX for ν=df

ax.set_title('Student\'s t-Distributions')
ax.set_xlabel('x')
ax.set_ylabel('Probability Density')

# Legend with a centred, two-line title
legend = ax.legend(title='Degrees \nof Freedom')
legend.get_title().set_ha("center")

ax.grid(True)
fig.tight_layout()
fig.savefig("t-distributions", dpi=300)
plt.show()