In [1]:
import pandas as pd
import pickle
import os
import time
from task1 import get_drug_post
import concurrent.futures

# data - 1.5 million
non_drug_sub = pickle.load(open('../data/non_drug_data_filtered_delauthor.pkl', 'rb'))

In [6]:
# date range per subreddit
non_drug_sub['created_utc'] = pd.to_datetime(non_drug_sub['created_utc'], unit='s')

In [7]:
for subreddit in non_drug_sub['subreddit'].unique():
    print(subreddit)
    sub = non_drug_sub[non_drug_sub['subreddit'] == subreddit]
    print(sub['created_utc'].min(), sub['created_utc'].max())

unpopularopinion
2013-08-18 16:04:58 2022-12-31 23:58:58
nursing
2009-12-02 04:47:48 2022-12-31 23:26:24
medicine
2008-05-11 19:20:01 2022-12-31 23:51:58
offmychest
2010-02-25 14:55:14 2022-12-31 23:59:54


In [2]:
import json
import openai
with open('../data/secrets.json') as f:
    secrets = json.load(f)

api_key = secrets['OPENAI_API_KEY_LB']

client = openai.Client(api_key=api_key)

---
## TASK1

In [3]:
dat = pickle.load(open('../data/destigma_pipeline/task1_is_drug2.pkl', 'rb'))

In [26]:
seed = 7
# drug_sample = drug_sub.sample(1500, random_state = seed)
# dat = non_drug_sub.sample(100000, random_state = seed)
dat2 = non_drug_sub[~non_drug_sub['id'].isin(dat['id'])].sample(25000, random_state = seed)
del non_drug_sub

In [4]:
# Get the number of CPU cores
num_cores = os.cpu_count()

print(f"Number of CPU cores: {num_cores}")

rate_limit = 500
rate_limit_period = 60  # seconds

# Set max_workers based on the number of CPU cores and the nature of the task
max_workers = min(rate_limit // (60 // rate_limit_period), num_cores * 4)
print(f"Max workers: {max_workers}")

Number of CPU cores: 10
Max workers: 40


In [18]:
def process_posts_in_parallel(posts, max_workers=20, task = None):
    openai_client = client
    results = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_index = {executor.submit(task, post, openai_client=openai_client): idx for idx, post in enumerate(posts)}
        for future in concurrent.futures.as_completed(future_to_index):
            idx = future_to_index[future]
            try:
                result = future.result()
                results.append((idx, result))
            except Exception as exc:
                print(f'Post at index {idx} generated an exception: {exc}')
                results.append((idx, "skipped"))
    
    # Sort results by the original index
    results.sort(key=lambda x: x[0])
    return [result for _, result in results]

In [29]:
# indices = [100, 272, 1667, 1787, 2385, 2914, 3368, 5000, 5262, 5563, 5912, 6939, 7033]
# blah = dat.iloc[indices]
# blah['test'] = process_posts_in_parallel(blah['text'].tolist(), task=get_drug_post)

In [None]:
time_start = time.time()
dat2['task1_gpt3_5'] = process_posts_in_parallel(dat['text'].tolist(), max_workers=max_workers, task=get_drug_post)
time_end = time.time()
print(f"Time taken: {time_end - time_start}")

In [6]:
# dat['task1_label'] = dat['text'].apply(lambda x: get_drug_post(x, openai_client=client))

In [10]:
dat['task1_gpt3_5'].value_counts()

nd                                                                                                                                                                                                                                                                                        98777
d                                                                                                                                                                                                                                                                                          1107
skipped                                                                                                                                                                                                                                                                                      20
this post is not primarily about drugs or people who use drugs.                                                                         

In [4]:
# save
# pickle.dump(dat, open('../data/destigma_pipeline/task1_is_drug3.pkl', 'wb'))
task1_is_drug3 = pickle.load(open('../data/destigma_pipeline/task1_is_drug3.pkl', 'rb'))

In [20]:
# experiment with task1 just s-d
# task1 = pickle.load(open('../data/destigma_pipeline/task1/task1_1.pkl', 'rb'))

In [12]:
just_d = dat[dat['task1_gpt3_5'].apply(lambda x: x.startswith('d'))]
pickle.dump(just_d, open('../data/destigma_pipeline/task2/task1_just_d2.pkl', 'wb'))

In [11]:
# get random sample of 20% of the data
just_d = pickle.load(open('../data/destigma_pipeline/task2/task1_just_d2.pkl', 'rb'))
seed = 7
just_d_sample = just_d.sample(frac=0.1, random_state=seed)
non_drug_sample = task1_is_drug3[task1_is_drug3['task1_gpt3_5'] == 'nd'].sample(n = just_d_sample.shape[0], random_state=seed)

# combine
task1_review = pd.concat([just_d_sample, non_drug_sample])
# drop task1_;abel
task1_review = task1_review.drop(columns = ['task1_label'])

# export
task1_review.to_csv('../data/evaluation_sets/task1_models_review.csv', index = False)

In [14]:
# 41 s-d labeled 
# get the same number of non-s-d randomly
# 123 other labels
seed = 6
n = just_d.shape[0]
other = dat[dat['task1_gpt3_5'].apply(lambda x: not x.startswith('d'))].sample(n, random_state = seed)

In [15]:
# combine
# review = pd.concat([just_d, other])
# review.to_csv('../data/evaluation_spreadsheets/task1_5.csv', index = False)

In [14]:
# agreement
task1_1 = pd.read_excel("../data/evaluation_spreadsheets/stigma_drug_categories_review_200_2_combined.xlsx")
task1_1_filtered = task1_1[task1_1['label2'].apply(lambda x: x.startswith('s-d'))]
# label starts with 's-d'
dat_task2 = dat[dat['task1_label'].apply(lambda x: x.startswith('s-d'))]

pickle.dump(dat_task2, open('../data/destigma_pipeline/task2_1.pkl', 'wb'))

---
## TASK 2

In [14]:
from task2 import get_utterance
task2 = pickle.load(open('../data/destigma_pipeline/task2/task1_just_d.pkl', 'rb'))

In [15]:
def process_posts_in_parallel(posts, max_workers=10):
    openai_client = client
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_post = {executor.submit(get_utterance, post, openai_client=openai_client): post for post in posts}
        results = []
        for future in concurrent.futures.as_completed(future_to_post):
            post = future_to_post[future]
            try:
                result = future.result()
                results.append(result)
            except Exception as exc:
                print(f'{post} generated an exception: {exc}')
                results.append("skipped")
    return results

In [16]:
results = process_posts_in_parallel(task2['text'].tolist())

Rate limit reached. Pausing for a minute...
Rate limit reached. Pausing for a minute...
Rate limit reached. Pausing for a minute...


In [17]:
task2['task2_label'] = results
just_s = task2[task2['task2_label'].apply(lambda x: x.startswith('s'))]

In [18]:
# evaluate
n = just_s.shape[0]
other = task2[task2['task2_label'].apply(lambda x: not x.startswith('s'))].sample(n, random_state = seed)

# combine
review = pd.concat([just_s, other])
review.to_csv('../data/evaluation_spreadsheets/task2_2.csv', index = False)

In [7]:
# agreement
labeled = pd.read_excel('../data/evaluation_spreadsheets/task2_1_combined.xlsx')

In [None]:
# for next time - new 100k posts that are not in the labeled set
non_drug_sub = pickle.load(open('../data/non_drug_data_filtered_delauthor.pkl', 'rb'))
# select a new 100k that are not in dat
seed = 7
new_dat = non_drug_sub[~non_drug_sub['text'].isin(dat['text'])].sample(100000, random_state = seed)


In [17]:
# cohens kappa
from sklearn.metrics import cohen_kappa_score
import numpy as np
from statsmodels.stats.inter_rater import aggregate_raters, fleiss_kappa
import krippendorff

def get_agreement(col_name1, col_name2):
    # cohens
    k = cohen_kappa_score(col_name1, col_name2)
    print("cohens kappa: ", k)
    # krippendorff
    dat = [col_name1, col_name2]
    alpha = krippendorff.alpha(dat, level_of_measurement='nominal')
    print("krippendorff alpha: ", alpha)
    # fleiss kappa
    dat_transformed = np.array([dat[0], dat[1]]).T.tolist()
    # Using aggregate_raters to prepare data for Fleiss' Kappa, which is a similar measure
    table, n_ij = aggregate_raters(dat_transformed)
    # Compute Fleiss' Kappa as an approximation
    kappa = fleiss_kappa(table, method='fleiss')
    print(f"Fleiss' Kappa: {kappa}")
    # percentage agreement
    agreement = sum(col_name1 == col_name2) / len(col_name1)
    print("percentage agreement: ", agreement)
    # pabak: Prevalence and Bias-Adjusted Kappa = percent observed agrement over the number of categories being rated
    k = len(set(col_name1))
    print("number of categories: ", k)
    PABAK = (agreement - 1) / (k - 1)
    print("PABAK: ", PABAK)
    
    return k, alpha, kappa, agreement



In [18]:
task2_1 = get_agreement(labeled['agree_LB'], labeled['agree_EA'])

cohens kappa:  -0.17647058823529393
krippendorff alpha:  -0.1875
Fleiss' Kappa: -0.2500000000000007
percentage agreement:  0.6
number of categories:  2
PABAK:  -0.4


---
##Putting it all together

In [None]:
# create loop for task 1 and task2
# chunks of 1000 rows 10 times

for i in range(10):
    print(i)
    seed = i
    drug_sample = drug_sub.sample(500, random_state = seed)
    non_drug_sample = non_drug_sub.sample(500, random_state = seed)
    dat = pd.concat([drug_sample, non_drug_sample])

    dat['task1_label'] = dat['text'].apply(lambda x: get_stigma_drug(x, openai_client=client))

    # save
    pickle.dump(dat, open('../data/destigma_pipeline/task1/task1_{}.pkl'.format(i), 'wb'))

    # label starts with 's-d'
    dat_task2 = dat[dat['task1_label'].apply(lambda x: x.startswith('s-d'))]
    dat_task2['task2_label'] = dat_task2['text'].apply(lambda x: get_stigma_drug(x, openai_client=client))

    # save
    pickle.dump(dat_task2, open('../data/destigma_pipeline/task2/task2_{}.pkl'.format(i), 'wb'))
    time.sleep(5) # sleep for 5 seconds