In [1]:
import os
import csv
import json
import time
from dotenv import load_dotenv
from langchain_community.tools.reddit_search.tool import RedditSearchRun
from langchain_community.utilities.reddit_search import RedditSearchAPIWrapper
from langchain_community.tools.reddit_search.tool import RedditSearchSchema
from langchain_community.llms import Ollama
from tqdm import tqdm
import sys
sys.path.insert(0, '..')
from utility import *


In [2]:
def load_questions(file_path):
    """
    Load questions from a CSV file.

    The file is read with ``csv.DictReader``, so its first row is taken as the
    header. The value of the ``question`` column is collected from every data
    row, in file order.

    Args:
        file_path (str): Path to the CSV file containing questions.

    Returns:
        list: A list of questions.

    Raises:
        KeyError: If a data row is read and the header has no ``question``
            column.
    """
    # 'utf-8-sig' decodes plain UTF-8 and additionally drops a leading
    # byte-order mark. Without it, a BOM is glued onto the first header name
    # ('\ufeffquestion') and row['question'] raises KeyError.
    with open(file_path, newline='', encoding='utf-8-sig') as csvfile:
        return [row['question'] for row in csv.DictReader(csvfile)]


def search_reddit(query, sort_by="relevance", time_filter="all", subreddit="EhlersDanlos", limit=3):
    """
    Search Reddit for a given query and parse the tool's text output into posts.

    The request is sent through the module-level ``search`` tool object, which
    must have been created before this function is called.

    Args:
        query (str): The search query to use.
        sort_by (str, optional): The sorting method for the search results.
            Accepted values are "relevance", "hot", "top", "new", and "comments".
        time_filter (str, optional): The time filter for the search results.
            Accepted values are "all", "year", "month", "week", "day", and "hour".
        subreddit (str, optional): The subreddit to search in.
        limit (int, optional): The maximum number of search results to return.

    Returns:
        list: A list of dictionaries, one per post found. Each dictionary has
        a "Post Title" and a "Text body" entry plus one entry for every other
        "Key: value" line of that post. The list is empty when the tool
        returns something other than a string or a string with no posts in it.
    """
    title_marker = "Post Title: '"

    search_params = RedditSearchSchema(
        query=query,
        sort=sort_by,
        time_filter=time_filter,
        subreddit=subreddit,
        limit=limit
    )
    result = search.run(tool_input=search_params.dict())

    # Convert the result string into a list of dictionaries
    posts = []
    if not isinstance(result, str):
        return posts

    for i, post_str in enumerate(result.split("\n\n" + title_marker)):
        if i == 0:
            # The first chunk may carry leading text before the first post.
            # If it holds no title marker at all, the response contains no
            # post (e.g. a "nothing found" status line or an empty string);
            # skip it rather than emitting a bogus post with an empty body.
            if title_marker not in post_str:
                continue
            post_str = post_str.split(title_marker, 1)[-1]

        lines = post_str.strip().split("\n")
        post = {}

        # The marker (including its opening quote) was consumed by the split,
        # so the first line is the title followed by its closing quote.
        if lines:
            post["Post Title"] = lines[0].strip("'")

        # Collect the text body, which can span several lines.
        # NOTE: any line containing ": " is treated as a "Key: value" field,
        # so a body line that itself contains ": " is stored under its own
        # key instead of being appended to the body.
        text_body = []
        for line in lines[1:]:
            line = line.strip()
            if ": " in line:
                key, value = line.split(": ", 1)
                if key.strip() == "Text body":
                    text_body.append(value.strip())
                else:
                    post[key.strip()] = value.strip()
            else:
                text_body.append(line)

        # Join the text body
        post["Text body"] = " ".join(text_body).strip()

        posts.append(post)

    return posts

def process_post(question, post):
    """
    Confirm relevance of a post and generate structured output using LLM.

    Only the post's "Text body" is sent to the model; a post without any body
    text is reported as not relevant without calling the model.

    The model is called through ``get_GPT_response`` with the model name read
    from the ``MODEL_NAME`` environment variable.
    NOTE(review): ``get_GPT_response`` is not defined in this notebook;
    presumably it comes from ``from utility import *`` - confirm.

    Args:
        question (str): The question to confirm relevance against.
        post (dict): The Reddit post to process.
    Returns:
        str or None: A summarized response if relevant, otherwise None.
    """
    # `or ""` also covers a "Text body" key that is present but holds None.
    input_text = post.get("Text body") or ""
    if not input_text.strip():
        # Nothing to validate: asking the model to judge an empty input
        # wastes a call and invites a made-up "Yes".
        return None

    instruction = f"Verify if the following post addressed the question: '{question}'? If yes, say 'Yes, it is related to the question' and provide a summarized answer. If not, say 'No, it is not related to the question'."
    prompt = f"Instruction: {instruction}\n\nInput: {input_text}\n\nOutput:"

    relevance_system_prompt = "You are an excellent question validator. Verify if the post addresses the question asked"
    try:
        response = get_GPT_response(prompt, relevance_system_prompt, os.environ.get('MODEL_NAME'), temperature=0.3)
    except Exception:
        # Best effort: a failed model call counts as "not related" so one bad
        # post does not abort the run. `Exception` (not a bare except) lets
        # KeyboardInterrupt / SystemExit still stop a long-running loop.
        response = 'No, it is not related to the question'

    # Guard against a helper that returns None (or any non-string) on failure.
    if not isinstance(response, str):
        return None

    if "yes, it is related to the question" not in response.lower():
        return None

    # Keep only what follows the last answer marker; when neither marker is
    # present the split leaves the whole response untouched.
    marker = "Summarized answer:" if "Summarized answer:" in response else "Output:"
    return response.split(marker)[-1].strip()

def combine_responses(question, responses):
    """
    Combine multiple responses into a single summarized answer using LLM.

    The responses are joined with single spaces and sent to
    ``get_GPT_response`` with the model name read from the ``MODEL_NAME``
    environment variable. If that call fails or does not return a string, the
    joined responses themselves are returned instead.

    Args:
        question (str): The question being answered.
        responses (list): A list of individual summarized responses.
    Returns:
        str: A single summarized answer, stripped of surrounding whitespace.
        An empty string when there is no response text to summarize.
    """
    combined_input = " ".join(responses)
    if not combined_input.strip():
        # Nothing to summarize - skip the model call entirely.
        return ""

    summary_system_prompt = '''
    You are an expert who summarizes the textual passages elegantly. 
    In your summary, focus on the relevant information and avoid using phrases like 'the author of the post reported'. Instead, provide a summary of the key points.
    '''
    prompt = f"Instruction: Based on discussions on online platforms like Reddit, summarize the following responses to the question: '{question}'. \n\nInput: {combined_input}\n\nOutput:"
    try:
        combined_response = get_GPT_response(prompt, summary_system_prompt, os.environ.get('MODEL_NAME'), temperature=0.3)
    except Exception:
        # Best effort: fall back to the un-summarized text. `Exception` (not a
        # bare except) keeps KeyboardInterrupt / SystemExit able to stop the run.
        combined_response = combined_input

    # Same fallback if the helper hands back None or another non-string.
    if not isinstance(combined_response, str):
        combined_response = combined_input

    return combined_response.strip()

def search_and_process_questions(questions, delay=1):
    """
    Run the search -> relevance check -> summarization pipeline per question.

    For each question, Reddit is searched, every returned post is passed
    through ``process_post``, and the non-empty summaries are merged with
    ``combine_responses``. Questions without any relevant post produce no
    record.

    Args:
        questions (list): A list of questions to search and process.
        delay (int): Delay in seconds between API requests to avoid rate limiting.

    Returns:
        list: A list of dictionaries with the keys "input" (always an empty
        string), "instruction" (the question) and "output" (the combined
        summary).
    """
    records = []

    for question in tqdm(questions):
        # Validate every post returned for this question, in search order.
        candidate_summaries = [
            process_post(question, post) for post in search_reddit(question)
        ]
        # process_post yields None (or an empty string) for posts to discard.
        kept_summaries = [summary for summary in candidate_summaries if summary]

        if kept_summaries:
            record = {
                "input": "",
                "instruction": question,
                "output": combine_responses(question, kept_summaries),
            }
            records.append(record)

        # Pause after every question to stay under the API rate limit.
        time.sleep(delay)

    return records


def save_results_to_json(data, file_path):
    """
    Save the processed results to a JSON file.

    The data is serialized in full before the file is opened, so a
    serialization error leaves an already existing file untouched.

    Args:
        data (list): The data to save to JSON.
        file_path (str): Path to the output JSON file.

    Raises:
        TypeError: If ``data`` contains an object that is not JSON serializable.
    """
    # Serialize first: opening with 'w' truncates the file immediately, so
    # dumping straight into the handle would wipe or half-write previous
    # results whenever a record cannot be serialized.
    serialized = json.dumps(data, indent=4)
    # json.dumps escapes non-ASCII characters by default, so the explicit
    # encoding only makes the written bytes independent of the locale.
    with open(file_path, 'w', encoding='utf-8') as jsonfile:
        jsonfile.write(serialized)


In [3]:
# Load the list of questions from the CSV file.
# The path is relative, so it is resolved against the kernel's working directory.
questions = load_questions('../../eds_reddit/eds_questions_llm_generated.csv')

In [4]:
# Optional: try only the first 50 questions (uncomment the next line)
# questions = questions[:50]
# questions

In [5]:
# Load environment variables from the `.eds.env` file in the repo root folder
# (create that file yourself and store your Reddit credentials in it).
# NOTE(review): REPO_ROOT_PATH is not defined in this notebook; presumably it
# comes from `from utility import *` - confirm.
dotenv_path = os.path.join(REPO_ROOT_PATH, '.eds.env')
load_dotenv(dotenv_path)

# Reddit API credentials. os.environ.get returns None for a variable that is
# not set, so a missing entry is not reported here.
client_id = os.environ.get('client_id')
client_secret = os.environ.get('client_secret')
user_agent = os.environ.get('user_agent')


In [6]:
# Initialize the Reddit search tool with the credentials loaded above.
# `search` is read as a global by search_reddit() (it calls search.run(...)),
# so this cell must be executed before search_reddit() is called.
search = RedditSearchRun(
    api_wrapper=RedditSearchAPIWrapper(
        reddit_client_id=client_id,
        reddit_client_secret=client_secret,
        reddit_user_agent=user_agent,
    )
)

In [7]:
# Run the full pipeline over every loaded question, using the default
# 1-second pause after each question.
final_results = search_and_process_questions(questions)


100%|█████████████████████████████████████████| 100/100 [05:17<00:00,  3.17s/it]


In [8]:
# Save final output to JSON file:
# save_results_to_json(final_results, '../../eds_reddit/final_output.json')

In [22]:
# Inspect one processed record (raises IndexError if fewer than 23 results were produced).
final_results[22]

{'input': '',
 'instruction': 'Can EDS lead to an increased risk of developing anxiety or panic disorders?',
 'output': "The post discusses an individual's anxiety and fear related to their EDS and the potential increased risk of infection or difficulty healing. They also express concerns about using laughing gas due to their dizziness and dysautonomia."}