In [1]:
%load_ext autoreload
%autoreload 2

In [3]:
import pandas as pd
from farmbase.config import settings
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

In [4]:
df = pd.read_csv("data/kamis_market_prices.csv")

# Replace all cells that contain only "-" (with or without surrounding whitespace) with None
df.replace(r"^\s*-\s*$", pd.NA, regex=True, inplace=True)


def split_price_column(df, column, prefix):
    """Split a 'price/unit' column into price, unit, and currency."""
    split_cols = df[column].str.split("/", expand=True)
    df[f"{prefix}_price"] = pd.to_numeric(split_cols[0], errors="coerce")
    df[f"{prefix}_unit"] = split_cols[1]
    df[f"{prefix}_ccy"] = "KES"
    return df


# Apply transformations
df = split_price_column(df, "Wholesale", "wholesale")
df = split_price_column(df, "Retail", "retail")

# Drop the original columns
df.drop(columns=["Wholesale", "Retail", "source_date"], inplace=True)
df

Unnamed: 0,Market,Commodity,Classification,Grade,Sex,Supply Volume,County,Date,wholesale_price,wholesale_unit,wholesale_ccy,retail_price,retail_unit,retail_ccy
0,Awendo,Beans (Mwezi Moja),,,,5000.0,Migori,2021-05-24,60.00,Kg,KES,68.89,Kg,KES
1,Awendo,Beans (Canadian wonder),,,,8000.0,Migori,2021-05-24,71.11,Kg,KES,75.56,Kg,KES
2,Awendo,Fertilizer,NPK,,,1800.0,Migori,2021-05-24,36.00,Kg,KES,40.00,Kg,KES
3,Awendo,Fertilizer,NPK,,,2500.0,Migori,2021-05-24,48.00,Kg,KES,54.00,Kg,KES
4,Awendo,Fertilizer,DAP,,,5000.0,Migori,2021-05-24,63.00,Kg,KES,68.00,Kg,KES
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
911958,Kipini Fish market,Rayfish,Dried,,,22.0,Tana-River,2025-06-09,280.00,Kg,KES,350.00,Kg,KES
911959,Kipini Fish market,Rabbitfish (Tafi/Tasi),Frozen/Chilled,,,24.0,Tana-River,2025-06-09,250.00,Kg,KES,330.00,Kg,KES
911960,Kipini Fish market,Fresh Water Shrimp,Dried,,,58.0,Tana-River,2025-06-09,200.00,Kg,KES,270.00,Kg,KES
911961,Kipini Fish market,Fresh Water Shrimp,Fresh,,,76.0,Tana-River,2025-06-09,100.00,Kg,KES,150.00,Kg,KES


In [8]:
from farmbase import Commodity
from tqdm import tqdm

# Create async DB engine and session
# DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/dbname"  # replace with actual
engine = create_async_engine(settings.SQLALCHEMY_DATABASE_URI, echo=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

df[["Commodity", "Classification", "Grade", "Sex"]]


async def populate_commodities(_com):
    async with AsyncSessionLocal() as session:
        for _, row in tqdm(_com.iterrows(), total=len(_com), desc="Inserting Commodities"):
            grade_value = None if pd.isna(row.Grade) else row.Grade
            classificatio_value = None if pd.isna(row.Classification) else row.Classification
            sex_value = None if pd.isna(row.Sex) else row.Sex.upper()
            com = Commodity(name=row.Commodity, classification=classificatio_value, grade=grade_value, sex=sex_value)
            session.add(com)
        await session.commit()
        print("✅ Finished populating commodities.")


df2 = df[["Commodity", "Classification", "Grade", "Sex"]].drop_duplicates()
df2 = df2[~df2.Commodity.isna()]

await populate_commodities(df2)

Inserting Commodities: 100%|██████████| 808/808 [00:00<00:00, 29121.35it/s]

2025-06-10 16:32:27,188 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-06-10 16:32:27,188 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-06-10 16:32:27,194 INFO sqlalchemy.engine.Engine select current_schema()
2025-06-10 16:32:27,195 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-06-10 16:32:27,198 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-06-10 16:32:27,199 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-06-10 16:32:27,202 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-06-10 16:32:27,210 INFO sqlalchemy.engine.Engine INSERT INTO farmbase_core.commodity (name, classification, grade, sex) SELECT p0::VARCHAR, p1::VARCHAR, p2::VARCHAR, p3::gender_enum FROM (VALUES ($1::VARCHAR, $2::VARCHAR, $3::VARCHAR, $4::gender_enum, 0), ($5::VARCHAR, $6::VARCHAR, $7::VARCHAR, $8: ... 59313 characters truncated ... nter) ORDER BY sen_counter RETURNING farmbase_core.commodity.id, farmbase_core.commodity.id AS id__1
2025-06-10 16:32:27,211 INFO sqlalchemy.




In [25]:
# df2 = df[['Commodity', 'Classification', 'Grade', 'Sex']].drop_duplicates()
df2[~df2.Commodity.isna()].Sex.unique()

array([<NA>, 'Female', 'Male', 'Castrate'], dtype=object)

In [9]:
from farmbase.market.models import Market

markets = pd.read_csv("markets.csv")

# Create async DB engine and session
# DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/dbname"  # replace with actual
engine = create_async_engine(settings.SQLALCHEMY_DATABASE_URI, echo=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


async def populate_markets(_markets):
    async with AsyncSessionLocal() as session:
        for _, row in tqdm(_markets.iterrows(), total=len(_markets), desc="Inserting Markets"):
            region = Market(name=row.market, location=f"POINT({row.lon} {row.lat})")
            session.add(region)
        await session.commit()
        print("✅ Finished populating markets.")


await populate_markets(markets)

Inserting Markets: 100%|██████████| 286/286 [00:00<00:00, 22419.37it/s]

2025-06-10 16:32:30,340 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-06-10 16:32:30,341 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-06-10 16:32:30,344 INFO sqlalchemy.engine.Engine select current_schema()
2025-06-10 16:32:30,344 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-06-10 16:32:30,348 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-06-10 16:32:30,348 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-06-10 16:32:30,350 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-06-10 16:32:30,353 INFO sqlalchemy.engine.Engine INSERT INTO farmbase_core.market (name, location) SELECT p0::VARCHAR, p1::geometry(POINT,4326) FROM (VALUES ($1::VARCHAR, ST_GeomFromEWKT($2), 0), ($3::VARCHAR, ST_GeomFromEWKT($4), 1), ($5::VARCHAR, ST_GeomFromEWKT($6), 2), ($7::VARCHAR, ST_GeomFrom ... 12596 characters truncated ... , farmbase_core.market.created_at, farmbase_core.market.updated_at, farmbase_core.market.id AS id__1
2025-06-10 16:32:30,354 INFO sqlalchemy.




In [7]:
from pprint import pprint

from farmbase import MarketPrice, Market, Commodity
from sqlalchemy import select

# Create async DB engine and session
# DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/dbname"  # replace with actual
engine = create_async_engine(settings.sqlalchemy_database_uri, echo=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


async def load_market_prices(df: pd.DataFrame):
    """
    Loads market price data from a pandas DataFrame into the database.
    It creates Commodity and Market entries if they don't exist,
    then links MarketPrice records to them.
    Performs lookups to Commodity and Market tables efficiently.
    Uses SQLAlchemy 2.0 style for queries and session management.
    """
    # Use 'with' statement for session management for proper closing
    async with AsyncSessionLocal() as session:
        try:
            # Convert 'Date' column to datetime.date objects
            df["Date"] = pd.to_datetime(df["Date"], errors="coerce").dt.date

            # --- Pre-load existing Markets ---
            print("Pre-loading existing Markets...")
            # Use await session.execute(select(Market))
            result = await session.execute(select(Market))
            market_lookup = {market.name: market for market in result.scalars().all()}
            print(f"Found {len(market_lookup)} existing markets.")

            # --- Pre-load existing Commodities ---
            print("Pre-loading existing Commodities...")
            commodity_lookup = {}
            # Use await session.execute(select(Commodity))
            result = await session.execute(select(Commodity))
            for commodity in result.scalars().all():
                sex_value = commodity.sex.value if commodity.sex else None
                commodity_lookup[(commodity.name, commodity.classification, commodity.grade, sex_value)] = commodity
            print(f"Found {len(commodity_lookup)} existing commodities.")
            pprint(commodity_lookup)

            for index, row in df.iterrows():
                # --- Handle Market Lookup/Creation ---
                market_name = row["Market"]
                market_obj = market_lookup.get(market_name)

                if not market_obj:
                    print(f"no market with name: {market_name}, {row}")
                    continue

                # --- Handle Commodity Lookup/Creation ---
                commodity_name = row["Commodity"]
                if pd.isna(commodity_name):
                    print(f"No commodity name: {row}")
                    continue
                classification = row["Classification"] if pd.notna(row["Classification"]) else None
                grade = row["Grade"] if pd.notna(row["Grade"]) else None

                sex_str = None if pd.isna(row["Sex"]) else row["Sex"].lower()
                commodity_key = (commodity_name, classification, grade, sex_str)
                commodity_obj = commodity_lookup.get(commodity_key)

                if not commodity_obj:
                    raise Exception(f"no commodity with key: {commodity_key}")

                # --- Create MarketPrice entry ---
                market_price_obj = MarketPrice(
                    market_id=market_obj.id,
                    commodity_id=commodity_obj.id,
                    date=row["Date"],
                    supply_volume=row["Supply Volume"] if pd.notna(row["Supply Volume"]) else None,
                    wholesale_price=row["wholesale_price"] if pd.notna(row["wholesale_price"]) else None,
                    wholesale_unit=row["wholesale_unit"] if pd.notna(row["wholesale_unit"]) else None,
                    wholesale_ccy=row["wholesale_ccy"] if pd.notna(row["wholesale_ccy"]) else None,
                    retail_price=row["retail_price"] if pd.notna(row["retail_price"]) else None,
                    retail_unit=row["retail_unit"] if pd.notna(row["retail_unit"]) else None,
                    retail_ccy=row["retail_ccy"] if pd.notna(row["retail_ccy"]) else None,
                )
                session.add(market_price_obj)

            await session.commit()  # Await commit
            print("\nSuccessfully loaded all market prices into the database.")

        except Exception as e:
            await session.rollback()
            print(f"\nAn error occurred: {e}. Rolling back transaction.")
        # 'finally' block is not needed with 'with' statement for session.close() as it's handled automatically


await load_market_prices(
    df.drop_duplicates(subset=["Market", "Commodity", "Classification", "Grade", "Sex", "Date"], keep="first")
)

Pre-loading existing Markets...
2025-06-20 14:41:32,906 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-06-20 14:41:32,906 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-06-20 14:41:32,910 INFO sqlalchemy.engine.Engine select current_schema()
2025-06-20 14:41:32,910 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-06-20 14:41:32,914 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-06-20 14:41:32,914 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-06-20 14:41:32,916 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-06-20 14:41:32,950 INFO sqlalchemy.engine.Engine SELECT farmbase_core.market.id, farmbase_core.market.name, ST_AsEWKB(farmbase_core.market.location) AS location, farmbase_core.market.created_at, farmbase_core.market.updated_at 
FROM farmbase_core.market
2025-06-20 14:41:32,951 INFO sqlalchemy.engine.Engine [generated in 0.00066s] ()
Found 286 existing markets.
Pre-loading existing Commodities...
2025-06-20 14:41:32,989 INFO sqlalchemy.engin

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["Date"] = pd.to_datetime(df["Date"], errors="coerce").dt.date


Found 808 existing commodities.
{('African butter catfish', 'Dried', None, None): <Commodity(id=538, name='African butter catfish')>,
 ('African butter catfish', 'Fresh', None, None): <Commodity(id=402, name='African butter catfish')>,
 ('African butter catfish', 'Frozen/Chilled', None, None): <Commodity(id=642, name='African butter catfish')>,
 ('African butter catfish', 'Smoked', None, None): <Commodity(id=567, name='African butter catfish')>,
 ('Alestes', 'Dried', None, None): <Commodity(id=467, name='Alestes')>,
 ('Amaranthus (Terere)', None, None, None): <Commodity(id=72, name='Amaranthus (Terere)')>,
 ('Anchovies', 'Dried', None, None): <Commodity(id=500, name='Anchovies')>,
 ('Anchovies', 'Fresh', None, None): <Commodity(id=339, name='Anchovies')>,
 ('Apples', 'Green', None, None): <Commodity(id=266, name='Apples')>,
 ('Apples', 'Red', None, None): <Commodity(id=279, name='Apples')>,
 ('Apples', None, None, None): <Commodity(id=120, name='Apples')>,
 ('Arrow Root', None, None, '

In [35]:
print()


