# PySpark ETL with Simple UDF Hash Fetching

**Approach:** Simple UDF - define function, register as UDF, use with `withColumn`

Clean and straightforward!

## 1. Create Sample Data

In [3]:
from pyspark.sql import Row
from datetime import date

# Claims data
claims_data = [
    Row(claim_id="CL_001", policyholder_id="PH1", claim_amount=5000, claim_date=date(2024, 1, 15), region="North"),
    Row(claim_id="CL_002", policyholder_id="PH2", claim_amount=3000, claim_date=date(2024, 2, 20), region="South"),
    Row(claim_id="RX_001", policyholder_id="PH3", claim_amount=7000, claim_date=date(2024, 3, 10), region="East"),
    Row(claim_id="CL_003", policyholder_id="PH1", claim_amount=2000, claim_date=date(2024, 4, 5), region="West"),
    Row(claim_id="RX_002", policyholder_id="PH4", claim_amount=4500, claim_date=date(2024, 5, 12), region="North"),
    Row(claim_id="CL_004", policyholder_id="PH2", claim_amount=6000, claim_date=date(2024, 6, 18), region="South"),
]

# Policyholders data
policyholders_data = [
    Row(policyholder_id="PH1", policyholder_name="Alice Johnson"),
    Row(policyholder_id="PH2", policyholder_name="Bob Smith"),
    Row(policyholder_id="PH3", policyholder_name="Charlie Brown"),
    Row(policyholder_id="PH4", policyholder_name="Diana Prince"),
]

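# Create (or reuse) a SparkSession. Databricks and the pyspark shell predefine
# `spark`; a plain Python kernel does not, and fails with
# "NameError: name 'spark' is not defined" without these two lines.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("claims_etl").getOrCreate()

# Create DataFrames
claims_df = spark.createDataFrame(claims_data)
policyholders_df = spark.createDataFrame(policyholders_data)

print("✅ Sample data created")
claims_df.show()
policyholders_df.show()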

## 2. Define and Register UDF

In [None]:
import requests
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the hash function
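def get_hash(claim_id):
    """
    Fetches the MD4 hash for a claim_id from the external Hashify API.
    Returns an empty string for missing IDs and for failed requests.
    """
    if not claim_id:
        return ""

    try:
        # Pass the value via `params` so that requests URL-encodes it
        url = "https://api.hashify.net/hash/md4/hex"
        response = requests.get(url, params={"value": claim_id}, timeout=5)
        if response.status_code == 200:
            return response.json().get("Digest", "")
        return ""
    except Exception:
        # Network errors, timeouts, and malformed JSON all fall back to ""
        return ""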

# Register as UDF
get_hash_udf = udf(get_hash, StringType())

print("✅ UDF registered")
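
Because `get_hash` swallows every failure and returns `""`, it can help to exercise both the success and the failure path without touching the network. The following is a minimal sketch, not part of the original notebook: `make_get_hash`, `FakeResponse`, the three `fake_*` getters, and the injected `http_get` parameter are hypothetical names, and the digest string is invented for illustration.

```python
from typing import Any, Callable, Optional


def make_get_hash(http_get: Callable[..., Any]) -> Callable[[Optional[str]], str]:
    """Build a get_hash function around an injectable HTTP getter (hypothetical helper)."""

    def get_hash(claim_id: Optional[str]) -> str:
        if not claim_id:
            return ""
        try:
            response = http_get(
                "https://api.hashify.net/hash/md4/hex",
                params={"value": claim_id},
                timeout=5,
            )
            if response.status_code == 200:
                return response.json().get("Digest", "")
            return ""
        except Exception:
            return ""

    return get_hash


class FakeResponse:
    """Stand-in for requests.Response, for offline checks only."""

    def __init__(self, status_code: int, payload: dict):
        self.status_code = status_code
        self._payload = payload

    def json(self) -> dict:
        return self._payload


def fake_ok(url, params=None, timeout=None):
    # Invented digest; the real API returns a hex string
    return FakeResponse(200, {"Digest": "fake-digest-for-" + params["value"]})


def fake_server_error(url, params=None, timeout=None):
    return FakeResponse(500, {})


def fake_timeout(url, params=None, timeout=None):
    raise TimeoutError("simulated timeout")


ok_result = make_get_hash(fake_ok)("CL_001")
error_result = make_get_hash(fake_server_error)("CL_001")
timeout_result = make_get_hash(fake_timeout)("CL_001")
empty_result = make_get_hash(fake_ok)(None)
print(ok_result, repr(error_result), repr(timeout_result), repr(empty_result))
```

In the notebook itself, passing `requests.get` as `http_get` should give the same behaviour as the `get_hash` function above.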

## 3. Extract and Transform Data

In [None]:
from pyspark.sql.functions import col, when, split, date_format

print("🔄 Starting ETL transformation...\n")

# Step 1: Join Claims and Policyholders
print("📊 Joining claims and policyholders...")
joined_df = claims_df.join(
    policyholders_df, 
    "policyholder_id", 
    "left"
)

# Step 2: Apply UDF to add hash_id column
print("📡 Fetching hashes using UDF...")
joined_with_hashes_df = joined_df.withColumn("hash_id", get_hash_udf(col("claim_id")))

print("\n📋 DataFrame with Hashes:")
joined_with_hashes_df.select("claim_id", "policyholder_name", "hash_id").show(truncate=False)

# Step 3: Apply business transformations
print("\n🔧 Applying business transformations...")
final_df = joined_with_hashes_df.withColumn(
    "claim_type",
    when(col("claim_id").like("CL%"), "Coinsurance")
    .when(col("claim_id").like("RX%"), "Reinsurance")
    .otherwise("Unknown")
).withColumn(
    "claim_priority",
    when(col("claim_amount") > 4000, "Urgent")
    .otherwise("Normal")
).withColumn(
    "claim_period",
    date_format(col("claim_date"), "yyyy-MM")
).withColumn(
    "source_system_id",
    split(col("claim_id"), "_").getItem(1)
)

# Select final columns in specific order
final_df = final_df.select(
    "claim_id",
    "policyholder_name",
    "region",
    "claim_type",
    "claim_priority",
    "claim_amount",
    "claim_period",
    "source_system_id",
    "hash_id"
)

print("\n📊 Final Transformed DataFrame:")
final_df.show(truncate=False)

print("\n✅ Transformation complete!")
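
To make the business rules in Step 3 concrete, they can be mirrored in plain Python and applied to a few of the sample rows. This is a sketch for deriving expected values, for example in a unit test; `expected_columns` is a hypothetical helper and does not replace the Spark expressions.

```python
from datetime import date


def expected_columns(claim_id: str, claim_amount: int, claim_date: date) -> dict:
    """Plain-Python mirror of the withColumn rules (hypothetical test helper)."""
    if claim_id.startswith("CL"):        # like("CL%")
        claim_type = "Coinsurance"
    elif claim_id.startswith("RX"):      # like("RX%")
        claim_type = "Reinsurance"
    else:
        claim_type = "Unknown"

    parts = claim_id.split("_")          # split(col("claim_id"), "_")
    return {
        "claim_type": claim_type,
        # Strictly greater than 4000, so exactly 4000 stays "Normal"
        "claim_priority": "Urgent" if claim_amount > 4000 else "Normal",
        # date_format(col("claim_date"), "yyyy-MM")
        "claim_period": claim_date.strftime("%Y-%m"),
        # getItem(1); None stands in for Spark's null when there is no
        # second element (assuming ANSI mode is off)
        "source_system_id": parts[1] if len(parts) > 1 else None,
    }


row_cl_001 = expected_columns("CL_001", 5000, date(2024, 1, 15))
row_rx_001 = expected_columns("RX_001", 7000, date(2024, 3, 10))
row_cl_003 = expected_columns("CL_003", 2000, date(2024, 4, 5))
for row in (row_cl_001, row_rx_001, row_cl_003):
    print(row)
```

Note that `source_system_id` takes the part after the underscore (`"001"` for `CL_001`), not the `CL`/`RX` prefix.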

## 4. Load - Write to Parquet

In [None]:
# Define output path
output_path = "/tmp/processed_claims_output"

print(f"💾 Writing output to {output_path}...")
final_df.coalesce(1).write.parquet(output_path, mode="overwrite")
print("✅ Write complete!")

# Verify output
print("\n🔍 Verifying output...")
result_df = spark.read.parquet(output_path)
print(f"✅ Successfully read {result_df.count()} rows from output")
result_df.show(truncate=False)

## 5. Summary

### ✅ What We Did

**Simple 3-step UDF approach:**

1. **Define function** - `get_hash(claim_id)`
2. **Register as UDF** - `get_hash_udf = udf(get_hash, StringType())`
3. **Use with withColumn** - `df.withColumn("hash_id", get_hash_udf(col("claim_id")))`

That's it! Clean and simple.

### 📊 Complete ETL Pipeline

1. ✅ Extract - Load claims and policyholders
2. ✅ Transform - Join tables, fetch hashes, apply business rules
3. ✅ Load - Write to Parquet

### 🎯 This Works Great For:

- Small to medium datasets
- APIs that can handle parallel requests
- Databricks or another full Spark cluster, with `requests` installed on the worker nodes

### 💡 Note:

This approach creates a **new HTTP connection for each row**.

For better performance at scale, consider:
- Connection pooling with `mapPartitions`
- Driver-side batching with deduplication
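
The first option, connection pooling with `mapPartitions`, could look roughly like the sketch below: one HTTP session is created per partition and reused for every row in it. `hash_partition`, `FakeSession`, and `FakeResponse` are hypothetical names; the fakes exist only so the function can be run offline, and the Spark wiring in the trailing comment is an assumed usage that has not been run against this notebook.

```python
from typing import Any, Callable, Iterable, Iterator, Optional, Tuple


def hash_partition(
    rows: Iterable[Any],
    session_factory: Callable[[], Any],
) -> Iterator[Tuple[Optional[str], str]]:
    """Yield (claim_id, hash_id) pairs, reusing one HTTP session per partition."""
    session = session_factory()  # e.g. requests.Session, created once per partition
    try:
        for row in rows:
            claim_id = row["claim_id"]
            digest = ""
            if claim_id:
                try:
                    response = session.get(
                        "https://api.hashify.net/hash/md4/hex",
                        params={"value": claim_id},
                        timeout=5,
                    )
                    if response.status_code == 200:
                        digest = response.json().get("Digest", "")
                except Exception:
                    digest = ""
            yield (claim_id, digest)
    finally:
        session.close()


class FakeResponse:
    """Offline stand-in for requests.Response."""

    def __init__(self, value: str):
        self.status_code = 200
        self._value = value

    def json(self) -> dict:
        return {"Digest": "fake-" + self._value}  # invented digest


class FakeSession:
    """Offline stand-in for requests.Session that counts how often it is created."""

    created = 0

    def __init__(self):
        FakeSession.created += 1
        self.closed = False

    def get(self, url, params=None, timeout=None):
        return FakeResponse(params["value"])

    def close(self):
        self.closed = True


results = list(hash_partition([{"claim_id": "CL_001"}, {"claim_id": "RX_001"}], FakeSession))
print(results, FakeSession.created)

# Assumed usage in the notebook (requires pyspark and requests):
#   import requests
#   hashes_df = (
#       claims_df.select("claim_id").rdd
#       .mapPartitions(lambda rows: hash_partition(rows, requests.Session))
#       .toDF(["claim_id", "hash_id"])
#   )
#   joined_with_hashes_df = joined_df.join(hashes_df, "claim_id", "left")
```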

But for getting started and testing - this simple UDF approach is perfect! ✨
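
For the driver-side batching idea mentioned in the note, a minimal sketch under stated assumptions: collect the distinct `claim_id` values to the driver, call the API once per distinct value, and join the result back. `build_hash_lookup` and `stub_fetch` are hypothetical names, and the Spark wiring in the trailing comment is an assumed usage. The sample data has no repeated `claim_id`, so the saving only appears when the same ID occurs on many rows.

```python
from typing import Callable, Dict, Iterable, Optional


def build_hash_lookup(
    claim_ids: Iterable[Optional[str]],
    fetch: Callable[[str], str],
) -> Dict[str, str]:
    """Call `fetch` once per distinct, non-empty claim_id and return {claim_id: hash}."""
    lookup: Dict[str, str] = {}
    for claim_id in claim_ids:
        if claim_id and claim_id not in lookup:
            lookup[claim_id] = fetch(claim_id)
    return lookup


# Offline demonstration with a counting stub instead of the real API
calls = []


def stub_fetch(claim_id: str) -> str:
    calls.append(claim_id)
    return "hash-of-" + claim_id  # invented value


lookup = build_hash_lookup(["CL_001", "CL_002", "CL_001", None, "CL_002"], stub_fetch)
print(lookup)
print(len(calls))

# Assumed usage in the notebook (requires pyspark; get_hash is the function from section 2):
#   ids = [r["claim_id"] for r in joined_df.select("claim_id").distinct().collect()]
#   lookup = build_hash_lookup(ids, get_hash)
#   hashes_df = spark.createDataFrame(list(lookup.items()), ["claim_id", "hash_id"])
#   joined_with_hashes_df = joined_df.join(hashes_df, "claim_id", "left")
```

With a left join, rows whose `claim_id` is empty end up with a null `hash_id` rather than `""`, which differs slightly from the UDF version.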