In [None]:
import numpy as np
import requests

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType

from pyspark.ml import Pipeline
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
from pyspark.ml.feature import Tokenizer, Word2Vec, Normalizer, BucketedRandomProjectionLSH
from pyspark.ml.tuning import ParamGridBuilder

### Load data
___

In [None]:
spark = (SparkSession.builder
         .appName("HW1 MMDS")
         .getOrCreate())

# Airbnb listings: quoted fields may span several lines and use '"' to escape quotes,
# so multiline parsing and an explicit escape character are required.
listings_df = (spark.read
               .option("header", True)
               .option("multiline", True)
               .option("escape", '"')
               .option("inferSchema", True)
               .csv("listings.csv.gz"))


In [None]:
# Number the rows inside each group of identical names; every row numbered above 1 is a repeat.
# Ordering by "name" inside a partition of equal names is arbitrary, so which copy gets
# row 1 is unspecified; only the resulting count is meaningful.
windowSpec = Window.partitionBy("name").orderBy("name")
df_with_row_num = listings_df.withColumn(
    "row_number",
    row_number().over(windowSpec),
)
duplicates = df_with_row_num.where(col("row_number") > 1)
duplicates.count()

788

We assign a row number within each window partition of identical names and keep only the rows with `row_number > 1`, i.e. every occurrence after the first. The resulting count is therefore the number of redundant duplicate rows.

Duplicates can hinder LSH by overloading hash buckets with identical or very similar items, reducing the algorithm's ability to distinguish between unique data points.

In [None]:
# One row per distinct listing name: repeated names would pile identical vectors into the same LSH buckets.
# Null names are dropped as well, since the Tokenizer stage lower-cases/splits the string
# (presumably failing on null — confirm against the Spark version in use).
names_df = (listings_df
            .drop_duplicates(["name"])
            .select("name")
            .where(col("name").isNotNull()))

# Wiki: top-viewed uk.wikisource articles for April 2019 from the Wikimedia REST API.
url = 'https://wikimedia.org/api/rest_v1/metrics/pageviews/top/uk.wikisource/all-access/2019/04/all-days'
# A timeout keeps the cell from hanging forever on a stalled connection.
response = requests.get(url, headers={'User-Agent': 'MyPythonApp/1.0'}, timeout=30)
# Fail with a clear HTTP error instead of an opaque KeyError on an error payload below.
response.raise_for_status()

articles = [
    article['article']
    for item in response.json()['items']
    for article in item['articles']
]


In [None]:
# Peek at the first ten raw article titles returned by the API.
articles[0:10]

['Головна_сторінка',
 'Вірую',
 'Мойсей_(Іван_Франко)/Пролог',
 'Закон_України_«Про_авторське_право_і_суміжні_права»',
 'Закон_України_«Про_Національну_поліцію»',
 'Конституція_України',
 'Архіви/ДАЖО/178/3',
 'Конституція_Пилипа_Орлика',
 'Конституція_США',
 'Молитва_за_померлих']

In [None]:
# Turn titles into plain lower-case text: '_' separates words in wiki titles, and the
# Tokenizer only splits on whitespace.
article_rows = [(title.replace('_', ' ').lower(),) for title in articles]
articles_df = (spark.createDataFrame(article_rows, ["name"])
               .where(col("name") != '')
               .dropDuplicates(["name"]))

### Define general pipeline
___


We decided to use Word2Vec instead of TF-IDF because it captures semantic meaning in lower-dimensional dense vectors. Semantic embeddings let the search match not only on shared words but also on contextual similarity. Lower-dimensional vectors also reduce computational cost. Finally, L2-normalizing the Word2Vec vectors ensures that their magnitudes do not bias the (Euclidean) similarity measure.

In [None]:
# Feature pipeline: name -> tokens -> Word2Vec embedding -> L2-normalised embedding.
tokenizer = Tokenizer(inputCol="name", outputCol="words")
word2Vec = Word2Vec(minCount=0, inputCol="words", outputCol="word2vec")
word2vec_normalizer = Normalizer(p=2, inputCol="word2vec", outputCol="NormalizedWord2Vec")

pipeline = Pipeline(stages=[tokenizer, word2Vec, word2vec_normalizer])

# LSH estimator on the normalised embeddings; its parameters are chosen by the grid below.
brpLSH = BucketedRandomProjectionLSH(inputCol="NormalizedWord2Vec", outputCol="hashes")

NUM_HASH_TABLES_GRID = [1, 3, 5]
BUCKET_LENGTH_GRID = [0.5, 1.0, 2.0]

grid_builder = ParamGridBuilder()
grid_builder.addGrid(brpLSH.numHashTables, NUM_HASH_TABLES_GRID)
grid_builder.addGrid(brpLSH.bucketLength, BUCKET_LENGTH_GRID)
paramGrid = grid_builder.build()

### Define evaluation framework
___

In [None]:
import functools
import time

def track_time(func):
    """Decorator that prints how long each call to ``func`` took.

    The wrapped function's return value is passed through unchanged. The timing line is
    printed only when the call returns normally.
    """
    # wraps() keeps __name__/__doc__, so introspection (e.g. `tune_params?`) still shows
    # the decorated function's own docstring instead of the wrapper's.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution, unlike wall-clock time.time().
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - start_time
        print(f"Execution time of {func.__name__}: {execution_time:.4f} seconds")
        return result
    return wrapper

In [None]:
def evaluate_model_with_euclidean_distance(model, df, num_query_points=50, num_neighbors=250, seed=None):
    """
    Evaluates the performance of a LSH model by comparing its
    approximate nearest neighbors to the exact nearest neighbors based on Euclidean distance.

    Parameters:
    - model (BucketedRandomProjectionLSHModel): fitted LSH model.
    - df (Dataframe): contains at least the columns 'name' and 'NormalizedWord2Vec'.
      Names are expected to be unique, because the query point is excluded from its own
      neighbour lists by name.
    - num_query_points (int, optional): Number of query points to sample for evaluation. Default is 50.
    - num_neighbors (int, optional): Number of nearest neighbors to retrieve for each query point. Default is 250.
    - seed (int, optional): Seed for sampling the query points; pass an int for reproducible scores.
      Default is None (a different sample on every call).

    Returns:
    - float: The average recall@k over all query points. For each query point, the score is the
      fraction of its exact nearest neighbours (Euclidean distance on the word2vec embeddings, query
      point itself excluded) that also appear among the approximate nearest neighbours returned by
      the LSH model. Returns nan when no query point could be scored (e.g. the sample was empty).
    """
    query_points = (df.select("name", "NormalizedWord2Vec")
                    .sample(False, 0.1, seed)
                    .limit(num_query_points)
                    .collect())
    similarity_results = []

    for query_point in query_points:
        query_name = query_point["name"]
        key_vector = query_point["NormalizedWord2Vec"]

        # Exclude only the query row itself, identically for both searches. Filtering on
        # distCol > 0 instead would also drop genuine neighbours with an identical embedding,
        # and did so unevenly (before limit() for the exact search, after it for the ANN one).
        candidates = df.filter(~col("name").eqNullSafe(query_name))

        compute_l2_distance_udf = udf(lambda vec: float(np.linalg.norm(vec - key_vector)), FloatType())

        # Exact top-k neighbours by Euclidean distance (full scan).
        real_neighbors = (candidates.withColumn("distCol", compute_l2_distance_udf(col("NormalizedWord2Vec")))
                          .orderBy("distCol")
                          .limit(num_neighbors)
                          .select("name")
                          .collect())

        # Approximate top-k neighbours from the LSH model over the same candidate set.
        ann_neighbors = (model.approxNearestNeighbors(candidates, key_vector, num_neighbors)
                         .select("name")
                         .collect())

        real_neighbor_names = {neighbor["name"] for neighbor in real_neighbors}
        ann_neighbor_names = {neighbor["name"] for neighbor in ann_neighbors}

        if not real_neighbor_names:
            continue
        # Normalise by the neighbours actually available: with fewer than num_neighbors
        # candidates, dividing by num_neighbors would cap the score below 1.
        similarity = len(real_neighbor_names & ann_neighbor_names) / len(real_neighbor_names)
        similarity_results.append(similarity)

    if not similarity_results:
        return float("nan")
    return float(np.mean(similarity_results))

In [None]:
@track_time
def tune_params(df):
    """
    Performs hyperparameter tuning for the BucketedRandomProjectionLSH (LSH) model using a predefined parameter grid,
    and selects the best model based on the highest average similarity score.

    The function:
    - Transforms the input DataFrame using the predefined `pipeline` (cached while the search runs).
    - Iterates over `paramGrid` (`numHashTables` x `bucketLength`).
    - Fits a copy of `brpLSH` with each parameter combination on the transformed data; the shared
      `brpLSH` estimator itself is left unchanged.
    - Evaluates each model using the `evaluate_model_with_euclidean_distance` function, which computes the average recall at k.
    - Identifies and retains the model with the highest average similarity score.

    Parameters:
    - df (pyspark.sql.DataFrame): The input DataFrame containing the data to be used for tuning.
      It should include at least the columns required by the pipeline (e.g., 'name').

    Returns:
    - tuple (best_model, best_params, highest_avg_similarity). If no combination produced a
      comparable score (e.g. every evaluation returned nan), returns (None, None, nan).

    Side Effects:
    - Prints evaluation results for each parameter combination, including the parameters used and the average similarity score.
    - Prints the best parameters found and the highest average similarity score.
    """
    # Every fit and every evaluation query re-reads this DataFrame. Caching it avoids
    # recomputing the whole upstream lineage (source read, dedup, Word2Vec transform) each time.
    transformed_data = pipeline.fit(df).transform(df).cache()
    try:
        best_model, best_params = None, None
        # Start below any real score so the first evaluated combination is always recorded.
        highest_avg_similarity = float("-inf")

        print("\nEvaluation Results:")
        for params in paramGrid:
            # fit(dataset, paramMap) trains a copy of the estimator with these params
            # instead of mutating the global brpLSH via setParams().
            model = brpLSH.fit(transformed_data, params)

            avg_similarity = evaluate_model_with_euclidean_distance(model, transformed_data)

            print(f"numHashTables={params[brpLSH.numHashTables]}, bucketLength={params[brpLSH.bucketLength]}")
            print(f"Average Similarity: {avg_similarity}")

            if avg_similarity > highest_avg_similarity:
                highest_avg_similarity = avg_similarity
                best_model, best_params = model, params
    finally:
        transformed_data.unpersist()

    if best_params is None:
        print("\nNo parameter combination produced a comparable score.")
        return None, None, float("nan")

    print("\nBest Parameters:")
    print(f"numHashTables={best_params[brpLSH.numHashTables]}, bucketLength={best_params[brpLSH.bucketLength]}")
    print(f"Highest Average Similarity: {highest_avg_similarity}")
    return best_model, best_params, highest_avg_similarity

### AirBnb param tuning
___

In [None]:
# Grid-search the LSH parameters on the deduplicated Airbnb listing names.
# In the recorded run this took ~994 s (see the timing printed below).
tune_params(names_df)

                                                                                


Evaluation Results:


                                                                                

numHashTables=1, bucketLength=0.5
Average Similarity: 0.84112


                                                                                

numHashTables=1, bucketLength=1.0
Average Similarity: 0.8613599999999999


                                                                                

numHashTables=1, bucketLength=2.0
Average Similarity: 0.80688


                                                                                

numHashTables=3, bucketLength=0.5
Average Similarity: 0.9878399999999999


                                                                                

numHashTables=3, bucketLength=1.0
Average Similarity: 0.99424


                                                                                

numHashTables=3, bucketLength=2.0
Average Similarity: 0.9847999999999999


                                                                                

numHashTables=5, bucketLength=0.5
Average Similarity: 0.9995200000000001


                                                                                

numHashTables=5, bucketLength=1.0
Average Similarity: 0.99976


                                                                                

numHashTables=5, bucketLength=2.0
Average Similarity: 0.99936

Best Parameters:
numHashTables=5, bucketLength=1.0
Highest Average Similarity: 0.99976
Execution time of tune_params: 994.2438 seconds


### Wiki param tuning
___


In [None]:
# Same grid search on the normalised Wikisource article titles (~28 s in the recorded run).
tune_params(articles_df)

                                                                                


Evaluation Results:


                                                                                

numHashTables=1, bucketLength=0.5
Average Similarity: 0.68
numHashTables=1, bucketLength=1.0
Average Similarity: 0.68
numHashTables=1, bucketLength=2.0
Average Similarity: 0.72
numHashTables=3, bucketLength=0.5
Average Similarity: 0.96
numHashTables=3, bucketLength=1.0
Average Similarity: 0.9199999999999999
numHashTables=3, bucketLength=2.0
Average Similarity: 0.9199999999999999
numHashTables=5, bucketLength=0.5
Average Similarity: 1.0
numHashTables=5, bucketLength=1.0
Average Similarity: 1.0
numHashTables=5, bucketLength=2.0
Average Similarity: 1.0

Best Parameters:
numHashTables=5, bucketLength=0.5
Highest Average Similarity: 1.0
Execution time of tune_params: 28.3503 seconds


### Wiki with Airbnb params
___



*  Although the parameters were tuned on the Airbnb dataset, they still achieve considerable performance on the Wiki data. This is likely because they use several hash tables. Adding hash tables raises recall, since an item only needs to collide with the query in one table to become a candidate, and that helps on any dataset. Still, a parameter set tuned on the Wiki data itself may perform better in this particular case.



In [None]:
# Apply the best parameters found on Airbnb (see "Best Parameters" in the Airbnb tuning output)
# to the Wiki data.
AIRBNB_BEST_NUM_HASH_TABLES = 5
AIRBNB_BEST_BUCKET_LENGTH = 1.0

transformed_data = pipeline.fit(articles_df).transform(articles_df)

# Use a separate name instead of reassigning `brpLSH`: `paramGrid` is keyed by the original
# estimator's Params, so shadowing it would break re-running tune_params afterwards.
airbnb_params_lsh = BucketedRandomProjectionLSH(inputCol="NormalizedWord2Vec", outputCol="hashes",
                                                numHashTables=AIRBNB_BEST_NUM_HASH_TABLES,
                                                bucketLength=AIRBNB_BEST_BUCKET_LENGTH)
model = airbnb_params_lsh.fit(transformed_data)

average_accuracy = evaluate_model_with_euclidean_distance(model, transformed_data)
print(f"\nAverage KNN-ANN Accuracy with Airbnb Parameters: {average_accuracy}")

# Show up to 5 neighbours of the first article: 6 are requested and the distance-0 match
# (the article itself) is filtered out.
sample_vector = transformed_data.select("NormalizedWord2Vec").limit(1).collect()[0][0]
similar_articles = model.approxNearestNeighbors(transformed_data, sample_vector, 6)
similar_articles.select("name", "distCol").filter("distCol > 0").show()

                                                                                


Average KNN-ANN Accuracy with Airbnb Parameters: 0.9199999999999999
+--------------------+------------------+
|                name|           distCol|
+--------------------+------------------+
|Сторінка:Твори_Ко...|1.1629896902350065|
|Будапештський_мем...|1.2045824380578187|
|Зернятка_(Борис_Г...|1.2058485606076885|
|Вікіджерела:Прави...|1.2114045430846063|
|Перша_Французька_...|1.2270553489447011|
+--------------------+------------------+



### System characteristics

---

We used Google Colab to run these experiments, on an Intel(R) Xeon(R) CPU @ 2.20GHz with 2 vCPUs (1 physical core × 2 hardware threads) and 12 GB of RAM. The detailed hardware description is printed below.

In [None]:
# Record the hardware the timings above were measured on (a Google Colab VM).
# `!` lines are IPython shell escapes, so this cell only runs inside Jupyter/IPython.
print("CPU Information:")
!lscpu

print("\nMemory Information:")
!free -h

CPU Information:
Architecture:             x86_64
  CPU op-mode(s):         32-bit, 64-bit
  Address sizes:          46 bits physical, 48 bits virtual
  Byte Order:             Little Endian
CPU(s):                   2
  On-line CPU(s) list:    0,1
Vendor ID:                GenuineIntel
  Model name:             Intel(R) Xeon(R) CPU @ 2.20GHz
    CPU family:           6
    Model:                79
    Thread(s) per core:   2
    Core(s) per socket:   1
    Socket(s):            1
    Stepping:             0
    BogoMIPS:             4399.99
    Flags:                fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 cl
                          flush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc re
                          p_good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3
                           fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand
                           hypervisor lahf_lm ab