# **Cultural Bias Classification - Evaluation - Yelizaveta Tskhe**

## IMPORTS

In [None]:
!pip install SPARQLWrapper
!pip install datasets
!pip install evaluate
!pip install transformers[torch]
!pip install safetensors



In [None]:
import joblib
import re
import time
import random
import json
import os
from functools import lru_cache
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_recall_fscore_support
import matplotlib.pyplot as plt
import seaborn as sns
from SPARQLWrapper import SPARQLWrapper, JSON
from sklearn.model_selection import cross_val_score
import torch
from datasets import load_dataset
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    DataCollatorWithPadding,
    set_seed
)
import evaluate
from sklearn.utils.class_weight import compute_class_weight
from collections import Counter
from transformers import AutoModelForSequenceClassification, AutoTokenizer

## NON-LM-BASED APPROACH

In [None]:
CACHE_FILE = "wikidata_cache.json"

if os.path.exists(CACHE_FILE):
    with open(CACHE_FILE, 'r') as f:
        wikidata_cache = json.load(f)
else:
    wikidata_cache = {}

path_testset = 'test_unlabeled.csv'
test_df = pd.read_csv(path_testset)

def get_wikidata_category_info(wikidata_id, max_retries=3):
    if wikidata_id in wikidata_cache:
        return wikidata_cache[wikidata_id]["values"], wikidata_cache[wikidata_id]["property_values"]

    wikidata_id_clean = wikidata_id.split("/")[-1]  # clean ID

    try:
        sparql = SPARQLWrapper("https://query.wikidata.org/sparql")
        sparql.addCustomHttpHeader("User-Agent", "CulturalClassifier/1.0 (research-project)")

        query = f"""
        SELECT ?propertyLabel ?valueLabel WHERE {{
          VALUES ?prop {{
            wdt:P31  # instance of
            wdt:P279  # subclass of
            wdt:P495  # country of origin
            wdt:P27  # country of citizenship
            wdt:P361  # part of
            wdt:P1535  # used by
            wdt:P131  # located in administrative territory
            wdt:P17  # country
            wdt:P276  # location
            wdt:P3342  # significant place
          }}
          wd:{wikidata_id_clean} ?prop ?value .
          SERVICE wikibase:label {{ bd:serviceParam wikibase:language 'en'. }}
        }}
        LIMIT 20
        """

        for attempt in range(max_retries):
            try:
                sparql.setQuery(query)
                sparql.setReturnFormat(JSON)
                results = sparql.query().convert()

                values = [result["valueLabel"]["value"].lower() for result in results["results"]["bindings"]]
                property_values = []
                for result in results["results"]["bindings"]:
                    property_values.append({
                        "property": result["propertyLabel"]["value"].lower() if "propertyLabel" in result else "",
                        "value": result["valueLabel"]["value"].lower() if "valueLabel" in result else ""
                    })

                # cache the results
                wikidata_cache[wikidata_id] = {
                    "values": values,
                    "property_values": property_values
                }

                # save to cache file occasionally
                if random.random() < 0.01:
                    with open(CACHE_FILE, 'w') as f:
                        json.dump(wikidata_cache, f)

                return values, property_values

            except Exception as e:
                if "429" in str(e):
                    wait_time = 2 ** attempt
                    print(f"Rate limit hit for {wikidata_id_clean}, waiting {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    print(f"SPARQL error for {wikidata_id_clean}: {e}")
                    break

        print(f"All SPARQL attempts failed for {wikidata_id_clean}")
        return [], []

    except Exception as e:
        print(f"SPARQL setup error for {wikidata_id_clean}: {e}")
        return [], []

def extract_features_with_wikidata(sample):
    is_entity = 1 if sample['type'] == 'entity' else 0
    is_concept = 1 if sample['type'] == 'concept' else 0
    desc_length = len(sample['description'])

    categories = ['politics', 'food', 'fashion', 'sports', 'music', 'films', 'literature']
    category_features = [1 if sample['category'] == cat else 0 for cat in categories]

    description = sample['description'].lower()
    common_countries = ["italy", "france", "japan", "china", "india", "germany", "spain", "brazil", "mexico",
                       "american", "european", "african", "asian", "australia", "canada", "russia"]
    has_country = int(any(country in description for country in common_countries))

    global_keywords = ["global", "universal", "worldwide", "international",
                       "common", "general", "standard", "widely used", "across cultures",
                       "ubiquitous", "everywhere", "throughout the world", "many countries"]
    has_global_term = int(any(term in description for term in global_keywords))

    country_count = sum(country in description for country in common_countries)

    try:
        wikidata_values, property_values = get_wikidata_category_info(sample['item'])
    except Exception as e:
        print(f"Error in Wikidata query: {e}")
        wikidata_values, property_values = [], []

    # unique countries in Wikidata
    country_entities = set()
    for value in wikidata_values:
        for country in common_countries:
            if country in value.lower():
                country_entities.add(country)
    wikidata_country_count = len(country_entities)

    cultural_keywords = ["culture", "heritage", "tradition", "ethnic", "korean", "japanese", "french",
                        "italian", "german", "american", "african", "traditional", "indigenous"]
    has_cultural_label = int(any(kw in value for value in wikidata_values for kw in cultural_keywords))

    # for agnostic items
    absence_of_cultural_markers = 1 if not has_cultural_label and not has_country else 0

    # "used by" property with multiple values
    used_by_count = sum(1 for pv in property_values if "used by" in pv["property"])

    # global categories that tend to be culturally agnostic
    global_categories = ["tool", "equipment", "device", "instrument", "technology", "scientific", "mathematical"]
    is_global_category = int(any(gc in value for value in wikidata_values for gc in global_categories))

    # items with global usage but specific origin
    has_origin = int(any("origin" in pv["property"] for pv in property_values))
    global_but_specific_origin = 1 if has_global_term and has_origin else 0

    cr_markers = ["widespread", "popular", "common", "adopted", "adapted",
                 "international", "regional", "variant", "inspired", "influence"]
    cr_marker_count = sum(term in description for term in cr_markers)

    ce_markers = ["unique", "exclusive", "specific", "only", "indigenous",
                 "traditional", "native", "authentic", "original", "exclusive to"]
    ce_marker_count = sum(term in description for term in ce_markers)

    adoption_exclusivity_ratio = cr_marker_count / max(ce_marker_count, 1)

    # ce (more specific locations/cultures)
    named_entities = len(re.findall(r'[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*', sample['description']))

    return [is_entity, is_concept, desc_length, has_country, has_cultural_label,
            has_global_term, country_count, wikidata_country_count, absence_of_cultural_markers,
            used_by_count, is_global_category, global_but_specific_origin,
            cr_marker_count, ce_marker_count, adoption_exclusivity_ratio, named_entities] + category_features


def predict_two_stage(X, model_stage1, model_stage2):
    stage1_preds = model_stage1.predict(X)

    final_preds = ['cultural agnostic' if pred == 'CA' else None for pred in stage1_preds]

    non_ca_indices = [i for i, pred in enumerate(stage1_preds) if pred == 'non-CA']

    if len(non_ca_indices) > 0:
        X_non_ca = X[non_ca_indices]
        stage2_preds = model_stage2.predict(X_non_ca)

        for idx, pred in zip(non_ca_indices, stage2_preds):
            final_preds[idx] = pred

    return final_preds

def post_process_cr_ce_predictions(predictions, X_val, feature_names):
    adjusted_predictions = predictions.copy()

    for i in range(len(predictions)):
        # convert features array to dictionary for easier access
        features = dict(zip(feature_names, X_val[i]))

        # rule 1
        if features.get('has_global_term', 0) == 1 and features.get('has_cultural_label', 0) == 0:
            adjusted_predictions[i] = 'cultural agnostic'

        # rule 2: many countries mentioned -> CA
        if features.get('country_count', 0) >= 3 or features.get('wikidata_country_count', 0) >= 3:
            adjusted_predictions[i] = 'cultural agnostic'

        # rule 3: global category items -> CA
        if features.get('is_global_category', 0) == 1 and features.get('has_cultural_label', 0) == 0:
            adjusted_predictions[i] = 'cultural agnostic'

        if adjusted_predictions[i] != 'cultural agnostic':
            # rule 1: high adoption ratio -> ce
            if features.get('adoption_exclusivity_ratio', 0) > 2.0:
                adjusted_predictions[i] = 'cultural representative'

            # rule 2: high exclusivity marker -> ce
            if features.get('ce_marker_count', 0) > 3 and features.get('cr_marker_count', 0) < 2:
                adjusted_predictions[i] = 'cultural exclusive'

            # rule 3: high named entity -> ce
            if features.get('named_entities', 0) > 3:
                adjusted_predictions[i] = 'cultural exclusive'

    return adjusted_predictions

def extract_features_for_test_set(test_df):
    X_test = []
    for _, row in test_df.iterrows():
        sample = row.to_dict()
        features = extract_features_with_wikidata(sample)
        X_test.append(features)
    return np.array(X_test)

X_test = extract_features_for_test_set(test_df)

model_stage1 = joblib.load('model_stage1_nonlm.pkl')
model_stage2 = joblib.load('model_stage2_nonlm.pkl')

y_pred_test = predict_two_stage(X_test, model_stage1, model_stage2)
feature_names = ['is_entity', 'is_concept', 'desc_length', 'has_country', 'has_cultural_label',
                     'has_global_term', 'country_count', 'wikidata_country_count', 'absence_of_cultural_markers',
                     'used_by_count', 'is_global_category', 'global_but_specific_origin',
                     'cr_marker_count', 'ce_marker_count', 'adoption_exclusivity_ratio', 'named_entities',
                     'category_politics', 'category_food', 'category_fashion',
                     'category_sports', 'category_music', 'category_films',
                     'category_literature']

y_pred_test = post_process_cr_ce_predictions(y_pred_test, X_test, feature_names)

output_data = []
for i, sample in test_df.iterrows():
    output_data.append({
        'item': sample['item'],
        'name': sample['name'],
        'description': sample['description'],
        'type': sample['type'],
        'category': sample['category'],
        'label': y_pred_test[i]
    })

output_df = pd.DataFrame(output_data)
output_df.to_csv('test_output_nonlm.csv', index=False)

print("saved as test_output_nonlm.csv.")


saved as no_bias_test_output_nonlm.csv.


## LM-BASED APPROCH

In [None]:
test_df = pd.read_csv(path_testset)

device = "cuda" if torch.cuda.is_available() else "cpu"

label_names = ["cultural agnostic", "cultural representative", "cultural exclusive"]
label_to_id = {label: i for i, label in enumerate(label_names)}
id_to_label = {i: label for label, i in label_to_id.items()}

model = AutoModelForSequenceClassification.from_pretrained("model_distilbert").to(device)
tokenizer = AutoTokenizer.from_pretrained("model_distilbert")

def preprocess_test(row):
    return f"{row['name']} {row['description']} {row['category']} {row['subcategory']}"

test_df["text"] = test_df.apply(preprocess_test, axis=1)

test_encodings = tokenizer(
    test_df["text"].tolist(),
    padding=True,
    truncation=True,
    max_length=128,
    return_tensors="pt"
).to(device)

model.eval()
with torch.no_grad():
    outputs = model(**test_encodings)
    logits = outputs.logits
    preds = torch.argmax(logits, dim=-1).cpu().numpy()


test_df["label"] = [id_to_label[i] for i in preds]

test_df.drop(columns=["text"], inplace=True)
test_df.to_csv("test_output_distilbert.csv", index=False)
print("saved to test_output_distilbert.csv")

saved to no_bias_test_output_distilbert.csv
