# Parallel Data Enrichment with Apache Spark

This notebook demonstrates how to use the `parallel-web-tools` package to enrich data directly in Apache Spark using SQL-native UDFs.

## Features

- **SQL-native UDFs**: Use `parallel_enrich()` directly in Spark SQL queries
- **DataFrame API**: Use UDFs with `withColumn()`, `select()`, etc.
- **Multiple processors**: Choose a speed vs. depth trade-off (lite, base, core, pro, ultra)

## Prerequisites

```bash
pip install "parallel-web-tools[spark]"  # quotes keep shells like zsh from globbing the brackets
export PARALLEL_API_KEY="your-api-key"
```

You also need a Java Runtime Environment (JRE) for Spark.

## Setup

In [None]:
# Install dependencies if needed
# !pip install "parallel-web-tools[spark]"

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

# Import Parallel Spark integration
from parallel_web_tools.integrations.spark import (
    create_parallel_enrich_udf,
    register_parallel_udfs,
)

In [None]:
# Create Spark session
# Note: spark.driver.bindAddress is set for local development compatibility
spark = (
    SparkSession.builder.master("local[*]")
    .appName("ParallelEnrichmentDemo")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)

print(f"Spark version: {spark.version}")

## Authentication

The Spark UDFs use credentials in this order:
1. `api_key` parameter passed to `register_parallel_udfs()`
2. `PARALLEL_API_KEY` environment variable

For detailed setup instructions for Databricks, EMR, Dataproc, and other platforms, see **[docs/spark-setup.md](../docs/spark-setup.md)**.
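
The lookup order listed above can be sketched in plain Python. `resolve_api_key` is a hypothetical helper written for illustration, not a function exported by `parallel-web-tools`; the package's internal resolution may differ in detail.

```python
import os
from typing import Mapping, Optional


def resolve_api_key(
    api_key: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Hypothetical helper mirroring the documented credential order."""
    env = os.environ if env is None else env
    # 1. An explicitly passed api_key takes precedence.
    if api_key:
        return api_key
    # 2. Otherwise fall back to the PARALLEL_API_KEY environment variable.
    from_env = env.get("PARALLEL_API_KEY")
    if from_env:
        return from_env
    raise RuntimeError("No API key found: pass api_key=... or set PARALLEL_API_KEY")
```

Failing early on the driver like this gives a clearer message than an authentication error raised later inside a UDF call. The resolved value can then be passed as the `api_key` parameter of `register_parallel_udfs()`.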

In [None]:
import os

from dotenv import load_dotenv

# Load environment variables from .env file (if present)
# Create a .env file with: PARALLEL_API_KEY=your-key
load_dotenv()

api_key = os.environ.get("PARALLEL_API_KEY")
if api_key:
    print(f"PARALLEL_API_KEY is set ({len(api_key)} chars)")
else:
    print("PARALLEL_API_KEY not found. Create a .env file with:")
    print("  PARALLEL_API_KEY=your-key")

## Register UDFs with Spark

This makes `parallel_enrich()` available in Spark SQL queries.

In [None]:
# Register UDFs with default settings
# processor="lite-fast" is the default (fastest, cheapest)
register_parallel_udfs(
    spark,
    processor="lite-fast",  # Options: lite, base, core, pro, ultra (+ -fast variants)
    timeout=300,  # Timeout in seconds
)

print("UDFs registered: parallel_enrich, parallel_enrich_with_processor")

## Create Sample Data

In [None]:
# Sample company data
companies = [
    ("Parallel Web Systems", "https://parallel.ai", "Technology"),
    ("Google", "https://google.com", "Technology"),
    ("Microsoft", "https://microsoft.com", "Technology"),
    ("Apple", "https://apple.com", "Technology"),
    ("Amazon", "https://amazon.com", "E-commerce"),
]

df = spark.createDataFrame(companies, ["company_name", "website", "industry"])

df.toPandas()

In [None]:
# Register as temp view for SQL queries
df.createOrReplaceTempView("companies")

## Method 1: SQL-based Enrichment

Use `parallel_enrich()` directly in SQL queries. This is the most direct option when your pipeline is already written in Spark SQL.

In [None]:
# Enrich companies with CEO name, founding year, and a brief description
# Note: This will make API calls - may take a few seconds per row

enriched_df = spark.sql("""
    SELECT
        company_name,
        website,
        industry,
        parallel_enrich(
            map('company_name', company_name, 'website', website),
            array(
                'CEO name (current CEO or equivalent leader)',
                'Founding year (YYYY format)',
                'Brief company description (1-2 sentences)'
            )
        ) as enriched_json
    FROM companies
    LIMIT 2  -- Start with just 2 for demo
""").cache()  # Cache to avoid re-running UDF on subsequent actions

enriched_df.toPandas()
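
When the input columns or output descriptions come from configuration rather than a hand-written query, the `parallel_enrich(map(...), array(...))` expression can be generated. The sketch below is one way to do that; `sql_literal`, `quote_identifier`, and `build_enrich_expr` are hypothetical helpers, and the escaping assumes Spark's default SQL parser settings, where a backslash escapes a quote inside a string literal.

```python
from typing import Sequence


def sql_literal(value: str) -> str:
    """Quote a Python string as a Spark SQL string literal (default parser)."""
    # Escape backslashes first, then single quotes.
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def quote_identifier(name: str) -> str:
    """Wrap a column name in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def build_enrich_expr(
    input_columns: Sequence[str],
    output_descriptions: Sequence[str],
    alias: str = "enriched_json",
) -> str:
    """Build the parallel_enrich(...) select expression used in Method 1."""
    if not input_columns or not output_descriptions:
        raise ValueError("need at least one input column and one output description")
    # map('col', `col`, ...) pairs each key with the column of the same name.
    map_args = ", ".join(
        f"{sql_literal(name)}, {quote_identifier(name)}" for name in input_columns
    )
    array_args = ", ".join(sql_literal(d) for d in output_descriptions)
    return f"parallel_enrich(map({map_args}), array({array_args})) AS {alias}"
```

The result can be interpolated into a query, for example `spark.sql(f"SELECT company_name, {expr} FROM companies LIMIT 2")` with `expr = build_enrich_expr(["company_name", "website"], ["CEO name"])`.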

## Parse the JSON Results

The UDF returns each row's enrichment as a JSON string. Let's parse it into structured columns with `from_json`.

**Important**: We called `.cache()` above to store the enriched results. Caching is lazy: the first action (`toPandas()` above) runs the UDF and stores the rows. Without caching, Spark would re-execute the UDF (and make new API calls) each time we perform an action on a derived DataFrame.

In [None]:
# Define schema for the enriched data
enriched_schema = StructType(
    [
        StructField("ceo_name", StringType(), True),
        StructField("founding_year", StringType(), True),
        StructField("brief_company_description", StringType(), True),
    ]
)

# Parse JSON and extract fields (uses cached enriched_df - no re-computation)
parsed_df = enriched_df.select(
    "company_name", "website", "industry", from_json(col("enriched_json"), enriched_schema).alias("enriched")
).select(
    "company_name",
    "website",
    "industry",
    col("enriched.ceo_name").alias("ceo"),
    col("enriched.founding_year").alias("founded"),
    col("enriched.brief_company_description").alias("description"),
)

parsed_df.toPandas()
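
The schema field names above (`ceo_name`, `founding_year`, `brief_company_description`) look like snake_case versions of the output descriptions with the parenthetical hints removed. The sketch below reproduces that mapping for the three descriptions used in this notebook. It is an inference from the schema shown here, not the package's documented rule, and `description_to_field` is a hypothetical helper; inspect the raw `enriched_json` keys before relying on it.

```python
import re


def description_to_field(description: str) -> str:
    """Guess the JSON key for an output description (assumed naming rule)."""
    # Drop parenthetical hints such as "(YYYY format)".
    without_hints = re.sub(r"\([^)]*\)", " ", description)
    # Collapse every run of non-alphanumeric characters into one underscore.
    snake = re.sub(r"[^0-9a-zA-Z]+", "_", without_hints).strip("_")
    return snake.lower()


descriptions = [
    "CEO name (current CEO or equivalent leader)",
    "Founding year (YYYY format)",
    "Brief company description (1-2 sentences)",
]
for text in descriptions:
    print(description_to_field(text))
# prints: ceo_name, founding_year, brief_company_description (one per line)
```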

## Method 2: DataFrame API with UDF

Instead of SQL, you can use the UDF directly with Spark's DataFrame API (`withColumn`, `select`, etc.).

In [None]:
from pyspark.sql.functions import array, create_map, lit

# Create a UDF instance (can customize processor, timeout, etc.)
enrich_udf = create_parallel_enrich_udf(processor="lite-fast", timeout=300)

In [None]:
# Use the UDF with DataFrame API instead of SQL
enriched_via_api = (
    df.limit(2).withColumn(
        "enriched_json",
        enrich_udf(
            # Input: map of column names to values
            create_map(lit("company_name"), col("company_name"), lit("website"), col("website")),
            # Output: array of field descriptions
            array(lit("CEO name"), lit("Founding year"), lit("Brief company description")),
        ),
    )
).cache()

enriched_via_api.toPandas()

## Processor Options

Choose a processor based on your needs:

| Processor | Speed | Cost | Best For |
|-----------|-------|------|----------|
| `lite`, `lite-fast` | Fastest | Lowest | Basic metadata, high volume |
| `base`, `base-fast` | Fast | Low | Standard enrichments |
| `core`, `core-fast` | Medium | Medium | Cross-referenced data |
| `pro`, `pro-fast` | Slow | High | Deep research (enables SSE) |
| `ultra`, `ultra-fast` | Slowest | Highest | Multi-source research |
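
Because the processor is passed as a plain string, a typo would only surface once API calls start. A small driver-side check against the names in the table above can catch that earlier. This is a sketch: `check_processor` is a hypothetical helper, and the package may already perform its own validation.

```python
# Processor names taken from the table above; each also has a "-fast" variant.
BASE_PROCESSORS = ("lite", "base", "core", "pro", "ultra")
VALID_PROCESSORS = frozenset(BASE_PROCESSORS) | {f"{p}-fast" for p in BASE_PROCESSORS}


def check_processor(name: str) -> str:
    """Return the name unchanged if it is a known processor, else raise."""
    if name not in VALID_PROCESSORS:
        options = ", ".join(sorted(VALID_PROCESSORS))
        raise ValueError(f"Unknown processor {name!r}; expected one of: {options}")
    return name
```

For example, `register_parallel_udfs(spark, processor=check_processor("base-fast"))` fails immediately on a misspelled name.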

In [None]:
# Use a different processor via SQL
# parallel_enrich_with_processor allows per-query processor override

result_df = spark.sql("""
    SELECT
        company_name,
        parallel_enrich_with_processor(
            map('company_name', company_name),
            array('Recent news headline about this company'),
            'base-fast'  -- Use base processor for more depth
        ) as news
    FROM companies
    LIMIT 1
""")

result_df.toPandas()

## Streaming Enrichment

For Spark Structured Streaming examples, see the dedicated notebook: **[spark_streaming_demo.ipynb](spark_streaming_demo.ipynb)**

## Best Practices

### 1. Always Cache Enriched Results
The UDF makes API calls, so cache to avoid redundant requests:

```python
enriched_df = df.withColumn("enriched", enrich_udf(...)).cache()
# Now subsequent operations use cached data
enriched_df.select(...).toPandas()
```

### 2. Column Descriptions
Be specific in your output column descriptions:

```python
# Good - specific descriptions help the AI
output_columns = array(
    lit("CEO name (current CEO or equivalent leader)"),
    lit("Founding year (YYYY format)"),
    lit("Annual revenue (USD, most recent fiscal year)")
)

# Less specific - may get inconsistent results
output_columns = array(lit("CEO"), lit("Year"), lit("Revenue"))
```

### 3. Error Handling
Check for errors in the JSON response:

```python
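from pyspark.sql.functions import col, get_json_object

# Extract the top-level "error" field from the JSON; it is null when the field is absent
with_errors = enriched_df.withColumn("error", get_json_object(col("enriched_json"), "$.error"))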
```
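For small result sets that have already been collected to the driver, the same check can be done in plain Python, which also makes it easy to report a failure rate. The sketch below assumes, as the `$.error` path above suggests, that a failed row carries a top-level `"error"` key. `split_results` and `failure_rate` are hypothetical helpers, and the values in the usage note are placeholders.

```python
import json
from typing import Iterable, List, Optional, Tuple


def split_results(rows: Iterable[Optional[str]]) -> Tuple[List[dict], List[str]]:
    """Separate parsed enrichment payloads from error messages."""
    ok: List[dict] = []
    errors: List[str] = []
    for raw in rows:
        try:
            payload = json.loads(raw)
        except (TypeError, ValueError):
            # None, NaN, or malformed JSON all count as failures.
            errors.append(f"unparseable response: {raw!r}")
            continue
        if not isinstance(payload, dict):
            errors.append(f"unexpected JSON type: {type(payload).__name__}")
        elif payload.get("error"):
            errors.append(str(payload["error"]))
        else:
            ok.append(payload)
    return ok, errors


def failure_rate(ok: List[dict], errors: List[str]) -> float:
    """Fraction of rows that failed; 0.0 when there are no rows."""
    total = len(ok) + len(errors)
    return len(errors) / total if total else 0.0
```

A typical call would be `ok, errors = split_results(enriched_df.toPandas()["enriched_json"])`, followed by `failure_rate(ok, errors)` to track how many rows need a retry.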

### 4. Use Appropriate Processors
- `lite-fast`: Basic metadata, high volume (cheapest)
- `base-fast`: Standard enrichments
- `pro-fast`: Deep research

## Cleanup

In [None]:
# Stop Spark session
spark.stop()
print("Spark session stopped.")

## Next Steps

- **Production deployment**: Use `spark-submit` with your cluster
- **Streaming**: Connect to Kafka, Kinesis, or file sources
- **Scaling**: Increase parallelism with more executors
- **Monitoring**: Track enrichment success/failure rates

For more information:
- [Parallel Documentation](https://docs.parallel.ai)
- [parallel-web-tools GitHub](https://github.com/parallel-web/parallel-web-tools)