In [68]:
import logging, sys
from typing import List, Optional

from pydantic import BaseModel, HttpUrl
from Bio import Entrez

from configs import ENTREZ_EMAIL, BATCH_SIZE

class ReadyAuthor(BaseModel):

    forename: str
    lastname: str

class ReadyPaper(BaseModel):

    title: str
    pmid: int
    doi: str
    link: HttpUrl

    abstract: str
    journal: str

    authors: Optional[List[ReadyAuthor]]

def setup_ncbi() -> logging.Logger:
    """Setups NCBI Entrez module and returns logger."""
    Entrez.email = ENTREZ_EMAIL

    pubmed_logger = logging.getLogger('pubmed')
    pubmed_logger.addHandler(logging.StreamHandler(sys.stdout))
    pubmed_logger.setLevel(logging.INFO)

    return pubmed_logger

def search(query: str, results_num: int = BATCH_SIZE, sort='pub+date') -> dict:
    handle = Entrez.esearch(
        db='pubmed', 
        sort=sort, 
        retmax=str(results_num),
        retmode='xml', 
        term=query
    )
    results = Entrez.read(handle)

    return results

def search_by_journal(journal_name: str, results_num: int = BATCH_SIZE, sort='pub_date') -> dict:
    handle = Entrez.esearch(
        db='pubmed', 
        sort=sort, 
        retmax=str(results_num),
        retmode='xml', 
        term=f'"{journal_name}"[Journal]'
    )
    results = Entrez.read(handle)

    return results

def fetch_details(id_list):
    ids = ','.join(id_list)
    handle = Entrez.efetch(db='pubmed',
                           retmode='xml',
                           id=ids)
    details = Entrez.read(handle)
    return details

def clean_text(s: str) -> str:
    
    s = ' '.join(s.replace('/', ' / ').split())
    s = s.replace('!', '\!')
    
    return s


def get_doi(article_id_list) -> str:
    for string_element in article_id_list:
        if string_element.__dict__['attributes']['IdType'] == 'doi':
            return string_element.__str__().lower()


def get_abstract(paper) -> str:
    
    string_list = []

    try:
        abstract = paper['Abstract']['AbstractText']
    except KeyError:
        return 'No abstract available for this paper :('
    
    for part in abstract:
        if part.__dict__['attributes']:
            string_list.append('*' + part.__dict__['attributes']['Label'] + '*')
        string_list.append(part.__str__())
        string_list.append('')

    cleaned_list = [clean_text(s) for s in string_list]
    
    return '\n'.join(cleaned_list)

def parse_paper(paper) -> ReadyPaper:

    title = clean_text(paper['MedlineCitation']['Article']['ArticleTitle'])
    pmid = int(paper['MedlineCitation']['PMID'])
    doi = get_doi(paper['PubmedData']['ArticleIdList'])
    link = f'https://doi.org/{doi}'

    abstract = get_abstract(paper['MedlineCitation']['Article'])
    journal = paper['MedlineCitation']['Article']['Journal']['Title']

    authors = []
    if 'AuthorList' in paper['MedlineCitation']['Article'].keys():
        for author in paper['MedlineCitation']['Article']['AuthorList']:
            if author.attributes['ValidYN'] == 'Y':
                authors.append(
                    ReadyAuthor(
                        forename=author['ForeName'],
                        lastname=author['LastName']
                    )
                )
                

    ready_paper = ReadyPaper(
        title=title,
        pmid=pmid,
        doi=doi,
        link=link,
        abstract=abstract,
        journal=journal,
        authors = None if not authors else authors
    )

    if authors:
        ready_paper.authors = authors=authors

    return ready_paper

def get_ready_papers(pubmed_logger: logging.Logger, query: str, results_num: int = BATCH_SIZE) -> List[ReadyPaper]:

    results = search(query, results_num)
    ids = results['IdList']
    papers = fetch_details(ids)['PubmedArticle']

    if len(papers) < results_num:
        results = search(query, results_num * 2)
        ids = results['IdList']
        papers = fetch_details(ids)['PubmedArticle']

    outuput = []

    for paper in papers:
        try:
            outuput.append(parse_paper(paper))
        except Exception as e:
            pubmed_logger.error(f'Error while parsing paper about {query}!')
            pubmed_logger.error(e)
            continue

    if len(outuput) < results_num:
        results = search(query, results_num * 2)
        ids = results['IdList']
        papers = fetch_details(ids)['PubmedArticle']

    for paper in papers:
        try:
            outuput.append(parse_paper(paper))
        except Exception as e:
            pubmed_logger.error(f'Error while parsing paper about {query}!')
            pubmed_logger.error(e)
            continue

    if len(outuput) < results_num:
        pubmed_logger.warning(f'Not enough papers about {query}, found only {len(outuput)}!')

    if len(outuput) > results_num:
        outuput = outuput[:results_num]

    return outuput

def get_papers_by_journal(pubmed_logger: logging.Logger, journal_name: str, results_num = BATCH_SIZE):

    results = search_by_journal(journal_name, results_num)
    ids = results['IdList']
    papers = fetch_details(ids)['PubmedArticle']


    print(results)
    print('---')
    print(papers)

    if len(papers) < results_num:
        results = search_by_journal(journal_name, results_num * 2)
        ids = results['IdList']
        papers = fetch_details(ids)['PubmedArticle']

    outuput = []

    for paper in papers:
        try:
            outuput.append(parse_paper(paper))
        except Exception as e:
            pubmed_logger.error(f'Error while parsing papers from journal "{journal_name}"!')
            pubmed_logger.error(e)
            continue

    if len(outuput) < results_num:
        results = search_by_journal(journal_name, results_num * 2)
        ids = results['IdList']
        papers = fetch_details(ids)['PubmedArticle']

    for paper in papers:
        try:
            outuput.append(parse_paper(paper))
        except Exception as e:
            pubmed_logger.error(f'Error while parsing papers from journal "{journal_name}"!')
            pubmed_logger.error(e)
            continue

    if len(outuput) < results_num:
        pubmed_logger.warning(f'Not enough papers from journal "{journal_name}", found only {len(outuput)}!')

    if len(outuput) > results_num:
        outuput = outuput[:results_num]

    # TODO DRY

In [69]:
journal_name = 'Nature'
results_num = 20
pubmed_logger = setup_ncbi()

In [70]:
results = search_by_journal(journal_name, results_num)
ids = results['IdList']
papers = fetch_details(ids)['PubmedArticle']


if len(papers) < results_num:
    results = search_by_journal(journal_name, results_num * 2)
    ids = results['IdList']
    papers = fetch_details(ids)['PubmedArticle']

outuput = []

for paper in papers:
    try:
        outuput.append(parse_paper(paper))
    except Exception as e:
        pubmed_logger.error(f'Error while parsing papers from journal "{journal_name}"!')
        pubmed_logger.error(e)
        continue

if len(outuput) < results_num:
    results = search_by_journal(journal_name, results_num * 2)
    ids = results['IdList']
    papers = fetch_details(ids)['PubmedArticle']

for paper in papers:
    try:
        outuput.append(parse_paper(paper))
    except Exception as e:
        pubmed_logger.error(f'Error while parsing papers from journal "{journal_name}"!')
        pubmed_logger.error(e)
        continue

if len(outuput) < results_num:
    pubmed_logger.warning(f'Not enough papers from journal "{journal_name}", found only {len(outuput)}!')

if len(outuput) > results_num:
    outuput = outuput[:results_num]

In [74]:
papers[4]['MedlineCitation']['Article'].keys()

dict_keys(['ELocationID', 'Language', 'ArticleDate', 'Journal', 'ArticleTitle', 'Abstract', 'AuthorList', 'PublicationTypeList'])

In [75]:
papers[4]['MedlineCitation']['Article']['AuthorList'][1]

DictElement({'AffiliationInfo': [{'Identifier': [], 'Affiliation': 'Chemistry and Nanoscience Center, National Renewable Energy Laboratory, Golden, CO, USA.'}], 'Identifier': [StringElement('0000-0003-0605-140X', attributes={'Source': 'ORCID'})], 'LastName': 'Tirawat', 'ForeName': 'Robert', 'Initials': 'R'}, attributes={'ValidYN': 'Y'})

In [77]:
results = get_papers_by_journal(pubmed_logger, journal_name, results_num)

{'Count': '130208', 'RetMax': '20', 'RetStart': '0', 'IdList': ['37697059', '37696972', '37696971', '37696289', '37696288', '37689756', '37684394', '37684393', '37684392', '37684391', '37684389', '37684388', '37684390', '37679491', '37679490', '37679489', '37679487', '37674093', '37679488', '37674092'], 'TranslationSet': [], 'QueryTranslation': '"Nature"[Journal]'}
---
[{'MedlineCitation': DictElement({'OtherAbstract': [], 'OtherID': [], 'SpaceFlightMission': [], 'GeneralNote': [], 'KeywordList': [], 'CitationSubset': ['IM'], 'PMID': StringElement('37697059', attributes={'Version': '1'}), 'DateRevised': {'Year': '2023', 'Month': '09', 'Day': '11'}, 'Article': DictElement({'ELocationID': [StringElement('10.1038/s41586-023-06584-6', attributes={'EIdType': 'doi', 'ValidYN': 'Y'})], 'Language': ['eng'], 'ArticleDate': [DictElement({'Year': '2023', 'Month': '09', 'Day': '11'}, attributes={'DateType': 'Electronic'})], 'Journal': {'ISSN': StringElement('1476-4687', attributes={'IssnType': 'El

In [78]:
results