In [None]:
import os
import requests
import zipfile
import io
import pandas as pd

In [None]:


# TODO: decide whether to include 2020. Calling this function downloads the datasets into your environment.
# The dataset is < 1 GB, but record-wise we have 1,928,458 rows, which is a lot.
# Storage shouldn't be a problem because our dataset isn't necessarily dense the way images are.
def download_acs_1year_person_data(state_abbr="ca", years=(2018, 2019, 2021, 2022, 2023)):
    """
    Downloads 1-Year ACS PUMS person files.

    For every year, the archive csv_p{state_abbr}.zip is fetched from
    www2.census.gov and extracted into data_persons_{state_abbr}_1yr/{year}/
    (the folder is created when missing).

    Args:
        state_abbr: State abbreviation used in the archive name and in the
            destination folder name.
        years: Iterable of survey years to download.

    Returns:
        None. Progress is printed. A year whose download or extraction fails
        with a network, zip, or filesystem error is reported and skipped so
        the remaining years are still attempted.
    """
    for year in years:
        url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{year}/1-Year/csv_p{state_abbr}.zip"
        dest_folder = f"data_persons_{state_abbr}_1yr/{year}"
        os.makedirs(dest_folder, exist_ok=True)

        print(f"Downloading {year} 1-Year data...")
        try:
            # The archive is unpacked from memory, so the full body is needed anyway
            # (no streaming). The (connect, read) timeout keeps a stalled connection
            # from hanging the notebook forever.
            r = requests.get(url, timeout=(10, 300))
            r.raise_for_status()
            with zipfile.ZipFile(io.BytesIO(r.content)) as z:
                z.extractall(dest_folder)
            print(f"Done: {year}")
        except (requests.RequestException, zipfile.BadZipFile, OSError) as e:
            # Best effort: report the failure and continue with the next year.
            print(f"Skipping {year}: {e}")

# We only run once 
#download_acs_1year_person_data()

Downloading 2018 1-Year data...
Done: 2018
Downloading 2019 1-Year data...
Done: 2019
Downloading 2021 1-Year data...
Done: 2021
Downloading 2022 1-Year data...
Done: 2022
Downloading 2023 1-Year data...
Done: 2023


In [None]:
import duckdb
import os

# Build master_person_data in census_master.db from the per-year person CSV files.

# Connect to database
con = duckdb.connect('census_master.db')

# Years that have been downloaded. TODO: decide whether to include 2020.
years = ['2018', '2019', '2021', '2022', '2023']
state = '06' # CA

# Dropping the table is not always best practice, but it keeps re-runs of this cell
# from duplicating rows until we know which database layout we are happy with.
con.execute("DROP TABLE IF EXISTS master_person_data")

print("Building Master Table...")

# The table was just dropped, so it does not exist until the first file is loaded.
table_exists = False

for year in years:
    # The folder name still hard-codes "ca"; the file name follows `state`.
    p_file = f"data_persons_ca_1yr/{year}/psam_p{state}.csv"
    print(p_file)

    if not os.path.exists(p_file):
        print(f"Skipping {year}: File not found at {p_file}")
        continue

    # Use CREATE for the first file, INSERT for the rest
    operation = "INSERT INTO master_person_data" if table_exists else "CREATE TABLE master_person_data AS"

    print(f"Processing Year: {year} ({operation})...")
    con.execute(f"""
        {operation}
        SELECT 
            SERIALNO, 
            {year} as DATA_YEAR, 
            AGEP, 
            SEX, 
            SCHL, 
            WAGP,
            ESR,
            POVPIP
        FROM read_csv_auto('{p_file}')
    """)
    table_exists = True

print("Aggregation Complete!")

# Check the final volume (only possible when at least one file was loaded).
if table_exists:
    print(con.execute("SELECT DATA_YEAR, count(*) as person_records FROM master_person_data GROUP BY DATA_YEAR ORDER BY DATA_YEAR").df())
else:
    print("No person files were found; master_person_data was not created.")


Building Master Table...
data_persons_ca_1yr/2018/psam_p06.csv
Processing Year: 2018 (CREATE TABLE master_person_data AS)...
data_persons_ca_1yr/2019/psam_p06.csv
Processing Year: 2019 (INSERT INTO master_person_data)...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

data_persons_ca_1yr/2021/psam_p06.csv
Processing Year: 2021 (INSERT INTO master_person_data)...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

data_persons_ca_1yr/2022/psam_p06.csv
Processing Year: 2022 (INSERT INTO master_person_data)...
data_persons_ca_1yr/2023/psam_p06.csv
Processing Year: 2023 (INSERT INTO master_person_data)...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Aggregation Complete!
   DATA_YEAR  person_records
0       2018          378817
1       2019          380091
2       2021          386061
3       2022          391171
4       2023          392318


In [None]:
# Read only the header row (nrows=0) of the 2018 person file to collect its column names.
all_headers_persons = list(
    pd.read_csv('data_persons_ca_1yr/2018/psam_p06.csv', nrows=0).columns
)
print("<------------------ These column names are the following ------------------>")
print(f"The length of Person Variables are: {len(all_headers_persons)}")

<------------------ These column names are the following ------------------>
The length of Person Variables are: 286


In [None]:
# Quick-and-dirty column clean: keep the header names that start with neither
# 'PWG' nor 'F'. The result is only a list of names; no data is loaded here.
# NOTE(review): the 'F' rule removes every name beginning with F — confirm that is intended.
clean_persons_cols = [c for c in all_headers_persons if not c.startswith(('PWG', 'F'))]
print("<------------------ The 'Clean' column names are the following ------------------>")
print(" ".join(clean_persons_cols[:286]))

<------------------ The 'Clean' column names are the following ------------------>
RT SERIALNO DIVISION SPORDER PUMA REGION ST ADJINC AGEP CIT CITWP COW DDRS DEAR DEYE DOUT DPHY DRAT DRATX DREM ENG GCL GCM GCR HINS1 HINS2 HINS3 HINS4 HINS5 HINS6 HINS7 INTP JWMNP JWRIP JWTR LANX MAR MARHD MARHM MARHT MARHW MARHYP MIG MIL MLPA MLPB MLPCD MLPE MLPFG MLPH MLPI MLPJ MLPK NWAB NWAV NWLA NWLK NWRE OIP PAP RELP RETP SCH SCHG SCHL SEMP SEX SSIP SSP WAGP WKHP WKL WKW WRK YOEP ANC ANC1P ANC2P DECADE DIS DRIVESP ESP ESR HICOV HISP INDP JWAP JWDP LANP MIGPUMA MIGSP MSP NAICSP NATIVITY NOP OC OCCP PAOC PERNP PINCP POBP POVPIP POWPUMA POWSP PRIVCOV PUBCOV QTRBIR RAC1P RAC2P RAC3P RACAIAN RACASN RACBLK RACNH RACNUM RACPI RACSOR RACWHT RC SCIENGP SCIENGRLP SFN SFR SOCP VPS WAOB


In [None]:
# Show the schema of master_person_data as a DataFrame.
describe_result = con.execute("DESCRIBE master_person_data")
columns_info = describe_result.df()
print(columns_info)

  column_name column_type null   key default extra
0    SERIALNO     VARCHAR  YES  None    None  None
1   DATA_YEAR     INTEGER  YES  None    None  None
2        AGEP      BIGINT  YES  None    None  None
3         SEX      BIGINT  YES  None    None  None
4        SCHL     VARCHAR  YES  None    None  None
5        WAGP      BIGINT  YES  None    None  None
6         ESR      BIGINT  YES  None    None  None
7      POVPIP      BIGINT  YES  None    None  None


In [20]:
import duckdb, shutil, os
## Keep a backup of the original census_master.db file.
## The copy is made only when no backup file exists yet.
db_path = "census_master.db"
backup_path = "census_master_BACKUP.db"
backup_already_made = os.path.exists(backup_path)
if not backup_already_made:
    shutil.copy(db_path, backup_path)

# Connect to the database file.
con = duckdb.connect(db_path)


In [21]:

# HERE
import pandas as pd
from functools import reduce
## Collect, for each yearly file, the set of variables (column names) it contains,
## so the variables present in all years can be determined.
years = ['2018', '2019', '2021', '2022', '2023']
base_dir = "data_persons_ca_1yr"
state = "06"  # CA, matches your psam_p06.csv naming

# One CSV path per year.
paths = [
    f"{base_dir}/{y}/psam_p{state}.csv"
    for y in years
]

# nrows=0 reads the header row only, so no data rows are loaded.
headers_by_year = [
    set(pd.read_csv(csv_path, nrows=0).columns)
    for csv_path in paths
]

# Cleaning rule: discard every column whose name begins with "PWG" or with "F".
# NOTE(review): the "F" rule removes every name beginning with F — confirm that is intended.
def clean_cols(cols):
    """Return the set of names in `cols` that start with neither "PWG" nor "F"."""
    kept = set()
    for name in cols:
        if name.startswith(("PWG", "F")):
            continue
        kept.add(name)
    return kept

# Apply the cleaning rule to each year's header set.
headers_by_year = list(map(clean_cols, headers_by_year))

# Intersect the per-year sets and sort the shared names alphabetically.
common_cols = sorted(set.intersection(*headers_by_year))

print("Number of columns present in ALL years:", len(common_cols))
print(common_cols[:30])


Number of columns present in ALL years: 120
['ADJINC', 'AGEP', 'ANC', 'ANC1P', 'ANC2P', 'CIT', 'CITWP', 'COW', 'DDRS', 'DEAR', 'DECADE', 'DEYE', 'DIS', 'DIVISION', 'DOUT', 'DPHY', 'DRAT', 'DRATX', 'DREM', 'DRIVESP', 'ENG', 'ESP', 'ESR', 'GCL', 'GCM', 'GCR', 'HICOV', 'HINS1', 'HINS2', 'HINS3']


In [22]:
# Double-quote each shared column name and join them into one SELECT list.
select_cols_sql = ", ".join(f'"{c}"' for c in common_cols)

# One SELECT per yearly CSV file, all reading the same column list.
per_file_selects = [
    f"""
    SELECT {select_cols_sql}
    FROM read_csv_auto('{p}', header=True)
    """.strip()
    for p in paths
]

# Stack the yearly SELECTs into a single statement.
union_sql = "\nUNION ALL\n".join(per_file_selects)

create_sql = f"""
CREATE OR REPLACE TABLE census_masterdb_all AS
{union_sql};
"""

# (Re)build census_masterdb_all from all yearly files.
con.execute(create_sql)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<_duckdb.DuckDBPyConnection at 0x7f5cda285bf0>

In [23]:
# Display the schema of census_masterdb_all as a DataFrame.
census_all_schema = con.execute("DESCRIBE census_masterdb_all").fetchdf()
census_all_schema


Unnamed: 0,column_name,column_type,null,key,default,extra
0,ADJINC,BIGINT,YES,,,
1,AGEP,BIGINT,YES,,,
2,ANC,BIGINT,YES,,,
3,ANC1P,VARCHAR,YES,,,
4,ANC2P,VARCHAR,YES,,,
...,...,...,...,...,...,...
115,WAOB,BIGINT,YES,,,
116,WKHP,BIGINT,YES,,,
117,WKL,BIGINT,YES,,,
118,WRK,BIGINT,YES,,,


In [24]:
# Preview the first 10 rows of census_masterdb_all as a DataFrame.
census_all_preview = con.execute("SELECT * FROM census_masterdb_all LIMIT 10").fetchdf()
census_all_preview


Unnamed: 0,ADJINC,AGEP,ANC,ANC1P,ANC2P,CIT,CITWP,COW,DDRS,DEAR,...,SPORDER,SSIP,SSP,VPS,WAGP,WAOB,WKHP,WKL,WRK,YOEP
0,1013097,30,1,210,999,1,,6.0,2,2,...,1,0,0,,500,1,40.0,1,,
1,1013097,18,1,290,999,1,,,2,2,...,1,0,0,,0,1,,3,2.0,
2,1013097,69,2,902,917,1,,,2,2,...,1,0,0,6.0,0,1,,3,2.0,
3,1013097,25,1,290,999,1,,,1,1,...,1,0,0,,0,1,,3,2.0,
4,1013097,31,1,924,999,1,,,2,2,...,1,0,0,,0,1,,3,,
5,1013097,19,1,706,999,5,,1.0,2,2,...,1,0,0,,50,4,5.0,1,,2018.0
6,1013097,21,2,148,50,1,,4.0,2,2,...,1,0,0,,7700,1,20.0,1,1.0,
7,1013097,65,1,50,999,1,,2.0,2,2,...,1,0,11300,,5000,1,8.0,1,1.0,
8,1013097,55,1,740,999,1,,,1,2,...,1,0,7200,,0,1,,3,2.0,
9,1013097,82,4,999,999,4,1940.0,,1,2,...,1,0,0,,0,4,,3,,2005.0


In [25]:
# Rebuild census_masterdb_all, this time tagging every row with the year of its source file.
# NOTE(review): the year is written as a quoted string literal here, while master_person_data
# uses an unquoted value for DATA_YEAR — confirm which type is intended.
per_year_selects = [
    f"""
    SELECT {select_cols_sql}, '{y}' AS year
    FROM read_csv_auto('{base_dir}/{y}/psam_p{state}.csv', header=True)
    """.strip()
    for y in years
]

# Stack the yearly SELECTs into a single statement.
union_sql = "\nUNION ALL\n".join(per_year_selects)

con.execute(f"""
CREATE OR REPLACE TABLE census_masterdb_all AS
{union_sql};
""")


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<_duckdb.DuckDBPyConnection at 0x7f5cda285bf0>