# Redis Bike Co with polars-redis

This notebook demonstrates how to use **polars-redis** to work with the Redis Bike Co dataset, which describes a fictional bicycle retail company with 111 bikes and 5 stores.

We'll cover:
1. Loading data into Redis
2. Schema inference
3. Scanning and querying data
4. Using RediSearch for filtering
5. Vector similarity search
6. Real-world analytics: Finding underpriced inventory
7. Performance comparison: polars-redis vs redis-py

## Setup

First, make sure you have Redis Stack running:

```bash
docker run -d --name redis-stack -p 6379:6379 redis/redis-stack:latest
```

In [None]:
import json
import time
from pathlib import Path

import polars as pl
import polars_redis as redis

# Redis connection URL
REDIS_URL = "redis://localhost:6379"

# Data directory
DATA_DIR = Path("data")

## 1. Loading Data into Redis

Let's load the bike and store data from JSON files into Redis using `write_json()`.

In [None]:
# Load bikes from JSON
with open(DATA_DIR / "bikes.json") as f:
    bikes_raw = json.load(f)["data"]

# Flatten the nested structure into a DataFrame
bikes_df = pl.DataFrame([
    {
        "stockcode": b["stockcode"],
        "model": b["model"],
        "brand": b["brand"],
        "price": b["price"],
        "type": b["type"],
        "description": b["description"],
        "material": b["specs"]["material"],
        "weight": b["specs"]["weight"],
    }
    for b in bikes_raw
])

print(f"Loaded {len(bikes_df)} bikes")
bikes_df.head()

In [None]:
# Write bikes to Redis as JSON documents
result = redis.write_json(
    bikes_df,
    url=REDIS_URL,
    key_column="stockcode",
    key_prefix="redisbikeco:bike:",
)

print(f"Wrote {result.success_count} bikes to Redis")
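
To make the key layout concrete, here is a minimal sketch of the flattening step and the key each row would map to. The sample record's values are invented, and the "prefix followed by key column value" rule is an assumption inferred from the `redisbikeco:bike:*` pattern used later in this notebook, not a statement about polars-redis internals. `flatten_bike` and `redis_key` are hypothetical helper names.

```python
# Hypothetical record shaped like one entry of bikes.json (values are invented).
sample = {
    "stockcode": "RBC00001",
    "model": "Example",
    "brand": "ExampleBrand",
    "price": 150000,
    "type": "Mountain Bikes",
    "description": "An example bike.",
    "specs": {"material": "carbon", "weight": 12.5},
}


def flatten_bike(bike: dict) -> dict:
    """Lift the nested specs fields to the top level, similar to the cell above."""
    flat = {k: v for k, v in bike.items() if k != "specs"}
    flat["material"] = bike["specs"]["material"]
    flat["weight"] = bike["specs"]["weight"]
    return flat


def redis_key(row: dict, key_column: str, key_prefix: str) -> str:
    """Assumed key layout: the prefix followed by the key column's value."""
    return f"{key_prefix}{row[key_column]}"


flat = flatten_bike(sample)
key = redis_key(flat, key_column="stockcode", key_prefix="redisbikeco:bike:")
print(key)
```

If that assumption holds, every bike lands under a key that the `redisbikeco:bike:*` pattern matches, which is what the scans and index prefixes below rely on.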

In [None]:
# Load and write stores
with open(DATA_DIR / "stores.json") as f:
    stores_raw = json.load(f)["data"]

stores_df = pl.DataFrame([
    {
        "storecode": s["storecode"],
        "storename": s["storename"],
        "city": s["address"]["city"],
        "state": s["address"]["state"],
        "position": s["position"],
        "amenities": ",".join(s["amenities"]),
    }
    for s in stores_raw
])

result = redis.write_json(
    stores_df,
    url=REDIS_URL,
    key_column="storecode",
    key_prefix="redisbikeco:store:",
)

print(f"Wrote {result.success_count} stores to Redis")
stores_df

## 2. Schema Inference

polars-redis can infer a schema by sampling documents that match a key pattern, so you don't have to map types by hand.

In [None]:
# Infer schema from bike documents
inferred_schema = redis.infer_json_schema(
    REDIS_URL,
    pattern="redisbikeco:bike:*",
    sample_size=10,
)

print("Inferred bike schema:")
for field, dtype in inferred_schema.items():
    print(f"  {field}: {dtype}")

In [None]:
# Get schema with confidence scores - useful for detecting inconsistent data
schema_with_confidence = redis.infer_json_schema_with_confidence(
    REDIS_URL,
    pattern="redisbikeco:bike:*",
    sample_size=20,
)

print("Schema inference confidence:")
for field, info in schema_with_confidence.fields.items():
    confidence_pct = f"{info.confidence:.0%}"
    print(f"  {field}: {info.inferred_type} ({confidence_pct})")

## 3. Scanning Data

Use `scan_json()` to lazily read all documents matching a pattern. This is efficient because:
- **Lazy evaluation**: Nothing is fetched until you `.collect()`
- **Batched fetching**: Documents are retrieved in configurable batches
- **Type safety**: Schema is enforced during deserialization
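
The first two points can be illustrated without Redis at all. The sketch below uses an in-memory stand-in (`FakeStore`, a hypothetical class) and a generator to show that nothing is requested until iteration starts, and that batching turns 111 single lookups into a handful of calls. The batch size of 50 is arbitrary and is not a polars-redis default.

```python
from itertools import islice
from typing import Iterable, Iterator


class FakeStore:
    """In-memory stand-in for Redis that counts round trips."""

    def __init__(self, data: dict[str, dict]) -> None:
        self.data = data
        self.round_trips = 0

    def mget(self, keys: list[str]) -> list[dict]:
        self.round_trips += 1  # one call returns many documents
        return [self.data[k] for k in keys]


def scan_batched(store: FakeStore, keys: Iterable[str], batch_size: int) -> Iterator[dict]:
    """Generator: no request is sent until the caller starts iterating."""
    it = iter(keys)
    while batch := list(islice(it, batch_size)):
        yield from store.mget(batch)


store = FakeStore({f"bike:{i}": {"id": i} for i in range(111)})
lazy = scan_batched(store, list(store.data), batch_size=50)
trips_before = store.round_trips  # nothing fetched yet
docs = list(lazy)                 # plays the role of .collect()
print(trips_before, store.round_trips, len(docs))
```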

In [None]:
# Define the schema for bikes
bike_schema = {
    "stockcode": pl.Utf8,
    "model": pl.Utf8,
    "brand": pl.Utf8,
    "price": pl.Int64,
    "type": pl.Utf8,
    "description": pl.Utf8,
    "material": pl.Utf8,
    "weight": pl.Float64,
}

# Create a lazy scan - nothing fetched yet!
bikes_lf = redis.scan_json(
    REDIS_URL,
    pattern="redisbikeco:bike:*",
    schema=bike_schema,
)

# collect_schema() resolves the schema without fetching data (Polars >= 1.0)
print(f"LazyFrame created with schema: {bikes_lf.collect_schema()}")
print("No data fetched yet - this is lazy evaluation!")

In [None]:
# Collect to execute the scan
all_bikes = bikes_lf.collect()

print(f"Total bikes: {len(all_bikes)}")
all_bikes.head(10)

## 4. RediSearch with Query Builder

The polars-redis query builder lets you write Pythonic queries that compile to RediSearch syntax.
This means **server-side filtering** - only matching documents are transferred.
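
To show what "compiles to RediSearch syntax" means, here is a toy expression builder. It is a sketch of the general idea only: `Field`, `Expr`, `tag_eq`, and `lt` are hypothetical names, and the exact strings polars-redis emits may differ. The RediSearch rules it relies on are that TAG values go in braces with spaces escaped, numeric ranges go in brackets with `(` marking an exclusive bound, and space-separated clauses are intersected.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class Expr:
    """A compiled RediSearch query fragment."""

    text: str

    def __and__(self, other: "Expr") -> "Expr":
        # RediSearch treats space-separated clauses as an intersection (AND).
        return Expr(f"({self.text} {other.text})")

    def to_redis(self) -> str:
        return self.text


class Field:
    """Toy column reference; a stand-in for the idea behind col, not its implementation."""

    def __init__(self, name: str) -> None:
        self.name = name

    def tag_eq(self, value: str) -> Expr:
        # TAG values need spaces and punctuation escaped with a backslash.
        escaped = re.sub(r"([^\w])", r"\\\1", value)
        return Expr(f"@{self.name}:{{{escaped}}}")

    def lt(self, value: float) -> Expr:
        # "(" marks an exclusive bound in a numeric range.
        return Expr(f"@{self.name}:[-inf ({value}]")


query = Field("type").tag_eq("Mountain Bikes") & Field("price").lt(200000)
print(query.to_redis())
```

Because the filter travels to the server as a query string, only matching documents need to come back over the wire.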

In [None]:
import redis as redis_py

r = redis_py.from_url(REDIS_URL)

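# Drop the existing index if present. The "DD" flag is omitted on purpose:
# it would also delete the indexed bike documents written in section 1.
try:
    r.execute_command("FT.DROPINDEX", "bikes_idx")
    print("Dropped existing index")
except redis_py.ResponseError:
    pass  # index does not exist yet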

# Create a comprehensive index
r.execute_command(
    "FT.CREATE", "bikes_idx",
    "ON", "JSON",
    "PREFIX", "1", "redisbikeco:bike:",
    "SCHEMA",
    "$.stockcode", "AS", "stockcode", "TAG",
    "$.model", "AS", "model", "TEXT",
    "$.brand", "AS", "brand", "TAG",
    "$.price", "AS", "price", "NUMERIC", "SORTABLE",
    "$.type", "AS", "type", "TAG",
    "$.description", "AS", "description", "TEXT",
    "$.material", "AS", "material", "TAG",
    "$.weight", "AS", "weight", "NUMERIC", "SORTABLE",
)

print("Created bikes_idx index")

In [None]:
from polars_redis.query import col

# Simple tag query
ebikes = redis.search_json(
    REDIS_URL,
    index="bikes_idx",
    query=col("type") == "eBikes",
    schema=bike_schema,
).collect()

print(f"eBikes found: {len(ebikes)}")
ebikes.select(["stockcode", "brand", "model", "price"])

In [None]:
# Complex query with multiple conditions
# The query builder compiles this to RediSearch syntax
query = (
    (col("type") == "Mountain Bikes") &
    (col("price") < 200000) &
    (col("material") == "carbon")
)

print("Pythonic query: (col('type') == 'Mountain Bikes') & (col('price') < 200000) & (col('material') == 'carbon')")
print(f"Compiled to:    {query.to_redis()}")

results = redis.search_json(
    REDIS_URL,
    index="bikes_idx",
    query=query,
    schema=bike_schema,
).collect()

print(f"\nFound {len(results)} bikes")
results.select(["brand", "model", "price", "material", "weight"])

In [None]:
# Full-text search in descriptions
trail_bikes = redis.search_json(
    REDIS_URL,
    index="bikes_idx",
    query=col("description").contains("trail"),
    schema=bike_schema,
).collect()

print(f"Bikes with 'trail' in description: {len(trail_bikes)}")
trail_bikes.select(["brand", "model", "type", "description"]).head(5)

In [None]:
# Price range query
mid_range = redis.search_json(
    REDIS_URL,
    index="bikes_idx",
    query=col("price").is_between(100000, 200000),
    schema=bike_schema,
).collect()

print(f"Mid-range bikes (Rs 1000-2000): {len(mid_range)}")
mid_range.select(["brand", "model", "type", "price"]).sort("price").head(10)

## 5. Vector Similarity Search

polars-redis supports vector similarity search using the `knn()` method. This is useful for:
- Semantic search ("find bikes similar to this one")
- Recommendation systems
- Anomaly detection
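
Conceptually, a k-nearest-neighbour search over a `FLAT` index is an exhaustive comparison: score the query against every stored vector and keep the `k` closest. The sketch below does that in plain Python with cosine distance (1 minus cosine similarity). The three vectors and their labels are invented for illustration, and `knn` here is a hypothetical helper, not the polars-redis method.

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """1 - cosine similarity; 0 means the vectors point the same way."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm if norm else 1.0


def knn(query: list[float], vectors: dict[str, list[float]], k: int) -> list[tuple[str, float]]:
    """Exhaustive search: score every vector, keep the k closest."""
    scored = sorted((cosine_distance(query, v), key) for key, v in vectors.items())
    return [(key, dist) for dist, key in scored[:k]]


vectors = {
    "road": [1.0, 0.0, 0.0],
    "trail": [0.0, 1.0, 0.0],
    "gravel": [0.6, 0.8, 0.0],
}
nearest = knn([0.1, 0.9, 0.0], vectors, k=2)
for key, dist in nearest:
    print(f"{key}: {dist:.4f}")
```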

First, we need to generate embeddings for our bike descriptions and store them.

In [None]:
# For this demo, we'll create simple hashed bag-of-words embeddings
# (word counts folded into a fixed number of dimensions, then normalized).
# In production, you'd use sentence-transformers or OpenAI embeddings

import numpy as np
from collections import Counter
import re

def simple_embedding(text: str, vocab: dict, dim: int = 64) -> list:
    """Create a simple bag-of-words embedding."""
    words = re.findall(r'\w+', text.lower())
    vec = np.zeros(dim)
    for word in words:
        if word in vocab:
            vec[vocab[word] % dim] += 1
    # Normalize
    norm = np.linalg.norm(vec)
    if norm > 0:
        vec = vec / norm
    return vec.tolist()

# Build vocabulary from all descriptions
all_words = []
for desc in all_bikes["description"].to_list():
    all_words.extend(re.findall(r'\w+', desc.lower()))

# Sort the words so the word-to-index mapping is the same on every run
vocab = {word: i for i, word in enumerate(sorted(set(all_words)))}
print(f"Vocabulary size: {len(vocab)} words")

# Generate embeddings for each bike
embeddings = [
    simple_embedding(desc, vocab)
    for desc in all_bikes["description"].to_list()
]

print(f"Generated {len(embeddings)} embeddings of dimension {len(embeddings[0])}")

In [None]:
# Add embeddings to bike data and re-write to Redis
bikes_with_embeddings = all_bikes.with_columns(
    pl.Series("embedding", embeddings)
)

# Write updated bikes with embeddings
result = redis.write_json(
    bikes_with_embeddings,
    url=REDIS_URL,
    key_column="stockcode",
    key_prefix="redisbikeco:bike:",
)

print(f"Updated {result.success_count} bikes with embeddings")

In [None]:
# Create a new index with vector field
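try:
    # No "DD" flag: it would delete the bikes (and embeddings) just written.
    r.execute_command("FT.DROPINDEX", "bikes_vector_idx")
except redis_py.ResponseError:
    pass  # index does not exist yet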

r.execute_command(
    "FT.CREATE", "bikes_vector_idx",
    "ON", "JSON",
    "PREFIX", "1", "redisbikeco:bike:",
    "SCHEMA",
    "$.stockcode", "AS", "stockcode", "TAG",
    "$.model", "AS", "model", "TEXT",
    "$.brand", "AS", "brand", "TAG",
    "$.price", "AS", "price", "NUMERIC", "SORTABLE",
    "$.type", "AS", "type", "TAG",
    "$.description", "AS", "description", "TEXT",
    "$.material", "AS", "material", "TAG",
    "$.weight", "AS", "weight", "NUMERIC", "SORTABLE",
    "$.embedding", "AS", "embedding", "VECTOR", "FLAT", "6",
        "TYPE", "FLOAT32",
        "DIM", "64",
        "DISTANCE_METRIC", "COSINE",
)

print("Created bikes_vector_idx with vector field")

In [None]:
# Find bikes similar to a query description
query_text = "lightweight carbon bike for trail riding with disc brakes"
query_embedding = simple_embedding(query_text, vocab)

print(f"Query: '{query_text}'")
print("\nFinding 5 most similar bikes...")

# Use the knn() query builder
from polars_redis.query import col

knn_query = col("embedding").knn(k=5, vector_param="query_vec")
print(f"\nKNN Query: {knn_query.to_redis()}")

In [None]:
# Execute the KNN search with a raw FT.SEARCH call, passing the query vector via PARAMS
import struct

# Convert embedding to bytes for Redis
query_bytes = struct.pack(f"{len(query_embedding)}f", *query_embedding)

# Execute KNN search
results = r.execute_command(
    "FT.SEARCH", "bikes_vector_idx",
    "*=>[KNN 5 @embedding $query_vec]",
    "PARAMS", "2", "query_vec", query_bytes,
    "RETURN", "5", "stockcode", "brand", "model", "type", "description",
    "DIALECT", "2"
)

print(f"Found {results[0]} similar bikes:\n")

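def to_str(value):
    """Decode bytes replies (the client was created without decode_responses)."""
    return value.decode() if isinstance(value, bytes) else value

# Parse results: the reply is [total, key, fields, key, fields, ...]
for i in range(1, len(results), 2):
    fields = [to_str(f) for f in results[i + 1]]
    field_dict = dict(zip(fields[::2], fields[1::2]))
    print(f"  {field_dict.get('brand', 'N/A')} {field_dict.get('model', 'N/A')} ({field_dict.get('type', 'N/A')})")
    desc = field_dict.get('description', '')[:100]
    print(f"    {desc}...\n")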
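
The query vector is sent as a raw byte string, so its length has to match the index definition: a `FLOAT32` vector with `DIM 64` is 64 × 4 = 256 bytes. The sketch below shows the packing and a round trip on a short vector. The four values are arbitrary and were chosen because they are exactly representable in 32-bit floats.

```python
import struct

vec = [0.5, -1.0, 0.25, 2.0]  # arbitrary values, exact in float32

# "<" forces little-endian; a plain "f" format uses the machine's native byte order.
blob = struct.pack(f"<{len(vec)}f", *vec)

# Each float32 takes 4 bytes, so the element count is len(blob) // 4.
roundtrip = list(struct.unpack(f"<{len(blob) // 4}f", blob))
print(len(blob), roundtrip)
```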

## 6. Real-World Analytics: Finding Underpriced Inventory

This is where polars-redis really shines - combining Redis data with Polars analytics.

**Business scenario**: Find bikes that are priced below average for their category, adjusted for material quality. These could be:
- Pricing errors that need correction
- Great deals to promote
- Inventory to prioritize for sales
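
"Below average for their category" is measured here with a z-score: the distance from the category mean in units of the category's standard deviation. The sketch below works one small example by hand using the standard library. The four prices are invented, and `statistics.stdev` is the sample standard deviation (n - 1 in the denominator), which matches the default of Polars' `std()`.

```python
import statistics

prices = [100, 120, 140, 60]  # invented prices for one category

mean = statistics.mean(prices)
std = statistics.stdev(prices)  # sample standard deviation (n - 1)

zscores = [(p - mean) / std for p in prices]
discount_pct = [(mean - p) / mean * 100 for p in prices]

# Flag anything more than one standard deviation below the category mean.
flagged = [p for p, z in zip(prices, zscores) if z < -1]
print(flagged)
```

Only the bike priced at 60 is flagged: it sits roughly 1.3 standard deviations below the mean of 105, a discount of about 43%.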

In [None]:
# Load fresh data
bikes = redis.scan_json(
    REDIS_URL,
    pattern="redisbikeco:bike:*",
    schema=bike_schema,
).collect()

print(f"Analyzing {len(bikes)} bikes...")

In [None]:
# Calculate category statistics
category_stats = (
    bikes
    .group_by(["type", "material"])
    .agg(
        pl.col("price").mean().alias("avg_price"),
        pl.col("price").std().alias("std_price"),
        pl.col("price").count().alias("count"),
    )
)

print("Category statistics (type + material):")
category_stats.sort(["type", "material"])

In [None]:
# Join bikes with category stats to find underpriced items
bikes_with_stats = bikes.join(
    category_stats,
    on=["type", "material"],
    how="left"
)

# Calculate price deviation (z-score)
underpriced = (
    bikes_with_stats
    .with_columns(
        ((pl.col("price") - pl.col("avg_price")) / pl.col("std_price")).alias("price_zscore"),
        ((pl.col("avg_price") - pl.col("price")) / pl.col("avg_price") * 100).alias("discount_pct"),
    )
    .filter(
        (pl.col("price_zscore") < -1) &  # More than 1 std below average
        (pl.col("count") >= 3)  # Only categories with enough samples
    )
    .sort("price_zscore")
    .select([
        "stockcode", "brand", "model", "type", "material",
        "price", "avg_price", "discount_pct", "price_zscore"
    ])
)

print(f"Found {len(underpriced)} underpriced bikes (>1 std below category average):\n")
underpriced.head(10)

In [None]:
# Create a pricing report
pricing_report = (
    bikes_with_stats
    .with_columns(
        ((pl.col("price") - pl.col("avg_price")) / pl.col("std_price")).alias("price_zscore"),
    )
    .with_columns(
        pl.when(pl.col("price_zscore") < -1.5)
        .then(pl.lit("Significantly Underpriced"))
        .when(pl.col("price_zscore") < -0.5)
        .then(pl.lit("Below Average"))
        .when(pl.col("price_zscore") > 1.5)
        .then(pl.lit("Premium Priced"))
        .when(pl.col("price_zscore") > 0.5)
        .then(pl.lit("Above Average"))
        .otherwise(pl.lit("Average"))
        .alias("pricing_tier")
    )
)

# Summary by pricing tier
tier_summary = (
    pricing_report
    .group_by("pricing_tier")
    .agg(
        pl.col("stockcode").count().alias("count"),
        pl.col("price").mean().alias("avg_price"),
    )
    .sort("avg_price")
)

print("Pricing tier distribution:")
tier_summary

In [None]:
# Find the best deals by type
best_deals = (
    pricing_report
    .filter(pl.col("pricing_tier") == "Significantly Underpriced")
    .group_by("type")
    .agg(
        pl.col("stockcode").count().alias("deal_count"),
        pl.col("brand").first().alias("example_brand"),
        pl.col("model").first().alias("example_model"),
        pl.col("price").min().alias("lowest_price"),
    )
    .sort("deal_count", descending=True)
)

print("Best deals by bike type:")
best_deals

## 7. Performance Comparison: polars-redis vs Traditional Approach

Let's compare the performance and code complexity of polars-redis versus the traditional redis-py approach.

In [None]:
import redis as redis_py

r = redis_py.from_url(REDIS_URL)

# Benchmark: Load all bikes and compute stats

# --- Traditional approach ---
start = time.perf_counter()

# Step 1: Get all keys
keys = list(r.scan_iter("redisbikeco:bike:*"))

# Step 2: Fetch each document one by one
bikes_traditional = []
for key in keys:
    data = r.json().get(key)
    if data:
        bikes_traditional.append(data)

# Step 3: Convert to DataFrame
df_traditional = pl.DataFrame(bikes_traditional)

# Step 4: Compute stats
stats_traditional = (
    df_traditional
    .group_by("type")
    .agg(pl.col("price").mean())
)

traditional_time = time.perf_counter() - start
print(f"Traditional approach: {traditional_time*1000:.2f}ms")
print(f"  - {len(keys)} individual Redis calls")

In [None]:
# --- polars-redis approach ---
start = time.perf_counter()

# Single call with batched fetching
stats_polars_redis = (
    redis.scan_json(
        REDIS_URL,
        pattern="redisbikeco:bike:*",
        schema=bike_schema,
    )
    .group_by("type")
    .agg(pl.col("price").mean())
    .collect()
)

polars_redis_time = time.perf_counter() - start
print(f"polars-redis approach: {polars_redis_time*1000:.2f}ms")
print("  - Batched fetching (fewer round trips)")

In [None]:
# Code complexity comparison
print("\n" + "="*60)
print("CODE COMPLEXITY COMPARISON")
print("="*60)

print("""
TRADITIONAL (redis-py):
------------------------
keys = list(r.scan_iter("redisbikeco:bike:*"))
bikes = []
for key in keys:
    data = r.json().get(key)
    if data:
        bikes.append(data)
df = pl.DataFrame(bikes)
stats = df.group_by("type").agg(pl.col("price").mean())

Lines of code: 8
Redis round trips: SCAN calls + N (where N = number of documents)
""")

print("""
POLARS-REDIS:
------------------------
stats = (
    redis.scan_json(REDIS_URL, pattern="redisbikeco:bike:*", schema=bike_schema)
    .group_by("type")
    .agg(pl.col("price").mean())
    .collect()
)

Lines of code: 6
Redis round trips: ~N/batch_size (batched)
""")

print(f"\nPerformance: polars-redis was {traditional_time/polars_redis_time:.1f}x faster")
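
A single timed run is noisy, especially on a dataset this small, where connection setup can dominate. A minimal sketch of a steadier measurement is to repeat each approach and report the median. `median_ms` is a hypothetical helper, and the workload below is a stand-in; in this notebook you would wrap each approach in a function and pass it in.

```python
import statistics
import time
from typing import Callable


def median_ms(fn: Callable[[], object], repeats: int = 5, warmup: int = 1) -> float:
    """Median wall-clock time of fn() in milliseconds over several runs."""
    for _ in range(warmup):
        fn()  # discard warm-up runs (connection setup, caches)
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        samples.append((time.perf_counter() - start) * 1000)
    return statistics.median(samples)


# Stand-in workload so the sketch runs without Redis.
elapsed = median_ms(lambda: sum(range(10_000)))
print(f"{elapsed:.3f} ms")
```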

## Summary

**polars-redis** provides significant advantages over traditional Redis access patterns:

### Less Code, More Power
- Write DataFrames directly to Redis with `write_json()`
- Infer schemas automatically with `infer_json_schema()`
- Scan data lazily with `scan_json()`
- Search with a Pythonic query builder

### Better Performance
- Batched fetching reduces round trips
- Server-side filtering with RediSearch
- Lazy evaluation defers work until needed

### Full Analytics Capability
- Seamless integration with Polars for complex analytics
- Vector similarity search for semantic queries
- Aggregations can run server-side or client-side

### Type Safety
- Schema inference with confidence scores
- Enforced types during deserialization
- Early error detection for data quality issues