# Turkish Food Prices Dataset Example

This example assumes you have the test database up and running locally.
From the root of the project, start it with ``make infra-up``.
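
If the notebook cannot connect, a quick first check is whether anything is listening on the host and port given in the ``db`` section of the config (``127.0.0.1:3306``). The helper below is a hypothetical stdlib-only sketch (``is_port_open`` is not part of polars_hist_db). It only tests TCP reachability, not the credentials or the MariaDB protocol itself.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and name-resolution failures.
        return False


if __name__ == "__main__":
    # Host and port taken from the `db` section of foodprices_dataset.yaml.
    print("database reachable:", is_port_open("127.0.0.1", 3306))
```

A `False` result usually means the containers started by ``make infra-up`` are not running yet.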

In [1]:
from datetime import datetime
import logging
from typing import Any, List

import polars as pl

from polars_hist_db.config import Config
from polars_hist_db.core import (
    AuditOps,
    DataframeOps,
    TableConfigOps,
    TimeHint,
    make_engine,
)
from polars_hist_db.dataset import run_workflows
from polars_hist_db.loaders import FunctionRegistry

## Initialise Configuration and Logging

This example uses the configuration and data from the test suite.

In [2]:
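config_path = "../tests/_testdata_dataset_configs/foodprices_dataset.yaml"
# Not used below: the dataset config tells the workflow where to search for the
# data file (see the file_search lines in the workflow's log output).
data_path = "../tests/_testdata_dataset_data/turkey_food_prices.csv"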

logging.basicConfig(level=logging.INFO)

config = Config.from_yaml(config_path)

loading Config from: ../tests/_testdata_dataset_configs/foodprices_dataset.yaml


Optionally, print the config file to the console.

In [3]:
with open(config_path, "r") as f:
    print(f.read())

db:
  backend: mariadb
  hostname: 127.0.0.1
  port: 3306
  username: root
  password: admin
  
table_configs:
  - name: unit_info
    schema: test
    is_temporal: false
    delta_config:
      drop_unchanged_rows: false
      # pre_prune_idential_rows: false
      # truncate_table_first: false
      on_duplicate_key: take_last
    primary_keys:
      - id
    columns:
      - id<um_id
      - name<um_name

  - name: product_info
    schema: test
    is_temporal: false
    delta_config:
      drop_unchanged_rows: false
      # pre_prune_idential_rows: false
      # truncate_table_first: false
      on_duplicate_key: take_last
    primary_keys:
      - id
    columns:
      - id<product_id
      - name<product_name

  - name: food_prices
    schema: test
    is_temporal: true
    delta_config:
      drop_unchanged_rows: true
      # pre_prune_idential_rows: true
      # truncate_table_first: true
      time_partition:
        column: time
        truncate: 5d
        unique_strategy: l

## Create a SQLAlchemy Engine to connect to the database

In [4]:
engine = make_engine(
    backend="mariadb",
    hostname="127.0.0.1",
    port=3306,
    username="root",
    password="admin",
)

## Register a custom parser function

The food prices in the data source are given in Turkish lira (TRY).

In this example, only the USD price matters. As specified in the dataset config, the ``price`` column is converted at scrape time by dividing it by a yearly USDTRY exchange rate (TRY per 1 USD). The result is a derived ``price_usd`` column of type ``DECIMAL(10,4)`` in the database.
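
The following worked check confirms the arithmetic: the USD price is the TRY price divided by the USDTRY rate for the observation year. This stdlib-only sketch re-derives some ``price_usd`` values for product 52 in 2013 that appear in the query output further down. ``try_to_usd`` here is a hypothetical helper, not the registered function.

Truncating to 4 decimal places (``ROUND_DOWN``) is an assumption. It matches the displayed values, whereas rounding half-up would give 2.3457 instead of 2.3456 for the first row. This notebook does not document how the cast to ``DECIMAL(10,4)`` is actually performed.

```python
from decimal import ROUND_DOWN, Decimal
from typing import Optional

# Yearly USDTRY rates (TRY per 1 USD), copied from custom_try_to_usd.
_RATES = [
    "1.507", "1.674", "1.802", "1.915", "2.188", "2.724", "3.020",
    "3.646", "4.830", "5.680", "7.004", "8.886", "16.566", "23.085",
]
USDTRY_BY_YEAR = {year: Decimal(rate) for year, rate in zip(range(2010, 2024), _RATES)}

FOUR_DP = Decimal("0.0001")


def try_to_usd(price_try: str, year: int) -> Optional[Decimal]:
    """Convert a TRY price to USD with the yearly rate, truncated to 4 decimal places.

    Returns None when there is no rate for `year`, mirroring the null that the
    left join in custom_try_to_usd produces.
    """
    rate = USDTRY_BY_YEAR.get(year)
    if rate is None:
        return None
    # Exact decimal division, then truncation (assumed) to DECIMAL(10,4) scale.
    return (Decimal(price_try) / rate).quantize(FOUR_DP, rounding=ROUND_DOWN)


# TRY prices for product_id=52 from the "all" query output (2013 rows).
for price in ["4.4920", "4.5786", "4.7865", "5.1337"]:
    print(price, "TRY ->", try_to_usd(price, 2013), "USD")
```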

In [5]:
registry = FunctionRegistry()


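def custom_try_to_usd(df: pl.DataFrame, args: List[Any]) -> pl.DataFrame:
    """Convert a TRY price column to USD using yearly USDTRY rates.

    args[0]: name of the output (USD) column
    args[1]: name of the input price column, in TRY
    args[2]: name of the year column used to look up the fx rate
    """
    # Yearly USDTRY rates (TRY per 1 USD) for 2010..2023.
    usdtry_fx_rates = pl.from_dict(
        {
            "Year": list(range(2010, 2024)),
            "fx_usdtry": [
                1.507, 1.674, 1.802, 1.915, 2.188, 2.724, 3.020,
                3.646, 4.830, 5.680, 7.004, 8.886, 16.566, 23.085,
            ],
        }
    )

    col_result, col_try, col_year = args[0], args[1], args[2]

    # The left join keeps every price row; years without a rate get a null
    # fx_usdtry, and therefore a null USD price.
    df = (
        df.join(usdtry_fx_rates, left_on=col_year, right_on="Year", how="left")
        .with_columns((pl.col(col_try) / pl.col("fx_usdtry")).alias(col_result))
        .drop("fx_usdtry")
    )

    return df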


# Replace any existing "try_to_usd" entry with the custom implementation.
registry.delete_function("try_to_usd")
registry.register_function("try_to_usd", custom_try_to_usd)

print("loaded functions", registry.list_functions())

loaded functions ['null_if_gte', 'apply_type_casts', 'combine_columns', 'try_to_usd']
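
Conceptually, the registry maps function names such as ``try_to_usd`` to callables that take a frame plus a list of arguments. The class below is a hypothetical stdlib-only sketch of that pattern. ``MiniRegistry``, its ``apply`` method and the ``scale`` function are illustrative names, not polars_hist_db's implementation, and a list of dicts stands in for a Polars DataFrame.

The other method names mirror the calls used above. Their behaviour here (raising on a duplicate name, silently ignoring an unknown delete) is assumed.

```python
from typing import Any, Callable, Dict, List

# Rows stand in for a DataFrame so the sketch needs only the standard library.
Rows = List[Dict[str, Any]]
RowFn = Callable[[Rows, List[Any]], Rows]


class MiniRegistry:
    """Hypothetical name -> function registry, for illustration only."""

    def __init__(self) -> None:
        self._fns: Dict[str, RowFn] = {}

    def register_function(self, name: str, fn: RowFn) -> None:
        # Assumed behaviour: refuse to silently overwrite an existing name.
        if name in self._fns:
            raise ValueError(f"function {name!r} is already registered")
        self._fns[name] = fn

    def delete_function(self, name: str) -> None:
        # Deleting an unknown name is a no-op in this sketch.
        self._fns.pop(name, None)

    def list_functions(self) -> List[str]:
        return list(self._fns)

    def apply(self, name: str, rows: Rows, args: List[Any]) -> Rows:
        # Resolve the function by name, as a config entry would, and call it.
        return self._fns[name](rows, args)


def scale(rows: Rows, args: List[Any]) -> Rows:
    """Write row[args[1]] * args[2] into column args[0] (hypothetical example)."""
    col_out, col_in, factor = args
    return [{**row, col_out: row[col_in] * factor} for row in rows]


reg = MiniRegistry()
reg.delete_function("scale")  # safe even though "scale" is not registered yet
reg.register_function("scale", scale)
print(reg.list_functions())
print(reg.apply("scale", [{"price": 4.0}], ["price_x2", "price", 2]))
```

Under these assumed semantics, deleting before registering is what makes the registration cell safe to re-run.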


## Run the workflow

This scrapes any new files into the database.

(Try running the cell a second time: only new files are scraped, so nothing more is ingested.)
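
One way to picture "only new files" is a modification-time watermark. Files newer than the last one already ingested are processed, oldest first. This matches the workflow log printing each file's mtime and the note that re-uploading past data fails. The function below is a hypothetical stdlib sketch of that rule; ``select_new_files`` and the watermark logic are assumptions, not polars_hist_db's code.

```python
from datetime import datetime
from typing import List, Optional, Tuple

FileEntry = Tuple[str, datetime]  # (path, modification time)


def select_new_files(
    files: List[FileEntry], last_ingested: Optional[datetime]
) -> List[FileEntry]:
    """Hypothetical rule: keep files newer than the watermark, oldest first."""
    new = [f for f in files if last_ingested is None or f[1] > last_ingested]
    return sorted(new, key=lambda f: f[1])


files = [("turkey_food_prices.csv", datetime(2025, 1, 1, 0, 0, 1))]
print(select_new_files(files, None))  # first run: the file is selected
print(select_new_files(files, datetime(2025, 1, 1, 0, 0, 1)))  # second run: []
```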

In [6]:
run_workflows(config, engine)

INFO:polars_hist_db.dataset.workflow:scraping dataset turkey_food_prices
INFO:polars_hist_db.dataset.workflow:starting ingest for food_prices
INFO:polars_hist_db.loaders.file_search:searching files ['turkey_food_prices.csv'] in ../tests/_testdata_dataset_data
INFO:polars_hist_db.loaders.file_search:found 1 files
INFO:polars_hist_db.loaders.file_search:found total 1 files
INFO:polars_hist_db.core.table_config:creating table test.__audit_log
INFO:polars_hist_db.core.table_config:creating table test.unit_info
INFO:polars_hist_db.core.table_config:creating table test.product_info
INFO:polars_hist_db.core.table_config:creating table test.food_prices
INFO:polars_hist_db.core.table_config:creating table test.turkey_food_prices
INFO:polars_hist_db.dataset.workflow:[1/1] processing file mtime=2025-01-01 00:00:01
INFO:polars_hist_db.loaders.dsv_loader:loading csv ../tests/_testdata_dataset_data/turkey_food_prices.csv
INFO:polars_hist_db.loaders.fn_registry:applying fn try_to_usd to dataframe (73

## Querying the temporal tables

Query the latest food prices.

In [7]:
with engine.begin() as connection:
    latest_food_prices_df = DataframeOps(connection).from_table("test", "food_prices")

Query the full history of food prices.

In [8]:
with engine.begin() as connection:
    time_hint = TimeHint(mode="all")
    all_food_prices = DataframeOps(connection).from_table(
        "test", "food_prices", time_hint
    )


all_food_prices

| product_id | um_id | price | price_usd | `__valid_from` | `__valid_to` |
|---|---|---|---|---|---|
| i32 | i32 | decimal[10,4] | decimal[10,4] | datetime[μs] | datetime[μs] |
| 52 | 5 | 4.4920 | 2.3456 | 2013-04-30 00:00:00 | 2013-05-30 00:00:00 |
| 52 | 5 | 4.5786 | 2.3909 | 2013-05-30 00:00:00 | 2013-11-01 00:00:00 |
| 52 | 5 | 4.7865 | 2.4994 | 2013-11-01 00:00:00 | 2013-12-01 00:00:00 |
| 52 | 5 | 5.1337 | 2.6807 | 2013-12-01 00:00:00 | 2013-12-31 00:00:00 |
| 52 | 5 | 5.5099 | 2.5182 | 2013-12-31 00:00:00 | 2014-01-30 00:00:00 |
| … | … | … | … | … | … |
| 502 | 5 | 47.9040 | 8.4338 | 2019-07-28 00:00:00 | 2019-09-01 00:00:00 |
| 502 | 5 | 49.1176 | 8.6474 | 2019-09-01 00:00:00 | 2019-10-01 00:00:00 |
| 502 | 5 | 50.8347 | 8.9497 | 2019-10-01 00:00:00 | 2019-10-31 00:00:00 |
| 502 | 5 | 51.7985 | 9.1194 | 2019-10-31 00:00:00 | 2019-11-30 00:00:00 |
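
Each row in the history is one version of a product's price with a validity interval. In the sample rows above, each version's ``__valid_to`` equals the next version's ``__valid_from``, so the intervals tile time without gaps. The stdlib sketch below checks that property for a list of ``(valid_from, valid_to)`` pairs. ``find_breaks`` is a hypothetical helper, and reading the intervals as half-open ``[__valid_from, __valid_to)`` is an assumption consistent with the output.

```python
from datetime import datetime
from typing import List, Tuple

Interval = Tuple[datetime, datetime]  # (valid_from, valid_to)


def find_breaks(versions: List[Interval]) -> List[Tuple[datetime, datetime]]:
    """Return (valid_to, next valid_from) pairs where consecutive versions do not touch.

    A non-empty result means a gap (valid_to < next valid_from) or an overlap
    (valid_to > next valid_from) in the history.
    """
    ordered = sorted(versions)
    return [
        (prev_to, next_from)
        for (_, prev_to), (next_from, _) in zip(ordered, ordered[1:])
        if prev_to != next_from
    ]


d = datetime
# Validity intervals for product_id=52, copied from the output above.
product_52 = [
    (d(2013, 4, 30), d(2013, 5, 30)),
    (d(2013, 5, 30), d(2013, 11, 1)),
    (d(2013, 11, 1), d(2013, 12, 1)),
    (d(2013, 12, 1), d(2013, 12, 31)),
    (d(2013, 12, 31), d(2014, 1, 30)),
]
print(find_breaks(product_52))  # [] : no gaps or overlaps
```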


Query the food prices as of a specific point in time, in this case _1-Jan-2015_.

In [9]:
with engine.begin() as connection:
    time_hint = TimeHint(mode="asof", asof_utc=datetime(2015, 1, 1))
    food_prices_at_2015_date = DataframeOps(connection).from_table(
        "test", "food_prices", time_hint
    )


food_prices_at_2015_date

| product_id | um_id | price | price_usd | `__valid_from` | `__valid_to` |
|---|---|---|---|---|---|
| i32 | i32 | decimal[10,4] | decimal[10,4] | datetime[μs] | datetime[μs] |
| 52 | 5 | 6.7550 | 2.4798 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
| 58 | 5 | 2.6840 | 0.9853 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
| 66 | 5 | 7.6650 | 2.8138 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
| 92 | 33 | 0.3210 | 0.1178 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
| 94 | 5 | 6.6720 | 2.4493 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
| … | … | … | … | … | … |
| 360 | 5 | 1.1160 | 0.4096 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
| 401 | 5 | 13.4840 | 4.9500 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
| 433 | 5 | 28.2390 | 10.3667 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
| 463 | 15 | 2.8410 | 1.0429 | 2014-12-31 00:00:00 | 2015-01-30 00:00:00 |
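
An as-of query selects, for each product, the version whose validity interval contains the requested instant. The stdlib sketch below picks the product 52 price for 1-Jan-2015 from two versions copied from the outputs above; intermediate versions are omitted. It assumes half-open ``[__valid_from, __valid_to)`` intervals, and ``asof`` is a hypothetical helper for illustration, not the query polars_hist_db issues.

```python
from datetime import datetime
from typing import List, Optional, Tuple

Version = Tuple[datetime, datetime, str]  # (valid_from, valid_to, price)


def asof(versions: List[Version], at: datetime) -> Optional[str]:
    """Return the price whose [valid_from, valid_to) interval contains `at`, else None."""
    for valid_from, valid_to, price in versions:
        if valid_from <= at < valid_to:
            return price
    return None


product_52 = [
    (datetime(2013, 12, 31), datetime(2014, 1, 30), "5.5099"),
    (datetime(2014, 12, 31), datetime(2015, 1, 30), "6.7550"),
]
print(asof(product_52, datetime(2015, 1, 1)))  # 6.7550
```

Because the end bound is exclusive, asking for exactly ``2015-01-30`` returns nothing from these two versions.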


## Delete the data associated with the dataset

Reset the example by dropping the configured tables and the audit log. Without this reset, subsequent attempts to upload the same data (or older data) into the database will fail.

In [10]:
with engine.begin() as connection:
    TableConfigOps(connection).drop_all(config.tables)
    AuditOps(config.tables.schemas()[0]).drop(connection)

INFO:polars_hist_db.core.table_config:dropped table food_prices
INFO:polars_hist_db.core.table_config:dropped table product_info
INFO:polars_hist_db.core.table_config:dropped table unit_info
INFO:polars_hist_db.core.table_config:dropped table __audit_log
