In [None]:
import gzip, json, pathlib, itertools, pandas as pd, re, ast
from tqdm import tqdm

# Corpus locations, all derived from one base directory relative to the notebook.
DATA_DIR = pathlib.Path('../Data', 'ppa_corpus_2025-02-03_1308')
PAGES_FILE = DATA_DIR.joinpath('ppa_pages.jsonl.gz')
META_CSV = DATA_DIR.joinpath('ppa_metadata.csv')

# dtype=str makes pandas read every metadata column as text.
metadata_df = pd.read_csv(META_CSV, dtype=str)

In [None]:
# spelling variants for each poetic form
# Ordering rule: within a list, a spelling must never appear before a longer spelling that
# starts with it (e.g. 'limerick' before 'limeric', 'pindaric ode' before 'pindaric').
# Regex alternation takes the first alternative that matches, so a short variant listed
# first would shadow the longer one and be recorded as the matched spelling.
POETIC_FORMS_KEYWORDS = {
    'Ballad': ['ballade', 'ballad', 'balad'],
    'Ghazal': ['ghazels', 'ghazel', 'guzels', 'guzel', 'gazals', 'gazal', 'ghazuls', 'ghazul', 'ghazals', 'ghazal'],
    'Haiku': ['haicu', 'haïku', 'hiaku', 'hokku', 'haiku'],
    'Limerick': ['limerick', 'limerik', 'limeric'],
    'Pantoum': ['pantun', 'pantoun', 'pantowm', 'pantoum'],
    'Sestina': ['sestine', 'sestena', 'sistina', 'sestina'],
    'Sonnet': ['sonet', 'sonnete', 'sonnette', 'sonneta', 'sonnetto', 'sonnet'],
    'Villanelle': ['villanelle', 'villanella', 'villanell', 'villanela', 'Villǎnelle', 'villanel'],
    'Blank Verse': ['blank verse', 'blanke verse', 'blanck verse', 'blancke verse'],
    'Free Verse': ['free verse', 'vers libre', 'freee verse'],
    'Common Measure': ['common meter', 'common metre', 'common measure', 'common-measure'],
    'Ars Poetica': ['ars poetica', 'ars poeticæ'],
    'Aubade': ['aubadee', 'aubade', 'aubadé', 'aubad'],
    'Concrete Poetry': ['concrete poetry', 'concrete poem', 'pattern poem', 'pattern poetry'],
    'Dramatic Monologue': ['dramatic monologue', 'dramatic soliloquy'],
    'Ekphrasis': ['ekphrasis', 'ecphrasis', 'ekphrastic'],
    'Elegy': ['elegy', 'elegie', 'elogy', 'elegiac'],
    'Ode': ['pindaric ode', 'pindarick ode', 'horatian ode', 'pindaric', 'odes', 'ode'],
    'Hymn': ['hymn', 'hynm'],
    'Rondeau': ['rondeau'],
    'Pastoral': ['pastorall', 'pastoral', 'pastorel'],
    'Prose Poem': ['prose poem', 'prose poetry', 'prose-poetry'],
    'Verse Novel': ['verse novel', 'novel in verse'],
    'Epic': ['epicks', 'epick', 'epics', 'epic'],
    "Ruba'i": ['rubaiyat', 'rubayat', 'rubai', "ruba'ee", "rubá'í"],
    'Song': ['songs', 'song'],
    'Lyric': ['lyric']
}

In [None]:
# load the top 5000 most common corpus words (generated by most-frequently-occuring-words.ipynb)
# the file is parsed with json.load despite its .txt extension; the encoding is given
# explicitly so the read does not depend on the platform's default text encoding
with open('../Data/top_5000_words_list.txt', 'r', encoding='utf-8') as f:
    top_5000_words = json.load(f)

In [None]:
# merge poetic forms + top-5000 into one dict, tagging each entry with its source
def prepare_keyword_dict_with_sources(poetic_forms, top_5000):
    """Merge the poetic-form spellings and the frequent-word list into one lookup.

    Args:
        poetic_forms: mapping of form name -> list of spelling variants.
        top_5000: iterable of words; each becomes its own single-spelling entry.

    Returns:
        A ``(combined_keywords, keyword_sources)`` tuple. ``combined_keywords``
        maps every key to its list of spellings; ``keyword_sources`` maps the
        same keys to ``'poetic_forms'`` or ``'top_5000'``.

    Poetic forms take precedence: a frequent word whose text is identical to a
    form key is skipped, so it cannot replace the curated spelling list or flip
    the entry's source. Repeated words are only registered once.
    """
    combined_keywords = {}
    keyword_sources = {}

    for form, spellings in poetic_forms.items():
        # copy so later edits to the combined dict never leak into the caller's lists
        combined_keywords[form] = list(spellings)
        keyword_sources[form] = 'poetic_forms'

    for word in top_5000:
        if word in keyword_sources:
            # already registered (as a poetic form, or as an earlier duplicate word)
            continue
        combined_keywords[word] = [word]
        keyword_sources[word] = 'top_5000'

    return combined_keywords, keyword_sources

# Build the merged keyword table and its source tags from the two keyword inputs.
combined_keywords, keyword_sources = prepare_keyword_dict_with_sources(
    poetic_forms=POETIC_FORMS_KEYWORDS,
    top_5000=top_5000_words,
)
print(f'Total keywords: {len(combined_keywords)} ({len(POETIC_FORMS_KEYWORDS)} poetic forms + {len(top_5000_words)} top words)')

In [None]:
def page_iter(pages_file):
    """Stream page records from a gzip-compressed JSON Lines file, one at a time.

    Args:
        pages_file: path to the ``.jsonl.gz`` file; each non-blank line is one
            JSON document.

    Yields:
        The decoded JSON value of each non-blank line, in file order.

    Raises:
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    with gzip.open(pages_file, 'rt', encoding='utf-8') as fh:
        for line in fh:
            if not line.strip():
                # blank line (e.g. a trailing newline at end of file): nothing to decode
                continue
            yield json.loads(line)

def extract_context(text, pos, length, window=300):
    """Return the match plus up to ``window`` characters on each side of it.

    Args:
        text: the full string that was searched.
        pos: start offset of the match in ``text``.
        length: number of characters in the match.
        window: how many characters of context to keep before and after.

    Returns:
        The slice of ``text`` around the match, clamped to the string bounds.
    """
    left = pos - window
    if left < 0:
        left = 0

    right = pos + length + window
    if right > len(text):
        right = len(text)

    return text[left:right]

def compile_patterns(keywords_dict, keyword_sources):
    """Compile one case-insensitive regex per keyword entry, once, up front.

    Args:
        keywords_dict: mapping of keyword name -> list of spelling variants.
        keyword_sources: mapping of the same names -> ``'poetic_forms'`` or
            ``'top_5000'``.

    Returns:
        dict of keyword name -> compiled pattern matching any of its spellings.
        Entries with no non-blank spelling are left out, because an empty
        pattern would match at every position of every page.

    Raises:
        KeyError: if a name in ``keywords_dict`` is missing from ``keyword_sources``.
    """
    # short words get word boundaries to avoid false positives (e.g. 'ode' inside 'model')
    needs_boundaries = {
        'ode', 'odes', 'epic', 'epics', 'epick', 'epicks', 'hymn',
        'ghazel', 'ghazels', 'guzel', 'guzels', 'gazal', 'gazals',
        'ghazul', 'ghazuls', 'ghazal', 'ghazals', 'song', 'songs', 'lyric', 'lay'
    }
    patterns = {}

    for form, spellings in keywords_dict.items():
        # every top-5000 word is matched as a whole word only
        whole_word_only = keyword_sources[form] == 'top_5000'

        cleaned = [spelling.strip() for spelling in spellings]
        cleaned = [spelling for spelling in cleaned if spelling]
        if not cleaned:
            continue

        # Alternation picks the first alternative that matches, not the longest, so put
        # longer spellings first; otherwise 'aubade' would shadow 'aubadee'. The sort is
        # stable, so equal-length spellings keep the order the caller gave them.
        cleaned.sort(key=len, reverse=True)

        alternatives = []
        for clean in cleaned:
            escaped = re.escape(clean)
            if whole_word_only or clean.lower() in needs_boundaries:
                escaped = r'\b' + escaped + r'\b'
            alternatives.append(escaped)

        patterns[form] = re.compile('|'.join(alternatives), re.IGNORECASE)

    return patterns

def find_matches(page, patterns, keyword_sources):
    """Search one page for every keyword and return one row per keyword that hits.

    Args:
        page: page record (dict). ``text`` is searched; ``id``, ``work_id`` and
            ``order`` are copied into each row, ``tags`` is copied if present.
        patterns: mapping of keyword name -> compiled regex.
        keyword_sources: mapping of keyword name -> ``'poetic_forms'`` or
            ``'top_5000'``.

    Returns:
        A list of dicts, one per keyword with at least one match on the page.
        Each row carries the match count, the context around every match, the
        full page text and the distinct lowercased spellings that matched.
        Pages with missing, null or empty text yield an empty list.
    """
    # 'text' may be absent or explicitly null; treat both as an empty page
    text = page.get('text') or ''
    if not text:
        return []

    matches = []
    for form, pattern in patterns.items():
        found = list(pattern.finditer(text))
        if not found:
            continue

        source = keyword_sources[form]
        matches.append({
            'page_id': page['id'],
            'work_id': page['work_id'],
            'order': page['order'],
            # exactly one of the next two columns is filled, depending on the source
            'poetic_form': form if source == 'poetic_forms' else None,
            'top_5000_word': form if source == 'top_5000' else None,
            'keyword_source': source,
            'tags': page.get('tags'),
            'counts': len(found),
            'contexts': [extract_context(text, m.start(), len(m.group())) for m in found],
            'page_text': text,
            # sorted so the column is identical from run to run (set order is not)
            'spelling': sorted({m.group().lower() for m in found})
        })

    return matches

In [None]:
def _parse_collections(raw):
    """Parse one metadata ``collections`` cell; missing (non-string) cells become []."""
    if not isinstance(raw, str):
        return []
    return ast.literal_eval(raw)


def search_keywords(pages_file, keywords_dict, keyword_sources, metadata_df=None,
                    batch_size=10000, max_pages=None, output_dir='keyword_results'):
    """Search every page for the keywords, write batched CSVs, then combine them.

    Args:
        pages_file: path to the gzip-compressed JSON Lines pages file.
        keywords_dict: mapping of keyword name -> list of spellings.
        keyword_sources: mapping of keyword name -> ``'poetic_forms'`` / ``'top_5000'``.
        metadata_df: optional work metadata with ``work_id`` and ``collections``
            columns. When given, only works whose collections are Literary
            and/or Linguistic are searched and each result row is extended with
            the work's metadata. When ``None``, every page is searched and no
            metadata is attached.
        batch_size: number of pages read between flushes to disk.
        max_pages: stop after this many pages (e.g. 1000 for a quick test run);
            ``None`` or 0 means the whole file.
        output_dir: directory for ``results_batch_NNNN.csv`` files and the final
            ``results_combined.csv``; created if missing.

    Returns:
        DataFrame of all result rows, re-read from the batch CSVs (so list-valued
        columns come back as their text form), or an empty DataFrame if nothing
        matched.
    """
    import os
    os.makedirs(output_dir, exist_ok=True)

    # None means "no collection filter": without metadata there is nothing to filter on
    lit_ling_work_ids = None
    meta_dict = {}

    if metadata_df is not None:
        meta_lookup = metadata_df.set_index('work_id')
        meta_lookup['collections_parsed'] = meta_lookup['collections'].apply(_parse_collections)
        wanted_collections = [
            ['Literary'], ['Linguistic'], ['Linguistic', 'Literary'], ['Literary', 'Linguistic']
        ]
        # plain membership test: compares lists by equality, no hashing of list values
        mask = meta_lookup['collections_parsed'].apply(
            lambda collections: collections in wanted_collections
        ).astype(bool)
        lit_ling_work_ids = set(meta_lookup[mask].index)
        meta_dict = meta_lookup.to_dict('index')
        print(f'Found {len(lit_ling_work_ids):,} Literary/Linguistic works')

    patterns = compile_patterns(keywords_dict, keyword_sources)
    pages_processed = 0
    csv_batch_num = 0
    batch_results = []
    written_files = []  # only the files produced by THIS run are combined at the end

    # count total pages for the progress bar (~30s on the full corpus);
    # binary mode: we only need line breaks, not decoded text
    if max_pages:
        total_pages = max_pages
    else:
        with gzip.open(pages_file, 'rb') as f:
            total_pages = sum(1 for _ in f)

    pages_iter = page_iter(pages_file)
    try:
        with tqdm(total=total_pages, desc='Searching', unit='pages') as pbar:
            while True:
                # never read past max_pages, so the last batch may be shorter
                take = batch_size
                if max_pages:
                    remaining = max_pages - pages_processed
                    if remaining <= 0:
                        break
                    take = min(batch_size, remaining)

                batch = list(itertools.islice(pages_iter, take))
                if not batch:
                    break

                for page in batch:
                    pages_processed += 1
                    work_id = page['work_id']

                    # skip works outside our collection filter
                    if lit_ling_work_ids is not None and work_id not in lit_ling_work_ids:
                        continue

                    matches = find_matches(page, patterns, keyword_sources)
                    if not matches:
                        continue

                    work_meta = meta_dict.get(work_id)
                    if work_meta is not None:
                        for match in matches:
                            match.update(work_meta)
                    batch_results.extend(matches)

                # flush to disk after each batch
                if batch_results:
                    df_batch = pd.DataFrame(batch_results)
                    csv_file = os.path.join(output_dir, f'results_batch_{csv_batch_num:04d}.csv')
                    df_batch.to_csv(csv_file, index=False)
                    print(f'\nBatch {csv_batch_num}: {len(df_batch)} rows -> {csv_file}')
                    written_files.append(csv_file)
                    batch_results = []
                    csv_batch_num += 1

                pbar.update(len(batch))
    finally:
        # closing the generator also closes the gzip file when we stop early
        pages_iter.close()

    # stitch this run's batch files into one (stale files from earlier runs are ignored)
    print('\nCombining batches...')
    all_dfs = [pd.read_csv(csv_file) for csv_file in written_files]

    if all_dfs:
        final_df = pd.concat(all_dfs, ignore_index=True)
        final_file = os.path.join(output_dir, 'results_combined.csv')
        final_df.to_csv(final_file, index=False)
        print(f'Done — {len(final_df):,} rows across {final_df["work_id"].nunique():,} works -> {final_file}')
        return final_df

    return pd.DataFrame()

In [None]:
# Full-corpus run with the default batch size, page limit and output directory.
results = search_keywords(
    pages_file=PAGES_FILE,
    keywords_dict=combined_keywords,
    keyword_sources=keyword_sources,
    metadata_df=metadata_df,
)

In [None]:
# preview the first rows of the table returned by search_keywords
results.head()

In [None]:
# save the results table as ppa_keyword_db.csv in the current working directory (row index omitted)
results.to_csv('ppa_keyword_db.csv', index=False)