# FreshGoods Polaris Data Pipeline Demo 🚀

**A retail‑analytics story in four short scenes**

Every night, each FreshGoods store uploads a CSV “drop” of that day’s *product movement* to Ceph RGW. Our mission is to turn those raw files into business‑ready insight by 9 a.m., *without ever revealing customer emails (PII) to unauthorized users.*

We’ll walk through that journey in this notebook with three personas:
- **Engineer** ingests raw CSV into an Iceberg RAW table (`products_raw`).
- **Compliance** reads RAW, hashes `email`, computes `total = price * quantity`, writes a clean GOLD table (`products_gold`).
- **Analyst** inspects the GOLD table, confirming the pipeline’s output.


## Table of Contents
1. [Load OAuth2 Tokens](#load-tokens)
2. [Spark Sessions Setup](#sessions-setup)
3. [Engineer: Ingest Raw Data](#engineer-ingest)
4. [Engineer: Verify RAW Table](#engineer-verify-raw)
5. [Compliance: Curate & Protect GOLD](#compliance-curate)
   - [5.1 Compliance: Verify GOLD Table](#compliance-verify-gold)
6. [🔐 Validation: Enforce Least Privilege](#validation)
7. [Analyst: Verify GOLD Table](#analyst-verify)


<a id="load-tokens"></a>
## 1️⃣ Load OAuth2 Tokens

Read the JSON file of OAuth2 tokens that Terraform minted for our three personas. Each token is later handed to Spark's `polaris` catalog, so Polaris can vend fine‑grained, per‑table credentials to that persona's session.
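
A minimal sketch of the file shape this cell assumes: a flat JSON object keyed by persona name, inferred from the `TOKENS['engineer']` lookups below (the real Terraform output may carry more fields). The helpers `load_tokens` and `redact` are hypothetical: the first fails fast if a persona is missing, the second keeps only a short prefix so secrets never reach notebook output.

```python
import json
import pathlib
import tempfile

PERSONAS = ("engineer", "compliance", "analyst")

def load_tokens(path):
    """Load the persona -> token mapping; raise if an expected persona is missing."""
    tokens = json.loads(pathlib.Path(path).read_text())
    missing = [p for p in PERSONAS if p not in tokens]
    if missing:
        raise KeyError(f"tokens.json is missing personas: {missing}")
    return tokens

def redact(token, keep=8):
    """Return only the first `keep` characters of a secret, followed by '...'."""
    return token[:keep] + "..."

# Demo with a throwaway file and placeholder values (not real credentials)
with tempfile.TemporaryDirectory() as d:
    path = pathlib.Path(d) / "tokens.json"
    path.write_text(json.dumps({"engineer": "eng-0123456789",
                                "compliance": "cmp-0123456789",
                                "analyst": "ana-0123456789"}))
    tokens = load_tokens(path)
    for persona in PERSONAS:
        print(persona, "->", redact(tokens[persona]))
```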

In [None]:
import json, pathlib

TOKENS = json.loads(pathlib.Path('/home/jovyan/work/tokens.json').read_text())
ENG_TOKEN     = TOKENS['engineer']
COMP_TOKEN    = TOKENS['compliance']
ANALYST_TOKEN = TOKENS['analyst']

# Print only a short prefix of each token so secrets never land in notebook output
print('Tokens loaded for personas:')
for persona, token in TOKENS.items():
    print(f'  {persona:<10} {token[:8]}...')

<a id="sessions-setup"></a>
## 2️⃣ Spark Sessions Setup

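Create a SparkSession for the **Engineer** using their token. Then derive sessions for **Compliance** and **Analyst** with `newSession()`, which shares the underlying SparkContext but keeps its own SQL configuration and catalog state, and swap in each persona’s token. Polaris then enforces table‑level RBAC based on the token each session presents. Set the token before a session first touches the `polaris` catalog: Spark initializes catalog plugins lazily on first use and caches them per session.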
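
To make "only the token differs" concrete, here is a small sketch that collects the catalog‑related settings from the Engineer cell into a dictionary and builds one config per persona. `BASE_CATALOG_CONF` and `persona_conf` are hypothetical names for illustration; the package list and SQL extensions are left out because they are session‑builder settings rather than per‑persona ones.

```python
# Catalog settings copied from the Engineer cell; only the token varies by persona.
BASE_CATALOG_CONF = {
    "spark.sql.catalog.polaris": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.polaris.type": "rest",
    "spark.sql.catalog.polaris.uri": "http://polaris:8181/api/catalog",
    "spark.sql.catalog.polaris.warehouse": "prod",
    "spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation": "vended-credentials",
    "spark.sql.defaultCatalog": "polaris",
}

def persona_conf(token):
    """Return the full catalog config for one persona (hypothetical helper)."""
    conf = dict(BASE_CATALOG_CONF)
    conf["spark.sql.catalog.polaris.token"] = token
    return conf

eng = persona_conf("eng-token")
ana = persona_conf("ana-token")

# Every key matches except the token, which is what scopes each session's access.
diff = {k for k in eng if eng[k] != ana[k]}
print(diff)  # {'spark.sql.catalog.polaris.token'}
```

In the notebook itself, only the token line needs changing per persona, which is exactly what the `conf.set(...)` calls do.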

In [None]:
from pyspark.sql import SparkSession

# Base session (Engineer)
engineer = (
    SparkSession.builder
      .appName('polaris-engineer')
      .config('spark.jars.packages',
          'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,org.apache.hadoop:hadoop-aws:3.4.0')
      .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
      .config('spark.sql.catalog.polaris', 'org.apache.iceberg.spark.SparkCatalog')
      .config('spark.sql.catalog.polaris.type', 'rest')
      .config('spark.sql.catalog.polaris.uri', 'http://polaris:8181/api/catalog')
      .config('spark.sql.catalog.polaris.warehouse', 'prod')
      .config('spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation', 'vended-credentials')
      .config('spark.sql.catalog.polaris.token', ENG_TOKEN)
      .config('spark.sql.defaultCatalog', 'polaris')
      .getOrCreate()
)
print('✅ Session: Engineer →', ENG_TOKEN[:8] + '...')

# Clone for Compliance
compliance = engineer.newSession()
compliance.conf.set('spark.sql.catalog.polaris.token', COMP_TOKEN)
print('✅ Session: Compliance →', COMP_TOKEN[:8] + '...')

# Clone for Analyst
analyst = engineer.newSession()
analyst.conf.set('spark.sql.catalog.polaris.token', ANALYST_TOKEN)
print('✅ Session: Analyst →', ANALYST_TOKEN[:8] + '...')

<a id="engineer-ingest"></a>
## 3️⃣ Engineer: Ingest Raw Data

The **Engineer** lands last night’s `products_raw_200.csv` drop into our Iceberg RAW table `products_raw`. No filtering or masking yet—this is a faithful mirror of the source CSV.
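
Because `.append()` adds rows to an already existing table, a quick header check before writing can catch a malformed drop early. This is a minimal stdlib sketch; the expected column list is an assumption inferred from columns used later in this notebook (`product_id`, `product_name`, `category`, `price`, `quantity`, `email`, `timestamp`), not a confirmed RAW schema.

```python
import csv
import io

# Assumed RAW columns, inferred from the queries later in this notebook.
EXPECTED_COLUMNS = ["product_id", "product_name", "category",
                    "price", "quantity", "email", "timestamp"]

def check_header(fileobj, expected=EXPECTED_COLUMNS):
    """Return (missing, unexpected) column names for a CSV drop's header row."""
    header = next(csv.reader(fileobj))
    missing = [c for c in expected if c not in header]
    unexpected = [c for c in header if c not in expected]
    return missing, unexpected

# Made-up sample drop with one data row
sample = io.StringIO(
    "product_id,product_name,category,price,quantity,email,timestamp\n"
    "P1,Apples,produce,1.50,4,a@example.com,2024-01-01T00:00:00\n"
)
print(check_header(sample))  # ([], [])
```

Against the real drop, you would open it with `open('/home/jovyan/work/products_raw_200.csv', newline='')` and stop before the `append()` if either list is non‑empty.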

In [None]:
raw_df = (
    engineer.read
      .option('header', True)
      .option('inferSchema', True)
      .csv('/home/jovyan/work/products_raw_200.csv')
)
raw_df.writeTo('polaris.prod_ns.products_raw').append()
print('✅ Raw data appended to products_raw')

<a id="engineer-verify-raw"></a>
## 4️⃣ Engineer: Verify RAW Table

Quickly peek at the raw table to confirm ingestion. We should see the original `email` column with PII here.

In [None]:
import pandas as pd
from IPython.display import display

# Preview first 5 rows of RAW
raw_preview = engineer.sql(
    'SELECT * FROM polaris.prod_ns.products_raw LIMIT 5'
).toPandas()
print('Preview of products_raw:')
display(raw_preview)

<a id="compliance-curate"></a>
## 5️⃣ Compliance: Curate & Protect GOLD

Now the **Compliance** persona reads from `products_raw`, **hashes** the `email` column with SHA‑256 so raw addresses never reach GOLD, **computes** `total = price * quantity`, and **overwrites** the GOLD table `products_gold`. The data lands as Parquet, Iceberg’s default file format (table property `write.format.default`), so it is stored in compressed, columnar form.
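
Here is a plain‑Python sketch of what each row goes through, using a made‑up sample row. `curate_row` is a hypothetical illustration, not part of the pipeline. Spark’s `sha2(col, 256)` returns the SHA‑256 digest of the value’s UTF‑8 bytes as a lowercase hex string and yields null for a null input; `hashlib.sha256(...).hexdigest()` mirrors that. Keep in mind that an unsalted hash of a guessable value such as an email address can be reversed by hashing candidate addresses and comparing, so a keyed hash (for example HMAC with a secret only Compliance holds) is a common hardening step if your threat model requires it.

```python
import hashlib

def curate_row(row):
    """Plain-Python mirror of the GOLD transform for one RAW row (illustrative only)."""
    email = row["email"]
    return {
        "product_id": row["product_id"],
        "product_name": row["product_name"],
        "category": row["category"],
        "total": row["price"] * row["quantity"],
        # Like Spark's sha2(email, 256): lowercase hex SHA-256 of the UTF-8 bytes;
        # a null email stays null instead of raising.
        "email_hash": None if email is None else hashlib.sha256(email.encode("utf-8")).hexdigest(),
        "timestamp": row["timestamp"],
    }

raw_row = {"product_id": "P1", "product_name": "Apples", "category": "produce",
           "price": 1.5, "quantity": 4, "email": "a@example.com",
           "timestamp": "2024-01-01T00:00:00"}
gold_row = curate_row(raw_row)
print(gold_row["total"], len(gold_row["email_hash"]), "email" in gold_row)  # 6.0 64 False
```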

In [None]:
from pyspark.sql.functions import col, sha2, lit

gold_df = (
    compliance.read
      .table('polaris.prod_ns.products_raw')
      .withColumn('total',      col('price') * col('quantity'))
      .withColumn('email_hash', sha2(col('email'), 256))
      .select(
         'product_id',
         'product_name',
         'category',
         'total',
         'email_hash',
         'timestamp'
      )
)

# overwrite(lit(True)) replaces all rows in GOLD; the table must already exist
gold_df.writeTo('polaris.prod_ns.products_gold').overwrite(lit(True))

print('✅ Curated & protected data into products_gold')

<a id="compliance-verify-gold"></a>
### 5.1️⃣ Compliance: Verify GOLD Table

Confirm that `products_gold` now contains the curated records: no raw `email` column, but a hashed `email_hash` and a computed `total`.

In [None]:
import pandas as pd
from IPython.display import display

gold_compliance_preview = compliance.sql(
    'SELECT * FROM polaris.prod_ns.products_gold LIMIT 5'
).toPandas()
print('Compliance sees products_gold:')
display(gold_compliance_preview)
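
The visual check above can also be made programmatic. This is a hypothetical helper, `check_gold_preview`, that takes column names and row dictionaries and returns a list of problems (empty means the preview looks right). It assumes `email_hash` values are 64‑character lowercase hex strings, which is what SHA‑256 via `sha2(..., 256)` produces.

```python
import re

HEX64 = re.compile(r"[0-9a-f]{64}")

def check_gold_preview(columns, rows):
    """Return a list of problems found in a GOLD preview; an empty list means OK."""
    problems = []
    if "email" in columns:
        problems.append("raw 'email' column is present")
    for name in ("email_hash", "total"):
        if name not in columns:
            problems.append(f"missing column {name!r}")
    for i, row in enumerate(rows):
        h = row.get("email_hash")
        if h is None:
            continue  # a null email hashes to null, which is allowed here
        if not isinstance(h, str) or not HEX64.fullmatch(h):
            problems.append(f"row {i}: email_hash is not 64 hex chars")
    return problems

cols = ["product_id", "product_name", "category", "total", "email_hash", "timestamp"]
rows = [{"email_hash": "ab" * 32, "total": 6.0}]
print(check_gold_preview(cols, rows))  # []
```

With the pandas preview above, the call would be `check_gold_preview(list(gold_compliance_preview.columns), gold_compliance_preview.to_dict('records'))`.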

<a id="validation"></a>
## 6️⃣ 🔐 Validation: Enforce Least Privilege

Demonstrate that each persona is restricted to only their allowed operations.
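
Before running the checks, it helps to write down the access matrix they are testing. The sketch below is a hypothetical model of the expectations stated in this notebook (Engineer reads and writes RAW; Compliance reads RAW and reads/writes GOLD; Analyst reads GOLD). It is not read from Polaris and does not use Polaris's actual privilege names.

```python
# Hypothetical expected-access model; not queried from Polaris.
EXPECTED_ACCESS = {
    ("engineer", "products_raw"): {"read", "write"},
    ("compliance", "products_raw"): {"read"},
    ("compliance", "products_gold"): {"read", "write"},
    ("analyst", "products_gold"): {"read"},
}

def allowed(persona, table, op):
    """True if the model grants `op` ('read' or 'write') on `table` to `persona`."""
    return op in EXPECTED_ACCESS.get((persona, table), set())

# The three checks in this section; each is expected to be denied.
CHECKS = [
    ("engineer", "products_gold", "write"),   # a) INSERT into GOLD
    ("compliance", "products_raw", "write"),  # b) DELETE from RAW
    ("analyst", "products_raw", "read"),      # c) SELECT email from RAW
]
for persona, table, op in CHECKS:
    verdict = "allowed" if allowed(persona, table, op) else "denied"
    print(f"{persona:<10} {op:<5} {table:<13} -> {verdict}")
```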

**a) Engineer cannot write to GOLD**

The Engineer only has `TABLE_RW` on the RAW table; any attempt to modify GOLD should be denied.

In [None]:
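import html
from IPython.display import HTML, display

try:
    engineer.sql("INSERT INTO polaris.prod_ns.products_gold VALUES ('X','Test','t',0.00,'h',CURRENT_TIMESTAMP)")
except Exception as e:
    msg = html.escape(str(e).split('\n')[0])
    display(HTML(f"<div style='color:red;font-weight:bold'>🔒 Engineer write to GOLD denied: {msg}</div>"))
else:
    # No exception means the privilege boundary did not hold: make that visible
    display(HTML("<div style='color:orange;font-weight:bold'>⚠️ Engineer write to GOLD unexpectedly succeeded</div>"))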

**b) Compliance cannot modify RAW**

Compliance can read from RAW but should not have write/delete privileges there.

In [None]:
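import html
from IPython.display import HTML, display

try:
    compliance.sql("DELETE FROM polaris.prod_ns.products_raw WHERE quantity < 0")
except Exception as e:
    msg = html.escape(str(e).split('\n')[0])
    display(HTML(f"<div style='color:red;font-weight:bold'>🔒 Compliance delete on RAW denied: {msg}</div>"))
else:
    display(HTML("<div style='color:orange;font-weight:bold'>⚠️ Compliance delete on RAW unexpectedly succeeded</div>"))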

**c) Analyst cannot see raw PII**

The Analyst only has read access on GOLD; any direct RAW `email` query should be forbidden.

In [None]:
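import html
from IPython.display import HTML, display

try:
    analyst.sql('SELECT email FROM polaris.prod_ns.products_raw LIMIT 1').show()
except Exception as e:
    msg = html.escape(str(e).split('\n')[0])
    display(HTML(f"<div style='color:red;font-weight:bold'>🔒 Analyst read RAW PII denied: {msg}</div>"))
else:
    display(HTML("<div style='color:orange;font-weight:bold'>⚠️ Analyst read RAW PII unexpectedly succeeded</div>"))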

# Why? The Analyst has no privileges on products_raw, so Polaris denies RAW-PII access.
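
The three checks above share one pattern: run an action, treat an exception as a denial, and flag silent success. Here is a hypothetical helper, `expect_denied`, that captures that pattern under the assumption that any exception counts as a denial. A stricter variant could also inspect the message, for example for the HTTP 403 / `ForbiddenException` text that Iceberg's REST client typically surfaces, so that a typo or schema error is not mistaken for a permission check.

```python
def expect_denied(label, action):
    """Run `action` and report whether it was refused.

    Returns (passed, message); passed is True only if `action` raised.
    Any exception counts as a denial here, including unrelated errors.
    """
    try:
        action()
    except Exception as e:
        first_line = str(e).split("\n")[0]
        return True, f"🔒 {label} denied: {first_line}"
    return False, f"⚠️ {label} unexpectedly succeeded"

# Stand-in actions; in the notebook these would be the Spark calls above.
passed, msg = expect_denied("demo write", lambda: 1 / 0)
print(passed, msg)
passed, msg = expect_denied("demo read", lambda: None)
print(passed, msg)
```

In the notebook, check (c) would become `expect_denied("Analyst read RAW PII", lambda: analyst.sql('SELECT email FROM polaris.prod_ns.products_raw LIMIT 1').collect())`.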

<a id="analyst-verify"></a>
## 7️⃣ Analyst: Verify GOLD Table

Now the **Analyst** lists the tables in `prod_ns` and previews the curated GOLD table, where raw emails have been replaced by SHA‑256 hashes and only the intended columns are exposed.

In [None]:
import pandas as pd
from IPython.display import display

# List available tables in prod_ns
tbls = analyst.sql('SHOW TABLES IN polaris.prod_ns').toPandas()
print('Tables in prod_ns:')
display(tbls)

# Preview first 5 rows of GOLD
gold_preview = analyst.sql(
    'SELECT * FROM polaris.prod_ns.products_gold LIMIT 5'
).toPandas()
print('Preview of products_gold:')
display(gold_preview)