# Scopus Literature Search Strategy & CSV Generation
This notebook allows you to test different keyword strategies for the Scopus API.

Obtain your API key through: https://dev.elsevier.com/

Edit the `groups` and `logic` variables in section 2, then run the subsequent cells to see the results.

In [11]:
# === SECTION: Import Important Functions ===
import os
import csv
import requests
from datetime import datetime
import json

print("✅ Imported all necessary libraries.")

✅ Imported all necessary libraries.


# 1. Set up folders and API key

In the cell below, uncomment (Ctrl+/ on PC or Command+/ on Mac) the lines that match your operating system, then set the CSV folder, the summary folder and your API key.

In [None]:
# === SECTION: USER SETUP (PC/Windows) ===
# Comment/uncomment below and edit these variables to match your Windows setup
# csv_folder = r"C:\Users\YOUR_USERNAME\Documents\csvs\scopus_csv"
# summary_folder = r"C:\Users\YOUR_USERNAME\Documents\csvs\summaries"
# api_key = "YOUR_SCOPUS_API_KEY"  # Replace with your Scopus API key

# === SECTION: USER SETUP (Mac) ===
# Comment/uncomment below and edit these lines to match your Mac setup
csv_folder = r"/Users/YOUR_USERNAME/Documents/SP/csvs/scopus_csv"
summary_folder = r"/Users/YOUR_USERNAME/Documents/SP/csvs/summaries"
api_key = "YOUR_SCOPUS_API_KEY"  # Replace with your Scopus API key (https://dev.elsevier.com/)

# === SECTION: FOLDER CREATION AND CHECK ===
import os

os.makedirs(csv_folder, exist_ok=True)
os.makedirs(summary_folder, exist_ok=True)
missing = []
if not api_key or api_key == "YOUR_SCOPUS_API_KEY":
    missing.append("API key")
if not os.path.isdir(csv_folder):
    missing.append("CSV folder")
if not os.path.isdir(summary_folder):
    missing.append("Summary folder")

if missing:
    print(f"⚠️ WARNING: Please check the following: {', '.join(missing)}")
else:
    print("✅ Output folders and API key are set up and ready.")


✅ Output folders and API key are set up and ready.


# 2. Test and adjust your keyword strategy

The four cells below help you test different keyword groups and their combinations.
- 2.1. Define your keyword groups and the exclusion group using AND/OR rules, then define the combination logic.
- 2.2. See the number of results returned for each keyword group and for the combined query.
- 2.3. See the first 10 titles for each keyword group.
- 2.4. See the first 10 titles for the combined query.

In [None]:
# === SECTION: Define Groups, Logic, and Year Filter (CORRECTED) ===
from datetime import datetime

groups = {
    'group1': 'TITLE-ABS-KEY(keyword OR "keyword")',
    'group2': 'TITLE-ABS-KEY("keyword" OR "keyword")',
    'group3': 'TITLE-ABS-KEY("keyword" OR "keyword")',
    'group4': 'TITLE-ABS-KEY("keyword" OR "keyword")',
    'excluded': 'AND NOT TITLE-ABS-KEY(keyword OR "keyword")'
}

logic = "({group1}) AND ({group2}) AND ({group3}) AND ({group4}) {excluded}"
combined_query = logic.format(**groups)

year_from = 2016
year_to = datetime.now().year

print(f"✅ Keyword groups and logic defined.\nYear filter: {year_from}-{year_to}")
print("Combined Scopus query:", combined_query)

✅ Keyword groups and logic defined.
Year filter: 2016-2025
Combined Scopus query: (TITLE-ABS-KEY("neuroimaging pipeline" OR "MRI processing" OR "neuroinformatics pipeline")) AND (TITLE-ABS-KEY("structural MRI" OR "T1-weighted" OR "T2-weighted" OR "low-field MRI" OR "portable MRI")) AND (TITLE-ABS-KEY("continuous integration" OR "continuous deployment" OR "CI/CD" OR containerization OR containerisation OR "version control" OR "cloud-based" OR serverless OR "distributed storage" OR BIDS OR "flywheel.io" OR github OR gitlab OR reproducibility)) AND (TITLE-ABS-KEY(brain OR neuroimaging)) AND NOT TITLE-ABS-KEY(fMRI OR EEG OR MEG OR "functional connectivity" OR "clinical trial")
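
As a minimal sketch of how a group string like the ones above could be assembled from a plain keyword list: the helper `build_group` below is hypothetical (it is not part of this notebook) and assumes that any term that is not a single alphanumeric word should be wrapped in double quotes.

```python
def build_group(keywords, field="TITLE-ABS-KEY"):
    """Join keywords with OR inside a Scopus field tag (hypothetical helper)."""
    terms = []
    for kw in keywords:
        kw = kw.strip()
        # Assumption: quote phrases and terms with punctuation, leave single words bare.
        terms.append(kw if kw.isalnum() else f'"{kw}"')
    return f"{field}({' OR '.join(terms)})"


# Two illustrative groups combined with the same str.format pattern as `logic`.
example_groups = {
    "group1": build_group(["neuroimaging pipeline", "MRI processing"]),
    "group2": build_group(["brain", "neuroimaging"]),
}
example_logic = "({group1}) AND ({group2})"
example_query = example_logic.format(**example_groups)
print(example_query)
```

Building the groups from lists makes it easier to add or remove a keyword without breaking the quoting or the parentheses.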


In [17]:
# === SECTION: Run API Query and Return Total Results for Each Group and Combined Query ===

def run_scopus_query(query, api_key, year_from=None, year_to=None, max_records=1):
    base_url = "https://api.elsevier.com/content/search/scopus"
    
    # Build date filter if years are specified
    date_filter = ""
    if year_from and year_to:
        date_filter = f" AND PUBYEAR > {year_from-1} AND PUBYEAR < {year_to+1}"
    elif year_from:
        date_filter = f" AND PUBYEAR > {year_from-1}"
    elif year_to:
        date_filter = f" AND PUBYEAR < {year_to+1}"
    
    full_query = query + date_filter
    
    params = {
        'query': full_query,
        'count': min(max_records, 25),  # Scopus API limit is 25 per request
        'start': 0,
        'view': 'STANDARD'
    }
    
    headers = {
        'Accept': 'application/json',
        'X-ELS-APIKey': api_key
    }
    
    try:
        response = requests.get(base_url, params=params, headers=headers)
        response.raise_for_status()
        data = response.json()
        
        total = int(data.get('search-results', {}).get('opensearch:totalResults', 0))
        articles = data.get('search-results', {}).get('entry', [])
        
        return total, articles
        
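    except (requests.exceptions.RequestException, ValueError) as e:
        # Report the failure instead of silently returning zero results
        print(f"❌ Scopus request failed: {e}")
        return 0, []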

print("Year filter applied to all queries:", f"{year_from}-{year_to}\n")
print("="*50)
print("INDIVIDUAL GROUP RESULTS:")
print("="*50)
group_results = {}
for name, query in groups.items():
    if name != 'excluded':
        count, _ = run_scopus_query(query, api_key, year_from, year_to)
        group_results[name] = count
        print(f"{name.upper():<25}: {count} results")

print("\n" + "="*50)
print("COMBINED LOGIC RESULTS:")
print("="*50)
print(f"Logic: {logic}\n")
print(f"Combined query: {combined_query}")
combined_count, combined_articles = run_scopus_query(combined_query, api_key, year_from, year_to)
print(f"Combined results: {combined_count}")


Year filter applied to all queries: 2016-2025

INDIVIDUAL GROUP RESULTS:
GROUP1                   : 153 results
GROUP2                   : 56537 results
GROUP3                   : 398942 results
GROUP4                   : 1119364 results

COMBINED LOGIC RESULTS:
Logic: ({group1}) AND ({group2}) AND ({group3}) AND ({group4}) {excluded}

Combined query: (TITLE-ABS-KEY("neuroimaging pipeline" OR "MRI processing" OR "neuroinformatics pipeline")) AND (TITLE-ABS-KEY("structural MRI" OR "T1-weighted" OR "T2-weighted" OR "low-field MRI" OR "portable MRI")) AND (TITLE-ABS-KEY("continuous integration" OR "continuous deployment" OR "CI/CD" OR containerization OR containerisation OR "version control" OR "cloud-based" OR serverless OR "distributed storage" OR BIDS OR "flywheel.io" OR github OR gitlab OR reproducibility)) AND (TITLE-ABS-KEY(brain OR neuroimaging)) AND NOT TITLE-ABS-KEY(fMRI OR EEG OR MEG OR "functional connectivity" OR "clinical trial")
Combined results: 7
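
The year filter inside `run_scopus_query` uses strict comparisons, which is why it subtracts 1 from `year_from` and adds 1 to `year_to` to make both ends inclusive. The sketch below isolates that logic; `build_date_filter` is an illustrative name, not a function defined in this notebook.

```python
def build_date_filter(year_from=None, year_to=None):
    """Return a PUBYEAR clause covering year_from..year_to inclusive."""
    clauses = []
    if year_from:
        # PUBYEAR > 2015 keeps 2016 and later
        clauses.append(f"PUBYEAR > {year_from - 1}")
    if year_to:
        # PUBYEAR < 2026 keeps 2025 and earlier
        clauses.append(f"PUBYEAR < {year_to + 1}")
    return "".join(f" AND {clause}" for clause in clauses)


print(build_date_filter(2016, 2025))  # " AND PUBYEAR > 2015 AND PUBYEAR < 2026"
print(build_date_filter(2016))        # " AND PUBYEAR > 2015"
```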


The next block will show the first 10 titles for each keyword group (except the excluded keyword group).

Based on this, you can go back and adjust your keyword groups.

In [18]:
# === SECTION: Show First 10 Titles for Each Keyword Group (Except NOT Group) ===

for name, query in groups.items():
    if name == 'excluded':
        continue
    print(f"\n{'='*30}\n{name.upper()} (First 10 Titles):\n{'='*30}")
    _, articles = run_scopus_query(query, api_key, year_from, year_to, max_records=10)
    for i, article in enumerate(articles, 1):
        title = article.get('dc:title', 'No Title Available')
        print(f"{i}. {title}")


GROUP1 (First 10 Titles):
1. Fibre density and cross-section associate with hallmark pathology in early Alzheimer’s disease
2. A novel approach for the detection of brain tumor and its classification via independent component analysis
3. An intensive non-invasive protocol combining non-surgical spinal decompression and supportive physiotherapeutic modalities in the treatment of double-level disc herniation at L4-L5 and L5-S1: A case report
4. Dynamic Contrast-enhanced MRI Processing Comparison for Distinguishing True Progression from Pseudoprogression in High-grade Glioma
5. Enhancing Prenatal Diagnosis: Automated Fetal Brain MRI Morphometry
6. Prognostic Survival Analysis for AD Diagnosis and Progression Using MRI Data: An AI-Based Approach
7. A novel approach for the detection of brain tumor and its classification via end-to-end vision transformer - CNN architecture
8. Advancing Thalamic Nuclei Segmentation: The Impact of Compressed Sensing on MRI Processing
9. Empirical Perspective

The next block will show the first 10 titles for the combined query.

Based on the results you can go back and adjust your groups and logic.

In [19]:
# === SECTION: Show First 10 Titles for Combined Keyword Group ===

print(f"\n{'='*30}\nCOMBINED QUERY (First 10 Titles):\n{'='*30}")
_, articles = run_scopus_query(combined_query, api_key, year_from, year_to, max_records=10)
for i, article in enumerate(articles, 1):
    title = article.get('dc:title', 'No Title Available')
    print(f"{i}. {title}")


COMBINED QUERY (First 10 Titles):
1. Reproducibility evaluation of the effects of MRI defacing on brain segmentation
2. Reproducibility and Reliability of Computing Models in Segmentation and Volumetric Measurement of Brain
3. An automatic and accurate deep learning-based neuroimaging pipeline for the neonatal brain
4. PhiPipe: A multi-modal MRI data processing pipeline with test–retest reliability and predicative validity assessments
5. FastSurfer - A fast and accurate deep learning based neuroimaging pipeline
6. Test-retest reliability and sample size estimates after MRI scanner relocation
7. A short-term scan-rescan reliability test measuring brain tissue and subcortical hyperintensity volumetrics obtained using the lesion explorer structural MRI processing pipeline


# 3. Export Scopus Results to CSV

The script below uses your combined query to download the matching records and save them to a CSV file containing the first author, title, abstract, year and DOI (the first version of the cell also saves the journal and citation count). It also appends a row to the summary table with the number of records found and downloaded, the source, the final query and a timestamp for record-keeping purposes.
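
To illustrate the field mapping, here is a minimal sketch of turning one Scopus result entry into a CSV row. The field names are the ones used in the export cells; the function name `entry_to_row` and the sample entry are made up for illustration and are not real API output.

```python
def entry_to_row(entry):
    """Map one Scopus search entry (a dict) to [first_author, title, abstract, year, doi]."""
    cover_date = entry.get("prism:coverDate") or ""
    return [
        entry.get("dc:creator", ""),
        entry.get("dc:title", ""),
        entry.get("dc:description", ""),  # may be missing, depending on the API view
        cover_date[:4],                   # keep only the year from YYYY-MM-DD
        entry.get("prism:doi", ""),
    ]


# Hypothetical entry with placeholder values; note the missing abstract.
sample_entry = {
    "dc:creator": "Doe J.",
    "dc:title": "An example title",
    "prism:coverDate": "2021-03-15",
    "prism:doi": "10.0000/example",
}
row = entry_to_row(sample_entry)
print(row)
```

Using `.get` with an empty-string default means a missing field produces an empty CSV cell rather than an error.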

In [None]:
# === SECTION: Download CSV and Update Summary ===

def extract_first_author(authors_field):
    """
    Extract first author from Scopus author field - CORRECTED VERSION
    """
    if isinstance(authors_field, list) and authors_field:
        first = authors_field[0]  # Get first author from list
        if isinstance(first, dict):
            return first.get('authname', '') or first.get('ce:indexed-name', '')
    elif isinstance(authors_field, dict):
        # Single author case
        return authors_field.get('authname', '') or authors_field.get('ce:indexed-name', '')
    elif isinstance(authors_field, str):
        return authors_field
    return ''

def get_next_versioned_filename(folder, base_name="scopus_csv", ext=".csv"):
    """
    Generate next available versioned filename
    """
    i = 1
    while True:
        filename = f"{base_name}_v{i}{ext}"
        filepath = os.path.join(folder, filename)
        if not os.path.exists(filepath):
            return filename, filepath, i
        i += 1

def ensure_newline_at_end(filepath):
    """Ensures the file ends with a newline before appending."""
    if os.path.isfile(filepath):
        with open(filepath, 'rb+') as f:
            f.seek(-1, os.SEEK_END)
            last_char = f.read(1)
            if last_char != b'\n':
                f.write(b'\n')

def get_all_scopus_results(query, api_key, year_from, year_to, max_records=1000):
    """
    Retrieve all results from Scopus API with pagination
    """
    base_url = "https://api.elsevier.com/content/search/scopus"
    
    # Build date filter
    date_filter = ""
    if year_from and year_to:
        date_filter = f" AND PUBYEAR > {year_from-1} AND PUBYEAR < {year_to+1}"
    elif year_from:
        date_filter = f" AND PUBYEAR > {year_from-1}"
    elif year_to:
        date_filter = f" AND PUBYEAR < {year_to+1}"
    
    full_query = query + date_filter
    
    headers = {
        'Accept': 'application/json',
        'X-ELS-APIKey': api_key
    }
    
    all_articles = []
    start = 0
    count_per_request = 25  # Scopus API limit
    total_found = 0
    
    print("🔄 Starting data collection for query...")
    
    while len(all_articles) < max_records:
        params = {
            'query': full_query,
            'count': count_per_request,
            'start': start,
            'view': 'STANDARD'
        }
        
        try:
            response = requests.get(base_url, params=params, headers=headers)
            response.raise_for_status()
            data = response.json()
            
            if start == 0:  # First request
                total_found = int(data.get('search-results', {}).get('opensearch:totalResults', 0))
                print(f"📊 Total results available: {total_found}")
            
            articles = data.get('search-results', {}).get('entry', [])
            
            if not articles:
                print("🔚 No more articles found - stopping collection")
                break
                
            all_articles.extend(articles)
            start += count_per_request
            
            print(f"📥 Collected {len(all_articles)} articles so far...")
            
            # Check if we've reached the end
            if len(articles) < count_per_request:
                print("🔚 Reached end of available results")
                break
                
        except requests.exceptions.RequestException as e:
            print(f"❌ API request failed at start={start}: {e}")
            break
        except json.JSONDecodeError as e:
            print(f"❌ JSON parsing failed: {e}")
            break
    
    # Limit to max_records
    if len(all_articles) > max_records:
        all_articles = all_articles[:max_records]
        print(f"📋 Limited results to {max_records} articles as requested")
    
    return total_found, all_articles

def export_scopus_to_csv_and_update_summary(query, csv_folder, summary_folder, api_key, year_from, year_to, max_records=1000):
    """
    Export Scopus results to CSV and update summary
    """
    # Download articles
    print("🚀 Starting Scopus data export...")
    total_found, articles = get_all_scopus_results(query, api_key, year_from, year_to, max_records=max_records)
    actual_downloaded = len(articles)
    
    # Get next versioned CSV name and version number
    csv_name, csv_path, version_number = get_next_versioned_filename(csv_folder, base_name="scopus_csv", ext=".csv")
    
    print(f"💾 Writing {actual_downloaded} articles to {csv_name}")
    
    # Write to CSV
    with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['first_author', 'title', 'abstract', 'year', 'doi', 'journal', 'citation_count'])
        
        for article in articles:
            # CORRECTED field extraction
            first_author = extract_first_author(article.get('author'))
            title = article.get('dc:title', '')
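            # Note: to our knowledge, dc:description is only returned in the COMPLETE view,
            # so this column may be empty when 'view' is 'STANDARD'
            abstract = article.get('dc:description', '')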
            year = article.get('prism:coverDate', '')[:4] if article.get('prism:coverDate') else ''
            doi = article.get('prism:doi', '')
            journal = article.get('prism:publicationName', '')
            citation_count = article.get('citedby-count', '')
            
            writer.writerow([first_author, title, abstract, year, doi, journal, citation_count])
    
    print(f"✅ Exported {actual_downloaded} records to {csv_path}")

    # Prepare versioned source name for summary row
    source_name = f"scopus v{version_number}"

    # Format timestamp as YYYY-MM-DDTHH:MM
    timestamp = datetime.now().strftime('%Y-%m-%dT%H:%M')

    # Update summary CSV: always append, never overwrite, never repeat header
    summary_csv_path = os.path.join(summary_folder, "summary_csv.csv")
    file_exists = os.path.isfile(summary_csv_path)
    
    # Ensure file ends with a newline before appending
    if file_exists and os.path.getsize(summary_csv_path) > 0:
        ensure_newline_at_end(summary_csv_path)
        
    with open(summary_csv_path, 'a', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        if not file_exists:
            writer.writerow(['source', 'found', 'downloaded', 'keyword_combination', 'date'])
        writer.writerow([source_name, total_found, actual_downloaded, query, timestamp])
    
    print("✅ Summary row updated.")

# Execute the export
print("🎯 Starting final export with combined query...")
export_scopus_to_csv_and_update_summary(
    combined_query, csv_folder, summary_folder, api_key, year_from, year_to, max_records=1000
)


🎯 Starting final export with combined query...
🚀 Starting Scopus data export...
🔄 Starting data collection for query...
📊 Total results available: 7
📥 Collected 7 articles so far...
🔚 Reached end of available results
💾 Writing 7 articles to scopus_csv_v3.csv
✅ Exported 7 records to /Users/petrakisherczegh/Documents/SP/csvs/scopus_csv/scopus_csv_v3.csv
✅ Summary row updated.
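
The version number in the file name comes from probing for the first name that does not yet exist. A minimal sketch of that behaviour, run in a temporary folder so it does not touch your real output directory (`next_version_path` is an illustrative stand-in for `get_next_versioned_filename`):

```python
import os
import tempfile


def next_version_path(folder, base_name="scopus_csv", ext=".csv"):
    """Return (path, version) for the first base_name_v{i}{ext} not present in folder."""
    i = 1
    while os.path.exists(os.path.join(folder, f"{base_name}_v{i}{ext}")):
        i += 1
    return os.path.join(folder, f"{base_name}_v{i}{ext}"), i


with tempfile.TemporaryDirectory() as tmp:
    versions = []
    for _ in range(3):
        path, version = next_version_path(tmp)
        open(path, "w").close()  # create an empty file so the next call moves on
        versions.append(version)

print(versions)  # [1, 2, 3]
```

One consequence of this approach is that deleting an earlier file (say v2) leaves a gap that the next export fills, so version numbers are not guaranteed to be chronological.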


In [None]:
# === SECTION: Download CSV and Update Summary (COMPLETED VIEW) ===

def extract_first_author(article):
    """
    Extract first author from Scopus dc:creator field
    """
    # Scopus stores the first author in dc:creator field
    dc_creator = article.get('dc:creator', '')
    if dc_creator and isinstance(dc_creator, str):
        return dc_creator
    
    # Fallback to author field if dc:creator is not available
    authors_field = article.get('author', [])
    if isinstance(authors_field, list) and authors_field:
        first = authors_field[0]
        if isinstance(first, dict):
            return first.get('authname', '') or first.get('ce:indexed-name', '')
    
    return ''

def get_next_versioned_filename(folder, base_name="scopus_csv", ext=".csv"):
    """
    Generate next available versioned filename
    """
    i = 1
    while True:
        filename = f"{base_name}_v{i}{ext}"
        filepath = os.path.join(folder, filename)
        if not os.path.exists(filepath):
            return filename, filepath, i
        i += 1

def ensure_newline_at_end(filepath):
    """Ensures the file ends with a newline before appending."""
    if os.path.isfile(filepath):
        with open(filepath, 'rb+') as f:
            f.seek(-1, os.SEEK_END)
            last_char = f.read(1)
            if last_char != b'\n':
                f.write(b'\n')

def get_all_scopus_results(query, api_key, year_from, year_to, max_records=1000):
    """
    Retrieve all results from Scopus API with pagination
    """
    base_url = "https://api.elsevier.com/content/search/scopus"
    
    # Build date filter
    date_filter = ""
    if year_from and year_to:
        date_filter = f" AND PUBYEAR > {year_from-1} AND PUBYEAR < {year_to+1}"
    elif year_from:
        date_filter = f" AND PUBYEAR > {year_from-1}"
    elif year_to:
        date_filter = f" AND PUBYEAR < {year_to+1}"
    
    full_query = query + date_filter
    
    headers = {
        'Accept': 'application/json',
        'X-ELS-APIKey': api_key
    }
    
    all_articles = []
    start = 0
    count_per_request = 25  # Scopus API limit
    total_found = 0
    
    print("🔄 Starting data collection for query...")
    
    while len(all_articles) < max_records:
        params = {
            'query': full_query,
            'count': count_per_request,
            'start': start,
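            # 'view': 'COMPLETE',  # Uncomment to request abstracts and full author lists (assumes your API key has access to this view)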
        }
        
        try:
            response = requests.get(base_url, params=params, headers=headers)
            response.raise_for_status()
            data = response.json()
            
            if start == 0:  # First request
                total_found = int(data.get('search-results', {}).get('opensearch:totalResults', 0))
                print(f"📊 Total results available: {total_found}")
            
            articles = data.get('search-results', {}).get('entry', [])
            
            if not articles:
                print("🔚 No more articles found - stopping collection")
                break
                
            all_articles.extend(articles)
            start += count_per_request
            
            print(f"📥 Collected {len(all_articles)} articles so far...")
            
            # Check if we've reached the end
            if len(articles) < count_per_request:
                print("🔚 Reached end of available results")
                break
                
        except requests.exceptions.RequestException as e:
            print(f"❌ API request failed at start={start}: {e}")
            break
        except json.JSONDecodeError as e:
            print(f"❌ JSON parsing failed: {e}")
            break
    
    # Limit to max_records
    if len(all_articles) > max_records:
        all_articles = all_articles[:max_records]
        print(f"📋 Limited results to {max_records} articles as requested")
    
    return total_found, all_articles

def export_scopus_to_csv_and_update_summary(query, csv_folder, summary_folder, api_key, year_from, year_to, max_records=1000):
    """
    Export Scopus results to CSV and update summary
    """
    # Download articles
    print("🚀 Starting Scopus data export...")
    total_found, articles = get_all_scopus_results(query, api_key, year_from, year_to, max_records=max_records)
    actual_downloaded = len(articles)
    
    # Get next versioned CSV name and version number
    csv_name, csv_path, version_number = get_next_versioned_filename(csv_folder, base_name="scopus_csv", ext=".csv")
    
    print(f"💾 Writing {actual_downloaded} articles to {csv_name}")
    
    # Write to CSV
    with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['first_author', 'title', 'abstract', 'year', 'doi'])
        
        for article in articles:
            # Use the corrected author extraction
            first_author = extract_first_author(article)
            title = article.get('dc:title', '')
            abstract = article.get('dc:description', '')
            year = article.get('prism:coverDate', '')[:4] if article.get('prism:coverDate') else ''
            doi = article.get('prism:doi', '')
            
            writer.writerow([first_author, title, abstract, year, doi])
    
    print(f"✅ Exported {actual_downloaded} records to {csv_path}")

    # Prepare versioned source name for summary row
    source_name = f"scopus v{version_number}"

    # Format timestamp as YYYY-MM-DDTHH:MM
    timestamp = datetime.now().strftime('%Y-%m-%dT%H:%M')

    # Update summary CSV: always append, never overwrite, never repeat header
    summary_csv_path = os.path.join(summary_folder, "summary_csv.csv")
    file_exists = os.path.isfile(summary_csv_path)
    
    # Ensure file ends with a newline before appending
    if file_exists and os.path.getsize(summary_csv_path) > 0:
        ensure_newline_at_end(summary_csv_path)
        
    with open(summary_csv_path, 'a', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        if not file_exists:
            writer.writerow(['source', 'found', 'downloaded', 'keyword_combination', 'date'])
        writer.writerow([source_name, total_found, actual_downloaded, query, timestamp])
    
    print("✅ Summary row updated.")

# Execute the export
print("🎯 Starting final export with combined query...")
export_scopus_to_csv_and_update_summary(
    combined_query, csv_folder, summary_folder, api_key, year_from, year_to, max_records=1000
)



🎯 Starting final export with combined query...
🚀 Starting Scopus data export...
🔄 Starting data collection for query...
📊 Total results available: 7
📥 Collected 7 articles so far...
🔚 Reached end of available results
💾 Writing 7 articles to scopus_csv_v5.csv
✅ Exported 7 records to /Users/petrakisherczegh/Documents/SP/csvs/scopus_csv/scopus_csv_v5.csv
✅ Summary row updated.
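
The pagination loop in `get_all_scopus_results` advances `start` in steps of 25 until it has `max_records` entries or receives a short or empty page. As a rough sketch, the offsets it is expected to request can be computed up front; `page_starts` is a hypothetical helper, and the real loop may issue one extra request when the total is an exact multiple of the page size.

```python
def page_starts(total_found, max_records=1000, page_size=25):
    """List the 'start' offsets needed to fetch up to max_records results."""
    limit = min(total_found, max_records)
    return list(range(0, limit, page_size))


print(page_starts(7))          # [0]
print(page_starts(60))         # [0, 25, 50]
print(len(page_starts(5000)))  # 40 requests, capped by max_records=1000
```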


# Scopus Literature Search Strategy Completed

If all cells have run successfully (whether once or multiple times), you should have received a confirmation message for each block and have at least one CSV named scopus_csv_v(n).csv in the folder defined at the start. Note that every download generates an additional version following the naming convention v1, v2, v3, etc. You should also have a summary table with one row for each download you made.
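
To check the summary table afterwards, it can be read back with the standard `csv` module. The sketch below writes a small demo file with the same header as the export cell and then reads it; the helper name `read_summary` and the demo row are illustrative only. In practice you would point `read_summary` at `summary_csv.csv` inside your own `summary_folder`.

```python
import csv
import os
import tempfile


def read_summary(summary_csv_path):
    """Return the summary rows as a list of dicts keyed by the header names."""
    with open(summary_csv_path, newline="", encoding="utf-8") as f:
        return list(csv.DictReader(f))


with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "summary_csv.csv")
    # Demo file with a made-up row, mirroring the columns written by the export cell.
    with open(demo_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["source", "found", "downloaded", "keyword_combination", "date"])
        writer.writerow(["scopus v1", 7, 7, "TITLE-ABS-KEY(brain)", "2025-01-01T12:00"])
    rows = read_summary(demo_path)

for r in rows:
    print(f"{r['source']}: found {r['found']}, downloaded {r['downloaded']} on {r['date']}")
```

Values come back as strings, so convert `found` and `downloaded` with `int()` before doing arithmetic on them.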