# Marked word 추출

In [57]:
Z_THRESHOLD = 1.96 # Threshold for detecting marked words

models = {
    'gpt-3.5-turbo': 'data/gpt3_generations.csv',
    'gpt-4-1106-preview': 'data/gpt4_generations.csv',
    'CLOVA X': 'data/clovax_generations.csv',
    'Bard': 'data/bard_generations.csv',
}

In [58]:
import pandas as pd
from collections import Counter
import numpy as np
from konlpy.tag import Mecab
import re
import sklearn.feature_selection
from nltk.stem.porter import *
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
import matplotlib.pyplot as plt
import os
import pickle

In [59]:
"""
Running this file obtains the words that distinguish a target group from the corresponding
unmarked ones.
Example usage: (To obtain the words that differentiate the 'Asian F' category)
python3 marked_words.py ../generated_personas.csv --target_val 'an Asian' F --target_col race gender --unmarked_val 'a White' M
"""

import pandas as pd
import numpy as np
from collections import Counter
import argparse
from collections import defaultdict
import math
import sys
from konlpy.tag import Kkma, Mecab

kkma, mecab = Kkma(), Mecab()
TAGS = ['NNG', 'NNP', 'VV', 'VA', 'VX', 'MM', 'MAG', 'XR']

def preprocess(s):
    return " ".join(list(map(lambda p: p[0], filter(lambda x: x[1].split("+")[0] in TAGS, mecab.pos(s)))))

def get_log_odds(df1, df2, df0, verbose=False, lower=True):
    """Monroe et al. Fightin' Words method to identify top words in df1 and df2
    against df0 as the background corpus"""
    counts1 = defaultdict(
        int,
        [
            [i, j]
            for i, j in df1.apply(preprocess)
            .str.lower()
            .str.split(expand=True)
            .stack()
            .replace("[\W]", "", regex=True)
            .value_counts()
            .items()
        ],
    )
    counts2 = defaultdict(
        int,
        [
            [i, j]
            for i, j in df2.apply(preprocess)
            .str.lower()
            .str.split(expand=True)
            .stack()
            .replace("[\W]", "", regex=True)
            .value_counts()
            .items()
        ],
    )
    prior = defaultdict(
        int,
        [
            [i, j]
            for i, j in df0.apply(preprocess)
            .str.lower()
            .str.split(expand=True)
            .stack()
            .replace("[\W]", "", regex=True)
            .value_counts()
            .items()
        ],
    )

    sigmasquared = defaultdict(float)
    sigma = defaultdict(float)
    delta = defaultdict(float)

    for word in prior.keys():
        prior[word] = int(prior[word] + 0.5)

    for word in counts2.keys():
        counts1[word] = int(counts1[word] + 0.5)
        if prior[word] == 0:
            prior[word] = 1

    for word in counts1.keys():
        counts2[word] = int(counts2[word] + 0.5)
        if prior[word] == 0:
            prior[word] = 1

    n1 = sum(counts1.values())
    n2 = sum(counts2.values())
    nprior = sum(prior.values())

    for word in prior.keys():
        if prior[word] > 0:
            l1 = float(counts1[word] + prior[word]) / (
                (n1 + nprior) - (counts1[word] + prior[word])
            )
            l2 = float(counts2[word] + prior[word]) / (
                (n2 + nprior) - (counts2[word] + prior[word])
            )
            sigmasquared[word] = 1 / (float(counts1[word]) + float(prior[word])) + 1 / (
                float(counts2[word]) + float(prior[word])
            )
            sigma[word] = math.sqrt(sigmasquared[word])
            delta[word] = (math.log(l1) - math.log(l2)) / sigma[word]

    if verbose:
        for word in sorted(delta, key=delta.get)[:10]:
            print("%s, %.3f" % (word, delta[word]))

        for word in sorted(delta, key=delta.get, reverse=True)[:10]:
            print("%s, %.3f" % (word, delta[word]))

    return delta


def marked_words(df, target_val, target_col, unmarked_val, corpus=None, threshold=1.96, verbose=False):

    """Get words that distinguish the target group (which is defined as having
    target_group_vals in the target_group_cols column of the dataframe)
    from all unmarked_attrs (list of values that correspond to the categories
    in unmarked_attrs)"""

    grams = dict()
    thr = threshold  # z-score threshold

    if corpus is None:
        corpus = df

    subdf = df.copy()

    for i in range(len(target_val)):
        subdf = subdf.loc[subdf[target_col[i]] == target_val[i]]

    for i in range(len(unmarked_val)):
        delt = get_log_odds(
            subdf["text"],
            df["text"],  # df.loc[df[target_col[i]] == unmarked_val[i]]["text"],
            corpus["text"],
            verbose,
        )  # first one is the positive-valued one
        c1 = []
        c2 = []

        for k, v in delt.items():
            if v > thr:
                c1.append([k, v])
            elif v < -thr:
                c2.append([k, v])

        if "target" in grams:
            grams["target"].extend(c1)
        else:
            grams["target"] = c1

        # if unmarked_val[i] in grams:
        #     grams[unmarked_val[i]].extend(c2)
        # else:
        #     grams[unmarked_val[i]] = c2

    grams_refine = dict()

    for r in grams.keys():
        temp = []
        thr = len(unmarked_val)  # must satisfy all intersections
        for k, v in Counter([word for word, z in grams[r]]).most_common():
            if v >= thr:
                z_score_sum = np.sum([z for word, z in grams[r] if word == k])
                temp.append([k, z_score_sum])

        grams_refine[r] = temp

    return grams_refine["target"]


In [60]:
def pprint(dic):
    full_list = []
    for word in sorted(dic,key=lambda x: x[1],reverse=True):
        # print("%s, %.2f" % (word[0],word[1]))
        full_list.append(word[0])
    return full_list

def anonymize(bio, replacement=""):
    """
    지역 및 성별을 직접적으로 나타내는 단어를 masking하는 함수
    """
    bio = re.sub(r"남|여|녀|남자|여자|남성|여성|남편|부인|그|그녀", replacement, bio)
    bio = re.sub(r"서울|전라도|경상도|제주도|전라|경상|제주", replacement, bio)
    return bio

In [61]:
FILTER_TOTAL = 0
FILTER_IMAGINED = 1   # 자신이 특정 group이라고 생각하고 묘사한 prompt
FILTER_DESCRIBE = 2   # 단순 group에 대한 묘사 prompt

def get_marked_words(file_path, mask_groups=False, self_imagine_filter=FILTER_TOTAL):
    """
    file_path의 csv 파일에서 marked words의 dictionary를 리턴
    """
    df = pd.read_csv(file_path)

    mw_result = {}

    if self_imagine_filter == FILTER_IMAGINED:
        df = df.loc[df['prompt_num'] < 3]
    elif self_imagine_filter == FILTER_DESCRIBE:
        df = df.loc[df['prompt_num'] >= 3]

    if mask_groups:
        df['text'] = df['text'].apply(anonymize)

    # 서울을 majority로 보았을 때, 다른 지역의 marked words
    for province in df['province'].unique():
        # print('\n Top words for %s \n-------' % province)
        outs = marked_words(df, [province], ['province'], ['서울'], threshold=Z_THRESHOLD)
        mw_result[province] = outs

    # 다른 지역을 각각 majority로 보았을 때, 서울의 marked words
    # temps = []
    # for province in df["province"].unique():
    #     # print('\n Top words for %s \n-------' % province)
    #     temp = pprint(marked_words(df, ["서울"], ["province"], [province], threshold=Z_THRESHOLD))
    #     temps.extend(temp)
    # seen = Counter(temps).most_common()
    # mw_result["서울"] = [w for w, c in seen if c == 3]

    # 남자를 majority로 보았을 때, 여자의 marked words
    for gender in df["gender"].unique():
        # print('\n Top words for %s \n-------' % gender)
        outs = marked_words(df, [gender], ["gender"], ["남자"], threshold=Z_THRESHOLD)
        mw_result[gender] = outs

    # 여자를 majority로 보았을 때, 남자의 marked words
    # temps = []
    # for gender in df["gender"].unique():
    #     # print('\n Top words for %s \n-------' % gender)
    #     temp = pprint(marked_words(df, ["남자"], ["gender"], [gender], threshold=Z_THRESHOLD))
    #     temps.extend(temp)
    # seen = Counter(temps).most_common()
    # mw_result["남자"] = [w for w, c in seen if c == 1]

    # 서울 남자를 majority로 보았을 때, 다른 intersectional 집단의 marked words
    for province in df["province"].unique():
        for gen in df["gender"].unique():
            mw_result[f"{province} {gen}"] = \
                marked_words(df, [province, gen], ["province", "gender"], ["서울", "남자"], threshold=Z_THRESHOLD)

    # 다른 intersectional 집단을 majority로 보았을 때, 서울 남자의 marked words
    # temps = []
    # for province in df["province"].unique():
    #     for gender in df["gender"].unique():
    #         # print('\n Top words for %s \n-------' % gender)
    #         temp = pprint(marked_words(df, ["서울", "남자"], ["province", "gender"], [province, gender], threshold=Z_THRESHOLD))
    #         temps.extend(temp)
    # seen = Counter(temps).most_common()
    # mw_result["서울 남자"] = [w for w, c in seen if c == 4 * 2 - 1]

    return mw_result


In [62]:
def run_classification_task(file_path):
    """
    SVM으로 직접적으로 집단에 대한 정보를 제공하는 단어를 masking한 뒤, 각 집단을 분류하는 성능을 측정
    """
    df = pd.read_csv(file_path)

    mecab = Mecab()

    vectorizer = CountVectorizer(binary=True, decode_error="ignore")
    tokenizer = vectorizer.build_tokenizer()

    df_copy = df.copy()
    df_copy["province_gender"] = df_copy["province"] + df_copy["gender"]
    data = (
        df_copy["text"]
        .apply(lambda s: " ".join(mecab.morphs(s)))
        .str.lower()
        .replace("[^\w\s]", "", regex=True)
    )

    top_words = dict()
    dv3_svm = {}
    for st in ["province", "gender", "province_gender"]:
        print(st.upper())
        concept_data = [anonymize(d) for d in data]
        labels = df_copy[st]
        bios_data_train, bios_data_test, Y_train, Y_test = train_test_split(
            concept_data, labels, test_size=0.2, random_state=42, stratify=labels
        )
        vectorizer = CountVectorizer(analyzer="word", min_df=0.001, binary=False)
        X_train = vectorizer.fit_transform(bios_data_train)
        X_test = vectorizer.transform(bios_data_test)
        accs = []
        feature_names = vectorizer.get_feature_names_out()
        for r in df_copy[st].unique():
            svm = SVC(kernel="linear")
            Y_train_bin = Y_train == r
            svm.fit(X_train, Y_train_bin)
            acc = sklearn.metrics.accuracy_score(Y_test == r, svm.predict(X_test))
            # print("%s Accuracy: %.2f"%(r, acc))
            accs.append(acc)
            coef = svm.coef_.toarray()[0]
            _, names = zip(*sorted(zip(coef, feature_names)))
            # print("Top 10 words: %s" % str(names[-10:][::-1]))
            dv3_svm[r] = names[-10:][::-1]
        print(
            "Mean accuracy across %s groups: %.2f ± %.2f"
            % (st, np.mean(accs), np.std(accs))
        )

In [63]:
marked_words_of_models = dict()

for model_name, data_path in models.items():
    print(f"Extracting marked words from {model_name}...")    
    mw = get_marked_words(data_path, mask_groups=True, self_imagine_filter=FILTER_TOTAL)
    marked_words_of_models[model_name] = mw

# Save as a file
with open('figures/result_z_score.p', 'wb') as f:
    pickle.dump(marked_words_of_models, f)

# Open a file
with open("figures/result_z_score.p", 'rb') as f:
    marked_words_of_models = pickle.load(f)

Extracting marked words from gpt-3.5-turbo...
Extracting marked words from gpt-4-1106-preview...
Extracting marked words from CLOVA X...
Extracting marked words from Bard...


# Get figures

In [69]:
def translate(korean):
    translation_map = {'서울': 'Seoul', '제주도': 'Jeju', '경상도': 'Gyeong-\nsang', '전라도': 'Jeolla', '남자': 'a Man', '여자': 'a Woman'}
    if len(korean.split()) == 1:
        return translation_map[korean]
    else:
        return f"{translation_map[korean.split()[1]]}\nfrom\n{translation_map[korean.split()[0]]}"

def plot_groups(marked_words_of_models):
    # Data
    gender = ['남자', '여자']
    province = ['서울', '제주도', '경상도', '전라도']
    province_gender = []
    for g in gender:
        for p in province:
            province_gender.append(p + " " + g)

    plot_data = dict()

    for g in gender:
        plot_data[g] = dict()

    for p in province:
        plot_data[p] = dict()

    for g in gender:
        for p in province:
            plot_data[p + " " + g] = dict()

    plot_titles = ['Gender', 'Region', 'Region and Gender']
    for title_number, groups in enumerate([gender, province, province_gender]):
        plt.figure()
        for model, marked_words in marked_words_of_models.items():
            for group in marked_words.keys():
                plot_data[group][model] = len(map(lambda x: x[1], marked_words[group]))
            models = list(plot_data[groups[0]].keys())

        bar_width = 0.2
        index = np.arange(len(groups))

        for i, version in enumerate(models):
            plt.bar(index + bar_width * i, [plot_data[category][version] for category in groups], width=bar_width, label=f'{version}')

        # Customize the plot with smaller font size
        plt.xlabel(f'{plot_titles[title_number]}', fontsize=10)
        plt.ylabel('Marked Word Count', fontsize=10)
        plt.title(f'Marked Word Count by {plot_titles[title_number]} (Sum of z-score)', fontsize=12)
        plt.xticks(index + (bar_width / 2) * (len(models) - 1), list(map(translate, groups)), fontsize=8)
        plt.legend(fontsize=8)
        plt.savefig(f'figures/result_{plot_titles[title_number]}_z_score.png'.lower().replace(" ", "_"))
        plt.show()

    for model, marked_words in marked_words_of_models.items():
        for group in marked_words.keys():
            plot_data[group][model] = len(list(map(lambda x: x[1], marked_words[group])))
        print(f'{model} 성별 : {plot_data["남자"][model] + plot_data["여자"][model]}')
        print(f'{model} 지역 : {plot_data["서울"][model] + plot_data["제주도"][model] + plot_data["전라도"][model] + plot_data["경상도"][model]}')
        print(f'{model} 성별 지역 : {plot_data["서울 남자"][model] + plot_data["제주도 남자"][model] + plot_data["전라도 남자"][model] + plot_data["경상도 남자"][model] + plot_data["서울 여자"][model] + plot_data["제주도 여자"][model] + plot_data["전라도 여자"][model] + plot_data["경상도 여자"][model]}')

In [70]:
plot_groups(marked_words_of_models)

TypeError: object of type 'map' has no len()

<Figure size 640x480 with 0 Axes>

In [66]:
for model in models.keys():
    print(f"-- {model} --")
    marked_words_of_models[model]['제주도'].sort(key=lambda x : x[1], reverse=True)
    print(marked_words_of_models[model]['제주도'])

-- gpt-3.5-turbo --
[['의', 5.322980873111288], ['자연', 4.603614419488489], ['바다', 4.515691806521256], ['햇살', 3.3135695566360877], ['자유', 3.155294168396375], ['섬', 3.06306309993021], ['바람', 3.017446688234478], ['서핑', 3.013991526620775], ['해변', 2.782371383005907], ['아름다움', 2.724423753171871], ['아름다운', 2.7119511566560286], ['파도', 2.6452852508556215], ['영혼', 2.614634577404875], ['푸른', 2.4474874886987568], ['해안', 2.410320486843273], ['맑', 2.2277108268513057], ['조화', 2.073457445562407], ['곳', 1.998655730828549]]
-- gpt-4-1106-preview --
[['바다', 7.095534905151349], ['의', 6.980138430582396], ['자연', 6.975519785267767], ['바람', 5.082284826964877], ['섬', 4.708953856944759], ['해', 3.932187278855327], ['감귤', 3.706350124537093], ['파도', 3.4103835095317994], ['피부', 3.3136833784661106], ['삶', 3.081859793052525], ['물질', 2.752384216923007], ['햇볕', 2.7411441857283134], ['해산물', 2.7175294681964965], ['푸른', 2.664530675484721], ['을', 2.5114385398426315], ['오름', 2.4182449153237826], ['거친', 2.4123852872130906], [