# Cost of Travel Index - Regional Attractions (Census Divisions)

**Purpose**: Calculate median per-visit attraction costs at Census Division level using Spark DataFrames

**Methodology**:
- Medians are computed directly over individual transactions (no pre-aggregation before the median)
- Geographic Level: 9 Census Divisions (New England, Middle Atlantic, etc.)
- Outlier removal: transactions below P5 or above P98 are dropped, with thresholds computed per division per month
- MCC Codes: 12 codes covering amusement parks, aquariums, recreation services, bowling alleys, theaters, etc.

In [None]:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from datetime import date

In [None]:
# CONFIGURATION: edit the values in this cell to change the run.

# Inclusive reporting window applied to transaction dates.
START_DATE = date(year=2025, month=7, day=1)
END_DATE = date(year=2025, month=7, day=31)

# Miles for visitor classification: only transactions with a distance
# strictly greater than this value are kept downstream.
DISTANCE_THRESHOLD = 60

# DATA QUALITY THRESHOLDS (95% CI, 5% MOE)
MIN_SAMPLE_EXCLUDE = 600    # below this -> geography is flagged for exclusion
MIN_SAMPLE_ROLLING = 2000   # below this -> flagged for a 3-month rolling window

# GCS bucket URIs for the input datasets.
MERCHANT_TOURISM_PATH = "gs://spend-codecs-prod/enrichment/merchant_tourism"
TRANSACTION_TOURISM_PATH = "gs://spend-codecs-prod/enrichment/transaction_tourism"
CENSUS_BRIDGE_TABLE_PATH = "gs://prj-sandbox-i7sk/jk_testing/us_census_region_divisions_bridge_table"

# Merchant category codes (MCC) treated as "attractions".
ATTRACTION_MCC_CODES = [
    "7996",  # Amusement Parks, Carnivals, Circuses, Fortune Tellers
    "7995",  # Betting/Casino Gambling
    "7998",  # Aquariums, Seaquariums, Dolphinariums
    "7991",  # Tourist Attractions and Exhibits
    "7933",  # Bowling Alleys
    "7832",  # Motion Picture Theaters
    "7911",  # Dance Halls, Studios, and Schools
    "7929",  # Bands, Orchestras, and Miscellaneous Entertainers
    "7922",  # Theatrical Producers (Except Motion Pictures), Ticket Agencies
    "7932",  # Billiard and Pool Establishments
    "7994",  # Video Game Arcades/Establishments
    "7999",  # Recreation Services (Not Elsewhere Classified)
]

In [None]:
# Read the three Parquet inputs from GCS.
#
# Resolve the Spark session explicitly instead of depending on a
# notebook-injected `spark` global: getOrCreate() returns the session that is
# already running when there is one and only builds a new one otherwise, so
# this cell behaves the same inside a managed notebook and also works when the
# code is run as a plain script.
spark = SparkSession.builder.getOrCreate()

merchant_df = spark.read.parquet(MERCHANT_TOURISM_PATH)
transaction_df = spark.read.parquet(TRANSACTION_TOURISM_PATH)
# Bridge table used below to map a merchant state abbreviation to its
# census region/division.
census_bridge_df = spark.read.parquet(CENSUS_BRIDGE_TABLE_PATH)

In [None]:
# Build the transaction-level dataset used by every later step: in-window
# visitor transactions at physical US attraction merchants, each tagged with
# the census division of the merchant's state.

# Merchants: physical locations only (merch_type == 0), in the US, with an
# attraction MCC.
attraction_merchants = (
    merchant_df
    .where(F.col("merch_type") == 0)
    .where(F.col("merch_country") == "US")
    .where(F.col("mcc").isin(ATTRACTION_MCC_CODES))
)

# Transactions: dated inside [START_DATE, END_DATE] (both ends inclusive) and
# with a distance strictly above the visitor threshold.
visitor_transactions = (
    transaction_df
    .where(F.col("trans_date") >= F.lit(START_DATE))
    .where(F.col("trans_date") <= F.lit(END_DATE))
    .where(F.col("trans_distance") > DISTANCE_THRESHOLD)
)

# Merchant state -> bridge-table state abbreviation.
state_matches_division = F.col("merch_state") == F.col("abbreviation")

attraction_transactions = (
    visitor_transactions
    # Inner joins: transactions without a matching attraction merchant, or
    # whose merchant state is absent from the bridge table, are dropped.
    .join(attraction_merchants, on=["mtid", "ref_date"], how="inner")
    .join(census_bridge_df, on=state_matches_division, how="inner")
    .select(
        "census_region_division",
        # First day of the transaction's month, used as the monthly key.
        F.trunc("trans_date", "month").alias("month_date"),
        "trans_amount",
        "membccid",
    )
)

# NOTE: count() is an action and triggers a full evaluation of the pipeline.
transaction_total = attraction_transactions.count()
print(f"Total attraction transactions: {transaction_total:,}")

In [None]:
# Outlier bounds: approximate 5th and 98th percentile of the transaction
# amount, computed separately for every (division, month) group.
lower_bound = F.expr("percentile_approx(trans_amount, 0.05)").alias("p5")
upper_bound = F.expr("percentile_approx(trans_amount, 0.98)").alias("p98")

division_thresholds = attraction_transactions.groupBy(
    "census_region_division", "month_date"
).agg(lower_bound, upper_bound)

In [None]:
# Drop outliers: attach each group's p5/p98 bounds to its transactions and
# keep only amounts inside the closed interval [p5, p98].
threshold_keys = ["census_region_division", "month_date"]

# between() is inclusive on both ends, i.e. (amount >= p5) & (amount <= p98).
within_bounds = F.col("trans_amount").between(F.col("p5"), F.col("p98"))

attractions_no_outliers = attraction_transactions.join(
    division_thresholds, on=threshold_keys, how="inner"
).where(within_bounds)

In [None]:
# Final result table: one row per (division, month) with the approximate
# median transaction amount, sample sizes, and a data-quality flag.

# Aggregates computed on the outlier-filtered transactions.
median_cost = F.expr("percentile_approx(trans_amount, 0.50)").alias("attraction_cost")
n_transactions = F.count("*").alias("transaction_count")
n_visitors = F.countDistinct("membccid").alias("unique_visitors")

# Quality flag is driven by the distinct-membccid count; the first matching
# branch wins, so the stricter EXCLUDE check has to come first.
quality_flag = (
    F.when(F.col("unique_visitors") < MIN_SAMPLE_EXCLUDE, "EXCLUDE")
    .when(F.col("unique_visitors") < MIN_SAMPLE_ROLLING, "ROLLING_3MO")
    .otherwise("SINGLE_MONTH")
)

attraction_regional_results = (
    attractions_no_outliers
    .groupBy("census_region_division", "month_date")
    .agg(median_cost, n_transactions, n_visitors)
    .withColumn("data_quality_flag", quality_flag)
    # Run metadata: the configured window and when the result was computed.
    .withColumn("period_start", F.lit(START_DATE))
    .withColumn("period_end", F.lit(END_DATE))
    .withColumn("calculation_timestamp", F.current_timestamp())
    .orderBy("month_date", "census_region_division")
)

In [None]:
# Print up to 50 result rows without truncating long cell values.
attraction_regional_results.show(n=50, truncate=False)

In [None]:
# Write the results as a single CSV object in GCS; the configured date range
# is embedded in the file name as YYYYMMDD_YYYYMMDD.
start_stamp = START_DATE.strftime("%Y%m%d")
end_stamp = END_DATE.strftime("%Y%m%d")
output_filename = (
    "gs://cost_of_travel_index_staging/results/"
    f"attraction_regional_results_{start_stamp}_{end_stamp}.csv"
)

# toPandas() collects every row to the driver before pandas writes the file.
# NOTE(review): pandas needs a GCS-capable filesystem backend (e.g. gcsfs) to
# write to a gs:// path - confirm it is installed in the runtime.
results_pdf = attraction_regional_results.toPandas()
results_pdf.to_csv(output_filename, index=False)
print(f"Results saved to: {output_filename}")