# DynamoDB Data Source for PySpark — Examples

This notebook walks through the operations the data source supports, including:
- **Batch Write** — write a DataFrame to a DynamoDB table
- **Batch Write with Delete Flag** — conditionally delete items during a write
- **Batch Read** — read an entire table into a DataFrame
- **Batch Read with Parallel Scan** — use multiple segments for faster reads
- **Batch Read with Schema Projection** — read only specific columns
- **Streaming Write** — write a streaming DataFrame to DynamoDB
- **Service Credentials** — authenticate via Databricks Unity Catalog service credentials

## Setup

Install the wheel (if running outside Databricks):
```bash
pip install dist/dynamodb_data_source-0.1.0-py3-none-any.whl
```

In [None]:
from pyspark.sql import SparkSession
from dynamodb_data_source import DynamoDbDataSource

spark = SparkSession.builder.appName("dynamodb-examples").getOrCreate()
spark.dataSource.register(DynamoDbDataSource)

In [None]:
# Common options: update these for your environment
common_options = {
    "table_name": "<your-table-name>",
    "aws_region": "us-west-2",
    "aws_access_key_id": "<your-access-key-id>",
    "aws_secret_access_key": "<your-secret-access-key>",
    # Optional: keep this entry only when using temporary credentials
    "aws_session_token": "<your-session-token>",
}

---
## 1. Batch Write

In [None]:
data = [
    ("id-001", "Alice", 30, 100),
    ("id-002", "Bob", 25, 200),
    ("id-003", "Charlie", 35, 150),
]
df = spark.createDataFrame(data, ["id", "name", "age", "score"])

# create_table will create the table if it doesn't exist, using "id" as the hash key.
# If the table already exists, this is a no-op.
df.write.format("dynamodb") \
    .mode("append") \
    .options(**common_options) \
    .option("create_table", "true") \
    .option("hash_key", "id") \
    .save()

print(f"Wrote {df.count()} rows")

### Batch Write with Repartitioning (for large DataFrames)

Repartitioning distributes the write across multiple Spark tasks, each with its own
`batch_writer` connection to DynamoDB. This is the recommended approach for large writes.
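
To get a rough feel for how repartitioning changes the work per task, the sketch below estimates how many `BatchWriteItem` requests each partition would issue. It assumes rows are spread evenly across partitions and that the writer flushes in groups of 25 items, which is the DynamoDB limit per `BatchWriteItem` call. `requests_per_partition` is a hypothetical helper for illustration, not part of the data source.

```python
import math

# DynamoDB accepts at most 25 put/delete requests per BatchWriteItem call.
MAX_BATCH_ITEMS = 25


def requests_per_partition(total_rows: int, num_partitions: int) -> int:
    """Estimate BatchWriteItem calls per Spark task, assuming an even spread of rows."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    rows_per_partition = math.ceil(total_rows / num_partitions)
    return math.ceil(rows_per_partition / MAX_BATCH_ITEMS)


# 10,000 rows over 4 partitions: 2,500 rows per task, so 100 requests each.
print(requests_per_partition(10_000, 4))
```

The estimate ignores retries of unprocessed items and throttling, so treat it as a lower bound when sizing table write capacity.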

In [None]:
large_data = [(f"id-{i:05d}", f"User_{i}", i % 100, i * 10) for i in range(10_000)]
df_large = spark.createDataFrame(large_data, ["id", "name", "age", "score"])

df_large.repartition(4).write.format("dynamodb") \
    .mode("append") \
    .options(**common_options) \
    .save()

print(f"Wrote {df_large.count()} rows using 4 parallel partitions")

---
## 2. Batch Write with Delete Flag

Rows where the delete flag column matches the specified value will be **deleted** from
DynamoDB. All other rows are written (upserted) as normal.
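
The routing described above can be pictured with plain Python. This is only a sketch of the idea, not the data source's actual implementation: it assumes the flag is compared as a case-insensitive string, that `id` is the hash key, and that the flag column is dropped before an item is written. `split_by_delete_flag` is a hypothetical name.

```python
def split_by_delete_flag(rows, flag_column, flag_value):
    """Partition row dicts into (puts, deletes) based on a flag column."""
    puts, deletes = [], []
    for row in rows:
        # Compare as lowercase strings so the option value "true" matches Python's True.
        if str(row.get(flag_column)).lower() == str(flag_value).lower():
            # A delete only needs the key; "id" is assumed to be the hash key.
            deletes.append({"id": row["id"]})
        else:
            # Assumption: the flag column is not stored with the item.
            puts.append({k: v for k, v in row.items() if k != flag_column})
    return puts, deletes


rows = [
    {"id": "id-001", "name": "Alice", "is_deleted": False},
    {"id": "id-002", "name": "Bob", "is_deleted": True},
]
puts, deletes = split_by_delete_flag(rows, "is_deleted", "true")
print(puts, deletes)
```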

In [None]:
data_with_deletes = [
    ("id-001", "Alice", 30, 100, False),   # upsert
    ("id-002", "Bob", 25, 200, True),      # delete
    ("id-004", "Diana", 28, 300, False),   # insert
]
df_del = spark.createDataFrame(data_with_deletes, ["id", "name", "age", "score", "is_deleted"])

df_del.write.format("dynamodb") \
    .mode("append") \
    .options(**common_options) \
    .option("delete_flag_column", "is_deleted") \
    .option("delete_flag_value", "true") \
    .save()

print("Write with delete flag complete")

---
## 3. Batch Write with TTL

DynamoDB TTL relies on an ordinary Number attribute that holds the expiry time as a Unix epoch timestamp in seconds.
Enable TTL on the table via the AWS console or API, naming that attribute, then include the column in your data.
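
If you prefer the API route, TTL can be enabled with the DynamoDB `UpdateTimeToLive` operation. The helper below is a minimal sketch that takes a boto3 DynamoDB client from the caller; `enable_ttl` is a hypothetical name, and the attribute name `expire_at` simply matches the column used in this section.

```python
def enable_ttl(client, table_name: str, attribute_name: str):
    """Turn on TTL for a table using a boto3 DynamoDB client supplied by the caller."""
    return client.update_time_to_live(
        TableName=table_name,
        TimeToLiveSpecification={"Enabled": True, "AttributeName": attribute_name},
    )


# Usage (requires boto3 and valid AWS credentials):
#   import boto3
#   client = boto3.client("dynamodb", region_name="us-west-2")
#   enable_ttl(client, "<your-table-name>", "expire_at")
```

Expired items are removed in the background rather than at the exact expiry time, so reads may still return them for a while.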

In [None]:
import time

expire_in_30_days = int(time.time()) + 86400 * 30

data_with_ttl = [
    ("id-010", "Temporary User", expire_in_30_days),
    ("id-011", "Another Temp", expire_in_30_days + 3600),
]
df_ttl = spark.createDataFrame(data_with_ttl, ["id", "name", "expire_at"])

df_ttl.write.format("dynamodb") \
    .mode("append") \
    .options(**common_options) \
    .save()

print("Write with TTL column complete")

---
## 4. Batch Read

Reads the entire table. The schema is inferred automatically by sampling items.
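
Because items in a DynamoDB table need not share the same attributes, a sampled schema is effectively the union of whatever the sample contains. The sketch below illustrates that idea in plain Python; it is not the data source's inference code, and `infer_columns` is a hypothetical helper.

```python
def infer_columns(sampled_items):
    """Map each attribute name to the set of Python type names seen in the sample."""
    seen = {}
    for item in sampled_items:
        for key, value in item.items():
            seen.setdefault(key, set()).add(type(value).__name__)
    return seen


sample = [
    {"id": "id-001", "name": "Alice", "age": 30},
    {"id": "id-010", "name": "Temporary User", "expire_at": 1767225600},
]
columns = infer_columns(sample)
print(sorted(columns))
```

An attribute that appears only in items outside the sample would be missing from the result, which is one reason to pass an explicit schema when the table is heterogeneous.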

In [None]:
df_read = spark.read.format("dynamodb") \
    .options(**common_options) \
    .load()

print(f"Read {df_read.count()} rows, columns: {df_read.columns}")

display(df_read)

### Batch Read with Parallel Scan

Set `total_segments` to distribute the scan across multiple Spark tasks.
AWS suggests starting with about one segment per 2 GB of table data and tuning from there.
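
As a starting point, the 2 GB ratio can be turned into a small calculation. This is a sketch under the assumption that you already know the table size in bytes (for example from the `TableSizeBytes` field of a `DescribeTable` response); `suggested_segments` is a hypothetical helper, and the result is only an initial value to tune.

```python
import math

# Starting ratio from the guidance above: one segment per 2 GB of table data.
BYTES_PER_SEGMENT = 2 * 1024**3


def suggested_segments(table_size_bytes: int) -> int:
    """Return a starting value for total_segments, never less than 1."""
    return max(1, math.ceil(table_size_bytes / BYTES_PER_SEGMENT))


# A 5 GB table rounds up to 3 segments.
print(suggested_segments(5 * 1024**3))
```

The returned number would be passed as a string, for example `.option("total_segments", str(suggested_segments(size)))`.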

In [None]:
df_parallel = spark.read.format("dynamodb") \
    .options(**common_options) \
    .option("total_segments", "4") \
    .load()

print(f"Read {df_parallel.count()} rows using 4 parallel segments")

display(df_parallel)

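### Batch Read with Consistent Reads

Set `consistent_read` to `true` to request strongly consistent reads. In DynamoDB these consume twice the read capacity of the default eventually consistent reads.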

In [None]:
df_consistent = spark.read.format("dynamodb") \
    .options(**common_options) \
    .option("consistent_read", "true") \
    .load()

print(f"Read {df_consistent.count()} rows with strongly consistent reads")

### Batch Read with Schema Projection

Provide an explicit schema to read only specific columns. This uses DynamoDB's
`ProjectionExpression` to fetch only the requested attributes, reducing data transfer.
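
For intuition, here is how a column list could be turned into a `ProjectionExpression`. The sketch uses `ExpressionAttributeNames` placeholders because some common column names, `name` among them, are DynamoDB reserved words. How the data source builds its expression internally is not shown in this notebook, so treat `build_projection` as a hypothetical illustration.

```python
def build_projection(columns):
    """Return (ProjectionExpression, ExpressionAttributeNames) for the given columns."""
    # Placeholders such as #c0 avoid clashes with DynamoDB reserved words.
    names = {f"#c{i}": col for i, col in enumerate(columns)}
    return ", ".join(names), names


expr, names = build_projection(["id", "name"])
print(expr)
print(names)
```

Both values map directly onto the `ProjectionExpression` and `ExpressionAttributeNames` parameters of a DynamoDB `Scan` request.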

In [None]:
from pyspark.sql.types import StructType, StructField, StringType

projection_schema = StructType([
    StructField("id", StringType()),
    StructField("name", StringType()),
])

df_projected = spark.read.format("dynamodb") \
    .schema(projection_schema) \
    .options(**common_options) \
    .load()

print(f"Projected columns: {df_projected.columns}")

display(df_projected)

---
## 5. Streaming Write

Write a streaming DataFrame to DynamoDB. This example uses the built-in `rate` source
to generate test data. Replace with your actual streaming source (Kafka, Kinesis, etc.).

In [None]:
import pyspark.sql.functions as F

stream_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 5)
    .option("numPartitions", 1)
    .load()
    .withColumn("id", F.concat(F.lit("stream-"), F.col("value").cast("string")))
    .withColumn("name", F.lit("streamed_item"))
    .select("id", "name")
)

query = (
    stream_df.writeStream
    .format("dynamodb")
    .outputMode("append")
    .options(**common_options)
    .option("checkpointLocation", "/tmp/dynamodb_stream_checkpoint")
    .start()
)

print(f"Streaming query started: {query.id}")
print("Run query.stop() to stop the stream")

In [None]:
# Stop the streaming query when done
# query.stop()

---
## 6. Using Databricks Unity Catalog Service Credentials

On Databricks (DBR 15.4+), you can authenticate with a Unity Catalog service credential
instead of long-lived AWS keys. Set the `credential_name` option to the name of your
service credential. The example below also resolves temporary credentials from it and
passes them explicitly.

In [None]:
# Resolve AWS credentials from a Unity Catalog service credential
from databricks.sdk.runtime import dbutils

uc_service_credential_name = "<your-service-credential-name>"
provider = dbutils.credentials.getServiceCredentialsProvider(uc_service_credential_name)
credentials = provider.get_credentials().get_frozen_credentials()

uc_options = {
    "table_name": "<your-table-name>",
    "aws_region": "us-west-2",
    "aws_access_key_id": credentials.access_key,
    "aws_secret_access_key": credentials.secret_key,
    "aws_session_token": credentials.token,
    "credential_name": uc_service_credential_name
}

# Batch write with service credentials
df.write.format("dynamodb") \
    .mode("append") \
    .options(**uc_options) \
    .save()

# Batch read with service credentials
# (a separate name avoids overwriting the `df` written above)
df_uc = spark.read.format("dynamodb") \
    .options(**uc_options) \
    .load()

display(df_uc)

---
## Configuration Reference

| Option | Required | Default | Description |
|--------|----------|---------|-------------|
| `table_name` | Yes | — | DynamoDB table name |
| `aws_region` | Yes | — | AWS region (e.g. `us-east-1`) |
| `aws_access_key_id` | No | — | AWS access key (uses default credentials if not set) |
| `aws_secret_access_key` | No | — | AWS secret key |
| `aws_session_token` | No | — | AWS session token for temporary credentials |
| `endpoint_url` | No | — | Custom endpoint (e.g. `http://localhost:8000` for DynamoDB Local) |
| `credential_name` | No | — | Databricks Unity Catalog service credential name |
| `delete_flag_column` | No | — | Column indicating deletion (write only) |
| `delete_flag_value` | No | — | Value that triggers deletion (write only) |
| `total_segments` | No | `1` | Number of parallel scan segments (read only) |
| `consistent_read` | No | `false` | Use strongly consistent reads (read only) |
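
As a worked example of the table, the sketch below assembles options for DynamoDB Local and checks that the two required options are present. `missing_required` is a hypothetical helper, not something the data source provides, and the credential values are placeholders on the assumption that DynamoDB Local does not validate them.

```python
# Options marked "Yes" in the Required column above.
REQUIRED_OPTIONS = ("table_name", "aws_region")


def missing_required(options):
    """Return the names of required options that are absent or empty."""
    return [name for name in REQUIRED_OPTIONS if not options.get(name)]


local_options = {
    "table_name": "<your-table-name>",
    "aws_region": "us-west-2",
    "endpoint_url": "http://localhost:8000",  # DynamoDB Local
    # Placeholder credentials; assumed not to be validated by DynamoDB Local.
    "aws_access_key_id": "dummy",
    "aws_secret_access_key": "dummy",
}

print(missing_required(local_options))
print(missing_required({"aws_region": "us-west-2"}))
```

The resulting dictionary can be passed with `.options(**local_options)` in the same way as `common_options` in the earlier sections.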