# OAuth2 Password Grant Demo

This notebook runs the ETL pipeline against an API endpoint protected by the OAuth2 Password Grant (Resource Owner Password Credentials), with Keycloak as the identity provider.

## Prerequisites

Start Keycloak and the mock API service:
```bash
make up-keycloak
```

Services:
- Keycloak Admin Console: `http://localhost:8180` (admin/admin)
- Mock API: `http://mock-api:8000` (reachable from inside the Docker network)

## OAuth2 Flow

1. The driver obtains an access token from Keycloak using user credentials
2. Tokens are distributed to executors via RPC
3. Executors use the token for API requests
4. A background service on the driver refreshes the token automatically
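
Step 1 of the flow can be sketched as follows. This is only an illustration of the request shape defined by RFC 6749 section 4.3, not the pipeline's own code. The realm `demo`, the client `demo-client`, and the user credentials are hypothetical, and the endpoint path assumes a recent Keycloak release (older releases prefix the path with `/auth`).

```python
from typing import Dict, Optional
from urllib.parse import urlencode


def token_endpoint(base_url: str, realm: str) -> str:
    """Build the Keycloak OpenID Connect token endpoint URL for a realm."""
    return f"{base_url.rstrip('/')}/realms/{realm}/protocol/openid-connect/token"


def password_grant_form(
    client_id: str,
    username: str,
    password: str,
    client_secret: Optional[str] = None,
) -> Dict[str, str]:
    """Form fields for a Resource Owner Password Credentials token request."""
    form = {
        "grant_type": "password",
        "client_id": client_id,
        "username": username,
        "password": password,
    }
    # Confidential clients also send their secret; public clients omit it.
    if client_secret is not None:
        form["client_secret"] = client_secret
    return form


# Hypothetical realm, client, and user; replace with the values from your setup.
url = token_endpoint("http://localhost:8180", "demo")
body = urlencode(password_grant_form("demo-client", "alice", "s3cret"))
print(url)
print(body)
```

The body is sent as `application/x-www-form-urlencoded` in a POST to `url`. A successful JSON response carries `access_token` and `expires_in`, and usually `refresh_token` as well.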

In [1]:
import sys
from pathlib import Path

In [2]:
project_root = Path("/opt/spark/work")
src_path = project_root / "src"

if str(src_path) not in sys.path:
    sys.path.insert(0, str(src_path))

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sha2, expr
from pipeline.orchestrator import run_pipeline

In [7]:
spark = (
    SparkSession.builder
    .appName("oauth2_password_demo_pipeline")
    .getOrCreate()
)

## Create Source DataFrame

Generate a DataFrame of 50 unique tracking IDs. Each ID is the SHA-256 hash of a random UUID and identifies one API request.

In [5]:
df = (
    spark.range(50)
         .repartition(4)
         .select(
             sha2(expr("uuid()"), 256).alias("tracking_id")
         )
)
df.show(10)

+--------------------+
|         tracking_id|
+--------------------+
|ded0a3107c8334a60...|
|df8f72dfd0a8bffed...|
|161b445ce7a24ecf0...|
|af26666ea13b9ae78...|
|848f7615a10ff69b5...|
|9a6a8fd2f978e41bd...|
|a5ae76985a21cf042...|
|44028bdf3812d002e...|
|81bd4bc0dff1e13ff...|
|479353dfdaad52fd1...|
+--------------------+
only showing top 10 rows
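
To make the contents of `tracking_id` concrete, here is a plain-Python sketch of the same idea as `sha2(expr("uuid()"), 256)`: hash a random UUID string with SHA-256 and keep the hex digest. The helper name `make_tracking_id` is hypothetical and is not part of the pipeline.

```python
import hashlib
import uuid


def make_tracking_id() -> str:
    """Return the SHA-256 hex digest of a random UUID string."""
    return hashlib.sha256(str(uuid.uuid4()).encode("utf-8")).hexdigest()


# Generate as many IDs as the DataFrame above has rows.
ids = [make_tracking_id() for _ in range(50)]
print(len(ids[0]), len(set(ids)))
```

Each ID is a 64-character hex string. `show()` truncates it to 20 characters by default, which is why the table above ends each value with `...`.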



## Run Pipeline

Execute the ETL pipeline using the OAuth2 Password Grant configuration.

The pipeline will:
1. Authenticate with Keycloak using user credentials
2. Distribute the access token to all Spark executors
3. Make API requests with an `Authorization: Bearer <token>` header
4. Automatically refresh tokens when they expire
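
Steps 3 and 4 can be illustrated with a small token cache that refetches shortly before expiry and builds the bearer header. This is a sketch under assumptions, not the pipeline's `DriverTokenManager`: the `TokenCache` class, the 30-second safety margin, and the fake fetcher are all hypothetical.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Tuple


@dataclass
class CachedToken:
    value: str
    expires_at: float  # absolute clock time, in seconds


class TokenCache:
    """Hypothetical helper: cache a token and refetch it shortly before expiry."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],  # returns (access_token, expires_in)
        skew_seconds: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._skew = skew_seconds
        self._clock = clock
        self._token: Optional[CachedToken] = None

    def get(self) -> str:
        now = self._clock()
        # Refetch when there is no token yet or it is within the safety margin.
        if self._token is None or now >= self._token.expires_at - self._skew:
            value, expires_in = self._fetch()
            self._token = CachedToken(value, now + expires_in)
        return self._token.value

    def auth_headers(self) -> Dict[str, str]:
        return {"Authorization": f"Bearer {self.get()}"}


# Demo with a fake clock and a fake fetcher, so no network access is needed.
fake_now = [0.0]
calls = []


def fake_fetch() -> Tuple[str, float]:
    calls.append(fake_now[0])
    return f"token-{len(calls)}", 300.0  # each token is valid for 300 seconds


cache = TokenCache(fake_fetch, skew_seconds=30.0, clock=lambda: fake_now[0])
first = cache.auth_headers()   # no token yet, so this fetches one
fake_now[0] = 100.0
second = cache.auth_headers()  # well before expiry, served from the cache
fake_now[0] = 280.0
third = cache.auth_headers()   # inside the 30 s margin, so a new token is fetched
```

Refreshing slightly early rather than exactly at expiry avoids sending a request with a token that expires while the request is in flight.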

In [6]:
config_path = project_root / "configs" / "examples" / "oauth2_keycloak_demo.yml"

In [7]:
run_pipeline(
    spark=spark,
    config_path=config_path,
    source_df=df,
    source_id="tracking_id"
)

2026-02-08 14:30:30,818 [INFO] [PipelineOrchestrator]: Starting driver-side authentication runtime service
2026-02-08 14:30:30,819 [INFO] [RpcBootstrapper]: Starting RPC token service...
2026-02-08 14:30:30,836 [INFO] AsyncBackgroundService[DriverTokenManager]: Background service started
2026-02-08 14:30:30,839 [INFO] AsyncBackgroundService[RpcService]: Background service started
2026-02-08 14:30:30,841 [INFO] [TokenRpcService]: Started at http://89d602eb842e:36275
2026-02-08 14:30:30,843 [INFO] [RpcBootstrapper]: TokenManager background refresh started.
2026-02-08 14:30:30,843 [INFO] [RpcBootstrapper]: RPC Token Service running at http://89d602eb842e:36275
2026-02-08 14:30:30,843 [INFO] [PipelineOrchestrator]: Adding authentication middleware
2026-02-08 14:30:30,844 [INFO] [PipelineOrchestrator]: Request from URL: /http://mock-api:8000/api/oauth2/data
2026-02-08 14:30:30,844 [INFO] [TableManager]: Creating database demo
2026-02-08 14:30:31,244 [INFO] [DriverTokenManager]: Background t

## Verify Results

Read the sink table to verify the API responses were captured.

In [11]:
response_df = spark.table("demo.oauth2_user_credentials_demo_response")
response_df.show(10)

+--------------------+--------------------+--------------------+------+--------------------+--------------+--------------------+-----------+--------------------+--------------------+-------+-------------+--------+--------------------+--------------------+
|          request_id|            row_hash|                 url|method|     request_headers|request_params|    request_metadata|status_code|    response_headers|           body_text|success|error_message|attempts|   response_metadata|       _request_time|
+--------------------+--------------------+--------------------+------+--------------------+--------------+--------------------+-----------+--------------------+--------------------+-------+-------------+--------+--------------------+--------------------+
|e6caee7b79a3b1dc1...|75372c37a418ab529...|/http://mock-api:...|   GET|{"Accept": "appli...|            {}|{"vendor": "mock-...|        200|{"Date": "Mon, 09...|{"request_id":"1b...|   true|         NULL|       1|{"connection_warm..

In [10]:
# Summary statistics
print(f"Total records: {response_df.count()}")
response_df.groupBy("status_code").count().show()

Total records: 50
+-----------+-----+
|status_code|count|
+-----------+-----+
|        200|   50|
+-----------+-----+



In [11]:
# Sample response body - note the auth_method field shows "oauth2:bearer"
response_df.select("body_text").limit(3).show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|body_text                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
+-------------------------------------------------------------------------------------------------------------

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 36668)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "/usr/local/spark/python/pyspark/accumulators.py", line 271, in accum_updates
    num_updates =