# SKOL IV: All the Data

In [1]:
bahir_package = 'org.apache.bahir:spark-sql-cloudant_2.12:2.4.0'
!spark-shell --packages $bahir_package < /dev/null


In [28]:
from io import BytesIO
import json
import hashlib
import os
from pathlib import Path, PurePath
import requests
import shutil
import sys
import tempfile
from typing import Any, Dict, Iterator, List, Optional
from urllib.robotparser import RobotFileParser

# Be sure to get version 2: https://simple-repository.app.cern.ch/project/bibtexparser/2.0.0b8/description
import bibtexparser
import couchdb
import feedparser
import fitz # PyMuPDF

import pandas as pd  # TODO(piggy): Remove this dependency in favor of pure pyspark DataFrames.

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import (
    Tokenizer, CountVectorizer, IDF, StringIndexer, VectorAssembler, IndexToString
)
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier

from pyspark.sql import SparkSession, DataFrame, Row
from pyspark.sql.functions import (
    input_file_name, collect_list, regexp_extract, col, udf,
    explode, trim, row_number, min, expr, concat, lit
)
from pyspark.sql.types import (
    ArrayType, BooleanType, IntegerType, MapType, NullType,
    StringType, StructType, StructField 
)
from pyspark.sql.window import Window

import redis
from uuid import uuid4

# Local modules
current_dir = os.getcwd()
parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir))
parent_path = Path(parent_dir)
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

from couchdb_file import CouchDBFile as CDBF
from fileobj import FileObject
from finder import parse_annotated, remove_interstitials
import line
from line import Line

# Import the SKOL classifier jupyter/ist769_skol.ipynb
from skol_classifier import SkolClassifier as SC, get_file_list
from skol_classifier.preprocessing import SuffixTransformer, ParagraphExtractor
from skol_classifier.utils import calculate_stats

from taxon import group_paragraphs, Taxon


In [3]:
couchdb_host = "127.0.0.1:5984" # e.g., "ACCOUNT.cloudant.com" or "localhost"
couchdb_username = "admin"
couchdb_password = "SU2orange!"
ingest_db_name = "skol_dev"
taxon_db_name = "skol_taxa_dev"

couchdb_url = f'http://{couchdb_host}'

spark = SparkSession \
    .builder \
    .appName("CouchDB Spark SQL Example in Python using dataframes") \
    .master("local[2]") \
    .config("cloudant.protocol", "http") \
    .config("cloudant.host", couchdb_host) \
    .config("cloudant.username", couchdb_username) \
    .config("cloudant.password", couchdb_password) \
    .config("spark.jars.packages", bahir_package) \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "20g") \
    .config("spark.submit.pyFiles",
            f'{parent_path / "line.py"},{parent_path / "fileobj.py"},'
            f'{parent_path / "couchdb_file.py"},{parent_path / "finder.py"},'
            f'{parent_path / "taxon.py"},{parent_path / "paragraph.py"},'
            f'{parent_path / "label.py"},{parent_path / "file.py"}'
           ) \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("ERROR") # Keeps the noise down!!!

couch = couchdb.Server(couchdb_url)
couch.resource.credentials = (couchdb_username, couchdb_password)

if ingest_db_name not in couch:
    db = couch.create(ingest_db_name)
else:
    db = couch[ingest_db_name]

user_agent = "synoptickeyof.life"

ingenta_rp = RobotFileParser()
ingenta_rp.set_url("https://www.ingentaconnect.com/robots.txt")
ingenta_rp.read() # Reads and parses the robots.txt file from the URL
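Both ingesters below check every request with `rp.can_fetch(user_agent, url)`. You can exercise that check offline by passing a list of lines to `RobotFileParser.parse()` instead of calling `read()`. The rules in this sketch are made up for illustration. The real rules come from Ingenta's robots.txt.

```python
from urllib.robotparser import RobotFileParser

# Hypothetical robots.txt rules; ingenta_rp.read() fetches the real ones.
robots_lines = [
    "User-agent: *",
    "Disallow: /search",
]

rp = RobotFileParser()
rp.parse(robots_lines)  # parse() also records the check time, so can_fetch() works offline

user_agent = "synoptickeyof.life"
for url in ("https://www.ingentaconnect.com/content/mtax/mt",
            "https://www.ingentaconnect.com/search?value1=Amanita"):
    # The first URL matches no rule (allowed); the second falls under /search (denied).
    print(url, rp.can_fetch(user_agent, url))
```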


## The Data Sources

The goal is to collect all the open access taxonomic literature in mycology. Most of the sources below cover macrofungi and slime molds.

### Ingested Data Sources

* [Mycotaxon at Ingenta Connect](https://www.ingentaconnect.com/content/mtax/mt)
* [Studies in Mycology at Ingenta Connect](https://www.studiesinmycology.org/)

### Source of many older public domain and open access works

MykoWeb includes scans of many older works in mycology. I have local copies but still need to ingest them.

* [MykoWeb](https://mykoweb.com/)

### Journals in hand

These are journals we've collected over the years. The initial annotated issues are from the early years of Mycotaxon. We still need to ingest all of these.

* Mycologia (back issues)
* [Mycologia at Taylor and Francis](https://www.tandfonline.com/journals/umyc20)
  Mycologia is the main journal of the Mycological Society of America. It is a mix of open access and traditional access articles. The connector for this journal will need to identify the open access articles.
* Persoonia (all issues)
  Persoonia is no longer published.
* Mycotaxon (back issues)
  Mycotaxon is no longer published.

### Journals that need connectors

These are journals we know of that include open access articles.

* [Amanitaceae.org](http://www.tullabs.com/amanita/?home)
* [Mycosphere](https://mycosphere.org/)
* [Mycoscience](https://mycoscience.org/)
* [Journal of Fungi](https://www.mdpi.com/journal/jof)
* [Mycology](https://www.tandfonline.com/journals/tmyc20)
* [Open Access Journal of Mycology & Mycological Sciences](https://www.medwinpublishers.com/OAJMMS/)
* [Mycokeys](https://mycokeys.pensoft.net/)


## Ingestion

Each journal or other data source gets an ingester that puts PDFs into our document store, along with any metadata we can collect. The metadata is sufficient to create citations for each issue, book, or article. If BibTeX citations are available, we prefer to store them verbatim.

### Ingenta RSS ingestion

Ingenta Connect is an electronic publisher that hosts two mycology journals. New articles are announced through RSS (Really Simple Syndication) feeds.
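Here is a minimal sketch of the RSS step that uses only the standard library. It stands in for `feedparser`, which the notebook actually uses. It assumes an RSS 2.0 feed whose `<item>` elements carry `<title>` and `<link>` children. The helper name `bibtex_links_from_rss` and the sample feed are hypothetical. The `?format=bib` suffix mirrors `ingest_ingenta`.

```python
import xml.etree.ElementTree as ET
from typing import List, Tuple


def bibtex_links_from_rss(rss_xml: str) -> List[Tuple[str, str]]:
    """Return (title, bibtex_link) pairs for every <item> that has a <link>."""
    root = ET.fromstring(rss_xml)
    pairs: List[Tuple[str, str]] = []
    # In RSS 2.0, <item> elements sit under <channel>; iter() finds them at any depth.
    for item in root.iter("item"):
        title = (item.findtext("title") or "").strip()
        link = (item.findtext("link") or "").strip()
        if link:
            # Same convention as ingest_ingenta: the BibTeX export is the article link + ?format=bib.
            pairs.append((title, f"{link}?format=bib"))
    return pairs


# A made-up two-item feed; the second item has no <link> and is skipped.
sample_rss = """<rss version="2.0"><channel><title>Example feed</title>
<item><title>Article one</title><link>https://www.ingentaconnect.com/content/mtax/mt/example1</link></item>
<item><title>No link here</title></item>
</channel></rss>"""

for title, bibtex_link in bibtex_links_from_rss(sample_rss):
    print(title, bibtex_link)
```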

In [4]:
def ingest_from_bibtex(
        db: couchdb.Database,
        content: bytes | str,
        bibtex_link: str,
        meta: Dict[str, Any],
        rp: RobotFileParser
        ) -> None:
    """Load documents referenced in an Ingenta BibTeX database."""
    # bibtexparser v2 parses str; ingest_ingenta passes the raw response bytes.
    if isinstance(content, bytes):
        content = content.decode('utf-8')
    bib_database = bibtexparser.parse_string(content)

    # NOTE: bibtex_data is built here but not yet stored on the documents.
    bibtex_data = {
        'link': bibtex_link,
        'bibtex': bibtexparser.write_string(bib_database),
    }

    for bib_entry in bib_database.entries:
        doc = {
            '_id': uuid4().hex,
            'meta': meta,
            'pdf_url': f"{bib_entry['url']}?crawler=true",
        }

        # Do not fetch if we already have an entry for this PDF.
        selector = {'selector': {'pdf_url': doc['pdf_url']}}
        if any(True for _ in db.find(selector)):
            print(f"Skipping {doc['pdf_url']}")
            continue

        if not rp.can_fetch(user_agent, doc['pdf_url']):
            # TODO(piggy): We should probably record blocked URLs.
            print(f"Robot permission denied {doc['pdf_url']}")
            continue

        print(f"Adding {doc['pdf_url']}")
        for k in bib_entry.fields_dict.keys():
            doc[k] = bib_entry[k]

        # Download the PDF before saving the record, so a failed download does not
        # leave an attachment-less record that the duplicate check would then skip.
        with requests.get(doc['pdf_url'], timeout=60) as pdf_f:
            pdf_f.raise_for_status()
            pdf_doc = pdf_f.content

        db.save(doc)  # save() sets doc['_rev'], which put_attachment() needs
        db.put_attachment(doc, BytesIO(pdf_doc), 'article.pdf', 'application/pdf')

        print("-" * 10)

In [5]:
def ingest_ingenta(
        db: couchdb.Database,
        rss_url: str,
        rp: RobotFileParser
) -> None:
    """Ingest documents from an Ingenta RSS feed."""

    feed = feedparser.parse(rss_url)

    # .get() tolerates feeds that omit optional channel elements.
    feed_meta = {
        'url': rss_url,
        'title': feed.feed.get('title'),
        'link': feed.feed.get('link'),
        'description': feed.feed.get('description'),
    }

    for entry in feed.entries:
        entry_meta = {
            'title': entry.title,
            'link': entry.link,
        }
        if hasattr(entry, 'summary'):
            entry_meta['summary'] = entry.summary
        if hasattr(entry, 'description'):
            entry_meta['description'] = entry.description

        bibtex_link = f'{entry.link}?format=bib'
        print(f"bibtex_link: {bibtex_link}")

        if not rp.can_fetch(user_agent, bibtex_link):
            print(f"Robot permission denied {bibtex_link}")
            continue

        with requests.get(bibtex_link, timeout=60) as bibtex_f:
            bibtex_f.raise_for_status()  # Raise an HTTPError for bad responses (4xx or 5xx)

            # Ingenta's BibTeX appears to omit the comma before the parent_itemid
            # field; insert it, then join the record onto a single line.
            ingest_from_bibtex(
                db=db,
                content=bibtex_f.content\
                    .replace(b"\"\nparent", b"\",\nparent")\
                    .replace(b"\n", b""),
                bibtex_link=bibtex_link,
                meta={
                    'feed': feed_meta,
                    'entry': entry_meta,
                },
                rp=rp
            )
        print("=" * 20)

In [6]:
def ingest_from_local_bibtex(
    db: couchdb.Database,
    root: Path,
    rp: RobotFileParser
) -> None:
    """Ingest from a local directory containing Ingenta BibTeX files."""
    for dirpath, dirnames, filenames in os.walk(root):
        for filename in filenames:
            if not filename.endswith('format=bib'):
                continue
            full_filepath = os.path.join(dirpath, filename)
            # relpath avoids a doubled slash in the URL when root has no trailing separator.
            rel_path = Path(os.path.relpath(full_filepath, root)).as_posix()
            bibtex_link = f"https://www.ingentaconnect.com/{rel_path}"
            with open(full_filepath, encoding='utf-8') as f:
                content = f.read()\
                    .replace("\"\nparent", "\",\nparent")\
                    .replace("\n", "")
            ingest_from_bibtex(db, content, bibtex_link, meta={}, rp=rp)


The ingestion steps are:

* Download the RSS feed.
* Read the BibTeX files and create a record for each article.
* Download the PDFs at the URLs in the BibTeX entries.
* Create a JSON record with the PDF as an attachment.
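The notebook writes each record and its PDF in two requests: `db.save(doc)` followed by `db.put_attachment(...)`. CouchDB also accepts inline attachments, where `_attachments` carries base64-encoded `data` inside the JSON body. That way the record and its PDF land in one write. Here is a sketch of such a record, assuming the same `article.pdf` attachment name. The helper `article_record` is hypothetical.

```python
import base64
import json
from typing import Any, Dict
from uuid import uuid4


def article_record(meta: Dict[str, Any], pdf_url: str, pdf_bytes: bytes) -> Dict[str, Any]:
    """Build a CouchDB document that carries its PDF as an inline attachment."""
    return {
        "_id": uuid4().hex,
        "meta": meta,
        "pdf_url": pdf_url,
        "_attachments": {
            "article.pdf": {
                "content_type": "application/pdf",
                # Inline attachment data must be base64-encoded inside the JSON body.
                "data": base64.b64encode(pdf_bytes).decode("ascii"),
            }
        },
    }


record = article_record({"journal": "example"}, "https://example.org/a?crawler=true", b"%PDF-1.4 placeholder")
print(json.dumps(record, indent=2))
```

Base64 inflates the payload by about a third. For large scanned PDFs, the separate `put_attachment` call the notebook uses may therefore still be preferable.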

### Text extraction

We extract the text from each PDF, optionally with OCR, and add it as an additional attachment on the source record.
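One way to make OCR optional is to run it only on pages whose text layer is nearly empty, on the assumption that such pages are scans. This sketch flags those pages. The function `pages_needing_ocr` and its 50-character threshold are hypothetical. In the notebook, the page strings would come from `page.get_text("text")`, as in `pdf_to_text`.

```python
from typing import List


def pages_needing_ocr(page_texts: List[str], min_chars: int = 50) -> List[int]:
    """Return 0-based indices of pages whose extracted text looks empty.

    Hypothetical heuristic: scanned pages usually have little or no embedded
    text, so very short extractions are OCR candidates.
    """
    flagged = []
    for i, text in enumerate(page_texts):
        # Count only visible characters so whitespace-only pages are flagged.
        visible = sum(1 for ch in text if not ch.isspace())
        if visible < min_chars:
            flagged.append(i)
    return flagged


print(pages_needing_ocr(["Agaricus campestris L. " * 5, "  \n ", ""]))
```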

In [7]:
df = spark.read.load(
    format="org.apache.bahir.cloudant",
    database=ingest_db_name
)


In [8]:
df.describe()

DataFrame[summary: string, _id: string, _rev: string, abstract: string, author: string, doi: string, eissn: string, issn: string, itemtype: string, journal: string, number: string, pages: string, parent_itemid: string, pdf_url: string, publication date: string, publishercode: string, title: string, url: string, volume: string, year: string]

In [9]:
# Content-Type: text/html; charset=UTF-8

def pdf_to_text(pdf_contents: bytes) -> bytes:
    """Extract the text layer of a PDF and return it as UTF-8 bytes."""
    full_text = []
    # The context manager closes the document once extraction finishes.
    with fitz.open(stream=pdf_contents, filetype="pdf") as doc:
        for page_num, page in enumerate(doc):
            # Possibly perform OCR on the page
            text = page.get_text("text", flags=fitz.TEXT_PRESERVE_WHITESPACE | fitz.TEXT_DEHYPHENATE)
            # full_text.append(f"\n--- PDF Page {page_num+1} ---\n")
            full_text.append(text)

    return "".join(full_text).encode("utf-8")

def add_text_to_partition(iterator) -> None:
    """Extract text from each PDF attachment in a partition and save it as a .txt attachment."""
    couch = couchdb.Server(couchdb_url)
    couch.resource.credentials = (couchdb_username, couchdb_password)
    local_db = couch[ingest_db_name]
    for row in iterator:
        if not row:
            continue
        if not row._attachments:
            continue
        row_dict = row.asDict()
        attachment_dict = row._attachments.asDict()
        for pdf_filename in attachment_dict:
            pdf_path = PurePath(pdf_filename)
            if pdf_path.suffix != '.pdf':
                continue
            txt_path_str = pdf_path.stem + '.txt'
            # if txt_path_str in attachment_dict:
            #     # TODO(piggy): Recalculate text if text is terrible. Too much noise vocabulary?
            #     print(f"Already have text for {row.pdf_url}")
            #     continue
            print(f"{row._id}, {row.pdf_url}")
            pdf_file = local_db.get_attachment(row._id, str(pdf_path)).read()
            txt_file = pdf_to_text(pdf_file)
            attachment_content_type = 'text/plain; charset=UTF-8'  # 'text/simple' is not a registered MIME type
            attachment_file = BytesIO(txt_file)
            local_db.put_attachment(row_dict, attachment_file, txt_path_str, attachment_content_type)
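`add_text_to_partition` opens its own `couchdb.Server` inside the function rather than reusing the driver's `couch`. That is the usual reason to hand work to Spark one partition at a time: each executor builds one connection per partition instead of one per row, and no connection object is shipped from the driver. `CouchDBConnection.fetch_partition` and `save_partition` follow the same pattern. Below is a plain-Python imitation of it. A hypothetical `FakeConnection` stands in for CouchDB, and a sequential loop stands in for `RDD.mapPartitions`.

```python
from typing import Callable, Iterable, Iterator, List


class FakeConnection:
    """Hypothetical stand-in for couchdb.Server; counts how often it is created."""
    created = 0

    def __init__(self) -> None:
        FakeConnection.created += 1

    def fetch(self, doc_id: str) -> str:
        return f"text of {doc_id}"


def fetch_texts(rows: Iterable[str]) -> Iterator[str]:
    conn = FakeConnection()  # one connection per partition, not per row
    for doc_id in rows:
        yield conn.fetch(doc_id)


def map_partitions(partitions: List[List[str]],
                   func: Callable[[Iterable[str]], Iterator[str]]) -> List[str]:
    """Sequential imitation of RDD.mapPartitions: call func once per partition."""
    out: List[str] = []
    for part in partitions:
        out.extend(func(iter(part)))
    return out


results = map_partitions([["a", "b", "c"], ["d", "e"]], fetch_texts)
print(results, FakeConnection.created)  # five rows, two connections
```

In the notebook itself, the function would presumably be run as `df.foreachPartition(add_text_to_partition)` on the DataFrame loaded through Bahir.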
    


In [10]:
# Identical to skol_classifier.CouchDBConnection.
from skol_classifier import CouchDBConnection as CDBC

class CouchDBConnection(CDBC):
    """
    Manages CouchDB connection and provides I/O operations.

    This class encapsulates connection parameters and provides an idempotent
    connection method that can be safely called multiple times.
    """

    def __init__(
        self,
        couchdb_url: str,
        database: str,
        username: Optional[str] = None,
        password: Optional[str] = None
    ):
        """
        Initialize CouchDB connection parameters.
        Args:
            couchdb_url: CouchDB server URL (e.g., "http://localhost:5984")
            database: Database name
            username: Optional username for authentication
            password: Optional password for authentication
        """
        self.couchdb_url = couchdb_url
        self.database = database
        self.username = username
        self.password = password
        self._server = None
        self._db = None

    def _connect(self):
        """
        Idempotent connection method that returns a CouchDB server object.

        This method can be called multiple times safely - it will only create
        a connection if one doesn't already exist.

        Returns:
            couchdb.Server: Connected CouchDB server object
        """
        if self._server is None:
            self._server = couchdb.Server(self.couchdb_url)
            if self.username and self.password:
                self._server.resource.credentials = (self.username, self.password)

        if self._db is None:
            self._db = self._server[self.database]

        return self._server

    @property
    def db(self):
        """Get the database object, connecting if necessary."""
        if self._db is None:
            self._connect()
        return self._db

    def get_document_list(
        self,
        spark: SparkSession,
        pattern: str = "*.txt"
    ) -> DataFrame:
        """
        Get a list of documents with text attachments from CouchDB.

        This only fetches document metadata (not content) to create a DataFrame
        that can be processed in parallel. Creates ONE ROW per attachment, so if
        a document has multiple attachments matching the pattern, it will have
        multiple rows in the resulting DataFrame.
        Args:
            spark: SparkSession
            pattern: Pattern for attachment names (e.g., "*.txt")

        Returns:
            DataFrame with columns: doc_id, attachment_name
            One row per (doc_id, attachment_name) pair
        """
        # Connect to CouchDB (driver only)
        db = self.db

        # Get all documents with attachments matching pattern
        doc_list = []
        for doc_id in db:
            try:
                doc = db[doc_id]
                attachments = doc.get('_attachments', {})

                # Loop through ALL attachments in the document
                for att_name in attachments.keys():
                    # Check if attachment matches pattern
                    # Pattern matching: "*.txt" matches files ending with .txt
                    if pattern == "*.txt" and att_name.endswith('.txt'):
                        doc_list.append((doc_id, att_name))
                    elif pattern == "*.*" or pattern == "*":
                        # Match all attachments
                        doc_list.append((doc_id, att_name))
                    elif pattern.startswith("*.") and att_name.endswith(pattern[1:]):
                        # Generic pattern matching for *.ext
                        doc_list.append((doc_id, att_name))
            except Exception:
                # Skip documents we can't read
                continue

        # Create DataFrame with document IDs and attachment names
        schema = StructType([
            StructField("doc_id", StringType(), False),
            StructField("attachment_name", StringType(), False)
        ])

        return spark.createDataFrame(doc_list, schema)

    def fetch_partition(
        self,
        partition: Iterator[Row]
    ) -> Iterator[Row]:
        """
        Fetch CouchDB attachments for an entire partition.

        This function is designed to be used with foreachPartition or mapPartitions.
        It creates a single CouchDB connection per partition and reuses it for all rows.

        Args:
            partition: Iterator of Rows with doc_id and attachment_name

        Yields:
            Rows with doc_id, doc, attachment_name, and value (content)
        """
        # Connect to CouchDB once per partition
        try:
            db = self.db

            # Process all rows in partition with same connection
            # Note: Each row represents one (doc_id, attachment_name) pair
            # If a document has multiple .txt attachments, there will be multiple rows
            for row in partition:
                try:
                    doc = db[row.doc_id]
                    # Get the specific attachment for this row
                    if row.attachment_name in doc.get('_attachments', {}):
                        attachment = db.get_attachment(doc, row.attachment_name)
                        if attachment:
                            content = attachment.read().decode('utf-8', errors='ignore')
                            yield Row(
                                doc_id=row.doc_id,
                                doc=doc,
                                attachment_name=row.attachment_name,
                                value=content
                            )
                except Exception as e:
                    # Log error but continue processing
                    print(f"Error fetching {row.doc_id}/{row.attachment_name}: {e}")
                    continue

        except Exception as e:
            print(f"Error connecting to CouchDB: {e}")
            return

    def save_partition(
        self,
        partition: Iterator[Row],
        suffix: str = ".ann"
    ) -> Iterator[Row]:
        """
        Save annotated content to CouchDB for an entire partition.

        This function is designed to be used with foreachPartition or mapPartitions.
        It creates a single CouchDB connection per partition and reuses it for all rows.

        Args:
            partition: Iterator of Rows with doc_id, attachment_name, final_aggregated_pg
            suffix: Suffix to append to attachment names

        Yields:
            Rows with doc_id, attachment_name, and success status
        """
        # Connect to CouchDB once per partition
        try:
            db = self.db

            # Process all rows in partition with same connection
            # Note: Each row represents one (doc_id, attachment_name) pair
            # If a document had multiple .txt files, we save multiple .ann files
            for row in partition:
                success = False
                try:
                    doc = db[row.doc_id]

                    # Create new attachment name by appending suffix
                    # e.g., "article.txt" becomes "article.txt.ann"
                    new_attachment_name = f"{row.attachment_name}{suffix}"

                    # Save the annotated content as a new attachment
                    db.put_attachment(
                        doc,
                        row.final_aggregated_pg.encode('utf-8'),
                        filename=new_attachment_name,
                        content_type='text/plain'
                    )

                    success = True

                except Exception as e:
                    print(f"Error saving {row.doc_id}/{row.attachment_name}: {e}")

                yield Row(
                    doc_id=row.doc_id,
                    attachment_name=row.attachment_name,
                    success=success
                )

        except Exception as e:
            print(f"Error connecting to CouchDB: {e}")
            # Yield failures for all rows
            for row in partition:
                yield Row(
                    doc_id=row.doc_id,
                    attachment_name=row.attachment_name,
                    success=False
                )

    def load_distributed(
        self,
        spark: SparkSession,
        pattern: str = "*.txt"
    ) -> DataFrame:
        """
        Load text attachments from CouchDB using mapPartitions.

        This function:
        1. Gets list of documents (on driver)
        2. Creates a DataFrame with doc IDs
        3. Uses mapPartitions to fetch content efficiently (one connection per partition)

        Args:
            spark: SparkSession
            pattern: Pattern for attachment names

        Returns:
            DataFrame with columns: doc_id, doc, attachment_name, value
        """
        # Get document list
        doc_df = self.get_document_list(spark, pattern)

        # Use mapPartitions for efficient batch fetching
        # Create new connection instance with same params for workers
        conn_params = (self.couchdb_url, self.database, self.username, self.password)

        def fetch_partition(partition):
            # Each worker creates its own connection
            conn = CouchDBConnection(*conn_params)
            return conn.fetch_partition(partition)

        # Apply mapPartitions
        result_df = doc_df.rdd.mapPartitions(fetch_partition).toDF()

        result_df.printSchema()
        return result_df

    def save_distributed(
        self,
        df: DataFrame,
        suffix: str = ".ann"
    ) -> DataFrame:
        """
        Save annotated predictions to CouchDB using mapPartitions.

        This function uses mapPartitions where each partition creates a single
        CouchDB connection and reuses it for all rows.

        Args:
            df: DataFrame with columns: doc_id, attachment_name, final_aggregated_pg
            suffix: Suffix to append to attachment names

        Returns:
            DataFrame with doc_id, attachment_name, and success columns
        """
        # Use mapPartitions for efficient batch saving
        # Create new connection instance with same params for workers
        conn_params = (self.couchdb_url, self.database, self.username, self.password)

        def save_partition(partition):
            # Each worker creates its own connection
            conn = CouchDBConnection(*conn_params)
            return conn.save_partition(partition, suffix)

        # Define output schema
        schema = StructType([
            StructField("doc_id", StringType(), False),
            StructField("attachment_name", StringType(), False),
            StructField("success", BooleanType(), False)
        ])

        # Apply mapPartitions
        result_df = df.rdd.mapPartitions(save_partition).toDF(schema)

        return result_df

    def process_partition_with_func(
        self,
        partition: Iterator[Row],
        processor_func,
        suffix: str = ".ann"
    ) -> Iterator[Row]:
        """
        Generic function to read, process, and save in one partition operation.

        This allows custom processing logic while maintaining single connection per partition.

        Args:
            partition: Iterator of Rows
            processor_func: Function to process content (takes content string, returns processed string)
            suffix: Suffix for output attachment

        Yields:
            Rows with processing results
        """
        try:
            db = self.db

            for row in partition:
                try:
                    doc = db[row.doc_id]

                    # Fetch
                    if row.attachment_name in doc.get('_attachments', {}):
                        attachment = db.get_attachment(doc, row.attachment_name)
                        if attachment:
                            content = attachment.read().decode('utf-8', errors='ignore')

                            # Process
                            processed = processor_func(content)

                            # Save
                            new_attachment_name = f"{row.attachment_name}{suffix}"
                            db.put_attachment(
                                doc,
                                processed.encode('utf-8'),
                                filename=new_attachment_name,
                                content_type='text/plain'
                            )

                            yield Row(
                                doc_id=row.doc_id,
                                attachment_name=row.attachment_name,
                                success=True
                            )
                            continue

                except Exception as e:
                    print(f"Error processing {row.doc_id}/{row.attachment_name}: {e}")

                yield Row(
                    doc_id=row.doc_id,
                    attachment_name=row.attachment_name,
                    success=False
                )

        except Exception as e:
            print(f"Error connecting to CouchDB: {e}")
            for row in partition:
                yield Row(
                    doc_id=row.doc_id,
                    attachment_name=row.attachment_name,
                    success=False
                )
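`get_document_list` hand-codes three attachment patterns: `"*.txt"`, `"*"`/`"*.*"`, and `"*.ext"`. Any other pattern silently matches nothing. The standard library's shell-style matcher covers the general case. Here is a sketch with a hypothetical helper `matching_attachments`. One difference to note: under shell matching, `"*.*"` requires a dot, whereas `get_document_list` treats it as "match everything".

```python
from fnmatch import fnmatchcase
from typing import Dict, List, Tuple


def matching_attachments(docs: Dict[str, List[str]], pattern: str = "*.txt") -> List[Tuple[str, str]]:
    """One (doc_id, attachment_name) pair per attachment whose name matches pattern."""
    return [
        (doc_id, name)
        for doc_id, names in docs.items()
        for name in names
        # fnmatchcase is case-sensitive on every OS (fnmatch follows the OS's case rules).
        if fnmatchcase(name, pattern)
    ]


# Made-up attachment listings keyed by document id.
docs = {"d1": ["article.pdf", "article.txt"], "d2": ["article.txt", "article.txt.ann"]}
print(matching_attachments(docs))
print(matching_attachments(docs, "*.ann"))
```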
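`save_to_redis` and `load_from_redis` in the `SkolClassifier` cell move the trained models around as an in-memory `.tar.gz` blob. Here is a round-trip sketch of that packaging step, using hypothetical helpers `pack_dir` and `unpack_bytes`. It passes `filter="data"` to `extractall` where the running Python supports extraction filters. That filter rejects absolute paths and `..` traversal, and recent Python versions warn when no filter is given.

```python
import io
import tarfile
import tempfile
from pathlib import Path


def pack_dir(path: Path) -> bytes:
    """Return a gzip-compressed tar of a directory, built in memory as save_to_redis does."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        tar.add(path, arcname=".")
    return buf.getvalue()


def unpack_bytes(data: bytes, dest: Path) -> None:
    """Extract an archive made by pack_dir, refusing unsafe members where supported."""
    with tarfile.open(fileobj=io.BytesIO(data), mode="r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # The "data" filter rejects absolute paths, ".." traversal, and device files.
            tar.extractall(dest, filter="data")
        else:
            tar.extractall(dest)  # older Pythons without extraction filters


with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dst:
    (Path(src) / "metadata.json").write_text('{"version": "0.0.1"}')
    blob = pack_dir(Path(src))  # these bytes are what would be stored under the Redis key
    unpack_bytes(blob, Path(dst))
    print((Path(dst) / "metadata.json").read_text())
```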



In [11]:
"""
Main classifier module for SKOL text classification
"""
class SkolClassifier(SC):
    """
    Text classifier for taxonomic literature.

    This version only includes the Redis and CouchDB I/O methods.
    All other methods are in SC.

    Supports multiple classification models (Logistic Regression, Random Forest)
    and feature types (word TF-IDF, suffix TF-IDF, combined).
    """

    def save_to_redis(self) -> bool:
        """
        Save the trained models to Redis.

        The models are saved to a temporary directory, then packaged and stored in Redis
        as a compressed binary blob along with metadata.

        Uses the Redis client and key configured in the constructor.

        Returns:
            True if successful, False otherwise

        Raises:
            ValueError: If no models are trained or Redis client is not configured
        """
        if self.pipeline_model is None or self.classifier_model is None:
            raise ValueError(
                "No models to save. Train models using fit() or train_classifier() first."
            )

        if self.redis_client is None:
            raise ValueError(
                "No Redis client configured. Initialize classifier with redis_client."
            )

        temp_dir = None
        try:
            # Create temporary directory for model files
            temp_dir = tempfile.mkdtemp(prefix="skol_model_")
            temp_path = Path(temp_dir)

            # Save pipeline model
            pipeline_path = temp_path / "pipeline_model"
            self.pipeline_model.save(str(pipeline_path))

            # Save classifier model
            classifier_path = temp_path / "classifier_model"
            self.classifier_model.save(str(classifier_path))

            # Save metadata (labels and model info)
            metadata = {
                "labels": self.labels,
                "version": "0.0.1"
            }
            metadata_path = temp_path / "metadata.json"
            with open(metadata_path, 'w') as f:
                json.dump(metadata, f)

            # Create archive in memory
            import io
            import tarfile

            archive_buffer = io.BytesIO()
            with tarfile.open(fileobj=archive_buffer, mode='w:gz') as tar:
                tar.add(temp_path, arcname='.')

            # Get compressed data
            archive_data = archive_buffer.getvalue()

            # Save to Redis
            self.redis_client.set(self.redis_key, archive_data)

            return True

        except Exception as e:
            print(f"Error saving to Redis: {e}")
            return False

        finally:
            # Clean up temporary directory
            if temp_dir and Path(temp_dir).exists():
                shutil.rmtree(temp_dir)

    def load_from_redis(self) -> bool:
        """
        Load trained models from Redis.

        Uses the Redis client and key configured in the constructor.

        Returns:
            True if successful, False otherwise

        Raises:
            ValueError: If no Redis client is configured. A missing key is
                caught internally: the error is printed and False is returned.
        """
        if self.redis_client is None:
            raise ValueError(
                "No Redis client configured. Initialize classifier with redis_client."
            )

        temp_dir = None
        try:
            # Retrieve from Redis
            archive_data = self.redis_client.get(self.redis_key)
            if archive_data is None:
                raise ValueError(f"No model found in Redis with key: {self.redis_key}")

            # Create temporary directory for extraction
            temp_dir = tempfile.mkdtemp(prefix="skol_model_load_")
            temp_path = Path(temp_dir)

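            # Extract archive. filter='data' (Python 3.12+) rejects members that
            # would land outside temp_path and avoids the 3.12+ DeprecationWarning.
            import io
            import tarfile

            archive_buffer = io.BytesIO(archive_data)
            with tarfile.open(fileobj=archive_buffer, mode='r:gz') as tar:
                tar.extractall(temp_path, filter='data')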

            # Load pipeline model
            pipeline_path = temp_path / "pipeline_model"
            self.pipeline_model = PipelineModel.load(str(pipeline_path))

            # Load classifier model
            classifier_path = temp_path / "classifier_model"
            self.classifier_model = PipelineModel.load(str(classifier_path))

            # Load metadata
            metadata_path = temp_path / "metadata.json"
            with open(metadata_path, 'r') as f:
                metadata = json.load(f)
                self.labels = metadata.get("labels")

            return True

        except Exception as e:
            print(f"Error loading from Redis: {e}")
            return False

        finally:
            # Clean up temporary directory
            if temp_dir and Path(temp_dir).exists():
                shutil.rmtree(temp_dir)


    def load_from_couchdb(self, pattern: str = "*.txt") -> DataFrame:
        """
        Load raw text from CouchDB attachments using distributed UDFs.

        This method uses Spark UDFs to fetch attachments in parallel across workers,
        rather than loading all data on the driver.

        Uses the CouchDB configuration from the constructor.

        Args:
            pattern: Pattern for attachment names (default: "*.txt")

        Returns:
            DataFrame with columns: doc_id, doc, attachment_name, value

        Raises:
            ValueError: If CouchDB is not configured
        """
        if self.couchdb_url is None or self.database is None:
            raise ValueError(
                "CouchDB not configured. Initialize classifier with couchdb_url and database."
            )

        conn = CouchDBConnection(
            self.couchdb_url, self.database, self.username, self.password
        )
        return conn.load_distributed(self.spark, pattern)


    def predict_from_couchdb(
        self,
        pattern: str = "*.txt",
        output_format: str = "annotated"
    ) -> DataFrame:
        """
        Load text from CouchDB, predict labels, and return predictions.

        Uses the CouchDB configuration from the constructor.

        Args:
            pattern: Pattern for attachment names
            output_format: Output format ('annotated' or 'simple')

        Returns:
            DataFrame with predictions, including doc_id and attachment_name

        Raises:
            ValueError: If models are not trained or CouchDB is not configured
        """
        if self.pipeline_model is None or self.classifier_model is None:
            raise ValueError(
                "Models not trained. Call fit_features() and train_classifier() first."
            )

        # Load data from CouchDB
        df = self.load_from_couchdb(pattern)

        # Process paragraphs
        heuristic_udf = udf(
            ParagraphExtractor.extract_heuristic_paragraphs,
            ArrayType(StringType())
        )

        # Group each attachment's text and split it into paragraphs.
        # posexplode keeps each paragraph's position, so row_number follows
        # document order; a window ordered by a constant does not guarantee that.
        from pyspark.sql.functions import posexplode

        grouped_df = (
            df.groupBy("doc_id", "attachment_name")
            .agg(collect_list("value").alias("lines"))
            .select(
                "doc_id",
                "attachment_name",
                posexplode(heuristic_udf(col("lines"))).alias("pos", "value"),
            )
            .filter(trim(col("value")) != "")
            .withColumn("row_number", col("pos") + 1)
            .drop("pos")
        )

        # Extract features
        features = self.pipeline_model.transform(grouped_df)

        # Predict
        predictions = self.classifier_model.transform(features)

        # Convert label indices to strings
        from pyspark.ml.feature import IndexToString

        converter = IndexToString(
            inputCol="prediction",
            outputCol="predicted_label",
            labels=self.labels
        )
        labeled_predictions = converter.transform(predictions)

        # Format output
        if output_format == "annotated":
            labeled_predictions = labeled_predictions.withColumn(
                "annotated_pg",
                concat(
                    lit("[@ "),
                    col("value"),
                    lit("#"),
                    col("predicted_label"),
                    lit("*]")
                )
            )

        return labeled_predictions

    def save_to_couchdb(
        self,
        predictions: DataFrame,
        suffix: str = ".ann"
    ) -> List[Dict[str, Any]]:
        """
        Save annotated predictions back to CouchDB using distributed UDFs.

        This method uses Spark UDFs to save attachments in parallel across workers,
        distributing the write operations.

        Uses the CouchDB configuration from the constructor.

        Args:
            predictions: DataFrame with predictions (must include annotated_pg column)
            suffix: Suffix to append to attachment names (default: ".ann")

        Returns:
            List of results from CouchDB operations

        Raises:
            ValueError: If CouchDB is not configured
        """
        if self.couchdb_url is None or self.database is None:
            raise ValueError(
                "CouchDB not configured. Initialize classifier with couchdb_url and database."
            )

        conn = CouchDBConnection(
            self.couchdb_url, self.database, self.username, self.password
        )

        # Aggregate paragraphs by document and attachment
        aggregated_df = (
            predictions.groupBy("doc_id", "attachment_name")
            .agg(
                expr("sort_array(collect_list(struct(row_number, annotated_pg))) AS sorted_list")
            )
            .withColumn("annotated_pg_ordered", expr("transform(sorted_list, x -> x.annotated_pg)"))
            .withColumn("final_aggregated_pg", expr("array_join(annotated_pg_ordered, '\n')"))
            .select("doc_id", "attachment_name", "final_aggregated_pg")
        )

        # Save to CouchDB using distributed UDF
        result_df = conn.save_distributed(aggregated_df, suffix)

        # Collect results
        results = []
        for row in result_df.collect():
            results.append({
                'doc_id': row.doc_id,
                'attachment_name': f"{row.attachment_name}{suffix}",
                'success': row.success
            })

        return results


## Build a classifier to identify paragraph types

We save the trained model to Redis so that it does not have to be retrained on every run.
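Spark writes each fitted model as a directory, but a Redis value is a single byte string, so `save_to_redis()` packs the model directories and `metadata.json` into an in-memory `.tar.gz`, and `load_from_redis()` unpacks it into a temporary directory. The sketch below reproduces that round trip with the standard library only. The directory and file names in the demo are made up for illustration, and the Redis `set`/`get` calls are left out so it runs without a server. It passes `filter="data"` when the running Python supports it.

```python
import io
import shutil
import tarfile
import tempfile
from pathlib import Path


def pack_dir(src: Path) -> bytes:
    """Pack a directory tree into an in-memory gzip-compressed tar archive."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        # arcname="." stores paths relative to src, as save_to_redis() does.
        tar.add(str(src), arcname=".")
    return buffer.getvalue()


def unpack_bytes(data: bytes, dest: Path) -> None:
    """Extract an archive produced by pack_dir() into dest."""
    with tarfile.open(fileobj=io.BytesIO(data), mode="r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # Rejects absolute paths and members or links pointing outside dest.
            tar.extractall(dest, filter="data")
        else:
            # Older interpreters without extraction filters.
            tar.extractall(dest)


# Demo with a fake model layout (names are illustrative, not Spark's real files).
src = Path(tempfile.mkdtemp())
dest = Path(tempfile.mkdtemp())
try:
    (src / "pipeline_model").mkdir()
    (src / "pipeline_model" / "part-0").write_text("weights")
    (src / "metadata.json").write_text('{"labels": ["Nomenclature"]}')

    blob = pack_dir(src)  # the bytes that would be stored under the Redis key
    unpack_bytes(blob, dest)

    restored = sorted(p.relative_to(dest).as_posix() for p in dest.rglob("*"))
    content = (dest / "pipeline_model" / "part-0").read_text()
finally:
    shutil.rmtree(src)
    shutil.rmtree(dest)

print(restored)
```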

In [12]:
# Train classifier on annotated data and save to Redis
# Connect to Redis
redis_client = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=False
)
classifier_model_name = "skol:classifier:model:v1.0"

if not redis_client.exists(classifier_model_name):

    # Initialize classifier with Redis connection
    classifier = SkolClassifier(
        spark=spark,
        redis_client=redis_client,
        redis_key=classifier_model_name,
        auto_load=False,  # Don't auto-load, we want to train fresh
        couchdb_url=couchdb_url,
        database=ingest_db_name,
        username=couchdb_username,
        password=couchdb_password
    )
    
    # Get annotated training files
    annotated_path = Path.cwd().parent / "data" / "annotated"
    print(f"Loading annotated files from: {annotated_path}")
    
    if annotated_path.exists():
        annotated_files = get_file_list(str(annotated_path), pattern="**/*.ann")
        
        if len(annotated_files) > 0:
            print(f"Found {len(annotated_files)} annotated files")
            
            # Train the classifier
            print("Training classifier...")
            results = classifier.fit(annotated_files)
            
            print(f"Training complete!")
            print(f"  Accuracy: {results['accuracy']:.4f}")
            print(f"  F1 Score: {results['f1_score']:.4f}")
            print(f"  Labels: {classifier.labels}")
            
            # Save model to Redis
            print("\nSaving model to Redis...")
            if classifier.save_to_redis():
                print(f"✓ Model successfully saved to Redis with key: {classifier_model_name}.")
            else:
                print("✗ Failed to save model to Redis")
        else:
            print(f"No annotated files found in {annotated_path}")
    else:
        print(f"Directory does not exist: {annotated_path}")
        print("Please ensure annotated training data is available.")
else:
    print(f"Skipping generation of model {classifier_model_name}.")

Skipping generation of model skol:classifier:model:v1.0.


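## Extract the taxon names and descriptions

We use the classifier to label each paragraph of the articles, issues, and books as Nomenclature, Description, or Misc-exposition; the taxon names and descriptions are extracted from these labels. The YEDDA-annotated texts are written back to CouchDB as `.ann` attachments.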
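Each predicted paragraph is wrapped as `[@ <paragraph>#<label>*]` (the `concat` in `predict_from_couchdb()`), and `save_to_couchdb()` rebuilds one `.ann` attachment per source file by sorting on `row_number` and joining the paragraphs with newlines. A plain-Python sketch of those two steps, using made-up paragraphs, shows why the sort matters: rows can come back from a Spark shuffle in any order.

```python
from typing import List, Tuple


def to_yedda(text: str, label: str) -> str:
    """Wrap one paragraph the way predict_from_couchdb() builds annotated_pg."""
    return f"[@ {text}#{label}*]"


def assemble_annotated(rows: List[Tuple[int, str, str]]) -> str:
    """Rebuild one .ann attachment from (row_number, paragraph, label) rows.

    Mirrors save_to_couchdb(): order by row_number, then join with newlines.
    """
    ordered = sorted(rows, key=lambda row: row[0])
    return "\n".join(to_yedda(text, label) for _, text, label in ordered)


# Illustrative rows for one attachment, deliberately out of order.
rows = [
    (2, "Pileus convex, 2-4 cm.", "Description"),
    (1, "Agaricus exemplaris sp. nov.", "Nomenclature"),
    (3, "We thank the curators.", "Misc-exposition"),
]
annotated = assemble_annotated(rows)
print(annotated)
```

Because a paragraph may itself contain newlines, a reader of the `.ann` file has to rely on the `#Label*]` terminator rather than on line breaks to find paragraph boundaries.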

In [13]:
classifier = SkolClassifier(
    spark=spark,
    redis_client=redis_client,
    redis_key=classifier_model_name,
    auto_load=True,
    couchdb_url=couchdb_url,
    database=ingest_db_name,
    username=couchdb_username,
    password=couchdb_password
)

if classifier.labels is None:
    raise ValueError("No model found in Redis. Please train a model first.")

print(f"Model loaded with labels: {classifier.labels}")

print("\nLoading and classifying documents from CouchDB...")
predictions = classifier.predict_from_couchdb(pattern="*.txt")

# Show sample predictions
print("\nSample predictions:")
predictions.select(
    "doc_id", "attachment_name", "predicted_label"
).show(5, truncate=50)

# Save results back to CouchDB using distributed writes
print("\nSaving predictions back to CouchDB...")
print("(Each partition writes its documents using a single connection)")
results = classifier.save_to_couchdb(predictions=predictions, suffix=".ann")

# Report results
successful = sum(1 for r in results if r['success'])
failed = len(results) - successful

print(f"\nSaved {successful} annotated files to CouchDB")
if failed > 0:
    print(f"Failed to save {failed} files")
    for r in results:
        if not r['success']:
            print(f"  {r['doc_id']}/{r['attachment_name']}")



Model loaded with labels: ['Misc-exposition', 'Description', 'Nomenclature']

Loading and classifying documents from CouchDB...



root
 |-- doc_id: string (nullable = true)
 |-- doc: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- attachment_name: string (nullable = true)
 |-- value: string (nullable = true)


Sample predictions:



+--------------------------------+---------------+---------------+
|                          doc_id|attachment_name|predicted_label|
+--------------------------------+---------------+---------------+
|006b331e284e4dc8b7ab9e275d00ad22|    article.txt|    Description|
|00769b24893c40ea9062aa486f6cebe2|    article.txt|Misc-exposition|
|013e630386744976a13eddfc71ea3b97|    article.txt|Misc-exposition|
|0149ece76caa4937a07d12406500c198|    article.txt|Misc-exposition|
|015e247f133f485984ecaf092b168ca6|    article.txt|Misc-exposition|
+--------------------------------+---------------+---------------+
only showing top 5 rows


Saving predictions back to CouchDB...
(Each partition writes its documents using a single connection)



Saved 2005 annotated files to CouchDB



In [14]:
predictions.select("predicted_label", "annotated_pg").where('predicted_label = "Nomenclature"').show()

+---------------+--------------------+
|predicted_label|        annotated_pg|
+---------------+--------------------+
|   Nomenclature|[@ ISSN (print) 0...|
|   Nomenclature|[@ MYCOTAXON\nVol...|
|   Nomenclature|[@ ISSN (print) 0...|
|   Nomenclature|[@ ISSN (print) 0...|
|   Nomenclature|[@ MYCOTAXON\nVol...|
|   Nomenclature|[@ MYCOTAXON \nIS...|
|   Nomenclature|[@ ISSN (print) 0...|
|   Nomenclature|[@ ISSN (print) 0...|
|   Nomenclature|[@ ISSN (print) 0...|
|   Nomenclature|[@ MYCOTAXON\nVol...|
|   Nomenclature|[@ ISSN (print) 0...|
|   Nomenclature|[@ 87\n© 2022 Wes...|
|   Nomenclature|[@ ISSN (print) 0...|
|   Nomenclature|[@ MYCOTAXON \nIS...|
|   Nomenclature|[@ MYCOTAXON \nIS...|
|   Nomenclature|[@ ISSN (print) 0...|
|   Nomenclature|[@ ISSN (print) 0...|
|   Nomenclature|[@ ISSN (print) 0...|
|   Nomenclature|[@ Studies in Myc...|
|   Nomenclature|[@ MYCOTAXON\nVol...|
+---------------+--------------------+
only showing top 20 rows




## Build the Taxon objects and store them in CouchDB

We use CouchDB to store a full record for each taxon, and copy all of the available metadata into each taxon record.

In [15]:
class CouchDBFile(CDBF):
    """
    File-like object that reads from CouchDB attachment content.

    This class extends FileObject to support reading text from CouchDB
    attachments while preserving database metadata (doc_id, attachment_name,
    and database name).
    """

    _doc_id: str
    _attachment_name: str
    _db_name: str
    _url: Optional[str]
    _content_lines: List[str]

    def __init__(
        self,
        content: str,
        doc_id: str,
        attachment_name: str,
        db_name: str,
        url: Optional[str] = None
    ) -> None:
        """
        Initialize CouchDBFile from attachment content.

        Args:
            content: Text content from CouchDB attachment
            doc_id: CouchDB document ID
            attachment_name: Name of the attachment (e.g., "article.txt.ann")
            db_name: Database name where document is stored (ingest_db_name)
            url: Optional URL from the CouchDB row
        """
        self._doc_id = doc_id
        self._attachment_name = attachment_name
        self._db_name = db_name
        self._url = url
        self._line_number = 0
        self._page_number = 1
        self._empirical_page_number = None

        # Split content into lines
        self._content_lines = content.split('\n')

    def _get_content_iterator(self) -> Iterator[str]:
        """Get iterator over content lines."""
        return iter(self._content_lines)

    @property
    def filename(self) -> str:
        """
        Return a composite identifier for CouchDB documents.

        Format: db_name/doc_id/attachment_name
        This allows tracking the source of each line.
        """
        return f"{self._db_name}/{self._doc_id}/{self._attachment_name}"

    @property
    def doc_id(self) -> str:
        """CouchDB document ID."""
        return self._doc_id

    @property
    def attachment_name(self) -> str:
        """Attachment filename."""
        return self._attachment_name

    @property
    def db_name(self) -> str:
        """Database name (ingest_db_name)."""
        return self._db_name

    @property
    def url(self) -> Optional[str]:
        """URL from the CouchDB row."""
        return self._url


In [16]:
def read_couchdb_partition(
    partition: Iterator[Row],
    db_name: str
) -> Iterator[Line]:
    """
    Read annotated files from CouchDB rows in a PySpark partition.

    This is the mapPartitions counterpart to read_files() for CouchDB-backed data.
    It processes rows containing CouchDB attachment content and yields
    Line objects that preserve database metadata.

    Args:
        partition: Iterator of PySpark Rows with columns:
            - doc_id: CouchDB document ID
            - doc: Map of document fields (its 'url' entry is used if present)
            - attachment_name: Attachment filename
            - value: Text content from attachment
        db_name: Database name to store in metadata (ingest_db_name)

    Yields:
        Line objects with content and CouchDB metadata (doc_id, attachment_name, db_name)

    Example:
        >>> # In a PySpark context
        >>> from pyspark.sql.functions import col
        >>> from couchdb_file import read_couchdb_partition
        >>>
        >>> # Assume df has columns: doc_id, attachment_name, value
        >>> def process_partition(partition):
        ...     lines = read_couchdb_partition(partition, "mycobank")
        ...     # Process lines with finder.parse_annotated()
        ...     return lines
        >>>
        >>> result = df.rdd.mapPartitions(process_partition)
    """
    for row in partition:
        # Extract url from the document map if available (doc is nullable)
        url = row.doc.get('url') if row.doc else None

        # Create CouchDBFile object from row data
        file_obj = CouchDBFile(
            content=row.value,
            doc_id=row.doc_id,
            attachment_name=row.attachment_name,
            db_name=db_name,
            url=url
        )

        # Yield all lines from this file
        for line in file_obj.read_line():
            yield line


def read_couchdb_rows(
    rows: List[Row],
    db_name: str
) -> Iterator[Line]:
    """
    Read annotated files from a list of CouchDB rows.

    This is a convenience function for non-distributed processing or testing.
    For production use with PySpark, use read_couchdb_partition().

    Args:
        rows: List of Rows with columns:
            - doc_id: CouchDB document ID
            - doc: Map of document fields (its 'url' entry is used if present)
            - attachment_name: Attachment filename
            - value: Text content from attachment
        db_name: Database name to store in metadata

    Yields:
        Line objects with content and CouchDB metadata

    Example:
        >>> from couchdb_file import read_couchdb_rows
        >>>
        >>> # Collect rows from DataFrame
        >>> rows = df.collect()
        >>>
        >>> # Process all lines
        >>> lines = read_couchdb_rows(rows, "mycobank")
        >>> paragraphs = parse_annotated(lines)
        >>> taxa = group_paragraphs(paragraphs)
    """
    return read_couchdb_partition(iter(rows), db_name)



In [17]:
def generate_taxon_doc_id(doc_id: str, url: Optional[str], line_number: int) -> str:
    """
    Generate a unique, deterministic document ID for a taxon.

    This ensures idempotent writes - the same taxon from the same source
    will always have the same document ID.

    Args:
        doc_id: Source document ID from ingest database
        url: URL from the source line
        line_number: Line number from the source

    Returns:
        Unique document ID as a hash string
    """
    # Create composite key
    key_parts = [
        doc_id,
        url if url else "no_url",
        str(line_number)
    ]
    composite_key = ":".join(key_parts)

    # Generate deterministic hash
    hash_obj = hashlib.sha256(composite_key.encode('utf-8'))
    doc_hash = hash_obj.hexdigest()

    return f"taxon_{doc_hash}"


def extract_taxa_from_partition(
    partition: Iterator[Row],
    ingest_db_name: str
) -> Iterator[Taxon]:
    """
    Extract Taxa from a partition of CouchDB rows.

    This function processes annotated files from CouchDB and yields
    Taxon objects for further processing.

    Args:
        partition: Iterator of Rows with columns:
            - doc_id: CouchDB document ID
            - doc: Map of document fields (its 'url' entry is used if present)
            - attachment_name: Attachment filename
            - value: Text content
        ingest_db_name: Database name for metadata tracking

    Yields:
        Taxon objects with nomenclature and description paragraphs
    """
    # Read lines from partition
    lines = read_couchdb_partition(partition, ingest_db_name)

    # Parse annotated content
    paragraphs = parse_annotated(lines)

    # Remove interstitial paragraphs
    filtered = remove_interstitials(paragraphs)

    # Convert to list to preserve paragraph objects for metadata extraction
    filtered_list = list(filtered)

    # Group into taxa (returns Taxon objects with references to paragraphs)
    taxa = group_paragraphs(iter(filtered_list))

    # Yield Taxon objects directly
    for taxon in taxa:
        # Only yield taxa that have nomenclature
        if taxon.has_nomenclature():
            yield taxon


def convert_taxa_to_rows(partition: Iterator[Taxon]) -> Iterator[Row]:
    """
    Convert Taxon objects to PySpark Rows suitable for DataFrame creation.

    Args:
        partition: Iterator of Taxon objects

    Yields:
        PySpark Row objects with fields:
            - taxon: String of concatenated nomenclature paragraphs
            - description: String of concatenated description paragraphs
            - source: Dict with keys doc_id, url, db_name
            - line_number: Line number of first nomenclature paragraph
            - paragraph_number: Paragraph number of first nomenclature paragraph
            - page_number: Page number of first nomenclature paragraph
            - empirical_page_number: Empirical page number of first nomenclature paragraph
    """
    for taxon in partition:
        taxon_dict = taxon.as_row()
        # Convert dict to Row
        yield Row(**taxon_dict)


## Build Taxon objects

Here we extract the Taxon objects from the annotated attachments.
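`extract_taxa_from_partition()` relies on `parse_annotated()`, `remove_interstitials()`, and `group_paragraphs()`, whose implementations are not shown in this section. As a rough mental model only, the sketch below parses YEDDA blocks back into `(label, text)` pairs and applies a *hypothetical*, simplified grouping rule: a Nomenclature paragraph starts a taxon, following Description paragraphs attach to it, and any other label closes it. The real grouping logic may differ, for example in how it treats interstitial paragraphs.

```python
import re
from typing import Dict, List, Optional, Tuple

# One YEDDA block: "[@ <text>#<Label>*]". DOTALL lets a paragraph span lines.
YEDDA_BLOCK = re.compile(r"\[@ (.*?)#([A-Za-z-]+)\*\]", re.DOTALL)


def parse_yedda(annotated: str) -> List[Tuple[str, str]]:
    """Return (label, text) pairs in document order."""
    return [(m.group(2), m.group(1)) for m in YEDDA_BLOCK.finditer(annotated)]


def group_taxa(paragraphs: List[Tuple[str, str]]) -> List[Dict[str, str]]:
    """Hypothetical grouping rule; not the project's group_paragraphs()."""
    taxa: List[Dict[str, List[str]]] = []
    current: Optional[Dict[str, List[str]]] = None
    for label, text in paragraphs:
        if label == "Nomenclature":
            # A name after a description starts a new taxon; consecutive
            # names (e.g. synonyms) stay in the same one.
            if current is None or current["description"]:
                current = {"taxon": [], "description": []}
                taxa.append(current)
            current["taxon"].append(text)
        elif label == "Description" and current is not None:
            current["description"].append(text)
        else:
            # Exposition, or a description with no preceding name, ends the taxon.
            current = None
    return [
        {"taxon": "\n".join(t["taxon"]), "description": "\n".join(t["description"])}
        for t in taxa
    ]


doc = (
    "[@ Introduction.#Misc-exposition*]\n"
    "[@ Agaricus exemplaris sp. nov.\nFig. 1#Nomenclature*]\n"
    "[@ Pileus convex.#Description*]\n"
    "[@ Stipe central.#Description*]\n"
    "[@ Discussion.#Misc-exposition*]"
)
taxa = group_taxa(parse_yedda(doc))
print(len(taxa), taxa[0]["description"].splitlines())
```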

In [18]:
ingest_couchdb_url = couchdb_url
ingest_username = couchdb_username
ingest_password = couchdb_password
pattern = '*.txt.ann'
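The classification step read attachments matching `*.txt`; this step reads the annotated copies matching `*.txt.ann`. How `load_distributed()` matches names is not shown here. *Assuming* shell-style globbing like Python's `fnmatch`, the two patterns select disjoint sets of attachments, because the literal suffix must end the name:

```python
from fnmatch import fnmatchcase

# Hypothetical attachment names on one ingest document.
attachments = ["article.txt", "article.txt.ann", "article.pdf"]

# Assumption: glob semantics as in fnmatch, matched case-sensitively.
txt_inputs = [name for name in attachments if fnmatchcase(name, "*.txt")]
ann_inputs = [name for name in attachments if fnmatchcase(name, "*.txt.ann")]
print(txt_inputs, ann_inputs)
```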

In [19]:
ingest_conn = CouchDBConnection(
    ingest_couchdb_url,
    ingest_db_name,
    ingest_username,
    ingest_password
)

df = ingest_conn.load_distributed(spark, pattern)

# Extract taxa from each partition as Taxon objects, then convert them to Rows
# Schema matches Taxon.as_row() output format
from pyspark.sql.types import MapType

extract_schema = StructType([
    StructField("taxon", StringType(), False),
    StructField("description", StringType(), False),
    StructField("source", MapType(StringType(), StringType()), False),
    StructField("line_number", StringType(), False),
    StructField("paragraph_number", StringType(), False),
    StructField("page_number", StringType(), False),
    StructField("empirical_page_number", StringType(), True),
])

def extract_partition(partition):
    # Extract Taxon objects
    taxa = extract_taxa_from_partition(partition, ingest_db_name)
    # Convert to Rows for DataFrame
    return convert_taxa_to_rows(taxa)

taxa_rdd = df.rdd.mapPartitions(extract_partition)
taxa_df = spark.createDataFrame(taxa_rdd, extract_schema)

root
 |-- doc_id: string (nullable = true)
 |-- doc: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- attachment_name: string (nullable = true)
 |-- value: string (nullable = true)



There is a problem with the model: across more than 2000 articles it found only 54 Nomenclature labels, and we would expect roughly an order of magnitude more. This is not directly relevant to the current project, so it is left as a TODO.
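The figure of 54 is the sum of the Nomenclature counts in the two per-partition `parse_annotated` summaries printed below (30 + 24). Those summaries count 1002 and 1003 labels, 2005 in total, which is also the number of annotated files saved earlier; that may indicate that each file yielded only one labelled paragraph, though this has not been checked here. The arithmetic, using `collections.Counter` to merge the per-partition counts:

```python
from collections import Counter
from typing import Dict


def label_percentages(counts: Dict[str, int]) -> Dict[str, float]:
    """Percent of all labels per label, rounded to one decimal place."""
    total = sum(counts.values())
    return {label: round(100 * n / total, 1) for label, n in counts.items()}


# Counts copied from the two parse_annotated summaries printed below.
partition_a = Counter({"Misc-exposition": 783, "Description": 188, "Nomenclature": 30, "None": 1})
partition_b = Counter({"Misc-exposition": 800, "Description": 178, "Nomenclature": 24, "None": 1})

combined = partition_a + partition_b
print(combined["Nomenclature"], sum(combined.values()))
print(label_percentages(partition_a))
print(label_percentages(combined))
```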

In [32]:
# Stream every taxon to the driver once (debug only; this forces the extraction to run)
for _ in taxa_rdd.toLocalIterator():
    pass

print(f"DEBUG: complete taxa: {taxa_rdd.count()}")

=== parse_annotated Label Summary ===
Total labels counted: 1002

Label distribution:
  Misc-exposition              783 ( 78.1%)
  Description                  188 ( 18.8%)
  Nomenclature                  30 (  3.0%)
  None                           1 (  0.1%)
=== parse_annotated Label Summary ===
Total labels counted: 1003

Label distribution:
  Misc-exposition              800 ( 79.8%)
  Description                  178 ( 17.7%)
  Nomenclature                  24 (  2.4%)
  None                           1 (  0.1%)

DEBUG: complete taxa: 39



=== parse_annotated Label Summary ===
Total labels counted: 1002

Label distribution:
  Misc-exposition              783 ( 78.1%)
  Description                  188 ( 18.8%)
  Nomenclature                  30 (  3.0%)
  None                           1 (  0.1%)

In [33]:
def save_taxa_to_couchdb_partition(
    partition: Iterator[Row],
    couchdb_url: str,
    taxon_db_name: str,
    username: Optional[str] = None,
    password: Optional[str] = None
) -> Iterator[Row]:
    """
    Save taxa to CouchDB for an entire partition (idempotent).

    This function creates deterministic document IDs based on
    (source.doc_id, source.url, line_number) to ensure
    idempotent writes.

    Args:
        partition: Iterator of Rows with columns from Taxon.as_row():
            - taxon: Nomenclature text
            - description: Description text
            - source: Dict with doc_id, url, db_name
            - line_number: Line number
            - paragraph_number, page_number, empirical_page_number
        couchdb_url: CouchDB server URL
        taxon_db_name: Target database name
        username: Optional username
        password: Optional password

    Yields:
        Rows with save results (doc_id, success, error_message)
    """
    # Connect to CouchDB once per partition
    try:
        server = couchdb.Server(couchdb_url)
        if username and password:
            server.resource.credentials = (username, password)

        # Get or create database
        if taxon_db_name in server:
            db = server[taxon_db_name]
        else:
            db = server.create(taxon_db_name)  # pyright: ignore[reportUnknownMemberType]

        # Process each taxon in the partition
        for row in partition:
            success = False
            error_msg = ""
            doc_id = "unknown"

            try:
                # Extract source metadata from row
                source_dict = row.source if hasattr(row, 'source') else {}  # type: ignore[reportUnknownMemberType]
                source: Dict[str, Any] = dict(source_dict) if isinstance(source_dict, dict) else {}  # type: ignore[reportUnknownArgumentType]
                source_doc_id: str = str(source.get('doc_id', 'unknown'))
                source_url: Optional[str] = source.get('url')  # type: ignore[reportUnknownArgumentType]
                line_number: Any = row.line_number if hasattr(row, 'line_number') else 0  # type: ignore[reportUnknownMemberType]

                # Generate deterministic document ID
                doc_id = generate_taxon_doc_id(
                    source_doc_id,
                    source_url if isinstance(source_url, str) else None,
                    int(line_number) if line_number else 0
                )

                # Convert row to dict for CouchDB storage
                taxon_doc = row.asDict()

                # Check if document already exists (idempotent)
                if doc_id in db:
                    # Document exists - update it
                    existing_doc = db[doc_id]
                    taxon_doc['_id'] = doc_id
                    taxon_doc['_rev'] = existing_doc['_rev']
                else:
                    # New document - create it
                    taxon_doc['_id'] = doc_id

                db.save(taxon_doc)  # pyright: ignore[reportUnknownMemberType]
                success = True

            except Exception as e:  # pyright: ignore[reportUnknownExceptionType]
                error_msg = str(e)
                print(f"Error saving taxon {doc_id}: {e}")

            yield Row(
                doc_id=doc_id,
                success=success,
                error_message=error_msg
            )

    except Exception as e:  # pyright: ignore[reportUnknownExceptionType]
        print(f"Error connecting to CouchDB: {e}")
        # Yield failures for all rows
        for row in partition:
            yield Row(
                doc_id="unknown_connection_error",
                success=False,
                error_message=str(e)
            )
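The idempotency of `save_taxa_to_couchdb_partition()` comes from two pieces: a document ID that is a pure function of `(source doc_id, url, line_number)`, and an update path that copies the stored `_rev`, so CouchDB accepts the write as a new revision instead of rejecting it as a conflict. The sketch below shows both with a hypothetical in-memory `FakeDB` standing in for `couchdb.Database` (its `_rev` strings are simplified), so re-running the save leaves one document at revision 2 rather than a duplicate.

```python
import hashlib
from typing import Any, Dict, Optional


def taxon_doc_id(doc_id: str, url: Optional[str], line_number: int) -> str:
    """Same composite key and SHA-256 hash as generate_taxon_doc_id()."""
    key = ":".join([doc_id, url if url else "no_url", str(line_number)])
    return "taxon_" + hashlib.sha256(key.encode("utf-8")).hexdigest()


class FakeDB:
    """Hypothetical in-memory stand-in for couchdb.Database."""

    def __init__(self) -> None:
        self.docs: Dict[str, Dict[str, Any]] = {}

    def __contains__(self, doc_id: str) -> bool:
        return doc_id in self.docs

    def __getitem__(self, doc_id: str) -> Dict[str, Any]:
        return self.docs[doc_id]

    def save(self, doc: Dict[str, Any]) -> None:
        stored = self.docs.get(doc["_id"])
        # Like CouchDB, refuse to overwrite without the current _rev.
        if stored is not None and doc.get("_rev") != stored["_rev"]:
            raise RuntimeError(f"conflict on {doc['_id']}")
        generation = 1 if stored is None else int(stored["_rev"].split("-")[0]) + 1
        self.docs[doc["_id"]] = dict(doc, _rev=f"{generation}-fake")


def upsert(db: FakeDB, fields: Dict[str, Any], doc_id: str) -> None:
    """Same create-or-update pattern as save_taxa_to_couchdb_partition()."""
    doc = dict(fields, _id=doc_id)
    if doc_id in db:
        doc["_rev"] = db[doc_id]["_rev"]
    db.save(doc)


db = FakeDB()
fields = {"taxon": "Agaricus exemplaris sp. nov.", "description": "Pileus convex."}
doc_id = taxon_doc_id("abc123", None, 42)

upsert(db, fields, doc_id)
upsert(db, fields, doc_id)  # re-running the job: same ID, new revision
print(len(db.docs), db.docs[doc_id]["_rev"])
```

The check-then-save is not atomic: if another writer updates the document between the `in` test and `save`, CouchDB reports a conflict, which the partition function catches and returns as a failed row.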


In [37]:
taxon_couchdb_url = couchdb_url
taxon_username = couchdb_username
taxon_password = couchdb_password


In [38]:
# Save taxa to taxon database
save_schema = StructType([
    StructField("doc_id", StringType(), False),
    StructField("success", BooleanType(), False),
    StructField("error_message", StringType(), False),
])

def save_partition(partition):
    return save_taxa_to_couchdb_partition(
        partition,
        taxon_couchdb_url,
        taxon_db_name,
        taxon_username,
        taxon_password
    )

results_df = taxa_df.rdd.mapPartitions(save_partition).toDF(save_schema)

results_df.collect()

=== parse_annotated Label Summary ===
Total labels counted: 1003

Label distribution:
  Misc-exposition              800 ( 79.8%)
  Description                  178 ( 17.7%)
  Nomenclature                  24 (  2.4%)
  None                           1 (  0.1%)
=== parse_annotated Label Summary ===
Total labels counted: 1002

Label distribution:
  Misc-exposition              783 ( 78.1%)
  Description                  188 ( 18.8%)
  Nomenclature                  30 (  3.0%)
  None                           1 (  0.1%)

[Row(doc_id='taxon_1b4f3ec446e41faf21880f301d23fd3f24bd06a19272ad4f3e048d491708d262', success=True, error_message=''),
 Row(doc_id='taxon_3b1577f110837b2dace32fe0da649d9a5897464da81c2cbe6869488bc009b45a', success=True, error_message=''),
 Row(doc_id='taxon_4766318b3fff772008b87c3ab90b326162c8e2f9afd54a734387bc1a1a2a2025', success=True, error_message=''),
 Row(doc_id='taxon_bdfea252b61ca52783a4175a4017c61f06f5b24f7a582c546ecf6cecb44af3ed', success=True, error_message=''),
 Row(doc_id='taxon_6c7088379ce84dbbd0154b76a0554b4221a42a65d102c55aece821279a2da06d', success=True, error_message=''),
 Row(doc_id='taxon_953239ac7b1c2887e7d8b7da489f9949558551a718eaae3b3ceebea85b4ddc07', success=True, error_message=''),
 Row(doc_id='taxon_e138e002ddd77fdd993942da68486f56ed4554a1a349abf638aaaa095a8fc5d0', success=True, error_message=''),
 Row(doc_id='taxon_63e244b159fdcae785c35d4b63d78e44669fe50bb7b6830ddf2c64d37f0562d3', success=True, error_message=''),
 Row(doc_id='taxon_63a55d3e217e723abb8262b01c937

## Dr. Drafts document embedding

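Dr. Drafts loads the taxon documents from CouchDB and computes their embeddings. We subclass its `SKOL_TAXA` loader, which reads every non-design document into a pandas DataFrame, and its `EmbeddingsComputer`, which saves the resulting embeddings to Redis.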
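
The Redis step amounts to pickling the whole embeddings DataFrame under a single key and unpickling it on the way back out. Below is a minimal sketch, assuming a redis-py-style client that exposes `set(key, value)` and `get(key)`. The helpers `save_embeddings` and `load_embeddings` and the dict-backed `InMemoryStore` are hypothetical; the stand-in store only lets the example run without a Redis server.

```python
import pickle

import pandas as pd


class InMemoryStore:
    """Hypothetical stand-in for a redis.Redis client (implements only set/get)."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value
        return True

    def get(self, key):
        # Like Redis, return None for a missing key.
        return self._data.get(key)


def save_embeddings(client, key, df):
    """Pickle a DataFrame and store the bytes under `key`."""
    return client.set(key, pickle.dumps(df))


def load_embeddings(client, key):
    """Fetch and unpickle the DataFrame stored under `key`, or return None if absent."""
    raw = client.get(key)
    return None if raw is None else pickle.loads(raw)


store = InMemoryStore()
frame = pd.DataFrame({"description": ["pileus convex", "stipe central"], "0": [0.1, 0.2]})
save_embeddings(store, "skol:embedding:v0.1", frame)
restored = load_embeddings(store, "skol:embedding:v0.1")
```

Against a real server, `client` would instead be something like `redis.from_url("redis://localhost:6379", db=0)`, as in the `EmbeddingsComputer` cell. Because `pickle.loads` can execute arbitrary code, it should only be applied to values read from a trusted Redis instance.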


In [22]:
from dr_drafts_mycosearch.data import SKOL_TAXA as STX
from dr_drafts_mycosearch.compute_embeddings import EmbeddingsComputer as EC

class SKOL_TAXA(STX):

    def load_data(self):
        """Load taxon data from CouchDB into a pandas DataFrame."""
        # TODO(piggy): Convert whole package to pyspark DataFrame for better scaling.
        # Connect to CouchDB
        server = couchdb.Server(self.couchdb_url)
        if self.username and self.password:
            server.resource.credentials = (self.username, self.password)

        # Access the database
        if self.db_name not in server:
            raise ValueError(f"Database '{self.db_name}' not found in CouchDB server")

        db = server[self.db_name]

        # Fetch all documents from the database
        records = []
        for doc_id in db:
            # Skip design documents
            if doc_id.startswith('_design/'):
                continue

            doc = db[doc_id]
            records.append(doc)

        if not records:
            # Create empty DataFrame if no records found
            self.df = pd.DataFrame()
            print(f"Warning: No taxon records found in database '{self.db_name}'")
            return

        # Convert to DataFrame
        self.df = pd.DataFrame(records)
        print(f"Loaded {len(self.df)} taxon records from CouchDB database '{self.db_name}'")


In [23]:
import pickle

import pandas as pd
import torch


class EmbeddingsComputer(EC):
    """Class for computing and storing embeddings from narrative data."""

    def write_embeddings_to_redis(self):
        """Write embeddings to Redis using instance configuration."""
        import redis

        if self.redis_username and self.redis_password:
            r = redis.from_url(self.redis_url, username=self.redis_username, password=self.redis_password, db=self.redis_db)
        else:
            r = redis.from_url(self.redis_url, db=self.redis_db)

        # Store the entire embeddings DataFrame as one pickled value under embedding_name.
        pickled_data = pickle.dumps(self.result)
        r.set(self.embedding_name, pickled_data)
        print(f'Embeddings written to Redis (db={self.redis_db}) with key: {self.embedding_name}')

    def run(self, descriptions):
        """Run the full embeddings computation pipeline.

        Args:
            descriptions (pandas.DataFrame): Taxa with a 'description' column.

        Returns:
            pandas.DataFrame: The deduplicated descriptions with their
            embedding columns appended.
        """
        # Keep one row per distinct description; ignore_index=True renumbers
        # the rows 0..n-1 so pd.concat(axis=1) can line them up with the embeddings.
        df = descriptions.drop_duplicates(
            subset=['description'],
            keep='last',
            ignore_index=True
        )

        if not torch.cuda.is_available():
            print('Warning: No GPU detected. Using CPU.')

        embeddings = self.encode_narratives(df.description.astype(str))
        self.result = pd.concat([df, embeddings], axis=1)

        # Write to Redis if embedding name is specified
        if self.embedding_name:
            if not self.redis_url:
                raise ValueError("redis_url must be provided when embedding_name is specified")
            self.write_embeddings_to_redis()
        else:
            # Write to local filesystem
            self.write_embeddings_to_file()

        return self.result



## Compute Embeddings

We use SBERT (Sentence-BERT) to embed each taxon description as a vector, placing the taxa in a space that can be searched by semantic similarity.
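
Once every description has a vector, searching the space reduces to ranking those vectors by cosine similarity to an embedded query. A minimal NumPy sketch, assuming the embeddings are available as a 2-D array with one row per taxon and that the query was embedded with the same SBERT model. The function `top_k_cosine` and the toy 3-dimensional vectors are hypothetical; real SBERT vectors have many more dimensions.

```python
import numpy as np


def top_k_cosine(query, matrix, k=3):
    """Return (indices, scores) of the k rows of `matrix` most similar to `query`."""
    # L2-normalize so that a plain dot product equals cosine similarity.
    q = query / np.linalg.norm(query)
    m = matrix / np.linalg.norm(matrix, axis=1, keepdims=True)
    scores = m @ q
    # argsort is ascending; reverse it for descending similarity and keep the first k.
    order = np.argsort(scores)[::-1][:k]
    return order, scores[order]


# Toy 3-dimensional "embeddings" standing in for SBERT vectors of three taxa.
taxa_vectors = np.array([
    [1.0, 0.0, 0.0],
    [0.0, 1.0, 0.0],
    [0.7, 0.7, 0.0],
])
query_vector = np.array([1.0, 0.1, 0.0])
idx, sims = top_k_cosine(query_vector, taxa_vectors, k=2)
```

Normalizing up front means the ranking is a single matrix-vector product, which is also how the search would scale to a full table of taxa.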

In [30]:
skol_taxa = SKOL_TAXA(
    couchdb_url="http://localhost:5984",
    username=couchdb_username,
    password=couchdb_password,
    db_name=taxon_db_name
)
descriptions = skol_taxa.get_descriptions()



In [29]:
embedder = EmbeddingsComputer(
    idir='/dev/null',
    redis_url='redis://localhost:6379',
    embedding_name='skol:embedding:v0.1',
)

embedding_result = embedder.run(descriptions)




## Bibliography

* DOI Foundation, "DOI Citation Formatter HTTP API", https://citation.doi.org/api-docs.html, accessed 2025-11-12.
* Yang, Jie and Zhang, Yue and Li, Linwei and Li, Xingxuan, 2018, "YEDDA: A Lightweight Collaborative Text Span Annotation Tool", Proceedings of the 56th Annual Meeting of the Association for Computational Linguistics, http://aclweb.org/anthology/P18-4006


## Appendix: On the use of an AI Coder

Portions of this work were completed with the aid of Claude Code Pro. I want to give a concrete example of how I used this very powerful tool, and to explain why I am comfortable claiming authorship of the resulting code.

For this project I needed results from an earlier class project in which a trio of students built and evaluated models for classifying paragraphs. That earlier work was built as an IPython notebook, with many examples and inline code. Simply copying the earlier notebook would have introduced many irrelevant details without furthering the overall project.

I asked Claude Code to translate the notebook into a module that I could import. It did a pretty good job. Without being told, it created a submodule, extracted the illustrative code as examples, wrote reasonable documentation, and created packaging for the module.

The skill level of the code was roughly that of a highly disciplined but otherwise average junior programmer. The architecture was simplistic and violated several design principles, such as DRY (Don't Repeat Yourself). I requested specific refactorings; for example, I asked for a group of functions that repeated the same parameters to be converted into an object that holds those shared parameters.
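
That kind of refactoring can be sketched as follows: functions that each repeat the same connection parameters become methods on one object that states those parameters once. The names `doc_url`, `all_docs_url`, and `CouchDatabase`, and the database name `skol_taxa`, are hypothetical; the methods only build CouchDB URLs (using the real `_all_docs` endpoint path) rather than making requests.

```python
from dataclasses import dataclass


# Before: every function repeats the same connection parameters.
def doc_url(couchdb_url, db_name, doc_id):
    return f"{couchdb_url.rstrip('/')}/{db_name}/{doc_id}"


def all_docs_url(couchdb_url, db_name):
    return f"{couchdb_url.rstrip('/')}/{db_name}/_all_docs"


# After: the shared parameters live on one object and are stated once.
@dataclass(frozen=True)
class CouchDatabase:
    couchdb_url: str
    db_name: str

    @property
    def base(self) -> str:
        # Common prefix computed in one place instead of in every function.
        return f"{self.couchdb_url.rstrip('/')}/{self.db_name}"

    def doc_url(self, doc_id: str) -> str:
        return f"{self.base}/{doc_id}"

    def all_docs_url(self) -> str:
        return f"{self.base}/_all_docs"


db = CouchDatabase("http://localhost:5984/", "skol_taxa")
```

Both versions produce the same URLs; the object version simply removes the repeated arguments from every call site.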

The initial code called the REST interfaces directly and read all the data onto a single machine, rather than using PySpark correctly. Through a series of refactorings, I had the code adopt appropriate libraries that I named, and write correct UDFs to execute transformations in parallel.

I walked the AI through creating an object that I could use to illustrate my use of the Redis and CouchDB interfaces, while leaving the irrelevant details in a separate library.

In short, I still have to understand good design principles. I have to recognize where appropriate libraries apply. And I still have to understand the frameworks I am working with.

I now have a strong understanding of the difference between "vibe coding" and AI-assisted software engineering. In my first 4 hours with Claude Code, I was able to produce roughly 4 days' worth of professional-grade working code.