In [1]:
import os
import requests
import pymedtermino
from sickle import Sickle
from collections import Counter
import xml.etree.ElementTree as ET

class PubMedOAIMetadata:
    # Define the directory path where the SNOMED CT database is located
    pymedtermino.DATA_DIR = '/path/to/snomedct_database'

    def __init__(self):
        self.metadata_list = []
        self.base_url = "https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi"
        
        # Create directory if it does not exist
        if not os.path.exists('F:/pmc_oai'):
            os.makedirs('F:/pmc_oai')

    def _make_request(self, params):
        """
        Private method to make HTTP requests to the PMC-OAI service.
        
        Parameters:
            params (dict): Dictionary of URL parameters.
        
        Returns:
            response (Response object): Response object containing server's response.
        """
        headers = {'Accept-Encoding': 'gzip, deflate'}
        response = requests.get(self.base_url, params=params, headers=headers)
        if response.status_code == 200:
            return response
        else:
            response.raise_for_status()

    def get_metadata_by_prefix(self, metadata_prefix, date_from=None, date_until=None):
        """
        Method to get metadata by its prefix and optionally by date range.
        
        Parameters:
            metadata_prefix (str): The metadata prefix as defined by PMC-OAI.
            date_from (str, optional): Starting date in YYYY-MM-DD format.
            date_until (str, optional): Ending date in YYYY-MM-DD format.
        
        Returns:
            content (str): XML content of the response.
        """
        params = {'verb': 'ListRecords', 'metadataPrefix': metadata_prefix}
        if date_from:
            params['from'] = date_from
        if date_until:
            params['until'] = date_until
        response = self._make_request(params)
        return response.content

    def get_metadata_by_date(self, date_from, date_until):
        """
        Method to get metadata by date range.
        
        Parameters:
            date_from (str): Starting date in YYYY-MM-DD format.
            date_until (str): Ending date in YYYY-MM-DD format.
        
        Returns:
            content (str): XML content of the response.
        """
        params = {'verb': 'ListRecords', 'from': date_from, 'until': date_until}
        response = self._make_request(params)
        return response.content

    def get_subject_counts(self):
        # Use Counter para contar os tipos de 'subject' de forma mais eficiente
        subject_counts = Counter()

        # Percorra cada registro de metadados e conte os 'subject'
        for record in self.metadata_list:
            if 'subject' in record:
                subjects = record['subject'].split('; ')
                subject_counts.update(subjects)

        return subject_counts


class MetadataParser:
    
    def __init__(self, xml_data):
        self.xml_data = xml_data
        self.records = []
        
    def parse_metadata(self):
        """
        Parses the metadata XML string and extracts records.
        
        Returns:
            A list of dictionaries, each representing a record.
        """
        try:
            root = ET.fromstring(self.xml_data)
        except ET.ParseError:
            return "Invalid XML data"
        
        # Locate ListRecords element
        for list_records in root.findall('.//{http://www.openarchives.org/OAI/2.0/}ListRecords'):
            for record in list_records.findall('.//{http://www.openarchives.org/OAI/2.0/}record'):
                record_data = {}
                metadata = record.find('.//{http://www.openarchives.org/OAI/2.0/oai_dc/}dc', namespaces={'oai_dc': 'http://www.openarchives.org/OAI/2.0/oai_dc/'})
                
                if metadata is not None:
                    for child in metadata:
                        tag = child.tag.split('}')[-1]  # Extracting the local name from the QName
                        record_data[tag] = child.text
                        
                    self.records.append(record_data)

        return self.records

A classe `PubMedOAIMetadata` agora inclui os métodos das duas classes originais e foi ajustada para usar a lista de metadados interna `metadata_list` ao calcular as contagens de assuntos na função `get_subject_counts()`. Certifique-se de adaptar o caminho para o banco de dados do SNOMED CT no atributo `DATA_DIR` conforme necessário.

In [2]:
import os
import requests
import pymedtermino
from sickle import Sickle
from collections import Counter
import xml.etree.ElementTree as ET

class PubMedOAIMetadata:
    # Define the directory path where the SNOMED CT database is located
    pymedtermino.DATA_DIR = '/path/to/snomedct_database'

    def __init__(self):
        self.metadata_list = []
        # Create directory if it does not exist
        if not os.path.exists('F:/pmc_oai'):
            os.makedirs('F:/pmc_oai')

    def _make_request(self, params):
        """
        Private method to make HTTP requests to the PMC-OAI service.
        
        Parameters:
            params (dict): Dictionary of URL parameters.
        
        Returns:
            response (Response object): Response object containing server's response.
        """
        headers = {'Accept-Encoding': 'gzip, deflate'}
        response = requests.get(self.base_url, params=params, headers=headers)
        if response.status_code == 200:
            return response
        else:
            response.raise_for_status()

    def get_all_metadata_by_prefix(self, metadata_prefix, date_from=None, date_until=None):
        """
        Method to get metadata by its prefix and optionally by date range.
        
        Parameters:
            metadata_prefix (str): The metadata prefix as defined by PMC-OAI.
            date_from (str, optional): Starting date in YYYY-MM-DD format.
            date_until (str, optional): Ending date in YYYY-MM-DD format.
        
        Returns:
            content (str): XML content of the response.
        """
        self.base_url = "https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi"
        # params = {'verb': 'ListRecords', 'metadataPrefix': metadata_prefix}
        params = {'verb': 'ListRecords', 'from': date_from, 'until': date_until}
        if date_from:
            params['from'] = date_from
        if date_until:
            params['until'] = date_until
        response = self._make_request(params)
        return response.content

    def get_metadata_by_prefix(self, metadata_prefix, date_from=None, date_until=None):
        """
        Method to get metadata by its prefix and optionally by date range.
        
        Parameters:
            metadata_prefix (str): The metadata prefix as defined by PMC-OAI.
            date_from (str, optional): Starting date in YYYY-MM-DD format.
            date_until (str, optional): Ending date in YYYY-MM-DD format.
        
        Returns:
            content (str): XML content of the response.
        """
        self.base_url = "https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi"
        params = {'verb': 'ListRecords', 'metadataPrefix': metadata_prefix}
        if date_from:
            params['from'] = date_from
        if date_until:
            params['until'] = date_until
        response = self._make_request(params)
        
        # Adicione os metadados analisados à lista
        parser = MetadataParser(response.content)
        parsed_metadata = parser.parse_metadata()
        self.metadata_list.extend(parsed_metadata)
        
        return response.content

    def get_metadata_by_date(self, date_from, date_until):
        """
        Method to get metadata by date range.
        
        Parameters:
            date_from (str): Starting date in YYYY-MM-DD format.
            date_until (str): Ending date in YYYY-MM-DD format.
        
        Returns:
            content (str): XML content of the response.
        """
        self.base_url = "https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi"
        params = {'verb': 'ListRecords', 'from': date_from, 'until': date_until}
        response = self._make_request(params)
        return response.content
    
    def get_subject_counts(self):
        # Use Counter para contar os tipos de 'subject' de forma mais eficiente
        subject_counts = Counter()

        # Percorra cada registro de metadados analisado e conte os 'subject'
        for record in self.metadata_list:
            if 'subject' in record:
                subjects = record['subject'].split('; ')
                subject_counts.update(subjects)

        return subject_counts

class MetadataParser:
    
    def __init__(self, xml_data):
        self.xml_data = xml_data
        self.records = []
        
    def parse_metadata(self):
        """
        Parses the metadata XML string and extracts records.
        
        Returns:
            A list of dictionaries, each representing a record.
        """
        try:
            root = ET.fromstring(self.xml_data)
        except ET.ParseError:
            return "Invalid XML data"
        
        # Locate ListRecords element
        for list_records in root.findall('.//{http://www.openarchives.org/OAI/2.0/}ListRecords'):
            for record in list_records.findall('.//{http://www.openarchives.org/OAI/2.0/}record'):
                record_data = {}
                metadata = record.find('.//{http://www.openarchives.org/OAI/2.0/oai_dc/}dc', namespaces={'oai_dc': 'http://www.openarchives.org/OAI/2.0/oai_dc/'})
                
                if metadata is not None:
                    for child in metadata:
                        tag = child.tag.split('}')[-1]  # Extracting the local name from the QName
                        record_data[tag] = child.text
                        
                    self.records.append(record_data)

        return self.records

In [7]:
# Importe as bibliotecas necessárias
import requests

# Importe as classes PubMedOAIMetadata e MetadataParser que foram definidas anteriormente

# Crie uma instância da classe PubMedOAIMetadata
pubmed_metadata = PubMedOAIMetadata()

# Faça uma chamada para obter todos os dados disponíveis
all_metadata = pubmed_metadata.get_metadata_by_prefix('oai_dc')

# Crie uma instância da classe MetadataParser
parser = MetadataParser(all_metadata)

# Analise os metadados
parsed_metadata = parser.parse_metadata()

In [8]:
len(parsed_metadata)

50

In [9]:
parsed_metadata

[{'title': 'Comparison of written reports of mammography, sonography and magnetic resonance mammography for preoperative evaluation of breast lesions, with special emphasis on magnetic resonance mammography',
  'creator': 'Schneider, Achim',
  'source': 'Breast Cancer Res',
  'subject': 'Primary Research',
  'description': 'Patients with abnormal breast findings (n = 413) were examined by mammography, sonography and magnetic resonance (MR) mammography; 185 invasive cancers, 38 carcinoma in situ and 254 benign tumours were confirmed histologically. Sensitivity for mammography was 83.7%, for sonography it was 89.1% and for MR mammography it was 94.6% for invasive cancers. In 42 patients with multifocal invasive cancers, multifocality had been detected by mammography and sonography in 26.2%, and by MR mammography in 66.7%. In nine patients with multicentric cancers, detection rates were 55.5, 55.5 and 88.8%, respectively. Carcinoma in situ was diagnosed by mammography in 78.9% and by MR m

In [10]:
pubmed_metadata.get_subject_counts()

Counter({'Primary Research': 15,
         'Research': 2,
         'Physical Sciences': 2,
         'Biological Sciences': 31})

In [11]:
# Obtenha a contagem de entradas por tipo de 'subject'
subject_counts = pubmed_metadata.get_subject_counts()

# Imprima as contagens
for subject, count in subject_counts.items():
    print(f'Subject: {subject}, Count: {count}')


Subject: Primary Research, Count: 15
Subject: Research, Count: 2
Subject: Physical Sciences, Count: 2
Subject: Biological Sciences, Count: 31


In [13]:
from pprint import pprint

# Assuming `data` contains the XML metadata as bytes
data_str = all_metadata.decode('utf-8')

parser  = MetadataParser(data_str)
records = parser.parse_metadata()

# `records` now contains a list of dictionaries, each representing a record.
pprint(records)

[{'creator': 'Schneider, Achim',
  'date': '2000-11-02',
  'description': 'Patients with abnormal breast findings (n = 413) were '
                 'examined by mammography, sonography and magnetic resonance '
                 '(MR) mammography; 185 invasive cancers, 38 carcinoma in situ '
                 'and 254 benign tumours were confirmed histologically. '
                 'Sensitivity for mammography was 83.7%, for sonography it was '
                 '89.1% and for MR mammography it was 94.6% for invasive '
                 'cancers. In 42 patients with multifocal invasive cancers, '
                 'multifocality had been detected by mammography and '
                 'sonography in 26.2%, and by MR mammography in 66.7%. In nine '
                 'patients with multicentric cancers, detection rates were '
                 '55.5, 55.5 and 88.8%, respectively. Carcinoma in situ was '
                 'diagnosed by mammography in 78.9% and by MR mammography in '
               

In [16]:
if __name__ == "__main__":
    pmc_oai = PubMedOAIMetadata()
    pmc_oai.get_metadata_by_prefix('oai_dc', '2021-01-01', '2021-12-31')  # Fetch and store metadata

    metadata = pmc_oai.metadata_list  # Access the stored metadata list

    subject_counts = pmc_oai.get_subject_counts()
    print("Number of metadata records:", len(metadata))
    print("Subject counts:", subject_counts)

Number of metadata records: 50
Subject counts: Counter({'Article': 48, 'Letter': 1, 'Original Article': 1})


In [18]:
metadata_record = metadata  # Get a sample metadata record
pprint(metadata_record)  # Print the metadata to inspect its structure

[{'creator': 'Rahman, H A',
  'date': '1996-08',
  'description': None,
  'identifier': '/pubmed/8881933',
  'language': 'en',
  'publisher': 'BMJ Publishing Group',
  'rights': None,
  'source': 'J Clin Pathol',
  'subject': 'Letter',
  'title': 'Current guidelines for sampling the cervix in hysterectomy '
           'specimens are appropriate.',
  'type': 'Text'},
 {'creator': 'Wang, Zhewu',
  'date': '2020-07',
  'description': 'OBJECTIVE: To addressed the nature of associations between '
                 'attention deficit hyperactivity disorder (ADHD) symptoms and '
                 'posttraumatic stress disorder (PTSD) psychopathology in '
                 'adult military veterans. METHOD: 95 combat veterans with '
                 'PTSD (n=63) and without PTSD (n=32) were recruited for this '
                 'study. PTSD was assessed with the Clinician Administered '
                 'PTSD Scale (CAPS) and ADHD was assessed with Conners Adult '
                 'ADHD Rating Sca

In [19]:
import requests
from xml.etree import ElementTree as ET
import os
import pymedtermino
from sickle import Sickle

class PMCOAI:
    """
    A Python class to interact with the PubMed Central OAI-PMH service.
    """
    def __init__(self):
        """
        Initializes the PMCOAI class with the base URL of the service.
        """
        self.base_url = "https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi"
                        
    def _make_request(self, params):
        """
        Private method to make HTTP requests to the PMC-OAI service.
        
        Parameters:
            params (dict): Dictionary of URL parameters.
        
        Returns:
            response (Response object): Response object containing server's response.
        """
        headers = {'Accept-Encoding': 'gzip, deflate'}
        response = requests.get(self.base_url, params=params, headers=headers)
        if response.status_code == 200:
            return response
        else:
            response.raise_for_status()

    def get_metadata_by_prefix(self, metadata_prefix, date_from=None, date_until=None):
        """
        Method to get metadata by its prefix and optionally by date range.
        
        Parameters:
            metadata_prefix (str): The metadata prefix as defined by PMC-OAI.
            date_from (str, optional): Starting date in YYYY-MM-DD format.
            date_until (str, optional): Ending date in YYYY-MM-DD format.
        
        Returns:
            content (str): XML content of the response.
        """
        params = {'verb': 'ListRecords', 'metadataPrefix': metadata_prefix}
        if date_from:
            params['from'] = date_from
        if date_until:
            params['until'] = date_until
        response = self._make_request(params)
        return response.content

    def get_metadata_by_date(self, date_from, date_until):
        """
        Method to get metadata by date range.
        
        Parameters:
            date_from (str): Starting date in YYYY-MM-DD format.
            date_until (str): Ending date in YYYY-MM-DD format.
        
        Returns:
            content (str): XML content of the response.
        """
        params = {'verb': 'ListRecords', 'from': date_from, 'until': date_until}
        response = self._make_request(params)
        return response.content
    
    def get_subject_counts(self):
        subject_counts = {}
        # Traverse through each metadata item and count the subjects
        for record in self.metadata_list:
            if 'subject' in record.metadata:
                subjects = record.metadata['subject']
                for subject in subjects:
                    if subject in subject_counts:
                        subject_counts[subject] += 1
                    else:
                        subject_counts[subject] = 1
        return subject_counts    



class PubMedOAIMetadata:
    # Define the directory path where the SNOMED CT database is located
    pymedtermino.DATA_DIR = '/path/to/snomedct_database'

    def __init__(self, metadata_list):
        self.metadata_list = metadata_list
        # Create directory if it does not exist
        if not os.path.exists('F:/pmc_oai'):
            os.makedirs('F:/pmc_oai')

    def get_metadata_by_prefix(self, metadata_prefix, start_date, end_date):
        # Initialization of the Sickle instance
        sickle = Sickle('https://www.ncbi.nlm.nih.gov/oai/oai.cgi')
        records = sickle.ListRecords(
            **{'metadataPrefix': metadata_prefix, 'from': start_date, 'until': end_date}
        )
        # Populate the metadata_list with fetched records
        self.metadata_list.extend(records)

    def get_subject_counts(self):
        subject_counts = {}
        # Traverse through each metadata item and count the subjects
        for record in self.metadata_list:
            if 'subject' in record.metadata:
                subjects = record.metadata['subject']
                for subject in subjects:
                    if subject in subject_counts:
                        subject_counts[subject] += 1
                    else:
                        subject_counts[subject] = 1
        return subject_counts


In [20]:
# !pip install pymedtermino
# !pip install sickle

In [21]:
import os
import pymedtermino
from sickle import Sickle

class PubMedOAIMetadata:
    # Define the directory path where the SNOMED CT database is located
    pymedtermino.DATA_DIR = '/path/to/snomedct_database'

    def __init__(self, metadata_list):
        self.metadata_list = metadata_list
        # Create directory if it does not exist
        if not os.path.exists('F:/pmc_oai'):
            os.makedirs('F:/pmc_oai')

    def get_metadata_by_prefix(self, metadata_prefix, start_date, end_date):
        # Initialization of the Sickle instance
        sickle = Sickle('https://www.ncbi.nlm.nih.gov/oai/oai.cgi')
        records = sickle.ListRecords(
            **{'metadataPrefix': metadata_prefix, 'from': start_date, 'until': end_date}
        )
        # Populate the metadata_list with fetched records
        self.metadata_list.extend(records)

    def get_subject_counts(self):
        subject_counts = {}
        # Traverse through each metadata item and count the subjects
        for record in self.metadata_list:
            if 'subject' in record.metadata:
                subjects = record.metadata['subject']
                for subject in subjects:
                    if subject in subject_counts:
                        subject_counts[subject] += 1
                    else:
                        subject_counts[subject] = 1
        return subject_counts


In [22]:
# Example of usage
pmc_oai = PubMedOAIMetadata([])
pmc_oai.get_metadata_by_prefix('oai_dc', '2021-01-01', '2021-12-31')
subject_counts = pmc_oai.get_subject_counts()
print(subject_counts)

HTTPError: 404 Client Error: Not Found for url: https://www.ncbi.nlm.nih.gov/oai/oai.cgi?metadataPrefix=oai_dc&from=2021-01-01&until=2021-12-31&verb=ListRecords

In [23]:
# Example usage
metadata_list = [
    {'title': 'Study A', 'subject': 'Biochemistry'},
    {'title': 'Study B', 'subject': 'Physics'},
    {'title': 'Study C', 'subject': 'Biochemistry'},
    {'title': 'Study D', 'subject': 'Computer Science'},
    {'title': 'Study E', 'subject': 'Physics'},
]

pmc_oai = PubMedOAIMetadata(metadata_list)
result = pmc_oai.count_by_subject()
print(result)

AttributeError: 'PubMedOAIMetadata' object has no attribute 'count_by_subject'

In [None]:
pprint(metadata)

In [24]:
from py2neo import Graph, Node, Relationship
import requests
import csv
import json
import time
import xml.etree.ElementTree as ET

class PubMedScraper:
    def __init__(self, api_key):
        self.api_key = api_key
        self.pubmed_ids = {}
        self.graph = Graph("bolt://localhost:7687", auth=("neo4j", "password"))
    
    def fetch_data_from_pubmed(self, term, id=False):
        base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
        url = f"{base_url}{'efetch' if id else 'esearch'}.fcgi?db=pubmed&{'rettype=abstract&' if id else 'retmode=json&retmax=1&'}api_key={self.api_key}&{'id=' if id else 'term='}{term}"
        response = requests.get(url)
        return response.json() if not id else response.text
    
    def get_and_store_pubmed_ids(self, csv_path):
        i = 0
        with open(csv_path, newline='') as csvfile:
            table = csv.reader(csvfile, delimiter=';', quotechar='|')
            for row in table:
                i += 1
                if i < 0:  # Skips lines based on counter
                    continue
                title = row[0].strip()
                try:
                    data = self.fetch_data_from_pubmed(title)
                    pubmed_id = data['esearchresult']['idlist'][0]
                    self.pubmed_ids[pubmed_id] = title
                except KeyError:
                    self.pubmed_ids["-"] = title

                if i % 10 == 0:
                    print(f"Processed {i} lines")
                    time.sleep(1)
                    
    def persist_to_neo4j(self):
        for pubmed_id, title in self.pubmed_ids.items():
            article_node = Node("Article", id=pubmed_id, title=title)
            self.graph.create(article_node)

    def run(self, csv_path):
        self.get_and_store_pubmed_ids(csv_path)
        self.persist_to_neo4j()


In [25]:
# if __name__ == "__main__":
#     scraper = PubMedScraper("")
#     scraper.run("titulos_artigos.csv")

In [26]:
# !pip install pubmed_parser

In [27]:
# !pip install pyspark

In [28]:
import os
os.listdir('F:/')

['System Volume Information', '$RECYCLE.BIN', 'pubmed_oa', 'pmc_oai']

In [29]:
import os
import platform
import requests
import tarfile
import re
from lxml import html
from dateutil import parser
from datetime import datetime
import random
import subprocess
from pyspark import SparkConf, SparkContext, Row, SQLContext
from glob import glob
from ftplib import FTP
from pyspark.sql import SparkSession

class PubMedDataProcessor:
    def __init__(self):
        # Determine the Operating System
        self.os_type = platform.system()

        # Define home directory
        self.home_dir = os.path.expanduser('~')

        # Define platform-agnostic directories
        self.download_dir = os.path.join(self.home_dir, 'Downloads')
        self.unzip_dir = os.path.join(self.download_dir, 'pubmed_oa')  
        self.save_dir = os.path.join(self.home_dir, 'Desktop')

        self.MEDLINE = 'ftp://ftp.nlm.nih.gov/nlmdata/.medleasebaseline/gz/'
        self.PUBMED_OA = 'ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/'

        # Initialize Spark session if it does not exist
        if not SparkContext.getOrCreate()._conf:
            self.conf = SparkConf().setAppName('pubmed_oa_spark') \
                                   .setMaster('local[8]') \
                                   .set('executor.memory', '8g') \
                                   .set('driver.memory', '8g') \
                                   .set('spark.driver.maxResultSize', '0')
            self.sc = SparkContext(conf=self.conf)
            self.sqlContext = SQLContext(self.sc)
        
        self.spark = SparkSession.builder.getOrCreate()
        self.sqlContext = self.spark.newSession()

    def download_file(self, url, save_path):
        ftp = FTP('ftp.nlm.nih.gov')
        ftp.login()
        filepath = '/nlmdata/.medleasebaseline/gz/'  # Modify as needed
        with open(save_path, 'wb') as f:
            ftp.retrbinary(f"RETR {filepath}", f.write)

    def extract_tar_gz(self, file_path, extract_path):
        with tarfile.open(file_path, 'r:gz') as f:
            f.extractall(extract_path)

    def get_update_date(self, option='medline'):
        link = self.MEDLINE if option == 'medline' else self.PUBMED_OA
        index_file = os.path.join(self.download_dir, 'index.html')
        self.download_file(link, index_file)
        try:
            with open(index_file, 'r', encoding='utf-8') as f:
                page = f.read()
        except FileNotFoundError as e:
            raise Exception("File not found. Detailed error: {}".format(str(e)))

        date_all = []
        tree = html.fromstring(page)
        for e in tree.xpath('body/pre/a'):
            if 'File' in e.tail:
                s = e.tail
                s_remove = re.sub(r'\([^)]*\)', '', s)
                s_remove = re.sub('File', '', s_remove).strip()
                d = parser.parse(s_remove)
                date_all.append(d)

        date = max(date_all)
        if os.path.exists(index_file):
            os.remove(index_file)
        
        return date

    def update(self):
        try:
            save_file = os.path.join(self.save_dir, 'pubmed_oa_*_*_*.parquet')
            file_list = list(filter(os.path.isdir, glob(save_file)))
            if file_list:
                d = re.search('[0-9]+_[0-9]+_[0-9]+', file_list[0]).group(0)
                date_file = datetime.strptime(d, '%Y_%m_%d')
                date_update = self.get_update_date(option='oa')
                is_update = date_update > date_file
                if is_update:
                    print("MEDLINE update available!")
                    if self.os_type == "Windows":
                        subprocess.call(['rmdir', '/s', '/q', os.path.join(self.save_dir, 'pubmed_oa_*_*_*.parquet')]) 
                    else:
                        subprocess.call(['rm', '-rf', os.path.join(self.save_dir, 'pubmed_oa_*_*_*.parquet')]) 
                    subprocess.call(['wget', 'ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/non_comm_use.A-B.xml.tar.gz', '--directory', self.download_dir])
                    if not os.path.isdir(self.unzip_dir): os.mkdir(self.unzip_dir)
                    self.extract_tar_gz(os.path.join(self.download_dir, 'non_comm_use.A-B.xml.tar.gz'), self.unzip_dir)
                else:
                    print("No update available")
            else:
                print("Download Pubmed Open-Access for the first time")
                is_update = True
                date_update = self.get_update_date(option='oa')
                subprocess.call(['wget', 'ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/non_comm_use.A-B.xml.tar.gz', '--directory', self.download_dir])
                if not os.path.isdir(self.unzip_dir): os.mkdir(self.unzip_dir)
                self.extract_tar_gz(os.path.join(self.download_dir, 'non_comm_use.A-B.xml.tar.gz'), self.unzip_dir)
        except Exception as e:
            raise Exception("An error occurred during the update process. Detailed error: {}".format(str(e)))

        return is_update, date_update
    
    # Parse names from the XML element
    def parse_name(self, author):
        first_name = author.find('./ForeName').text if author.find('./ForeName') is not None else ''
        last_name = author.find('./LastName').text if author.find('./LastName') is not None else ''
        return f"{first_name} {last_name}"

    # Parse affiliations from the XML element
    def parse_affiliation(self, author):
        affiliation_info = author.find('./AffiliationInfo/Affiliation')
        return affiliation_info.text if affiliation_info is not None else ''

    # Process a single file
    def process_file(self, file_path):
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        tree = html.fromstring(content)
        articles = tree.xpath('//PubmedArticle')
        records = []
        for article in articles:
            pmid = article.find('.//PMID').text
            title = article.find('.//ArticleTitle').text
            abstract = article.find('.//AbstractText').text if article.find('.//AbstractText') is not None else ''
            authors = article.findall('.//Author')
            author_data = [{'name': self.parse_name(a), 'affiliation': self.parse_affiliation(a)} for a in authors]
            records.append(Row(pmid=pmid, title=title, abstract=abstract, authors=author_data))
        return records

    # Run the process
    def run(self):
        is_update, date_update = self.update()  # Removed the explicit 'self' parameter
        if is_update:
            self.process_file(date_update)  # Removed the explicit 'self' parameter
        else:
            # Assuming that you've already downloaded and extracted the data files to unzip_dir
            files = glob(os.path.join(self.unzip_dir, '*.xml'))
        
            rdd = self.sc.parallelize(files).flatMap(self.process_file)
            df = self.sqlContext.createDataFrame(rdd)
        
            today = datetime.today()
            save_path = os.path.join(self.save_dir, f"pubmed_oa_{today.year}_{today.month}_{today.day}.parquet")
            df.write.parquet(save_path)
        
        self.spark.stop()


In [30]:
# Create an instance of the PubMedDataProcessor class
processor = PubMedDataProcessor()

# Update the save_dir attribute to your specific folder
processor.save_dir = "F:/pubmed_oa"

# Create the directory if it doesn't exist
if not os.path.exists(processor.save_dir):
    os.makedirs(processor.save_dir)

# Run the class methods for your tasks
processor.run()

Download Pubmed Open-Access for the first time


Exception: An error occurred during the update process. Detailed error: 550 /nlmdata/.medleasebaseline/gz/: Not a regular file

In [31]:
import os
import requests
import tarfile
import re
import subprocess
from lxml import html
from dateutil import parser
from datetime import datetime
import random
from pyspark import SparkConf, SparkContext, Row, SQLContext
from glob import glob  
from ftplib import FTP
from pyspark.sql import SparkSession

# directory
home_dir = os.path.expanduser('~')
download_dir = os.path.join(home_dir, 'Downloads')
unzip_dir = os.path.join(download_dir, 'pubmed_oa') # path to unzip tar file
save_dir = os.path.join(home_dir, 'Desktop')

class PubMedDataProcessor:
    def __init__(self):
        self.MEDLINE = 'ftp://ftp.nlm.nih.gov/nlmdata/.medleasebaseline/gz/'
        self.PUBMED_OA = 'ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/'
        self.home_dir = os.path.expanduser('~')
        self.download_dir = os.path.join(self.home_dir, 'Downloads')
        self.unzip_dir = os.path.join(self.download_dir, 'pubmed_oa')
        self.save_dir = os.path.join(self.home_dir, 'Desktop')
        self.conf = SparkConf().setAppName('pubmed_oa_spark')\
                                .setMaster('local[8]')\
                                .set('executor.memory', '8g')\
                                .set('driver.memory', '8g')\
                                .set('spark.driver.maxResultSize', '0')
        self.sc = SparkContext(conf=self.conf)
        self.spark = SparkSession.builder.config(conf=self.conf).getOrCreate()

    def download_file(self, url, save_path):
        ftp = FTP('ftp.nlm.nih.gov')
        ftp.login()
        filepath = '/nlmdata/.medleasebaseline/gz/'  # Modify as needed
        with open(save_path, 'wb') as f:
            ftp.retrbinary(f"RETR {filepath}", f.write)
            
    def extract_tar_gz(self, file_path, extract_path):
        with tarfile.open(file_path, 'r:gz') as f:
            f.extractall(extract_path)
            
    def get_update_date(self, option='medline'):
        link = self.MEDLINE if option == 'medline' else self.PUBMED_OA
        index_file = os.path.join(self.download_dir, 'index.html')
        
        self.download_file(link, index_file)
        
        with open(index_file, 'r', encoding='utf-8') as f:
            page = f.read()

        date_all = []
        tree = html.fromstring(page)
        for e in tree.xpath('body/pre/a'):
            if 'File' in e.tail:
                s = e.tail
                s_remove = re.sub(r'\([^)]*\)', '', s)
                s_remove = re.sub('File', '', s_remove).strip()
                d = parser.parse(s_remove)
                date_all.append(d)

        date = max(date_all)
        
        if os.path.exists(index_file):
            os.remove(index_file)
        
        return date

    def parse_name(self, p):
        """Turn dataframe from pubmed_parser to list of Spark Row"""
        author_list = p.author_list
        author_table = list()
        if len(author_list) >= 1:
            for author in author_list:
                r = Row(pmc=p.pmc, pmid=p.pmid, last_name=author[0],
                        first_name=author[1], affiliation_id=author[2])
                author_table.append(r)
            return author_table
        else:
            return None
    
    def parse_affiliation(self, p):
        """Turn dataframe from pubmed_parser to list of Spark Row"""
        affiliation_list = p.affiliation_list
        affiliation_table = list()
        if len(affiliation_list) >= 1:
            for affil in affiliation_list:
                r = Row(pmc=p.pmc, pmid=p.pmid,
                        affiliation_id=affil[0], affiliation=affil[1])
                affiliation_table.append(r)
            return affiliation_table
        else:
            return None

    def update(self):
        """Download and update file"""
        save_file = os.path.join(self.save_dir, 'pubmed_oa_*_*_*.parquet')  # Added self.
        file_list = list(filter(os.path.isdir, glob(save_file)))
        if file_list:
            d = re.search('[0-9]+_[0-9]+_[0-9]+', file_list[0]).group(0)
            date_file = datetime.strptime(d, '%Y_%m_%d')
            date_update = self.get_update_date(option='oa')
            # if update is newer
            is_update = date_update > date_file
            if is_update:
                print("MEDLINE update available!")
                subprocess.call(['rm', '-rf', os.path.join(self.save_dir, 'pubmed_oa_*_*_*.parquet')])
                subprocess.call(['rm', '-rf', download_dir, 'pubmed_oa'])
                subprocess.call(['wget', 'ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/non_comm_use.A-B.xml.tar.gz', '--directory', download_dir])
                if not os.path.isdir(unzip_dir): os.mkdir(unzip_dir)
                self.subprocess.call(['tar', '-xzf', os.path.join(download_dir, 'non_comm_use.A-B.xml.tar.gz'), '--directory', unzip_dir])
            else:
                print("No update available")
        else:
            print("Download Pubmed Open-Access for the first time")
            is_update = True
            date_update = get_update_date(option='oa')
            subprocess.call(['wget', 'ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/non_comm_use.A-B.xml.tar.gz', '--directory', download_dir])
            if not os.path.isdir(unzip_dir): os.mkdir(unzip_dir)
            self.subprocess.call(['tar', '-xzf', os.path.join(download_dir, 'non_comm_use.A-B.xml.tar.gz'), '--directory', unzip_dir])
        return is_update, date_update
    
    def process_file(self, date_update, fraction=0.01):
        """Process unzipped Pubmed Open-Access folder to parquet file"""
        print("Process Pubmed Open-Access file to parquet with fraction = %s" % str(fraction))
        date_update_str = date_update.strftime("%Y_%m_%d")
        if glob(os.path.join(save_dir, 'pubmed_oa_*.parquet')):
            self.subprocess.call(['rm', '-rf', 'pubmed_oa_*.parquet']) # remove if folder still exist

        path_all = pp.list_xml_path(unzip_dir)
        if fraction < 1:
            n_sample = int(fraction * len(path_all))
            rand_index = random.sample(range(len(path_all)), n_sample)
            rand_index.sort()
            path_sample = [path_all[i] for i in rand_index]
        else:
            path_sample = path_all

        path_rdd = sc.parallelize(path_sample, numSlices=10000) # use only example path
        parse_results_rdd = path_rdd.map(lambda x: Row(file_name=os.path.basename(x), **pp.parse_pubmed_xml(x)))
        pubmed_oa_df = parse_results_rdd.toDF()
        pubmed_oa_df_sel = pubmed_oa_df[['full_title', 'abstract', 'doi',
                                        'file_name', 'pmc', 'pmid',
                                        'publication_year', 'publisher_id',
                                        'journal', 'subjects']]
        pubmed_oa_df_sel.write.parquet(os.path.join(save_dir, 'pubmed_oa_%s.parquet' % date_update_str),
                                    mode='overwrite')

        parse_name_rdd = parse_results_rdd.map(lambda x: parse_name(x)).\
            filter(lambda x: x is not None).\
            flatMap(lambda xs: [x for x in xs])
        parse_name_df = parse_name_rdd.toDF()
        parse_name_df.write.parquet(os.path.join(save_dir, 'pubmed_oa_author_%s.parquet' % date_update_str),
                                    mode='overwrite')

        parse_affil_rdd = parse_results_rdd.map(lambda x: parse_affiliation(x)).\
            filter(lambda x: x is not None).\
            flatMap(lambda xs: [x for x in xs])
        parse_affil_df = parse_affil_rdd.toDF()
        # change to parse_affil_df
        parse_affil_df.write.parquet(os.path.join(save_dir, 'pubmed_oa_affiliation_%s.parquet' % date_update_str),
                                    mode='overwrite')
        print('Finished parsing Pubmed Open-Access subset')

    def run(self):
        is_update, date_update = self.update()
        if is_update:
            self.process_file(date_update)
        self.sc.stop()

In [32]:
# Example usage
processor = PubMedDataProcessor()
latest_date = processor.get_update_date('medline')
print("Latest update date:", latest_date)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by getOrCreate at C:\Users\marco\AppData\Local\Temp\ipykernel_27376\3764800598.py:33 

In [33]:
# import os
# import re
# from glob import glob
# from datetime import datetime
# import random
# import subprocess
# import pubmed_parser as pp
# from pyspark.sql import Row, SQLContext
# from pyspark import SparkConf, SparkContext
# import subprocess
# from lxml import html
# from dateutil import parser

# MEDLINE = 'ftp://ftp.nlm.nih.gov/nlmdata/.medleasebaseline/gz/'
# PUBMED_OA = 'ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/'

# def get_update_date(option='medline'):
#     """
#     Download index page and grab lastest update from Medline
#     or Pubmed Open-Access subset

#     Parameters
#     ----------
#     option: str, 'medline' for MEDLINE or 'oa' for Pubmed Open-Access

#     Example
#     -------
#     >> date = get_update_date('medline')
#     >> print(date.strftime("%Y_%m_%d"))
#     """
#     if option == 'medline':
#         link = MEDLINE
#     elif option == 'oa':
#         link = PUBMED_OA
#     else:
#         link = MEDLINE
#         print('Specify either "medline" or "oa" for repository')

#     if os.path.exists('index.html'):
#         subprocess.call(['rm', 'index.html'])
#     subprocess.call(['wget', link])

#     with open('index.html', 'r') as f:
#         page = f.read()

#     date_all = list()
#     tree = html.fromstring(page)
#     for e in tree.xpath('body/pre/a'):
#         if 'File' in e.tail:
#             s = e.tail
#             s_remove = re.sub(r'\([^)]*\)', '', s)
#             s_remove = re.sub('File', '', s_remove).strip()
#             d = parser.parse(s_remove)
#             date_all.append(d)
#     date = max(date_all) # get lastest update
    
#     if os.path.exists('index.html'):
#         subprocess.call(['rm', 'index.html'])
#     return date

# # directory
# home_dir = os.path.expanduser('~')
# download_dir = os.path.join(home_dir, 'Downloads')
# unzip_dir = os.path.join(download_dir, 'pubmed_oa') # path to unzip tar file
# save_dir = os.path.join(home_dir, 'Desktop')

# def parse_name(p):
#     """Turn dataframe from pubmed_parser to list of Spark Row"""
#     author_list = p.author_list
#     author_table = list()
#     if len(author_list) >= 1:
#         for author in author_list:
#             r = Row(pmc=p.pmc, pmid=p.pmid, last_name=author[0],
#                     first_name=author[1], affiliation_id=author[2])
#             author_table.append(r)
#         return author_table
#     else:
#         return None

# def parse_affiliation(p):
#     """Turn dataframe from pubmed_parser to list of Spark Row"""
#     affiliation_list = p.affiliation_list
#     affiliation_table = list()
#     if len(affiliation_list) >= 1:
#         for affil in affiliation_list:
#             r = Row(pmc=p.pmc, pmid=p.pmid,
#                     affiliation_id=affil[0], affiliation=affil[1])
#             affiliation_table.append(r)
#         return affiliation_table
#     else:
#         return None

# def update():
#     """Download and update file"""
#     save_file = os.path.join(save_dir, 'pubmed_oa_*_*_*.parquet')
#     file_list = list(filter(os.path.isdir, glob(save_file)))
#     if file_list:
#         d = re.search('[0-9]+_[0-9]+_[0-9]+', file_list[0]).group(0)
#         date_file = datetime.strptime(d, '%Y_%m_%d')
#         date_update = get_update_date(option='oa')
#         # if update is newer
#         is_update = date_update > date_file
#         if is_update:
#             print("MEDLINE update available!")
#             subprocess.call(['rm', '-rf', os.path.join(save_dir, 'pubmed_oa_*_*_*.parquet')]) # remove
#             subprocess.call(['rm', '-rf', download_dir, 'pubmed_oa'])
#             subprocess.call(['wget', 'ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/non_comm_use.A-B.xml.tar.gz', '--directory', download_dir])
#             if not os.path.isdir(unzip_dir): os.mkdir(unzip_dir)
#             subprocess.call(['tar', '-xzf', os.path.join(download_dir, 'non_comm_use.A-B.xml.tar.gz'), '--directory', unzip_dir])
#         else:
#             print("No update available")
#     else:
#         print("Download Pubmed Open-Access for the first time")
#         is_update = True
#         date_update = get_update_date(option='oa')
#         subprocess.call(['wget', 'ftp://ftp.ncbi.nlm.nih.gov/pub/pmc/oa_bulk/non_comm_use.A-B.xml.tar.gz', '--directory', download_dir])
#         if not os.path.isdir(unzip_dir): os.mkdir(unzip_dir)
#         subprocess.call(['tar', '-xzf', os.path.join(download_dir, 'non_comm_use.A-B.xml.tar.gz'), '--directory', unzip_dir])
#     return is_update, date_update

# def process_file(date_update, fraction=0.01):
#     """Process unzipped Pubmed Open-Access folder to parquet file"""
#     print("Process Pubmed Open-Access file to parquet with fraction = %s" % str(fraction))
#     date_update_str = date_update.strftime("%Y_%m_%d")
#     if glob(os.path.join(save_dir, 'pubmed_oa_*.parquet')):
#         subprocess.call(['rm', '-rf', 'pubmed_oa_*.parquet']) # remove if folder still exist

#     path_all = pp.list_xml_path(unzip_dir)
#     if fraction < 1:
#         n_sample = int(fraction * len(path_all))
#         rand_index = random.sample(range(len(path_all)), n_sample)
#         rand_index.sort()
#         path_sample = [path_all[i] for i in rand_index]
#     else:
#         path_sample = path_all

#     path_rdd = sc.parallelize(path_sample, numSlices=10000) # use only example path
#     parse_results_rdd = path_rdd.map(lambda x: Row(file_name=os.path.basename(x), **pp.parse_pubmed_xml(x)))
#     pubmed_oa_df = parse_results_rdd.toDF()
#     pubmed_oa_df_sel = pubmed_oa_df[['full_title', 'abstract', 'doi',
#                                      'file_name', 'pmc', 'pmid',
#                                      'publication_year', 'publisher_id',
#                                      'journal', 'subjects']]
#     pubmed_oa_df_sel.write.parquet(os.path.join(save_dir, 'pubmed_oa_%s.parquet' % date_update_str),
#                                    mode='overwrite')

#     parse_name_rdd = parse_results_rdd.map(lambda x: parse_name(x)).\
#         filter(lambda x: x is not None).\
#         flatMap(lambda xs: [x for x in xs])
#     parse_name_df = parse_name_rdd.toDF()
#     parse_name_df.write.parquet(os.path.join(save_dir, 'pubmed_oa_author_%s.parquet' % date_update_str),
#                                 mode='overwrite')

#     parse_affil_rdd = parse_results_rdd.map(lambda x: parse_affiliation(x)).\
#         filter(lambda x: x is not None).\
#         flatMap(lambda xs: [x for x in xs])
#     parse_affil_df = parse_affil_rdd.toDF()
#     # change to parse_affil_df
#     parse_affil_df.write.parquet(os.path.join(save_dir, 'pubmed_oa_affiliation_%s.parquet' % date_update_str),
#                                 mode='overwrite')
#     print('Finished parsing Pubmed Open-Access subset')

# conf = SparkConf().setAppName('pubmed_oa_spark')\
#     .setMaster('local[8]')\
#     .set('executor.memory', '8g')\
#     .set('driver.memory', '8g')\
#     .set('spark.driver.maxResultSize', '0')

# if __name__ == '__main__':
#     sc = SparkContext(conf=conf)
#     sqlContext = SQLContext(sc)
#     is_update, date_update = update()
#     if is_update:
#         process_file(date_update)
#     sc.stop()