This notebook showcases the code necessary to connect with Library of Congress's <i>Chronicling America</i> database API to extract specific subsets of newspaper metadata and full OCR-ed texts.

Code Attribution: Most of the code was provided by <i>Chronicling America</i> (see API documentation page: https://libraryofcongress.github.io/data-exploration/loc.gov%20JSON%20API/Chronicling_America/ChronAm_analyzing_specific_titles_limit_results.html). See detailed attributions in specific code cells below. After modifying the code, I ran it on the Duke Compute Cluster (see the Slurm script in the corresponding cell below), both to obtain enough computing power to complete the data processing steps and to learn how to use a high-performance computing cluster.

In [None]:
'''The majority of this code cell was provided by the Chronicling America API, except for places that I've specified'''
import requests, pandas as pd

# Search URL for the query. Edit the individual query parameters below to
# reflect the search criteria needed; the pieces are joined into one string.
searchURLTest = (
    "https://www.loc.gov/newspapers/"
    "?end_date=1930-01-01"
    "&ops=~10"
    "&qs=los+angeles+oil"
    "&searchType=advanced"
    "&start_date=1890-01-01"
    "&location_country=united+states"
    "&fo=json"
)

# Running total of search results processed so far (my addition); it is
# incremented by get_item_ids_test through a `global` declaration.
numberOfResults = 0

def get_item_ids_test(url, items=None, conditional='True'):
    """Collect item/resource IDs from every page of a loc.gov search.

    Requests ``url`` as JSON (100 results per page), keeps the ``id`` of each
    result that is not a collection or web page and that satisfies
    ``conditional``, then follows ``pagination.next`` recursively.

    Args:
        url: A loc.gov *search* URL (not an item or resource URL).
        items: List to append IDs to. A new list is created when omitted.
        conditional: Python expression, as a string, evaluated for each
            result; results for which it equals ``False`` are dropped. It may
            refer to local names of this function such as ``result``.

    Returns:
        The list of collected IDs. ``None`` if the request for ``url`` itself
        fails; if a later page fails, the message is printed and the IDs
        collected up to that point are still returned.

    Raises:
        NameError: If ``url`` points at an item or resource page.

    Side effects:
        Increments the module-level ``numberOfResults`` and prints progress.
    """
    global numberOfResults

    # A default of `[]` would be created once and shared by every call that
    # omits `items`, so IDs would leak from one search into the next.
    if items is None:
        items = []

    # Check that the query URL is not an item or resource link.
    exclude = ["loc.gov/item", "loc.gov/resource"]
    if any(string in url for string in exclude):
        raise NameError('Your URL points directly to an item or '
                        'resource page (you can tell because "item" '
                        'or "resource" is in the URL). Please use '
                        'a search URL instead. For example, instead '
                        'of \"https://www.loc.gov/item/2009581123/\", '
                        'try \"https://www.loc.gov/maps/?q=2009581123\". ')

    # request pages of 100 results at a time
    params = {"fo": "json", "c": 100, "at": "results,pagination"}
    call = requests.get(url, params=params)

    # Check that the API request was successful. A missing Content-Type
    # header is treated as "not JSON" instead of raising TypeError.
    content_type = call.headers.get('content-type') or ''
    if call.status_code != 200 or 'json' not in content_type:
        print('There was a problem. Try running the cell again, or check your searchURL.')
        return None

    data = call.json()
    results = data['results']  # deleted the top 20 limit
    for result in results:
        # Results without an "original_format" field are treated as having
        # no formats rather than crashing the membership test.
        original_format = result.get("original_format") or []

        # Filter out anything that's a collection or web page.
        # SECURITY NOTE: `conditional` is passed to eval(); only ever supply
        # expressions you wrote yourself, never untrusted input.
        filter_out = ("collection" in original_format) \
            or ("web page" in original_format) \
            or (eval(conditional) == False)

        if not filter_out:
            # Get the link to the item record
            item = result.get("id")
            # Filter out links to Catalog or other platforms
            if item and item.startswith(("http://www.loc.gov/resource",
                                         "http://www.loc.gov/item")):
                items.append(item)

        # Added the following two lines to keep track of the number of results processed
        numberOfResults += 1
        print(f"Processed {numberOfResults} results.")

    # Repeat on the next page, unless we're on the last page.
    next_url = data["pagination"]["next"]
    if next_url is not None:
        print(f"Total number of pages: {data['pagination']['total']}")
        get_item_ids_test(next_url, items, conditional)

    return items

# Generate a list of records found from performing a query and save these Item IDs.
# (Create ids_list_test based on items found in the searchURL result.)
ids_list_test = get_item_ids_test(searchURLTest, items=[])

# get_item_ids_test prints a message and returns None when the first request
# fails; fall back to an empty list so the loop below does not crash.
search_failed = ids_list_test is None
if search_failed:
    ids_list_test = []

# Add 'fo=json' to each ID so that every individual item URL is requested as JSON.
ids_list_json_test = []
for item_url in ids_list_test:
    if not item_url.endswith(('&fo=json', '?fo=json')):
        # Start a query string with '?' when the URL does not have one yet;
        # otherwise extend the existing one with '&'.
        separator = '&' if '?' in item_url else '?'
        item_url += separator + 'fo=json'
    ids_list_json_test.append(item_url)
ids = ids_list_json_test

if search_failed:
    print('\nThe search request failed, so no newspaper pages were collected.')
else:
    print('\nSuccess! Your API Search Query found '+str(len(ids_list_json_test))+' related newspaper pages. Proceed to the next step')

In [None]:
'''The majority of this code cell was provided by the Chronicling America API, except for places that I've specified'''

# Create a list of dictionaries to store the item metadata
item_metadata_list = []

# I created this counter to keep track of the number of newspapers processed
counter = 0

# URLs that could not be fetched or parsed, reported at the end instead of
# being dropped silently.
failed_item_ids = []

# Iterate over the list of item IDs
for item_id in ids_list_json_test:
    item_response = requests.get(item_id)

    # Check if the API call was successful
    if item_response.status_code != 200:
        failed_item_ids.append(item_id)
        print(f"Skipped {item_id}: HTTP status {item_response.status_code}")
        continue

    # Parse the JSON response; a 200 response with a non-JSON body is skipped
    # rather than aborting the whole run.
    try:
        item_data = item_response.json()
    except ValueError:
        failed_item_ids.append(item_id)
        print(f"Skipped {item_id}: response was not valid JSON")
        continue

    # NOT filtering out newspapers that do not have a city associated with it.
    # Fields are therefore read with .get(): a missing field (such as
    # 'location_city') is stored as None instead of raising KeyError.
    item_info = item_data.get('item') or {}
    pagination_info = item_data.get('pagination') or {}
    resource_info = item_data.get('resource') or {}

    # Add the item metadata to the list
    item_metadata_list.append({
        'Newspaper Title': item_info.get('newspaper_title'),
        'Issue Date': item_info.get('date'),
        'Page Number': pagination_info.get('current'),
        'LCCN': item_info.get('number_lccn'),
        'City': item_info.get('location_city'),
        'State': item_info.get('location_state'),
        'Contributor': item_info.get('contributor_names'),
        'Batch': item_info.get('batch'),
        'PDF Link': resource_info.get('pdf'),
    })

    # I added the following two lines to keep track of the number of results processed.
    counter += 1
    print(f"Processed {counter} results.")

if failed_item_ids:
    print(f"\n{len(failed_item_ids)} item requests failed and were skipped.")

# Change date format to MM-DD-YYYY (entries without a date are left as they are)
for item in item_metadata_list:
    if item['Issue Date']:
        item['Issue Date'] = pd.to_datetime(item['Issue Date']).strftime('%m-%d-%Y')

# Create a Pandas DataFrame from the list of dictionaries
df = pd.DataFrame(item_metadata_list)

print('\nSuccess! Ready to proceed to the next step!')

# Add your Local saveTo Location (e.g. C:/Downloads/)
saveTo = '/hpc/home/zz341/test4'

# Set File Name. Make sure to rename the file so it doesn't overwrite previous!
filename = 'LOCLAOilInitialExtract'

metadata_dataframe = pd.DataFrame(item_metadata_list)
metadata_dataframe.to_csv(saveTo + '/' + filename + '.csv')
print("Finished compiling CSV")

This was the code I wrote to use the Duke Compute Cluster (DCC) to access the API. I used DCC because the search returned more newspaper results than my computer's memory could hold. I wrote this Slurm script to run the job on DCC:
```
#!/bin/bash
# Slurm batch script that runs the metadata extraction script on the cluster.
# Files that receive the job's standard output and standard error:
#SBATCH --output=outputNov21.out
#SBATCH --error=errorNov21.err
# Memory requested for the job:
#SBATCH --mem=7G
# Partition the job is submitted to:
#SBATCH --partition=common

# Launch the Python extraction script as a job step, without CPU binding.
srun --cpu-bind=none python /hpc/home/zz341/test4/testLOCExtract.py
```

I then saved the data in a CSV. This API call returned about 15,100 results.

Because the Library of Congress stores its OCR-ed texts as labeled XML files, I needed to download ~15,100 XML files to my computer and parse them into TXT files. I used the following code to do so: 

In [None]:
'''This was the Python object I wrote to turn Library of Congress newspaper OCR text XMLs into TXTs. This resulted in about 15,100 TXT files (one for each search result). Claude Sonnet 3.5 coded most of the code. I designed this part into a Python object so that it could be reusable for other people and other purposes.'''

import os, csv, requests, re, gc, io
import pandas as pd
import xml.etree.ElementTree as ET

class ExtractCSV:
    """Download the OCR XML for each page listed in a metadata CSV and save it as plain text.

    The CSV is loaded into ``csvDataFrame`` when the object is created.
    ``downloadAndProcessXML`` reads these columns from it: "NewspaperTitle",
    "IssueDate", "City", "State", "Region", "PageNumber" and "PDF Link".
    """

    def __init__(self, CSVFilePath: str):
        """Remember the CSV path and load the CSV into a DataFrame."""
        self.CSVFilePath = CSVFilePath
        self.csvDataFrame = pd.read_csv(CSVFilePath)

    def makeFileName(self, partOfFileName):
        """Make string safe for filenames (spaces become "_", other unsafe chars are removed)."""
        return re.sub(r'[^A-Za-z0-9_-]+', '', partOfFileName.replace(" ", "_"))

    def xmlToPlainText(self, xmlContent):
        """Convert ALTO XML content into plain text with newlines preserved (namespace-agnostic).

        Args:
            xmlContent: The XML document as bytes.

        Returns:
            One line of text per TextLine (its String CONTENT values joined by
            spaces), with a blank line between TextBlocks. Empty string when
            no text is found.
        """
        # Strip the namespace from every tag while parsing so the element
        # lookups below work whichever ALTO namespace the file declares.
        it = ET.iterparse(io.BytesIO(xmlContent))
        for _, el in it:
            if "}" in el.tag:
                el.tag = el.tag.split("}", 1)[1]
        root = it.root

        all_lines = []
        for textblock in root.findall(".//TextBlock"):
            block_lines = []
            for textline in textblock.findall("TextLine"):
                words = [
                    string.attrib.get("CONTENT", "")
                    for string in textline.findall("String")
                ]
                if words:
                    block_lines.append(" ".join(words))
            if block_lines:
                all_lines.append("\n".join(block_lines))

        return "\n\n".join(all_lines)

    @staticmethod
    def _pdfLinkToXMLLink(pdfLink):
        """Swap the last three characters of a link for "xml" (assumes it ends in "pdf")."""
        return pdfLink[:-3] + "xml"

    def turnColumnIntoList(self, column="PDF Link"):
        """Convert a column into a Python list."""
        return self.csvDataFrame[column].tolist()

    def turnPDFLinkIntoXMLLink(self):
        """Return list of (PDFLink, XMLLink)."""
        pdf_links = self.turnColumnIntoList("PDF Link")
        return [(link, self._pdfLinkToXMLLink(link)) for link in pdf_links]

    def downloadAndProcessXML(self, txtDirectory="txt_out", xmlDirectory="xml_out", failedFile="failed_links.txt"):
        """Download XML, save raw XML and parsed TXT with descriptive names.

        Rows that raise any error, and rows whose parsed text is empty, are
        logged to ``failedFile`` (one link per line) and processing continues
        with the next row.

        Args:
            txtDirectory: Folder for the parsed .txt files (created if missing).
            xmlDirectory: Folder for the raw .xml files (created if missing).
            failedFile: Path of the file that lists failed/empty links. It is
                only written when at least one row failed.
        """
        os.makedirs(txtDirectory, exist_ok=True)
        os.makedirs(xmlDirectory, exist_ok=True)

        failedLinks = []
        total = len(self.csvDataFrame)

        for position, (i, row) in enumerate(self.csvDataFrame.iterrows(), start=1):
            # Bind these before the try block so the except/finally clauses
            # never touch an unbound name when a row fails early.
            response = None
            xmlContent = None
            pdfLink = row.get("PDF Link")

            try:
                # Build filename base from metadata
                filename_base = "_".join([
                    self.makeFileName(str(row["NewspaperTitle"])),
                    self.makeFileName(str(row["IssueDate"])),
                    self.makeFileName(str(row["City"])),
                    self.makeFileName(str(row["State"])),
                    self.makeFileName(str(row["Region"])),
                    f"p{row['PageNumber']}",
                ])

                xmlURLLink = self._pdfLinkToXMLLink(pdfLink)

                # Download XML
                response = requests.get(xmlURLLink, stream=True)
                response.raise_for_status()
                xmlContent = response.content

                # Save raw XML
                xmlPath = os.path.join(xmlDirectory, filename_base + ".xml")
                with open(xmlPath, "wb") as f:
                    f.write(xmlContent)

                # Parse XML into TXT
                text = self.xmlToPlainText(xmlContent)
                txtPath = os.path.join(txtDirectory, filename_base + ".txt")
                with open(txtPath, "w", encoding="utf-8") as f:
                    f.write(text)

                if not text.strip():
                    failedLinks.append(xmlURLLink)
                    print(f"[{position}/{total}] EMPTY TEXT for {xmlURLLink}")
                else:
                    print(f"[{position}/{total}] Saved {txtPath} and {xmlPath}")

            except Exception as e:
                # Per-row boundary: log the failure and carry on with the
                # remaining rows. str() keeps the log writable even when the
                # link cell is not a string (e.g. an empty cell read as NaN).
                print(f"Error with row {i} ({pdfLink}): {e}")
                failedLinks.append(str(pdfLink))

            finally:
                # Release the connection and free memory after each file
                if response is not None:
                    response.close()
                del response, xmlContent
                gc.collect()

        # Save failed links to file
        if failedLinks:
            with open(failedFile, "w", encoding="utf-8") as f:
                for link in failedLinks:
                    f.write(link + "\n")
            print(f"\nSaved {len(failedLinks)} failed links to {failedFile}")
        else:
            print("\nAll files processed with non-empty text.")

    def checkBlankCells(self, column="Region"):
        """Check how many blank cells are in a given column.

        The CSV file is re-read from ``CSVFilePath``; a cell counts as blank
        when it is missing, empty, or contains only whitespace.

        Raises:
            ValueError: If the CSV has no ``column`` column (including when
                the file has no header row at all).
        """
        blankCounter = 0
        with open(self.CSVFilePath, newline="", encoding="utf-8") as csvfile:
            reader = csv.DictReader(csvfile)

            # fieldnames is None for an empty file
            if not reader.fieldnames or column not in reader.fieldnames:
                raise ValueError(f"CSV does not contain a '{column}' column.")

            for row in reader:
                if row[column] is None or row[column].strip() == "":
                    blankCounter += 1

        return blankCounter

In [None]:
'''Run the XML to TXT python object'''

# Metadata CSV from the API call; ExtractCSV loads it on construction.
CSVFilePath = "/Users/Jerry/Desktop/DHproj-reading/LAOilProject/LAOil/DataFolder/testCSVNewspaperWithXMLLinks.csv"
extractor = ExtractCSV(CSVFilePath)

# Output folders for the parsed TXT files and the raw XML files.
# Change these to your own locations.
txtFolder = "/Volumes/JZ/LAOilTXTXML/TXTOutput"
xmlFolder = "/Volumes/JZ/LAOilTXTXML/XMLOutput"

# Download every page's XML and write both the XML and the TXT version.
extractor.downloadAndProcessXML(
    txtDirectory=txtFolder,
    xmlDirectory=xmlFolder,
)

After storing all TXT files in the TXT folder, we will use the `LLMAPICleaning.py` file to clean our messy OCR texts (with spelling or scanning errors) into cleaned and edited OCRs suitable for analysis.