In [7]:
import duckdb as db 
import numpy as np
import json
import time, os, requests, pandas as pd
from urllib.parse import urlencode

In [5]:
url = "Iowa_Liquor_Sales_20250926.csv"

con = db.connect()
con.execute(f"""
    CREATE OR REPLACE TABLE sales AS
    SELECT * REPLACE (try_strptime("Date", '%m/%d/%Y')::DATE AS "Date")
    FROM read_csv_auto('Iowa_Liquor_Sales_20250926.csv',
                       sample_size=-1,    
                       dateformat='%m/%d/%Y');
""")


<duckdb.duckdb.DuckDBPyConnection at 0x159a9d570>

In [9]:
df = con.execute(
"""
    SELECT *
    FROM sales
    LIMIT 5
"""
).fetchdf()
df

Unnamed: 0,Date,Store Number,City,Zip Code,Store Location,County Number,County,Category,Vendor Number,Item Number,Pack,Bottle Volume (ml),State Bottle Cost,State Bottle Retail,Bottles Sold,Sale (Dollars),Volume Sold (Liters),Volume Sold (Gallons)
0,2023-01-02,5245,DES MOINES,50315,POINT (-93.61379 41.57608),,POLK,1022200,260,89177,6,750,29.99,44.99,3,134.97,2.25,0.59
1,2023-01-02,6303,DES MOINES,50320,POINT (-93.59738 41.55503),,POLK,1022100,395,89193,48,200,3.63,5.45,48,261.6,9.6,2.53
2,2023-01-02,6303,DES MOINES,50320,POINT (-93.59738 41.55503),,POLK,1012200,260,4626,12,750,20.99,31.49,12,377.88,9.0,2.37
3,2023-01-02,2190,DES MOINES,50314,POINT (-93.61979 41.60558),,POLK,1032100,978,34598,6,1750,14.84,22.26,6,130.5,10.5,2.77
4,2023-01-02,2699,ANKENY,50023,POINT (-93.62362 41.70324),,POLK,1031100,301,38176,12,750,10.0,15.0,24,360.0,18.0,4.75


In [27]:
BASE = "https://data.iowa.gov/resource/m3tr-qhgy.json"
HEADERS = {"Accept-Encoding": "gzip"}  # faster transfers

def fetch_by_year_chunks(start_year=2023, end_year=2025, batch_size=100000, out_dir="iowa_parquet",
                         max_retries=3, read_timeout=120, backoff_base=1.5):
    
    # create directory to send batches before combining
    os.makedirs(out_dir, exist_ok=True)

    # start session for API requests
    with requests.Session() as s:
        s.headers.update(HEADERS)

        for yr in range(start_year, end_year + 1):
            print("Starting", yr)

            # set parameters for API call
            start = f"{yr}-01-01"
            end   = f"{yr+1}-01-01"
            where = f"date >= '{start}' AND date < '{end}'"

            offset = 0
            total_rows = 0
            year_batches = [] 

            while True:
                params = {
                    "$where": where,
                    "$limit": batch_size,
                    "$offset": offset,
                    "$order": "date ASC, invoice_line_no ASC" 
                }
                url = f"{BASE}?{urlencode(params)}"

                # retry loop to bypass rate limits and other errors
                last_err = None
                for attempt in range(max_retries):
                    try:
                        resp = s.get(url, timeout=read_timeout)
                        if resp.status_code == 429:
                            wait = int(resp.headers.get("Retry-After", 2))
                            print(f"[{yr}] 429 rate limit. Sleeping {wait}s…")
                            time.sleep(wait)
                            continue
                        resp.raise_for_status()
                        batch = resp.json()
                        break
                    except Exception as e:
                        last_err = e
                        wait = backoff_base ** attempt
                        print(f"[{yr}] Retry {attempt+1}/{max_retries} @ offset {offset}: {e}. Sleep {wait:.1f}s")
                        time.sleep(wait)
                else:
                    print(f"[{yr}] Giving up at offset {offset}. Last error: {last_err}")
                    break
                # end loop after success or too many failures

                if not batch:
                    break
                
                # save as df and clean a few types that were causing errors / storage issues
                df = pd.DataFrame(batch)
                if "date" in df.columns:
                    df["date"] = pd.to_datetime(df["date"], errors="coerce")
                for c in ["pack","bottle_volume_ml","sale_bottles"]:
                    if c in df.columns: df[c] = pd.to_numeric(df[c], errors="coerce")
                for c in ["sale_dollars","sale_liters","sale_gallons","state_bottle_cost","state_bottle_retail"]:
                    if c in df.columns: df[c] = pd.to_numeric(df[c], errors="coerce")

                # add batch to this years df
                year_batches.append(df)
                offset += batch_size
                total_rows += len(df)
                print(f"[{yr}] rows pulled so far: {total_rows}")
            
            if year_batches:
                year_df = pd.concat(year_batches, ignore_index=True)
                out_path = os.path.join(out_dir, f"iowa_liquor_{yr}.parquet")
                year_df.to_parquet(out_path, index=False) 
                print(f"[{yr}] saved {len(year_df):,} rows to {out_path}\n")
    
    # combine all batches to single parquet with duckdb
    combine_all_to = f"iowa_liquor_{start_year}_{end_year}.parquet"
    print(f"Combining all yearly files to {combine_all_to}")
    con = db.connect()  # <-- use duckdb
    # pattern to match the yearly files we just wrote:
    pattern = os.path.join(out_dir, "iowa_liquor_*.parquet")
    con.execute(f"""
        COPY (
            SELECT * FROM parquet_scan('{pattern}')
        )
        TO '{combine_all_to}' (FORMAT PARQUET);
    """)
    con.close()
    print(f"Done. Wrote single file: {combine_all_to}")

In [28]:
df = fetch_by_year_chunks(2023, 2025, batch_size=100000)

Starting 2023
[2023] rows pulled so far: 100000
[2023] rows pulled so far: 200000
[2023] rows pulled so far: 300000
[2023] rows pulled so far: 400000
[2023] rows pulled so far: 500000
[2023] rows pulled so far: 600000
[2023] rows pulled so far: 700000
[2023] rows pulled so far: 800000
[2023] rows pulled so far: 900000
[2023] rows pulled so far: 1000000
[2023] rows pulled so far: 1100000
[2023] rows pulled so far: 1200000
[2023] rows pulled so far: 1300000
[2023] rows pulled so far: 1400000
[2023] rows pulled so far: 1500000
[2023] rows pulled so far: 1600000
[2023] rows pulled so far: 1700000
[2023] rows pulled so far: 1800000
[2023] rows pulled so far: 1900000
[2023] rows pulled so far: 2000000
[2023] rows pulled so far: 2100000
[2023] rows pulled so far: 2200000
[2023] rows pulled so far: 2300000
[2023] rows pulled so far: 2400000
[2023] rows pulled so far: 2500000
[2023] rows pulled so far: 2600000
[2023] rows pulled so far: 2639557
[2023] saved 2,639,557 rows to iowa_parquet/iowa_l

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Done. Wrote single file: iowa_liquor_2023_2025.parquet


TypeError: object of type 'NoneType' has no len()

In [30]:
df = pd.read_parquet("../data/iowa_liquor_2023_2025.parquet")

In [32]:
df.head()

Unnamed: 0,invoice_line_no,date,store,name,address,city,zipcode,store_location,county,category,...,itemno,im_desc,pack,bottle_volume_ml,state_bottle_cost,state_bottle_retail,sale_bottles,sale_dollars,sale_liters,sale_gallons
0,INV-54554000001,2023-01-02,4829,CENTRAL CITY 2,1501 MICHIGAN AVE,DES MOINES,50314,"{'coordinates': [-93.61378, 41.60575], 'type':...",POLK,1022200,...,87306,CAZADORES BLANCO,12,750,14.5,21.75,12,261.0,9.0,2.37
1,INV-54554000002,2023-01-02,4829,CENTRAL CITY 2,1501 MICHIGAN AVE,DES MOINES,50314,"{'coordinates': [-93.61378, 41.60575], 'type':...",POLK,1031100,...,36874,BLUE OX VODKA,12,1000,4.65,6.98,60,418.8,60.0,15.85
2,INV-54554000003,2023-01-02,4829,CENTRAL CITY 2,1501 MICHIGAN AVE,DES MOINES,50314,"{'coordinates': [-93.61378, 41.60575], 'type':...",POLK,1032200,...,34995,THREE OLIVES GRAPE,12,750,9.96,14.94,24,358.56,18.0,4.75
3,INV-54554000004,2023-01-02,4829,CENTRAL CITY 2,1501 MICHIGAN AVE,DES MOINES,50314,"{'coordinates': [-93.61378, 41.60575], 'type':...",POLK,1081200,...,73053,RUMCHATA,6,1000,17.0,25.5,12,306.0,12.0,3.17
4,INV-54554000005,2023-01-02,4829,CENTRAL CITY 2,1501 MICHIGAN AVE,DES MOINES,50314,"{'coordinates': [-93.61378, 41.60575], 'type':...",POLK,1062400,...,43337,CAPTAIN MORGAN ORIGINAL SPICED,12,1000,12.49,18.74,60,1124.4,60.0,15.85


In [3]:
prox = pd.read_csv('../data/proximity.csv')

In [4]:
prox

Unnamed: 0.1,Unnamed: 0,Store Number,Store Location,lon,lat,# of stores within 5 mile radius,Nearest other store (mi)
0,0,5245,POINT (-93.61379 41.57608),-1.633869,0.725639,86,0.566167
1,1,6303,POINT (-93.59738 41.55503),-1.633582,0.725272,64,0.051137
2,3,2190,POINT (-93.61979 41.60558),-1.633974,0.726154,98,0.024184
3,4,2699,POINT (-93.62362 41.70324),-1.634040,0.727859,63,0.140796
4,5,2636,POINT (-93.57411 41.60444),-1.633176,0.726134,80,0.298991
...,...,...,...,...,...,...,...
2316,6835250,10653,POINT (-95.12921 43.38217),-1.660318,0.757162,21,0.007031
2317,6847204,10639,POINT (-91.4257 41.48638),-1.595679,0.724074,1,0.258425
2318,6847490,10652,POINT (-93.71378 41.77213),-1.635614,0.729061,6,0.018401
2319,6853047,10657,POINT (-91.57717 42.03488),-1.598323,0.733647,66,0.020202


In [10]:
combine_all_to = f"../data/iowa_liquor_{2023}_{2025}.parquet"
out_dir="../iowa_parquet"
print(f"Combining all yearly files to {combine_all_to}")
con = db.connect()  # <-- use duckdb
# pattern to match the yearly files we just wrote:
pattern = os.path.join(out_dir, "iowa_liquor_*.parquet")
con.execute(f"""
    COPY (
        SELECT * FROM parquet_scan('{pattern}')
    )
    TO '{combine_all_to}' (FORMAT PARQUET);
    """)
con.close()
print(f"Done. Wrote single file: {combine_all_to}")

Combining all yearly files to ../data/iowa_liquor_2023_2025.parquet


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Done. Wrote single file: ../data/iowa_liquor_2023_2025.parquet
