# Unlocking Business Potential: Leveraging Bias-Aware Hierarchical Clustering for Actionable Insights from Yelp Data

### Zeinab Gaeini, Debbie Hernandez, Harper Strickland

## Part 3 - Clustering

### Using Normalized Features File, Identify Candidate Clusters for Analysis

#### This notebook has 1 output:
1. top clusters csv file (1 row per business: cluster name, k-value, cluster number, business id)

#### After running this notebook, get csv file from new local folder, rename, and move to local

In [1]:
import pyspark
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import VectorAssembler 
from pyspark.ml.clustering import KMeans 
from pyspark.sql.functions import count, min, max, mean, median, std, col, lit, concat, row_number
from pyspark.sql.types import IntegerType, FloatType, StringType, StructType, StructField
from pyspark.sql.window import Window
from functools import reduce


In [3]:
# Suppress native-hadoop warning
!sed -i '$a\# Add the line for suppressing the NativeCodeLoader warning \nlog4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR,console' /$HADOOP_HOME/etc/hadoop/log4j.properties

#### Terminal commands, Upload Files to HDFS

In [3]:
# make a directory on hdfs (repeated in case this notebook is used alone)
! hdfs dfs -mkdir /Project/

In [2]:
# copy csv file to hdfs: features data (normalized features of selected businesses)
! hdfs dfs -copyFromLocal Features.csv /Project

copyFromLocal: `/Project/Restaurants_Features.csv': File exists
Found 7 items
-rw-r--r--   1 root supergroup   52067470 2024-06-03 02:33 /Project/CanadianPostalCodes202403.csv
drwxr-xr-x   - root supergroup          0 2024-06-03 02:35 /Project/Restaurants_Data.csv
drwxr-xr-x   - root supergroup          0 2024-06-03 02:35 /Project/Restaurants_Features.csv
drwxr-xr-x   - root supergroup          0 2024-06-03 03:25 /Project/Top_Clusters.csv
-rw-r--r--   1 root supergroup    5909393 2024-06-03 02:17 /Project/USZIPCodes202403.csv
-rw-r--r--   1 root supergroup  118863795 2024-06-03 02:16 /Project/yelp_academic_dataset_business.json
-rw-r--r--   1 root supergroup 5341868833 2024-06-03 02:17 /Project/yelp_academic_dataset_review.json


In [None]:
! hdfs dfs -ls /Project

#### Terminal commands, Upload Files to HDFS

In [3]:
# Change the number of cores = n in `local[n]`
conf = pyspark.SparkConf().setAll([('spark.master', 'local[4]'),
                                   ('spark.app.name', 'Clustering')])
spark = SparkSession.builder.config(conf=conf).getOrCreate()


#### 3.1. Features CSV file to DataFrame

In [4]:
# read features data from csv file
path = "hdfs:///Project/Features.csv"
features = spark.read.csv(path,header=True,inferSchema=True).cache()


### 3.2. K-Means Clustering

#### *UPDATE HERE*
Change list of k-values to test different range

In [5]:
# List K-Values for Analysis
k_vals = [50, 100, 500, 1000, 5000, 10000]


#### 3.2.1. Define Functions for K-Means Clustering

In [6]:
# input: dataframe with business_id and stars as 1st 2 columns,
#    other columns are normalized features (0-1)
def get_vectors (df):
    vec_assembler = VectorAssembler(inputCols = df.columns[2:], outputCol='features') 
    data = vec_assembler.transform(df) 
    return data
    

In [7]:
# inputs: dataframe, num of clusters
# output: dataframe columns: business_id, stars, cluster, k_value
def get_clusters (df, num):
    data = get_vectors(df)
    kmeans = KMeans(featuresCol='features',k=num) 
    model = kmeans.fit(data) 
    with_labels = model.transform(data) # combine with next
    result = with_labels.select('business_id', 'stars', 'prediction') \
        .withColumnRenamed('prediction', 'cluster')
    result = result.withColumn('k_value', lit(num))
    return result


In [8]:
# inputs: cluster dataframe with 3 columns (business_id, stars, cluster)
def cluster_stats (df):
    result = df.groupBy('k_value', 'cluster').agg(count('business_id'), min('stars'), max('stars'),
                                       mean('stars'), median('stars'), std('stars'))
    result = result.orderBy(['count(business_id)'], ascending = [False])
    return result


In [9]:
# compare results for list of candidate k values
def compare_all (df, k_list):
    df.cache()
    # Create schema and empty dataframe for results (1 row for each cluster)
    columns = StructType([StructField('K_Value', # number of clusters 
                                  IntegerType(), True),
                    StructField('Cluster', # cluster number 
                                  IntegerType(), True),
                    StructField('Business_Count', # min businesses per cluster
                                IntegerType(), True),
                    StructField('Min_Stars', # min stars in cluster
                                FloatType(), True),
                    StructField('Max_Stars', # max stars in cluster
                                FloatType(), True),
                    StructField('Mean_Stars', # mean stars in cluster
                                FloatType(), True),
                    StructField('Median_Stars', # median stars in cluster
                                FloatType(), True),
                    StructField('Stars_Std', # standard deviation of stars in cluster
                                FloatType(), True)])
 
    final_result = spark.createDataFrame(data = [], schema = columns)
    for i in range (len(k_list)):
        result = cluster_stats(get_clusters(df, k_list[i]))
        final_result = reduce(DataFrame.unionAll, [final_result,result])
    return final_result
        

#### 3.2.1. Perform K-Means Clustering

In [10]:
# this cell takes time to run, dependent on how many k values have been chosen
# Run K-Means for List of Ks, one row per cluster, business count, stars stats
k_results = compare_all(features, k_vals)


                                                                                

03:26:22.853 [block-manager-storage-async-thread-pool-48] ERROR org.apache.spark.storage.BlockManagerStorageEndpoint - Error in removing broadcast 28
org.apache.spark.SparkException: Block broadcast_28 does not exist
	at org.apache.spark.errors.SparkCoreErrors$.blockDoesNotExistError(SparkCoreErrors.scala:318) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.storage.BlockInfoManager.blockInfo(BlockInfoManager.scala:269) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.storage.BlockInfoManager.removeBlock(BlockInfoManager.scala:544) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:2095) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:2061) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:2033) ~[spark-core_2.12-3.5.0.jar:3.5.0]
	at org.apache.spark.storage.BlockManager.$anonf

                                                                                

### 3.3. Compare All Clusters

#### 3.3.1. Define Functions to compare clusters

In [11]:
# return top 10 list for clusters ordered by 'type'
# type options: high_min, low_max, high_mean, low_mean, high_med, low_med, low_std
def top_ten (df, type):
    # low_std as default
    asc = True
    order_by = 'Stars_Std'
    if (type=='high_min'):
        asc = False
        order_by = 'Min_Stars'
    elif (type=='low_max'):
        order_by = 'Max_Stars'
    elif (type=='high_mean'):
        asc = False
        order_by = 'Mean_Stars'
    elif (type=='low_mean'):
        order_by = 'Mean_Stars'
    elif (type=='high_med'):
        asc = False
        order_by = 'Median_Stars'
    elif (type=='low_med'):
        order_by = 'Median_Stars'
    result = df.filter(col('Business_Count')>50).orderBy([order_by], ascending = [asc]).limit(10)
    return result


In [12]:
def get_best (df):
    # top 4 by mean stars, no duplicates
    group_cols = ['K_Value', 'Cluster']
    result = df.groupBy(group_cols).agg(mean("Mean_Stars").alias("Mean_Stars")) \
        .orderBy(['Mean_Stars'], ascending = [False]).limit(4)
    w = Window().orderBy(lit('A'))
    result = result.withColumn('Row_Number', row_number().over(w)).withColumn('Best', lit('Best'))
    result = result.select(concat(col('Best'), lit('_'), col('Row_Number')).alias('Cluster_Name'),
                    'K_Value', 'Cluster')
    return result


In [13]:
def get_worst (df):
    # bottom 4 by mean stars, no duplicates
    group_cols = ['K_Value', 'Cluster']
    result = df.groupBy(group_cols).agg(mean("Mean_Stars").alias("Mean_Stars")) \
        .orderBy(['Mean_Stars'], ascending = [True]).limit(4)
    w = Window().orderBy(lit('A'))
    result = result.withColumn('Row_Number', row_number().over(w)).withColumn('Worst', lit('Worst'))
    result = result.select(concat(col('Worst'), lit('_'), col('Row_Number')).alias('Cluster_Name'),
                    'K_Value', 'Cluster')
    return result
   

In [14]:
# compare results for list of candidate k values
def compare_k (df):
    df.cache()
    # Create schema and empty dataframe for results (1 row for each cluster)
    columns = StructType([StructField('K_Value', # number of clusters 
                                  IntegerType(), True),
                    StructField('Cluster', # cluster number 
                                  IntegerType(), True),
                    StructField('Mean_Stars', # mean stars in cluster
                                FloatType(), True)])
    # DF of all top 10 lists
    allDF = spark.createDataFrame(data = [], schema = columns)

    types = ['high_min', 'low_max', 'high_mean', 'low_mean', 'high_med', 'low_med', 'low_std']
    type_labels = ['Highest Minimum Stars', 'Lowest Maximum Stars',
               'Highest Mean Stars', 'Lowest Mean Stars',
               'Highest Median Stars', 'Lowest Median Stars',
               'Lowest Standard Deviation of Stars']
    for i in range (len(types)):
        print(type_labels[i])
        result = top_ten(k_results, types[i])
        result.show()
        allDF = reduce(DataFrame.unionAll, [allDF,result.select('K_Value', 'Cluster', 'Mean_Stars')])

    # get best and worst of allDF, combine into final_result
    final_result = get_best(allDF).union(get_worst(allDF))
    return final_result


#### 3.3.2. Perform comparison

Print Top 10 Lists and save Top 'Best' and 'Worst' Clusters

In [15]:
top_clusters = compare_k(k_results)


Highest Minimum Stars


                                                                                

+-------+-------+--------------+---------+---------+------------------+------------+-------------------+
|K_Value|Cluster|Business_Count|Min_Stars|Max_Stars|        Mean_Stars|Median_Stars|          Stars_Std|
+-------+-------+--------------+---------+---------+------------------+------------+-------------------+
|    500|    439|            60|      3.5|      4.5| 4.166666666666667|         4.0|0.30065841120113157|
|    500|    132|            57|      3.5|      5.0| 4.166666666666667|         4.0|0.35773760003135024|
|    500|     43|            55|      3.5|      5.0| 4.181818181818182|         4.0| 0.3646870343138524|
|    100|     59|           160|      3.0|      4.5|             3.925|         4.0|  0.438694461087303|
|    500|    107|            59|      3.0|      5.0|3.9915254237288136|         4.0|0.40991510289798927|
|   1000|    119|            73|      3.0|      4.5|3.8698630136986303|         4.0|  0.353822354793903|
|    500|    265|            68|      3.0|      5.0|   

In [16]:
top_clusters.show()




+------------+-------+-------+
|Cluster_Name|K_Value|Cluster|
+------------+-------+-------+
|      Best_1|   1000|     80|
|      Best_2|    500|    317|
|      Best_3|   1000|    129|
|      Best_4|    500|    452|
|     Worst_1|   1000|    558|
|     Worst_2|  10000|   1099|
|     Worst_3|   1000|    125|
|     Worst_4|   5000|   2188|
+------------+-------+-------+



                                                                                

### 3.4. Save 'Best' and 'Worst' with Businesses for Analysis

#### 3.4.1. Define Functions to compile dataframe

In [17]:
# Get business_id list for a specific cluster
def get_id_list (k, cluster):
    bus_list = get_clusters(features, k).filter(col('cluster')==cluster).select('k_value', 'cluster', 'business_id')
    return bus_list


In [18]:
def combine_id_list (k_cl_df):
    # create list of tuples (name, k, c)
    name = [str(row['Cluster_Name']) for row in k_cl_df.collect()]
    k = [int(row['K_Value']) for row in k_cl_df.collect()]
    c = [int(row['Cluster']) for row in k_cl_df.collect()]
    list_ = [name, k, c]
    k_cl_list = [list(tup) for tup in zip(*list_)]
    
    # Create schema and empty dataframe for results (1 row for each cluster)
    columns = StructType([StructField('Cluster_Name', # number of clusters 
                                StringType(), True),
                    StructField('K_Value', # number of clusters 
                                IntegerType(), True),
                    StructField('Cluster', # cluster number 
                                IntegerType(), True),
                    StructField('Business_ID', # min businesses per cluster
                                StringType(), True)])
 
    final_result = spark.createDataFrame(data = [], schema = columns)
    for i in range (len(k_cl_list)):
        result = get_id_list(k_cl_list[i][1], k_cl_list[i][2])
        result = result.select(lit(k_cl_list[i][0]).alias('Cluster_Name'),
                    'k_value', 'cluster', 'business_id')
        final_result = reduce(DataFrame.unionAll, [final_result,result])
    return final_result


#### 3.4.2. Compile Top Clusters Dataframe

In [19]:
# this step takes the most time (looking up all businesses in each of the top clusters)
top_ids = combine_id_list(top_clusters)


                                                                                

In [20]:
# File name to save in HDFS
save_path = 'hdfs:///Project/Top_Clusters.csv'


In [21]:
top_ids.coalesce(1).write.csv(save_path, mode='overwrite', header='true')


                                                                                

#### End Spark Session

In [22]:
# End Spark Session
spark.stop()


Copy csv file from HDFS to local

#### *UPDATE HERE:*

##### (filename in Project folder should match name of .csv folders where file was saved in HDFS at top of notebook)


In [23]:
# UPDATE with folder name in HDFS (see output above)
! hdfs dfs -copyToLocal /Project/Top_Clusters.csv Clusters_CSV_Folder


##### after notebook runs, find the new file named “part-00000-…” in folder named “Clusters_CSV_Folder” Rename the “part-…” file as “Top_Clusters.csv” and move it into local folder.

##### Proceed to next notebook (4-Analysis.ipynb)

