# Pre-processing

In [None]:
# Read data
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer

# Load the raw survey workbook. header=1 tells pandas to take the column names
# from the second row of the sheet (row index 1) instead of the first.
# NOTE(review): this is an absolute, machine-specific path; the notebook only
# runs on a machine where this exact file exists.
df = pd.read_excel('C:/Users/ming/Desktop/master project/Copy of Data_All_JAN_2024_Anonymized__For_Analysis_V1.2.xlsx', header=1)

In [None]:
# Print the actual column names exactly as pandas read them from the sheet.
print(df.columns.tolist()) 

In [None]:
# Columns needed for the analysis, declared group by group and then combined.

# Open-ended (free-text) responses; also used on their own by later cells.
open_ended_columns = [
    'Please describe the discomfort that you are experiencing',
    'What do you believe is causing this discomfort?',
    'What do you believe can be done to improve the wearing comfort for your hearing aids?',
]

# Demographic data
demographic_columns = [
    'Style',
    'Platform',
    'Earpiece Configuration',
    'What is your current age?',
    'What is your gender?',
    'How long have you been using hearing aids?',
    'How long have you been using your current hearing aids?',
]

# Target variables
target_columns = ['Satisfaction', 'Average comfort score']

# Structured scoring items
structured_score_columns = [
    'Slip Out',
    'Annoying',
    'Change Position',
    'Too tight',
    'Itchiness',
    'Soreness',
    'Take off hearing aids',
    'Painful',
]

# Same column order as before: demographics, targets, scores, free text.
selected_columns = (
    demographic_columns
    + target_columns
    + structured_score_columns
    + open_ended_columns
)

# Restrict the frame to the selected columns and report its size.
df = df[selected_columns]

print(df.shape)


In [None]:
# Keep only the rows with at least one real open-ended response.
# A cell counts as a response when it is a string with non-whitespace content,
# so NaN/None, non-string values and blank strings all count as "no response".
def _has_text(value):
    """Return True when `value` is a string containing non-whitespace characters."""
    return isinstance(value, str) and value.strip() != ''


# Element-wise check done per column with Series.map: DataFrame.applymap is
# deprecated since pandas 2.1, while this form works on old and new versions.
# The earlier dropna(how='all') was redundant: an all-null row has no string
# cell, so it is already rejected by this mask.
has_response = df[open_ended_columns].apply(lambda column: column.map(_has_text))
filtered_df1 = df[has_response.any(axis=1)]

print(filtered_df1.shape)

In [None]:
from transformers import pipeline

# Pretrained sentiment classifier loaded through the Hugging Face pipeline API.
# It is called with a string and its first result is read as a dict with
# 'label' and 'score' keys (see get_roberta_sentiment below).
sentiment_model = pipeline("text-classification", model="cardiffnlp/twitter-roberta-base-sentiment")

def get_roberta_sentiment(text, min_confidence=0.6):
    """Score one free-text answer on a signed sentiment scale.

    The rules are applied in this order:
      1. Missing or blank answers, and stock "nothing to report" phrases
         (exact match after strip/lower) -> 0.
      2. Answers containing a discomfort keyword -> -0.5 (fixed value, the
         model is not consulted).
      3. Otherwise the module-level `sentiment_model` decides:
         +score for LABEL_2 (positive), -score for LABEL_0 (negative),
         0 for LABEL_1 (neutral) or when the score is below `min_confidence`.

    Args:
        text: The answer to score. Non-string values are converted with str().
        min_confidence: Model predictions with a score below this value are
            treated as neutral. Defaults to 0.6.

    Returns:
        0, -0.5, or the signed model score (a float between -1 and 1).
    """
    neutral_texts = ("none", "n/a", "-", "no issues", "no problem", "no discomfort", "no pain")
    negative_keywords = ("itchy", "pain", "sore", "uncomfortable", "pressure", "tight")

    # Missing values must be caught before str(): str(float('nan')) is "nan",
    # which would otherwise be sent to the model and scored like a real answer.
    if pd.api.types.is_scalar(text) and pd.isna(text):
        return 0

    text = str(text).strip().lower()

    # Blank answers carry no sentiment; do not ask the model about "".
    if not text or text in neutral_texts:
        return 0

    # NOTE(review): this is a substring match, so negated phrases such as
    # "no pain at all" also land here and get -0.5.
    if any(word in text for word in negative_keywords):
        return -0.5

    # Only the first 512 characters are passed to the model.
    result = sentiment_model(text[:512])
    label = result[0]['label']  # 'LABEL_0' (Negative), 'LABEL_1' (Neutral), 'LABEL_2' (Positive)
    score = result[0]['score']

    # Low-confidence predictions are treated as neutral.
    if score < min_confidence:
        return 0

    if label == 'LABEL_2':
        return score
    if label == 'LABEL_0':
        return -score
    return 0

# Score every row of `df` with the rule-based + RoBERTa scorer defined above.
# NOTE(review): `text_column` is not defined in any visible cell, so this line
# raises NameError on a fresh kernel. TODO: define it (one of the open-ended
# question columns?) before this point.
# NOTE(review): this runs on `df`, not on `filtered_df1`, so rows without any
# open-ended response are scored as well. Confirm that is intended.
df['BERT_Sentiment_Score'] = df[text_column].apply(get_roberta_sentiment)
# Raw model label per row, independent of the keyword/neutral rules used for
# the score. This calls the model a second time for every row.
df['Sentiment_Label'] = df[text_column].apply(lambda x: sentiment_model(str(x)[:512])[0]['label'])

# Statistical Distribution of Categories
label_counts = df['Sentiment_Label'].value_counts()
print("RoBERTa predicted sentiment category distribution:")
print(label_counts)

# Visualization of Category Distribution
plt.figure(figsize=(6, 4))
sns.barplot(x=label_counts.index, y=label_counts.values)
plt.xlabel("Sentiment Category")
plt.ylabel("Count")
plt.title("Distribution of Sentiment Labels (RoBERTa)")
plt.show()

# Visualization of Sentiment Score Distribution (30-bin histogram with a KDE overlay)
plt.figure(figsize=(8, 5))
sns.histplot(df['BERT_Sentiment_Score'], bins=30, kde=True)
plt.xlabel('Sentiment Score')
plt.ylabel('Frequency')
plt.title('Improved RoBERTa Sentiment Score Distribution')
plt.show()

# Train a model based on RoBERTa-base...

In [None]:
import pandas as pd
import torch
import numpy as np
from transformers import RobertaTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments
from datasets import Dataset
from sklearn.model_selection import train_test_split
import evaluate

# Load the hand-labelled sentences used to fine-tune the classifier.
# NOTE(review): absolute, machine-specific path. Also note that `df` is rebound
# here, replacing the survey frame loaded at the top of the notebook.
df = pd.read_excel('C:/Users/ming/Desktop/master project/200_text_label.xlsx')
df = df[['Text', 'label']].dropna().reset_index(drop=True)
df["Text"] = df["Text"].astype(str)

# Convert labels so they start from 0: -1 (negative) -> 0, 0 (neutral) -> 1,
# 1 (positive) -> 2.
label_map = {-1: 0, 0: 1, 1: 2}

# Fail early with a clear message on labels outside the map. Series.map would
# turn them into NaN and astype(int) would then raise an unrelated-looking error.
unexpected_labels = set(df["label"].unique()) - set(label_map)
if unexpected_labels:
    raise ValueError(
        "Unexpected values in 'label' column (expected -1, 0 or 1): "
        f"{sorted(unexpected_labels, key=str)}"
    )
df["label"] = df["label"].map(label_map).astype(int)

# 80/20 train/validation split with a fixed seed for reproducibility.
train_texts, val_texts, train_labels, val_labels = train_test_split(
    df["Text"], df["label"], test_size=0.2, random_state=42
)

# Base checkpoint to fine-tune, plus its matching tokenizer.
model_name = "roberta-base"
tokenizer = RobertaTokenizer.from_pretrained(model_name)


def tokenize_function(texts):
    """Tokenize a list of strings into fixed-length model inputs.

    Every sequence is padded up to, and truncated at, 128 tokens.
    """
    return tokenizer(
        texts,
        max_length=128,
        truncation=True,
        padding="max_length",
    )


# Encode both splits the same way (train first, then validation).
train_encodings, val_encodings = (
    tokenize_function(split.tolist()) for split in (train_texts, val_texts)
)

class SentimentDataset(torch.utils.data.Dataset):
    """Torch dataset that pairs tokenizer encodings with integer class labels.

    `encodings` is a mapping from field name to a per-example sequence, and
    `labels` is a sequence of class ids with one entry per example.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        """Number of examples, taken from the label sequence."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return example `idx` as a dict of tensors, with the target under "labels"."""
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        # The target is stored as an int64 (long) tensor.
        sample["labels"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

# Wrap the encodings and plain-list labels into torch datasets for the Trainer.
train_dataset = SentimentDataset(train_encodings, train_labels.tolist())
val_dataset = SentimentDataset(val_encodings, val_labels.tolist())

# Sequence-classification model with 3 output classes, matching the three
# values in label_map (0, 1, 2).
model = RobertaForSequenceClassification.from_pretrained(model_name, num_labels=3) 

# Accuracy metric object from the `evaluate` library; used by compute_metrics.
accuracy = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    """Compute accuracy for the Trainer from a (logits, labels) pair.

    The predicted class of each example is the index of its largest logit.
    Returns whatever `accuracy.compute` returns for those predictions.
    """
    raw_logits, gold_labels = eval_pred
    predicted_classes = np.argmax(raw_logits, axis=-1)
    return accuracy.compute(
        predictions=predicted_classes,
        references=gold_labels,
    )

# Fine-tuning configuration: evaluate and checkpoint once per epoch, train for
# 10 epochs with batch size 8, and reload the checkpoint with the best
# validation accuracy when training ends.
# NOTE(review): newer transformers releases renamed `evaluation_strategy` to
# `eval_strategy`; confirm which name the installed version accepts.
training_args = TrainingArguments(
    output_dir="./results",  
    evaluation_strategy="epoch",  
    save_strategy="epoch",
    num_train_epochs=10,  
    per_device_train_batch_size=8,  
    per_device_eval_batch_size=8,
    warmup_steps=50,  
    weight_decay=0.01,  
    logging_dir="./logs",
    load_best_model_at_end=True,  
    metric_for_best_model="accuracy",  # key produced by compute_metrics
)

# Trainer ties together the model, both datasets and the accuracy metric.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
)
trainer.train()

# Save the fine-tuned model to the "sentiment-roberta" directory.
# The tokenizer is not saved by this call.
trainer.save_model("sentiment-roberta")

# Map model class ids back to the original labels during prediction
# (inverse of label_map: 0 -> -1, 1 -> 0, 2 -> 1).
inverse_label_map = {0: -1, 1: 0, 2: 1}


def predict_sentiment(text):
    """Predict the sentiment of a single text with the fine-tuned model.

    Args:
        text: One string to classify.

    Returns:
        -1 (negative), 0 (neutral) or 1 (positive), i.e. the predicted class id
        translated through `inverse_label_map`.
    """
    # Inference must run in eval mode so dropout is disabled; otherwise the
    # prediction for the same text can vary between calls.
    model.eval()

    # Run on whatever device the model already lives on instead of moving the
    # whole model to the CPU on every call.
    device = next(model.parameters()).device
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
    inputs = {key: val.to(device) for key, val in inputs.items()}

    # No gradients are needed for prediction; skipping them saves memory and time.
    with torch.no_grad():
        outputs = model(**inputs)

    pred = torch.argmax(outputs.logits, dim=-1).item()
    return inverse_label_map[pred]


In [None]:
# Test: quick smoke check on two clear-cut sentences.
for sample_text in (
    "I love this product!",  # Expected 1
    "This is terrible.",     # Expected -1
):
    print(predict_sentiment(sample_text))


In [None]:
# Test-2: domain-specific sentences, printed one prediction per line.
for sample_text in (
    "No discomfort at all",        # Expected 1
    "Very tight, uncomfortable",   # Expected -1
    "It's okay",                   # Expected 0
    "terrible sound.",             # Expected -1
):
    print(predict_sentiment(sample_text))

In [None]:
# Compute the confusion matrix and per-class report on the validation split,
# expressed in the original label space (-1 / 0 / 1).
from sklearn.metrics import confusion_matrix, classification_report

# val_texts and val_labels come from the same train_test_split call, so they
# are aligned position by position. Looking labels up with
# val_texts.tolist().index(example) returned the FIRST occurrence, which gives
# the wrong true label for duplicate texts, and rebuilt both lists on every
# iteration.
true_labels = [inverse_label_map[label] for label in val_labels.tolist()]
pred_labels = [predict_sentiment(example) for example in val_texts.tolist()]

cm = confusion_matrix(true_labels, pred_labels)
print("Confusion Matrix:\n", cm)
print("Classification Report:\n", classification_report(true_labels, pred_labels))


In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

# Put the model on that device before evaluating (an earlier cell may have
# moved it to the CPU for single-text prediction).
# NOTE(review): `model` was passed to the Trainer, so `trainer.model` is
# presumably the same object and the second call is redundant; confirm.
model.to(device)
trainer.model.to(device)
# Runs evaluation on the eval_dataset given to the Trainer (the validation split).
eval_results = trainer.evaluate()

# Loss and accuracy as reported by the Trainer for the validation split.
eval_loss = eval_results["eval_loss"]
eval_accuracy = eval_results["eval_accuracy"]

print(f"Eval Loss: {eval_loss}")
print(f"Eval Accuracy: {eval_accuracy}")


In [None]:
# Load the original RoBERTa tokenizer and save it next to the fine-tuned model,
# so the "sentiment-roberta" directory holds both model and tokenizer files.
# NOTE(review): this reloads the stock "roberta-base" tokenizer and rebinds
# `tokenizer` instead of saving the object used for training. The two only
# match while model_name stays "roberta-base".
from transformers import RobertaTokenizer

tokenizer = RobertaTokenizer.from_pretrained("roberta-base")
tokenizer.save_pretrained("sentiment-roberta")

print("Tokenizer has been saved to sentiment-roberta.")


# Satisfaction & Comfort Score Prediction Model

In [None]:
# NOTE(review): `df_encoded` and its 'Custom_Sentiment_Label' column are not
# created in any visible cell. TODO: restore the cell that builds them.

# Convert the model's string labels to a numeric feature and drop the string
# column. The guard makes the cell safe to re-run: once the numeric column
# exists the string column has already been dropped, and mapping it again
# would raise KeyError.
if 'Sentiment_Label_Num' not in df_encoded.columns:
    df_encoded['Sentiment_Label_Num'] = df_encoded['Custom_Sentiment_Label'].map({
        'LABEL_0': -1,  # Negative
        'LABEL_1': 0,   # Neutral
        'LABEL_2': 1    # Positive
    })
    df_encoded = df_encoded.drop(columns=['Custom_Sentiment_Label'])

# Word count of each open-ended answer. Missing answers count as 0 words;
# previously str(NaN) == "nan" made every missing answer count as 1 word.
for col in open_ended_columns:
    df_encoded[f'{col}_length'] = df_encoded[col].apply(
        lambda x: 0 if pd.isna(x) else len(str(x).split())
    )

print(df_encoded.head())


In [None]:
from sklearn.model_selection import train_test_split

# Free-text question columns; they are excluded from the model inputs.
open_ended_columns = [
    'Please describe the discomfort that you are experiencing',
    'What do you believe is causing this discomfort?',
    'What do you believe can be done to improve the wearing comfort for your hearing aids?',
]

# The two regression targets.
target_comfort = 'Average comfort score'
target_satisfaction = 'Satisfaction'

# Feature matrix: everything except the targets, the raw text columns and the
# derived "<question>_length" word-count columns.
non_feature_columns = [target_comfort, target_satisfaction] + open_ended_columns
features = df_encoded.drop(columns=non_feature_columns)
length_cols = list(filter(lambda name: name.endswith('_length'), features.columns))
features = features.drop(columns=length_cols)

# One 70/30 split per target, both made with the same seed.
X_train_c, X_test_c, y_train_c, y_test_c = train_test_split(
    features,
    df_encoded[target_comfort],
    test_size=0.3,
    random_state=42,
)
X_train_s, X_test_s, y_train_s, y_test_s = train_test_split(
    features,
    df_encoded[target_satisfaction],
    test_size=0.3,
    random_state=42,
)


In [None]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score

# Baseline random forests with default hyper-parameters and a fixed seed,
# one per target. fit() returns the estimator itself, so it can be chained.

# Comfort score model
model_comfort = RandomForestRegressor(random_state=42).fit(X_train_c, y_train_c)
predictions_c = model_comfort.predict(X_test_c)

# Satisfaction score model
model_satisfaction = RandomForestRegressor(random_state=42).fit(X_train_s, y_train_s)
predictions_s = model_satisfaction.predict(X_test_s)

# Held-out performance: mean squared error and R² for each model.
mse_comfort = mean_squared_error(y_test_c, predictions_c)
r2_comfort = r2_score(y_test_c, predictions_c)
mse_satisfaction = mean_squared_error(y_test_s, predictions_s)
r2_satisfaction = r2_score(y_test_s, predictions_s)

print("Comfort Score Model - MSE:", mse_comfort)
print("Comfort Score Model - R²:", r2_comfort)
print("Satisfaction Score Model - MSE:", mse_satisfaction)
print("Satisfaction Score Model - R²:", r2_satisfaction)


In [None]:
import matplotlib.pyplot as plt


def _plot_true_vs_predicted(y_true, y_pred, point_color, title, xlabel, ylabel):
    """Scatter true against predicted values with a dashed red y = x reference line."""
    lowest, highest = min(y_true), max(y_true)

    plt.figure(figsize=(8, 5))
    plt.scatter(y_true, y_pred, alpha=0.6, color=point_color)
    # The diagonal spans the range of the true values; points on it are perfect predictions.
    plt.plot([lowest, highest], [lowest, highest], color='red', linestyle='--')
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.grid()
    plt.show()


# Comfort prediction results
_plot_true_vs_predicted(
    y_test_c,
    predictions_c,
    'skyblue',
    'Comfort Score: True vs Predicted',
    'True Comfort Score',
    'Predicted Comfort Score',
)

# Satisfaction prediction results
_plot_true_vs_predicted(
    y_test_s,
    predictions_s,
    'lightcoral',
    'Satisfaction Score: True vs Predicted',
    'True Satisfaction Score',
    'Predicted Satisfaction Score',
)


In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# `importance_comfort` was never defined in the notebook, so this cell raised
# NameError on a fresh kernel. Take the importances from the fitted comfort
# random forest, which was trained on exactly the columns of `features`.
importance_comfort = model_comfort.feature_importances_

importance_comfort_df = pd.DataFrame({
    'Feature': features.columns,
    'Importance': importance_comfort
})

# Top 10 features for comfort score
importance_comfort_df = importance_comfort_df.sort_values(by='Importance', ascending=False).head(10)
print("Top 10 features for comfort score:")
print(importance_comfort_df)

# Horizontal bar chart, most important feature at the top.
plt.figure(figsize=(10, 8))
plt.barh(importance_comfort_df['Feature'], importance_comfort_df['Importance'], color='skyblue')
plt.gca().invert_yaxis()
plt.title('Top 10 Feature Importance for Comfort Score')
plt.xlabel('Importance')
plt.show()


In [None]:
# `importance_satisfaction` was never defined in the notebook, so this cell
# raised NameError on a fresh kernel. Take the importances from the fitted
# satisfaction random forest, which was trained on exactly the columns of
# `features`.
importance_satisfaction = model_satisfaction.feature_importances_

importance_satisfaction_df = pd.DataFrame({
    'Feature': features.columns,
    'Importance': importance_satisfaction
})

# Top 20 features for satisfaction score
importance_satisfaction_df = importance_satisfaction_df.sort_values(by='Importance', ascending=False).head(20)
print("Top 20 features for satisfaction score:")
print(importance_satisfaction_df)

# Horizontal bar chart, most important feature at the top.
plt.figure(figsize=(10, 8))
plt.barh(importance_satisfaction_df['Feature'], importance_satisfaction_df['Importance'], color='skyblue')
plt.gca().invert_yaxis()
plt.title('Top 20 Feature Importance for Satisfaction Score')
plt.xlabel('Importance')
plt.show()


In [None]:
# Re-train the comfort score model with the selected ("best") hyper-parameters.
# NOTE(review): the search that produced these values is not in the visible notebook.
best_model_comfort = RandomForestRegressor(
    n_estimators=300,
    max_depth=15,
    min_samples_split=2,
    random_state=42,
).fit(X_train_c, y_train_c)
best_predictions_c = best_model_comfort.predict(X_test_c)

# Held-out error of the tuned model, for comparison with the baseline forest.
best_mse_comfort = mean_squared_error(y_test_c, best_predictions_c)
best_r2_comfort = r2_score(y_test_c, best_predictions_c)

print("Best parameters (Comfort score prediction): - MSE:", best_mse_comfort)
print("Best parameters (Comfort score prediction): - R²:", best_r2_comfort)


In [None]:
# Stacking: combine the comfort and satisfaction predictions in a second-level
# linear model that outputs both targets at once.
from sklearn.linear_model import LinearRegression  # was used below without ever being imported
from sklearn.multioutput import MultiOutputRegressor

# NOTE(review): `pred_xgb` is not defined in any visible cell (presumably the
# comfort predictions of an XGBoost model from a removed cell). TODO: restore
# that cell, or this line raises NameError on a fresh kernel.
stacked_features = np.column_stack((pred_xgb, predictions_s))
stacked_targets = np.column_stack((y_test_c, y_test_s))

# NOTE(review): the stacker is fitted on the test-set predictions/targets and
# then scored on the very same rows, so the metrics below are training-set
# metrics and will look better than true held-out performance.
stacked_model = MultiOutputRegressor(LinearRegression())
stacked_model.fit(stacked_features, stacked_targets)

# Column 0 is the comfort output, column 1 the satisfaction output.
stacked_pred = stacked_model.predict(stacked_features)

print("Stacking Model (Comfort) - MSE:", mean_squared_error(y_test_c, stacked_pred[:, 0]))
print("Stacking Model (Comfort) - R²:", r2_score(y_test_c, stacked_pred[:, 0]))

print("Stacking Model (Satisfaction) - MSE:", mean_squared_error(y_test_s, stacked_pred[:, 1]))
print("Stacking Model (Satisfaction) - R²:", r2_score(y_test_s, stacked_pred[:, 1]))


In [None]:
# Use PCA to reduce the features to (at most) 20 principal components.
from sklearn.decomposition import PCA

# PCA cannot extract more components than min(n_samples, n_features); a fixed
# 20 raises ValueError on a smaller training matrix, so cap it.
n_components = min(20, *X_train_c.shape)

# Fit on the training split only, then apply the same projection to the test split.
# NOTE(review): the features are not standardized before PCA, so columns with
# large variance dominate the components. Confirm that is intended.
pca = PCA(n_components=n_components)
X_train_pca = pca.fit_transform(X_train_c)
X_test_pca = pca.transform(X_test_c)
print("Feature dimensions after PCA reduction", X_train_pca.shape)


In [None]:
## SHAP-based feature importance
import shap

# Explain the baseline satisfaction random forest on the held-out rows and
# show the result as a bar-type summary plot.
explainer = shap.TreeExplainer(model_satisfaction)
shap_values = explainer.shap_values(X_test_s)
shap.summary_plot(shap_values, X_test_s, plot_type="bar")


In [None]:
import seaborn as sns

# Residuals (true minus predicted) of the baseline satisfaction model on the
# test split, shown as a 30-bin histogram with a KDE overlay.
residuals = y_test_s - predictions_s
sns.histplot(residuals, bins=30, kde=True, color='lightcoral')
plt.title('Residual Distribution for Satisfaction Score')
plt.show()
