In [None]:
!date

In [None]:
import requests
from bs4 import BeautifulSoup
import os
import logging
import pandas as pd
import chardet
import time
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

# ======================
# 📜 Enhanced Logging Setup
# ======================
# Configure the root logger so that every record (INFO and above) is written
# both to 'sba_download.log' (append mode) and to the console stream.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

formatter = logging.Formatter(
    '%(asctime)s [%(levelname)s] %(filename)s:%(lineno)d - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Drop handlers left over from an earlier execution of this cell. Without this,
# each re-run attaches another file/stream pair to the root logger and every
# message is emitted multiple times.
logger.handlers.clear()

file_handler = logging.FileHandler('sba_download.log', mode='a')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)

# ======================
# 🌐 Setup Requests Session
# ======================
# Retry policy: up to 3 attempts with a backoff factor of 1, triggered by the
# listed HTTP status codes. Only the https:// scheme gets the retrying adapter.
retries = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504],
)
session = requests.Session()
session.mount('https://', HTTPAdapter(max_retries=retries))

# ======================
# 🗂️ Setup Directory
# ======================
# Make sure the download folder exists; an existing folder is not an error.
os.makedirs("sba_csv", exist_ok=True)

# ======================
# 🕵️ Encoding Detector (Optional)
# ======================
def detect_encoding(file_path, sample_size=10000):
    """Guess the text encoding of a file from its leading bytes.

    Args:
        file_path: Path of the file to inspect; opened in binary mode.
        sample_size: Maximum number of bytes read from the start of the file
            and handed to ``chardet.detect``.

    Returns:
        A ``(encoding, confidence)`` tuple taken from the ``'encoding'`` and
        ``'confidence'`` entries of the chardet result.
    """
    with open(file_path, 'rb') as handle:
        sample = handle.read(sample_size)
    guess = chardet.detect(sample)
    encoding, confidence = guess['encoding'], guess['confidence']
    return encoding, confidence

# ======================
# 🔍 Step 1: Scrape SBA Resource Links
# ======================
# Fetch the dataset landing page and collect the URL of every resource page
# linked from it (anchors whose href contains "/dataset/ppp-foia/resource/").
from urllib.parse import urljoin  # local import: only this step needs it

parent_url = "https://data.sba.gov/dataset/ppp-foia"

try:
    logger.info(f"Accessing parent page: {parent_url}")
    response = session.get(parent_url, timeout=20)
    response.raise_for_status()
except Exception as e:
    logger.error(f"Parent page access failed: {e}")
    # Exit with a non-zero status so a failed run is not reported as success.
    raise SystemExit(1) from e

soup = BeautifulSoup(response.text, "html.parser")
# urljoin resolves root-relative hrefs against the site and leaves absolute
# hrefs untouched (plain string concatenation would corrupt the latter).
# dict.fromkeys removes repeated links while keeping first-seen order, so a
# resource page referenced by several anchors is only fetched once.
resource_links = list(dict.fromkeys(
    urljoin(parent_url, a['href'])
    for a in soup.select('a[href*="/dataset/ppp-foia/resource/"]')
))
logger.info(f"Found {len(resource_links)} resource pages")

# ======================
# 🔗 Step 2: Extract CSV Download Links
# ======================
# Visit each resource page and record the download link (and its trailing path
# segment as the filename) when the page's 'a.resource-url-analytics' anchor
# points at a '.csv' URL. Any failure on a page is logged and the page skipped.
csv_links = []
csv_filenames = []

for url in resource_links:
    try:
        logger.info(f"Processing resource page: {url}")
        page = session.get(url, timeout=15)
        page.raise_for_status()
        anchor = BeautifulSoup(page.text, "html.parser").select_one('a.resource-url-analytics')

        # Guard clause: no anchor, or an anchor that is not a CSV download.
        if not (anchor and anchor['href'].endswith('.csv')):
            logger.warning(f"No CSV link found on page: {url}")
            continue

        href = anchor['href']
        filename = href.rsplit("/", 1)[-1]
        csv_links.append(href)
        csv_filenames.append(filename)
        logger.info(f"CSV found: {filename}")
    except Exception as e:
        logger.warning(f"Failed to extract link from {url}: {e}")

logger.info(f"Total CSVs identified: {len(csv_links)}")

# ======================
# 💾 Step 3: Download CSVs with Cache Check
# ======================
# A file already on disk is reused when it is younger than MAX_AGE and larger
# than 1 KiB; otherwise it is downloaded again.
MAX_AGE = 5 * 86400  # 5 days

for i, (link, filename) in enumerate(zip(csv_links, csv_filenames)):
    file_path = os.path.join("sba_csv", filename)
    logger.info(f"[{i+1}/{len(csv_links)}] Checking: {filename}")

    if os.path.exists(file_path):
        age = time.time() - os.path.getmtime(file_path)
        size = os.path.getsize(file_path)
        if age < MAX_AGE and size > 1024:
            logger.info(f"Using cached file: {filename} (Age: {int(age)}s, Size: {size}B)")
            continue
        else:
            logger.info(f"Refreshing stale or small file: {filename} (Age: {int(age)}s, Size: {size}B)")

    # Stream into a temporary sibling file and move it into place only after
    # the whole body has arrived. Writing straight to file_path would leave a
    # truncated file behind on failure, and the cache check above would accept
    # it as valid on the next run (fresh mtime, size > 1 KiB).
    tmp_path = file_path + ".part"
    try:
        logger.info(f"Starting download: {link}")
        with session.get(link, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(tmp_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=4096):  # small chunk for low memory impact
                    if chunk:
                        f.write(chunk)
        os.replace(tmp_path, file_path)
        logger.info(f"Downloaded successfully: {filename}")
    except Exception as e:
        logger.error(f"Download failed for {filename}: {e}")
        # Best-effort cleanup of the partial download; any previously cached
        # copy at file_path is left untouched.
        try:
            os.remove(tmp_path)
        except OSError:
            pass

logger.info("All downloads complete ✅")


In [None]:
!date

In [None]:
! ls -alt ./sba_csv/

In [None]:
import pandas as pd
import os
import logging
import traceback
import gc

# ======================
# 📜 Logging Setup
# ======================
# Rebuild the root logger's handlers from scratch: existing handlers are
# removed, then one file handler (appending to 'sba_download.log') and one
# console handler are attached, both sharing the same record format.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
    fmt='%(asctime)s [%(levelname)s] %(filename)s:%(lineno)d - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger.handlers.clear()

file_handler = logging.FileHandler('sba_download.log', mode='a')
stream_handler = logging.StreamHandler()
for handler in (file_handler, stream_handler):
    handler.setFormatter(formatter)
    logger.addHandler(handler)

# ======================
# 📂 Directory Setup
# ======================
chunk_dir = "sba_csv/chunks"
os.makedirs(chunk_dir, exist_ok=True)
problematic_files = []  # names of source CSVs that could not be converted

# ======================
# 🔁 Chunked Write
# ======================
# Re-write every downloaded "public_up_to_150k*.csv" file as
# sba_csv/chunks/chunk_NN.csv, where NN is the file's position in the sorted,
# de-duplicated filename list. Malformed rows are skipped by the parser.
logger.info("🔁 Starting chunked write for public_up_to_150k CSVs...")

# sorted(set(...)) already guarantees each filename is visited once, so no
# additional "seen" bookkeeping is required.
for i, filename in enumerate(sorted(set(csv_filenames))):
    if not (filename.startswith("public_up_to_150k") and filename.endswith(".csv")):
        continue

    file_path = os.path.join("sba_csv", filename)
    logger.info(f"[{i+1}] Reading: {filename}")
    print(f"Processing file {i+1}: {filename}")

    # Bind df up front so the cleanup below is valid even when read_csv raises
    # before assigning it (previously `del df` then failed with NameError).
    df = None
    try:
        df = pd.read_csv(file_path, encoding='latin-1', on_bad_lines='skip', low_memory=False)
        chunk_path = os.path.join(chunk_dir, f"chunk_{i:02d}.csv")
        df.to_csv(chunk_path, index=False)
        logger.info(f"✅ Chunk written: {chunk_path}")
    except Exception as e:
        logger.warning(f"⚠️ Failed to process {filename}: {e}")
        traceback.print_exc()
        problematic_files.append(filename)
    finally:
        # Release the frame before loading the next file to cap peak memory.
        del df
        gc.collect()

# ======================
# 🗂️ Save Problematic Filenames
# ======================
# Only written when at least one file failed; one filename per line.
if problematic_files:
    bad_path = os.path.join("sba_csv", "problematic_files.txt")
    with open(bad_path, "w") as f:
        f.writelines(f"{name}\n" for name in problematic_files)
    logger.info(f"📄 Problematic file list written to: {bad_path}")

# ======================
# 🧠 Dtype Harmonization
# ======================
# The chunk files are produced by DataFrame.to_csv, which writes UTF-8 unless
# told otherwise. They must therefore be decoded as UTF-8 here; decoding them
# as latin-1 would turn every non-ASCII character into mojibake.
CHUNK_ENCODING = 'utf-8'

chunk_files = sorted(
    os.path.join(chunk_dir, f) for f in os.listdir(chunk_dir) if f.endswith('.csv')
)
logger.info("📊 Inspecting chunk column types for harmonization...")
column_types = {}  # column name -> set of dtypes observed across chunks

# First pass: collect types
for f in chunk_files:
    try:
        df = pd.read_csv(f, encoding=CHUNK_ENCODING, low_memory=False)
        for col in df.columns:
            column_types.setdefault(col, set()).add(df[col].dtype)
        del df
        gc.collect()
    except Exception as e:
        logger.warning(f"Failed type scan on {f}: {e}")
        traceback.print_exc()

# Build harmonized dtype map: a column keeps its dtype when every chunk agrees,
# otherwise it is read as object so that concatenation does not coerce values.
harmonized_types = {}
for col, types in column_types.items():
    if len(types) == 1:
        harmonized_types[col] = next(iter(types))
    else:
        harmonized_types[col] = "object"
        logger.info(f"Column '{col}' has mixed types: {types}. Forcing to object.")

# ======================
# 📥 Harmonized Read & Merge
# ======================
logger.info("📥 Loading chunks with harmonized dtypes...")
harmonized_chunks = []

for f in chunk_files:
    try:
        df = pd.read_csv(f, encoding=CHUNK_ENCODING, dtype=harmonized_types, low_memory=False)
        harmonized_chunks.append(df)
        logger.info(f"✔️ Harmonized load: {os.path.basename(f)}")
    except Exception as e:
        logger.warning(f"Failed harmonized load on {f}: {e}")
        traceback.print_exc()

# pd.concat raises an opaque "No objects to concatenate" ValueError on an empty
# list; fail with the same exception type but an actionable message instead.
if not harmonized_chunks:
    logger.error(f"No chunk files could be loaded from {chunk_dir}; nothing to merge.")
    raise ValueError(f"No chunk files could be loaded from {chunk_dir}; nothing to merge.")

logger.info("🔗 Concatenating all harmonized chunks...")
combined_df = pd.concat(harmonized_chunks, ignore_index=True)

# ======================
# 💾 Save Final Output
# ======================
# Persist the merged frame (without the index column) next to the downloads.
combined_path = os.path.join("sba_csv", "combined_public_up_to_150k.csv")
combined_df.to_csv(combined_path, index=False)
logger.info(f"✅ Final merged CSV written to: {combined_path}")
print("Merge complete with harmonized types.")

# ======================
# 📋 Schema Snapshot
# ======================
# One row per column of the merged frame, with its dtype in a "dtype" column.
schema_path = os.path.join("sba_csv", "schema_summary.csv")
schema_frame = combined_df.dtypes.to_frame(name="dtype")
schema_frame.to_csv(schema_path)
logger.info(f"📄 Column type summary saved to: {schema_path}")

# ======================
# 🔍 Type-Forced Column Sample Audit
# ======================
# For each column that had more than one dtype across chunks and was therefore
# mapped to object, log up to ten distinct non-null values (as strings).
forced_columns = [
    name
    for name, observed in column_types.items()
    if len(observed) > 1 and harmonized_types.get(name) == "object"
]
for col in forced_columns:
    types = column_types[col]
    non_null_text = combined_df[col].dropna().astype(str)
    sample_vals = non_null_text.unique()[:10]
    logger.info(f"🔍 Column '{col}' forced to object due to mixed types {types}. Sample values: {sample_vals}")


In [None]:
!date

In [None]:
! ls -alt ./sba_csv/

In [None]:
import pandas as pd
import logging

# Setup logging
# logging.basicConfig() silently does nothing when the root logger already has
# handlers, which is the case whenever an earlier cell configured logging in
# the same kernel. Clear them first so this configuration (and the
# csv_diagnostics.log file) actually takes effect.
logging.getLogger().handlers.clear()
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler("csv_diagnostics.log"), logging.StreamHandler()]
)
# Module-named logger; its records propagate to the root handlers set above.
logger = logging.getLogger(__name__)

def load_csv_robust(filepath, chunksize=100_000, dtype_map=None, encoding="latin-1"):
    """Lazily yield a CSV file as a sequence of DataFrame chunks.

    Malformed lines are skipped by the parser. Loading is best-effort: if
    opening or parsing the file raises, the error (with traceback) is logged
    and the generator simply stops, so the caller receives only the chunks
    produced before the failure (possibly none).

    Args:
        filepath: Path of the CSV file to read.
        chunksize: Maximum number of rows per yielded chunk.
        dtype_map: Optional column -> dtype mapping passed to ``pd.read_csv``.
        encoding: Text encoding used to decode the file.

    Yields:
        pandas.DataFrame: Consecutive chunks of the file.
    """
    logger.info(f"Starting CSV load: {filepath}")
    logger.info(f"Using encoding: {encoding}, chunksize: {chunksize}")

    try:
        # The context manager closes the underlying file handle even when
        # parsing fails midway or the consumer abandons the generator early.
        with pd.read_csv(
            filepath,
            chunksize=chunksize,
            dtype=dtype_map,
            encoding=encoding,
            on_bad_lines='skip',
            low_memory=False
        ) as reader:
            for i, chunk in enumerate(reader, start=1):
                logger.info(f"Loaded chunk {i} with shape {chunk.shape}")
                yield chunk
        logger.info("Finished loading all chunks.")
    except Exception as e:
        # logger.exception keeps the original message and appends the traceback.
        logger.exception(f"Failed to load CSV: {e}")

def filter_texas_rows(chunk):
    """Return a copy of the rows whose ZIP code text begins with '78'.

    Despite the name, the selection is purely a ZIP-prefix test: the
    'BorrowerZip' column is converted to strings and a row is kept when that
    string starts with '78'.

    Args:
        chunk: DataFrame containing a 'BorrowerZip' column.

    Returns:
        A new DataFrame (independent copy) holding only the matching rows,
        with their original index labels.
    """
    zip_text = chunk['BorrowerZip'].astype(str)
    keep = zip_text.str.startswith('78')
    return chunk.loc[keep].copy()

def run_diagnostics(df_tx):
    """Log data-quality summaries for the filtered DataFrame.

    Emits, at INFO level: per-column null counts, per-column null percentages
    (rounded to two decimals), the ten most frequent three-character ZIP
    prefixes, the number of rows whose ZIP text does not start with '78', and,
    when a 'NAICSCode' column is present, its ten most frequent values.

    Args:
        df_tx: DataFrame with a 'BorrowerZip' column.

    Returns:
        None. All results are written to the log.
    """
    # Null values
    null_mask = df_tx.isnull()
    logger.info(f"Null counts:\n{null_mask.sum().to_string()}")
    null_pct = (null_mask.mean() * 100).round(2)
    logger.info(f"Null percentage:\n{null_pct.to_string()}")

    # Zip code sanity
    zip_text = df_tx['BorrowerZip'].astype(str)
    prefix_counts = zip_text.str[:3].value_counts()
    logger.info(f"Top zip prefixes:\n{prefix_counts.head(10).to_string()}")

    off_prefix_rows = df_tx[~zip_text.str.startswith('78')]
    logger.info(f"Non-Texas zip rows (should be zero): {len(off_prefix_rows)}")

    # NAICS distribution (optional column)
    if 'NAICSCode' not in df_tx.columns:
        return
    top_naics = df_tx['NAICSCode'].value_counts().head(10)
    logger.info(f"Top NAICS codes:\n{top_naics.to_string()}")

# Define dtypes for optimization
types = {
    'InitialApprovalAmount': 'float32',
    'BorrowerZip': 'category',
    'NAICSCode': 'category'
}

# Load and process chunks
# The combined CSV is written by DataFrame.to_csv, which emits UTF-8 unless an
# encoding is given, so it is decoded as UTF-8 here rather than with
# load_csv_robust's latin-1 default (which would garble non-ASCII characters).
df_tx_list = []
for chunk in load_csv_robust(
    "./sba_csv/combined_public_up_to_150k.csv",
    dtype_map=types,
    encoding="utf-8",
):
    filtered = filter_texas_rows(chunk)
    df_tx_list.append(filtered)
    logger.info(f"Filtered Texas rows in chunk: {filtered.shape[0]}")

# load_csv_robust logs and swallows load errors, so an empty list means nothing
# could be read. Fail with a clear message (same exception type pd.concat would
# raise for an empty list) instead of "No objects to concatenate".
if not df_tx_list:
    raise ValueError(
        "No chunks were loaded from ./sba_csv/combined_public_up_to_150k.csv; "
        "check the log for the load error."
    )

# Merge all filtered rows
df_tx = pd.concat(df_tx_list, ignore_index=True)
logger.info(f"Final Texas row count: {df_tx.shape[0]}")

# Run data diagnostics
run_diagnostics(df_tx)


In [None]:
# Print a summary of the filtered frame (columns, dtypes, non-null counts).
df_tx.info()

In [None]:
!date