<a href="https://colab.research.google.com/github/shandrayu/mining-massive-databases/blob/main/notebooks/homework_pyspark_yuliia.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Mining massive databases homework

## Colab setup


### Clone repo

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# !wget -q https://raw.githubusercontent.com/tsunrise/colab-github/main/colab_github.py
# import colab_github
# colab_github.github_auth(persistent_key=True)

In [None]:
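# Cloning over SSH needs a key registered in the Colab runtime; without one it fails as shown in the output.
# If the repo is public, the HTTPS URL (https://github.com/shandrayu/mining-massive-databases.git) avoids this.
!git clone git@github.com:shandrayu/mining-massive-databases.git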

Cloning into 'mining-massive-databases'...
Host key verification failed.
fatal: Could not read from remote repository.

Please make sure you have the correct access rights
and the repository exists.


In [None]:
!ls && ls mining-massive-databases

drive  sample_data
ls: cannot access 'mining-massive-databases': No such file or directory


### PySpark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

--2023-11-11 19:43:46--  https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 135.181.214.104, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400395283 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.0-bin-hadoop3.tgz’


2023-11-11 19:43:59 (31.9 MB/s) - ‘spark-3.5.0-bin-hadoop3.tgz’ saved [400395283/400395283]



In [None]:
!tar xzvf spark-3.5.0-bin-hadoop3.tgz > /dev/null


In [None]:
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

In [None]:
import findspark
findspark.init()

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, avg, when
import pandas as pd

In [None]:
# Build (or reuse) a local session that uses all available cores
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
spark

## Barcelona dataset recommendation system

The goal of this task is to recommend similar apartments (items) based on an input query (an apartment description).

Tasks:

- Convert the text feature to a feature vector with TF-IDF
- Grid search over the LSH parameters
- Grid search over the number of features
- Calculate the ground truth
- Choose metrics and explain the choice

- Add to the report:
  - Accuracy for four different sets of parameters
  - Computation time for the tuning (grid search) procedure
  - Machine characteristics
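
For the metrics task, one candidate is recall@k: the fraction of the exact nearest neighbours that the approximate query also returns. The notebook does not commit to a metric, so this is only a sketch under that assumption; `recall_at_k` is a hypothetical helper name and the IDs are made up for illustration.

```python
def recall_at_k(recommended_ids, ground_truth_ids):
    """Fraction of ground-truth neighbours that appear among the recommendations."""
    truth = set(ground_truth_ids)
    if not truth:
        return 0.0
    hits = len(truth.intersection(recommended_ids))
    return hits / len(truth)


# Illustrative IDs only, not taken from the dataset
approx = [101, 102, 103, 104, 105]
exact = [101, 103, 105, 107, 109]
print(recall_at_k(approx, exact))  # 3 of 5 exact neighbours found -> 0.6
```

Recall@k ignores the order of the results, which suits a setting where the ground truth is a set of neighbour IDs rather than a ranking.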

## Download and preprocess data
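
The preprocessing in this section splits each listing name on ` · ` and replaces the star rating with a coarse bucket. As a rough standalone illustration of the same thresholds without Spark, the bucketing can be written as a table lookup. `parse_name`, `LOWER_BOUNDS` and `LABELS` are hypothetical names used only in this sketch.

```python
from bisect import bisect_right

# Lower bounds and labels mirror the thresholds used in bucket_rating
LOWER_BOUNDS = [3.0, 3.5, 4.0, 4.5, 4.8]
LABELS = ["2", "3", "3.5", "4", "4.5", "5"]


def parse_name(name):
    """Split a listing name on ' · ' and bucket the '★x.xx' token, if present."""
    tokens = name.split(" · ")
    if len(tokens) >= 2 and tokens[1].startswith("★"):
        try:
            rating = float(tokens[1][1:])
            tokens[1] = LABELS[bisect_right(LOWER_BOUNDS, rating)]
        except ValueError:
            tokens[1] = "0"  # non-numeric rating, e.g. a new listing
    return tokens


print(parse_name("Rental unit in Barcelona · ★4.30 · 3 bedrooms · 6 beds · 2 baths"))
# ['Rental unit in Barcelona', '4', '3 bedrooms', '6 beds', '2 baths']
```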

In [None]:
from pyspark import SparkFiles
from pyspark.sql.functions import substring
from pyspark.sql.functions import split
from pyspark.sql import functions as F
import pyspark.sql.types as T

# listings_url = "http://data.insideairbnb.com/spain/catalonia/barcelona/2023-09-06/data/listings.csv.gz" # full data
listings_url = "http://data.insideairbnb.com/spain/catalonia/barcelona/2023-09-06/visualisations/listings.csv" # short data

def load_file_to_spark(url):
  spark.sparkContext.addFile(url)
  filename = url.split("/")[-1]
  df = spark.read.csv("file://" + SparkFiles.get(filename), header=True, multiLine=True, escape='\"', inferSchema=True)
  return df

def bucket_rating(arr):
  """
  Replace the rating token (second element, e.g. "★4.30") with a bucket label:
  - 5
  - 4.5
  - 4
  - 3.5
  - 3
  - 2
  - 0 (rating is not a number, e.g. a new listing)
  """
  if arr and len(arr) >= 2:
    if isinstance(arr[1], str) and arr[1].startswith("★"):
      try:
        num = float(arr[1][1:])
      except ValueError:
        # New listing, no rating. OK for categorical classification
        arr[1] = "0"
        return arr

      if num >= 4.8:
        arr[1] = "5"
      elif num >= 4.5:
        arr[1] = "4.5"
      elif num >= 4:
        arr[1] = "4"
      elif num >= 3.5:
        arr[1] = "3.5"
      elif num >= 3.0:
        arr[1] = "3"
      else:
        arr[1] = "2"
  return arr

def preprocess_listing_database(df):
  df_preprocessed = df.withColumn("name_tokens", split("name", "\\ · "))
  df_preprocessed = df_preprocessed.withColumn("split_tokens", bucket_rating_udf("name_tokens"))
  df_preprocessed = df_preprocessed.withColumn("rating_bucket", col("split_tokens")[1].cast('float'))
  return df_preprocessed


bucket_rating_udf = F.udf(bucket_rating, T.ArrayType(T.StringType()))
listings_df = load_file_to_spark(listings_url)
listings_df = preprocess_listing_database(listings_df)
listings_df.select("name", "name_tokens", "split_tokens", "rating_bucket").show(15, False)

+--------------------------------------------------------------------------+------------------------------------------------------------------------+----------------------------------------------------------------------+-------------+
|name                                                                      |name_tokens                                                             |split_tokens                                                          |rating_bucket|
+--------------------------------------------------------------------------+------------------------------------------------------------------------+----------------------------------------------------------------------+-------------+
|Rental unit in Barcelona · ★4.30 · 3 bedrooms · 6 beds · 2 baths          |[Rental unit in Barcelona, ★4.30, 3 bedrooms, 6 beds, 2 baths]          |[Rental unit in Barcelona, 4, 3 bedrooms, 6 beds, 2 baths]            |4.0          |
|Rental unit in Sant Adria de Besos · ★4.77 · 3 bedrooms · 4

## Convert text feature with TF-IDF to vector of features
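
As a rough illustration of what the hashing TF and IDF stages compute, here is a pure-Python sketch. It assumes the smoothed IDF formula from the Spark ML documentation, `log((N + 1) / (df + 1))`, and uses `zlib.crc32` as a stand-in hash. Spark's `HashingTF` uses MurmurHash3, so the bucket indices here will not match Spark's. The helper names and the toy documents are hypothetical.

```python
import math
import zlib


def hashing_tf(tokens, num_features):
    """Map tokens to a fixed-length count vector with the hashing trick."""
    vector = [0.0] * num_features
    for token in tokens:
        # crc32 is a stand-in hash; Spark's HashingTF uses MurmurHash3
        index = zlib.crc32(token.encode("utf-8")) % num_features
        vector[index] += 1.0
    return vector


def idf_weights(tf_vectors):
    """Smoothed IDF per bucket: log((N + 1) / (df + 1))."""
    n_docs = len(tf_vectors)
    num_features = len(tf_vectors[0])
    weights = []
    for j in range(num_features):
        doc_freq = sum(1 for vec in tf_vectors if vec[j] > 0)
        weights.append(math.log((n_docs + 1) / (doc_freq + 1)))
    return weights


# Toy documents, loosely modelled on the cleaned listing tokens
docs = [
    ["rental", "unit", "barcelona", "4", "3 bedrooms"],
    ["rental", "unit", "barcelona", "2", "1 bedroom"],
    ["hostel", "barcelona", "1 bedroom"],
]
tf = [hashing_tf(doc, num_features=15) for doc in docs]
idf = idf_weights(tf)
tfidf = [[count * weight for count, weight in zip(vec, idf)] for vec in tf]
print(tfidf[0])
```

A token that occurs in every document, such as `barcelona` here, lands in a bucket with weight 0 and so contributes nothing to the distance between listings. With a small `num_features`, unrelated tokens can also collide in the same bucket, which is one reason the number of features is worth tuning.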

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import col, concat, slice, size

def convert_to_tokens(df, input_column_name, additional_stop_words, stop_words_exceptions):
    tokenizer = Tokenizer(inputCol=input_column_name, outputCol="tokens")
    with_tokens = tokenizer.transform(df)

    stopwords_filename = "stopwords-en.txt"
    if not os.path.exists(stopwords_filename):
      !wget https://raw.githubusercontent.com/stopwords-iso/stopwords-en/master/stopwords-en.txt


    stop_words = []
    with open(stopwords_filename) as file:
        for line in file:
          word = line.rstrip()
          if word not in stop_words_exceptions:
            stop_words.append(word)

    stop_words.extend(additional_stop_words)
    remover = StopWordsRemover(stopWords=stop_words)
    remover.setInputCol("tokens")
    remover.setOutputCol("clean_tokens")
    clean_tokens = remover.transform(with_tokens)

    return clean_tokens


def tf_idf(df, clean_tokens_column_name, num_features):
    # Perform TFIDF
    hashing_tf = HashingTF(inputCol=clean_tokens_column_name, outputCol="raw_features", numFeatures=num_features)
    featurized_data = hashing_tf.transform(df)

    idf = IDF(inputCol="raw_features", outputCol="vector_space")
    idf_model = idf.fit(featurized_data)
    results = idf_model.transform(featurized_data)

    return results

def convert_title_to_features(df, num_features):
    df_with_title = df.withColumn("title", col("split_tokens")[0])
    clean_tokens = convert_to_tokens(df=df_with_title, input_column_name="title", additional_stop_words=['·', '★', '1', 'in'], stop_words_exceptions={"home"})
    # Extend clean tokens with the hand-crafted split tokens (everything after the title)
    clean_tokens = clean_tokens.withColumn("clean_tokens", concat(col("clean_tokens"), slice(col("split_tokens"), 2, size(col("split_tokens")) - 1)))
    # TODO: find out why "home" is in the stop-word list; it is kept via stop_words_exceptions above
    results = tf_idf(df=clean_tokens, clean_tokens_column_name="clean_tokens", num_features=num_features)
    return results

num_features = 15
results = convert_title_to_features(df=listings_df, num_features=num_features)
results.select("title", "name", "split_tokens", "clean_tokens", "raw_features", "vector_space").show(20, False)

--2023-11-11 19:44:55--  https://raw.githubusercontent.com/stopwords-iso/stopwords-en/master/stopwords-en.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7677 (7.5K) [text/plain]
Saving to: ‘stopwords-en.txt’


2023-11-11 19:44:55 (57.8 MB/s) - ‘stopwords-en.txt’ saved [7677/7677]

+----------------------------------+--------------------------------------------------------------------------+----------------------------------------------------------------------+--------------------------------------------------------------------+-----------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title      

### LSH
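
The number of hash tables trades recall against running time. Under the simplifying assumption that each table independently puts two given points into the same bucket with probability `p`, the chance that at least one of `L` tables does so is `1 - (1 - p)^L`. This is an idealised model rather than a description of Spark's internals, and `collision_probability` is a hypothetical helper.

```python
def collision_probability(p_single, num_hash_tables):
    """Chance that at least one of L independent tables puts two points in the same bucket."""
    return 1.0 - (1.0 - p_single) ** num_hash_tables


# p = 0.3 is an arbitrary illustrative value
for tables in (1, 3, 7, 10):
    print(tables, round(collision_probability(0.3, tables), 3))
```

Under this model, more tables mean fewer missed neighbours, at the cost of more hashing and more candidates to check.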

### Implement
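
Bucketed random projection hashes a vector `x` as `floor((x . v) / r)`, where `v` is a random unit vector and `r` is the bucket length. The following is a minimal pure-Python sketch of that formula, not Spark's implementation; `random_unit_vector`, `brp_hash` and the example vectors are hypothetical.

```python
import math
import random


def random_unit_vector(dim, rng):
    """Draw a Gaussian vector and normalise it to unit length."""
    vec = [rng.gauss(0.0, 1.0) for _ in range(dim)]
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec]


def brp_hash(x, projection, bucket_length):
    """h(x) = floor((x . v) / r) for projection v and bucket length r."""
    dot = sum(a * b for a, b in zip(x, projection))
    return math.floor(dot / bucket_length)


rng = random.Random(123456)
v = random_unit_vector(3, rng)
a = [1.0, 2.0, 3.0]
b = [1.0, 2.0, 3.01]    # close to a
c = [-30.0, 5.0, 40.0]  # far from a
for r in (0.5, 2.0):
    print(r, brp_hash(a, v, r), brp_hash(b, v, r), brp_hash(c, v, r))
```

Points that are close in Euclidean distance project to nearby values, so they tend to share a bucket. A larger bucket length makes that more likely, but it also lets more distant points into the same bucket.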

In [None]:
from pyspark.ml.feature import BucketedRandomProjectionLSH

def run_lsh(df, input_column, output_column, bucket_length, num_hash_tables):
    """
    Parameters:
     - bucket_length: length of each hash bucket; a larger bucket lowers the false negative rate
     - num_hash_tables: number of hash tables; more tables lower the false negative rate, fewer tables run faster
    """
    brp = BucketedRandomProjectionLSH(
        inputCol=input_column,
        outputCol=output_column,
        seed=123456,
        bucketLength=bucket_length,
        numHashTables=num_hash_tables,
    )

    # The fitted model inherits the input and output columns from brp
    return brp.fit(df)

model = run_lsh(df=results, input_column="vector_space", output_column="hashes", bucket_length=1.0, num_hash_tables=1)

### Query

#### First listing

In [None]:
query = results.first()

print(f"Query name: {query.asDict()['name']}")
print(f"Query ID: {query.asDict()['id']}")

# Parameters
# number of nearest neighbours to retrieve for the query
num_neighbours = 50

reccomendations = model.approxNearestNeighbors(results, query.vector_space, num_neighbours, distCol="distance")
reccomendations.select("id", "name", "clean_tokens", "distance").show(num_neighbours, False)

Query name: Rental unit in Barcelona · ★4.30 · 3 bedrooms · 6 beds · 2 baths
Query ID: 18674
+--------+----------------------------------------------------------------+-----------------------------------------------------------+--------+
|id      |name                                                            |clean_tokens                                               |distance|
+--------+----------------------------------------------------------------+-----------------------------------------------------------+--------+
|7663286 |Rental unit in Barcelona · ★4.64 · 3 bedrooms · 6 beds · 2 baths|[rental, unit, barcelona, 4.5, 3 bedrooms, 6 beds, 2 baths]|0.0     |
|1814378 |Rental unit in Barcelona · ★4.20 · 3 bedrooms · 6 beds · 2 baths|[rental, unit, barcelona, 4, 3 bedrooms, 6 beds, 2 baths]  |0.0     |
|112538  |Rental unit in Barcelona · ★4.38 · 3 bedrooms · 6 beds · 2 baths|[rental, unit, barcelona, 4, 3 bedrooms, 6 beds, 2 baths]  |0.0     |
|22475001|Rental unit in Barcelona · 

In [None]:
import numpy as np

reccomended_ids = np.array(reccomendations.select("id").collect())

#### Low-rated listing

In [None]:
rating_threshold = 3.1
query_3 = results.filter(results["rating_bucket"] < rating_threshold).head()
print(f"Query name: {query_3.asDict()['name']}")
print(f"Query ID: {query_3.asDict()['id']}")

reccomendations = model.approxNearestNeighbors(results, query_3.vector_space, num_neighbours, distCol="distance")
reccomendations.select("id", "name", "clean_tokens", "distance").show(num_neighbours, False)

Query name: Rental unit in Barcelona · ★2.50 · 1 bedroom · 3 beds · 1 bath
Query ID: 562020
+------------------+--------------------------------------------------------------------+---------------------------------------------------------+---------------------+
|id                |name                                                                |clean_tokens                                             |distance             |
+------------------+--------------------------------------------------------------------+---------------------------------------------------------+---------------------+
|562020            |Rental unit in Barcelona · ★2.50 · 1 bedroom · 3 beds · 1 bath      |[rental, unit, barcelona, 2, 1 bedroom, 3 beds, 1 bath]  |0.0                  |
|548085            |Rental unit in Barcelona · 1 bedroom · 3 beds · 1 bath              |[rental, unit, barcelona, 1 bedroom, 3 beds, 1 bath]     |0.0031564109601523436|
|1978145           |Rental unit in Barcelona · 1 bedroom ·

## Nearest neighbours ground truth
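
The ground truth here means the exact Euclidean nearest neighbours of each listing, against which the approximate LSH results can be compared. On small inputs the same answer can be obtained by brute force, as in the following sketch. `exact_neighbours` and the toy vectors are hypothetical and are not part of the notebook's pipeline.

```python
import heapq
import math


def exact_neighbours(query_id, vectors_by_id, k):
    """Return the IDs of the k vectors closest to the query, excluding the query itself."""
    query = vectors_by_id[query_id]
    candidates = (
        (math.dist(query, vec), item_id)
        for item_id, vec in vectors_by_id.items()
        if item_id != query_id
    )
    return [item_id for _, item_id in heapq.nsmallest(k, candidates)]


# Toy vectors, not taken from the dataset
vectors_by_id = {
    1: [0.0, 0.0],
    2: [0.1, 0.0],
    3: [1.0, 1.0],
    4: [5.0, 5.0],
}
print(exact_neighbours(1, vectors_by_id, k=2))  # [2, 3]
```

The sketch drops the query before selecting neighbours, so it returns exactly `k` IDs. Brute force costs one distance computation per listing per query; a tree index speeds up repeated queries.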

In [None]:
from sklearn.neighbors import KDTree

def convert_spark_df_to_pandas(spark_df):
    pandas_df = spark_df.select("id", "name", "vector_space").toPandas()
    vectors = pandas_df["vector_space"].apply(lambda x : np.array(x.toArray()))
    vectors = vectors.to_numpy()
    vectors = np.stack(vectors, axis=0)
    return pandas_df, vectors

def get_gt(query: np.ndarray, query_id: int, num_neighbours: int, real_ids: np.ndarray, vectors_pandas: np.ndarray):
    kdt = KDTree(vectors_pandas, leaf_size=30, metric="euclidean")
    gt_neighbours_idx = kdt.query(query, k=num_neighbours, return_distance=False)
    gt_real_ids = real_ids[gt_neighbours_idx.flatten()]
    gt_ids_without_query_id = np.delete(gt_real_ids, np.where(gt_real_ids == query_id))

    return gt_ids_without_query_id

pandas_df, vectors_pandas = convert_spark_df_to_pandas(results)
ID = 23197
query = results.where(results.id == ID).first()
query_pandas = np.array(query.asDict()["vector_space"].toArray()).reshape(1, -1)
query_id = query.asDict()['id']

num_neighbours = 6
gt_ids = get_gt(query_pandas, query_id, num_neighbours, pandas_df["id"].to_numpy(), vectors_pandas)
print(gt_ids)


[          18918165 967279668909991276           32300579
           31910915 811538092074095964]


In [None]:
from pyspark.sql.functions import udf, array, lit

def calculate_gt(df, num_neighbours):
    def get_gt_one_row(query_id, query_vector):
        """
        Exact neighbours for one row.
        NOTE: the function closes over pandas_df and vectors_pandas, which Spark
        serialises together with the UDF.
        TODO: Is there a way to pass pandas dataframe to UDF?
        """
        query_pandas = np.array(query_vector.toArray()).reshape(1, -1)
        gt_ids = get_gt(query_pandas, query_id, num_neighbours, pandas_df["id"].to_numpy(), vectors_pandas)
        return [int(n) for n in gt_ids]

    # Build the lookup arrays from df itself (not the global `results`), so the
    # vector size always matches the query vectors
    pandas_df, vectors_pandas = convert_spark_df_to_pandas(df)
    get_gt_one_row_udf = F.udf(get_gt_one_row, T.ArrayType(T.LongType()))

    df_with_gt = df.withColumn("ground_truth", get_gt_one_row_udf("id", "vector_space"))
    return df_with_gt

num_neighbours = 6
newgt = calculate_gt(df=results, num_neighbours=num_neighbours)
newgt.select("id", "name", "ground_truth").show(10, False)

+------+--------------------------------------------------------------------------+----------------------------------------------------------------------+
|id    |name                                                                      |ground_truth                                                          |
+------+--------------------------------------------------------------------------+----------------------------------------------------------------------+
|18674 |Rental unit in Barcelona · ★4.30 · 3 bedrooms · 6 beds · 2 baths          |[162091, 385049, 117010, 112538, 138055]                              |
|23197 |Rental unit in Sant Adria de Besos · ★4.77 · 3 bedrooms · 4 beds · 2 baths|[18918165, 967279668909991276, 32300579, 31910915, 811538092074095964]|
|32711 |Rental unit in Barcelona · ★4.46 · 2 bedrooms · 3 beds · 1.5 baths        |[1242470, 1199594, 384155, 135513, 1061343]                           |
|171646|Rental unit in Barcelona · ★4.81 · 2 bedrooms · 2 beds · 1 bat

In [None]:
newgt.where(results.id == ID).select("id", "name", "ground_truth").show(1, False)

+-----+--------------------------------------------------------------------------+----------------------------------------------------------------------+
|id   |name                                                                      |ground_truth                                                          |
+-----+--------------------------------------------------------------------------+----------------------------------------------------------------------+
|23197|Rental unit in Sant Adria de Besos · ★4.77 · 3 bedrooms · 4 beds · 2 baths|[18918165, 967279668909991276, 32300579, 31910915, 811538092074095964]|
+-----+--------------------------------------------------------------------------+----------------------------------------------------------------------+



In [None]:
IDs = list(gt_ids) #[967279668909991276, 18918165]
newgt.where(results.id.isin(IDs)).select("id", "name", "ground_truth").show(len(IDs), False)

+------------------+-----------------------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|id                |name                                                                                     |ground_truth                                                                 |
+------------------+-----------------------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|18918165          |Rental unit in Sant Adria de Besos · ★4.49 · 3 bedrooms · 6 beds · 2 baths               |[32300579, 811538092074095964, 899131976222097946, 30944649, 23197]          |
|31910915          |Bed and breakfast in Santa Coloma de Gramenet · ★4.94 · 1 bedroom · 1 bed · 1 shared bath|[967279668909991276, 631760606413807974, 23197, 50804944, 641020247716930473]|
|32300579          |Hostel in Barcelona · 1 bedroom · 1

In [None]:
real_ids = pandas_df["id"].to_numpy()
print(real_ids[-10:])

[971845785379403969 971878671238422092 971879970019192691
 972199746143686425 972213525609560702 972414398498867957
 972517151783308573 972525401820905576 972527438522389761
 972529324055119972]


## Grid search for LSH parameters

In [None]:
from sklearn.base import BaseEstimator
from sklearn.utils.estimator_checks import check_estimator


class LshEstimator(BaseEstimator):
    # BaseEstimator provides get_params/set_params, which GridSearchCV requires
    def __init__(self, num_neighbours=5, num_features=10, bucket_length=1.0, num_hash_tables=1):
        self.num_neighbours = num_neighbours
        self.num_features = num_features
        self.bucket_length = bucket_length
        self.num_hash_tables = num_hash_tables


    def fit(self, X, y=None):
        self.df = convert_title_to_features(df=X, num_features=self.num_features)
        self.model = run_lsh(df=self.df, input_column="vector_space", output_column="hashes", bucket_length=self.bucket_length, num_hash_tables=self.num_hash_tables)
        self.df = calculate_gt(df=self.df, num_neighbours=self.num_neighbours)
        # scikit-learn expects fit to return the estimator
        return self

    def predict(self, X, y=None):
        # TODO: will the code work if I pass several queries at a time?
        # X is a single Row with "id" and "vector_space" fields
        query = X
        # Use the fitted model and this estimator's own parameters, not globals
        recommendations = self.model.approxNearestNeighbors(self.df, query.vector_space, self.num_neighbours, distCol="distance")
        recommended_ids = np.array(recommendations.select("id").collect()).flatten()
        query_id = query.asDict()['id']
        recommended_ids = np.delete(recommended_ids, np.where(recommended_ids == query_id))

        return recommended_ids

    def score(self, X, y=None):
        # Count the recommended IDs that also appear in the query's ground truth
        recommended_ids = self.predict(X)
        ground_truth = self.df.where(F.col("id") == X.asDict()['id']).first()["ground_truth"]
        score = int(np.sum(np.isin(recommended_ids, ground_truth)))
        return score

In [None]:
from sklearn.metrics import make_scorer
from sklearn.model_selection import GridSearchCV
import numpy as np


param_grid = {
    # number of features for TF-IDF
    "num_features": [20, 30],#[5, 10, 20, 30, 50],
    # the length of each hash bucket, a larger bucket lowers the false negative rate.
    "bucket_length": [2.0],#[1.0, 2.0, 3.0],
    # number of hash tables, where increasing number of hash tables lowers the false negative rate, and decreasing it improves the running performance.
    "num_hash_tables": [1],#[1, 3, 7, 10],
    "num_neighbours": [5, 15, 50, 100]
}

lsh_estimator = LshEstimator()
grid_search = GridSearchCV(lsh_estimator, param_grid)
best_parameters = grid_search.fit(listings_df)
# score must be somewhere here????
# score = grid_search.score()

print('Best parameter:', best_parameters.best_params_)
print('Best score:', best_parameters.best_score_)


TypeError: ignored

In [None]:

from sklearn.utils.estimator_checks import check_estimator
# from sklearn.utils
check_estimator(LshEstimator())



## Grid search for number of features

Todo:
- Check other grid search options for PySpark; scikit-learn's `GridSearchCV` seems incompatible with Spark DataFrames.
  Options: pyspark.ml.tuning
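
Another option is a plain loop over the parameter combinations, which also makes it easy to record the tuning time for the report. The following is only a sketch: `grid_search` is a hypothetical helper, and `evaluate` is a placeholder with a made-up score. In the notebook it would build the features, fit the LSH model and compare the recommendations with the ground truth.

```python
import itertools
import time


def grid_search(param_grid, evaluate):
    """Try every parameter combination; return (best_params, best_score, seconds)."""
    names = sorted(param_grid)
    best_params, best_score = None, float("-inf")
    start = time.perf_counter()
    for values in itertools.product(*(param_grid[name] for name in names)):
        params = dict(zip(names, values))
        score = evaluate(**params)
        if score > best_score:
            best_params, best_score = params, score
    return best_params, best_score, time.perf_counter() - start


def evaluate(num_features, bucket_length, num_hash_tables):
    # Hypothetical placeholder score; the real version would fit LSH and
    # measure agreement with the ground truth for these parameters.
    return num_hash_tables / (1.0 + abs(num_features - 20)) - 0.01 * bucket_length


param_grid = {
    "num_features": [5, 10, 20, 30, 50],
    "bucket_length": [1.0, 2.0, 3.0],
    "num_hash_tables": [1, 3, 7, 10],
}
best_params, best_score, elapsed = grid_search(param_grid, evaluate)
print(best_params, round(best_score, 3), f"{elapsed:.4f}s")
```

Because each combination is evaluated on the full dataset, there is no cross-validation split to perform, which sidesteps the need to index a Spark DataFrame the way scikit-learn expects.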