<div style="display: flex; justify-content: flex-end; align-items: center;">
    <div style="width: 30%; text-align: right; margin-right: 20px;">
        <img src="https://www.juntadeandalucia.es/datosabiertos/portal/uploads/group/2022-09-06-135504.979247fps.png" alt="Web Fundación Progreso y Salud" style="width: 100%;"/>
    </div>
    <div style="width: 60%; margin-right: 1em;">
        <p style="text-align: right; font-weight: bold; font-size: 2em; margin-top: 30px;">
            Scrape PubMed based on MeSH and Spanish text
        </p>
        <p style="text-align: right; color: #666">
            Project: MedicoderICD
        </p>
        <p style="text-align: right; color: #666">
            Juan Luis González Rodríguez
        </p>
    </div>
</div>

# 0. Libraries

In [1]:
import requests
from bs4 import BeautifulSoup
import re
import csv
from tqdm import tqdm
import os
import json
from requests.exceptions import RequestException
from IPython.display import clear_output
import time
from langdetect import detect

# 1. Functions

In [2]:
def check_none_in_tuple(t):
    """
    Validate that a tuple carries no missing values.

    An element counts as missing when it is None, an empty string, an empty
    list, or a list that contains an empty string.

    :param t: tuple to validate (may itself be None)
    :return: ``t`` unchanged when every element is present, otherwise None
    """
    def _is_missing(item):
        if item is None or item == '':
            return True
        # Only plain lists are inspected; any other container passes through.
        return type(item) is list and (not item or '' in item)

    if t is None:
        return None
    # The per-element scan also covers the literal shapes ('',), ('', ''),
    # ([''],) and ('', ['']), so they need no separate comparison.
    return None if any(_is_missing(item) for item in t) else t

In [3]:
def remove_english(text):
    """Return the leading part of ``text`` that precedes the first English phrase.

    The text is split on '.' and each piece is language-detected in order.
    Processing stops at the first piece detected as English: that piece and
    everything after it are discarded, and the pieces kept so far are
    rejoined with '.'.

    Pieces without any alphabetic character (for example the empty piece left
    after a trailing '.') are kept without being language-detected.

    :param text: text to filter
    :return: the text up to, but not including, the first English phrase
    """
    kept_phrases = []
    for phrase in text.split('.'):
        # langdetect raises on input with no letters, which would make the
        # caller discard an otherwise valid abstract; skip detection for those.
        has_letters = any(ch.isalpha() for ch in phrase)
        if has_letters and detect(phrase) == "en":
            # Truncate here: the first English phrase ends the kept text.
            break
        kept_phrases.append(phrase)

    return '.'.join(kept_phrases)

In [4]:
def clean_quotes_breakline_spaces(string):
    """Normalise a text fragment so it fits on one line inside a quoted field.

    Double quotes become single quotes, and every run of whitespace
    (spaces, tabs, line breaks) collapses into a single space; leading and
    trailing whitespace is dropped.

    :param string: text to clean
    :return: the cleaned, single-line text
    """
    string = string.replace('"', "'")
    # split() already treats line breaks as separators. Deleting them first
    # would glue together words that were separated only by a newline.
    return ' '.join(string.split())

In [5]:
class NoArticlesFoundError(Exception):
    """Custom exception signalling that PubMed has no articles for the MeSH term."""

    def __init__(self, message="No se encontraron artículos asociados al término Mesh en PubMed."):
        # Hand the text to Exception first so str(error) and error.args carry it,
        # then expose it under the explicit `message` attribute as well.
        super().__init__(message)
        self.message = message

In [6]:
class Publication:
    """A scraped publication: PMID, title, abstract and its list of MeSH terms."""

    def __init__(self, tuple_info):
        """
        :param tuple_info: sequence ``(pmid, title, abstract, mesh_list)``
        """
        self.pmid = tuple_info[0]
        self.title = tuple_info[1]
        self.abstract = tuple_info[2]
        self.mesh_list = tuple_info[3]

    def mesh_in_pub(self, mesh_desired):
        """Return True when ``mesh_desired`` is one of this publication's MeSH terms."""
        return mesh_desired in self.mesh_list

    def is_mesh_in_pub_list(self, mesh_reference):
        """Alias of :meth:`mesh_in_pub`, kept for existing callers."""
        return self.mesh_in_pub(mesh_reference)

    @staticmethod
    def _csv_escape(value):
        """Render ``value`` as text with embedded double quotes doubled (CSV quoting)."""
        return str(value).replace('"', '""')

    def pub_to_csv(self, mesh_major_topic):
        """Build the CSV row for this publication, prefixed with a line break.

        Field layout: PMID, major MeSH topic, title, abstract and the MeSH
        terms rendered as ``(term1, term2, ...)``. Every field is wrapped in
        double quotes; an empty MeSH list renders as ``()``.

        :param mesh_major_topic: MeSH term the publication was retrieved for
        :return: the row as a string starting with a newline
        """
        # join() copes with an empty list; trimming a trailing ", " by slicing
        # would eat the opening '"(' of the field when there are no terms.
        mesh_tuple = "(" + ", ".join(str(mesh) for mesh in self.mesh_list) + ")"
        pmid, major, title, abstract, meshes = (
            self._csv_escape(field)
            for field in (self.pmid, mesh_major_topic, self.title, self.abstract, mesh_tuple)
        )
        # Separator spacing is kept as-is; readers need skipinitialspace=True.
        return f'\n"{pmid}","{major}", "{title}", "{abstract}", "{meshes}"'

In [7]:
class ElsevierScrapper:
    """Retrieve an article's title and abstract from the Elsevier article API by DOI."""

    # Seconds to wait for the Elsevier API before abandoning a request.
    REQUEST_TIMEOUT = 30

    def __init__(self, doi, keypath):
        """
        :param doi: DOI of the article to retrieve
        :param keypath: path of the text file that holds the Elsevier API key
        """
        self.doi = doi
        self.apikey = self.get_api_key(keypath)

    def get_api_key(self, key_path):
        """Read the API key from ``key_path``, stripped of surrounding whitespace."""
        with open(key_path, "r") as file:
            return file.read().strip()

    def scrape_publication(self):
        """Fetch the article and extract its ``dc:title`` and ``dc:description``.

        :return: ``(title, abstract)`` on success; None when the request fails,
                 the API answers with a non-200 status, or either element is
                 missing (the reason is printed).
        """
        url = f"https://api.elsevier.com/content/article/doi/{self.doi}"
        try:
            # The key travels in a header, not the query string, so it does not
            # show up in URLs embedded in printed error messages.
            response = requests.get(
                url,
                headers={"X-ELS-APIKey": self.apikey},
                timeout=self.REQUEST_TIMEOUT,
            )

            if response.status_code != 200:
                raise RequestException(f"Error en la solicitud: {response.status_code}. La publicación no existe en Elsevier y por tanto no se puede obtener la información.")

            soup = BeautifulSoup(response.text, 'html.parser')
            title_es = soup.find("dc:title")
            abstract_es = soup.find("dc:description")

            # find() returns None for a missing element; report that clearly
            # instead of failing later with an AttributeError on `.text`.
            if title_es is None or abstract_es is None:
                raise RequestException(f"La respuesta de Elsevier para el DOI {self.doi} no contiene título o resumen.")

            return title_es.text, abstract_es.text

        except RequestException as e:
            print(f'Error en la solicitud: {e}')
            return None

In [8]:
class PublicationScraper:
    """Scrape one PubMed article page and fetch its Spanish title/abstract from Elsevier."""

    # Seconds to wait for a remote server before abandoning a request.
    REQUEST_TIMEOUT = 30

    def __init__(self, publication_url, elsevier_key_path="../data/apikey_elseiver.txt"):
        """
        :param publication_url: URL of the PubMed article page
        :param elsevier_key_path: path of the file holding the Elsevier API key
        """
        self.url = publication_url
        self.elsevier_key_path = elsevier_key_path

    @staticmethod
    def _normalize_mesh_term(raw_label):
        """Strip a MeSH button label down to its term.

        Removes surrounding whitespace, everything from the first
        whitespace-slash-whitespace onwards, and any asterisk.
        """
        return re.sub(r"\s/\s.*|\*", "", raw_label.strip())

    def scrape_spanish_text(self, url):
        """Scrape title and abstract from the HTML page at ``url``.

        :param url: page to scrape (used instead of ``self.url``)
        :return: ``(title, abstract)``; None when scraping fails (the reason is printed)
        """
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
            soup = BeautifulSoup(response.text, 'html.parser')

            title_es = soup.find("span", class_="title-text").text.strip()
            title_es = clean_quotes_breakline_spaces(title_es)

            abstract_es = "".join(
                element.get_text() for element in soup.find_all(class_="abstract_author")
            )

            return title_es, abstract_es

        except RequestException as e:
            print(f'Error en la solicitud: {e}')
        except Exception as e:
            print(f'Ocurrió un error inesperado: {e}')
        return None

    def scrape_publication(self):
        """Scrape the PubMed page at ``self.url`` and enrich it through Elsevier.

        Reads the PMID, DOI and MeSH terms from the PubMed page, then asks
        Elsevier (by DOI) for the title and abstract, which are cleaned and
        cut at the first English phrase.

        :return: ``(pmid, title, abstract, mesh_list)`` with ``mesh_list`` sorted
                 and free of duplicates; None on any failure (the reason is printed)
        """
        try:
            response = requests.get(self.url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            pmid = soup.find("span", class_="identifier pubmed").find("strong").text
            try:
                doi = soup.find("span", class_="identifier doi").find("a").text.strip()
                doi = clean_quotes_breakline_spaces(doi)
            except AttributeError:
                # find() returned None somewhere: the page shows no DOI link.
                raise RequestException(f"No se encontró el enlace DOI para la publicacion con PMID: {pmid}")

            elsevier_scraper = ElsevierScrapper(doi, self.elsevier_key_path)
            elsevier_result = elsevier_scraper.scrape_publication()
            if elsevier_result is None:
                # The Elsevier lookup failed and printed why; unpacking None
                # would only add a confusing TypeError on top of that.
                return None
            title_es, abstract_es = elsevier_result

            # Clean title and abstract
            title_es = clean_quotes_breakline_spaces(title_es.strip())
            abstract_es = remove_english(clean_quotes_breakline_spaces(abstract_es.strip()))

            # A set removes repeated terms; sorting makes the order reproducible.
            mesh_list = sorted({
                self._normalize_mesh_term(button.text)
                for button in soup.select("div.mesh-terms ul.keywords-list button")
            })

            return pmid, title_es, abstract_es, mesh_list

        except RequestException as e:
            print(f'Error en la solicitud: {e}')
        except Exception as e:
            print(f'Ocurrió un error inesperado: {e}')
        return None

In [9]:
class PubmedScraper:
    """Page through PubMed results for Spanish-language articles on a MeSH major topic."""

    # Seconds to wait for PubMed before abandoning a request.
    REQUEST_TIMEOUT = 30

    def __init__(self, mesh_text):
        """
        :param mesh_text: MeSH descriptor name to search for as a major topic
        """
        self.base_url = "https://pubmed.ncbi.nlm.nih.gov/"
        self.mesh_text = mesh_text
        self.current_page = 1
        self.max_pages = 10

    def _search_term(self):
        """Return the PubMed query: the MeSH major topic restricted to Spanish."""
        return f"({self.mesh_text}[MeSH Major Topic]) AND (Spanish[Language])"

    def scrape_articles_current_page(self):
        """Fetch the current result page and advance to the next one.

        :return: list of article URLs found on the page; an empty list when the
                 request or parsing fails (the reason is printed); None when
                 there is nothing more to fetch, i.e. the page limit was
                 reached or PubMed listed no articles.
        """
        if self.current_page > self.max_pages:
            return None

        # Passing the query as params lets requests URL-encode the MeSH term
        # (spaces, commas, ampersands) instead of pasting it raw into the URL.
        params = {
            "term": self._search_term(),
            "filter": "simsearch1.fha",
            "page": self.current_page,
        }
        self.current_page += 1

        try:
            response = requests.get(self.base_url, params=params, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')
            articles = soup.find_all("article", class_="full-docsum")

            if not articles:
                raise NoArticlesFoundError(f"No se encontraron artículos asociados al término "
                                           f"Mesh \"{self.mesh_text}\" en PubMed.")

            return [
                f"{self.base_url}{article.find('a', class_='docsum-title')['data-ga-label']}/"
                for article in articles
            ]

        except NoArticlesFoundError as e:
            # An empty page is an expected outcome, not an unexpected error:
            # return None so the caller stops requesting further pages.
            print(e)
            return None
        except RequestException as e:
            print(f'Error en la solicitud: {e}')
            return []
        except Exception as e:
            print(f'Ocurrió un error inesperado: {e}')
            return []

In [10]:
def open_and_get_mesh(json_path):
    """
    Given the path of a JSON file, extract the MeSH name
    stored under its "DescriptorName" key.

    Args:
        json_path (str): The path of the json file.

    Returns:
        The value of "DescriptorName", or None when the file cannot be read,
        is not valid JSON, or lacks that key (the reason is printed).
    """
    try:
        # Decode as UTF-8 explicitly: the platform default encoding may differ
        # and would corrupt non-ASCII descriptor names.
        with open(json_path, 'r', encoding='utf-8') as file:
            mesh_data = json.load(file)

        return mesh_data["DescriptorName"]
    except (OSError, ValueError, KeyError, TypeError) as e:
        # OSError: unreadable file; ValueError: bad JSON or bad encoding;
        # KeyError/TypeError: the JSON has no "DescriptorName" mapping entry.
        print(f"Error processing the file {json_path}: {e}")
        return None

# 2. Scraping

In [11]:
# Read all existing MeSH descriptor files
json_paths = os.path.join("..", "data", "meshs_json")
# Sorted so the processing order (and so the CSV row order) is reproducible.
mesh_files = [os.path.join(json_paths, mesh) for mesh in sorted(os.listdir(json_paths)) if mesh.endswith(".json")]
all_mesh_list = []
n_publications_per_mesh = 50

# Get all the mesh names from the json files
for mesh_path in tqdm(mesh_files, desc="Processing Files", unit="file"):
    mesh_name = open_and_get_mesh(mesh_path)
    # Unreadable files yield None; skip them rather than searching PubMed for "None".
    if mesh_name:
        all_mesh_list.append(mesh_name)

# Create the csv file and scrape the data
csv_path = os.path.join("..", "data", "projects_pubmed.csv")
with open(csv_path, "w", encoding='utf-8') as csv_file:
    csv_file.write("PMID, MESH_MAJOR_TOPIC, TITLE, ABSTRACT, MESH_TUPLE")

    for mesh in tqdm(all_mesh_list, desc="Processing MeSH terms", unit="MeSH code"):
        print(f"Processing MeSH term: {mesh}")
        list_publications = []

        # Search engine instance for this MeSH term
        pubmed_engine = PubmedScraper(mesh)

        while len(list_publications) < n_publications_per_mesh:
            list_urls = pubmed_engine.scrape_articles_current_page()
            # None means there are no more result pages to request
            if list_urls is None:
                break

            for url in list_urls:
                # Parse each url into a tuple
                tuple_pub = PublicationScraper(url).scrape_publication()

                # Check if any element in the tuple is empty
                tuple_pub = check_none_in_tuple(tuple_pub)

                # If the tuple has no info, skip to the next url
                if tuple_pub is None:
                    continue

                # Create a publication obj
                publication = Publication(tuple_pub)
                if publication.is_mesh_in_pub_list(mesh):
                    list_publications.append(publication)
                    if len(list_publications) == n_publications_per_mesh:
                        break

        print(f"Se han encontrado {len(list_publications)} publicaciones para el MeSH {mesh}")
        time.sleep(1)
        # Refresh the notebook output area
        clear_output(wait=True)

        for pub in list_publications:
            csv_file.write(pub.pub_to_csv(mesh))
        # Flush after each MeSH term so an interruption of this very long run
        # does not lose rows that are still sitting in the write buffer.
        csv_file.flush()

Processing MeSH terms: 100%|██████████| 5155/5155 [148:00:34<00:00, 103.36s/MeSH code]
