# 📊 Avetrack Application Data Generator - 116KB Documents

Generate large-scale avetrack-style application data matching the customer's **116KB document size** for DocumentDB performance testing.

## 🎯 Document Size Target:
- **~116 KB per document** (matches customer sample: 118,580 bytes raw)
- **Customer scenario**: 4TB collection = ~37M documents

## 📦 Size Components:
Approximate sizes as produced by the generator UDFs in this notebook:

| Component | Approx. size | Description |
|-----------|--------------|-------------|
| **WPP fields** | ~40 KB | Identity verification fields plus 80 padding fields of ~500 characters each |
| **losgs array** | ~50-60 KB | 12-18 lineItems with full price/tax details |
| **neustar data** | ~20 KB | Nested fraud detection responses |
| **Base fields** | <1 KB | _id, timestamps, run, channel, shipping state, credit status |
| **Total** | **~116 KB** | ✅ Close to the customer sample |

## 📊 Scaling:
- **1M records** = ~116 GB
- **10M records** = ~1.16 TB
- **37M records** = ~4.3 TB (customer target)
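
The figures in this list use decimal units (1 GB = 1,000,000 KB), while the notebook's `print` statements divide by 1024 twice and therefore report slightly smaller numbers. A minimal sketch of both conventions follows; `collection_size` is a hypothetical helper for illustration and is not part of the notebook.

```python
def collection_size(num_records: int, doc_size_kb: float = 116) -> dict:
    """Estimate total collection size for a given record count."""
    total_kb = num_records * doc_size_kb
    return {
        "decimal_gb": total_kb / 1000 / 1000,  # convention used in the scaling list
        "binary_gb": total_kb / 1024 / 1024,   # convention used by the notebook's print statements
    }

for n in (1_000_000, 10_000_000, 37_000_000):
    size = collection_size(n)
    print(f"{n:>11,} records: {size['decimal_gb']:,.0f} GB decimal, {size['binary_gb']:,.1f} GB binary")
```

For 1M records the two conventions differ by roughly 5%, which is why the configuration cell prints about 110.6 GB rather than 116 GB.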

In [None]:
# Configuration - Adjust these parameters as needed
NUM_RECORDS = 1000000  # Start with 1M for testing (~116 GB total)
NUM_PARTITIONS = 200  # Default; the auto-adjust cell further down overrides this value
DELTA_TABLE_PATH = "abfss://delta2iceberg@onelake.dfs.fabric.microsoft.com/sampledata.Lakehouse/Tables/att/avetrack_applications_1m"

# Composite partitioning strategy
HASH_BUCKETS = 50  # Creates run × buckets = 5 × 50 = 250 partitions (RUN_TYPES has 5 distinct values)

# Document size target
TARGET_DOC_SIZE_KB = 116  # Matches customer sample (118,580 bytes)

print(f"🎯 Target: {TARGET_DOC_SIZE_KB} KB per document")
print(f"📊 Records: {NUM_RECORDS:,}")
print(f"💾 Total size: {NUM_RECORDS * TARGET_DOC_SIZE_KB / 1024 / 1024:.1f} GB")
print(f"🗂️  Composite partitioning: run × {HASH_BUCKETS} hash buckets = ~{5 * HASH_BUCKETS} partitions")

In [None]:
# Import required libraries
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import json

print("✅ Libraries imported successfully")

In [None]:
# Auto-adjust partitions based on record count
if NUM_RECORDS <= 1000000:
    NUM_PARTITIONS = 100
elif NUM_RECORDS <= 10000000:
    NUM_PARTITIONS = 500
else:
    NUM_PARTITIONS = 1000

print(f"⚡ Using {NUM_PARTITIONS} partitions")
print(f"📦 Est. size: {NUM_RECORDS * TARGET_DOC_SIZE_KB / 1024 / 1024:.1f} GB")

In [None]:
# Reference data for realistic generation
ALL_US_STATES = ["AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL", "IN", "IA", "KS", 
                 "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", 
                 "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY", "DC"]

# Weighted distributions for realism
WEIGHTED_STATES = ["TX"] * 15 + ["CA"] * 15 + ["NY"] * 10 + ["FL"] * 10 + ALL_US_STATES * 2
RUN_TYPES = ["RUN2"] * 40 + ["RUN4"] * 35 + ["MITIGATIONRESULT"] * 15 + ["RUN1"] * 5 + ["RUN3"] * 5
CHANNELS = ["OCEDIRECT"] * 40 + ["RETAIL"] * 35 + ["CARE"] * 20 + ["DIGITAL"] * 5
CREDIT_STATUSES = ["APPROVED"] * 70 + ["DECLINED"] * 20 + ["PENDING"] * 10
DEVICE_MAKES = ["APPLE", "SAMSUNG", "GOOGLE", "ONEPLUS", "MOTOROLA", "LG"]
MARKETS = ["AUS", "DAL", "HOU", "SAT", "DEN", "PHX", "LAX", "SFO", "NYC", "CHI", "ATL", "MIA"]
FIRST_NAMES = ["James", "Mary", "John", "Patricia", "Robert", "Jennifer", "Michael", "Linda"]
LAST_NAMES = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis"]

print(f"✅ Reference data loaded: {len(ALL_US_STATES)} states, {len(DEVICE_MAKES)} device makes")

## Define UDFs for Large Document Generation

These UDFs create the large nested structures needed to reach 116KB per document.
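
Most of each document's size comes from padding fields built with dictionary comprehensions. A rough way to see how much one such block contributes is to serialize it in plain Python and measure the result. The sketch below mirrors the shape of the `wppField` padding used in the WPP UDF; `padding_size_bytes` is a hypothetical helper for illustration, not part of the notebook.

```python
import json

def padding_size_bytes(prefix: str, count: int, value_len: int) -> int:
    """Size in bytes of a JSON object holding `count` padding fields."""
    fields = {f"{prefix}{i}": f"value_{i}_" + "x" * value_len for i in range(count)}
    return len(json.dumps(fields).encode("utf-8"))

# Same shape as the wppField padding: 80 fields of ~500 characters each
wpp_padding = padding_size_bytes("wppField", count=80, value_len=500)
print(f"WPP padding: {wpp_padding:,} bytes ({wpp_padding / 1024:.1f} KB)")
```

Changing `count` or `value_len` in the corresponding UDF is the simplest lever for tuning a component toward its size target.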

In [None]:
# UDF: Generate large WPP verification fields (~40KB, mostly padding)
@udf(returnType=StringType())
def generate_wpp_json():
    import random, json, builtins
    wpp = {
        "wppPrimaryPhoneCheckSubscriberName": f"User{random.randint(1000,9999)} Last{random.randint(100,999)}",
        "wppSecondaryAddressCheckLinkedToPrimaryResident": random.choice([True, False]),
        "wppEmailAddressCheckIsValid": True,
        "wppIdentityNetworkScore": builtins.round(random.uniform(0.5, 1.0), 3),
        "wppTransactionId": f"{random.randint(100000000, 999999999)}_{builtins.hex(random.getrandbits(128))[2:]}",
        "wppPrimaryPhoneCheckCountryCode": "US",
        "wppPrimaryPhoneCheckLineType": random.choice(["Landline", "Mobile", "VoIP"]),
        "wppIdentityCheckScore": random.randint(300, 900),
        "wppPrimaryPhoneCheckCarrier": random.choice(["AT&T", "Verizon", "T-Mobile"]),
        "wppPrimaryAddressCheckType": random.choice(["Single unit", "Multi unit"]),
        "wppIpAddressCheck": {
            "zipCode": str(random.randint(10000, 99999)),
            "country": "United States of America",
            "city": random.choice(["New York", "Los Angeles", "Chicago", "Houston"]),
            "countryCode": "US", "isValid": True, "subDivision": "California", "isProxy": False
        },
        # Add 80 fields with 500 chars each for ~40KB total
        **{f"wppField{i}": f"value_{i}_" + "x"*500 for i in range(80)}
    }
    return json.dumps(wpp)

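# The UDF returns random values: mark it non-deterministic so Spark does not assume repeated calls give the same result
generate_wpp_json = generate_wpp_json.asNondeterministic()

print("✅ WPP UDF defined (~40KB per document)")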

In [None]:
# UDF: Generate large losgs array with 12-18 lineItems (~50-60KB)
@udf(returnType=StringType())
def generate_losgs_json():
    import random, json, builtins
    num_items = random.randint(12, 18)  # More items for size
    line_items = []
    for i in range(num_items):
        line_items.append({
            "catalogSKUId": f"{random.randint(10000, 99999)}",
            "displayName": f"PRODUCT_{i+1}_DESCRIPTION_WITH_LONG_NAME_FOR_SIZE_" + "X"*200,
            "payments": [{"paymentTenderReference": f"PAYMENT_{j+1}", "amount": builtins.round(random.uniform(0, 500), 2)} for j in range(3)],
            "hardGood": {
                "hardGoodType": random.choice(["SIM", "DEVICE", "ACCESSORY"]), 
                "deliveryPromiseNote": {f"note{k}": "N"*100 for k in range(5)}
            },
            "price": {
                "currencyType": "USD",
                "amount": builtins.round(random.uniform(0, 1000), 2),
                "total": builtins.round(random.uniform(0, 1000), 2),
                "taxDetail": {
                    "taxablePriceDetail": {
                        "shipToTaxAreaId": [random.randint(100000000, 999999999) for _ in range(5)],
                        "taxableCost": builtins.round(random.uniform(0, 500), 2),
                        "orderTaxAreaId": random.randint(100000000, 999999999)
                    },
                    "lineItemTaxes": [{
                        "jurisdictionLevel": f"TAX_LEVEL_{j}",
                        "taxCode": f"TAXCODE_{j}",
                        "taxRate": builtins.round(random.uniform(0.05, 0.10), 4),
                        "taxAmount": builtins.round(random.uniform(0, 50), 2),
                        "description": "T"*150
                    } for j in range(3)]
                },
                "priceType": random.choice(["DUE_UPON_FFL", "RC", "NRC"])
            },
            "id": f"LOSG-WLS-AL-001-LI-{i+1}",
            "productType": "HARDGOOD",
            "status": random.choice(["IN_PROGRESS", "COMPLETED"]),
            # Add padding fields
            **{f"itemField{k}": "P"*200 for k in range(5)}
        })
    
    return json.dumps([{
        "dealerCode": f"Z{random.randint(1000, 9999)}",
        "primaryIndicator": "TRUE",
        "productCategory": "WIRELESS",
        "lineItems": line_items,
        "id": "LOSG-WLS-AL-001",
        # Add padding
        **{f"losgField{m}": "L"*300 for m in range(10)}
    }])

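# The UDF returns random values: mark it non-deterministic so Spark does not assume repeated calls give the same result
generate_losgs_json = generate_losgs_json.asNondeterministic()

print("✅ losgs UDF defined (~50-60KB per document)")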

In [None]:
# UDF: Generate neustar fraud detection data (~20KB)
@udf(returnType=StringType())
def generate_neustar_json():
    import random, json, builtins
    return json.dumps({
        "nsr": f"dc{random.randint(10,99)}-eid-prod-gwy{random.randint(1,9):02d}:{random.randint(100,999)}",
        "transid": f"{builtins.hex(random.getrandbits(128))[2:]}-ocedirect",
        "response": [{
            "provider": "NEUSTAR",
            "decision": random.choice(["APPROVE", "REVIEW", "DECLINE"]),
            "score": random.randint(0, 1000),
            "riskFactors": [f"FACTOR_{i}_" + "R"*100 for i in range(12)],
            "details": {f"field{i}": f"value_{i}_" + "D"*300 for i in range(20)}
        }],
        "RUN4": {
            "nsr": f"dc{random.randint(10,99)}-eid-prod-gwy02:{random.randint(100,999)}",
            "response": [{
                "provider": "NEUSTAR_RUN4",
                "additionalData": {f"data{i}": "A"*400 for i in range(15)},
                "extendedFields": {f"ext{j}": "E"*300 for j in range(10)}
            }]
        },
        # Add more padding fields
        **{f"neustarField{k}": "N"*500 for k in range(10)}
    })

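# The UDF returns random values: mark it non-deterministic so Spark does not assume repeated calls give the same result
generate_neustar_json = generate_neustar_json.asNondeterministic()

print("✅ Neustar UDF defined (~20KB per document)")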

In [None]:
# Standard UDFs for IDs, timestamps, etc.
@udf(returnType=StringType())
def generate_uuid():
    import uuid
    return str(uuid.uuid4()).upper()

@udf(returnType=StringType())
def generate_session_id():
    import uuid, random
    return f"{random.randint(100000000, 999999999)}_{uuid.uuid4().hex}"

@udf(returnType=StringType())
def random_datetime():
    import random
    from datetime import datetime, timedelta
    base = datetime(2024, 1, 1)
    return (base + timedelta(days=random.randint(0, 365))).isoformat() + "Z"

@udf(returnType=StringType())
def random_choice(choices):
    import random
    return random.choice(choices)

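# These UDFs return random values: mark them non-deterministic so Spark does not
# reuse or re-evaluate a call (e.g. _id vs. version, or run vs. fields.run)
generate_uuid = generate_uuid.asNondeterministic()
generate_session_id = generate_session_id.asNondeterministic()
random_datetime = random_datetime.asNondeterministic()
random_choice = random_choice.asNondeterministic()

print("✅ Standard UDFs defined")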

## Generate Base DataFrame

In [None]:
print(f"🚀 Generating {NUM_RECORDS:,} base records...")

base_df = spark.range(0, NUM_RECORDS, numPartitions=NUM_PARTITIONS) \
    .withColumn("partition_id", spark_partition_id())

print(f"✅ Base DataFrame created with {NUM_PARTITIONS} partitions")

## Generate Large Document Structure

In [None]:
print("📦 Building large document structure...")

# Add large JSON fields
docs_df = base_df \
    .withColumn("wpp_json", generate_wpp_json()) \
    .withColumn("losgs_json", generate_losgs_json()) \
    .withColumn("neustar_json", generate_neustar_json()) \
    .withColumn("_id", generate_uuid()) \
    .withColumn("run", random_choice(array([lit(r) for r in RUN_TYPES]))) \
    .withColumn("channel", random_choice(array([lit(c) for c in CHANNELS]))) \
    .withColumn("state", random_choice(array([lit(s) for s in WEIGHTED_STATES]))) \
    .withColumn("creditStatus", random_choice(array([lit(cs) for cs in CREDIT_STATUSES])))

# Build final document with all fields
final_df = docs_df.select(
    col("_id"),
    random_datetime().alias("applicationTimeStamp"),
    random_datetime().alias("createdDateTime"),
    random_datetime().alias("lastUpdateDateTime"),
    lit("ocedirect").alias("serviceId"),
    generate_uuid().alias("version"),
    lit("com.att.bdcoe.cmt.ApplicationRecord").alias("_class"),
    struct(
        generate_session_id().alias("sessionId"),
        col("run").alias("run"),
        col("channel").alias("channel"),
        struct(
            col("state").alias("state")
        ).alias("shippingAddress"),
        col("creditStatus").alias("creditStatus"),
        from_json(col("wpp_json"), MapType(StringType(), StringType())).alias("wpp"),
        from_json(col("losgs_json"), ArrayType(MapType(StringType(), StringType()))).alias("losgs"),
        from_json(col("neustar_json"), MapType(StringType(), StringType())).alias("neustarDigitalIdentityRisk")
    ).alias("fields"),
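    col("run")
    # pmod keeps hash_bucket in 0..HASH_BUCKETS-1; abs() cannot make the minimum 32-bit hash value positive
).withColumn("hash_bucket", pmod(hash(col("_id")), lit(HASH_BUCKETS))) \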
 .repartition(col("run"), col("hash_bucket"))

print(f"✅ Large documents generated with ~116KB size")
print(f"🗂️  Added hash_bucket column for composite partitioning (0-{HASH_BUCKETS-1})")

## Verify Document Size

In [None]:
import json

# Check actual document size
sample = final_df.limit(1).collect()[0]
sample_json = json.dumps(sample.asDict(recursive=True))
doc_size_bytes = len(sample_json.encode('utf-8'))
doc_size_kb = doc_size_bytes / 1024

print(f"📏 Actual document size: {doc_size_bytes:,} bytes ({doc_size_kb:.1f} KB)")
print(f"🎯 Target: {TARGET_DOC_SIZE_KB} KB")
print(f"✅ Size match: {'YES' if 100 <= doc_size_kb <= 130 else 'ADJUST UDFs'}")
print(f"\n💾 Total collection size: {NUM_RECORDS * doc_size_kb / 1024 / 1024:.1f} GB")
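
Because the number of lineItems varies per document, a single sampled row may not be representative. A minimal sketch of summarizing sizes over several documents is shown below; `size_summary_kb` is a hypothetical helper, and the stand-in `docs` list replaces real rows so the snippet runs without Spark. It calls `builtins.min`/`builtins.max` explicitly because the notebook's wildcard import from `pyspark.sql.functions` shadows those names.

```python
import builtins
import json
import statistics

def size_summary_kb(docs: list) -> dict:
    """Min, mean and max serialized size (in KB) over a list of dict documents."""
    sizes = [len(json.dumps(d).encode("utf-8")) / 1024 for d in docs]
    return {
        "min": builtins.min(sizes),
        "mean": statistics.mean(sizes),
        "max": builtins.max(sizes),
    }

# Stand-in documents; in the notebook these could instead come from
# [row.asDict(recursive=True) for row in final_df.limit(20).collect()]
docs = [{"_id": str(i), "payload": "x" * (110_000 + 1_000 * i)} for i in range(5)]
summary = size_summary_kb(docs)
print({k: f"{v:.1f} KB" for k, v in summary.items()})
```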

In [None]:
# Show sample data
final_df.select("_id", "serviceId", "fields.run", "run", "hash_bucket", "fields.channel", "fields.shippingAddress.state").show(10, truncate=False)

## Write to Delta Lake

In [None]:
print("💾 Writing to Delta Lake with composite partitioning...")
print(f"🗂️  Partitioning by: run × hash_bucket = ~{5 * HASH_BUCKETS} physical partitions")  # RUN_TYPES has 5 distinct values

final_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("run", "hash_bucket") \
    .option("path", DELTA_TABLE_PATH) \
    .save()

print(f"✅ Data written to: {DELTA_TABLE_PATH}")
print(f"📦 Physical partitions: run (5 values) × hash_bucket ({HASH_BUCKETS} buckets) = {5 * HASH_BUCKETS} partitions")
print(f"📊 Avg partition size: {(NUM_RECORDS * TARGET_DOC_SIZE_KB / 1024 / 1024) / (5 * HASH_BUCKETS):.2f} GB")
print(f"✅ Optimal for parallel reads/writes (avoids 100GB+ partition bottleneck)")

## Verify Data and Show Distributions

In [None]:
# Verify data
delta_df = spark.read.format("delta").load(DELTA_TABLE_PATH)
total_count = delta_df.count()

print(f"✅ Verified: {total_count:,} records written")
print(f"📦 Total size: {total_count * doc_size_kb / 1024 / 1024:.1f} GB")

# Show composite partition distribution (RUN_TYPES has 5 distinct values)
print(f"\n🗂️  Composite Partition Distribution:")
print(f"   Physical partitions: {5 * HASH_BUCKETS}")
print(f"   Avg records/partition: {total_count // (5 * HASH_BUCKETS):,}")
print(f"   Avg size/partition: {(total_count * doc_size_kb / 1024 / 1024) / (5 * HASH_BUCKETS):.2f} GB")

# Show distributions
print("\n📊 Distribution by run type (LOGICAL PARTITION):")
delta_df.groupBy("run").count().orderBy(desc("count")).show()

print("\n📊 Distribution by hash_bucket (showing first 10):")
delta_df.groupBy("hash_bucket").count().orderBy("hash_bucket").show(10)

print("\n📊 Distribution by channel:")
delta_df.groupBy("fields.channel").count().orderBy(desc("count")).show()

print("\n📊 Distribution by credit status:")
delta_df.groupBy("fields.creditStatus").count().orderBy(desc("count")).show()

print("\n📊 Distribution by state (top 10):")
delta_df.groupBy("fields.shippingAddress.state").count().orderBy(desc("count")).show(10)

## 🎯 Summary

### ✅ Generated Data:
- **Document size**: ~116 KB per document (matches customer sample)
- **Total records**: As configured in `NUM_RECORDS`
- **Composite partitioning**: `run` × `hash_bucket` to spread the data across many partitions

### 🗂️ Composite Partition Strategy:
- **Logical partition**: `run` (5 values: RUN2, RUN4, MITIGATIONRESULT, RUN1, RUN3)
- **Physical partition**: `hash_bucket` (50 buckets based on a hash of `_id`)
- **Total partitions**: 5 × 50 = **250 physical partitions**
- **Benefits**:
  - ✅ Avoids 100GB+ partition bottleneck (was causing 3.5hr writes)
  - ✅ At 1M records each partition averages ~460 MB (about 115 MB for RUN1/RUN3, up to ~930 MB for RUN2)
  - ✅ Supports query pruning by `run` (95% of queries)
  - ✅ Enables 250-way parallel reads/writes vs. 5-way
  - ✅ Hash buckets spread each `run` evenly; at 37M records `HASH_BUCKETS` must be raised to keep partitions under 1 GB
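
The partition count and per-partition size follow from the `RUN_TYPES` weights in the reference-data cell, which hold five distinct values with uneven shares. A minimal sketch of that arithmetic is given below, assuming rows hash evenly across buckets and using `TOTAL_GB = 116` as an illustrative figure for about 1M documents:

```python
from collections import Counter

# Copied from the reference-data cell
RUN_TYPES = ["RUN2"] * 40 + ["RUN4"] * 35 + ["MITIGATIONRESULT"] * 15 + ["RUN1"] * 5 + ["RUN3"] * 5
HASH_BUCKETS = 50
TOTAL_GB = 116  # assumption: ~1M documents at ~116 KB each

weights = Counter(RUN_TYPES)
total_weight = len(RUN_TYPES)
print(f"Distinct run values: {len(weights)} -> {len(weights) * HASH_BUCKETS} partitions")

for run, weight in weights.most_common():
    run_gb = TOTAL_GB * weight / total_weight
    mb_per_bucket = run_gb / HASH_BUCKETS * 1000
    print(f"{run:<17} {weight / total_weight:>4.0%} of rows, ~{mb_per_bucket:.0f} MB per bucket")
```

The hash bucket evens out rows within a `run`, but partitions belonging to a heavily weighted `run` remain several times larger than those of a lightly weighted one.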

### 📦 Document Components:
- **Top-level fields**: _id, serviceId, timestamps, version, _class, run, hash_bucket
- **fields.run**: RUN2, RUN4, MITIGATIONRESULT, RUN1, RUN3 (weighted distribution)
- **fields.channel**: OCEDIRECT, RETAIL, CARE, DIGITAL
- **fields.shippingAddress.state**: All 50 US states plus DC (TX, CA, NY, FL weighted higher)
- **fields.creditStatus**: APPROVED, DECLINED, PENDING
- **fields.wpp**: ~40KB nested JSON structure
- **fields.losgs**: ~50-60KB array of nested objects
- **fields.neustarDigitalIdentityRisk**: ~20KB nested JSON

### 🎯 Query Performance:
- **Partition pruning**: Queries filtering by `run` only read the partitions for that `run` value
- **Parallel execution**: 250 tasks instead of 5 = 50x more parallelism
- **MongoDB write**: Each Spark task writes ~4,000 records on average at 1M records (~148,000 at 37M), instead of millions per task when partitioning by `run` alone
- **Estimated write time**: 30-60 min for 37M records (vs 3.5 hours)

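### 📊 Partition Distribution (37M records example):
- **Total partitions**: 250 physical (5 `run` values × 50 hash buckets)
- **Avg records/partition**: ~148,000 records
- **Avg size/partition**: ~17 GB with 50 hash buckets; the ~580 MB figure applies to 1M records spread over 200 partitions
- **Target size**: 128-256 MB ideal, <1 GB acceptable; reaching <1 GB at 37M records takes roughly 4,300 partitions (about 860 hash buckets)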
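
To size `HASH_BUCKETS` for a given record count, the partition target can be turned into a bucket count. The sketch below is a rough estimate under stated assumptions: five `run` values (the distinct entries in `RUN_TYPES`), rows spread evenly across them, decimal units, and uncompressed document size. `required_hash_buckets` is a hypothetical helper, not part of the notebook.

```python
import math

def required_hash_buckets(num_records: int, doc_size_kb: float = 116,
                          run_values: int = 5, target_partition_mb: float = 1000) -> int:
    """Smallest bucket count that keeps the average partition at or below the target size."""
    total_mb = num_records * doc_size_kb / 1000
    partitions_needed = math.ceil(total_mb / target_partition_mb)
    return math.ceil(partitions_needed / run_values)

for n in (1_000_000, 37_000_000):
    print(f"{n:,} records -> HASH_BUCKETS >= {required_hash_buckets(n)}")
```

Since the `run` values are weighted unevenly, partitions for the most common value will be larger than this average, so some headroom above the computed bucket count is advisable.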

### 📊 Next Steps:
1. Verify document size (~116KB)
2. Check composite partition distribution (run × hash_bucket)
3. Validate avg partition size (~500MB-1GB per partition)
4. Export to MongoDB/DocumentDB for performance testing
5. Run queries from mongodb_avetrack_queries.json
6. Create indexes from mongodb_avetrack_indexes.json