# Experiment: Model Internals - Why Instructions Change Behavior

**Goal:** Understand the internal mechanisms through which system prompts affect model outputs.

**Analysis Focus:**
- Hidden state, attention, MLP, and residual stream changes across layers
- Which layers are most affected by system prompts?
- Impact of system prompt length, specific tokens/phrases, and token positions
- Correlation between internal changes and output changes

In [None]:
import sys, os
if 'google.colab' in sys.modules:
    import shutil
    if os.path.exists('/content/LLM-Instruction-Understanding'):
        shutil.rmtree('/content/LLM-Instruction-Understanding')
    !git clone https://github.com/maralkh/LLM-Instruction-Understanding.git
    os.chdir('/content/LLM-Instruction-Understanding')
    !pip install -q -r requirements.txt
    sys.path.insert(0, '/content/LLM-Instruction-Understanding')
else:
    sys.path.insert(0, os.path.abspath('..'))

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.auto import tqdm
from scipy import stats
import torch
import re

from src.model_utils import load_model
from src.test_configs import get_all_test_prompts, get_core_system_prompts, get_system_prompts, build_chat_prompt

# Apply the 'seaborn-v0_8-whitegrid' matplotlib style sheet to the figures drawn below.
plt.style.use('seaborn-v0_8-whitegrid')

In [None]:
# Load the model under study and report the architecture figures that
# get_layer_info() exposes (name, layer count, head count, hidden size).
model = load_model("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
layer_info = model.get_layer_info()
print(
    f"Model: {layer_info['model_name']}",
    f"Layers: {layer_info['n_layers']}, Heads: {layer_info['n_heads']}, Hidden: {layer_info['hidden_size']}",
    sep="\n",
)

## 1. Enhanced Internals Comparison (Hidden States, Attention, MLP, Residuals)

In [None]:
def _cosine_and_l2(vec_a, vec_b):
    """Return ``(cosine similarity, L2 distance)`` for two same-length 1-D arrays.

    The 1e-10 term keeps the division defined when either vector has zero
    norm; a non-finite cosine is reported as 0.0.
    """
    cos_sim = np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b) + 1e-10)
    l2 = np.linalg.norm(vec_a - vec_b)
    return (float(cos_sim) if np.isfinite(cos_sim) else 0.0), float(l2)


def _last_token_vector(hidden):
    """Return the vector at the last sequence position of batch item 0 as a NumPy array."""
    return hidden[0, -1, :].float().cpu().numpy()


def _last_query_attention(attn, n_keys):
    """Return every head's last-query attention row, zero-padded to ``n_keys`` and flattened.

    Padding is applied per head along the key axis, so entry ``h * n_keys + k``
    always refers to head ``h`` / key ``k`` regardless of the prompt length.
    """
    rows = attn[0, :, -1, :].float().cpu().numpy()
    rows = np.pad(rows, ((0, 0), (0, n_keys - rows.shape[1])))
    return rows.flatten()


def compute_component_similarities(model, prompt1, prompt2):
    """
    Compare the internals of one model on two prompts, layer by layer.

    Both prompts are tokenized with ``model.tokenizer``, moved to
    ``model.config.device`` and run through ``model.model`` with
    ``output_hidden_states=True`` and ``output_attentions=True``. For every
    layer the last-token position of both runs is compared and one row per
    component is produced:

    - ``hidden_state``: the layer's output vector.
    - ``attention``: the last query position's attention row of every head
      (only when both runs returned attentions for that layer).
    - ``residual``: the layer's output minus the previous layer's output, i.e.
      what the layer added to the stream. Not produced for layer 0.

    Args:
        model: wrapper exposing ``tokenizer``, ``config.device`` and ``model``.
        prompt1: first prompt string.
        prompt2: second prompt string.

    Returns:
        pandas.DataFrame with columns ``layer``, ``component``, ``cosine_sim``
        and ``l2_norm``.
    """
    inputs1 = model.tokenizer(prompt1, return_tensors="pt").to(model.config.device)
    inputs2 = model.tokenizer(prompt2, return_tensors="pt").to(model.config.device)

    with torch.no_grad():
        out1 = model.model(**inputs1, output_hidden_states=True, output_attentions=True)
        out2 = model.model(**inputs2, output_hidden_states=True, output_attentions=True)

    results = {'layer': [], 'component': [], 'cosine_sim': [], 'l2_norm': []}

    def record(layer_idx, component, vec_a, vec_b):
        cos_sim, l2 = _cosine_and_l2(vec_a, vec_b)
        results['layer'].append(layer_idx)
        results['component'].append(component)
        results['cosine_sim'].append(cos_sim)
        results['l2_norm'].append(l2)

    n_layers = len(out1.hidden_states) - 1  # Exclude embedding layer

    # Attention rows can only be compared where both runs returned them.
    if out1.attentions and out2.attentions:
        n_attn_layers = min(len(out1.attentions), len(out2.attentions))
    else:
        n_attn_layers = 0

    for layer_idx in range(n_layers):
        # Hidden states (layer output); index 0 of hidden_states is the embedding.
        hs1 = _last_token_vector(out1.hidden_states[layer_idx + 1])
        hs2 = _last_token_vector(out2.hidden_states[layer_idx + 1])
        record(layer_idx, 'hidden_state', hs1, hs2)

        # Attention patterns (all heads, last query position).
        if layer_idx < n_attn_layers:
            attn1 = out1.attentions[layer_idx]
            attn2 = out2.attentions[layer_idx]
            # The prompts usually differ in length, so the key axes differ too.
            # Pad each head's row separately: padding the already-flattened
            # vector would shift head boundaries and compare unrelated heads.
            n_keys = max(attn1.shape[-1], attn2.shape[-1])
            record(
                layer_idx,
                'attention',
                _last_query_attention(attn1, n_keys),
                _last_query_attention(attn2, n_keys),
            )

        # Residual stream change (difference between consecutive layers).
        if layer_idx > 0:
            prev_hs1 = _last_token_vector(out1.hidden_states[layer_idx])
            prev_hs2 = _last_token_vector(out2.hidden_states[layer_idx])
            # current - previous approximates what this layer contributed.
            record(layer_idx, 'residual', hs1 - prev_hs1, hs2 - prev_hs2)

    return pd.DataFrame(results)

In [None]:
# Compare every core system prompt against the 'none' baseline on the first
# five test prompts and stack the per-layer component measurements.
test_prompts = get_all_test_prompts()[:5]
system_prompts = get_core_system_prompts()
baseline_sys = system_prompts['none']

all_component_results = []

for test in tqdm(test_prompts, desc="Analyzing components"):
    for sys_name, sys_info in system_prompts.items():
        if sys_name != 'none':  # the baseline is never compared with itself
            try:
                prompt_base = build_chat_prompt(baseline_sys['text'], test['prompt'], model.tokenizer)
                prompt_var = build_chat_prompt(sys_info['text'], test['prompt'], model.tokenizer)

                comp_df = compute_component_similarities(model, prompt_base, prompt_var)
                # Tag each measurement with the combination it came from.
                for column, value in (
                    ('test_id', test['id']),
                    ('category', test['category']),
                    ('system_prompt', sys_name),
                ):
                    comp_df[column] = value

                all_component_results.append(comp_df)
            except Exception as e:
                # Best effort: report the failing combination and keep going.
                print(f"Error for {test['id']}/{sys_name}: {e}")

component_df = pd.concat(all_component_results, ignore_index=True)
print(f"Collected {len(component_df)} component measurements")

In [None]:
# Mean cosine similarity per (layer, component), reshaped so that each
# component becomes a column and each layer a row.
mean_similarity = component_df.groupby(['layer', 'component'])['cosine_sim'].mean()
component_by_layer = mean_similarity.unstack()

print("=== Component Similarity by Layer ===", component_by_layer.round(4), sep="\n")

In [None]:
# One panel per component: mean cosine similarity to the baseline, by layer.
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

panel_components = ['hidden_state', 'attention', 'residual']
for ax, component in zip(axes, panel_components):
    if component not in component_by_layer.columns:
        continue  # nothing was measured for this component; leave the panel empty

    # Layers without a value for this component are dropped.
    data = component_by_layer[component].dropna()
    ax.plot(data.index, data.values, 'o-', linewidth=2, markersize=6)
    ax.set(
        xlabel='Layer',
        ylabel='Cosine Similarity',
        title=f'{component.replace("_", " ").title()} Similarity',
        ylim=(0, 1.05),
    )
    # Reference line at perfect similarity.
    ax.axhline(y=1.0, color='gray', linestyle='--', alpha=0.5)

plt.tight_layout()
plt.savefig('../results/component_similarity_by_layer.png', dpi=150)
plt.show()

In [None]:
# Heatmap of mean cosine similarity: rows are system prompts, columns are components.
mean_by_prompt = component_df.groupby(['system_prompt', 'component'])['cosine_sim'].mean()
comp_sys = mean_by_prompt.unstack()

fig, ax = plt.subplots(figsize=(10, 6))
heatmap_options = dict(annot=True, fmt='.3f', cmap='RdYlGn', vmin=0.5, vmax=1.0)
sns.heatmap(comp_sys, ax=ax, **heatmap_options)
ax.set_title('Component Similarity by System Prompt\n(higher = more similar to baseline)')

plt.tight_layout()
plt.savefig('../results/component_by_system_prompt.png', dpi=150)
plt.show()

## 2. System Prompt Length vs Impact Analysis

In [None]:
# Use the full set of system prompts (not only the core ones) to get more
# variation in length.
all_system_prompts = get_system_prompts()

# Substrings whose presence (case-insensitive) switches each flag on.
phrase_markers = {
    'has_persona': ('you are', 'act as'),
    'has_format': ('format', 'structure'),
    'has_thinking': ('step', 'think', 'reason'),
    'has_constraint': ('must', 'always', 'never'),
    'has_concise': ('concise', 'brief', 'short'),
}

# One record per system prompt: size measures plus the phrase flags.
sys_properties = []
for sys_name, sys_info in all_system_prompts.items():
    text = sys_info['text']
    tokens = model.tokenizer(text, return_tensors="pt").input_ids
    lowered = text.lower()

    record = {
        'system_prompt': sys_name,
        'text': text,
        'char_length': len(text),
        'token_length': tokens.shape[1],
        'word_count': len(text.split()),
    }
    for flag, markers in phrase_markers.items():
        record[flag] = any(marker in lowered for marker in markers)
    sys_properties.append(record)

sys_props_df = pd.DataFrame(sys_properties)
print("=== System Prompt Properties ===")
shown_columns = ['system_prompt', 'token_length', 'word_count', 'has_persona', 'has_thinking', 'has_constraint']
print(sys_props_df[shown_columns].to_string())

In [None]:
# Collect impact data for all system prompts: for each (test, system prompt)
# pair, compare the model's internals against the baseline prompt and keep the
# result next to the system prompt's length and phrase flags.
test_subset = get_all_test_prompts()[:3]  # Use fewer tests for speed
# Fall back to an empty system prompt when no 'none' entry is defined.
baseline_sys = all_system_prompts.get('none', {'text': ''})

length_impact_results = []

for test in tqdm(test_subset, desc="Length analysis"):
    for sys_name, sys_info in all_system_prompts.items():
        if sys_name == 'none':
            continue

        try:
            prompt_base = build_chat_prompt(baseline_sys['text'], test['prompt'], model.tokenizer)
            prompt_var = build_chat_prompt(sys_info['text'], test['prompt'], model.tokenizer)

            comparison = model.compare_internals(prompt_base, prompt_var)

            # Get average hidden state similarity across layers
            avg_hs_sim = np.mean([v['cosine_sim'] for v in comparison['hidden_state_diff'].values()])

            # Get the properties computed earlier for this system prompt
            props = sys_props_df[sys_props_df['system_prompt'] == sys_name].iloc[0]

            length_impact_results.append({
                'test_id': test['id'],
                'system_prompt': sys_name,
                'token_length': props['token_length'],
                'word_count': props['word_count'],
                'has_persona': props['has_persona'],
                'has_thinking': props['has_thinking'],
                'has_constraint': props['has_constraint'],
                'has_concise': props['has_concise'],
                'avg_hs_similarity': avg_hs_sim,
                'logit_similarity': comparison['logit_diff']['cosine_sim'],
                'top_token_same': comparison['logit_diff']['top_token_same']
            })
        except Exception as e:
            # Best effort: keep going, but say which combination failed so the
            # message can be traced (same format as the component loop).
            print(f"Error for {test['id']}/{sys_name}: {e}")

length_df = pd.DataFrame(length_impact_results)
print(f"Collected {len(length_df)} length impact measurements")

In [None]:
# Three scatter panels relating system prompt token length to its impact.
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# --- Panel 1: token length vs mean hidden-state similarity -----------------
ax = axes[0]
grouped = length_df.groupby('system_prompt').agg(
    token_length=('token_length', 'first'),
    avg_hs_similarity=('avg_hs_similarity', 'mean'),
).reset_index()
ax.scatter(grouped['token_length'], grouped['avg_hs_similarity'], s=100, alpha=0.7)
for label, x_val, y_val in zip(
    grouped['system_prompt'], grouped['token_length'], grouped['avg_hs_similarity']
):
    ax.annotate(label, (x_val, y_val), fontsize=8)

# Linear trend line and Pearson correlation; with two or fewer prompts the
# neutral values r=0, p=1 are reported instead.
if len(grouped) > 2:
    z = np.polyfit(grouped['token_length'], grouped['avg_hs_similarity'], 1)
    p = np.poly1d(z)
    ax.plot(grouped['token_length'], p(grouped['token_length']), 'r--', alpha=0.5)
    corr, pval = stats.pearsonr(grouped['token_length'], grouped['avg_hs_similarity'])
else:
    corr, pval = 0, 1
ax.set(
    xlabel='System Prompt Token Length',
    ylabel='Avg Hidden State Similarity',
    title=f'Length vs HS Similarity\nr={corr:.3f}, p={pval:.3f}',
)

# --- Panel 2: token length vs mean logit similarity -------------------------
ax = axes[1]
grouped2 = length_df.groupby('system_prompt').agg(
    token_length=('token_length', 'first'),
    logit_similarity=('logit_similarity', 'mean'),
).reset_index()
ax.scatter(grouped2['token_length'], grouped2['logit_similarity'], s=100, alpha=0.7, color='orange')
if len(grouped2) > 2:
    corr2, pval2 = stats.pearsonr(grouped2['token_length'], grouped2['logit_similarity'])
else:
    corr2, pval2 = 0, 1
ax.set(
    xlabel='System Prompt Token Length',
    ylabel='Logit Similarity',
    title=f'Length vs Logit Similarity\nr={corr2:.3f}, p={pval2:.3f}',
)

# --- Panel 3: token length vs share of cases whose top token changed --------
ax = axes[2]
grouped3 = length_df.groupby('system_prompt').agg(
    token_length=('token_length', 'first'),
    top_token_same=('top_token_same', 'mean'),
).reset_index()
# 1 - (share with the same top token) = share where it changed.
ax.scatter(grouped3['token_length'], 1 - grouped3['top_token_same'], s=100, alpha=0.7, color='green')
ax.set(
    xlabel='System Prompt Token Length',
    ylabel='Top Token Change Rate',
    title='Length vs Output Change Rate',
)

plt.tight_layout()
plt.savefig('../results/length_vs_impact.png', dpi=150)
plt.show()

## 3. Token/Phrase Impact Analysis

In [None]:
# Analyze impact of specific phrases: compare the mean hidden-state similarity
# of measurements whose system prompt contains a phrase family with the mean of
# those whose system prompt does not.
phrase_features = ['has_persona', 'has_thinking', 'has_constraint', 'has_concise']

phrase_impact = []
for feature in phrase_features:
    has_phrase = length_df[feature].astype(bool)
    with_group = length_df.loc[has_phrase, 'avg_hs_similarity'].dropna()
    without_group = length_df.loc[~has_phrase, 'avg_hs_similarity'].dropna()

    # A comparison needs data on both sides. Substituting 0.0 for a missing
    # group would turn "no data" into a similarity of zero and push the
    # feature to the top of the ranking, so such features are left out.
    if with_group.empty or without_group.empty:
        print(
            f"Skipping '{feature}': no measurements "
            f"{'with' if with_group.empty else 'without'} this phrase"
        )
        continue

    with_feature = with_group.mean()
    without_feature = without_group.mean()

    phrase_impact.append({
        'feature': feature.replace('has_', ''),
        'with_feature': with_feature,
        'without_feature': without_feature,
        'impact': without_feature - with_feature  # Lower similarity = more impact
    })

# Explicit columns keep the sort valid even when every feature was skipped.
phrase_df = pd.DataFrame(
    phrase_impact,
    columns=['feature', 'with_feature', 'without_feature', 'impact'],
).sort_values('impact', ascending=False)
print("=== Phrase Impact on Hidden State Similarity ===")
print("(Higher impact = phrase causes more deviation from baseline)")
print(phrase_df.round(4).to_string(index=False))

In [None]:
# Grouped bar chart: for every phrase family, similarity with vs without it.
fig, ax = plt.subplots(figsize=(10, 6))

width = 0.35
x = np.arange(len(phrase_df))  # one slot per phrase family

# The two bars of a pair sit half a bar-width left and right of the slot centre.
half = width / 2
bars1 = ax.bar(x - half, phrase_df['with_feature'], width, label='With phrase', color='coral')
bars2 = ax.bar(x + half, phrase_df['without_feature'], width, label='Without phrase', color='steelblue')

ax.set(
    ylabel='Avg Hidden State Similarity to Baseline',
    title='Impact of Specific Phrases in System Prompts',
    ylim=(0, 1.1),
)
ax.set_xticks(x)
ax.set_xticklabels(phrase_df['feature'])
ax.legend()

plt.tight_layout()
plt.savefig('../results/phrase_impact.png', dpi=150)
plt.show()

## 4. Token Position Analysis

In [None]:
def analyze_attention_to_positions(model, prompt, system_prompt_length):
    """
    Measure, per layer, how the last token's attention is spread over the prompt.

    The prompt is tokenized with ``model.tokenizer``, run through
    ``model.model`` with ``output_attentions=True``, and the attention row of
    the last query position is averaged over heads. That row is then summed
    over three position ranges:

    - system: the first ``system_prompt_length`` positions (capped at the
      sequence length);
    - user: from ``system_prompt_length`` up to five positions before the end;
    - recent: the final five positions.

    The recent range is independent of the other two, so it overlaps the
    system range when ``system_prompt_length`` reaches into the last five
    positions.

    Returns:
        pandas.DataFrame with one row per layer and columns ``layer``,
        ``attn_to_system``, ``attn_to_user``, ``attn_to_recent`` and
        ``attn_entropy`` (entropy of the head-averaged row; 0.0 if non-finite).
    """
    encoded = model.tokenizer(prompt, return_tensors="pt").to(model.config.device)
    n_tokens = encoded.input_ids.shape[1]

    with torch.no_grad():
        forward = model.model(**encoded, output_attentions=True)

    # The range boundaries depend only on the lengths, not on the layer.
    sys_end = min(system_prompt_length, n_tokens)
    user_start = system_prompt_length
    user_end = max(user_start, n_tokens - 5)
    recent_start = max(0, n_tokens - 5)

    rows = []
    for layer_idx, layer_attn in enumerate(forward.attentions):
        # layer_attn is indexed as (batch, heads, query, key): take batch 0,
        # the last query position, and average over heads.
        weights = layer_attn[0, :, -1, :].mean(dim=0).float().cpu().numpy()

        if sys_end > 0:
            to_system = float(weights[:sys_end].sum())
        else:
            to_system = 0.0

        if user_end > user_start:
            to_user = float(weights[user_start:user_end].sum())
        else:
            to_user = 0.0

        to_recent = float(weights[recent_start:].sum())

        # Entropy of the row; clipping avoids log(0).
        clipped = np.clip(weights, 1e-10, 1.0)
        entropy = float(-np.sum(clipped * np.log(clipped)))
        if not np.isfinite(entropy):
            entropy = 0.0

        rows.append({
            'layer': layer_idx,
            'attn_to_system': to_system,
            'attn_to_user': to_user,
            'attn_to_recent': to_recent,
            'attn_entropy': entropy,
        })

    return pd.DataFrame(rows)

In [None]:
# Attention-by-position profile of one test prompt under the first eight
# system prompts (limited for speed).
test_prompt = test_prompts[0]

position_results = []
for sys_name, sys_info in list(all_system_prompts.items())[:8]:
    full_prompt = build_chat_prompt(sys_info['text'], test_prompt['prompt'], model.tokenizer)

    # NOTE(review): the length is taken from tokenizing the system text on its
    # own; whether that equals the span the text occupies inside the formatted
    # chat prompt (template markers, special tokens) is not visible here —
    # confirm against build_chat_prompt.
    if sys_info['text']:
        sys_tokens = model.tokenizer(sys_info['text'], return_tensors="pt").input_ids.shape[1]
    else:
        sys_tokens = 0  # an empty system prompt has no system region

    pos_df = analyze_attention_to_positions(model, full_prompt, sys_tokens)
    for column, value in (('system_prompt', sys_name), ('sys_token_length', sys_tokens)):
        pos_df[column] = value
    position_results.append(pos_df)

position_df = pd.concat(position_results, ignore_index=True)
print(f"Collected {len(position_df)} position measurements")

In [None]:
# 2x2 overview of where the last token attends, by layer and by system prompt.
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Mean attention per region for every layer, averaged over system prompts.
region_columns = ['attn_to_system', 'attn_to_user', 'attn_to_recent']
layer_pos = position_df.groupby('layer')[region_columns].mean().reset_index()

# Top-left: one line per region across layers.
ax = axes[0, 0]
region_styles = (
    ('attn_to_system', 'o-', 'System Prompt'),
    ('attn_to_user', 's-', 'User Prompt'),
    ('attn_to_recent', '^-', 'Recent Tokens'),
)
for column, marker, label in region_styles:
    ax.plot(layer_pos['layer'], layer_pos[column], marker, label=label, linewidth=2)
ax.set(
    xlabel='Layer',
    ylabel='Attention Weight',
    title='Attention to Different Regions by Layer',
)
ax.legend()

# Top-right: heatmap of attention to the system region (system prompt x layer).
ax = axes[0, 1]
pivot = position_df.pivot_table(
    values='attn_to_system', index='system_prompt', columns='layer', aggfunc='mean'
)
sns.heatmap(pivot, cmap='YlOrRd', ax=ax, cbar_kws={'label': 'Attention to System'})
ax.set_title('Attention to System Prompt Region')

# Bottom-left: system prompt length vs mean attention to the system region.
ax = axes[1, 0]
sys_attn = position_df.groupby('system_prompt').agg(
    sys_token_length=('sys_token_length', 'first'),
    attn_to_system=('attn_to_system', 'mean'),
).reset_index()
ax.scatter(sys_attn['sys_token_length'], sys_attn['attn_to_system'], s=100)
for label, x_val, y_val in zip(
    sys_attn['system_prompt'], sys_attn['sys_token_length'], sys_attn['attn_to_system']
):
    ax.annotate(label, (x_val, y_val), fontsize=8)
ax.set(
    xlabel='System Prompt Token Length',
    ylabel='Mean Attention to System Region',
    title='Longer System Prompts -> More Attention?',
)

# Bottom-right: mean attention entropy per system prompt, sorted ascending.
ax = axes[1, 1]
entropy_by_sys = position_df.groupby('system_prompt')['attn_entropy'].mean().sort_values()
bar_positions = range(len(entropy_by_sys))
ax.barh(bar_positions, entropy_by_sys.values, color='teal', alpha=0.7)
ax.set_yticks(bar_positions)
ax.set_yticklabels(entropy_by_sys.index)
ax.set_xlabel('Mean Attention Entropy')
ax.set_title('Attention Focus by System Prompt\n(Lower = More Focused)')

plt.tight_layout()
plt.savefig('../results/position_analysis.png', dpi=150)
plt.show()

## 5. Summary & Key Findings

In [None]:
# Text summary of the measurements gathered by the cells above.
print("="*70)
print("MODEL INTERNALS ANALYSIS - KEY FINDINGS")
print("="*70)

print("\n1. COMPONENT SIMILARITY (averaged across system prompts):")
comp_avg = component_df.groupby('component')['cosine_sim'].mean().sort_values()
for comp, sim in comp_avg.items():
    print(f"   - {comp}: {sim:.4f} similarity to baseline")

print("\n2. LENGTH CORRELATION:")
print(f"   - Token length vs HS similarity: r={corr:.3f} (p={pval:.3f})")
# Only read a direction into r when the correlation is defined and
# significant; the sign alone says nothing for a weak or noisy relationship,
# and a positive r means longer prompts stay *closer* to the baseline.
significance_level = 0.05
if not (np.isfinite(corr) and np.isfinite(pval)) or pval >= significance_level:
    interpretation = f'No significant length effect (p >= {significance_level})'
elif corr < 0:
    interpretation = 'Longer prompts cause MORE change'
else:
    interpretation = 'Longer prompts cause LESS change'
print(f"   - Interpretation: {interpretation}")

print("\n3. PHRASE IMPACT (which phrases cause most deviation):")
if len(phrase_df) > 0:
    # phrase_df is sorted by impact, largest first.
    top_phrase = phrase_df.iloc[0]
    print(f"   - Most impactful: '{top_phrase['feature']}' (impact={top_phrase['impact']:.4f})")

print("\n4. POSITION ATTENTION:")
layer_mid = layer_info['n_layers'] // 2
mid_data = layer_pos[layer_pos['layer'] == layer_mid]
if len(mid_data) > 0:
    mid_layer_data = mid_data.iloc[0]
    print(f"   - Middle layer ({layer_mid}) attention distribution:")
    print(f"     - To system prompt: {mid_layer_data['attn_to_system']:.3f}")
    print(f"     - To user prompt: {mid_layer_data['attn_to_user']:.3f}")
    print(f"     - To recent tokens: {mid_layer_data['attn_to_recent']:.3f}")

In [None]:
# Save all results: a JSON summary plus the raw measurement tables as CSV.
import json
os.makedirs('../results', exist_ok=True)


def _json_safe(value):
    """Return ``value`` with every NaN/inf float replaced by None, recursively.

    Dicts keep their keys, lists and tuples become lists, anything that is not
    a float or container is returned unchanged.
    """
    if isinstance(value, dict):
        return {key: _json_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_json_safe(item) for item in value]
    if isinstance(value, (float, np.floating)):
        return float(value) if np.isfinite(value) else None
    return value


def _json_number(obj):
    """Fallback for objects json cannot encode: coerce to float, NaN/inf -> None."""
    return _json_safe(float(obj))


results = {
    'component_by_layer': component_by_layer.to_dict(),
    'phrase_impact': phrase_df.to_dict('records'),
    'length_correlation': {'r': float(corr), 'p': float(pval)},
    'model_info': layer_info
}

# component_by_layer has no 'residual' value for layer 0, so the raw dict
# always contains NaN. json would emit it as a bare NaN token, which is not
# valid JSON; write null instead and let allow_nan=False enforce that.
with open('../results/internals_full_analysis.json', 'w') as f:
    json.dump(_json_safe(results), f, indent=2, default=_json_number, allow_nan=False)

# Save dataframes
component_df.to_csv('../results/component_analysis.csv', index=False)
length_df.to_csv('../results/length_analysis.csv', index=False)
position_df.to_csv('../results/position_analysis.csv', index=False)

print("All results saved to ../results/")