In [13]:
from openai import OpenAI
import os
import csv
import json
from dotenv import load_dotenv
from pathlib import Path

extracted_answers = []

dotenv_path = Path('../global_environment.env')
load_dotenv(dotenv_path=dotenv_path)

openai_key = os.getenv('OPENAI_KEY')
if(openai_key):
    print('Key loaded successfully')
else:
    print('API Key not found')

Key loaded successfully


In [14]:
def extract_keywords(text):
    try:
        response = openai.Completion.create(
            engine="text-davinci-003",
            prompt=f"Extract key adjectives, emotions, or metaphors from the following text:\n\n{text}\n\nKeywords:",
            max_tokens=50,
            temperature=0.7
        )
        return response.choices[0].text.strip().split(", ")
    except Exception as e:
        print(f"Error extracting keywords: {e}")
        return []


NotFoundError: Error code: 404 - {'error': {'message': 'The model `gpt-3.5` does not exist or you do not have access to it.', 'type': 'invalid_request_error', 'param': None, 'code': 'model_not_found'}}

In [12]:
def process_json_file(file_path):
    with open(file_path, 'r') as f:
        data = json.load(f)

    free_text_data = data["FREE_TEXT"]["value"]
    extracted_keywords = []

    for entry in free_text_data:
        signal_index = entry["signal_index"]
        category = entry["category"]
        free_text = entry["data"]

        if free_text.strip().lower() != "n.a":
            keywords = extract_keywords(free_text)
            extracted_keywords.append({
                "signal_index": signal_index,
                "category": category,
                "free_text": free_text,
                "keywords": keywords
            })

    return extracted_keywords
    


In [None]:
def extract_signal_descriptions_from_json(file_path):
    """
    Extract signal descriptions from a single JSON file.
    """
    with open(file_path, 'r') as f:
        data = json.load(f)
    
    free_text_data = data.get("FREE_TEXT", {}).get("value", [])
    signal_descriptions = {}

    for entry in free_text_data:
        signal_index = entry["signal_index"]
        category = entry["category"]
        description = entry["data"]

        if signal_index not in signal_descriptions:
            signal_descriptions[signal_index] = {}

        signal_descriptions[signal_index][category] = description

    return signal_descriptions

# Save extracted keywords to a JSON file
def save_keywords_to_file(keywords, output_file):
    with open(output_file, 'w') as f:
        json.dump(keywords, f, indent=4)

def process_json_files_in_directory(root_directory):
    """
    Traverse the directory structure, process all JSON files, and extract signal descriptions.
    """
    all_signal_descriptions = {}

    for subdir, _, files in os.walk(root_directory):
        for file_name in files:
            if file_name.endswith(".json"):
                file_path = os.path.join(subdir, file_name)
                print(f"Processing file: {file_path}")
                signal_descriptions = extract_signal_descriptions_from_json(file_path)

                for signal_index, descriptions in signal_descriptions.items():
                    if signal_index not in all_signal_descriptions:
                        all_signal_descriptions[signal_index] = []
                    all_signal_descriptions[signal_index].append({
                        "file_name": file_path,
                        **descriptions
                    })

    return all_signal_descriptions

def save_to_csv(data, output_file):
    """
    Save the extracted data to a CSV file.
    """
    with open(output_file, mode="w", newline="", encoding="utf-8") as csvfile:
        fieldnames = ["signal_index", "file_name", "free_text_sensory", "free_text_emotional", "free_text_association"]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

        for signal_index, entries in data.items():
            for entry in entries:
                writer.writerow({
                    "signal_index": signal_index,
                    "file_name": os.path.basename(entry.get("file_name")),
                    "free_text_sensory": entry.get("free_text_sensory", "N.A"),
                    "free_text_emotional": entry.get("free_text_emotional", "N.A"),
                    "free_text_association": entry.get("free_text_association", "N.A")
                })

def save_to_json(data, output_file):
    """
    Save the extracted data to a JSON file.
    """
    with open(output_file, 'w', encoding='utf-8') as jsonfile:
        json.dump(data, jsonfile, indent=4, ensure_ascii=False)

In [None]:
# Main function
def main():
    root_directory = "./Data"  # Replace with your top-level directory containing subfolders with JSON files
    output_csv = "signal_descriptions.csv"  # CSV output file
    output_json = "signal_descriptions.json"  # JSON output file

    if not os.path.exists(root_directory):
        print(f"Directory {root_directory} does not exist.")
        return

    print("Processing all JSON files in the directory structure...")
    signal_data = process_json_files_in_directory(root_directory)

    print("Saving data to CSV and JSON files...")
    save_to_csv(signal_data, output_csv)
    save_to_json(signal_data, output_json)

    print(f"Data has been saved to {output_csv} and {output_json}")

if __name__ == "__main__":
    main()