In [1]:
import os
import random
import re
import sqlite3
import time

import pandas as pd
import requests
from bs4 import BeautifulSoup


class DemographicsManager:
    """
    Collects and stores demographic data for ZIP codes.
    Also tracks which ZIP codes have already been processed.

    The manager opens one SQLite connection in ``__init__`` and keeps it for
    its lifetime. Call ``close()`` when finished, or use the manager as a
    context manager (``with DemographicsManager(path) as dm: ...``) so the
    connection is released even when an error occurs.
    """

    # Column names have to be interpolated into the SQL text (placeholders can
    # only bind values, not identifiers), so they are checked against this
    # pattern before being used in a statement.
    _IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

    def __init__(self, db_path):
        """
        Open the database at ``db_path`` and set the table names used below.
        """
        # Open database connection
        self.db_path = db_path
        self.conn = sqlite3.connect(self.db_path)
        self.cursor = self.conn.cursor()

        # Table names used in this manager
        self.demographics_table_name = "demographics_info"
        self.zip_code_table_name = "la_zip_code"

    def __enter__(self):
        """Return the manager itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving a ``with`` block; never suppresses errors."""
        self.close()

    def close(self):
        """
        Close the underlying SQLite connection.

        Safe to call more than once. Any query issued through this manager
        after closing raises ``sqlite3.ProgrammingError``.
        """
        self.conn.close()

    def create_table(self):
        """
        Create the demographics table if it does not already exist.

        The statement uses ``CREATE TABLE IF NOT EXISTS``, so an existing
        table is left untouched; the confirmation message is printed either way.
        """
        self.cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {self.demographics_table_name} (
                F_Zip_Code_ID INTEGER,
                Demographics_Info_ID INTEGER PRIMARY KEY AUTOINCREMENT,
                Latitude REAL,
                Longitude REAL,
                Radius_mi TEXT,
                Population_Density_per_sq_mi INTEGER,
                Median_Home_Value INTEGER,
                Land_Area_sq_mi INTEGER,
                Total_Male INTEGER,
                Total_Female INTEGER,
                Total_Population INTEGER,
                Median_Household_Income INTEGER,
                Employment_Rate FLOAT,
                Total_Employer_Establishments INTEGER,
                Created_At DATETIME DEFAULT (datetime('now')),
                Is_Processed INTEGER DEFAULT 0,
                Is_Valid INTEGER DEFAULT 1,
                FOREIGN KEY (F_Zip_Code_ID)
                    REFERENCES {self.zip_code_table_name}(Zip_Code_ID)
            )
        """)
        self.conn.commit()
        print(f"Created table '{self.demographics_table_name}'.")

    def get_zip_codes(self, limit=10):
        """
        Return ZIP codes that have not been processed yet.

        Args:
            limit: Maximum number of rows to return (default 10, the batch
                size this method always used). Pass ``None`` for no limit.

        Returns:
            A list of one-element tuples ``(Zip_Code,)`` as produced by
            ``cursor.fetchall()``.
        """
        query = (
            f"SELECT Zip_Code FROM {self.zip_code_table_name} "
            f"WHERE Is_Processed = 0"
        )
        if limit is None:
            self.cursor.execute(query)
        else:
            # Bind the limit as a parameter instead of formatting it into the SQL
            self.cursor.execute(query + " LIMIT ?", (int(limit),))
        return self.cursor.fetchall()

    def get_zip_id(self, zip_code):
        """
        Look up the internal Zip_Code_ID for a given ZIP code.

        Returns:
            The Zip_Code_ID of the first matching row, or ``None`` when the
            ZIP code is not present in the ZIP code table.
        """
        self.cursor.execute(
            f"SELECT Zip_Code_ID FROM {self.zip_code_table_name} WHERE Zip_Code = ?",
            (zip_code,)
        )
        result = self.cursor.fetchone()
        return result[0] if result else None

    def insert_demographics(self, zip_code_id, data):
        """
        Insert demographic data for a ZIP code if it does not already exist.

        Args:
            zip_code_id: Value stored in ``F_Zip_Code_ID``.
            data: Mapping of column name -> value for the remaining columns.

        Raises:
            ValueError: If a key in ``data`` is not a plain SQL identifier
                (letters, digits and underscores, not starting with a digit).
        """
        self.cursor.execute(
            f"SELECT COUNT(*) FROM {self.demographics_table_name} "
            f"WHERE F_Zip_Code_ID = ?",
            (zip_code_id,)
        )
        count = self.cursor.fetchone()[0]

        if count > 0:
            print(f"Data already exists for ZIP ID {zip_code_id}. Skipping.")
            return

        # Keys become part of the SQL text, so reject anything that is not a
        # simple identifier before building the statement.
        invalid_columns = [
            key for key in data
            if not isinstance(key, str) or not self._IDENTIFIER_RE.fullmatch(key)
        ]
        if invalid_columns:
            raise ValueError(
                f"Invalid column name(s) for {self.demographics_table_name}: "
                f"{invalid_columns!r}"
            )

        columns = ", ".join(["F_Zip_Code_ID"] + list(data.keys()))
        placeholders = ", ".join(["?"] * (len(data) + 1))
        values = [zip_code_id] + list(data.values())

        self.cursor.execute(f"""
            INSERT INTO {self.demographics_table_name} ({columns})
            VALUES ({placeholders})
        """, values)

        self.conn.commit()
        print(f"Inserted demographics for ZIP ID {zip_code_id}.")

    def mark_zip_as_processed(self, zip_code_id):
        """
        Mark a ZIP code as processed so it is not scraped again.

        Sets ``Is_Processed = 1`` on the row whose ``Zip_Code_ID`` equals
        ``zip_code_id``; if no row matches (for example ``None``), nothing is
        updated but the confirmation message is still printed.
        """
        self.cursor.execute(f"""
            UPDATE {self.zip_code_table_name}
            SET Is_Processed = 1
            WHERE Zip_Code_ID = ?
        """, (zip_code_id,))
        self.conn.commit()
        print(f"ZIP ID {zip_code_id} marked as processed.")


# Database configuration
DB_PATH = "../db/database_sqlite.db"

# Census API key, read from the environment so that no credential is stored in
# the notebook. When the variable is unset the "key" parameter is simply left
# out of Census requests.
# NOTE(review): keyless Census API requests are believed to be allowed but
# rate-limited -- confirm the quota before running a large batch without a key.
CENSUS_API_KEY = os.environ.get("CENSUS_API_KEY")

# Network settings
REQUEST_TIMEOUT_SECONDS = 30      # fail instead of hanging forever on a stalled connection
SLEEP_RANGE_SECONDS = (200, 250)  # pause after each fully processed ZIP code

# Headers sent with the scraping request
SCRAPE_HEADERS = {
    "User-Agent": "Mozilla/5.0",
    "Accept-Language": "en-US,en;q=0.9",
    "Referer": "https://www.google.com/"
}

# Census endpoints and the ACS variables requested from them
ACS_URL = "https://api.census.gov/data/2021/acs/acs5"
ACS_VARS = [
    "B01003_001E",  # stored as Total_Population
    "B19013_001E",  # stored as Median_Household_Income
    "B23025_003E",  # used as the labor force count
    "B23025_004E"   # used as the employed count
]
CBP_URL = "https://api.census.gov/data/2019/cbp"


def _census_params(get, geography):
    """Build Census API query parameters, adding the key only when one is configured."""
    params = {"get": get, "for": geography}
    if CENSUS_API_KEY:
        params["key"] = CENSUS_API_KEY
    return params


def _fetch_census_row(url, params):
    """
    Query a Census endpoint and return its single data row as a dict.

    Returns ``None`` when the response status is not 200. The JSON payload is
    expected to contain exactly a header row and one value row; any other
    shape raises ``ValueError`` from the unpacking.
    """
    response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT_SECONDS)
    if response.status_code != 200:
        return None
    header_row, value_row = response.json()
    return dict(zip(header_row, value_row))


def _scrape_site_demographics(zip_code):
    """
    Scrape the unitedstateszipcodes.org page for ``zip_code``.

    Returns a dict with whichever of the coordinate, density, home value,
    land area and male/female fields were found on the page (possibly empty),
    or ``None`` when the page could not be fetched (non-200 status).
    """
    url = f"https://www.unitedstateszipcodes.org/{zip_code}/"
    response = requests.get(url, headers=SCRAPE_HEADERS, timeout=REQUEST_TIMEOUT_SECONDS)
    if response.status_code != 200:
        return None

    soup = BeautifulSoup(response.content, "html.parser")
    scraped = {}

    # Extract latitude, longitude, and radius if available
    coordinates_row = soup.find("th", string="Coordinates:")
    if coordinates_row:
        coordinates = coordinates_row.find_next("td").text.strip()
        match = re.search(
            r'([\-]?\d+\.\d+),\s*([\-]?\d+\.\d+).*\(~(\d+)\s*(mile|yard)',
            coordinates
        )
        if match:
            latitude, longitude, radius, unit = match.groups()
            scraped["Latitude"] = latitude
            scraped["Longitude"] = longitude
            # Radii given in yards are converted to miles (1760 yards per mile)
            scraped["Radius_mi"] = float(radius) / 1760 if unit == "yard" else radius

    # Parse demographic values from the main tables
    for table in soup.find_all("table", class_="table table-hover"):
        for row in table.find_all("tr"):
            th, td = row.find("th"), row.find("td")
            if not th or not td:
                continue

            label = th.text
            value = td.text.strip()

            if "Population Density" in label:
                scraped["Population_Density_per_sq_mi"] = re.sub(r"[^\d]", "", value)
            elif "Median Home Value" in label:
                scraped["Median_Home_Value"] = re.sub(r"[^\d]", "", value)
            elif "Land Area" in label:
                scraped["Land_Area_sq_mi"] = value.replace(",", "")

    # Extract male/female population counts
    gender_table = soup.find("table", class_="chart-legend")
    if gender_table:
        for row in gender_table.find_all("tr"):
            th, td = row.find("th"), row.find("td")
            if th and td and td.find("span", class_="value"):
                value = re.sub(r"[^\d]", "", td.find("span", class_="value").text)
                if "Male" in th.text:
                    scraped["Total_Male"] = value
                elif "Female" in th.text:
                    scraped["Total_Female"] = value

    return scraped


def _mark_failed(zip_code):
    """Mark a ZIP code as processed after a failed attempt so it is not retried."""
    dm.mark_zip_as_processed(dm.get_zip_id(zip_code))


# Initialize the manager and ensure the table exists
dm = DemographicsManager(DB_PATH)
dm.create_table()

# Fetch ZIP codes that still need to be processed
zip_codes = dm.get_zip_codes()
print(zip_codes)

# Loop through each ZIP code and collect data
for zip_row in zip_codes:
    # Bound before the try block so the except handler can always refer to it
    zip_code = zip_row[0]
    try:
        print(f"\nProcessing ZIP: {zip_code}")

        # Scrape the demographics page; if it cannot be fetched, skip this ZIP
        data = _scrape_site_demographics(zip_code)
        if data is None:
            print(f"Failed to fetch page for ZIP {zip_code}")
            _mark_failed(zip_code)
            continue

        # Query Census ACS API for population, income, and employment data
        acs = _fetch_census_row(
            ACS_URL,
            _census_params(
                "NAME," + ",".join(ACS_VARS),
                f"zip code tabulation area:{zip_code}"
            )
        )
        if acs is None:
            _mark_failed(zip_code)
            continue

        data["Total_Population"] = int(acs["B01003_001E"])
        data["Median_Household_Income"] = int(acs["B19013_001E"])

        labor_force = int(acs["B23025_003E"])
        employed = int(acs["B23025_004E"])
        # Employment rate as a percentage of the labor force; 0 when the labor force is 0
        data["Employment_Rate"] = round(
            employed / labor_force * 100, 2
        ) if labor_force else 0

        # Query Census CBP API for number of employer establishments
        cbp = _fetch_census_row(
            CBP_URL,
            _census_params("ESTAB", f"zip code:{zip_code}")
        )
        if cbp is None:
            _mark_failed(zip_code)
            continue

        data["Total_Employer_Establishments"] = int(cbp["ESTAB"])

        # Insert collected data into the database
        zip_code_id = dm.get_zip_id(zip_code)
        if zip_code_id:
            dm.insert_demographics(zip_code_id, data)
            dm.mark_zip_as_processed(zip_code_id)

        # Sleep to avoid hitting external services too aggressively
        time.sleep(random.randint(*SLEEP_RANGE_SECONDS))

    except Exception as exc:
        # Best-effort batch: report what went wrong, then mark the ZIP as
        # processed and move on so one bad ZIP does not stop the run.
        print(f"Error while processing ZIP {zip_code}: {exc!r}")
        _mark_failed(zip_code)


✅ Created table 'demographics_info'.
[('90888',), ('90895',), ('90899',), ('91001',), ('91003',), ('91006',), ('91007',), ('91008',), ('91009',), ('91010',)]
Processing data for ZIP: 90888
Failed to fetch page: 404
📌 ZIP 90888 marked as processed.
Processing data for ZIP: 90895
Failed to fetch page: 404
📌 ZIP 90895 marked as processed.
Processing data for ZIP: 90899
Failed to fetch page: 404
📌 ZIP 90899 marked as processed.
Processing data for ZIP: 91001
Page fetched successfully!
Coordinates: 34.22, -118.13ZIP (~5 mile radius)
Latitude: 34.22
Longitude: -118.13
Radius_mi: 5
Population Density: 4428
Median Home Value: 550000
Land Area (sq mi): 8.26
Number of Males: 17671
Number of Females: 18907
Total Population: 36867
Median Household Income:114668 
Employment Rate: 92.12
Total Employer Establishments: 610
✅ Data inserted for ZIP 267.
Data inserted for ZIP 267
📌 ZIP 91001 marked as processed.
Processing data for ZIP: 91003
Failed to fetch page: 404
📌 ZIP 91003 marke

## View data

In [2]:
import pandas as pd
import sqlite3

# Path to the SQLite database
DB_PATH = "../db/database_sqlite.db"

# Table to be loaded into a DataFrame
DEMOGRAPHICS_TABLE = "demographics_info"

# Open a connection to the database
conn = sqlite3.connect(DB_PATH)
try:
    # Read the demographics table into a pandas DataFrame
    df = pd.read_sql_query(f"SELECT * FROM {DEMOGRAPHICS_TABLE}", conn)
finally:
    # Close the database connection even if the query above fails
    conn.close()

# Display the DataFrame
df

Unnamed: 0,F_Zip_Code_ID,Demographics_Info_ID,Latitude,Longitude,Radius_mi,Population_Density_per_sq_mi,Median_Home_Value,Land_Area_sq_mi,Total_Male,Total_Female,Total_Population,Median_Household_Income,Employment_Rate,Total_Employer_Establishments,Created_At,Is_Processed,Is_Valid
0,1,1,33.97,-118.25,1,17030.0,249600.0,3.28,27721.0,28138.0,58245,52806,90.14,613,2025-04-14 17:58:48,0,1
1,2,2,33.95,-118.25,1,17761.0,216100.0,2.99,25882.0,27268.0,54384,46159,88.95,197,2025-04-14 18:36:54,0,1
2,3,3,33.96,-118.27,2,20071.0,231700.0,3.63,35748.0,37016.0,75190,47733,90.60,440,2025-04-14 19:03:05,0,1
3,4,4,34.08,-118.31,2,19203.0,776300.0,3.05,29255.0,29330.0,59621,54947,92.61,1252,2025-04-15 01:00:47,0,1
4,5,5,34.06,-118.31,2,32197.0,633600.0,1.18,18931.0,19056.0,36910,44913,95.51,774,2025-04-15 01:02:09,0,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
177,267,213,34.22,-118.13,5,4428.0,550000.0,8.26,17671.0,18907.0,36867,114668,92.12,610,2025-04-17 00:03:32,0,1
178,269,214,34.13,-118.03,4,4969.0,718500.0,6.43,15334.0,16615.0,32499,113817,95.32,1576,2025-04-17 00:07:23,0,1
179,270,215,34.13,-118.05,2,6140.0,756600.0,5.51,15997.0,17807.0,33689,87339,94.07,1324,2025-04-17 00:11:23,0,1
180,271,216,34.15,-117.97,1,525.0,945900.0,1.93,478.0,532.0,774,167500,89.81,18,2025-04-17 00:15:20,0,1
