# Micro and Macro Metrics

In [3]:
def evaluate_aspect_and_sentiment_micro_macro(predictions_list, ground_truth_list):
    """Score aspect and sentiment extraction with micro and macro P/R/F1.

    Each element of ``predictions_list`` / ``ground_truth_list`` is the list
    of tuples for one sample, laid out as
    ``(aspect, sentiment, opinion, category)``. The two outer lists are
    walked in lockstep with ``zip``, so extra samples in the longer list are
    ignored.

    Matching is greedy: every prediction is paired with the first
    not-yet-matched ground-truth tuple that has the same aspect term
    (case-insensitive) and the same category. Sentiment is only compared on
    pairs whose aspect matched, so sentiment precision is measured relative
    to the matched aspects while sentiment recall is measured relative to all
    ground-truth tuples.

    Args:
        predictions_list: Per-sample lists of predicted tuples.
        ground_truth_list: Per-sample lists of gold tuples.

    Returns:
        ``{"Micro": {...}, "Macro": {...}}`` where each entry maps
        ``"Aspect"`` and ``"Sentiment"`` to a dict with ``"Precision"``,
        ``"Recall"`` and ``"F1"``. Every score is 0 when there are no
        samples. ``None`` is returned (after printing the error and the
        offending tuples) if a tuple is malformed.
    """

    def prf(tp, fp, fn):
        # Precision, recall and F1 from raw counts; 0 whenever a denominator is 0.
        precision = tp / (tp + fp) if tp + fp > 0 else 0
        recall = tp / (tp + fn) if tp + fn > 0 else 0
        total = precision + recall
        f1 = 2 * precision * recall / total if total > 0 else 0
        return precision, recall, f1

    def macro_average(rows):
        # Column-wise mean of per-sample (P, R, F1) rows; zeros if no samples,
        # where a bare zip(*rows) unpacking would raise ValueError.
        if not rows:
            return 0, 0, 0
        return tuple(sum(column) / len(column) for column in zip(*rows))

    def as_scores(triple):
        precision, recall, f1 = triple
        return {"Precision": precision, "Recall": recall, "F1": f1}

    metric_names = ("Aspect", "Sentiment")
    micro_counts = {metric: [0, 0, 0] for metric in metric_names}  # tp, fp, fn
    macro_rows = {metric: [] for metric in metric_names}

    for predictions, ground_truth in zip(predictions_list, ground_truth_list):
        aspect_hits = 0
        sentiment_hits = 0
        matched_ground_truth_indices = set()

        for pred in predictions:
            for index, truth in enumerate(ground_truth):
                if index in matched_ground_truth_indices:
                    continue  # each gold tuple can be claimed only once

                try:
                    # Aspect match requires both the term and the category.
                    if pred[0].lower() == truth[0].lower() and pred[3] == truth[3]:
                        aspect_hits += 1
                        matched_ground_truth_indices.add(index)
                        # Sentiment is only scored on aspect-matched pairs.
                        if pred[1] == truth[1]:
                            sentiment_hits += 1
                        break
                except (AttributeError, IndexError, KeyError, TypeError) as e:
                    # Malformed tuple: report it and abort the evaluation.
                    print(e)
                    print(pred)
                    print(truth)
                    return None

        sample_counts = {
            "Aspect": (
                aspect_hits,
                len(predictions) - aspect_hits,
                len(ground_truth) - aspect_hits,
            ),
            # Sentiment false positives are aspect matches with a wrong
            # sentiment; false negatives are counted against all gold tuples.
            "Sentiment": (
                sentiment_hits,
                aspect_hits - sentiment_hits,
                len(ground_truth) - sentiment_hits,
            ),
        }
        for metric, counts in sample_counts.items():
            for position, count in enumerate(counts):
                micro_counts[metric][position] += count
            macro_rows[metric].append(prf(*counts))

    return {
        "Micro": {
            metric: as_scores(prf(*micro_counts[metric])) for metric in metric_names
        },
        "Macro": {
            metric: as_scores(macro_average(macro_rows[metric]))
            for metric in metric_names
        },
    }

# Demo input: two samples whose predicted aspect terms, sentiments and
# categories all agree with the gold tuples (only the opinion words differ).
predictions_list = [
    [
        ("food", "positive", "good", "FOOD#QUALITY"),
        ("service", "negative", "bad", "SERVICE#GENERAL"),
    ],
    [
        ("food", "positive", "delicious", "FOOD#QUALITY"),
    ],
]
ground_truth_list = [
    [
        ("food", "positive", "delicious", "FOOD#QUALITY"),
        ("service", "negative", "poor", "SERVICE#GENERAL"),
    ],
    [
        ("food", "positive", "tasty", "FOOD#QUALITY"),
    ],
]

# Score the demo input and show the nested Micro/Macro result dict.
results = evaluate_aspect_and_sentiment_micro_macro(
    predictions_list,
    ground_truth_list,
)
print(results)

{'Micro': {'Aspect': {'Precision': 1.0, 'Recall': 1.0, 'F1': 1.0}, 'Sentiment': {'Precision': 1.0, 'Recall': 1.0, 'F1': 1.0}}, 'Macro': {'Aspect': {'Precision': 1.0, 'Recall': 1.0, 'F1': 1.0}, 'Sentiment': {'Precision': 1.0, 'Recall': 1.0, 'F1': 1.0}}}


In [1]:
def evaluate_aspect_polarity_opinion_category(predictions_list, ground_truth_list):
    """Score quadruple extraction per field with micro and macro P/R/F1.

    Each element of ``predictions_list`` / ``ground_truth_list`` is the list
    of tuples for one sample, laid out as
    ``(aspect, polarity, opinion, category)``. The two outer lists are walked
    in lockstep with ``zip``, so extra samples in the longer list are ignored.

    Matching is greedy: every prediction is paired with the first
    not-yet-matched ground-truth tuple that has the same aspect term
    (case-insensitive). On each matched pair the polarity and opinion are
    compared exactly, and the category is compared on the part before the
    first ``'#'`` only (``"FOOD#QUALITY"`` matches ``"FOOD#PRICES"``).

    Polarity, opinion and category precision are therefore measured relative
    to the aspect-matched pairs, while their recall is measured relative to
    all ground-truth tuples.

    Args:
        predictions_list: Per-sample lists of predicted tuples.
        ground_truth_list: Per-sample lists of gold tuples.

    Returns:
        ``{"Micro": {...}, "Macro": {...}}`` where each entry maps
        ``"Aspect"``, ``"Polarity"``, ``"Opinion"`` and ``"Category"`` to a
        dict with ``"Precision"``, ``"Recall"`` and ``"F1"``. Every score is
        0 when there are no samples.
    """

    def prf(tp, fp, fn):
        # Precision, recall and F1 from raw counts; 0 whenever a denominator is 0.
        precision = tp / (tp + fp) if tp + fp > 0 else 0
        recall = tp / (tp + fn) if tp + fn > 0 else 0
        total = precision + recall
        f1 = 2 * precision * recall / total if total > 0 else 0
        return precision, recall, f1

    def macro_average(rows):
        # Column-wise mean of per-sample (P, R, F1) rows; zeros if no samples,
        # where a bare zip(*rows) unpacking would raise ValueError.
        if not rows:
            return 0, 0, 0
        return tuple(sum(column) / len(column) for column in zip(*rows))

    def as_scores(triple):
        precision, recall, f1 = triple
        return {"Precision": precision, "Recall": recall, "F1": f1}

    metric_names = ("Aspect", "Polarity", "Opinion", "Category")
    micro_counts = {metric: [0, 0, 0] for metric in metric_names}  # tp, fp, fn
    macro_rows = {metric: [] for metric in metric_names}

    for predictions, ground_truth in zip(predictions_list, ground_truth_list):
        hits = dict.fromkeys(metric_names, 0)
        matched_ground_truth_indices = set()

        for pred in predictions:
            for index, truth in enumerate(ground_truth):
                if index in matched_ground_truth_indices:
                    continue  # each gold tuple can be claimed only once
                if pred[0].lower() != truth[0].lower():
                    continue

                hits["Aspect"] += 1
                matched_ground_truth_indices.add(index)
                if pred[1] == truth[1]:
                    hits["Polarity"] += 1
                if pred[2] == truth[2]:
                    hits["Opinion"] += 1
                # Only the coarse category (text before '#') has to agree.
                if pred[3].split('#')[0] == truth[3].split('#')[0]:
                    hits["Category"] += 1
                break

        aspect_hits = hits["Aspect"]
        for metric in metric_names:
            tp = hits[metric]
            # Aspect false positives are unmatched predictions; for the other
            # fields they are aspect-matched pairs whose field disagrees.
            candidates = len(predictions) if metric == "Aspect" else aspect_hits
            counts = (tp, candidates - tp, len(ground_truth) - tp)
            for position, count in enumerate(counts):
                micro_counts[metric][position] += count
            macro_rows[metric].append(prf(*counts))

    return {
        "Micro": {
            metric: as_scores(prf(*micro_counts[metric])) for metric in metric_names
        },
        "Macro": {
            metric: as_scores(macro_average(macro_rows[metric]))
            for metric in metric_names
        },
    }

    
# Demo input: two samples whose predicted aspect terms, polarities and
# categories agree with the gold tuples while every opinion word differs.
predictions_list = [
    [
        ("food", "positive", "good", "FOOD#QUALITY"),
        ("service", "negative", "bad", "SERVICE#GENERAL"),
    ],
    [
        ("food", "positive", "delicious", "FOOD#QUALITY"),
    ],
]
ground_truth_list = [
    [
        ("food", "positive", "delicious", "FOOD#QUALITY"),
        ("service", "negative", "poor", "SERVICE#GENERAL"),
    ],
    [
        ("food", "positive", "tasty", "FOOD#QUALITY"),
    ],
]

# Score the demo input and show the nested Micro/Macro result dict.
results = evaluate_aspect_polarity_opinion_category(
    predictions_list,
    ground_truth_list,
)
print(results)


{'Micro': {'Aspect': {'Precision': 1.0, 'Recall': 1.0, 'F1': 1.0}, 'Polarity': {'Precision': 1.0, 'Recall': 1.0, 'F1': 1.0}, 'Opinion': {'Precision': 0.0, 'Recall': 0.0, 'F1': 0}, 'Category': {'Precision': 1.0, 'Recall': 1.0, 'F1': 1.0}}, 'Macro': {'Aspect': {'Precision': 1.0, 'Recall': 1.0, 'F1': 1.0}, 'Polarity': {'Precision': 1.0, 'Recall': 1.0, 'F1': 1.0}, 'Opinion': {'Precision': 0.0, 'Recall': 0.0, 'F1': 0.0}, 'Category': {'Precision': 1.0, 'Recall': 1.0, 'F1': 1.0}}}


In [18]:
import json

# Function to read data from JSONL file
def read_jsonl(file_path):
    """Load a JSON Lines file into a list of decoded objects.

    The file is read as UTF-8 so the result does not depend on the platform's
    default encoding. Blank or whitespace-only lines (for example a trailing
    newline at the end of the file) are skipped instead of being passed to
    the JSON parser.

    Args:
        file_path: Path of the ``.jsonl`` file to read.

    Returns:
        A list with one decoded JSON value per non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    data = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.strip():
                continue
            data.append(json.loads(line))
    return data

# Convert data to format required for evaluation
def convert_data_for_evaluation(data, name='Quadruples'):
    """Turn decoded JSONL records into per-sample lists of quadruple tuples.

    For every record the list stored under ``name`` is converted into tuples
    ordered ``(aspect, polarity, opinion, category)``. A record where the key
    is missing, or where its value is ``None`` or empty, yields an empty list
    so the output always has one entry per input record.

    Args:
        data: Iterable of dict records.
        name: Key that holds the list of quadruple dicts in each record.

    Returns:
        A list with one list of 4-tuples per record, in input order.

    Raises:
        KeyError: If a quadruple dict lacks one of the four fields.
    """
    quadruples_per_sample = []
    for item in data:
        # `or []` also covers an explicit null value, which .get(name, [])
        # would hand back as None.
        quadruples = item.get(name) or []
        quadruples_per_sample.append(
            [
                (quad['aspect'], quad['polarity'], quad['opinion'], quad['category'])
                for quad in quadruples
            ]
        )
    return quadruples_per_sample

# JSONL files holding the model predictions; each record is read through its
# 'Quadruples' key (the default of convert_data_for_evaluation).
output_files = ["metric_train_set.jsonl"]
# JSONL files holding the gold annotations; each record is read through its
# 'labels' key. Entries pair up with output_files by position.
ground_truth_files = ["full_train.jsonl"]

# Load every prediction file into a list of per-sample tuple lists.
predictions_list = []
for output_file in output_files:
    predictions_data = read_jsonl(output_file)
    predictions_list.append(convert_data_for_evaluation(predictions_data))

# Load every ground-truth file the same way, reading the 'labels' key.
ground_truth_list = []
for ground_truth_file in ground_truth_files:
    ground_truth_data = read_jsonl(ground_truth_file)
    ground_truth_list.append(convert_data_for_evaluation(ground_truth_data, name='labels'))
    
# Only the first prediction/ground-truth file pair is evaluated here.
# NOTE(review): the banner says "Testing" but both file names refer to a
# train split - confirm which split is intended.
print("#####Testing Metrics#####")
results = evaluate_aspect_polarity_opinion_category(predictions_list[0], ground_truth_list[0])
print(results)

#####Testing Metrics#####
{'Micro': {'Aspect': {'Precision': 0.8287924655230406, 'Recall': 0.6112121713246238, 'F1': 0.703564460096131}, 'Polarity': {'Precision': 0.9791666666666666, 'Recall': 0.5984785844220275, 'F1': 0.7428923329569947}, 'Opinion': {'Precision': 0.6603084415584416, 'Recall': 0.4035885563089135, 'F1': 0.5009750590167299}, 'Category': {'Precision': 0.801948051948052, 'Recall': 0.49016041012072104, 'F1': 0.608436826439495}}, 'Macro': {'Aspect': {'Precision': 0.8435635136427739, 'Recall': 0.6698807126337675, 'F1': 0.7262781083238136}, 'Polarity': {'Precision': 0.9689887330244003, 'Recall': 0.6582840909703186, 'F1': 0.7594278348212016}, 'Opinion': {'Precision': 0.6722610136084375, 'Recall': 0.4827834128326744, 'F1': 0.5458644122599496}, 'Category': {'Precision': 0.7947884995771385, 'Recall': 0.55455300727404, 'F1': 0.6333524398360048}}}


# Generate output.csv

In [None]:
import json
import csv

def process_jsonl(file_path):
    """Load a UTF-8 JSON Lines file into a list of decoded objects.

    Blank or whitespace-only lines (for example a trailing newline at the end
    of the file) are skipped instead of being passed to the JSON parser.

    Args:
        file_path: Path of the ``.jsonl`` file to read.

    Returns:
        A list with one decoded JSON value per non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    data = []
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            if not line.strip():
                continue
            data.append(json.loads(line))
    return data

def write_to_csv(data1, data2, csv_file):
    """Write a per-review comparison of gold labels and predictions to CSV.

    ``data1[i]`` supplies the review text (``'text'``) and its gold
    quadruples (``'labels'``); ``data2[i]`` supplies the predicted quadruples
    (``'Quadruples'``) for the same review. For each review:

    * every gold label is paired with the first still-unused prediction whose
      aspect term matches case-insensitively, and one row is written with
      both sides filled in;
    * a gold label with no matching prediction gets a row whose predicted
      columns are ``'NA'``;
    * every prediction left unused gets a row whose actual columns are
      ``'NA'``.

    In the header, the ``term_*`` columns hold the ``'aspect'`` field, the
    ``aspect_*`` columns hold the ``'category'`` field and the
    ``sentiment_*`` columns hold the ``'polarity'`` field. ``review_id`` is
    the position of the review in ``data1`` and ``review_len`` is the number
    of whitespace-separated tokens in its text.

    The inputs are not modified. Missing or ``None`` values for ``'text'``,
    ``'labels'`` or ``'Quadruples'`` are treated as empty.

    Args:
        data1: Sequence of gold records.
        data2: Sequence of prediction records, aligned with ``data1`` by index.
        csv_file: Path of the CSV file to create (overwritten if present).

    Raises:
        IndexError: If ``data2`` has fewer records than ``data1``.
    """
    header = ['review_id', 'term_actual', 'term_predicted', 'opinion_actual', 'opinion_predicted', 'aspect_actual', 'aspect_predicted', 'sentiment_actual', 'sentiment_predicted', 'review_len']

    with open(csv_file, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(header)

        for review_id, entry in enumerate(data1):
            text = entry.get('text') or ''
            labels = entry.get('labels') or []
            # Copy the predictions so removing matched ones never mutates data2.
            unmatched = list(data2[review_id].get('Quadruples') or [])
            review_len = len(text.split())

            for label in labels:
                wanted = label['aspect'].lower()
                position = next(
                    (
                        pos
                        for pos, pred in enumerate(unmatched)
                        if pred['aspect'].lower() == wanted
                    ),
                    None,
                )
                if position is None:
                    writer.writerow([review_id, label['aspect'], 'NA', label['opinion'], 'NA', label['category'], 'NA', label['polarity'], 'NA', review_len])
                    continue

                # Consume the prediction so it cannot match a second label.
                pred = unmatched.pop(position)
                writer.writerow([review_id, label['aspect'], pred['aspect'], label['opinion'], pred['opinion'], label['category'], pred['category'], label['polarity'], pred['polarity'], review_len])

            # Predictions that matched no gold label.
            for pred in unmatched:
                writer.writerow([review_id, 'NA', pred['aspect'], 'NA', pred['opinion'], 'NA', pred['category'], 'NA', pred['polarity'], review_len])
                

# Load the first JSONL file; it is passed to write_to_csv as data1, which
# reads each record's 'text' and 'labels' (the gold side).
data_file1 = "full_test.jsonl"
data1 = process_jsonl(data_file1)

# Load the second JSONL file; it is passed to write_to_csv as data2, which
# reads each record's 'Quadruples' (the predicted side). Records are paired
# with data1 by position.
data_file2 = "metric_test_set.jsonl"
data2 = process_jsonl(data_file2)

# Write one row per matched, missed or spurious quadruple to the CSV file.
output_csv = "output_new.csv"
write_to_csv(data1, data2, output_csv)

print("CSV file created successfully!")


# Whitebox Testing

In [2]:
import unittest


class MetricEvaluationTestCase(unittest.TestCase):
    """White-box checks for ``evaluate_aspect_polarity_opinion_category``.

    Four prediction sets are scored against the same ground truth. Each set
    perturbs one field of the "food" tuples (aspect term, polarity, opinion
    or category). The opinion words differ from the ground truth in every
    set, so the opinion scores are expected to be 0 throughout.
    """

    def setUp(self):
        # Aspect term of the "food" tuples replaced by "music"; only the
        # "service" tuple can be matched.
        self.predictions_list = [
            [("music", "positive", "good", "FOOD#QUALITY"), ("service", "negative", "bad", "SERVICE#GENERAL")],
            [("music", "positive", "delicious", "FOOD#QUALITY")]
        ]
        self.ground_truth_list = [
            [("food", "positive", "delicious", "FOOD#QUALITY"), ("service", "negative", "poor", "SERVICE#GENERAL")],
            [("food", "positive", "tasty", "FOOD#QUALITY")]]

        # Polarity of the "food" tuples flipped to "negative".
        self.predictions_list2 = [
            [("food", "negative", "good", "FOOD#QUALITY"), ("service", "negative", "bad", "SERVICE#GENERAL")],
            [("food", "negative", "delicious", "FOOD#QUALITY")]
        ]
        self.ground_truth_list2 = [
            [("food", "positive", "delicious", "FOOD#QUALITY"), ("service", "negative", "poor", "SERVICE#GENERAL")],
            [("food", "positive", "tasty", "FOOD#QUALITY")]]

        # Opinion of the "food" tuples replaced by "plenty".
        self.predictions_list3 = [
            [("food", "positive", "plenty", "FOOD#QUALITY"), ("service", "negative", "bad", "SERVICE#GENERAL")],
            [("food", "positive", "plenty", "FOOD#QUALITY")]
        ]
        self.ground_truth_list3 = [
            [("food", "positive", "delicious", "FOOD#QUALITY"), ("service", "negative", "poor", "SERVICE#GENERAL")],
            [("food", "positive", "tasty", "FOOD#QUALITY")]]

        # Category of the "food" tuples replaced by "SERVICE#GENERAL".
        self.predictions_list4 = [
            [("food", "positive", "good", "SERVICE#GENERAL"), ("service", "negative", "bad", "SERVICE#GENERAL")],
            [("food", "positive", "delicious", "SERVICE#GENERAL")]
        ]
        self.ground_truth_list4 = [
            [("food", "positive", "delicious", "FOOD#QUALITY"), ("service", "negative", "poor", "SERVICE#GENERAL")],
            [("food", "positive", "tasty", "FOOD#QUALITY")]]

    def _assert_micro(self, evaluation, expected):
        """Compare the micro (P, R, F1) triple of each metric in ``expected``."""
        for metric, (precision, recall, f1) in expected.items():
            with self.subTest(metric=metric):
                scores = evaluation["Micro"][metric]
                self.assertAlmostEqual(scores["Precision"], precision)
                self.assertAlmostEqual(scores["Recall"], recall)
                self.assertAlmostEqual(scores["F1"], f1)

    def test_metric_evaluation(self):
        evaluation = evaluate_aspect_polarity_opinion_category(self.predictions_list, self.ground_truth_list)
        evaluation2 = evaluate_aspect_polarity_opinion_category(self.predictions_list2, self.ground_truth_list2)
        evaluation3 = evaluate_aspect_polarity_opinion_category(self.predictions_list3, self.ground_truth_list3)
        evaluation4 = evaluate_aspect_polarity_opinion_category(self.predictions_list4, self.ground_truth_list4)
        # Printed before asserting so the raw scores are visible on failure.
        print("First evaluation:", evaluation)
        print("Second evaluation:", evaluation2)
        print("Third evaluation:", evaluation3)
        print("Last evaluation:", evaluation4)

        third = 1 / 3
        perfect = (1.0, 1.0, 1.0)
        nothing = (0, 0, 0)

        # One of three predictions / gold tuples is aspect-matched; the
        # matched pair agrees on polarity and category.
        self._assert_micro(evaluation, {
            "Aspect": (third, third, third),
            "Polarity": (1.0, third, 0.5),
            "Opinion": nothing,
            "Category": (1.0, third, 0.5),
        })
        # All aspects match; only the "service" pair agrees on polarity.
        self._assert_micro(evaluation2, {
            "Aspect": perfect,
            "Polarity": (third, third, third),
            "Opinion": nothing,
            "Category": perfect,
        })
        # Everything but the opinion words agrees.
        self._assert_micro(evaluation3, {
            "Aspect": perfect,
            "Polarity": perfect,
            "Opinion": nothing,
            "Category": perfect,
        })
        # All aspects match; only the "service" pair agrees on category.
        self._assert_micro(evaluation4, {
            "Aspect": perfect,
            "Polarity": perfect,
            "Opinion": nothing,
            "Category": (third, third, third),
        })

        # Macro averages for the perturbed field: one sample scores 0.5 and
        # the other 0, so the mean is 0.25.
        for key in ("Precision", "Recall", "F1"):
            self.assertAlmostEqual(evaluation["Macro"]["Aspect"][key], 0.25)
            self.assertAlmostEqual(evaluation2["Macro"]["Polarity"][key], 0.25)
            self.assertAlmostEqual(evaluation4["Macro"]["Category"][key], 0.25)
        
def run_test():
    """Run MetricEvaluationTestCase in-process via ``unittest.main``.

    ``exit=False`` keeps ``unittest.main`` from calling ``sys.exit`` when the
    run finishes. An explicit ``argv`` is passed so the real command-line
    arguments of the hosting process are not parsed (presumably needed
    because this runs inside a notebook kernel).
    """
    main_options = {
        "argv": [''],
        "defaultTest": 'MetricEvaluationTestCase',
        "exit": False,
    }
    unittest.main(**main_options)


run_test()

.
----------------------------------------------------------------------
Ran 1 test in 0.001s

OK


First evaluation: {'Micro': {'Aspect': {'Precision': 0.3333333333333333, 'Recall': 0.3333333333333333, 'F1': 0.3333333333333333}, 'Polarity': {'Precision': 1.0, 'Recall': 0.3333333333333333, 'F1': 0.5}, 'Opinion': {'Precision': 0.0, 'Recall': 0.0, 'F1': 0}, 'Category': {'Precision': 1.0, 'Recall': 0.3333333333333333, 'F1': 0.5}}, 'Macro': {'Aspect': {'Precision': 0.25, 'Recall': 0.25, 'F1': 0.25}, 'Polarity': {'Precision': 0.5, 'Recall': 0.25, 'F1': 0.3333333333333333}, 'Opinion': {'Precision': 0.0, 'Recall': 0.0, 'F1': 0.0}, 'Category': {'Precision': 0.5, 'Recall': 0.25, 'F1': 0.3333333333333333}}}
Second evaluation: {'Micro': {'Aspect': {'Precision': 1.0, 'Recall': 1.0, 'F1': 1.0}, 'Polarity': {'Precision': 0.3333333333333333, 'Recall': 0.3333333333333333, 'F1': 0.3333333333333333}, 'Opinion': {'Precision': 0.0, 'Recall': 0.0, 'F1': 0}, 'Category': {'Precision': 1.0, 'Recall': 1.0, 'F1': 1.0}}, 'Macro': {'Aspect': {'Precision': 1.0, 'Recall': 1.0, 'F1': 1.0}, 'Polarity': {'Precision'