## 1. Read Store Data

In [2]:
business_df = spark.read.parquet("gs://msca-bdp-student-gcs/Group_5_final_project/store_df/")
business_df.show(3)

25/12/03 23:48:28 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+--------------+-----+--------------------+-------------------+--------------------+---------------+--------------------+---------------+-----------------+--------------------+----------------------+-----------------+----------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-----+--------------+---------+---------------+------------------------+----------------+-------------+-----------------------+------------------+------------+
|             gmap_id|             address|avg_rating|            category|         description|               hours|          latitude|         longitude|          store_name|num_of_reviews|price|    relative_results|              state|  MISC_Accessibility|MISC_Acti

                                                                                

In [3]:
print(business_df.count())



169478


                                                                                

#### Apply Bayesian Shrinkage to Store Sentiment Scores

In [4]:
from pyspark.sql import functions as F

# =========================
# Parameters for shrinkage
# =========================
k = 10  # pseudo-count (adjust this to control shrinkage strength)

# global mean of predicted sentiment across all stores
global_mean = business_df.agg(F.mean("avg_predicted_sentiment")).collect()[0][0]

# =========================
# Compute shrunk sentiment
# =========================
business_df = business_df.withColumn(
    "shrunk_sentiment",
    (F.col("avg_predicted_sentiment") * F.col("review_count") + global_mean * k) /
    (F.col("review_count") + k)
)

# =========================
# Show results
# =========================
business_df.select(
    "store_name",
    "avg_predicted_sentiment",
    "review_count",
    "shrunk_sentiment",
    "avg_rating"
).show(10, truncate=False)


                                                                                

+-------------------------------+-----------------------+------------+------------------+----------+
|store_name                     |avg_predicted_sentiment|review_count|shrunk_sentiment  |avg_rating|
+-------------------------------+-----------------------+------------+------------------+----------+
|Checkers                       |3.1702048057536985     |11          |3.629562377589972 |3.5       |
|Momo Asian Fusion              |4.58498825475099       |5           |4.284899889323579 |4.6       |
|Rosso Pizzeria & Mozzarella Bar|3.864661623144518      |5           |4.044791012121421 |4.2       |
|Junior's Restaurant & Bakery   |4.290628785509187      |15          |4.228319553949461 |4.4       |
|Woori Village                  |3.1132834520223933     |10          |3.624069579316133 |3.5       |
|The Flame Broiler              |4.007555898949703      |5           |4.092422437389816 |3.8       |
|200 Fifth                      |4.116263523339339      |14          |4.124010266368728 |4.

#### Checking Missing Value

In [5]:
from pyspark.sql.functions import col, sum as spark_sum, lit
from pyspark.sql import Row

# compute missing counts (one row)
missing_df = business_df.select([
    spark_sum(col(c).isNull().cast("int")).alias(c)
    for c in business_df.columns
])

# collect as Python dict
missing_dict = missing_df.first().asDict()

# convert to list of Rows
rows = [Row(column=col_name, missing_count=missing_dict[col_name])
        for col_name in missing_dict]

# create a transposed Spark DataFrame
transposed_missing_df = spark.createDataFrame(rows)

# show results
transposed_missing_df.show(50, truncate=False)

[Stage 12:>                                                         (0 + 1) / 1]

+------------------------+-------------+
|column                  |missing_count|
+------------------------+-------------+
|gmap_id                 |0            |
|address                 |0            |
|avg_rating              |0            |
|category                |0            |
|description             |72284        |
|hours                   |8333         |
|latitude                |0            |
|longitude               |0            |
|store_name              |0            |
|num_of_reviews          |0            |
|price                   |47163        |
|relative_results        |12725        |
|state                   |46186        |
|MISC_Accessibility      |33726        |
|MISC_Activities         |169478       |
|MISC_Amenities          |26983        |
|MISC_Atmosphere         |38927        |
|MISC_Crowd              |52454        |
|MISC_Dining_options     |50973        |
|MISC_From_the_business  |163447       |
|MISC_Getting_here       |169478       |
|MISC_Health_and

                                                                                

In [6]:
cols_to_drop = [
    "MISC_Activities", "MISC_From_the_business", "MISC_Getting_here",
    "MISC_Health_and_safety", "MISC_Lodging_options", "MISC_Recycling"
    "review_count", 
]

business_df_clean = business_df.drop(*cols_to_drop)

In [7]:
from pyspark.sql.types import DoubleType

business_df_clean = business_df_clean.withColumn("irs_estimated_population", col("irs_estimated_population").cast(DoubleType()))

#### One-Hot Encode Top 3 Categories for Each Categorical Column

In [8]:
from pyspark.sql.functions import explode, col, when, array_contains

misc_cols = [
    "MISC_Accessibility", "MISC_Amenities", "MISC_Atmosphere",
    "MISC_Crowd", "MISC_Dining_options", "MISC_Offerings",
    "MISC_Payments", "MISC_Planning", "MISC_Popular_for",
    "MISC_Service_options", "MISC_Highlights"
]

for c in misc_cols:
    # Explode the array to count frequency
    top_items = (business_df_clean
                 .select(explode(col(c)).alias("item"))
                 .groupBy("item")
                 .count()
                 .orderBy(col("count").desc())
                 .limit(3)
                 .collect())
    
    top_items_list = [row["item"] for row in top_items if row["item"] is not None]

    # Create one-hot columns for top 3
    for item in top_items_list:
        col_name = f"{c}_{item.replace(' ', '_')}_flag"
        business_df_clean = business_df_clean.withColumn(
            col_name,
            when(array_contains(col(c), item), 1).otherwise(0)
        )

                                                                                

In [9]:
business_df_clean.show(3)

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+--------------+-----+--------------------+-------------------+--------------------+--------------------+---------------+-----------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-----+--------------+---------+---------------+------------------------+----------------+-------------+-----------------------+------------------+------------+-----------------+------------------------------------------------------+------------------------------------------------------+---------------------------------------------------------+---------------------------------+-------------------------------+----------------------------+---------------------------+-------------------------+-----------------------------+-----

## 2. Compare Model Performance for Predicting Closures

In [11]:
from pyspark.sql.functions import col, when, isnan
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# -----------------------------------------------------
# 1. Feature Columns
# -----------------------------------------------------
feature_cols = [
    "avg_rating", 
    "price_numeric", 
    "num_of_reviews", 
    "irs_estimated_population",
    "shrunk_sentiment", 
    "sentiment_std",
]

one_hot_cols = [c for c in business_df_clean.columns if c.endswith("_flag")] 
feature_cols += one_hot_cols

# -----------------------------------------------------
# 2. Check for missing columns (safety)
# -----------------------------------------------------
missing = [c for c in feature_cols if c not in business_df_clean.columns]
if missing:
    raise Exception(f"Missing required feature columns: {missing}")

# -----------------------------------------------------
# 3. Clean Null values
# -----------------------------------------------------
for c in feature_cols:
    business_df_clean = business_df_clean.withColumn(
        c, when(isnan(col(c)) | col(c).isNull(), 0).otherwise(col(c))
    )

# -----------------------------------------------------
# 4. Compute class weights
# -----------------------------------------------------
counts = business_df_clean.groupBy("permanent_closed").count().collect()
count_0 = next(r['count'] for r in counts if r['permanent_closed'] == 0)
count_1 = next(r['count'] for r in counts if r['permanent_closed'] == 1)

majority = max(count_0, count_1)
minority = min(count_0, count_1)

weight_for_0 = majority / count_0
weight_for_1 = majority / count_1

business_df_clean = business_df_clean.withColumn(
    "classWeight",
    when(col("permanent_closed") == 0, weight_for_0).otherwise(weight_for_1)
)

print("Class Weights:", weight_for_0, weight_for_1)
print("Class counts:", count_0, count_1)

# -----------------------------------------------------
# 5. Train/test split
# -----------------------------------------------------
train_df, test_df = business_df_clean.randomSplit([0.8, 0.2], seed=42)

# -----------------------------------------------------
# 6. Assemble features
# -----------------------------------------------------
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="keep"   # <---- prevents schema errors
)

# -----------------------------------------------------
# 7. Evaluators
# -----------------------------------------------------
evaluator_auc = BinaryClassificationEvaluator(
    labelCol="permanent_closed", metricName="areaUnderROC")
evaluator_acc = MulticlassClassificationEvaluator(
    labelCol="permanent_closed", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="permanent_closed", metricName="f1")
evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="permanent_closed", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="permanent_closed", metricName="weightedRecall")

def evaluate_model(predictions, name="Model"):
    auc = evaluator_auc.evaluate(predictions)
    acc = evaluator_acc.evaluate(predictions)
    f1 = evaluator_f1.evaluate(predictions)
    precision = evaluator_precision.evaluate(predictions)
    recall = evaluator_recall.evaluate(predictions)

    print(f"\n===== {name} Evaluation =====")
    print(f"AUC:       {auc:.4f}")
    print(f"Accuracy:  {acc:.4f}")
    print(f"F1 Score:  {f1:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall:    {recall:.4f}")

                                                                                

Class Weights: 1.0 6.365726454865487
Class counts: 146469 23009


#### Logistic Regression (Class Weighting)

In [12]:
from pyspark.sql.functions import col, when
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.evaluation import MulticlassMetrics
import pandas as pd

# ============================================================
# 1. Handle class imbalance 
# ============================================================

# Count classes
counts = train_df.groupBy("permanent_closed").count().collect()
count_0 = [r["count"] for r in counts if r["permanent_closed"] == 0][0]
count_1 = [r["count"] for r in counts if r["permanent_closed"] == 1][0]

# Assign weights: majority = 1, minority = ratio
ratio = count_0 / count_1
train_df_balanced = train_df.withColumn(
    "weight",
    when(col("permanent_closed") == 1, ratio).otherwise(1.0)
)

print(f"Class 0: {count_0}, Class 1: {count_1}, Weight ratio: {ratio:.2f}")

# ============================================================
# 2. Logistic Regression with CV
# ============================================================

lr = LogisticRegression(
    labelCol="permanent_closed",
    featuresCol="features",
    weightCol="weight",
    maxIter=25
)

lr_pipeline = Pipeline(stages=[assembler, lr])

paramGrid_lr = (ParamGridBuilder()
    .addGrid(lr.regParam, [0.0, 0.01, 0.1])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

cv_lr = CrossValidator(
    estimator=lr_pipeline,
    estimatorParamMaps=paramGrid_lr,
    evaluator=evaluator_auc,
    numFolds=3,
    parallelism=2
)

cv_model_lr = cv_lr.fit(train_df_balanced)
predictions_lr = cv_model_lr.transform(test_df)

evaluate_model(predictions_lr, "Logistic Regression (Weighted)")

# ============================================================
# 3. Confusion Matrix
# ============================================================

predictionAndLabels = predictions_lr.select("prediction", "permanent_closed") \
                                   .rdd.map(lambda r: (float(r["prediction"]), float(r["permanent_closed"])))

metrics = MulticlassMetrics(predictionAndLabels)
conf_matrix = metrics.confusionMatrix().toArray()

conf_df = pd.DataFrame(
    conf_matrix,
    index=["Actual 0", "Actual 1"],
    columns=["Predicted 0", "Predicted 1"]
)

print("\n===== Confusion Matrix =====")
print(conf_df)

# ============================================================
# 4. Feature Importance (LR coefficients)
# ============================================================

lr_model = cv_model_lr.bestModel.stages[-1]

feature_importance = pd.DataFrame({
    "feature": feature_cols,
    "coefficient": lr_model.coefficients
})

feature_importance["abs_coeff"] = feature_importance["coefficient"].abs()
feature_importance = feature_importance.sort_values("abs_coeff", ascending=False)

print("\n===== Logistic Regression Feature Importance =====")
print(feature_importance[["feature", "coefficient"]].to_string(index=False))

                                                                                

Class 0: 117438, Class 1: 18419, Weight ratio: 6.38


25/12/03 23:49:33 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
25/12/03 23:49:33 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                


===== Logistic Regression (Weighted) Evaluation =====
AUC:       0.9588
Accuracy:  0.8941
F1 Score:  0.9044
Precision: 0.9325
Recall:    0.8941





===== Confusion Matrix =====
          Predicted 0  Predicted 1
Actual 0      25793.0       3308.0
Actual 1        258.0       4307.0

===== Logistic Regression Feature Importance =====
                                                  feature   coefficient
                              MISC_Atmosphere_Casual_flag -1.494303e+00
                        MISC_Service_options_Takeout_flag -1.411024e+00
                        MISC_Amenities_Good_for_kids_flag  1.248397e+00
                        MISC_Popular_for_Solo_dining_flag -1.241674e+00
                           MISC_Offerings_Quick_bite_flag -1.228908e+00
                        MISC_Service_options_Dine-in_flag -1.206833e+00
                           MISC_Payments_Debit_cards_flag  1.159994e+00
       MISC_Planning_Dinner_reservations_recommended_flag -1.032567e+00
                       MISC_Service_options_Delivery_flag  9.973183e-01
                   MISC_Payments_NFC_mobile_payments_flag -9.904928e-01
                     

                                                                                

#### Random Forest (Class Weighting)

In [13]:
from pyspark.sql import SparkSession
import logging

# Create Spark session
spark = SparkSession.builder.getOrCreate()

# Set Spark log level to WARN or ERROR (suppress INFO and repeated WARN)
spark.sparkContext.setLogLevel("ERROR")

# Suppress some specific loggers
log4jLogger = spark._jvm.org.apache.log4j
log4jLogger.LogManager.getLogger("org.apache.spark.scheduler.DAGScheduler").setLevel(log4jLogger.Level.ERROR)
log4jLogger.LogManager.getLogger("org.apache.spark.storage.BlockManager").setLevel(log4jLogger.Level.ERROR)

In [14]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import MulticlassMetrics
import pandas as pd

# ---------------------------------------------------------
# 1. Handle class imbalance
# ---------------------------------------------------------
rf = RandomForestClassifier(
    labelCol="permanent_closed",
    featuresCol="features",
    weightCol="weight"
)

rf_pipeline = Pipeline(stages=[assembler, rf])

paramGrid_rf = (ParamGridBuilder()
                .addGrid(rf.maxDepth, [5, 10])
                .addGrid(rf.numTrees, [50, 100])
                .build())

# ---------------------------------------------------------
# 2. Random Forest CV
# ---------------------------------------------------------
cv_rf = CrossValidator(
    estimator=rf_pipeline,
    estimatorParamMaps=paramGrid_rf,
    evaluator=evaluator_auc,
    numFolds=3,
    parallelism=2
)

cv_model_rf = cv_rf.fit(train_df_balanced)
predictions_rf = cv_model_rf.transform(test_df)

# ---------------------------------------------------------
# 3. Confusion Matrix
# ---------------------------------------------------------
evaluate_model(predictions_rf, "Random Forest (Weighted)")

predictionAndLabels = predictions_rf.select("prediction", "permanent_closed") \
                                   .rdd.map(lambda row: (float(row['prediction']), float(row['permanent_closed'])))

metrics = MulticlassMetrics(predictionAndLabels)
conf_matrix = metrics.confusionMatrix().toArray()

print("\n===== Random Forest Confusion Matrix =====")
conf_df = pd.DataFrame(conf_matrix, 
                       index=["Actual 0", "Actual 1"], 
                       columns=["Predicted 0", "Predicted 1"])
print(conf_df)


# ---------------------------------------------------------
# 4. Feature Importance
# ---------------------------------------------------------
rf_model = cv_model_rf.bestModel.stages[-1]

feature_importance = pd.DataFrame({
    "feature": feature_cols,
    "importance": rf_model.featureImportances.toArray()
}).sort_values("importance", ascending=False)

print("\n===== Random Forest Feature Importance =====")
print(feature_importance.to_string(index=False))


                                                                                


===== Random Forest (Weighted) Evaluation =====
AUC:       0.9714
Accuracy:  0.9080
F1 Score:  0.9166
Precision: 0.9417
Recall:    0.9080


[Stage 2587:===>                                                  (1 + 15) / 16]


===== Random Forest Confusion Matrix =====
          Predicted 0  Predicted 1
Actual 0      26122.0       2979.0
Actual 1        119.0       4446.0

===== Random Forest Feature Importance =====
                                                  feature  importance
                        MISC_Popular_for_Solo_dining_flag    0.160665
                              MISC_Atmosphere_Casual_flag    0.143360
                           MISC_Offerings_Quick_bite_flag    0.138450
                        MISC_Service_options_Takeout_flag    0.100326
                              MISC_Popular_for_Lunch_flag    0.058737
                                           num_of_reviews    0.058456
                             MISC_Popular_for_Dinner_flag    0.057672
                        MISC_Service_options_Dine-in_flag    0.054443
                                   MISC_Crowd_Groups_flag    0.034572
   MISC_Accessibility_Wheelchair_accessible_entrance_flag    0.031464
                                 MI

                                                                                

#### Gradient-Boosted Trees (Class Weighting)

In [15]:
import pandas as pd
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import MulticlassMetrics

# ---------------------------------------------------------
# 1. Handle class imbalance
# ---------------------------------------------------------
# Compute class weights 
counts = train_df.groupBy("permanent_closed").count().toPandas()
n0 = counts[counts["permanent_closed"] == 0]["count"].values[0]
n1 = counts[counts["permanent_closed"] == 1]["count"].values[0]

minority_weight = max(n0, n1) / min(n0, n1)

# Attach weight column
train_df = train_df.withColumn(
    "weight",
    (1.0 * (train_df.permanent_closed == 1).cast("int")) * minority_weight +
    (1.0 * (train_df.permanent_closed == 0).cast("int"))
)

# ---------------------------------------------------------
# 2. Gradient-Boosted Trees CV
# ---------------------------------------------------------
gbt = GBTClassifier(
    labelCol="permanent_closed",
    featuresCol="features",
    weightCol="weight",
    maxIter=50,   # upper bound, CV will tune
    maxDepth=5    # upper bound, CV will tune
)

gbt_pipeline = Pipeline(stages=[assembler, gbt])

paramGrid_gbt = (ParamGridBuilder()
                 .addGrid(gbt.maxDepth, [3, 5])
                 .addGrid(gbt.maxIter, [20, 50])
                 .build())

cv_gbt = CrossValidator(
    estimator=gbt_pipeline,
    estimatorParamMaps=paramGrid_gbt,
    evaluator=evaluator_auc,
    numFolds=3,
    parallelism=2
)

cv_model_gbt = cv_gbt.fit(train_df)
predictions_gbt = cv_model_gbt.transform(test_df)

evaluate_model(predictions_gbt, "Gradient-Boosted Trees")


# ---------------------------------------------------------
# 3. Confusion Matrix
# ---------------------------------------------------------
predictionAndLabels = predictions_gbt.select("prediction", "permanent_closed") \
    .rdd.map(lambda row: (float(row["prediction"]), float(row["permanent_closed"])))

metrics = MulticlassMetrics(predictionAndLabels)
conf_matrix = metrics.confusionMatrix().toArray()

print("\n===== Gradient-Boosted Trees Confusion Matrix =====")
conf_df = pd.DataFrame(
    conf_matrix,
    index=["Actual 0", "Actual 1"],
    columns=["Predicted 0", "Predicted 1"]
)
print(conf_df)

# ---------------------------------------------------------
# 4. Feature Importance
# ---------------------------------------------------------
gbt_model = cv_model_gbt.bestModel.stages[-1]

feature_importance = pd.DataFrame({
    "feature": feature_cols,
    "importance": gbt_model.featureImportances.toArray()
}).sort_values("importance", ascending=False)

print("\n===== Gradient-Boosted Trees Feature Importance =====")
pd.set_option("display.max_rows", None)
print(feature_importance.to_string(index=False))

                                                                                


===== Gradient-Boosted Trees Evaluation =====
AUC:       0.9717
Accuracy:  0.9120
F1 Score:  0.9198
Precision: 0.9422
Recall:    0.9120





===== Gradient-Boosted Trees Confusion Matrix =====
          Predicted 0  Predicted 1
Actual 0      26293.0       2808.0
Actual 1        155.0       4410.0

===== Gradient-Boosted Trees Feature Importance =====
                                                  feature  importance
                        MISC_Popular_for_Solo_dining_flag    0.443764
                        MISC_Service_options_Takeout_flag    0.119859
                                           num_of_reviews    0.086911
                              MISC_Atmosphere_Casual_flag    0.053423
                        MISC_Amenities_Good_for_kids_flag    0.035284
                        MISC_Service_options_Dine-in_flag    0.030336
                         MISC_Offerings_Comfort_food_flag    0.030047
   MISC_Accessibility_Wheelchair_accessible_entrance_flag    0.030043
                                            price_numeric    0.019744
                  MISC_Planning_Accepts_reservations_flag    0.016096
                 

                                                                                

## 3. Interpret Feature Importance and Direction

In [18]:
import pandas as pd
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression

# ============================================================
# 1. Prepare features for Elastic Net Logistic Regression (Multicollinearity between Features)
# ============================================================

assembler_enet = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_enet"
)

# Remove old features_enet column if it exists
if "features_enet" in train_df.columns:
    train_df = train_df.drop("features_enet")
if "features_enet" in test_df.columns:
    test_df = test_df.drop("features_enet")

train_df_enet = assembler_enet.transform(train_df)
test_df_enet = assembler_enet.transform(test_df)

# Standardize numeric features
scaler = StandardScaler(inputCol="features_enet", outputCol="features_scaled", withMean=True, withStd=True)
scaler_model = scaler.fit(train_df_enet)
train_df_scaled = scaler_model.transform(train_df_enet)
test_df_scaled = scaler_model.transform(test_df_enet)

# ============================================================
# 2. Train Elastic Net Logistic Regression (for direction)
# ============================================================

elastic_lr = LogisticRegression(
    featuresCol="features_scaled",
    labelCol="permanent_closed",
    elasticNetParam=0.2,   # Elastic Net (mix of Lasso & Ridge)
    regParam=0.01,
    maxIter=50
)

elastic_model = elastic_lr.fit(train_df_scaled)

# ============================================================
# 3. Extract coefficients with direction
# ============================================================

elastic_coef = pd.DataFrame({
    "feature": feature_cols,
    "ENet_Coefficient": elastic_model.coefficients.toArray()
})
elastic_coef["abs_coef"] = elastic_coef["ENet_Coefficient"].abs()

# ============================================================
# 4. Gradient-Boosted Trees feature importance
# ============================================================

gbt_importances = gbt_model.featureImportances.toArray()
if len(gbt_importances) != len(feature_cols):
    usable_cols = feature_cols[:len(gbt_importances)]
else:
    usable_cols = feature_cols

feature_importance_gbt = pd.DataFrame({
    "feature": usable_cols,
    "GBT_Importance": gbt_importances
}).sort_values("GBT_Importance", ascending=False)

# ============================================================
# 5. Merge GBT importance with Elastic Net coefficients
# ============================================================

final_df = feature_importance_gbt.merge(
    elastic_coef,
    on="feature",
    how="left"
)

# ============================================================
# 6. Determine effect direction
# ============================================================

def determine_effect(coef):
    if pd.isna(coef) or abs(coef) < 0.01:
        return "Neutral / Not Selected"
    elif coef > 0.15:
        return "↑ Strong Failure Driver"
    elif coef > 0:
        return "↑ Weak Failure Driver"
    elif coef < -0.15:
        return "↓ Strong Survival Driver"
    else:
        return "↓ Weak Survival Driver"

final_df["Closure_Effect_Direction"] = final_df["ENet_Coefficient"].apply(determine_effect)

# ============================================================
# 7. Select top N features by GBT importance
# ============================================================

top_n = 50
top_features = final_df.head(top_n).copy()

# ============================================================
# 8. Clean feature names for presentation
# ============================================================

top_features["Feature_Clean"] = (
    top_features["feature"]
    .str.replace("MISC_", "", regex=False)
    .str.replace("_flag", "", regex=False)
    .str.replace("_", " ", regex=False)
)

# ============================================================
# 9. Final presentation table
# ============================================================

final_cols = [
    "Feature_Clean",
    "GBT_Importance",
    "ENet_Coefficient",
    "Closure_Effect_Direction"
]

print("\n#############################################################")
print("###  Top Predictors with GBT Importance and Direction ###")
print("#############################################################\n")

print(top_features[final_cols].to_string(index=False))

                                                                                


#############################################################
###  Top Predictors with GBT Importance and Direction ###
#############################################################

                                  Feature_Clean  GBT_Importance  ENet_Coefficient Closure_Effect_Direction
                        Popular for Solo dining        0.443764         -0.464500 ↓ Strong Survival Driver
                        Service options Takeout        0.119859         -0.310330 ↓ Strong Survival Driver
                                 num of reviews        0.086911         -0.557761 ↓ Strong Survival Driver
                              Atmosphere Casual        0.053423         -0.450025 ↓ Strong Survival Driver
                        Amenities Good for kids        0.035284          0.409087  ↑ Strong Failure Driver
                        Service options Dine-in        0.030336         -0.505792 ↓ Strong Survival Driver
                         Offerings Comfort food        0.030047    

## 4. Predict Store Closures for All Restaurants

In [39]:
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

# 1️⃣ Transform all data
all_predictions = cv_model_rf.bestModel.transform(business_df_clean)

# 2️⃣ Convert SparseVector probability to array
all_predictions = all_predictions.withColumn(
    "prob_array",
    vector_to_array(col("business_closure_prob"))
)

# 3️⃣ Extract the probability of closure (class 1)
all_predictions = all_predictions.withColumn(
    "closure_prob",
    col("prob_array")[1]  # now safe
)

# 4️⃣ Select the final columns
biz_closure_prob_df = all_predictions.select(
    "gmap_id",
    "store_name",
    "prediction",
    "closure_prob"
)

# 5️⃣ Show results
biz_closure_prob_df.show(10, truncate=False)


+-------------------------------------+-------------------------------+----------+--------------------+
|gmap_id                              |store_name                     |prediction|closure_prob        |
+-------------------------------------+-------------------------------+----------+--------------------+
|0x89e85a90fa80ea79:0x2f56cdd1f58118f |Checkers                       |0.0       |0.007217116418191323|
|0x89c2c219189e5189:0x684a238fa71eb176|Momo Asian Fusion              |0.0       |0.017988535282472868|
|0x8085b6a8ebae49f1:0x9f24861c2d643f9a|Rosso Pizzeria & Mozzarella Bar|1.0       |0.9591585341045397  |
|0x89c258545813c6bf:0x8ee1343834123591|Junior's Restaurant & Bakery   |1.0       |0.5504999064554688  |
|0x880fc80bb9076013:0xf73fd28c4d61b7ae|Woori Village                  |0.0       |0.18938758140253537 |
|0x80dce7e4827a029f:0xcb9497eb98076b9d|The Flame Broiler              |0.0       |0.05205610831563328 |
|0x89c25baa8b6fb9c9:0x5c4f2f36d6850943|200 Fifth                

####  Save Results

In [40]:
biz_closure_prob_df.write.mode("overwrite").parquet("gs://msca-bdp-student-gcs/Group_5_final_project/biz_closure_prob_df/") 

                                                                                

## 5. EDA on NLP Data

In [15]:
from pyspark.sql.functions import col, count, round

# Basic statistics
business_df.select("avg_rating").describe().show()


# Optional: Bin ratings into ranges (e.g., 1-2, 2-3, 3-4, 4-5)
from pyspark.sql.functions import when

from pyspark.sql.functions import when, col

business_df = business_df.withColumn(
    "rating_bin",
    when(col("avg_predicted_sentiment") < 1.5, "1-1.5") \
    .when(col("avg_predicted_sentiment") < 2, "1.5-2") \
    .when(col("avg_predicted_sentiment") < 2.5, "2-2.5") \
    .when(col("avg_predicted_sentiment") < 3, "2.5-3") \
    .when(col("avg_predicted_sentiment") < 3.5, "3-3.5") \
    .when(col("avg_predicted_sentiment") < 4, "3.5-4") \
    .when(col("avg_predicted_sentiment") < 4.5, "4-4.5") \
    .otherwise("4.5-5")
)


business_df.groupBy("rating_bin").agg(count("*").alias("count")) \
               .orderBy("rating_bin") \
               .show()

                                                                                

+-------+------------------+
|summary|        avg_rating|
+-------+------------------+
|  count|            169478|
|   mean| 4.199176294268276|
| stddev|0.4117197819420847|
|    min|               1.0|
|    max|               5.0|
+-------+------------------+





+----------+-----+
|rating_bin|count|
+----------+-----+
|     1.5-2|   42|
|     2-2.5|  486|
|     2.5-3| 2874|
|     3-3.5|12928|
|     3.5-4|40745|
|     4-4.5|75402|
|     4.5-5|37001|
+----------+-----+



                                                                                

In [16]:
combined_df = spark.read.parquet("gs://msca-bdp-student-gcs/Group_5_final_project/combined_rest_df/") 
combined_df.show(3)

+--------------------+--------------+------+----+--------------------+-------------+--------------------+--------------------+----------+--------------------+-----------+--------------------+------------------+------------------+--------------------+--------------+-----+--------------------+------------------+------------------+---------------+---------------+---------------+--------------------+-------------------+----------------------+-----------------+----------------------+---------------+--------------------+--------------------+-------------+-------------+--------------------+--------------+--------------------+-----+--------+---------+-------------+------------------------+
|             gmap_id|     cust_name|rating|resp|                text|         time|             user_id|             address|avg_rating|            category|description|               hours|          latitude|         longitude|          store_name|num_of_reviews|price|    relative_results|             stat

In [17]:
from pyspark.sql.functions import when, col
combined_df = combined_df.withColumn(
    "permanent_closed",
    when(col("state").like("%Permanently closed%"), 1).otherwise(0)
)

In [18]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import re

#### Most Common Tokens for Restaurants

In [24]:
from pyspark.sql import functions as F

# Filter reviews for closed stores
closed_reviews = english_df.filter(F.col("permanent_closed") == 1)

# Explode english_tokens into separate rows
closed_tokens = closed_reviews.select(F.explode(F.col("english_tokens")).alias("token"))

# Count token frequency
closed_token_counts = closed_tokens.groupBy("token").count().orderBy(F.desc("count"))

# Show top 20 most common tokens
closed_token_counts.show(50, truncate=False)



+----------+------+
|token     |count |
+----------+------+
|food      |431697|
|great     |330406|
|good      |305473|
|place     |203824|
|service   |173118|
|like      |99338 |
|best      |98113 |
|nice      |92601 |
|really    |85097 |
|love      |84047 |
|go        |81500 |
|one       |78695 |
|get       |76827 |
|friendly  |75135 |
|staff     |73305 |
|time      |69861 |
|always    |65960 |
|delicious |64934 |
|back      |61985 |
|restaurant|60099 |
|chicken   |58369 |
|excellent |53132 |
|pizza     |52727 |
|order     |51318 |
|also      |50503 |
|even      |49435 |
|little    |48366 |
|ordered   |48013 |
|definitely|47579 |
|got       |47344 |
|amazing   |45921 |
|never     |45329 |
|eat       |43611 |
|come      |43357 |
|try       |42644 |
|recommend |41082 |
|us        |40773 |
|fresh     |39197 |
|came      |38897 |
|menu      |38123 |
|went      |36914 |
|people    |35815 |
|first     |33765 |
|pretty    |33491 |
|well      |33152 |
|made      |33033 |
|awesome   |32877 |


                                                                                

#### Remove Store Name Top N-Grams

In [26]:
import re
from pyspark.sql import functions as F
from pyspark.ml.feature import Tokenizer, StopWordsRemover, NGram

# ----------------------------
# 1. Keep only reviews from permanently closed stores
# ----------------------------
closed_df = combined_df.filter(
    (F.col("text").isNotNull()) &
    (F.col("rating").isNotNull()) &
    (F.col("permanent_closed") == 1)
)

In [27]:
closed_df.count()

                                                                                

1188577

#### Top Trigrams Closed Restaurants¶

In [28]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, NGram

# ----------------------------
# 1. Keep only reviews from permanently closed stores
# ----------------------------
closed_df = combined_df.filter(
    (F.col("text").isNotNull()) &
    (F.col("rating").isNotNull()) &
    (F.col("permanent_closed") == 1)
)

# sample 1% for testing
closed_df = closed_df.sample(fraction=1.0, seed=42)

# ----------------------------
# 2. Tokenize
# ----------------------------
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
closed_df = tokenizer.transform(closed_df)

# ----------------------------
# 3. Remove stopwords
# ----------------------------
stop_remover = StopWordsRemover(inputCol="tokens", outputCol="tokens_clean")
closed_df = stop_remover.transform(closed_df)

# ----------------------------
# 4. Keep only alphabetic English tokens >= 3 chars
# ----------------------------
closed_df = closed_df.withColumn(
    "english_tokens",
    F.expr("filter(transform(tokens_clean, x -> lower(x)), x -> x rlike '^[a-z]{3,}$')")
)

# ----------------------------
# 5. Remove store name words using broadcast UDF
# ----------------------------
store_words_list = (
    combined_df
    .select(F.lower(F.col("store_name")).alias("name"))
    .select(F.split("name", " ").alias("words"))
    .select(F.explode("words").alias("w"))
    .filter(F.length("w") > 2)
    .distinct()
    .rdd.flatMap(lambda x: x)
    .collect()
)

broadcast_store_words = spark.sparkContext.broadcast(set(store_words_list))

def remove_store_words(tokens):
    return [t for t in tokens if t not in broadcast_store_words.value]

remove_store_words_udf = F.udf(remove_store_words, ArrayType(StringType()))

closed_df = closed_df.withColumn("operational_tokens", remove_store_words_udf("english_tokens"))

# ----------------------------
# 6. Create trigrams
# ----------------------------
trigrammer = NGram(n=3, inputCol="operational_tokens", outputCol="trigrams")
closed_df = trigrammer.transform(closed_df)

# ----------------------------
# 7. Explode trigrams and count
# ----------------------------
trigram_exploded = closed_df.select(F.explode("trigrams").alias("ngram"))
trigram_counts = trigram_exploded.groupBy("ngram").count().withColumnRenamed("count", "closed_count")

# ----------------------------
# 8. Rank trigrams by frequency
# ----------------------------
trigram_stats = trigram_counts.orderBy(F.col("closed_count").desc())

# ----------------------------
# 9. Show top 50 trigrams
# ----------------------------
trigram_stats.show(50, truncate=False)




+-----------------------------+------------+
|ngram                        |closed_count|
+-----------------------------+------------+
|highly recommend anyone      |376         |
|definitely recommend anyone  |264         |
|highly recommend try         |208         |
|decided give try             |199         |
|ordered took minutes         |179         |
|highly recommend trying      |168         |
|highly recommend going       |165         |
|customer highly recommend    |157         |
|definitely highly recommend  |145         |
|recommend anyone wants       |143         |
|highly recommend definitely  |128         |
|loved highly recommend       |124         |
|took almost minutes          |103         |
|highly recommend checking    |102         |
|definitely recommend trying  |100         |
|definitely recommend try     |98          |
|definitely going try         |96          |
|waited minutes waitress      |93          |
|try highly recommend         |92          |
|took minu

                                                                                

#### Comparative Trigram Analysis for Closed vs. Open Restaurants

In [31]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, NGram

# ----------------------------
# 1. Filter closed and open store reviews
# ----------------------------
closed_df = combined_df.filter(
    (F.col("text").isNotNull()) &
    (F.col("rating").isNotNull()) &
    (F.col("permanent_closed") == 1)
)

open_df = combined_df.filter(
    (F.col("text").isNotNull()) &
    (F.col("rating").isNotNull()) &
    (F.col("permanent_closed") == 0)
)

# Optional: sample 1% for testing
closed_df = closed_df.sample(1.0, seed=42)
open_df   = open_df.sample(1.0, seed=42)

# ----------------------------
# 2. Tokenize & clean
# ----------------------------
def clean_tokens(df):
    tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
    df = tokenizer.transform(df)

    stop_remover = StopWordsRemover(inputCol="tokens", outputCol="tokens_clean")
    df = stop_remover.transform(df)

    # Keep only alphabetic tokens of length >=3
    df = df.withColumn(
        "english_tokens",
        F.expr("filter(transform(tokens_clean, x -> lower(x)), x -> x rlike '^[a-z]{3,}$')")
    )

    # Filter out nonsense tokens like repeated letters
    df = df.withColumn(
        "english_tokens",
        F.expr("filter(english_tokens, x -> not x rlike '^(.)\\1+$')")  # removes 'aaaa' or 'uuu'
    )

    return df

closed_df = clean_tokens(closed_df)
open_df   = clean_tokens(open_df)

# ----------------------------
# 3. Remove store name words
# ----------------------------
store_words_list = (
    combined_df
    .select(F.lower(F.col("store_name")).alias("name"))
    .select(F.split("name", " ").alias("words"))
    .select(F.explode("words").alias("w"))
    .filter(F.length("w") > 2)
    .distinct()
    .rdd.flatMap(lambda x: x)
    .collect()
)
broadcast_store_words = spark.sparkContext.broadcast(set(store_words_list))

def remove_store_words(tokens):
    return [t for t in tokens if t not in broadcast_store_words.value]

remove_store_words_udf = F.udf(remove_store_words, ArrayType(StringType()))

closed_df = closed_df.withColumn("operational_tokens", remove_store_words_udf("english_tokens"))
open_df   = open_df.withColumn("operational_tokens", remove_store_words_udf("english_tokens"))

# ----------------------------
# 4. Create trigrams
# ----------------------------
trigrammer = NGram(n=3, inputCol="operational_tokens", outputCol="trigrams")
closed_df = trigrammer.transform(closed_df)
open_df   = trigrammer.transform(open_df)

# ----------------------------
# 5. Explode trigrams and count
# ----------------------------
def trigram_count(df, count_col):
    exploded = df.select(F.explode("trigrams").alias("ngram"))
    return exploded.groupBy("ngram").count().withColumnRenamed("count", count_col)

closed_counts = trigram_count(closed_df, "closed_count")
open_counts   = trigram_count(open_df, "open_count")

# ----------------------------
# 6. Combine counts & compute log-odds
# ----------------------------
combined_counts = closed_counts.join(open_counts, on="ngram", how="outer").fillna(0)
combined_counts = combined_counts.withColumn(
    "log_odds_ratio",
    F.log((F.col("closed_count")+1)/(F.col("open_count")+1))
)

# ----------------------------
# 7. Rank for comparative insight
# ----------------------------
top_closed_trigrams = combined_counts.orderBy(F.col("log_odds_ratio").desc())
top_open_trigrams   = combined_counts.orderBy(F.col("log_odds_ratio").asc())

# ----------------------------
# 8. Show top results
# ----------------------------
print("Trigrams indicative of closed stores:")
top_closed_trigrams.show(50, truncate=False)

print("Trigrams indicative of open stores:")
top_open_trigrams.show(50, truncate=False)

                                                                                

Trigrams indicative of closed stores:


                                                                                

+-------------------------------+------------+----------+------------------+
|ngram                          |closed_count|open_count|log_odds_ratio    |
+-------------------------------+------------+----------+------------------+
|scammers scammers scammers     |55          |0         |4.02535169073515  |
|fluffy fluffy fluffy           |17          |0         |2.8903717578961645|
|uuuuu uuuuu uuuuu              |16          |0         |2.833213344056216 |
|spend courteous recommend      |5           |0         |1.791759469228055 |
|gambar pertama adalah          |4           |0         |1.6094379124341003|
|particular recommend arugula   |4           |0         |1.6094379124341003|
|bathroom toilet prosper        |4           |0         |1.6094379124341003|
|generous alcohol ordering      |4           |0         |1.6094379124341003|
|recommend comparison many      |4           |0         |1.6094379124341003|
|ordered takes prepare          |4           |0         |1.6094379124341003|



+---------------------------------+------------+----------+-------------------+
|ngram                            |closed_count|open_count|log_odds_ratio     |
+---------------------------------+------------+----------+-------------------+
|lolol lolol lolol                |0           |363       |-5.8971538676367405|
|frickin frickin frickin          |0           |220       |-5.3981627015177525|
|wearing masks gloves             |0           |192       |-5.262690188904886 |
|worst customer wait              |0           |177       |-5.181783550292085 |
|wait waited almost               |0           |163       |-5.099866427824199 |
|wrong missing items              |0           |160       |-5.081404364984463 |
|took minutes cars                |1           |298       |-5.0072963928307415|
|gave someone wait                |0           |148       |-5.003946305945459 |
|covid highly recommend           |0           |139       |-4.941642422609305 |
|rude customer skills             |0    

                                                                                

#### Comparative 4-Gram Analysis for Open vs. Closed Restaurants

In [32]:
# ----------------------------
# 4. Create 4-grams
# ----------------------------
four_grammer = NGram(n=4, inputCol="operational_tokens", outputCol="four_grams")
closed_df = four_grammer.transform(closed_df)
open_df   = four_grammer.transform(open_df)

# ----------------------------
# 5. Explode 4-grams and count
# ----------------------------
def four_gram_count(df, count_col):
    return df.select(F.explode("four_grams").alias("ngram")) \
             .groupBy("ngram").count().withColumnRenamed("count", count_col)

closed_counts = four_gram_count(closed_df, "closed_count")
open_counts   = four_gram_count(open_df, "open_count")

# ----------------------------
# 6. Combine counts and compute log-odds
# ----------------------------
combined_counts = closed_counts.join(open_counts, on="ngram", how="outer").fillna(0)
combined_counts = combined_counts.withColumn(
    "log_odds_ratio",
    F.log((F.col("closed_count")+1) / (F.col("open_count")+1))
)

# ----------------------------
# 7. Rank for comparative insight
# ----------------------------
top_closed_4_grams = combined_counts.orderBy(F.col("log_odds_ratio").desc())
top_open_4_grams   = combined_counts.orderBy(F.col("log_odds_ratio").asc())

# ----------------------------
# 8. Show top results
# ----------------------------
print("4-grams indicative of closed stores:")
top_closed_4_grams.show(50, truncate=False)

print("4-grams indicative of open stores:")
top_open_4_grams.show(50, truncate=False)

4-grams indicative of closed stores:


                                                                                

+-----------------------------------------+------------+----------+------------------+
|ngram                                    |closed_count|open_count|log_odds_ratio    |
+-----------------------------------------+------------+----------+------------------+
|scammers scammers scammers scammers      |61          |0         |4.127134385045092 |
|fluffy fluffy fluffy fluffy              |16          |0         |2.833213344056216 |
|uuuuu uuuuu uuuuu uuuuu                  |9           |0         |2.302585092994046 |
|gambar pertama adalah facebook           |4           |0         |1.6094379124341003|
|dengan gambar pertama adalah             |4           |0         |1.6094379124341003|
|called said cheating expensive           |4           |0         |1.6094379124341003|
|alcohol ordering issue fairly            |4           |0         |1.6094379124341003|
|said cheating expensive without          |4           |0         |1.6094379124341003|
|ketiga dengan gambar pertama             |



+-----------------------------------------+------------+----------+-------------------+
|ngram                                    |closed_count|open_count|log_odds_ratio     |
+-----------------------------------------+------------+----------+-------------------+
|lolol lolol lolol lolol                  |0           |262       |-5.572154032177765 |
|frickin frickin frickin frickin          |0           |176       |-5.176149732573829 |
|regularly charge convenient rates        |0           |117       |-4.770684624465665 |
|comes instantly many types               |0           |95        |-4.564348191467836 |
|boner boner boner boner                  |0           |93        |-4.543294782270004 |
|trying consistently towards visitors     |1           |182       |-4.516338972281476 |
|constantly charge reasonable rates       |0           |90        |-4.51085950651685  |
|cars ahead took minutes                  |0           |90        |-4.51085950651685  |
|surely recommend visiting custo

                                                                                

#### Comparative 5-Gram Analysis for Open vs. Closed Restaurants

In [34]:
# ----------------------------
# 4. Create 5-grams
# ----------------------------
five_grammer = NGram(n=5, inputCol="operational_tokens", outputCol="five_grams")
closed_df = five_grammer.transform(closed_df)
open_df   = five_grammer.transform(open_df)

# ----------------------------
# 5. Explode 5-grams and count
# ----------------------------
def five_gram_count(df, count_col):
    return df.select(F.explode("five_grams").alias("ngram")) \
             .groupBy("ngram").count().withColumnRenamed("count", count_col)

closed_counts = five_gram_count(closed_df, "closed_count")
open_counts   = five_gram_count(open_df, "open_count")

# ----------------------------
# 6. Combine counts and compute log-odds
# ----------------------------
combined_counts = closed_counts.join(open_counts, on="ngram", how="outer").fillna(0)
combined_counts = combined_counts.withColumn(
    "log_odds_ratio",
    F.log((F.col("closed_count")+1) / (F.col("open_count")+1))
)

# ----------------------------
# 7. Rank for comparative insight
# ----------------------------
top_closed_5_grams = combined_counts.orderBy(F.col("log_odds_ratio").desc())
top_open_5_grams   = combined_counts.orderBy(F.col("log_odds_ratio").asc())

# ----------------------------
# 8. Show top results
# ----------------------------
print("5-grams indicative of closed stores:")
top_closed_5_grams.show(50, truncate=False)

print("5-grams indicative of open stores:")
top_open_5_grams.show(50, truncate=False)

5-grams indicative of closed stores:


                                                                                

+--------------------------------------------------+------------+----------+------------------+
|ngram                                             |closed_count|open_count|log_odds_ratio    |
+--------------------------------------------------+------------+----------+------------------+
|scammers scammers scammers scammers scammers      |53          |0         |3.9889840465642745|
|fluffy fluffy fluffy fluffy fluffy                |15          |0         |2.772588722239781 |
|informed refund almost year passed                |4           |0         |1.6094379124341003|
|contacted credit informed refund almost           |4           |0         |1.6094379124341003|
|refund almost year passed received                |4           |0         |1.6094379124341003|
|gambar pertama adalah facebook sebelum            |4           |0         |1.6094379124341003|
|called said cheating expensive without            |4           |0         |1.6094379124341003|
|ketiga dengan gambar pertama adalah    



+---------------------------------------------------+------------+----------+-------------------+
|ngram                                              |closed_count|open_count|log_odds_ratio     |
+---------------------------------------------------+------------+----------+-------------------+
|frickin frickin frickin frickin frickin            |0           |301       |-5.71042701737487  |
|lolol lolol lolol lolol lolol                      |0           |291       |-5.676753802268282 |
|blah blah blah blah blah                           |0           |78        |-4.3694478524670215|
|boner boner boner boner boner                      |0           |70        |-4.2626798770413155|
|trying consistently towards visitors often         |1           |133       |-4.204692619390966 |
|pacer meme deadthe pacer meme                      |0           |65        |-4.189654742026425 |
|meme deadthe pacer meme deadthe                    |0           |65        |-4.189654742026425 |
|deadthe pacer meme 

                                                                                