# Test Star Schema - CEVA Animal Health

This notebook queries the star schema parquet files using DuckDB.

In [3]:
import duckdb

# Connect to DuckDB (in-memory)
con = duckdb.connect()

# Base path to parquet files
BASE_PATH = '../data/b-silver-star-schema/'

print("✓ DuckDB connected")

✓ DuckDB connected


## 1. Dimension Tables Overview

In [4]:
# dim_product sample
con.execute(f"""
SELECT 
    product_sk,
    product_code,
    product_name,
    therapeutic_class,
    category,
    form,
    bu_source
FROM read_parquet('{BASE_PATH}dim_product.parquet')
LIMIT 10
""").df()

Unnamed: 0,product_sk,product_code,product_name,therapeutic_class,category,form,bu_source
0,1,COMP-DENT-001,Orozyme Gel,,dental,gel,companion
1,2,COMP-DENT-002,Plaque Off,,dental,powder,companion
2,3,COMP-HYGI-001,Douxo S3 Calm,,hygiene,shampoo,companion
3,4,COMP-PARA-001,Vectra 3D,,antiparasitic,spot-on,companion
4,5,COMP-PARA-002,Broadline,,antiparasitic,spot-on,companion
5,6,COMP-PARA-003,Milbemax,,antiparasitic,tablet,companion
6,7,COMP-VAC-001,Canigen DHP,,vaccine,injectable,companion
7,8,COMP-VAC-002,Canigen L,,vaccine,injectable,companion
8,9,COMP-VAC-003,Feligen CRP,,vaccine,injectable,companion
9,10,COMP-VAC-004,Purevax RCP,,vaccine,injectable,companion


In [None]:
# dim_specie - all species
con.execute(f"""
SELECT * FROM read_parquet('{BASE_PATH}dim_specie.parquet')
ORDER BY animal_type, specie_code
""").df()

In [None]:
# dim_site - all sites
con.execute(f"""
SELECT * FROM read_parquet('{BASE_PATH}dim_site.parquet')
ORDER BY region, country
""").df()

## 2. Fact Table Sample

In [None]:
# fact_batch_production sample
con.execute(f"""
SELECT 
    batch_production_sk,
    batch_id,
    product_fk,
    site_fk,
    production_date,
    expiry_date,
    batch_status,
    quantity_doses,
    quantity_units,
    bu_source,
    targeted_species
FROM read_parquet('{BASE_PATH}fact_batch_production.parquet')
LIMIT 10
""").df()

## 3. Business Queries

In [None]:
# Production by BU
con.execute(f"""
SELECT 
    bu_source,
    COUNT(*) as batch_count,
    SUM(quantity_doses) as total_doses,
    SUM(quantity_units) as total_units
FROM read_parquet('{BASE_PATH}fact_batch_production.parquet')
GROUP BY bu_source
ORDER BY bu_source
""").df()

In [None]:
# Production by site and country
con.execute(f"""
SELECT 
    s.site_code,
    s.country,
    s.region,
    COUNT(*) as batch_count,
    SUM(f.quantity_doses) as total_doses,
    SUM(f.quantity_units) as total_units
FROM read_parquet('{BASE_PATH}fact_batch_production.parquet') f
LEFT JOIN read_parquet('{BASE_PATH}dim_site.parquet') s ON f.site_fk = s.site_sk
WHERE s.site_code IS NOT NULL
GROUP BY s.site_code, s.country, s.region
ORDER BY batch_count DESC
""").df()

In [None]:
# Products by therapeutic class
con.execute(f"""
SELECT 
    p.therapeutic_class,
    COUNT(DISTINCT f.batch_id) as batch_count,
    COUNT(DISTINCT p.product_code) as product_count
FROM read_parquet('{BASE_PATH}fact_batch_production.parquet') f
JOIN read_parquet('{BASE_PATH}dim_product.parquet') p ON f.product_fk = p.product_sk
WHERE p.therapeutic_class IS NOT NULL
GROUP BY p.therapeutic_class
ORDER BY batch_count DESC
""").df()

In [None]:
# Batch status distribution
con.execute(f"""
SELECT 
    batch_status,
    bu_source,
    COUNT(*) as batch_count
FROM read_parquet('{BASE_PATH}fact_batch_production.parquet')
GROUP BY batch_status, bu_source
ORDER BY bu_source, batch_count DESC
""").df()

## 4. Queries with targeted_species Array

In [None]:
# Batches with their target species (unnest array)
con.execute(f"""
SELECT 
    f.batch_id,
    p.product_name,
    s.specie_name,
    s.animal_type,
    f.bu_source,
    f.production_date
FROM read_parquet('{BASE_PATH}fact_batch_production.parquet') f
JOIN read_parquet('{BASE_PATH}dim_product.parquet') p ON f.product_fk = p.product_sk
CROSS JOIN UNNEST(f.targeted_species) AS t(specie_fk)
JOIN read_parquet('{BASE_PATH}dim_specie.parquet') s ON t.specie_fk = s.specie_sk
ORDER BY f.production_date DESC
LIMIT 20
""").df()

In [None]:
# Production by target species
con.execute(f"""
SELECT 
    s.specie_name,
    s.animal_type,
    COUNT(DISTINCT f.batch_id) as batch_count,
    COUNT(DISTINCT p.product_code) as product_count
FROM read_parquet('{BASE_PATH}fact_batch_production.parquet') f
JOIN read_parquet('{BASE_PATH}dim_product.parquet') p ON f.product_fk = p.product_sk
CROSS JOIN UNNEST(f.targeted_species) AS t(specie_fk)
JOIN read_parquet('{BASE_PATH}dim_specie.parquet') s ON t.specie_fk = s.specie_sk
GROUP BY s.specie_name, s.animal_type
ORDER BY batch_count DESC
""").df()

## 5. Complete Business View

In [None]:
# Complete view with all dimensions joined
con.execute(f"""
SELECT 
    f.batch_id,
    p.product_name,
    p.therapeutic_class,
    p.form,
    s.site_code,
    s.country,
    s.region,
    f.production_date,
    f.expiry_date,
    f.batch_status,
    f.quantity_doses,
    f.quantity_units,
    f.bu_source,
    f.targeted_species
FROM read_parquet('{BASE_PATH}fact_batch_production.parquet') f
JOIN read_parquet('{BASE_PATH}dim_product.parquet') p ON f.product_fk = p.product_sk
LEFT JOIN read_parquet('{BASE_PATH}dim_site.parquet') s ON f.site_fk = s.site_sk
ORDER BY f.production_date DESC
LIMIT 15
""").df()

In [None]:
# Close connection
con.close()
print("✓ Connection closed")