In [5]:
%%time
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd

def load_and_merge_csv(file_pattern, num_files):
    """Read a numbered series of CSV files and stack them into one DataFrame.

    Parameters
    ----------
    file_pattern : str
        ``str.format`` template with one positional placeholder for the file
        number, e.g. ``'data_upload/cluster_labels{}.csv'``.
    num_files : int
        How many files to read; numbering starts at 1 and ends at ``num_files``.

    Returns
    -------
    pandas.DataFrame
        Rows of every file in file-number order, re-indexed 0..n-1.

    Raises
    ------
    ValueError
        From ``pd.concat`` when ``num_files`` < 1 (nothing to concatenate).
    """
    parts = []
    for file_number in range(1, num_files + 1):
        path = file_pattern.format(file_number)
        parts.append(pd.read_csv(path))
    return pd.concat(parts, ignore_index=True)

N_ARTICLES = 200  # rows kept for the worker/batch timing experiments
cluster_labels_raw = load_and_merge_csv('data_upload/cluster_labels{}.csv', 4)
# Positional .head() gives the same first N rows as .loc[range(N)] on this
# RangeIndex, but does not raise KeyError when fewer than N rows were loaded.
# .copy() keeps df independent of cluster_labels_raw, like .loc's copy did.
df = cluster_labels_raw.head(N_ARTICLES).copy()

CPU times: user 317 ms, sys: 41.5 ms, total: 358 ms
Wall time: 376 ms


In [11]:
%%time
def fetch_tags(article_pair):
    """Request a tag list for a single article from the LLM.

    Parameters
    ----------
    article_pair : tuple
        ``(article_text, article_id)`` in that order, as built by
        ``zip(df['Text'], df['id'])`` in ``process_articles``.

    Returns
    -------
    tuple
        ``(article_id, tags)``. ``tags`` is the model's reply stripped and split
        on ``", "`` (a list of str), or the string ``'unable to generate'`` when
        reading ``response.text`` raises ``ValueError``.

    Notes
    -----
    Relies on notebook globals defined outside this cell: ``prompt``
    (presumably a ``str.format`` template with a ``{text}`` field), ``llm``
    (presumably a Gemini ``GenerativeModel`` -- TODO confirm), ``HarmCategory``
    and ``HarmBlockThreshold``. Exceptions raised by ``llm.generate_content``
    are not caught here and propagate to the caller.
    """
    article_text, article_id = article_pair
    final_prompt = prompt.format(text=article_text)
    # BLOCK_NONE for all four listed harm categories, presumably so safety
    # filters do not suppress the tagging reply.
    response = llm.generate_content(final_prompt, safety_settings={
                                    HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
                                    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
                                    HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
                                    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE
                                    })
    # NOTE(review): fixed 1 s pause per request, presumably to throttle the API rate -- confirm.
    time.sleep(1)
    try:
        tags = response.text.strip().split(", ")
    except ValueError:
        # NOTE(review): presumably raised when the response carries no text (e.g. blocked) -- confirm.
        return article_id, 'unable to generate'
    return article_id, tags

def process_articles(df, max_workers=20, batch_size=150, cooldown_period=10, fetch_fn=None):
    """Tag every article in ``df`` concurrently, in batches separated by a cooldown.

    Articles are submitted to one shared thread pool ``batch_size`` at a time.
    The function waits for the whole batch to finish, then sleeps
    ``cooldown_period`` seconds before starting the next batch. It does not
    sleep after the last batch.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``'Text'`` column (article body) and an ``'id'`` column.
    max_workers : int, default 20
        Number of threads in the pool.
    batch_size : int, default 150
        Articles submitted per batch; must be positive.
    cooldown_period : float, default 10
        Seconds to sleep between batches.
    fetch_fn : callable, optional
        Called with one ``(text, id)`` tuple; must return ``(id, tags)``.
        Defaults to ``fetch_tags``. Override it to run without the API.

    Returns
    -------
    dict
        Maps each article id to the tags returned for it. If ids repeat, only
        one result per id is kept.

    Raises
    ------
    ValueError
        If ``batch_size`` is not positive.
    Any exception raised by ``fetch_fn`` is re-raised by ``future.result()``
    and aborts the run; results gathered so far are not returned.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    if fetch_fn is None:
        # Looked up at call time so this function can be defined independently of fetch_tags.
        fetch_fn = fetch_tags

    results = {}
    # Pair order (text, id) is what fetch_fn unpacks.
    article_id_pairs = list(zip(df['Text'].tolist(), df['id'].tolist()))
    total = len(article_id_pairs)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for start in range(0, total, batch_size):
            end = min(start + batch_size, total)
            print(f"Starting batch processing for articles {start+1} to {end}")
            futures = [executor.submit(fetch_fn, pair) for pair in article_id_pairs[start:end]]

            for future in as_completed(futures):
                article_id, tags = future.result()
                results[article_id] = tags

            # Cool down only if another batch follows.
            if end < total:
                print(f"All tasks in batch {start//batch_size + 1} completed, cooling down for {cooldown_period} seconds...")
                time.sleep(cooldown_period)

    return results


results = process_articles(df)
# Summarize rather than printing the whole mapping; the full dict stays in `results`.
print(f"Final results: {len(results)} articles processed")
pd.Series(results, name="tags", dtype=object).head(10)

Starting batch processing for articles 1 to 150
Processing article ID nos7tzp7jprxlqxe 
Processing article ID zvv4ue0w64vfqoz1 
Processing article ID aph1tgua3xxoq2sg 
Processing article ID rlh53czyst054zfn 
Processing article ID aksixz7uun2gkpss 
Processing article ID slue2wdvlok4sfy6 
Processing article ID jmcyx62frlc3i24s 
Processing article ID szltbvfarltlhw2v 
Processing article ID zlimezzuv9k0v2mo 
Processing article ID rv9vnijfvrgud4qz 
Processing article ID sfaslffy0we4xsmr 
Processing article ID kr5ei12rm07mtfom 
Processing article ID c2wx2y4pnxokw180 
Processing article ID lgbcr5i7od6vnw0c 
Processing article ID jrherp4hwloc86h0 
Processing article ID hrbdw3gbtl99qdhb 
Processing article ID icelinh0fg9pc8hy 
Processing article ID i1arw3xj6oll4c7x 
Processing article ID 6tcbi1ivq6gdiip6 
Processing article ID 5ecjlm5vos1bnq76 
Processing article ID myd8fpq4hex3piyr 
Processing article ID ww6f35i5zex2vxj8 
Processing article ID gzxinu7g2hxhtgvj 
Processing article ID ur265cf2hb

In [None]:
# Wall time with 10 workers: 30 seconds (first 200 rows, 10-second cooldown).
# Wall time with 20 workers: 20 seconds (first 200 rows, 10-second cooldown).
# Wall time with 30 workers: 18 seconds (first 200 rows, 10-second cooldown).
# All runs above used a batch size of 100.

# Wall time with 20 workers: 21 seconds (first 200 rows, 10-second cooldown).
# The run above used a batch size of 150.