In [None]:
import os
os.chdir('..')  #cd to project folder

import pandas as pd
from Bio import Phylo
from pathlib import Path
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.exceptions import HTTPError
import time, logging
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

In [None]:
#import Angiosperms tree
tree_file_path = Path('data/global_tree_brlen_pruned_renamed.tre')
if not tree_file_path.exists():
    #fail fast: everything below needs `tree`; merely warning here would end in a
    #confusing NameError (fresh kernel) or silently reuse a stale `tree` object
    raise FileNotFoundError(f'Tree file not found in {tree_file_path}.')
tree = Phylo.read(tree_file_path, 'newick')
logging.info('Angiosperms phylogenetic tree successfully imported!')

#extract leaf names (one entry per terminal node of the tree)
tree_leaves = pd.Series([leaf.name for leaf in tree.get_terminals()], name='leaf_name')

#create df with order, family, genus, species
#NOTE(review): assumes leaf names look like 'Order_Family_Genus_species' - confirm against the tree file
tree_df = tree_leaves.str.split('_', expand=True)
tree_df = tree_df.iloc[:, :4] #keep first 4 columns
tree_df.columns = ['Order', 'Family', 'Genus', 'Species'] #rename columns
#tree_leaves is already named 'leaf_name', so it can be used directly as the index column
tree_df = pd.concat([tree_leaves, tree_df], axis=1).set_index('leaf_name')

#list of genera in the tree to be queried in Wikidata
genera_list = tree_df['Genus'].unique()

#summary
logging.info(f'Tree contains {len(tree_leaves)} leaves (i.e., genera).')
logging.info(f"{tree_leaves.str.endswith('sp.').sum()} species names are not defined (e.g., 'Lessertia_sp.')")

INFO: Angiosperms phylogenetic tree successfully imported!
INFO: Tree contains 7922 leaves (i.e., genera).
INFO: 293 species names are not defined (e.g., 'Lessertia_sp.')


In [None]:
#create SPARQL query for given genus
def generate_query(genus):
    """Build the SPARQL query that lists the chemical structures reported for a genus.

    The query matches the item whose P225 value equals `genus`, walks every
    taxon linked to it through a P171* path, and returns one row per
    (taxon, structure) pair with the structure's InChIKey and SMILES plus the
    comma-separated references (and their DOIs, when present).

    Parameters
    ----------
    genus : str
        Genus name to look up. It is escaped before being placed inside the
        SPARQL string literal, so quotes or backslashes cannot break the query.

    Returns
    -------
    str
        The complete SPARQL query text.
    """
    #escape characters that would terminate or corrupt the SPARQL string literal
    #(backslash first, so the escapes added afterwards are not escaped again)
    safe_genus = (str(genus)
                  .replace('\\', '\\\\')
                  .replace('"', '\\"')
                  .replace('\n', '\\n')
                  .replace('\r', '\\r'))
    return f"""
    PREFIX wdt: <http://www.wikidata.org/prop/direct/>
    PREFIX p: <http://www.wikidata.org/prop/>
    PREFIX ps: <http://www.wikidata.org/prop/statement/>
    PREFIX pr: <http://www.wikidata.org/prop/reference/>
    PREFIX prov: <http://www.w3.org/ns/prov#>

    SELECT DISTINCT ?genus ?genus_name ?taxon ?taxon_name ?structure_inchikey ?structure_smiles 
    (GROUP_CONCAT(DISTINCT ?reference; separator=", ") AS ?references) 
    (GROUP_CONCAT(DISTINCT ?reference_doi; separator=", ") AS ?reference_dois) WHERE {{
        ?genus wdt:P225 "{safe_genus}".
        ?genus wdt:P225 ?genus_name.                 

        ?taxon wdt:P171* ?genus.                     
        ?structure wdt:P235 ?structure_inchikey;      
                   wdt:P233 ?structure_smiles;        
                   p:P703 [                           
                       ps:P703 ?taxon;                
                       prov:wasDerivedFrom/pr:P248 ?reference  
                   ].
        ?taxon wdt:P225 ?taxon_name.                  
        OPTIONAL {{ ?reference wdt:P356 ?reference_doi. }}
    }}
    GROUP BY ?genus ?genus_name ?taxon ?taxon_name ?structure_inchikey ?structure_smiles
    """


#run (generated) SPARQL query for a given genus
def run_query(genus, max_attempts=5, timeout=60):
    """Run the SPARQL query for one genus against Wikidata and return a DataFrame.

    Parameters
    ----------
    genus : str
        Genus name passed to `generate_query`.
    max_attempts : int, default 5
        Maximum number of HTTP requests to send. Only HTTP 429 responses are
        retried; any other error aborts immediately.
    timeout : float or tuple, default 60
        Timeout in seconds handed to `requests.get`, so a stalled connection
        cannot block a worker thread forever.

    Returns
    -------
    pandas.DataFrame or None
        One column per SPARQL variable (the '.value' part of each binding),
        or None when the query could not be completed.
    """
    #generate query
    query = generate_query(genus)
    url = "https://query.wikidata.org/sparql"
    #NOTE(review): this User-Agent impersonates a browser; the Wikimedia User-Agent policy asks
    #for a descriptive agent with contact details - consider replacing it
    headers = {'Accept': 'application/sparql-results+json',
               'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    default_wait = 2  #seconds to wait after a 429 when the server gives no usable Retry-After

    #try to run query
    attempts_made = 0
    for attempt in range(1, max_attempts + 1):
        attempts_made = attempt
        try:
            response = requests.get(url, headers=headers, params={'query': query}, timeout=timeout)
            response.raise_for_status() #raise exception for HTTP response 4xx or 5xx (error)
            query_out = response.json()
            out_df = pd.json_normalize(query_out['results']['bindings']) #convert to df
            #keep only the '<variable>.value' columns and strip the suffix
            out_df = out_df[[col for col in out_df.columns if col.endswith('.value')]]
            out_df.columns = [col.replace('.value', '') for col in out_df.columns]

            #successful query: log and return output
            logging.info(f"Query for '{genus}' genus completed in {attempt} attempts.")
            return out_df

        except HTTPError as http_err:
            failed_response = getattr(http_err, 'response', None)
            status_code = failed_response.status_code if failed_response is not None else None
            if status_code == 429:  #HTTP Error 429: Too Many Requests
                if attempt == max_attempts:
                    break #no retry left: do not sleep for nothing
                #honour the delay requested by the server when it is given in seconds
                retry_after = str(failed_response.headers.get('Retry-After', '')).strip()
                wait_time = int(retry_after) if retry_after.isdigit() else default_wait
                logging.warning(f"Wikidata requests limit reached when querying '{genus}' genus. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                logging.error(f"HTTP Error when querying genus '{genus}' on attempt N° {attempt}: {http_err}")
                break #break loop if an HTTP error other than 429 occurs
        except Exception as e:
            logging.error(f"Error for genus '{genus}' on attempt {attempt}: {e}")
            break #break loop if any other error occurs

    #query unsuccessful (nothing returned in the for loop): log the attempts actually made and return None
    logging.error(f"Query for '{genus}' genus failed after {attempts_made} attempts.")
    return None  # Return None to indicate a failed request


# Process genera in parallel
def process_genera_parallel(genera_list, threads=20, max_attempts=5):
    """Run `run_query` for every genus on a thread pool and merge the outputs.

    Parameters
    ----------
    genera_list : sequence of str
        Genera to query; one task is submitted per entry.
    threads : int, default 20
        Number of worker threads in the pool.
    max_attempts : int, default 5
        Forwarded to `run_query` for each genus.

    Returns
    -------
    tuple (pandas.DataFrame, dict)
        The concatenation of all non-None query outputs (an empty DataFrame
        when there are none), and a dict mapping each failed genus to the
        reason it failed.
    """
    frames = []
    failed_tasks = {}

    with ThreadPoolExecutor(max_workers=threads) as pool:
        # Queue one task per genus, remembering which future belongs to which genus
        future_to_genus = {}
        for genus in genera_list:
            future_to_genus[pool.submit(run_query, genus, max_attempts)] = genus

        # Collect outcomes in completion order
        for future in as_completed(future_to_genus):
            genus_name = future_to_genus[future]
            try:
                frame = future.result()
            except Exception as exc:
                failed_tasks[genus_name] = str(exc)
                continue

            if frame is None:  # run_query signals failure with None
                failed_tasks[genus_name] = "No output returned"
            else:
                frames.append(frame)

    # Log query summary
    n_total = len(genera_list)
    n_failed = len(failed_tasks)
    logging.info(f"Processing completed: {n_total} queries performed.")
    logging.info(f"Processing completed: {n_total - n_failed} queries run successfully.")
    if failed_tasks:
        logging.warning(f"The following {n_failed} queries failed:")
        for genus, reason in failed_tasks.items():
            logging.error(f"'{genus}' genus: {reason}")

    # Nothing to concatenate: hand back an empty dataframe
    if not frames:
        logging.warning("No results returned.")
        return pd.DataFrame(), failed_tasks

    # Stack all per-genus results into a single dataframe
    return pd.concat(frames, ignore_index=True), failed_tasks

In [None]:


# Trial run: query only the first 500 genera of the tree
genera_list_test = genera_list[:500]
results_df, failed_tasks = process_genera_parallel(genera_list_test, threads=20)

# Report the size of the merged result and whether any query failed
logging.info(f"Final dataframe shape: {results_df.shape}")
logging.info(f"Failed tasks: {len(failed_tasks)}" if failed_tasks else "All tasks completed successfully!")


INFO: Completed task for genus 'Dignathia' in 1 attempts.
INFO: Completed task for genus 'Bewsia' in 1 attempts.
INFO: Completed task for genus 'Neostapfia' in 1 attempts.
INFO: Completed task for genus 'Coleanthus' in 1 attempts.
INFO: Completed task for genus 'Farrago' in 1 attempts.
INFO: Completed task for genus 'Triplasiella' in 1 attempts.
INFO: Completed task for genus 'Perotis' in 1 attempts.
INFO: Completed task for genus 'Vaseyochloa' in 1 attempts.
INFO: Completed task for genus 'Hubbardochloa' in 1 attempts.
INFO: Completed task for genus 'Orinus' in 1 attempts.
INFO: Completed task for genus 'Lepturidium' in 1 attempts.
INFO: Completed task for genus 'Trichoneura' in 1 attempts.
INFO: Completed task for genus 'Zaqiqah' in 1 attempts.
INFO: Completed task for genus 'Odyssea' in 1 attempts.
INFO: Completed task for genus 'Mosdenia' in 1 attempts.
INFO: Completed task for genus 'Ctenium' in 1 attempts.
INFO: Completed task for genus 'Sphenopholis' in 1 attempts.
INFO: Complet

In [8]:
#display the failed queries returned by process_genera_parallel (empty when every query succeeded)
failed_tasks

[]