## Data

In [None]:
import pandas as pd
import json

In [None]:
# Tokyo check-ins: a tab-separated file without a header row, so the column
# names are supplied explicitly.
tky_data_path = '/dataset_TSMC2014_TKY.txt'
tky_data = pd.read_csv(
    tky_data_path,
    sep="\t",
    header=None,
    names=[
        "user_id",
        "venue_id",
        "venue_category",
        "venue_category_name",
        "latitude",
        "longitude",
        "timezone_offset",
        "utc_time",
    ],
    encoding='ISO-8859-1',
)

In [None]:
# New York check-ins: a tab-separated file without a header row, so the column
# names are supplied explicitly.
nyc_data_path = '/dataset_TSMC2014_NYC.txt'
nyc_data = pd.read_csv(
    nyc_data_path,
    sep="\t",
    header=None,
    names=[
        "user_id",
        "venue_id",
        "venue_category",
        "venue_category_name",
        "latitude",
        "longitude",
        "timezone_offset",
        "utc_time",
    ],
    encoding='ISO-8859-1',
)

# Preview the first rows (rendered as the notebook cell output).
nyc_data.head()

# Evaluation Metrics

In [None]:
import ast
import json
import os
import re

class PredictionEvaluator_2:
    # Compile the regex pattern once as a class attribute for efficiency
    ALPHANUMERIC_PATTERN = re.compile(r'[a-f0-9]{24}')

    def __init__(self, folder_path):
        # Constructor to initialize the folder path and load data
        self.folder_path = folder_path
        self.combined_data = {}  # Dictionary to hold all combined data
        self.load_data()

    def load_data(self):
        """Load JSON files from the specified folder path."""
    # Iterate over files in the folder
        for index, file_name in enumerate(os.listdir(self.folder_path), start=1):
            if file_name.endswith(".json"):
                file_path = os.path.join(self.folder_path, file_name)
                try:
                    with open(file_path, "r") as file:
                        # Check if file is not empty
                        if os.stat(file_path).st_size > 0:
                            # Load data from each JSON file
                            self.combined_data[str(index)] = json.load(file)
                        else:
                            print(f"Skipped empty file: {file_name}")
                except json.JSONDecodeError as e:
                    print(f"Error parsing JSON file {file_name}: {e}")

    @classmethod
    def extract_alphanumeric_codes(cls, text):
        """Extract alphanumeric codes using regex."""
        # Use the compiled regex pattern to find all matches
        return cls.ALPHANUMERIC_PATTERN.findall(text)

    def extract_combined_response_data(self):
        """Extract prediction codes from the loaded dataset."""
        total_outputs = sum('output' in entry for entry in self.combined_data.values())
        all_codes = {}

        for key, entry in self.combined_data.items():
            if 'output' in entry:
                if 'raw_response' in entry['output']:
                    # Extract codes from raw_response
                    extracted_codes = self.extract_alphanumeric_codes(entry['output']['raw_response'])
                    all_codes[key] = extracted_codes if extracted_codes else [entry['output']]
                else:
                    # If no raw_response, store the output directly
                    all_codes[key] = [entry['output']]

        return total_outputs, all_codes

    @staticmethod
    def get_prediction_values(predictions):
        """Extract prediction values from different prediction formats."""
        prediction_values = []

        if isinstance(predictions, list):
            for pred in predictions:
                if isinstance(pred, dict) and 'prediction' in pred:
                    # Extract prediction values from dict format
                    prediction_values.extend([p.lower() for p in pred['prediction'] if isinstance(p, str)])
                elif isinstance(pred, str):
                    # Handle string format predictions
                    prediction_values.append(pred.lower())

        return prediction_values

    def compute_combined_top_accuracies(self):
        """Compute top-1, top-3, and top-5 accuracies."""
        total_outputs, extracted_codes = self.extract_combined_response_data()
        correct_top_1 = 0  
        correct_top_3 = 0  
        correct_top_5 = 0  

        for key, predictions in extracted_codes.items():
            if key in self.combined_data:
                true_value = self.combined_data[key]['true'].lower()
                prediction_values = [pred.lower() for pred in self.get_prediction_values(predictions)]

                # Check if true value matches predictions for different top-n accuracies
                if true_value in prediction_values:
                    if true_value == prediction_values[0]:  # Top-1 accuracy
                        correct_top_1 += 1
                    if true_value in prediction_values[:3]:  # Top-3 accuracy
                        correct_top_3 += 1
                    if true_value in prediction_values[:5]:  # Top-5 accuracy
                        correct_top_5 += 1

        # Calculate accuracies, handling division by zero
        accuracy_top_1 = correct_top_1 / total_outputs if total_outputs else 0
        accuracy_top_3 = correct_top_3 / total_outputs if total_outputs else 0
        accuracy_top_5 = correct_top_5 / total_outputs if total_outputs else 0

        return accuracy_top_1, accuracy_top_3, accuracy_top_5

    @staticmethod
    def extract_stays(input_str):
        """Extract historical and context stays from the input string."""
        # Initialize lists for historical and context stays
        historical_stays = []
        context_stays = []

        # Extracting stays using string manipulation
        historical_start = input_str.find('<historical_stays>:') + len('<historical_stays>:')
        historical_end = input_str.find('<context_stays>:')
        context_start = historical_end + len('<context_stays>:')
        context_end = input_str.find('<target_stay>:')

        historical_stays_str = input_str[historical_start:historical_end].strip()
        context_stays_str = input_str[context_start:context_end].strip()

        try:
            # Convert extracted string to lists
            historical_stays = eval(historical_stays_str)
            context_stays = eval(context_stays_str)
        except SyntaxError:
            # Handle any parsing error
            pass

        return historical_stays, context_stays

    @staticmethod
    def get_predictions_from_entry(entry):
        predictions = []

        # Extracting predictions from raw_response
        output_data = entry.get("output", {})
        raw_response = output_data.get("raw_response", "")
        raw_response_predictions = re.findall(r'"prediction": \[(.*?)\]', raw_response)
        if raw_response_predictions:
            try:
                predictions.extend(json.loads(f"[{raw_response_predictions[0]}]"))
            except json.JSONDecodeError as e:
                print(f"Error parsing JSON in raw_response_predictions: {e}, skip it \n")  #DA RISOLVERE

        #extracting predictions directly from the prediction field
        direct_predictions = output_data.get("prediction", [])
        if isinstance(direct_predictions, list):
            predictions.extend(direct_predictions)
        elif isinstance(direct_predictions, str):
            predictions.append(direct_predictions)

        return predictions


    @staticmethod
    def is_prediction_in_input(entry, historical_stays, context_stays):
        """Check if any prediction is in historical or context stays."""
        predictions = PredictionEvaluator_2.get_predictions_from_entry(entry)

        # Check if any prediction is in historical_stays or context_stays
        for prediction in predictions:
            if any(prediction in stay for stay in historical_stays) or any(prediction in stay for stay in context_stays):
                return True

        return False

    def evaluate_predictions(self):
        """Evaluate predictions in the dataset."""
        total_entries = len(self.combined_data)
        entries_with_prediction_in_input = 0
        remaining_percentage_ids = []

        # Iterate through data and check if predictions are in input
        for key, entry in self.combined_data.items():
            historical_stays, context_stays = self.extract_stays(entry['input'])
            prediction_in_input = self.is_prediction_in_input(entry, historical_stays, context_stays)

            if prediction_in_input:
                entries_with_prediction_in_input += 1
            else:
                remaining_percentage_ids.append(key)

        # Calculate percentages
        percentage_with_prediction = (entries_with_prediction_in_input / total_entries) * 100
        remaining_percentage = 100 - percentage_with_prediction

        return percentage_with_prediction, remaining_percentage, remaining_percentage_ids

    def print_predictions_for_ids(self, ids, df, column_to_check):
        """Print predictions for specified IDs."""
        total_entries = len(ids)
        correct_matches_with_true_value = 0
        correct_matches_with_df_column = 0
        unique_predictions = set()

        for entry_id in ids:
            if entry_id in self.combined_data:
                entry = self.combined_data[entry_id]
                predictions = self.get_predictions_from_entry(entry)

                if not predictions:
                    continue

                true_value = entry.get("true", "").lower()

                print(f"Entry ID: {entry_id}")
                print(f"True Value: {true_value}")
                print(f"Predictions: {predictions}")

                # handle both strings and dictionaries
                prediction_values = []
                for prediction in predictions:
                    if isinstance(prediction, dict) and 'place_id' in prediction:
                        prediction_values.append(prediction['place_id'].lower())
                    elif isinstance(prediction, str):
                        prediction_values.append(prediction.lower())

                match_with_true_value = any(true_value == prediction for prediction in prediction_values)
                print(f"Match with True Value: {match_with_true_value}")

                if match_with_true_value:
                    correct_matches_with_true_value += 1

                # Check if any prediction matches any value in the specified DataFrame column
                values_to_check = set(df[column_to_check].str.lower())
                matching_predictions_df = [prediction for prediction in prediction_values if prediction in values_to_check]
                match_with_df_column = bool(matching_predictions_df)
                print(f"Match with {column_to_check} Column: {match_with_df_column}")

                if match_with_df_column:
                    correct_matches_with_df_column += 1
                    print(f"Matching Predictions in {column_to_check} Column: {matching_predictions_df}")
                    unique_predictions.update(matching_predictions_df)

                print("------------------------")
            else:
                print(f"Entry with ID {entry_id} not found in the JSON data.")

        # Calculate and print percentage of correct matches
        percentage_correct_matches_with_true_value = (correct_matches_with_true_value / total_entries) * 100
        print(f"Percentage of Matches with True Value: {percentage_correct_matches_with_true_value:.2f}%")

        percentage_correct_matches_with_df_column = (correct_matches_with_df_column / total_entries) * 100
        print(f"Percentage of Matches with {column_to_check} Column: {percentage_correct_matches_with_df_column:.2f}%")

        print(f"Unique Predictions: {list(unique_predictions)}")


### Usage example

In [None]:

# Folder holding all the JSON files of the model to test.
folder_path = '/gpt35turbo/1/'
evaluator_2 = PredictionEvaluator_2(folder_path)

# Top-n accuracy report.
top_n_accuracies = evaluator_2.compute_combined_top_accuracies()
accuracy_top_1, accuracy_top_3, accuracy_top_5 = top_n_accuracies
print(f"Top-1 Accuracy: {accuracy_top_1 * 100:.2f}%")
print(f"Top-3 Accuracy: {accuracy_top_3 * 100:.2f}%")
print(f"Top-5 Accuracy: {accuracy_top_5 * 100:.2f}%")

# Share of entries whose prediction already occurs in the input stays.
input_overlap = evaluator_2.evaluate_predictions()
percentage_with_prediction, remaining_percentage, remaining_ids = input_overlap
print(f"Percentage of entries with prediction in input: {percentage_with_prediction:.2f}%")
print(f"Remaining Percentage: {remaining_percentage:.2f}%")
print(f"IDs with Remaining Percentage: {remaining_ids}")

# Inspect the remaining entries against the venue IDs of the NYC data.
evaluator_2.print_predictions_for_ids(remaining_ids, nyc_data, 'venue_id')