In [1]:
import time
import random
import pandas as pd
import re
import asyncio
import nest_asyncio

import aiohttp
import os
import pickle
import logging
import json

import requests
from urllib.parse import quote
from chemspipy import ChemSpider
from requests.exceptions import RequestException

from tqdm import tqdm
from tqdm.asyncio import tqdm

import threading

### ChemSpider

In [2]:
# Replace with your own ChemSpider API keys (a list of key strings)
API_KEYS = []

# Module-level rotation state shared by the ChemSpider helpers:
# which key is in use, and how many requests have been counted against it.
current_key_index, request_count = 0, 0

# Maximum requests per API key
REQUEST_LIMIT = 980

def get_current_api_key():
    """Return the ChemSpider API key to use for the next request.

    Once ``request_count`` has reached ``REQUEST_LIMIT`` the function moves
    ``current_key_index`` to the next entry of ``API_KEYS`` (wrapping back to
    the first key after the last one) and resets ``request_count`` to 0.

    Returns:
        The entry of ``API_KEYS`` at ``current_key_index``.

    Raises:
        IndexError: if ``API_KEYS`` is empty.
    """
    global current_key_index, request_count

    # Fail with an explicit message instead of "list index out of range".
    if not API_KEYS:
        raise IndexError("API_KEYS is empty; add at least one ChemSpider API key")

    if request_count >= REQUEST_LIMIT:
        # Rotate to the next key, looping back to the first when we run out.
        current_key_index = (current_key_index + 1) % len(API_KEYS)
        request_count = 0

    return API_KEYS[current_key_index]

def get_smiles_from_ChemSpider(compound_name):
    """Look up a compound name on ChemSpider and return its SMILES.

    Args:
        compound_name: Name passed to ``ChemSpider.search``.

    Returns:
        ``(smiles, 'ChemSpider')`` taken from the first search result, or
        ``(None, None)`` when there are no results or any error occurs.
        Errors are logged, never raised.
    """
    global request_count

    try:
        # Key selection lives inside the try so that a missing/invalid key is
        # logged like every other failure instead of escaping to the caller.
        api_key = get_current_api_key()
        cs = ChemSpider(api_key)

        results = cs.search(compound_name)
        request_count += 1  # Increment the request count after a successful request

        if results:
            return results[0].smiles, 'ChemSpider'
        logging.error(f"[ChemSpider] No results found for {compound_name}")
    except RequestException as req_err:
        # Bound to `req_err` rather than `re`, which is the name of the imported regex module.
        logging.error(f"[ChemSpider] Network-related error for {compound_name}: {req_err}")
    except KeyError as ke:
        logging.error(f"[ChemSpider] KeyError when processing results for {compound_name}: {ke}")
    except Exception as e:
        logging.error(f"[ChemSpider] Unexpected error occurred for {compound_name}: {e}")

    return None, None

### Other APIs

In [3]:
async def exponential_backoff(attempt, max_delay=60):
    """Sleep for ``2 ** attempt`` seconds plus up to one second of jitter.

    The total sleep time is capped at ``max_delay`` seconds.
    """
    jittered_delay = (2 ** attempt) + random.uniform(0, 1)
    await asyncio.sleep(min(max_delay, jittered_delay))

async def fetch_smiles(session, url, semaphore, max_retries):
    """GET ``url`` and return the response body, retrying transient failures.

    Args:
        session: Object providing ``get(url)`` as an async context manager
            (an ``aiohttp.ClientSession`` in this notebook).
        url: Address to fetch.
        semaphore: Async semaphore held for the whole call, including waits
            between retries.
        max_retries: Maximum number of attempts.

    Returns:
        The response text on HTTP 200, otherwise ``None``.

    Retry policy:
        * 429, or a body containing ``"ServerBusy"``: back off and retry.
        * Other 4xx (except 408): the request itself is rejected, so retrying
          cannot help; return ``None`` immediately.
        * Other statuses >= 400, client errors and unexpected exceptions:
          back off and retry.
        * Timeouts: retry without waiting.
        No backoff sleep is performed after the final attempt.
    """
    async with semaphore:  # Ensure the semaphore is respected
        for attempt in range(max_retries):
            has_retries_left = attempt < max_retries - 1
            try:
                async with session.get(url) as response:
                    status = response.status
                    if status == 200:
                        return await response.text()

                    if status == 429 or "ServerBusy" in await response.text():
                        logging.warning(f"[Busy] Server busy, retrying... (Attempt {attempt + 1}/{max_retries}) for URL: {url}")
                        if has_retries_left:
                            await exponential_backoff(attempt)
                        continue

                    logging.error(f"[Error] Failed with status code: {status} for URL: {url}")
                    if 400 <= status < 500 and status != 408:
                        # e.g. 404 for an unknown compound name: a retry would
                        # only hold the semaphore while sleeping.
                        return None
                    response.raise_for_status()
            except asyncio.TimeoutError:
                logging.error(f"[Timeout] Failed to fetch SMILES from {url} (Timeout)")
            except aiohttp.ClientError as e:
                logging.error(f"[ClientError] Failed to fetch SMILES from {url} [Error] {e}")
                if has_retries_left:
                    await exponential_backoff(attempt)
            except Exception as e:
                logging.error(f"[Error] Failed to fetch SMILES from {url} [Error] {e}")
                if has_retries_left:
                    await exponential_backoff(attempt)
    return None

async def get_smiles_from_pubchem(session, compound_name, semaphore, max_retries):
    """Resolve a compound name to SMILES through the PubChem PUG REST API.

    Args:
        session: HTTP session forwarded to ``fetch_smiles``.
        compound_name: Name to look up; URL-quoted into the request path.
        semaphore: Concurrency limiter forwarded to ``fetch_smiles``.
        max_retries: Attempt limit forwarded to ``fetch_smiles``.

    Returns:
        ``(smiles, 'PubChem')`` from the first entry of
        ``PropertyTable.Properties``, or ``(None, None)`` when nothing usable
        comes back. Errors are logged, never raised.
    """
    pubchem_url = f"https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/name/{quote(compound_name)}/property/IsomericSMILES/JSON"
    try:
        response_text = await fetch_smiles(session, pubchem_url, semaphore, max_retries)
        if response_text:
            data = json.loads(response_text)
            properties = data.get('PropertyTable', {}).get('Properties') if isinstance(data, dict) else None
            if properties:
                first = properties[0]
                # NOTE(review): accept a plain `SMILES` key as well, so a
                # differently labelled property is not reported as a miss;
                # confirm the current field name against the PUG REST docs.
                smiles = first.get('IsomericSMILES') or first.get('SMILES')
                if isinstance(smiles, str) and smiles.strip():
                    return smiles.strip(), 'PubChem'
    except Exception as e:
        logging.error(f"[PubChem] Failed to fetch SMILES for {compound_name} from URL: {pubchem_url} [Error] {e}")
    return None, None

async def get_smiles_from_cir(session, compound_name, semaphore, max_retries):
    """Resolve a compound name to SMILES through the NCI CACTUS resolver (CIR).

    Args:
        session: HTTP session forwarded to ``fetch_smiles``.
        compound_name: Name to look up; URL-quoted into the request path.
        semaphore: Concurrency limiter forwarded to ``fetch_smiles``.
        max_retries: Attempt limit forwarded to ``fetch_smiles``.

    Returns:
        ``(smiles, 'CIR')`` with surrounding whitespace removed, or
        ``(None, None)`` when the lookup fails or returns an empty body, the
        same miss value the PubChem helper uses.
    """
    cir_url = f"http://cactus.nci.nih.gov/chemical/structure/{quote(compound_name)}/smiles"
    try:
        response_text = await fetch_smiles(session, cir_url, semaphore, max_retries)
        smiles = response_text.strip() if response_text else None
        if smiles:
            return smiles, 'CIR'
    except Exception as e:
        logging.error(f"[CIR] Failed to fetch SMILES for {compound_name} from URL: {cir_url} [Error] {e}")
    return None, None

async def get_smiles_from_opsin(session, compound_name, semaphore, max_retries):
    """Resolve a compound name to SMILES through the OPSIN web service.

    Args:
        session: HTTP session forwarded to ``fetch_smiles``.
        compound_name: Name to look up; URL-quoted into the request path.
        semaphore: Concurrency limiter forwarded to ``fetch_smiles``.
        max_retries: Attempt limit forwarded to ``fetch_smiles``.

    Returns:
        ``(smiles, 'OPSIN')`` with surrounding whitespace removed, or
        ``(None, None)`` when the lookup fails or returns an empty body, the
        same miss value the PubChem helper uses.
    """
    opsin_url = f"https://opsin.ch.cam.ac.uk/opsin/{quote(compound_name)}.smi"
    try:
        response_text = await fetch_smiles(session, opsin_url, semaphore, max_retries)
        smiles = response_text.strip() if response_text else None
        if smiles:
            return smiles, 'OPSIN'
    except Exception as e:
        logging.error(f"[OPSIN] Failed to fetch SMILES for {compound_name} from URL: {opsin_url} [Error] {e}")
    return None, None

In [4]:
# Apply nest_asyncio to allow nested event loops, so the asyncio.run(...)
# calls further down can be used from inside the notebook.
nest_asyncio.apply()

# Caching system for already fetched SMILES (compound name -> SMILES)
smiles_cache = {}

# Load existing cache.
# make_smiles_dict writes the cache to <output_dir>/smiles_cache.pkl (default
# output_dir: 'smiles_batches'), so that location is checked in addition to
# the working directory; entries from the later file win on conflict.
# NOTE: pickle.load can execute arbitrary code. Only load cache files that
# this notebook produced itself.
for _cache_path in ('smiles_cache.pkl', os.path.join('smiles_batches', 'smiles_cache.pkl')):
    if not os.path.exists(_cache_path):
        continue
    try:
        with open(_cache_path, 'rb') as f:
            _loaded_cache = pickle.load(f)
    except (OSError, EOFError, pickle.UnpicklingError) as e:
        # A truncated or corrupt cache should not stop the notebook.
        logging.warning(f"Could not load SMILES cache from {_cache_path}: {e}")
        continue
    if isinstance(_loaded_cache, dict):
        smiles_cache.update(_loaded_cache)
    else:
        logging.warning(f"Ignoring SMILES cache at {_cache_path}: expected a dict, got {type(_loaded_cache).__name__}")

# Ensure directory exists
def ensure_directory(directory):
    """Create ``directory`` (including missing parents) if it does not exist.

    Uses ``exist_ok=True`` so that a directory created by another thread or
    process between a check and the creation cannot raise.
    """
    os.makedirs(directory, exist_ok=True)

# JSON validation and fixing functions
def is_valid_json(json_string):
    """Return True when ``json_string`` parses as JSON, otherwise False.

    Non-string input (for example ``None`` or a float NaN read from a CSV)
    is reported as invalid instead of raising ``TypeError``. Every rejection
    is logged with a preview of the offending value.
    """
    try:
        json.loads(json_string)
        return True
    except (ValueError, TypeError) as e:
        # Only sliceable text can be truncated; show anything else via repr.
        if isinstance(json_string, (str, bytes, bytearray)):
            preview = json_string[:100]
        else:
            preview = repr(json_string)
        logging.error(f"Invalid JSON string: {preview}... [Error] {e}")
        return False

def fix_json_string(json_string):
    """Best-effort brace repair for a malformed JSON object string.

    The text is stripped, forced to start with ``{`` and end with ``}``, and
    then padded so the counts of ``{`` and ``}`` are equal: missing closers are
    appended, missing openers are prepended. Braces are counted as raw
    characters, with no awareness of JSON string literals.

    Returns:
        The repaired string, or ``None`` if the input could not be processed
        (the failure is logged).
    """
    try:
        candidate = json_string.strip()

        # Guarantee the outermost object delimiters.
        if not candidate.startswith('{'):
            candidate = '{' + candidate
        if not candidate.endswith('}'):
            candidate += '}'

        # Positive: unclosed '{'. Negative: surplus '}'.
        imbalance = candidate.count('{') - candidate.count('}')
        if imbalance > 0:
            return candidate + '}' * imbalance
        if imbalance < 0:
            return '{' * (-imbalance) + candidate
        return candidate

    except Exception as e:
        logging.error(f"Failed to fix JSON string: {json_string} [Error] {e}")
        return None

def fix_name(compound_name):
    """Strip concentration prefixes and descriptive words from a chemical name.

    Every pattern below is removed wherever it occurs (case-insensitively) and
    the result is stripped of surrounding whitespace.

    Regex alternation takes the first alternative that matches at a position,
    so a literal that extends another one must be listed before it. Here
    ``"ice-"`` precedes ``"ice"`` and ``"crystals"`` precedes ``"crystal"``;
    in the reverse order the longer forms can never match and a stray ``-`` or
    ``s`` is left in the name.

    Args:
        compound_name: Raw name string.

    Returns:
        The cleaned name (possibly empty).
    """
    remove_patterns = [
        r"\d+\s+normal",
        r"\d+(\.\d+)?\s*N-",
        r"\d+(\.\d+)?\s*N",
        r"\d+(\.\d+)?\s*M",
        r"\d+%",
        r"\s*\(\s*\)",
        r"\([^()]*\)$",
        r"·",
        r'\([IVXLCDM]+\)',
        "anhydrous", "concentrated", "catalyst", "-catalyst", "saturated",
        "ice-", "ice", "dried", "aqueous", "solution", "normal", "solid", "complex",
        "resin", "adduct", "corresponding", "atmosphere", "gas", "solvent", "crystals",
        "crystal", "buffer",
        # NOTE(review): the '.' is an unescaped regex wildcard, so this matches
        # any single character followed by "conc"; confirm that is intended.
        ".conc",
        "fuming", "glacial"
    ]
    pattern = f"({'|'.join(remove_patterns)})"
    fixed_compound = re.sub(pattern, "", compound_name, flags=re.I)
    return fixed_compound.strip()

### Get SMILES

In [5]:
async def get_smiles_from_ChemSpider_async(compound_name):
    """Run the blocking ``get_smiles_from_ChemSpider`` in the default executor.

    Returns whatever ``get_smiles_from_ChemSpider(compound_name)`` returns.
    """
    # Inside a coroutine a loop is always running; get_running_loop() says so
    # explicitly and cannot create or pick up an unrelated loop.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, get_smiles_from_ChemSpider, compound_name)

async def get_smiles(session, compound_name, fix_name_bool, semaphore):
    """Resolve one compound name to a SMILES string.

    Lookup order: a built-in table of common reagents (only when
    ``fix_name_bool`` is true), the in-memory ``smiles_cache``, then OPSIN.

    Args:
        session: HTTP session forwarded to the OPSIN helper.
        compound_name: Name to resolve; non-strings are converted with ``str``.
        fix_name_bool: When true, check the built-in table and otherwise clean
            the name with ``fix_name`` before looking it up.
        semaphore: Concurrency limiter forwarded to the OPSIN helper.

    Returns:
        ``(smiles, source)`` where ``source`` is ``'Cache'`` or the name of the
        service that answered, or ``(None, None)`` when nothing was found.
    """
    # Coerce before calling any str method; the warning branch would otherwise
    # be unreachable because .replace() raises first on a non-string.
    if not isinstance(compound_name, str):
        logging.warning(f"Warning: compound_name is not a string: {compound_name} (type: {type(compound_name)})")
        compound_name = str(compound_name)
    compound_name = compound_name.replace("′", "'")

    if fix_name_bool:
        known_smiles = {
            'ice': 'O',
            'DCM': 'C(Cl)Cl',
            # N,N-dimethylacetamide; 'CC(=O)N' would be plain acetamide.
            'DMA': 'CC(=O)N(C)C',
            'ether': 'CCOCC',
            'brine': 'O.[Na+].[Cl-]',
            # Atoms outside the SMILES organic subset must be bracketed.
            'Pd/C': '[Pd]',
            'DMSO': 'CS(=O)C',
            'LiAlH4': '[Li+].[AlH4-]',
        }
        if compound_name in known_smiles:
            return known_smiles[compound_name], 'Cache'
        compound_name = fix_name(compound_name)

    # A name made only of removable words is empty now; there is nothing to query.
    if not compound_name:
        logging.warning("Compound name is empty after cleaning; skipping lookup.")
        return None, None

    if compound_name in smiles_cache:
        return smiles_cache[compound_name], 'Cache'

    tasks = [
        get_smiles_from_opsin(session, compound_name, semaphore, max_retries=2),
    ]
    results = await asyncio.gather(*tasks)

    for result, source in results:
        if result:
            smiles_cache[compound_name] = result
            logging.info(f"Found SMILES for {compound_name} from {source}: {result}")
            return result, source

    logging.warning(f"Failed to find SMILES for {compound_name} in all sources.")
    return None, None


async def get_smiles_dict(response, session, fix_name_bool, semaphore):
    """Resolve every chemical listed in one parsed response to SMILES.

    Args:
        response: Parsed response; the keys ``'Reactants, Solvents, Catalysts'``
            and ``'Product'`` are read when present, each expected to map a
            code to a compound name.
        session: HTTP session forwarded to ``get_smiles``.
        fix_name_bool: Forwarded to the first ``get_smiles`` attempt.
        semaphore: Concurrency limiter forwarded to ``get_smiles``.

    Returns:
        dict mapping each code to its SMILES, or to ``"[<name> (NoSmi)]"`` when
        neither the raw name nor the cleaned name could be resolved.
    """
    smiles_dict = {}
    problem_chemicals = []

    async def process_chemicals(chemicals_dict, category, fix_name_bool):
        # Anything but a dict cannot provide code -> name pairs.
        if not isinstance(chemicals_dict, dict):
            logging.warning(f"Skipping {category}: expected a dict of chemicals, got {type(chemicals_dict).__name__}")
            return

        codes = list(chemicals_dict)
        results = await asyncio.gather(
            *(get_smiles(session, chemicals_dict[code], fix_name_bool, semaphore) for code in codes),
            return_exceptions=True,
        )

        for code, outcome in zip(codes, results):
            original_name = chemicals_dict[code]

            # With return_exceptions=True a failed lookup arrives as an
            # exception object, which cannot be unpacked into (smiles, source).
            if isinstance(outcome, BaseException):
                logging.error(f"Lookup failed for {original_name} ({category}): {outcome}")
                smiles, source = None, None
            else:
                smiles, source = outcome

            if smiles:
                smiles_dict[code] = smiles
                logging.info(f"Found SMILES for {original_name} from {source}: {smiles}")
                continue

            # Second chance: clean the name and look it up again.
            try:
                fixed_name = fix_name(str(original_name))
                fixed_smiles, fixed_source = await get_smiles(session, fixed_name, True, semaphore)
            except Exception as e:
                logging.error(f"Retry with fixed name failed for {original_name} ({category}): {e}")
                fixed_smiles, fixed_source = None, None

            if not fixed_smiles:
                problem_chemicals.append(f"{original_name} ({category})")
                smiles_dict[code] = f"[{original_name} (NoSmi)]"
            else:
                smiles_dict[code] = fixed_smiles
                logging.info(f"Found SMILES for {original_name} (Fixed Name) from {fixed_source}: {fixed_smiles}")

    if 'Reactants, Solvents, Catalysts' in response:
        await process_chemicals(response['Reactants, Solvents, Catalysts'], 'Reactant/Solvent/Catalyst', fix_name_bool)

    if 'Product' in response:
        await process_chemicals(response['Product'], 'Product', fix_name_bool)

    if problem_chemicals:
        logging.info(f"Problem chemicals: {problem_chemicals}")

    return smiles_dict

async def process_batch(json_responses, fix_name_bool, semaphore):
    """Convert a batch of JSON response strings into SMILES dictionaries.

    Each response is validated (with one repair attempt through
    ``fix_json_string``), parsed, and passed to ``get_smiles_dict``.

    The returned list always has one entry per input, in input order. A
    response that is not a string, cannot be repaired, or does not decode to a
    JSON object yields an empty dict, so results stay aligned with the rows
    they came from.

    NOTE: a later cell defines another ``process_batch`` with a different
    signature; after that cell has run, this name refers to the later one.

    Args:
        json_responses: Iterable of JSON strings.
        fix_name_bool: Forwarded to ``get_smiles_dict``.
        semaphore: Concurrency limiter forwarded to ``get_smiles_dict``.

    Returns:
        list of dicts, one per input response.
    """
    responses = list(json_responses)
    smiles_dicts = [{} for _ in responses]

    async with aiohttp.ClientSession() as session:
        positions = []
        tasks = []
        for position, json_response in enumerate(responses):
            if not isinstance(json_response, str):
                logging.error(f"Skipping non-string JSON response: {json_response!r}")
                continue

            # Validate and fix JSON string
            if not is_valid_json(json_response):
                fixed_json = fix_json_string(json_response)
                if fixed_json and is_valid_json(fixed_json):
                    json_response = fixed_json
                else:
                    logging.error(f"Skipping invalid JSON response: {json_response}")
                    continue  # Placeholder {} stays at this position

            # Process the valid JSON response
            response_dict = json.loads(json_response)
            if not isinstance(response_dict, dict):
                logging.error(f"Skipping JSON response that is not an object: {json_response}")
                continue

            positions.append(position)
            tasks.append(get_smiles_dict(response_dict, session, fix_name_bool, semaphore))

        for position, result in zip(positions, await asyncio.gather(*tasks)):
            smiles_dicts[position] = result
    return smiles_dicts

def save_smiles_dict(smiles_dict, filename):
    """Write ``smiles_dict`` to ``filename`` as indented JSON.

    The file is encoded as ``utf-8-sig`` and non-ASCII characters are written
    unescaped.
    """
    with open(filename, mode='w', encoding='utf-8-sig') as out_file:
        json.dump(smiles_dict, out_file, ensure_ascii=False, indent=2)

def make_smiles_dict(df, batch_size=100, output_dir='smiles_batches', max_concurrency=60):
    """Resolve SMILES for every response in ``df``, one batch at a time.

    After each successful batch the accumulated results are written to
    ``<output_dir>/smiles_dict_batch_<n>.json`` and ``smiles_cache`` is pickled
    to ``<output_dir>/smiles_cache.pkl``. When a batch raises, one ``"Error"``
    placeholder per row of that batch is appended instead. The full list is
    finally written to ``<output_dir>/smiles_dict_final_ver1.json``.

    Args:
        df: Sliceable sequence of JSON response strings (a pandas Series in
            this notebook).
        batch_size: Number of responses processed per batch.
        output_dir: Directory for intermediate and final files.
        max_concurrency: Size of the semaphore limiting concurrent lookups
            within a batch.

    Returns:
        list with one entry per processed row (dicts, or ``"Error"`` strings).
    """
    # Ensure the output directory exists
    ensure_directory(output_dir)

    smiles_dict = []
    total_batches = (len(df) + batch_size - 1) // batch_size

    async def _run_batch(batch):
        # Build the semaphore inside the coroutine so it belongs to the event
        # loop that runs this batch; every asyncio.run() call may use its own
        # loop, and a semaphore must not be shared across loops.
        semaphore = asyncio.Semaphore(max_concurrency)
        return await process_batch(batch, fix_name_bool=False, semaphore=semaphore)

    # Process batches
    for i in tqdm(range(0, len(df), batch_size), desc="Processing GPT Responses in Batches"):
        batch = df[i:i + batch_size]
        batch_number = i // batch_size + 1
        try:
            temp_smiles_dicts = asyncio.run(_run_batch(batch))
            smiles_dict.extend(temp_smiles_dicts)
            logging.info(f"Completed batch {batch_number}/{total_batches}")

            # Save intermediate results
            save_smiles_dict(smiles_dict, os.path.join(output_dir, f'smiles_dict_batch_{batch_number}.json'))

            # Save the cache periodically
            with open(os.path.join(output_dir, 'smiles_cache.pkl'), 'wb') as f:
                pickle.dump(smiles_cache, f)

        except Exception as e:
            # Keep one placeholder per row of the failed batch.
            smiles_dict.extend(["Error"] * len(batch))
            logging.error(f"Error in batch {batch_number}: {e}")

    save_smiles_dict(smiles_dict, os.path.join(output_dir, 'smiles_dict_final_ver1.json'))
    return smiles_dict

# Function to monitor log in real-time
def monitor_log(file_path, stop_event, lines_per_batch=1):
    """Tail ``file_path`` and echo selected new lines until told to stop.

    Reading starts at the current end of the file. A line containing
    ``"Starting batch"`` prints a batch header and resets the per-batch line
    budget; at most ``lines_per_batch`` lines are echoed between resets. A line
    containing ``"Processing completed"`` sets ``stop_event``. The loop polls
    every 0.1 s while no new line is available.

    NOTE(review): no logging call visible in this notebook emits
    "Starting batch", so the budget may never reset; confirm the marker.

    Args:
        file_path: Path of the log file to follow.
        stop_event: Object with ``is_set()`` and ``set()`` (a
            ``threading.Event`` in this notebook).
        lines_per_batch: Maximum number of lines echoed per batch.
    """
    batches_seen = 0
    echoed_in_batch = 0

    with open(file_path, 'r') as log_file:
        log_file.seek(0, 2)  # Skip everything already in the file

        while not stop_event.is_set():
            entry = log_file.readline()
            if not entry:
                time.sleep(0.1)  # Nothing new yet; poll again shortly
                continue

            if "Starting batch" in entry:
                # New batch: refresh the echo budget and announce it
                echoed_in_batch = 0
                batches_seen += 1
                print(f"--- Batch {batches_seen} ---")

            if echoed_in_batch < lines_per_batch:
                print(entry, end='')
                echoed_in_batch += 1

            if "Processing completed" in entry:
                print("All batches processed. Stopping log monitoring.")
                stop_event.set()



# Run the code

### Get SMILES from OPSIN first

In [6]:
def run_processing():
    """Run the OPSIN pass over the ``GPT_finetuned_five`` column and announce completion.

    Reads the module-level ``df`` and ``output_directory``. The completion
    message is both logged and printed.
    """
    completion_message = "Processing completed"
    make_smiles_dict(df['GPT_finetuned_five'], output_dir=output_directory)
    logging.info(completion_message)
    print(completion_message)

logging.basicConfig(
    filename='smiles_fetch.log',
    filemode='w',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Read the CSV file whose responses are passed to make_smiles_dict
df = pd.read_csv("GPT_response.csv")

# Specify the output directory
output_directory = "smiles_batches"

# Create a stop event for the log monitoring
stop_event = threading.Event()


def _run_processing_and_signal():
    """Run ``run_processing`` and release the log monitor if it fails.

    On success the monitor stops by itself when it reads the
    "Processing completed" log line. On failure that line is never written,
    so the stop event is set here to keep ``monitor_log`` from looping forever.
    """
    try:
        run_processing()
    except BaseException:
        logging.exception("Processing thread failed")
        stop_event.set()
        raise


# Start the processing thread
processing_thread = threading.Thread(target=_run_processing_and_signal)
processing_thread.start()

# Monitor the log in real-time
try:
    monitor_log('smiles_fetch.log', stop_event)
finally:
    # Once monitoring ends (normally or through an interrupt), make sure the
    # event is set and wait for the worker to finish
    stop_event.set()
    processing_thread.join()

Processing GPT Responses in Batches:   0%|          | 0/5 [00:00<?, ?it/s]

2024-09-09 18:39:55,800 - ERROR - [Error] Failed with status code: 404 for URL: https://opsin.ch.cam.ac.uk/opsin/MeOH.smi


Processing GPT Responses in Batches: 100%|██████████| 5/5 [02:06<00:00, 25.30s/it]

Processing completed
All batches processed. Stopping log monitoring.





### Get SMILES from other APIs: PubChem, CIR, ChemSpider

In [15]:
# Apply nest_asyncio (again) so this cell also works when run on its own:
# it allows asyncio.run(...) to be used from inside the notebook.
nest_asyncio.apply()

def check_server_status(server_name, test_url, timeout=10):
    """Probe ``test_url`` and describe whether the server is usable.

    Args:
        server_name: Label used in the returned message.
        test_url: URL requested with a GET.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a silent server would block this call indefinitely.

    Returns:
        ``(message, is_up)`` where ``is_up`` is True only for HTTP 200.
        Connection problems and timeouts are reported, not raised.
    """
    try:
        response = requests.get(test_url, timeout=timeout)
    except requests.exceptions.RequestException as e:
        return f"Failed to reach {server_name} server: {e}", False

    status = response.status_code
    if status == 200:
        return f"{server_name} server is up and running.", True
    if status == 429:
        return f"{server_name} server is overloaded (Too Many Requests).", False
    if status == 503:
        return f"{server_name} server is currently unavailable (Service Unavailable).", False
    return f"{server_name} server returned status code {status}.", False
    
def calculate_no_smi_percentage(smiles_dict_list):
    """Return the percentage of entries still marked ``(NoSmi)``.

    List items that are not dicts (such as the ``"Error"`` placeholders written
    for failed batches) are ignored, and only string values are inspected for
    the marker, so mixed data does not raise.

    Args:
        smiles_dict_list: Iterable of ``{code: smiles_or_marker}`` dicts.

    Returns:
        Percentage in the range 0-100; 0 when there are no entries.
    """
    total_entries = 0
    no_smi_entries = 0

    for smiles_dict in smiles_dict_list:
        if not isinstance(smiles_dict, dict):
            continue
        total_entries += len(smiles_dict)
        no_smi_entries += sum(
            1 for value in smiles_dict.values()
            if isinstance(value, str) and "(NoSmi)" in value
        )

    no_smi_percentage = (no_smi_entries / total_entries) * 100 if total_entries > 0 else 0
    logging.info(f"Found {no_smi_entries} entries with NoSmi [{no_smi_percentage:.2f}%].")
    return no_smi_percentage

async def process_no_smi_entry(code, compound_name, session, semaphore, smiles_dict,idx):
    """Query CIR, ChemSpider and PubChem concurrently for one unresolved name.

    The first non-empty answer is written to ``smiles_dict[code]``. If no
    service answers, the entry is (re)written as ``"[<name> (NoSmi)]"``.

    Lookups that are still pending once an answer has been accepted are
    cancelled, so they stop occupying the semaphore and do not linger as
    orphaned tasks.

    Args:
        code: Key in ``smiles_dict`` to update.
        compound_name: Name to resolve.
        session: HTTP session for the CIR and PubChem helpers.
        semaphore: Concurrency limiter for the CIR and PubChem helpers.
        smiles_dict: Dict updated in place.
        idx: Index used only as a prefix in log messages.
    """
    lookups = [
        asyncio.ensure_future(get_smiles_from_cir(session, compound_name, semaphore, max_retries=5)),
        asyncio.ensure_future(get_smiles_from_ChemSpider_async(compound_name)),
        asyncio.ensure_future(get_smiles_from_pubchem(session, compound_name, semaphore, max_retries=5)),
    ]

    try:
        for finished in asyncio.as_completed(lookups):
            try:
                smiles, source = await finished
            except Exception as e:
                logging.error(f"[{idx}] Error processing {compound_name}: {e}")
                continue
            if smiles:
                logging.info(f"[{idx}] Found SMILES for {compound_name} from {source}: {smiles}")
                smiles_dict[code] = smiles  # Update directly in the original dictionary
                return
    finally:
        # Stop whatever is still running and collect the outcomes so that no
        # "exception was never retrieved" warnings are produced.
        for lookup in lookups:
            if not lookup.done():
                lookup.cancel()
        await asyncio.gather(*lookups, return_exceptions=True)

    logging.info(f"[{idx}] No SMILES found for {compound_name}")
    smiles_dict[code] = f"[{compound_name} (NoSmi)]"  # Mark as NoSmi if not found


async def process_batch(smiles_dict_list, session, semaphore):
    """Retry every ``(NoSmi)`` entry in a batch of SMILES dictionaries.

    For each string value containing ``"(NoSmi)"`` the compound name is
    recovered and handed to ``process_no_smi_entry``, which updates the owning
    dict in place. All retries of the batch run concurrently.

    List items that are not dicts (such as ``"Error"`` placeholders) and
    non-string values are skipped.

    NOTE: this definition reuses the name of the earlier ``process_batch``,
    which has a different signature; after this cell runs, the name refers to
    this function.

    Args:
        smiles_dict_list: List of ``{code: smiles_or_marker}`` dicts.
        session: HTTP session forwarded to ``process_no_smi_entry``.
        semaphore: Concurrency limiter forwarded to ``process_no_smi_entry``.
    """
    marker = "(NoSmi)"
    wrapper_suffix = " (NoSmi)]"

    tasks = []
    for idx, smiles_dict in enumerate(smiles_dict_list):
        if not isinstance(smiles_dict, dict):
            continue
        for code, value in list(smiles_dict.items()):
            if not isinstance(value, str) or marker not in value:
                continue
            if value.startswith("[") and value.endswith(wrapper_suffix):
                # Remove exactly the "[... (NoSmi)]" wrapper so that brackets
                # belonging to the name itself (e.g. "[Pd(PPh3)4]") survive.
                compound_name = value[1:-len(wrapper_suffix)].strip()
            else:
                compound_name = value.replace(marker, "").strip('[] ')
            tasks.append(process_no_smi_entry(code, compound_name, session, semaphore, smiles_dict, idx))

    await asyncio.gather(*tasks)

async def reprocess_no_smi(smiles_dict_file, output_file, session, semaphore, batch_size):
    """Re-run lookups for unresolved entries of a saved SMILES dictionary list.

    The list is read from ``smiles_dict_file``, processed in slices of
    ``batch_size`` through ``process_batch`` (which updates the dicts in
    place), and the whole list is rewritten to ``output_file`` after every
    slice so that progress survives an interruption.

    Args:
        smiles_dict_file: Path of the JSON file to read (``utf-8-sig``).
        output_file: Path of the JSON file to write (``utf-8-sig``).
        session: HTTP session forwarded to ``process_batch``.
        semaphore: Concurrency limiter forwarded to ``process_batch``.
        batch_size: Number of list entries per slice.
    """
    with open(smiles_dict_file, 'r', encoding='utf-8-sig') as source_file:
        smiles_dict_list = json.load(source_file)

    # Logs how much is unresolved before the retry pass starts
    calculate_no_smi_percentage(smiles_dict_list)

    entry_count = len(smiles_dict_list)
    total_batches = (entry_count + batch_size - 1) // batch_size  # Ceiling division

    with tqdm(total=total_batches, desc="Processing Batches", unit="batch") as pbar:
        for batch_number, start in enumerate(range(0, entry_count, batch_size), start=1):
            await process_batch(smiles_dict_list[start:start + batch_size], session, semaphore)

            # Save progress after each batch
            with open(output_file, 'w', encoding='utf-8-sig') as target_file:
                json.dump(smiles_dict_list, target_file, ensure_ascii=False, indent=2)

            logging.info(f"Batch {batch_number}/{total_batches} processed.")
            pbar.update(1)
        logging.info("Processing completed")



In [None]:
async def main_reprocess():
    """Run the second lookup pass (CIR, ChemSpider, PubChem) over the OPSIN results.

    A background thread tails ``smiles_fetch.log`` while the retry pass runs.
    The thread is always signalled and joined, also when the retry pass
    raises, so a failure cannot leave it polling forever.
    """
    semaphore = asyncio.Semaphore(40)  # Limit concurrent tasks
    async with aiohttp.ClientSession() as session:
        stop_event = threading.Event()
        monitor_thread = threading.Thread(target=monitor_log, args=('smiles_fetch.log', stop_event))
        monitor_thread.start()

        try:
            await reprocess_no_smi('./smiles_batches/smiles_dict_final_ver1.json', './smiles_batches/smiles_dict_final_updated_ver2.json', session, semaphore, batch_size=100)
        finally:
            stop_event.set()
            monitor_thread.join()

# Run the async main function: executes main_reprocess() to completion
# on an event loop.
asyncio.run(main_reprocess())
