In [1]:
# Install a Java 8 runtime, download and unpack the Spark 2.3.1 (Hadoop 2.7) build,
# and install findspark.
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
# Point the environment at the JDK installed above and at the unpacked Spark directory.
# NOTE(review): the "/content/..." path presumably assumes a Google Colab working directory — confirm.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

!ls

import findspark
# findspark.init() runs before `import pyspark` so the Spark install referenced by
# SPARK_HOME can be located (see the findspark documentation).
findspark.init()

import pyspark
from pyspark.sql import SparkSession
# Create (or reuse) the session used by every later cell; the bare `spark` displays it.
spark = SparkSession.builder.getOrCreate() 
spark

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [1 InRelease 0 B/88.7 kB                                                                                Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [1 InRelease 69.2 kB/88.70% [Connecting to archive.ubuntu.com (91.189.88.142)] [Connected to cloud.r-pro0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait                                                                               Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/

In [2]:
import os
import pandas as pd
import numpy as np
import tqdm
import re
import requests

from sklearn.neighbors import NearestNeighbors
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split

from pyspark.ml.feature import HashingTF, IDF, StopWordsRemover, RegexTokenizer
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml import Pipeline
from pyspark.sql.functions import regexp_extract, col
from pyspark.sql.functions import size, length
# Switch on the session's Arrow setting used for pandas <-> Spark conversions.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")


In [42]:
def read_listings(url):
    """Load the CSV located at ``url`` and hand it back as a pandas DataFrame."""
    listings = pd.read_csv(url)
    return listings


def _rgx_remove_punct(txt):
    """remove puctuations from string, convert to lowercase"""
    if not isinstance(txt, str):
      txt = str(txt)
    text = txt.split()
    text = [re.sub('\W', '', tx).lower() for tx in text]
    
    return ' '.join(text)


def find_ground_truth(df, col_name, n_neighbours=6, test_size=0.2, random_state=3):
    """Split ``df`` and attach exact TF-IDF nearest neighbours to the test part.

    The test split's ``col_name`` texts are vectorised with scikit-learn's
    ``TfidfVectorizer`` and an exact ``NearestNeighbors`` index is fitted on
    them. For every test row the ``id`` values of its ``n_neighbours`` closest
    test rows are stored in a new ``ground_truth`` column. The query rows are
    part of the index, so a row is typically its own first neighbour; the
    default of 6 therefore usually means "itself plus 5 others".

    Args:
        df: pandas DataFrame with an ``id`` column and the text column ``col_name``.
        col_name: Name of the text column to vectorise.
        n_neighbours: Number of neighbours stored per test row.
        test_size: Fraction of ``df`` placed in the test split.
        random_state: Seed passed to ``train_test_split`` (default 3, the
            value previously hard-coded).

    Returns:
        tuple: ``(train, test)`` where ``test`` is a copy of the test split
        with the extra ``ground_truth`` column (a list of ids per row).
    """
    train, test = train_test_split(df, test_size=test_size, random_state=random_state)
    # Work on an explicit copy: adding a column to the slice returned by
    # train_test_split triggers pandas' SettingWithCopyWarning.
    test = test.copy()

    tfidf_matrix = TfidfVectorizer().fit_transform(test[col_name])
    knn = NearestNeighbors().fit(tfidf_matrix)
    # One batched query for all rows instead of one kneighbors() call per row.
    _, neighbour_positions = knn.kneighbors(tfidf_matrix, n_neighbors=n_neighbours)

    # The returned positions index rows of `test`; translate them into ids.
    test_ids = test['id'].to_numpy()
    test['ground_truth'] = test_ids[neighbour_positions].tolist()

    return train, test


def create_pipeline(col_name, num_features=50):
    """Build the Spark ML text-vectorisation pipeline for one text column.

    Stages, in order:
      1. ``RegexTokenizer`` using a punctuation/whitespace pattern, reading
         ``col_name`` and writing ``<col_name>_tokenized``.
      2. ``StopWordsRemover`` writing ``<col_name>_filtered``.
      3. ``HashingTF`` hashing the filtered tokens into ``raw_features``.
      4. ``IDF`` turning ``raw_features`` into ``features``.

    Args:
        col_name: Name of the input text column.
        num_features: ``numFeatures`` passed to ``HashingTF`` (default 50,
            the value previously hard-coded).

    Returns:
        An unfitted ``pyspark.ml.Pipeline`` made of the four stages above.
    """
    tokenized_col = f'{col_name}_tokenized'
    filtered_col = f'{col_name}_filtered'

    # Raw string: '\p' and '\s' are invalid escape sequences in a normal literal.
    tokenizer = RegexTokenizer(inputCol=col_name,
                               outputCol=tokenized_col,
                               pattern=r'(?:\p{Punct}|\s)+')
    stopwordsremover = StopWordsRemover(inputCol=tokenized_col, outputCol=filtered_col)
    hashingTF = HashingTF(inputCol=filtered_col, outputCol='raw_features', numFeatures=num_features)
    idf = IDF(inputCol='raw_features', outputCol='features')

    return Pipeline(stages=[tokenizer, stopwordsremover, hashingTF, idf])


def find_neighbours(model_, data, value, number, colName):
    """Ask ``model_`` for the approximate nearest neighbours of ``value`` in ``data``.

    ``number`` rows are requested and ``colName`` is passed on as ``distCol``.
    The ``id`` values of the returned rows are given back as a list, with the
    first returned row left out.
    """
    matches = model_.approxNearestNeighbors(data, value, number, distCol=colName)
    neighbour_ids = matches.select("id").toPandas()['id']

    return neighbour_ids[1:].to_list()
    

def compare_lists(a, b, threshold=2):
    """Score predicted neighbour lists against ground-truth neighbour lists.

    Args:
        a: Sequence of predicted id lists, one per query.
        b: Sequence of ground-truth id collections, matched with ``a`` by
            position. It may be longer than ``a``; extra entries are ignored.
        threshold: A query counts as a hit when it shares strictly more than
            this many ids with its ground truth (default 2).

    Returns:
        tuple: ``(mean_overlap, hit_fraction, overlaps)`` where ``overlaps``
        is the per-query count of shared ids, ``mean_overlap`` is their mean
        and ``hit_fraction`` is the share of compared queries whose overlap
        exceeds ``threshold``. Both scores are ``nan`` when ``a`` is empty.

    Raises:
        IndexError: If ``b`` has fewer entries than ``a``.
    """
    if len(b) < len(a):
        raise IndexError('ground truth has fewer entries than predictions')

    overlaps = [len(set(predicted).intersection(truth)) for predicted, truth in zip(a, b)]
    if not overlaps:
        # Nothing to score; avoid numpy's empty-mean / zero-division warnings.
        return float('nan'), float('nan'), overlaps

    # Normalise by the number of queries actually compared, not by len(b),
    # which can be larger than the number of predictions.
    hit_fraction = np.sum(np.array(overlaps) > threshold) / len(overlaps)

    return np.mean(overlaps), hit_fraction, overlaps


def grid_search_lsh(train_data, test_data, grid_bucket_length, grid_num_hash_tables, targets, limit=100,
                    n_neighbours=6):
    """Grid-search BucketedRandomProjectionLSH settings against the ground truth.

    For every ``(bucketLength, numHashTables)`` combination an LSH model is
    fitted on ``train_data``. Each of the first ``limit`` feature vectors in
    ``targets`` is then looked up among the first ``limit`` rows of
    ``test_data`` with ``find_neighbours``, and the predictions are scored
    against ``targets['ground_truth']`` with ``compare_lists``.

    Args:
        train_data: Spark DataFrame with a ``features`` column, used to fit the LSH model.
        test_data: Spark DataFrame with ``id`` and ``features`` columns that is searched.
        grid_bucket_length: Iterable of ``bucketLength`` values to try.
        grid_num_hash_tables: Iterable of ``numHashTables`` values to try.
        targets: pandas DataFrame with ``features`` (query vectors) and
            ``ground_truth`` (expected neighbour ids) columns.
        limit: Maximum number of queries and of searched rows.
        n_neighbours: Number of neighbours requested per query (default 6,
            the value previously hard-coded).

    Returns:
        list: One ``[bucket_length, n_hash_table, score_a, score_b, num_neighb]``
        entry per grid point, using the three values returned by ``compare_lists``.
    """
    # Imported locally: `tqdm.tqdm_notebook` is deprecated in favour of `tqdm.notebook.tqdm`.
    from tqdm.notebook import tqdm as notebook_tqdm

    # These inputs do not depend on the grid point, so prepare them once.
    query_vectors = targets.features.to_list()[:limit]
    # Truncate the ground truth like the queries so both sides line up one-to-one.
    ground_truth = targets['ground_truth'].iloc[:limit].reset_index(drop=True)
    candidates = test_data.limit(limit)

    results = []
    for bucket_length in grid_bucket_length:
        for n_hash_table in grid_num_hash_tables:
            brp = BucketedRandomProjectionLSH(inputCol="features",
                                              outputCol="hashes",
                                              bucketLength=bucket_length,
                                              numHashTables=n_hash_table)
            # The former `model.transform(train_data)` call was dropped: its result was discarded.
            model = brp.fit(train_data)

            print(f'Calculating LSH for bucket_length = {bucket_length} and numHashTables = {n_hash_table}')
            # NOTE(review): 'hashes' is also the LSH outputCol above; confirm that
            # reusing it as the distance column name is intended.
            prediction = [
                find_neighbours(model, candidates, key, n_neighbours, 'hashes')
                for key in notebook_tqdm(query_vectors)
            ]

            score_a, score_b, num_neighb = compare_lists(prediction, ground_truth)
            print(f'Total score: {score_a}, {score_b}, {num_neighb[:20]}')
            results.append([bucket_length, n_hash_table, score_a, score_b, num_neighb])

    return results


# Barcelona Airbnb dataset

In [None]:
# Download the Barcelona listings, normalise the listing names, then split the
# data and attach the nearest-neighbour ground truth to the test part.
barcelona_listings_url = "http://data.insideairbnb.com/spain/catalonia/barcelona/2020-09-12/visualisations/listings.csv"
listings_df = read_listings(barcelona_listings_url)
listings_df['name'] = listings_df['name'].apply(_rgx_remove_punct)
train, test = find_ground_truth(listings_df, col_name='name')

In [57]:
# Preview the test split, which now carries the ground_truth column.
test.head()

Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365,ground_truth
2375,5068957,long stay double and broad room,26008219,Adriana,Eixample,el Fort Pienc,41.40113,2.18442,Private room,35,31,49,2020-02-26,1.96,2,125,"[5068957, 45049233, 637232, 21411905, 43057932..."
9593,26114578,double room balcony to gran via,194229020,Adriana De LA CASA DE ANTONIO Bcn,Eixample,Sant Antoni,41.38495,2.16266,Hotel room,74,1,169,2020-09-07,6.44,6,365,"[26114578, 27358010, 27423602, 40796396, 63723..."
16082,40387610,oasis charming room,294462605,Hotel Oasis,Ciutat Vella,"Sant Pere, Santa Caterina i la Ribera",41.38305,2.18449,Private room,72,1,0,,,7,358,"[40387610, 40385005, 40385963, 637232, 2141190..."
10640,29154683,suite new central top location iii,2819397,Leslie & Marta,Eixample,la Dreta de l'Eixample,41.39172,2.16387,Entire home/apt,224,28,2,2019-02-15,0.1,55,0,"[29154683, 44470413, 21411905, 637232, 5350866..."
5397,16309630,city center flat with view terrace,2565810,Alberto,Sants-Montjuïc,el Poble Sec,41.37333,2.17377,Entire home/apt,58,90,11,2019-03-15,0.28,3,0,"[16309630, 4238751, 70099, 44568286, 21707347,..."


In [44]:
# Build the text pipeline for the listing names and move both pandas splits into Spark.
pipeline = create_pipeline('name')
# The Arrow setting is switched on before the pandas frames are converted.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
train_data = spark.createDataFrame(train)
test_data = spark.createDataFrame(test)

# Fit the pipeline on each split separately.
# NOTE(review): pipeline_test_model is not used by any cell visible here — confirm it is still needed.
pipeline_train_model = pipeline.fit(train_data)
pipeline_test_model = pipeline.fit(test_data)

In [45]:
# Vectorise both splits with the pipeline that was fitted on the training split.
df_test_vect = pipeline_train_model.transform(test_data)
df_train_vect = pipeline_train_model.transform(train_data)

# The Arrow setting is switched off before collecting the vectorised test split to pandas.
# NOTE(review): presumably because the vector columns are not Arrow-convertible — confirm.
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
df_test_vect_pd = df_test_vect.toPandas()


In [61]:
# Show the listing name next to the ground truth and every intermediate pipeline column.
df_test_vect.select(['id', 'name', 'ground_truth', 'name_tokenized', 'name_filtered', 'raw_features', 'features']).show(truncate=False)

+--------+------------------------------------------------+------------------------------------------------------------+-------------------------------------------------------+-------------------------------------------------------+------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|id      |name                                            |ground_truth                                                |name_tokenized                                         |name_filtered                                          |raw_features                                          |features                                                                                                                                                 |
+--------+------------------------------------------------+---------------------------------------------------------

In [46]:
# Evaluate every bucketLength / numHashTables combination on the full Airbnb test split.
grid_bucket_length = [4, 8]
grid_num_hash_tables = [5, 10]
res = grid_search_lsh(
    train_data=df_train_vect,
    test_data=df_test_vect,
    grid_bucket_length=grid_bucket_length,
    grid_num_hash_tables=grid_num_hash_tables,
    targets=df_test_vect_pd,
    limit=df_test_vect.count(),
)

Calculating LSH for bucket_length = 4 and numHashTables = 5


Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`


HBox(children=(FloatProgress(value=0.0, max=4068.0), HTML(value='')))


Total score: 0.8687315634218289, 0.0867748279252704, [1, 0, 0, 2, 1, 2, 0, 0, 2, 0, 2, 0, 0, 2, 0, 0, 1, 2, 0, 0]
Calculating LSH for bucket_length = 4 and numHashTables = 10


HBox(children=(FloatProgress(value=0.0, max=4068.0), HTML(value='')))


Total score: 0.8704523107177975, 0.08652900688298919, [1, 0, 0, 2, 1, 2, 0, 0, 2, 0, 2, 0, 0, 2, 0, 0, 1, 2, 0, 0]
Calculating LSH for bucket_length = 8 and numHashTables = 5


HBox(children=(FloatProgress(value=0.0, max=4068.0), HTML(value='')))


Total score: 0.8687315634218289, 0.0867748279252704, [1, 0, 0, 2, 1, 2, 0, 0, 2, 0, 2, 0, 0, 2, 0, 0, 1, 2, 0, 0]
Calculating LSH for bucket_length = 8 and numHashTables = 10


HBox(children=(FloatProgress(value=0.0, max=4068.0), HTML(value='')))


Total score: 0.8704523107177975, 0.08652900688298919, [1, 0, 0, 2, 1, 2, 0, 0, 2, 0, 2, 0, 0, 2, 0, 0, 1, 2, 0, 0]


# Wikimedia dataset

In [47]:
# Most-viewed uk.wikisource articles for April 2019, from the Wikimedia pageviews REST API.
wiki_url = 'https://wikimedia.org/api/rest_v1/metrics/pageviews/top/uk.wikisource/all-access/2019/04/all-days'
# A timeout keeps the cell from hanging indefinitely on a stalled connection.
wiki_response = requests.get(wiki_url, timeout=30)
# Fail with a clear HTTP error instead of a KeyError on an error payload below.
wiki_response.raise_for_status()
wiki = pd.DataFrame(wiki_response.json()['items'][0]['articles'])
# find_ground_truth() reads an `id` column; use the row index for it.
wiki['id'] = wiki.index

In [48]:
# Preview the downloaded article table with the added id column.
wiki.head()

Unnamed: 0,article,views,rank,id
0,Головна_сторінка,21278,1,0
1,Вірую,14244,2,1
2,Мойсей_(Іван_Франко)/Пролог,2603,3,2
3,Закон_України_«Про_авторське_право_і_суміжні_п...,2576,4,3
4,Закон_України_«Про_Національну_поліцію»,1776,5,4


In [49]:
# Split the articles and attach nearest-neighbour ground truth computed on the article titles.
train_wiki, test_wiki = find_ground_truth(wiki, col_name='article')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


In [50]:
# Build the text pipeline for the article titles and move both pandas splits into Spark.
pipeline_wiki = create_pipeline('article')
# The Arrow setting is switched on before the pandas frames are converted.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
train_data_wiki = spark.createDataFrame(train_wiki)
test_data_wiki = spark.createDataFrame(test_wiki)

# Fit the pipeline on each split separately.
# NOTE(review): pipeline_test_model_wiki is not used by any cell visible here — confirm it is still needed.
pipeline_train_model_wiki = pipeline_wiki.fit(train_data_wiki)
pipeline_test_model_wiki = pipeline_wiki.fit(test_data_wiki)

In [51]:
# Vectorise both splits with the pipeline that was fitted on the wiki training split.
df_train_vect_wiki = pipeline_train_model_wiki.transform(train_data_wiki)
df_test_vect_wiki = pipeline_train_model_wiki.transform(test_data_wiki)

# The Arrow setting is switched off before collecting the vectorised test split to pandas.
# NOTE(review): presumably because the vector columns are not Arrow-convertible — confirm.
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
df_test_vect_pd_wiki = df_test_vect_wiki.toPandas()


In [62]:
# Show the article title next to the ground truth and every intermediate pipeline column.
df_test_vect_wiki.select(['id', 'article', 'ground_truth', 'article_tokenized', 'article_filtered', 'raw_features', 'features']).show(truncate=False)

+---+-------------------------------------------------------------------------------------+------------------------------+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |article                                                                              |ground_truth                  |article_tokenized                                                                             |article_filtered                                                                              |raw_features                                                             

In [55]:
# Evaluate every bucketLength / numHashTables combination on the wiki test split.
grid_bucket_length = [4, 8]
grid_num_hash_tables = [5, 10]
# The limit comes from the wiki test frame itself; it previously used the
# Airbnb frame's row count (df_test_vect).
res_wiki = grid_search_lsh(
    train_data=df_train_vect_wiki,
    test_data=df_test_vect_wiki,
    grid_bucket_length=grid_bucket_length,
    grid_num_hash_tables=grid_num_hash_tables,
    targets=df_test_vect_pd_wiki,
    limit=df_test_vect_wiki.count(),
)

Calculating LSH for bucket_length = 4 and numHashTables = 5


Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`


HBox(children=(FloatProgress(value=0.0, max=195.0), HTML(value='')))


Total score: 0.8615384615384616, 0.1076923076923077, [2, 1, 1, 0, 2, 2, 0, 0, 2, 0, 1, 0, 3, 0, 1, 0, 1, 0, 0, 3]
Calculating LSH for bucket_length = 4 and numHashTables = 10


HBox(children=(FloatProgress(value=0.0, max=195.0), HTML(value='')))


Total score: 0.8666666666666667, 0.1076923076923077, [2, 1, 1, 0, 2, 2, 0, 0, 2, 1, 1, 0, 3, 0, 1, 0, 1, 0, 0, 3]
Calculating LSH for bucket_length = 8 and numHashTables = 5


HBox(children=(FloatProgress(value=0.0, max=195.0), HTML(value='')))


Total score: 0.8615384615384616, 0.1076923076923077, [2, 1, 1, 0, 2, 2, 0, 0, 2, 0, 1, 0, 3, 0, 1, 0, 1, 0, 0, 3]
Calculating LSH for bucket_length = 8 and numHashTables = 10


HBox(children=(FloatProgress(value=0.0, max=195.0), HTML(value='')))


Total score: 0.8666666666666667, 0.1076923076923077, [2, 1, 1, 0, 2, 2, 0, 0, 2, 1, 1, 0, 3, 0, 1, 0, 1, 0, 0, 3]


**LSH model fitted on the Airbnb training data, evaluated on the Wikimedia test data:**

In [53]:
# Fit the LSH model on the Airbnb training vectors, then query the wiki test split.
grid_bucket_length = [4, 8]
grid_num_hash_tables = [5, 10]
# The limit comes from the wiki test frame that is actually searched; it
# previously used the Airbnb frame's row count (df_test_vect).
res_wiki2 = grid_search_lsh(
    train_data=df_train_vect,
    test_data=df_test_vect_wiki,
    grid_bucket_length=grid_bucket_length,
    grid_num_hash_tables=grid_num_hash_tables,
    targets=df_test_vect_pd_wiki,
    limit=df_test_vect_wiki.count(),
)

Calculating LSH for bucket_length = 4 and numHashTables = 5


Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`


HBox(children=(FloatProgress(value=0.0, max=195.0), HTML(value='')))


Total score: 0.8615384615384616, 0.1076923076923077, [2, 1, 1, 0, 2, 2, 0, 0, 2, 0, 1, 0, 3, 0, 1, 0, 1, 0, 0, 3]
Calculating LSH for bucket_length = 4 and numHashTables = 10


HBox(children=(FloatProgress(value=0.0, max=195.0), HTML(value='')))


Total score: 0.8666666666666667, 0.1076923076923077, [2, 1, 1, 0, 2, 2, 0, 0, 2, 1, 1, 0, 3, 0, 1, 0, 1, 0, 0, 3]
Calculating LSH for bucket_length = 8 and numHashTables = 5


HBox(children=(FloatProgress(value=0.0, max=195.0), HTML(value='')))


Total score: 0.8615384615384616, 0.1076923076923077, [2, 1, 1, 0, 2, 2, 0, 0, 2, 0, 1, 0, 3, 0, 1, 0, 1, 0, 0, 3]
Calculating LSH for bucket_length = 8 and numHashTables = 10


HBox(children=(FloatProgress(value=0.0, max=195.0), HTML(value='')))


Total score: 0.8666666666666667, 0.1076923076923077, [2, 1, 1, 0, 2, 2, 0, 0, 2, 1, 1, 0, 3, 0, 1, 0, 1, 0, 0, 3]
