# Extract Spending Data by Fiscal Year

In [1]:
import requests
import xml.etree.ElementTree as ET
import time
import pandas as pd
from pathlib import Path
import xml.dom.minidom
import math
from tqdm.notebook import tqdm
import pyarrow as pa
import pyarrow.parquet as pq

In [2]:
API_URL = "https://www.checkbooknyc.com/api"
TEMP_XML_FILE = Path("tmp_latest_response.xml")  # same temp file each time
TIMEOUT = (300, 300) # connect_timeout, read_timeout for requests
OUTPUT_DIR = Path("checkbook_data")
OUTPUT_DIR.mkdir(exist_ok=True)
BATCH_SIZE = 20000  # API's retrieval limit on records per request

In [3]:
# session set up: bypass incapsula bot protection using session mgmt + browser headers
session = requests.Session()
HEADERS = {
    'content-type': 'application/xml',
    'user-agent': 'mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36'
}

session.headers.update(HEADERS)

# * visit main site initially *
response = session.get('https://www.checkbooknyc.com/', timeout=TIMEOUT)
print(f"main site status: {response.status_code}")
time.sleep(2)

session.headers.update({
    'content-type': 'application/xml',
    'referer': 'https://www.checkbooknyc.com/',
    'origin': 'https://www.checkbooknyc.com'
})

main site status: 200


## Helpers

In [4]:
def fetch_xml(payload: str, verbose: bool = False) -> ET.Element:
    response = session.post(API_URL, data=payload, headers=HEADERS, timeout=TIMEOUT)
    if verbose: 
        print(response.text[:500])
    response.raise_for_status()

    if verbose:
        xml_str = response.content.decode('utf-8')
        print(xml_str)
        
    return ET.fromstring(response.content)

def get_record_count(xml_root: ET.Element) -> int:
    count_tag = xml_root.find(".//result_records/record_count")
    return int(count_tag.text) if count_tag is not None else 0



## Initial Metadata Request: Determining Pagination

In [5]:
fiscal_year = 2024

In [6]:
seed_request=f"""
<request>
  <type_of_data>Spending</type_of_data>
  <records_from>1</records_from>
  <max_records>1</max_records>
  <search_criteria>
   <criteria>
      <name>fiscal_year</name>
      <type>value</type>
      <value>{fiscal_year}</value>
    </criteria>
  </search_criteria>
  <response_columns/>
</request>
"""

In [7]:
seed_root = fetch_xml(seed_request,verbose=True)

<?xml version="1.0"?>
<response>
  <status>
    <result>success</result>
  </status>
  <request_criteria>
    <request>
      <type_of_data>Spending</type_of_data>
      <records_from>1</records_from>
      <max_records>1</max_records>
      <search_criteria>
        <criteria>
          <name>fiscal_year</name>
          <type>value</type>
          <value>2024</value>
        </criteria>
      </search_criteria>
      <response_columns/>
    </request>
  </request_criteria>
  <result_records>

<?xml version="1.0"?>
<response>
  <status>
    <result>success</result>
  </status>
  <request_criteria>
    <request>
      <type_of_data>Spending</type_of_data>
      <records_from>1</records_from>
      <max_records>1</max_records>
      <search_criteria>
        <criteria>
          <name>fiscal_year</name>
          <type>value</type>
          <value>2024</value>
        </criteria>
      </search_criteria>
      <response_columns/>
    </request>
  </request_criteria>
  <result_records>

In [8]:
retrievable_records = get_record_count(seed_root)
batches_needed = math.ceil(retrievable_records / BATCH_SIZE)

print(f"""
Seed Request Results for FY {fiscal_year}
=========================================
Total retrievable records: {retrievable_records}
Batches needed: {batches_needed}
""")


Seed Request Results for FY 2024
Total retrievable records: 3227410
Batches needed: 162



## Fetch and Save FY Spending

### Function and Helpers

In [9]:
def download_spending_atomic(xml_template: str, year: int, max_records_wanted: int, verbose: bool = True):
    """download spending data with proper range tracking and atomic saves"""
    from datetime import datetime
    
    out_file = OUTPUT_DIR / f"spending_{year}.parquet"
    
    # determine starting position
    if out_file.exists():
        existing_df = pd.read_parquet(out_file)
        current_count = len(existing_df)
        next_offset = current_count + 1
        if verbose:
            print(f"resuming from record {next_offset} (file has {current_count} records)")
    else:
        current_count = 0
        next_offset = 1
        if verbose:
            print("starting fresh download")
    
    # calculate how many records to fetch
    records_to_fetch = min(max_records_wanted, BATCH_SIZE)
    timestamp = datetime.now().isoformat()
    
    # build xml request with proper offset
    xml_request = xml_template.format(
        records_from=next_offset,
        max_records=records_to_fetch,
        fiscal_year=year
    )
    
    if verbose:
        print(f"requesting {records_to_fetch} records starting from {next_offset}")
    
    # make api call
    response = session.post(API_URL, data=xml_request, timeout=60)
    
    # validate response
    if response.text.strip().startswith('<html'):
        raise Exception("request blocked by incapsula")
    
    try:
        root = ET.fromstring(response.text.strip())
        status = root.find('.//status/result')
        
        if status is None or status.text != 'success':
            messages = root.findall('.//message/description')
            error_msgs = [msg.text for msg in messages]
            raise Exception(f"api error: {error_msgs}")
        
        df = parse_transactions(root)
        
    except ET.ParseError as e:
        raise Exception(f"invalid xml response: {e}")
    
    # validate expected record count
    if len(df) != records_to_fetch:
        raise Exception(f"expected {records_to_fetch} records, got {len(df)}")
    
    # add timestamp for audit trail
    df['time_added'] = timestamp
    df['batch_offset'] = next_offset
    
    # atomic save
    if out_file.exists():
        # final sync check
        existing_df = pd.read_parquet(out_file)
        if len(existing_df) != current_count:
            raise Exception(f"file changed during download: expected {current_count}, found {len(existing_df)}")
        
        # combine and save
        combined_df = pd.concat([existing_df, df], ignore_index=True)
        combined_df.to_parquet(out_file, engine="pyarrow", index=False)
    else:
        df.to_parquet(out_file, engine="pyarrow", index=False)
    
    if verbose:
        total_records = current_count + len(df)
        print(f"saved {len(df)} records (total: {total_records})")
    
    return len(df)

# usage:
xml_template = """<request>
  <type_of_data>Spending</type_of_data>
  <records_from>{records_from}</records_from>
  <max_records>{max_records}</max_records>
  <search_criteria>
   <criteria>
      <name>fiscal_year</name>
      <type>value</type>
      <value>{fiscal_year}</value>
    </criteria>
  </search_criteria>
  <response_columns>
    <column>agency</column>
    <column>payee_name</column>
    <column>check_amount</column>
    <column>issue_date</column>
    <column>document_id</column>
  </response_columns>
</request>"""


In [None]:
tqdm._instances.clear()

def save_parquet_append(df: pd.DataFrame, out_file: Path) -> int:
    out_file = Path(out_file)
    if out_file.exists():
        try:
            pa.unregister_extension_type("pandas.period")
        except KeyError:
            pass
        existing_df = pd.read_parquet(out_file, engine="pyarrow")
        original_count = len(existing_df)
        combined = pd.concat([existing_df, df], ignore_index=True)
        deduped = combined.drop_duplicates()
        
        # fix: calculate actual new records added
        final_count = len(deduped)
        new_records_attempted = len(df)
        
        # net change can be negative, but "added" should be non-negative
        net_change = final_count - original_count
        actually_added = max(0, net_change)  # never negative
        
        deduped.to_parquet(out_file, engine="pyarrow", index=False)
        return actually_added
    else:
        df.to_parquet(out_file, engine="pyarrow", index=False)
        return len(df)

        
def parse_transactions(xml_root: ET.Element) -> pd.DataFrame:
    """convert <transaction> elements into dataframe rows"""
    records = []
    for txn in xml_root.findall(".//transaction"):
        row = {child.tag: (child.text or "").strip() for child in txn}
        records.append(row)
    return pd.DataFrame(records)

def download_all_spending(xml_template: str, year: int, total_available: int, verbose: bool = True):
    """download all available spending records for given year"""

    out_file = OUTPUT_DIR / f"spending_{year}.parquet"
    
    # check current progress
    if out_file.exists():
        existing_df = pd.read_parquet(out_file)
        records_downloaded = len(existing_df)
    else:
        records_downloaded = 0
    
    records_remaining = total_available - records_downloaded
    
    if records_remaining <= 0:
        if verbose:
            print(f"download already complete: {records_downloaded:,} records")
        return
    
    if verbose:
        print(f"downloading {records_remaining:,} remaining records (total: {total_available:,})")
    
    with tqdm(total=records_remaining, desc=f"FY{year}", unit="records") as pbar:
        while records_remaining > 0:
            # calculate batch size for this request
            batch_size = min(BATCH_SIZE, records_remaining)
            
            try:
                # download one batch
                records_saved = download_spending_atomic(xml_template, year, batch_size, verbose=False)
                
                # update progress
                records_remaining -= records_saved
                pbar.update(records_saved)
                
                if verbose:
                    total_now = total_available - records_remaining
                    tqdm.write(f"batch complete: {records_saved:,} saved, {total_now:,} total, {records_remaining:,} remaining")
                
                # rate limiting - api requires 1 request per second
                time.sleep(1.1)
                
            except Exception as e:
                tqdm.write(f"batch failed: {e}")
                tqdm.write("retrying in 5 seconds...")
                time.sleep(5)
                # continue loop without updating counters - will retry same batch
    
    if verbose:
        final_df = pd.read_parquet(out_file)
        print(f"download complete: {len(final_df):,} total records saved")

# usage:
xml_template = """<request>
  <type_of_data>Spending</type_of_data>
  <records_from>{records_from}</records_from>
  <max_records>{max_records}</max_records>
  <search_criteria>
   <criteria>
      <name>fiscal_year</name>
      <type>value</type>
      <value>{fiscal_year}</value>
    </criteria>
  </search_criteria>
  <response_columns>
    <column>agency</column>
    <column>payee_name</column>
    <column>check_amount</column>
    <column>issue_date</column>
    <column>document_id</column>
    <column>spending_category</column>
    <column>department</column>
    <column>fiscal_year</column>
  </response_columns>
</request>"""

### Function Calls

In [11]:
# download all records
download_all_spending(xml_template, fiscal_year, retrievable_records)

downloading 3,227,410 remaining records (total: 3,227,410)


FY2024:   0%|                                                      | 0/3227410 [00:00<?, ?records/s]

batch complete: 20,000 saved, 20,000 total, 3,207,410 remaining
batch complete: 20,000 saved, 40,000 total, 3,187,410 remaining
batch complete: 20,000 saved, 60,000 total, 3,167,410 remaining
batch complete: 20,000 saved, 80,000 total, 3,147,410 remaining
batch complete: 20,000 saved, 100,000 total, 3,127,410 remaining
batch complete: 20,000 saved, 120,000 total, 3,107,410 remaining
batch complete: 20,000 saved, 140,000 total, 3,087,410 remaining
batch complete: 20,000 saved, 160,000 total, 3,067,410 remaining
batch complete: 20,000 saved, 180,000 total, 3,047,410 remaining
batch complete: 20,000 saved, 200,000 total, 3,027,410 remaining
batch complete: 20,000 saved, 220,000 total, 3,007,410 remaining
batch complete: 20,000 saved, 240,000 total, 2,987,410 remaining
batch complete: 20,000 saved, 260,000 total, 2,967,410 remaining
batch complete: 20,000 saved, 280,000 total, 2,947,410 remaining
batch complete: 20,000 saved, 300,000 total, 2,927,410 remaining
batch complete: 20,000 saved,

## Peek in Parquet Files

In [20]:
file_path = "checkbook_data/spending_2024.parquet"
df = pd.read_parquet(file_path, engine="pyarrow") 

print(df.head())

print(df.info())

print(df.columns)

                    agency  check_amount                      department  \
0        Police Department  414069787.65                      OPERATIONS   
1  Department of Education  155467775.81  GE INSTR & SCH LEADERSHIP - PS   
2  Department of Education  146837179.49  GE INSTR & SCH LEADERSHIP - PS   
3  Department of Education  144663291.19  GE INSTR & SCH LEADERSHIP - PS   
4  Department of Education  144085954.76  GE INSTR & SCH LEADERSHIP - PS   

  document_id fiscal_year  issue_date                      payee_name  \
0                    2024  2023-07-21                      OPERATIONS   
1                    2024  2023-09-29  GE INSTR & SCH LEADERSHIP - PS   
2                    2024  2024-02-15  GE INSTR & SCH LEADERSHIP - PS   
3                    2024  2024-02-29  GE INSTR & SCH LEADERSHIP - PS   
4                    2024  2023-12-29  GE INSTR & SCH LEADERSHIP - PS   

  spending_category                  time_added  batch_offset  
0           Payroll  2025-08-23T02:42:02

In [16]:
# check what actually got saved
out_file = OUTPUT_DIR / f"spending_{fiscal_year}.parquet"
if out_file.exists():
    df_check = pd.read_parquet(out_file)
    print(f"file exists: {len(df_check)} records")
else:
    print("no file created")
    
# check if xml was saved
xml_files = list(OUTPUT_DIR.glob("raw_FY*.xml"))
print(f"xml files: {len(xml_files)}")

file exists: 3227410 records
xml files: 6


##  API Notes

https://www.checkbooknyc.com/spending-api

- retrieval limit: 20K records per call
- FY25: 3.2M transactions 
- to retrieve all: stagger `records_from` 

3,157,063 transactions <- as of 8/22 from data feeds https://www.checkbooknyc.com/data-feeds 

##### XML request:
- global params: `type_of_data`, `records_from`, `max_records`
- filters: put in `<search_criteria>`
- requested columns: put in `<response_columns>`

##### XML response: 
- `<search_criteria>`: echoes request xml
- `<result_records>`: current result batch based on `records_from` and `max_records`
- `<record_count>`: TOTAL rows
- `<messages>`: errors + status info
- `<status>`: success/failure of request

##### prossibly outdated thread on rate limits
https://groups.google.com/g/checkbooknyc/c/hgU1niDG00Y?pli=1

## Technical Summary

### api integration:
- bypass incapsula bot protection using session mgmt + browser headers
- learn xml format structure requirements of checkbooknyc api
- debug XML templating issues betewen f-strings vs `.format()`

### pagination / data integrity:
- **problem**: deduplication approach failed - legitimate different records marked as duplicates
- **root cause**: overlapping requests (requesting records 1-20000 when file had 1-19815)
- **solution**: range tracking with atomic saves eliminates deduplication need

### download architecture:
- resumable downloads using file length to calculate next offset
- atomic batch operations ensure data consistency if interrupted
- complying with rate limiting + error handling with auto retry
- progress tracking for slo downloads (e.g., full FY)

### key insights
- solve api pagination problem at the request level (proper offsets) instead of data level (deduplication).
- separate concerns: overlapping requests vs. data consistency
- well-structured batch system with proper offsets -> remarkably less data mucking / data handling complexity