## 1. Read data

In [95]:
# Load the store-level business table from the project bucket and preview three rows.
STORE_DF_PATH = "gs://msca-bdp-student-gcs/Group_5_final_project/store_df/"
business_df = spark.read.parquet(STORE_DF_PATH)
business_df.show(3)

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+--------------+-----+--------------------+-------------------+--------------------+---------------+--------------------+---------------+-----------------+--------------------+----------------------+-----------------+----------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-----+--------------+---------+---------------+------------------------+----------------+-------------+-----------------------+------------------+------------+
|             gmap_id|             address|avg_rating|            category|         description|               hours|          latitude|         longitude|          store_name|num_of_reviews|price|    relative_results|              state|  MISC_Accessibility|MISC_Acti

In [126]:
# Load the per-business closure predictions (gmap_id, store_name, prediction, closure_prob).
CLOSURE_PROB_PATH = "gs://msca-bdp-student-gcs/Group_5_final_project/biz_closure_prob_df/"
biz_closure_prob_df = spark.read.parquet(CLOSURE_PROB_PATH)

biz_closure_prob_df.show(3)

                                                                                

+--------------------+--------------------+----------+--------------------+
|             gmap_id|          store_name|prediction|        closure_prob|
+--------------------+--------------------+----------+--------------------+
|0x89e85a90fa80ea7...|            Checkers|       0.0|0.007217116418191323|
|0x89c2c219189e518...|   Momo Asian Fusion|       0.0|0.017988535282472868|
|0x8085b6a8ebae49f...|Rosso Pizzeria & ...|       1.0|  0.9591585341045397|
+--------------------+--------------------+----------+--------------------+
only showing top 3 rows



In [131]:
from pyspark.sql.functions import col

# Keep only the join key and the two model outputs from the closure-probability table.
closure_cols = ["gmap_id", "prediction", "closure_prob"]
biz_prob_subset = biz_closure_prob_df.select(*closure_cols)

# Left join: every business row is kept, with nulls where no prediction exists.
# NOTE(review): `business_df_clean` is not created in any cell shown above (only
# `business_df` is) — confirm where it is defined before a Restart & Run All.
business_with_prob_df = business_df_clean.join(biz_prob_subset, on="gmap_id", how="left")

# Preview the joined result without truncating cell values.
business_with_prob_df.show(3, truncate=False)


                                                                                

+-------------------------------------+------------------------------------------------------------------------+----------+----------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+------------------+-------------------------------+--------------+-----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+----------------------------------------------------------------------------------------------------------------------------------+----------------------------+---------------+-----------------+------

[Stage 6364:>                                                     (0 + 23) / 23]

In [132]:
from pyspark.sql import functions as F

# Shrink each store's mean predicted sentiment toward the global mean:
#   shrunk = (store_mean * review_count + global_mean * k) / (review_count + k)
# so stores with few reviews are pulled more strongly toward the global value.
k = 10  # pseudo-count; raise it for stronger shrinkage

# Mean of avg_predicted_sentiment over all rows (the aggregate yields a single row).
global_mean = (
    business_with_prob_df
    .agg(F.mean("avg_predicted_sentiment"))
    .first()[0]
)

store_sentiment = F.col("avg_predicted_sentiment")
n_reviews = F.col("review_count")
shrunk_expr = (store_sentiment * n_reviews + global_mean * k) / (n_reviews + k)

business_with_prob_df = business_with_prob_df.withColumn("shrunk_sentiment", shrunk_expr)

# Compare raw vs. shrunk sentiment next to the observed average rating.
preview_cols = [
    "store_name",
    "avg_predicted_sentiment",
    "review_count",
    "shrunk_sentiment",
    "avg_rating",
]
business_with_prob_df.select(*preview_cols).show(10, truncate=False)


                                                                                

+-------------------------------+-----------------------+------------+------------------+----------+
|store_name                     |avg_predicted_sentiment|review_count|shrunk_sentiment  |avg_rating|
+-------------------------------+-----------------------+------------+------------------+----------+
|Checkers                       |3.1702048057536985     |11          |3.6295623775899735|3.5       |
|Momo Asian Fusion              |4.58498825475099       |5           |4.28489988932358  |4.6       |
|Rosso Pizzeria & Mozzarella Bar|3.864661623144518      |5           |4.044791012121423 |4.2       |
|Junior's Restaurant & Bakery   |4.290628785509187      |15          |4.228319553949462 |4.4       |
|Woori Village                  |3.1132834520223933     |10          |3.624069579316134 |3.5       |
|The Flame Broiler              |4.007555898949703      |5           |4.092422437389818 |3.8       |
|200 Fifth                      |4.116263523339339      |14          |4.124010266368729 |4.

[Stage 6364:>                                                     (0 + 23) / 23]

## 2. Checking missing values

In [133]:
from pyspark.sql.functions import col, sum as spark_sum, lit
from pyspark.sql import Row

# Count nulls for every column in one aggregate (a single row, one field per column).
null_count_exprs = [
    spark_sum(col(name).isNull().cast("int")).alias(name)
    for name in business_with_prob_df.columns
]
missing_df = business_with_prob_df.select(null_count_exprs)

# Bring that single row to the driver as {column_name: null_count}.
missing_dict = missing_df.first().asDict()

# Reshape to long format: one (column, missing_count) row per source column.
rows = [
    Row(column=name, missing_count=n_missing)
    for name, n_missing in missing_dict.items()
]
transposed_missing_df = spark.createDataFrame(rows)

transposed_missing_df.show(50, truncate=False)

                                                                                

+---------------------------------------------------------+-------------+
|column                                                   |missing_count|
+---------------------------------------------------------+-------------+
|gmap_id                                                  |0            |
|address                                                  |0            |
|avg_rating                                               |0            |
|category                                                 |0            |
|description                                              |72284        |
|hours                                                    |8333         |
|latitude                                                 |0            |
|longitude                                                |0            |
|store_name                                               |0            |
|num_of_reviews                                           |0            |
|price                                

[Stage 6364:>                                                     (0 + 23) / 23]

In [134]:
# Columns removed before feature engineering.
#
# Fix: the original list had no comma after "MISC_Recycling", so Python's implicit
# string concatenation produced the single, non-existent column name
# "MISC_Recyclingreview_count". DataFrame.drop() ignores unknown names, which
# meant MISC_Recycling was silently kept.
#
# "review_count" is intentionally NOT in this list: build_neighborhood_graph()
# selects it for business-node attributes and edge weights, so it must survive.
cols_to_drop = [
    "MISC_Activities", "MISC_From_the_business", "MISC_Getting_here",
    "MISC_Health_and_safety", "MISC_Lodging_options", "MISC_Recycling",
]

business_with_prob_df_clean = business_with_prob_df.drop(*cols_to_drop)

In [135]:
from pyspark.sql.types import DoubleType

# Replace irs_estimated_population with the same values cast to DoubleType.
population_as_double = col("irs_estimated_population").cast(DoubleType())
business_with_prob_df_clean = business_with_prob_df_clean.withColumn(
    "irs_estimated_population",
    population_as_double,
)

#### One-hot encoding top 3 categories for each categorical column

In [136]:
from pyspark.sql.functions import explode, col, when, array_contains

# Array-typed MISC_* attribute columns that get one-hot flags.
misc_cols = [
    "MISC_Accessibility", "MISC_Amenities", "MISC_Atmosphere",
    "MISC_Crowd", "MISC_Dining_options", "MISC_Offerings",
    "MISC_Payments", "MISC_Planning", "MISC_Popular_for",
    "MISC_Service_options", "MISC_Highlights"
]

TOP_N_ITEMS = 3  # number of most frequent items per column that receive a flag

for c in misc_cols:
    # Rank the individual array items of this column by frequency.
    # Nulls are removed BEFORE ranking so a null element can never occupy one of
    # the top slots (previously it was filtered after limit(), which could leave
    # fewer than TOP_N_ITEMS flags). Ties are broken by item name so the chosen
    # items are the same on every run.
    top_items = (business_with_prob_df_clean
                 .select(explode(col(c)).alias("item"))
                 .filter(col("item").isNotNull())
                 .groupBy("item")
                 .count()
                 .orderBy(col("count").desc(), col("item").asc())
                 .limit(TOP_N_ITEMS)
                 .collect())

    top_items_list = [row["item"] for row in top_items]

    # One 0/1 column per top item, named <source column>_<item with spaces -> _>_flag.
    for item in top_items_list:
        col_name = f"{c}_{item.replace(' ', '_')}_flag"
        business_with_prob_df_clean = business_with_prob_df_clean.withColumn(
            col_name,
            when(array_contains(col(c), item), 1).otherwise(0)
        )

                                                                                

In [138]:
business_with_prob_df_clean.show(3)

                                                                                

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+--------------+-----+--------------------+-------------------+--------------------+--------------------+---------------+-----------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-----+--------------+---------+---------------+------------------------+----------------+-------------+-----------------------+------------------+------------+------------------+------------------------------------------------------+------------------------------------------------------+---------------------------------------------------------+---------------------------------+-------------------------------+----------------------------+---------------------------+-------------------------+-----------------------------+----

[Stage 6364:>                                                     (0 + 23) / 23]

#### Graph algorithms

In [141]:
# graph_pipeline.py
# PySpark + GraphFrames pipeline implementing:
# 1) Neighborhood Influence Graph (business <-> zip)
# 2) Business Similarity Graph + Risk Propagation (similarity-based edges)

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.feature import VectorAssembler, MinHashLSH
from graphframes import GraphFrame

def create_spark(app_name="graph_pipeline"):
    """
    Return a SparkSession obtained via getOrCreate() with the given application name.

    The builder is configured with 8g for both spark.driver.memory and
    spark.executor.memory.
    """
    builder = SparkSession.builder.appName(app_name)
    memory_settings = (
        ("spark.driver.memory", "8g"),
        ("spark.executor.memory", "8g"),
    )
    for key, value in memory_settings:
        builder = builder.config(key, value)
    return builder.getOrCreate()

def load_data(spark, input_path):
    """
    Read the Parquet dataset at `input_path` with the given Spark session.

    Only Parquet is handled; swap the reader call if the input is CSV.
    Returns whatever `spark.read.parquet(input_path)` returns.
    """
    reader = spark.read
    return reader.parquet(input_path)


In [142]:
from pyspark.sql import functions as F
from graphframes import GraphFrame

def build_neighborhood_graph(biz_df, geo_col="zip_city", prefix="ZIP", closure_col="closure_prob"):
    """
    Build a business -> geography graph and derive per-geography graph features.

    Every business (keyed by gmap_id) becomes a vertex, every distinct value of
    `geo_col` becomes a vertex with id "<prefix>_<value>", and each business gets
    one directed edge to its geography vertex. PageRank and label propagation are
    run on that graph, and the results for geography vertices are joined back
    onto the business rows.

    Parameters:
        biz_df : pyspark.sql.DataFrame
            Must contain gmap_id, store_name, avg_rating, review_count,
            permanent_closed, plus the `geo_col` and `closure_col` columns.
        geo_col : str
            Column used as the geography identifier (default "zip_city").
        prefix : str
            Prefix put in front of geography ids to keep them distinct from
            business ids (default "ZIP").
        closure_col : str
            Column holding the closure probability (default "closure_prob").

    Returns:
        gf : GraphFrame
            Graph containing both business and geography vertices.
        ranked_geo : DataFrame
            Geography vertices (id, geo_name, pagerank, community_label),
            ordered by descending pagerank.
        biz_with_geo_features : DataFrame
            `biz_df` left-joined with geo_name, pagerank and community_label of
            the matching geography vertex.
    """
    # Prefixed geography id as computed from the business table's geo column;
    # reused for edge destinations and for joining results back to businesses.
    geo_id_expr = F.concat(F.lit(f"{prefix}_"), F.col(geo_col).cast("string"))

    # --- Business vertices: one per row of biz_df, tagged type="business" ---
    business_vertices = (
        biz_df
        .select(
            F.col("gmap_id").alias("id"),
            "store_name",
            "avg_rating",
            "review_count",
            F.col(closure_col),
            "permanent_closed",
        )
        .withColumn("type", F.lit("business"))
    )

    # --- Geography vertices: one per distinct geo value, tagged type="geo" ---
    distinct_geos = biz_df.select(F.col(geo_col).alias("geo_name")).distinct()
    geo_vertices = (
        distinct_geos
        .withColumn("id", F.concat(F.lit(f"{prefix}_"), F.col("geo_name").cast("string")))
        .withColumn("type", F.lit("geo"))
    )

    # --- Edges business -> geography ---
    # weight = avg_rating * sqrt(review_count + 1) * (1 - closure probability)
    # NOTE(review): pageRank below is called without any weight argument, so this
    # column presumably does not influence the scores — confirm against GraphFrames.
    popularity = F.col("avg_rating") * F.sqrt(F.col("review_count") + 1)
    survival = 1 - F.col(closure_col)
    edges = biz_df.select(
        F.col("gmap_id").alias("src"),
        geo_id_expr.alias("dst"),
        (popularity * survival).alias("weight"),
    )

    # --- Assemble the graph; columns missing on either side are filled with null ---
    all_vertices = business_vertices.unionByName(geo_vertices, allowMissingColumns=True)
    gf = GraphFrame(all_vertices, edges)

    # --- PageRank, keeping only the geography vertices ---
    ranked_vertices = gf.pageRank(resetProbability=0.15, maxIter=10).vertices
    is_geo = F.col("type") == "geo"
    geo_scores = ranked_vertices.filter(is_geo).select("id", "geo_name", "pagerank")

    # --- Label propagation; the inner join restricts labels to geography vertices ---
    vertex_labels = gf.labelPropagation(maxIter=5).select("id", "label")
    geo_communities = (
        vertex_labels
        .join(geo_scores, on="id", how="inner")
        .select("id", "geo_name", "pagerank", F.col("label").alias("community_label"))
    )

    # --- Attach geography features to each business (left join keeps all businesses) ---
    biz_with_geo_features = (
        biz_df
        .join(geo_communities, on=geo_id_expr == geo_communities.id, how="left")
        .drop("id")
    )

    # --- Geography vertices ordered from highest to lowest PageRank ---
    ranked_geo = geo_communities.orderBy(F.desc("pagerank"))

    return gf, ranked_geo, biz_with_geo_features


#### Community Detection & Influence Ranking of Neighborhoods Using Business Performance Graphs

#### This section answers the following questions:
* Which neighborhoods support the most successful businesses?
* How do neighborhoods cluster into business-performance communities?
* Does a business’s location contribute to its chance of success or closure?

In [143]:
# City-level graph: geography vertices come from `zip_city`, with ids prefixed "CITY_".
gf_city, ranked_city, biz_features_city = build_neighborhood_graph(
    business_with_prob_df_clean,
    geo_col="zip_city",
    prefix="CITY",
)
ranked_city.show(50, truncate=False)



+---------------------+----------------+------------------+---------------+
|id                   |geo_name        |pagerank          |community_label|
+---------------------+----------------+------------------+---------------+
|CITY_New York        |New York        |4784.989079045451 |1279900254801  |
|CITY_Chicago         |Chicago         |3672.3060271792574|1451698946523  |
|CITY_Los Angeles     |Los Angeles     |3232.6018261506124|1675037245711  |
|CITY_Brooklyn        |Brooklyn        |2826.6854216220618|1168231105131  |
|CITY_San Francisco   |San Francisco   |1945.425633455175 |798863917107   |
|CITY_San Diego       |San Diego       |1811.6629870369875|1468878815684  |
|CITY_San Jose        |San Jose        |969.7451536989774 |816043786295   |
|CITY_Bronx           |Bronx           |941.9743620550626 |506806141709   |
|CITY_Sacramento      |Sacramento      |867.9189176712898 |137438953641   |
|CITY_Buffalo         |Buffalo         |627.2387234240282 |300647711549   |
|CITY_Fresno

[Stage 6364:>                                                     (0 + 23) / 23]

In [144]:
# ZIP-level graph: geography vertices come from `zip`, with ids prefixed "ZIP_".
gf_zip, ranked_zip, biz_features_zip = build_neighborhood_graph(
    business_with_prob_df_clean,
    geo_col="zip",
    prefix="ZIP",
)
ranked_zip.show(50, truncate=False)

                                                                                0]

+---------+--------+------------------+---------------+
|id       |geo_name|pagerank          |community_label|
+---------+--------+------------------+---------------+
|ZIP_10003|10003   |353.02762260697824|309237645934   |
|ZIP_10019|10019   |292.19093487612935|1211180778081  |
|ZIP_92101|92101   |280.1164777692433 |1675037245797  |
|ZIP_10036|10036   |254.10995476979647|360777253490   |
|ZIP_10002|10002   |243.8931064485852 |1159641170542  |
|ZIP_10013|10013   |235.06946471663005|1168231105136  |
|ZIP_10011|10011   |230.88984494886174|343597384310   |
|ZIP_10016|10016   |220.2085944312318 |1357209666140  |
|ZIP_10014|10014   |210.9205505028579 |1159641170549  |
|ZIP_10001|10001   |208.59853952076443|1133871366760  |
|ZIP_10012|10012   |203.9545175565775 |790273983127   |
|ZIP_94110|94110   |203.49011536015882|927712936017   |
|ZIP_10022|10022   |199.77489778880926|1202590843491  |
|ZIP_11211|11211   |194.66647362820385|1168231105131  |
|ZIP_60614|60614   |186.30723409266773|131425999

In [None]:
#### Finding Similar Businesses

#### This section answers the following questions:
* For a new business idea, what existing businesses are close to it?
* Are there many similar restaurants in this area that could cannibalize business?
* What is the competitive pressure index for this business? (Sum of similarity scores × local density.)

In [145]:
# ---------------------------
# 2) Business Similarity Graph + Risk Propagation
# ---------------------------

# ---------------------------
# Helper to extract binary feature columns from your schema automatically
# ---------------------------
def detect_binary_columns(biz_df, prefix_list=None):
    """
    Pick candidate flag columns from `biz_df` purely by column name.

    With `prefix_list=None`, returns the columns whose name ends with "_flag"
    (e.g. MISC_Atmosphere_Cozy_flag). Otherwise returns the columns whose name
    starts with any of the given prefixes. Column order follows `biz_df.columns`.
    Selection is name-based only; column contents are not inspected.
    """
    if prefix_list is None:
        def is_selected(name):
            # Naming convention used for the one-hot columns in this dataset.
            return name.endswith("_flag")
    else:
        def is_selected(name):
            return any(name.startswith(prefix) for prefix in prefix_list)

    return [name for name in biz_df.columns if is_selected(name)]

def prepare_binary_features(biz_df, binary_cols):
    """
    Normalise the flag columns and pack them into one vector column.

    Each column in `binary_cols` is overwritten: nulls become 0 and every other
    value is cast to integer. The columns are then assembled, in the given
    order, into 'feature_vec' (VectorAssembler with handleInvalid="keep").
    Returns the resulting DataFrame, i.e. the input columns plus 'feature_vec'.
    """
    def as_int_flag(name):
        value = F.col(name)
        return F.when(value.isNull(), 0).otherwise(value.cast(IntegerType()))

    cleaned = biz_df
    for name in binary_cols:
        cleaned = cleaned.withColumn(name, as_int_flag(name))

    return VectorAssembler(
        inputCols=binary_cols,
        outputCol="feature_vec",
        handleInvalid="keep",
    ).transform(cleaned)

def build_similarity_edges(spark, feature_df, key_col="gmap_id", threshold=0.5):
    """
    Build undirected similarity edges between rows of `feature_df` with MinHashLSH.

    Parameters:
        spark : unused by this function; kept so existing callers keep working.
        feature_df : DataFrame with `key_col` and a 'feature_vec' vector column.
        key_col : name of the column that identifies each row.
        threshold : minimum similarity (1 - LSH distance) for an edge to be kept.

    Returns:
        DataFrame with columns src, dst, similarity. Each pair appears once,
        with src holding the smaller of the two keys.
    """
    lsh = MinHashLSH(inputCol="feature_vec", outputCol="hashes", numHashTables=5)
    lsh_model = lsh.fit(feature_df)

    # Self-join on the LSH buckets. The join reports a distance (~ 1 - Jaccard
    # similarity for MinHashLSH); a wide distance threshold of 1.0 is used here
    # and the real cut-off is applied on similarity below.
    left = feature_df.select(key_col, "feature_vec")
    right = feature_df.select(key_col, "feature_vec")
    candidate_pairs = lsh_model.approxSimilarityJoin(
        left,
        right,
        threshold=1.0,
        distCol="dist",
    )

    key_a = F.col("datasetA." + key_col)
    key_b = F.col("datasetB." + key_col)
    similarity = F.lit(1.0) - F.col("dist")  # approximate Jaccard similarity

    # Drop self-matches, convert distance to similarity, apply the cut-off.
    scored = (
        candidate_pairs
        .filter(key_a != key_b)
        .select(key_a.alias("src"), key_b.alias("dst"), similarity.alias("similarity"))
        .filter(F.col("similarity") >= threshold)
    )

    # Canonical ordering (smaller key first) so (a, b) and (b, a) collapse into
    # a single row when duplicates are dropped.
    src_is_lower = F.col("src") < F.col("dst")
    lower_key = F.when(src_is_lower, F.col("src")).otherwise(F.col("dst"))
    upper_key = F.when(src_is_lower, F.col("dst")).otherwise(F.col("src"))

    return (
        scored
        .select(lower_key.alias("src"), upper_key.alias("dst"), "similarity")
        .dropDuplicates(["src", "dst"])
    )

[Stage 6364:>                                                     (0 + 23) / 23]

In [108]:
# Development toggle: lower the fraction to iterate on a subset of businesses.
# A fraction of 1.0 is expected to keep every row. The fixed seed (same value as
# the 5% feature sample taken further down) makes any smaller subset reproducible.
business_with_prob_df_clean_sampled = business_with_prob_df_clean.sample(fraction=1.0, seed=42)

In [109]:
# Gather the *_flag columns by name and report how many were found.
binary_cols = detect_binary_columns(biz_df=business_with_prob_df_clean_sampled)
detection_summary = f"Detected {len(binary_cols)} binary feature columns; sample: {binary_cols[:10]}"
print(detection_summary)

Detected 33 binary feature columns; sample: ['MISC_Accessibility_Wheelchair_accessible_entrance_flag', 'MISC_Accessibility_Wheelchair_accessible_restroom_flag', 'MISC_Accessibility_Wheelchair_accessible_parking_lot_flag', 'MISC_Amenities_Good_for_kids_flag', 'MISC_Amenities_High_chairs_flag', 'MISC_Amenities_Restroom_flag', 'MISC_Atmosphere_Casual_flag', 'MISC_Atmosphere_Cozy_flag', 'MISC_Atmosphere_Romantic_flag', 'MISC_Crowd_Groups_flag']


In [110]:
# Assemble the flag columns into 'feature_vec', then keep only the key, the
# vector and the individual flags.
features_with_vec = prepare_binary_features(business_with_prob_df_clean_sampled, binary_cols)
feature_df = features_with_vec.select("gmap_id", "feature_vec", *binary_cols)
feature_df.show(3, truncate=False)

+-------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------+------------------------------------------------------+---------------------------------------------------------+---------------------------------+-------------------------------+----------------------------+---------------------------+-------------------------+-----------------------------+----------------------+------------------------+-------------------------------+--------------------------------+------------------------------+-------------------------------+--------------------------------+------------------------------+--------------------------+--------------------------------------+------------------------------+-------------------------------+---------------------------------------+---------------------------------+----------------------------------

In [111]:
# Number of rows in feature_df at this point in the notebook.
feature_df.count()

169478

In [114]:
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import array_max

# Keep only rows whose feature vector has at least one non-zero entry: the
# vector is converted to an array and its largest element must be > 0.
largest_entry = array_max(vector_to_array(col("feature_vec")))
feature_df = feature_df.filter(largest_entry > 0)


In [119]:
# Seeded 5% sample of the feature rows, followed by its row count.
SAMPLE_FRACTION = 0.05
SAMPLE_SEED = 42
feature_df_sample = feature_df.sample(fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED)
feature_df_sample.count()

                                                                                

8369

[Stage 6364:>             (0 + 23) / 23][Stage 6367:===>           (5 + 9) / 23]

In [120]:
# Minimum similarity (1 - LSH distance) for two businesses to be linked.
SIMILARITY_THRESHOLD = 0.45
edges = build_similarity_edges(
    spark,
    feature_df_sample,
    key_col="gmap_id",
    threshold=SIMILARITY_THRESHOLD,
)
edges.show(3, truncate=False)

[Stage 6364:>                                                     (0 + 23) / 23]

+-------------------------------------+-------------------------------------+------------------+
|src                                  |dst                                  |similarity        |
+-------------------------------------+-------------------------------------+------------------+
|0x54d059fccd33b1ab:0x7b0becffed684fc4|0x80db7e3b80f55c6f:0x7bdc233815fe8e3c|0.5517241379310345|
|0x80816d4b494e96cb:0x8591118881d10680|0x89c25bfc4ad8e6e3:0x4a3873012de5123e|0.5862068965517241|
|0x80853d3522ebaaff:0x5aff8d5bbcf0bc60|0x809ad6fb3b363d77:0xfd4b858489b6835b|0.5217391304347826|
+-------------------------------------+-------------------------------------+------------------+
only showing top 3 rows



[Stage 6364:>                                                     (0 + 23) / 23]

In [125]:
# Inspect only the near-identical pairs (similarity strictly above 0.9).
is_near_duplicate = F.col("similarity") > 0.9
edges_filter = edges.filter(is_near_duplicate)
edges_filter.show(5)

[Stage 6364:>                                                     (0 + 23) / 23]

+--------------------+--------------------+------------------+
|                 src|                 dst|        similarity|
+--------------------+--------------------+------------------+
|0x80858739a8471bd...|0x89d0c516cbe4801...|0.9090909090909091|
|0x808f7e47342f152...|0x880e35740b70dad...|0.9565217391304348|
|0x89dd31606c0cf56...|0x89de05433286977...|0.9523809523809523|
|0x8085808986b060e...|0x808fe7bd446f080...|0.9166666666666666|
|0x80945c65af7c72e...|0x89c2457d39c17de...|0.9333333333333333|
+--------------------+--------------------+------------------+
only showing top 5 rows



[Stage 6364:>                                                     (0 + 23) / 23]

In [1]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from graphframes import GraphFrame
import pandas as pd