In [1]:
import torch
from model import CNN
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
from sklearn.metrics import r2_score, root_mean_squared_error, mean_absolute_error, max_error
# Determine the equipment
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

### Calibration Transfer-merge LoRA modules with pre-trained weights

In [2]:
def merge_weights(base_model_path, lora_model_path, output_dir):
    # Load pre-trained model and weights
    base_model = CNN().to(device)
    base_model.load_state_dict(torch.load(base_model_path, map_location=device))
    base_model.load_state_dict(torch.load('peft_model_BN_15%.pt', map_location=device), strict=False)
    # Perform weight decay
    original_weight = base_model.Linear.weight.data
    decayed_weight = original_weight * 0.5
    base_model.Linear.weight.data = decayed_weight
    # Load LoRA fine-tuned weights
    lora_state_dict = torch.load(lora_model_path)

    # Merge weights
    for k, v in lora_state_dict.items():
        if 'lora_A' in k:
            original_key = k.replace('.lora_A', '')
            lora_a_key = k
            lora_b_key = k.replace('lora_A', 'lora_B')

            # Get LoRA matrix
            lora_a = lora_state_dict[lora_a_key]
            lora_b = lora_state_dict[lora_b_key]

            # A*B
            merged_weight = torch.matmul(lora_b, lora_a)

            # Add the merged weights to the corresponding weights of the pre-trained model
            if original_key + '.weight' in base_model.state_dict():
                base_model.state_dict()[original_key + '.weight'] += merged_weight

    # Save merged weights
    torch.save(base_model.state_dict(), output_dir)


merge_weights('oil-best_model_cnn_x1.pt', 'lora_state_dict_15%.pt', 'transfer_model_15%.pt')

  base_model.load_state_dict(torch.load(base_model_path, map_location=device))
  base_model.load_state_dict(torch.load('peft_model_BN_15%.pt', map_location=device), strict=False)
  lora_state_dict = torch.load(lora_model_path)


### Test the transfer model

In [4]:
secondary_cnn = CNN().to(device)
secondary_cnn.load_state_dict(torch.load('transfer_model_15%.pt', map_location=device))


Path2 = 'oil-data\\iRaman_processed_spectra.csv'
X = pd.read_csv(Path2, header=None)
X = np.array(X)

y_path = 'oil-data\\Olive oil labels.csv'
y = pd.read_csv(y_path, header=None)
y = np.array(y)
y = y.reshape(-1)

X_trans, X_val, y_trans, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_val, y_val, test_size=0.5, random_state=42)

X_train, X_left, y_train, y_left = train_test_split(X_trans, y_trans, test_size=0.85, random_state=12)


X_train = torch.tensor(X_train, dtype=torch.float32, device=device)
X_val = torch.tensor(X_val, dtype=torch.float32, device=device)
X_test = torch.tensor(X_test, dtype=torch.float32, device=device)
y_train = torch.tensor(y_train, dtype=torch.float32, device=device)
y_val = torch.tensor(y_val, dtype=torch.float32, device=device)
y_test = torch.tensor(y_test, dtype=torch.float32, device=device)

train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=len(y_train), shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=len(y_val), shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=len(y_test), shuffle=False)


def evaluate_model(model, loader, device):
    model.eval()
    predictions = []
    actuals = []
    with torch.no_grad():
        for batch_x, batch_y in loader:
            batch_x = batch_x.unsqueeze(dim=1).to(device)
            batch_y = batch_y.to(device)
            output = model(batch_x)
            predictions.extend(output.cpu().tolist())
            actuals.extend(batch_y.cpu().tolist())
    final_r2 = r2_score(actuals, predictions)
    RMSEP = root_mean_squared_error(actuals, predictions)
    MAE = mean_absolute_error(actuals, predictions)
    MAX_ERROR = max_error(actuals, predictions)
    return final_r2, RMSEP, MAE, MAX_ERROR

print('Training set evaluation results:')
r2, rmse, mae, me = evaluate_model(secondary_cnn, train_loader, device)
print(f'R2: {r2}, RMSEP: {rmse}, MAE: {mae}, MAX_ERROR: {me}')

print('Validation set evaluation results:')
r2, rmse, mae, me = evaluate_model(secondary_cnn, val_loader, device)
print(f'R2: {r2}, RMSEP: {rmse}, MAE: {mae}, MAX_ERROR: {me}')

print('Test set evaluation results:')
r2, rmse, mae, me = evaluate_model(secondary_cnn, test_loader, device)
print(f'R2: {r2}, RMSEP: {rmse}, MAE: {mae}, MAX_ERROR: {me}')

  secondary_cnn.load_state_dict(torch.load('transfer_model_15%.pt', map_location=device))


Training set evaluation results:
R2: 0.9994573256142273, RMSEP: 0.0038521102535665664, MAE: 0.002560625784099102, MAX_ERROR: 0.009753882884979248
Validation set evaluation results:
R2: 0.8592947481730904, RMSEP: 0.06038775120503804, MAE: 0.0504158792587427, MAX_ERROR: 0.11481696367263794
Test set evaluation results:
R2: 0.9128151721774279, RMSEP: 0.051704658223179364, MAE: 0.04282660675900323, MAX_ERROR: 0.11463791877031326
