# Readability Analysis of Scientific Abstracts (2015-2025)

This notebook analyzes readability trends in scientific literature using OpenAlex data.

**Domains:** AI, Medicine, Business  
**Metrics:** Flesch-Kincaid Grade Level, Automated Readability Index, Dale-Chall Score  
**Sample size:** up to 1,000 abstracts per year per domain

## 1. Install Dependencies

Install the required Python packages.

In [None]:
!pip install requests pandas textstat scipy matplotlib numpy

## 2. Import Libraries

In [None]:
import os
import ast
import csv
import sys
import requests
import pandas as pd
import numpy as np
import textstat
import matplotlib.pyplot as plt
from scipy import stats

## 3. Configuration

Set the domain to analyze. Change CONCEPT to one of:
- "AI" (Artificial Intelligence - C154945302)
- "MEDICINE" (Medicine - C71924100)
- "BUSINESS" (Business - C144133560)

In [None]:
# Configuration: choose the research domain for this run.
CONCEPT = "MEDICINE"  # Change to "AI" or "BUSINESS" as needed

# OpenAlex concept identifiers, keyed by the domain labels accepted above.
CONCEPT_IDS = dict(
    AI="C154945302",
    MEDICINE="C71924100",
    BUSINESS="C144133560",
)

# Output artifacts carry the selected domain in their file names.
READABILITY_CSV_FILE = f'readability_scores_{CONCEPT}_2015_2025.csv'
PLOT_FILENAME = f'readability_trends_{CONCEPT}_2015_2025.png'

# Echo the active selection; an unknown CONCEPT fails here with a KeyError.
print(f"Analyzing domain: {CONCEPT}")
print(f"OpenAlex Concept ID: {CONCEPT_IDS[CONCEPT]}")

## 4. Fetch Abstracts from OpenAlex

Query the OpenAlex API for abstracts published in the years 2015-2025.

In [None]:
def fetch_openalex(year, concept_id, max_results=1000, timeout=30):
    """
    Fetch works with abstracts from OpenAlex for a specific year and concept.

    Pages through the ``/works`` endpoint (200 works per page) and keeps only
    works that carry an ``abstract_inverted_index``. Fetching stops when
    ``max_results`` records are collected, the API returns no more results,
    or a request fails (the failure is printed and the records gathered so
    far are returned).

    Args:
        year: Publication year used in the OpenAlex filter.
        concept_id: OpenAlex concept ID used in the filter (e.g. "C71924100").
        max_results: Maximum number of records to return (default 1000).
        timeout: Per-request timeout in seconds passed to ``requests.get``.

    Returns:
        A list of dicts, one per work, with the keys "source", "id", "title",
        "publication_year", "doi", "type", "cited_by_count", "abstract"
        (still in inverted-index form), "journal" and "authors".
    """
    BASE_URL = "https://api.openalex.org/works"
    filters = {
        "filter": f'concepts.id:{concept_id},publication_year:{year}',
        "per-page": 200
    }

    results = []
    page = 1

    while len(results) < max_results:
        print(f"Fetching OpenAlex page {page} for {year}...")
        try:
            # A timeout keeps a stalled connection from hanging the notebook.
            resp = requests.get(
                BASE_URL, params={**filters, "page": page}, timeout=timeout
            )
        except requests.RequestException as e:
            print(f"Error: API request failed: {e}")
            break

        if resp.status_code != 200:
            print(f"Error: API request failed with status code {resp.status_code}")
            print(f"Response content: {resp.text}")
            break

        try:
            data = resp.json()
        except ValueError as e:
            print(f"Error: could not decode API response as JSON: {e}")
            break

        if not data.get("results"):
            break

        for work in data["results"]:
            if work.get("abstract_inverted_index") is None:
                continue

            results.append(_work_to_record(work))

            if len(results) >= max_results:
                break

        page += 1

    return results


def _work_to_record(work):
    """Flatten one OpenAlex work object into the record dict used downstream."""
    # "host_venue" may be absent or explicitly null; fall back to the
    # "primary_location" -> "source" object when it yields no name.
    journal = (work.get("host_venue") or {}).get("display_name")
    if not journal:
        source = (work.get("primary_location") or {}).get("source") or {}
        journal = source.get("display_name")

    # A null display_name would make str.join raise, so coerce it to "".
    authors = ", ".join(
        (a["author"].get("display_name") or "")
        for a in (work.get("authorships") or [])
        if a and a.get("author")
    )

    return {
        "source": "OpenAlex",
        "id": work.get("id"),
        "title": work.get("title"),
        "publication_year": work.get("publication_year"),
        "doi": work.get("doi"),
        "type": work.get("type"),
        "cited_by_count": work.get("cited_by_count"),
        "abstract": work.get("abstract_inverted_index"),
        "journal": journal,
        "authors": authors,
    }

In [None]:
def reconstruct_abstract(inverted_index_str):
    """
    Rebuild plain text from an OpenAlex inverted index.

    The index maps each word to the list of positions where it occurs,
    e.g. {"word": [position1, position2], ...}. It may be passed as a dict
    or as the string form of one. Missing values give "". A string that
    cannot be parsed is returned unchanged (after a warning), and a parsed
    value that is not a dict is returned as its str().
    """
    if pd.isna(inverted_index_str):
        return ""

    parsed = inverted_index_str
    if isinstance(parsed, str):
        try:
            parsed = ast.literal_eval(parsed)
        except Exception as e:
            print(f"Warning: could not parse inverted index: {e}")
            return inverted_index_str

    if not isinstance(parsed, dict):
        return str(parsed)

    placed = []
    for token, slots in parsed.items():
        try:
            placed.extend((slot, token) for slot in slots)
        except TypeError:
            # Non-iterable position value: skip this word.
            continue

    # Stable sort on position only, so ties keep their insertion order.
    placed.sort(key=lambda pair: pair[0])

    return " ".join(token for _, token in placed)

## 5. Download and Save Abstracts

This will fetch abstracts for years 2015-2025 and save them as individual text files.

In [None]:
# Download abstracts for every year and write each one to its own text file
# under "<CONCEPT>/abstracts_<year>/".
concept_id = CONCEPT_IDS[CONCEPT]

for year in range(2015, 2026):
    print(f"\n{'='*60}")
    print(f"Processing year: {year}")
    print(f"{'='*60}")

    openalex_results = fetch_openalex(year, concept_id)

    # An empty result list yields a DataFrame with no "abstract" column, so
    # the column access below would raise KeyError and abort the remaining
    # years. Skip the year instead.
    if not openalex_results:
        print(f"No abstracts retrieved for {year}; skipping.")
        continue

    df = pd.DataFrame(openalex_results)
    # Convert the inverted index into readable text, then drop the raw form.
    df["abstract_text"] = df["abstract"].apply(reconstruct_abstract)
    df.drop(columns=["abstract"], inplace=True)
    # Keep only rows whose reported publication year matches the requested one.
    df = df[df["publication_year"] == year]

    os.makedirs(f"{CONCEPT}/abstracts_{year}", exist_ok=True)

    saved_count = 0
    for idx, row in df.iterrows():
        abstract_text = row["abstract_text"]
        if pd.isna(abstract_text) or abstract_text == "":
            continue

        # The DataFrame index keeps file names unique within a year.
        filename = f"{CONCEPT}/abstracts_{year}/abstract_{year}_{idx}.txt"
        with open(filename, "w", encoding="utf-8") as f:
            f.write(abstract_text)
        saved_count += 1

    print(f"Saved {saved_count} abstracts to {CONCEPT}/abstracts_{year}/")

print(f"\n{'='*60}")
print("Data collection complete!")
print(f"{'='*60}")

## 6. Calculate Readability Scores

Define functions to calculate readability metrics.

In [None]:
def process_corpus(directory_path, corpus_name):
    """
    Score every .txt file in a directory with three readability metrics.

    Each non-blank file yields one dict with the keys 'paper_id' (file name
    without extension), 'corpus', 'ari', 'flesch_kincaid_grade' and
    'dale_chall_score'. A missing directory gives an empty list. Files that
    fail to read or score are reported and skipped.
    """
    scored = []

    if not os.path.isdir(directory_path):
        print(f"Error: Directory not found: '{directory_path}'. Skipping.")
        return scored

    print(f"Processing corpus: '{corpus_name}' from directory: '{directory_path}'")

    for entry in os.listdir(directory_path):
        if not entry.endswith('.txt'):
            continue

        full_path = os.path.join(directory_path, entry)
        stem, _ext = os.path.splitext(entry)

        try:
            with open(full_path, 'r', encoding='utf-8') as handle:
                contents = handle.read()

            # Whitespace-only files carry no text to score.
            if contents.strip() == "":
                continue

            scored.append({
                'paper_id': stem,
                'corpus': corpus_name,
                'ari': textstat.automated_readability_index(contents),
                'flesch_kincaid_grade': textstat.flesch_kincaid_grade(contents),
                'dale_chall_score': textstat.dale_chall_readability_score(contents),
            })
        except Exception as e:
            print(f"ERROR: Could not process {entry}. Error: {e}")

    return scored

In [None]:
def calculate_mean_sem(corpus_results):
    """
    Summarize readability scores as mean and standard error of the mean.

    Returns a 7-tuple: (ari_mean, ari_sem, flesch_mean, flesch_sem,
    dale_mean, dale_sem, n_samples). With no input rows, the six statistics
    are NaN and n_samples is 0.
    """
    n_samples = len(corpus_results)
    if n_samples == 0:
        return (np.nan,) * 6 + (0,)

    summary = []
    # Order matters: it fixes the layout of the returned tuple.
    for key in ("ari", "flesch_kincaid_grade", "dale_chall_score"):
        scores = np.array([entry[key] for entry in corpus_results])
        summary.append(scores.mean())
        summary.append(stats.sem(scores))

    return (*summary, n_samples)

In [None]:
def save_results_to_csv(all_results, output_file):
    """
    Write per-abstract readability rows to a CSV file.

    Nothing is written when all_results is empty. I/O errors are printed
    rather than raised.
    """
    if not all_results:
        print("No results to save.")
        return

    columns = ['paper_id', 'corpus', 'ari', 'flesch_kincaid_grade', 'dale_chall_score']

    try:
        with open(output_file, 'w', newline='', encoding='utf-8') as handle:
            out = csv.DictWriter(handle, fieldnames=columns)
            out.writeheader()
            for record in all_results:
                out.writerow(record)
        print(f"Successfully saved results to '{output_file}'")
    except OSError as e:  # IOError is an alias of OSError
        print(f"ERROR: Could not write to CSV file. Error: {e}")

In [None]:
def plot_readability_trends(years, ari_scores, ari_errors,
                          flesch_scores, flesch_errors,
                          dale_scores, dale_errors, concept):
    """
    Draw the three readability series against year, with error bars.

    The figure is saved as 'readability_trends_<concept>_2015_2025.png'
    at 300 dpi and then shown.
    """
    plt.style.use('seaborn-v0_8-darkgrid')
    plt.figure(figsize=(12, 7))

    # (values, errors, marker, line style, legend label) for each metric,
    # drawn in this order.
    series = (
        (ari_scores, ari_errors, 'o', '-', 'Automated Readability Index (ARI)'),
        (flesch_scores, flesch_errors, 's', '--', 'Flesch-Kincaid Grade'),
        (dale_scores, dale_errors, '^', ':', 'Dale-Chall Score'),
    )
    for values, errors, marker, line_style, label in series:
        plt.errorbar(years, values, yerr=errors, marker=marker,
                     linestyle=line_style, label=label, capsize=5)

    plt.title(f'Readability Scores of {concept} Papers Abstracts (2015-2025) with SEM Error Bars',
              fontsize=16)
    plt.xlabel('Year', fontsize=12)
    plt.ylabel('Readability Score', fontsize=12)
    plt.xticks(years, rotation=45)
    plt.legend(fontsize=11)
    plt.tight_layout()

    plot_filename = f'readability_trends_{concept}_2015_2025.png'
    plt.savefig(plot_filename, dpi=300)
    print(f"Successfully saved plot to '{plot_filename}'")
    plt.show()

## 7. Run Analysis

Process all years and generate visualization.

In [None]:
# Score every year's corpus, collect per-year summary statistics, then
# save the raw scores and plot the trends.
years = list(range(2015, 2026))
all_results = []

# One list per plotted series; each gains one entry per year, in `years` order.
ari_means, flesch_means, dale_means = [], [], []
ari_sems, flesch_sems, dale_sems = [], [], []

print("Starting readability analysis for years 2015-2025...\n")

for year in years:
    CORPUS_DIR = f'{CONCEPT}/abstracts_{year}'
    corpus_results = process_corpus(CORPUS_DIR, str(year))
    all_results += corpus_results

    (ari_mean, ari_sem,
     flesch_mean, flesch_sem,
     dale_mean, dale_sem,
     n_samples) = calculate_mean_sem(corpus_results)

    # Record mean and SEM together, metric by metric.
    ari_means.append(ari_mean)
    ari_sems.append(ari_sem)
    flesch_means.append(flesch_mean)
    flesch_sems.append(flesch_sem)
    dale_means.append(dale_mean)
    dale_sems.append(dale_sem)

    print(f"Completed {year} (n={n_samples}). "
          f"ARI: {ari_mean:.2f} (±{ari_sem:.2f}), "
          f"Flesch: {flesch_mean:.2f} (±{flesch_sem:.2f}), "
          f"Dale: {dale_mean:.2f} (±{dale_sem:.2f})")

if not all_results:
    print("\nNo .txt files found or processed.")
else:
    print("\nSaving all raw results to CSV...")
    save_results_to_csv(all_results, READABILITY_CSV_FILE)

print("\nGenerating plot with SEM error bars...")
plot_readability_trends(
    years,
    ari_means, ari_sems,
    flesch_means, flesch_sems,
    dale_means, dale_sems,
    CONCEPT,
)

## 8. Statistical Comparison (Optional)

Compare two specific years using Welch's t-test.

In [None]:
def compare_years(year1, year2, concept):
    """
    Compare readability scores between two years using Welch's t-test.

    Scores are recomputed from the text files in
    '<concept>/abstracts_<year1>' and '<concept>/abstracts_<year2>'. For each
    metric (ARI, Flesch-Kincaid grade, Dale-Chall) the two yearly means and
    the two-sided p-value are printed, followed by a conclusion that names
    which metrics differ significantly at alpha = 0.05.

    Args:
        year1: First year to compare.
        year2: Second year to compare.
        concept: Domain label used as the top-level directory name.

    Returns:
        None. All output is printed.
    """
    dir1 = f'{concept}/abstracts_{year1}'
    dir2 = f'{concept}/abstracts_{year2}'

    results1 = process_corpus(dir1, str(year1))
    results2 = process_corpus(dir2, str(year2))

    print(f"\nComparison: {year1} vs {year2}")
    print(f"{'='*60}")

    # Without scores on both sides the means and the t-test are undefined.
    if not results1 or not results2:
        print("Cannot compare: at least one year has no readability scores.")
        return

    alpha = 0.05
    metrics = (
        ("ARI", "ari"),
        ("Flesch", "flesch_kincaid_grade"),
        ("Dale", "dale_chall_score"),
    )

    significant = []
    not_significant = []
    for label, key in metrics:
        scores1 = np.array([r[key] for r in results1])
        scores2 = np.array([r[key] for r in results2])

        # equal_var=False selects Welch's test (no equal-variance assumption).
        _, p_value = stats.ttest_ind(scores1, scores2, equal_var=False)

        print(f"{label + ':':<8}{scores1.mean():.4f} vs {scores2.mean():.4f}, "
              f"p = {p_value:.6f}")

        # A NaN p-value (e.g. a single sample per year) compares False here.
        (significant if p_value < alpha else not_significant).append(label)

    # Report per metric: requiring all three to be significant would label
    # mixed outcomes as "no difference".
    if significant:
        print(f"\nConclusion: Statistically significant difference (α = {alpha}) "
              f"for: {', '.join(significant)}")
        if not_significant:
            print(f"No statistically significant difference for: "
                  f"{', '.join(not_significant)}")
    else:
        print(f"\nConclusion: No statistically significant difference (α = {alpha})")

# Example: Compare 2018 (pre-AI) to 2024 (post-AI)
# Recomputes scores from the saved abstract files for the configured domain
# and prints the Welch's t-test results for both years.
compare_years(2018, 2024, CONCEPT)

## 9. Summary

**Outputs Generated:**
- CSV file with raw readability scores for all abstracts
- PNG visualization showing readability trends with error bars
- Statistical comparison between selected years

**Next Steps:**
- Repeat analysis for other domains by changing CONCEPT variable
- Compare different year pairs using the compare_years function
- Export results for further analysis