
#  **Running Pyspark in Colab**

To run spark in Colab, we need to first install all the dependencies in Colab environment i.e. Apache Spark 2.3.2 with hadoop 2.7, Java 8 and Findspark to locate the spark in the system. The tools installation can be carried out inside the Jupyter Notebook of the Colab. One important note is that if you are new in Spark, it is better to avoid Spark 2.4.0 version since some people have already complained about its compatibility issue with python. 
Follow the steps to install the dependencies:

## gdgd

In [None]:
# !wget -q https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
# !tar xf spark-3.0.1-bin-hadoop2.7.tgz
# !pip install -q findspark

Now that you installed Spark and Java in Colab, it is time to set the environment path which enables you to run Pyspark in your Colab environment. Set the location of Java and Spark by running the following code:

In [21]:
import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "../spark-3.0.1-bin-hadoop2.7"

Run a local spark session to test your installation:

In [22]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
import pandas as pd
from sklearn.neighbors import NearestNeighbors
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
import requests

knn = NearestNeighbors()
td_idf_vectorizer = TfidfVectorizer()

## Downloaded data and prepare data

In [1]:
# ! wget http://data.insideairbnb.com/spain/catalonia/barcelona/2020-09-12/visualisations/listings.csv

--2020-11-03 20:42:45--  http://data.insideairbnb.com/spain/catalonia/barcelona/2020-09-12/visualisations/listings.csv
Resolving data.insideairbnb.com (data.insideairbnb.com)... 52.216.77.19
Connecting to data.insideairbnb.com (data.insideairbnb.com)|52.216.77.19|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3137507 (3,0M) [application/csv]
Saving to: ‘listings.csv’


2020-11-03 20:42:50 (836 KB/s) - ‘listings.csv’ saved [3137507/3137507]



Barcelon data

In [None]:
! mkdir train_test_data

In [None]:
listings_df_pd = pd.read_csv("listings.csv")
listings_df_pd_filtered = listings_df_pd[~listings_df_pd.name.isnull()].reset_index(drop=True)

train, test = train_test_split(listings_df_pd_filtered, test_size=0.2, random_state=42)
it_idf_names = td_idf_vectorizer.fit_transform(test.name)

knn.fit(X=it_idf_names)

five_closest = [list(knn.kneighbors(d,6)[1][0][1:] )for d in it_idf_names]
test['ground_truth'] = [test.iloc[d].id.to_list() for d in five_closest ]

test.to_parquet("train_test_data/test.pkt")
train.to_parquet("train_test_data/train.pkt")

WikiData

In [None]:
api_wiki = "https://wikimedia.org/api/rest_v1/metrics/pageviews/top/uk.wikisource/all-access/2019/04/all-days"
wiki = pd.DataFrame(requests.get(api_wiki).json()['items'][0]['articles'])

wiki['id'] = wiki.index
wiki_filtered = wiki[~wiki.Page.isnull()].reset_index(drop=True)

train, test = train_test_split(wiki_filtered, test_size=0.2, random_state=42)
it_idf_names = td_idf_vectorizer.fit_transform(test.Page)

knn.fit(X=it_idf_names)

five_closest = [list(knn.kneighbors(d,6)[1][0][1:] )for d in it_idf_names]
test['ground_truth'] = [test.iloc[d].id.to_list() for d in five_closest ]

test.to_parquet("train_test_data/wiki_test.pkt")
train.to_parquet("train_test_data/wiki_train.pkt")

In [None]:
listings_df_pd = pd.read_csv("listings.csv")
listings_df_pd_filtered = listings_df_pd[~listings_df_pd.name.isnull()].reset_index(drop=True)

train, test = train_test_split(listings_df_pd_filtered, test_size=0.2, random_state=42)
it_idf_names = td_idf_vectorizer.fit_transform(test.name)

knn.fit(X=it_idf_names)

five_closest = [list(knn.kneighbors(d,6)[1][0][1:] )for d in it_idf_names]
test['ground_truth'] = [test.iloc[d].id.to_list() for d in five_closest ]

test.to_parquet("train_test_data/test.pkt")
train.to_parquet("train_test_data/train.pkt")

##  Barcelona experiments

In [23]:
import pandas as pd
import tqdm
import numpy as np

In [69]:
# listings = spark.read.csv('listings.csv',inferSchema=True, header =True)

train = spark.read.parquet('train_test_data/train.pkt')
test = spark.read.parquet('train_test_data/test.pkt')

In [70]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover
from pyspark.ml.feature import BucketedRandomProjectionLSH


tokenizer = Tokenizer(inputCol="name", outputCol="CleanTokens")
stopwordsremover = StopWordsRemover(inputCol="CleanTokens", outputCol="CleanTokensStopRemoved")
hashingTF = HashingTF(inputCol="CleanTokensStopRemoved", outputCol="VectorSpace", numFeatures=50)
idf = IDF(inputCol="VectorSpace", outputCol="VectorSpaceIDF")



In [71]:
pipeline = Pipeline(stages=[tokenizer, stopwordsremover, hashingTF, idf])

pipelineModel = pipeline.fit(train)
pipelineModelTest = pipeline.fit(test)

test_prepared = pipelineModel.transform(test)
df_pipe = pipelineModel.transform(train)

Learning LSH

In [74]:
test_pd = test_prepared.toPandas()
looking_for = test_pd.VectorSpaceIDF.to_list()

In [85]:


def find_neighbours(model_, data, value, number, colName):
    result = model_.approxNearestNeighbors(data, value, number, distCol=colName)
    
    return result.select("id").toPandas()['id'][1:].to_list()

def compare_lists(a,b):
    results = []
    for i, _ in enumerate(a):
        intersection = set(a[i]).intersection(b[i])
        results.append(len(intersection))
        
#     return np.mean(results), results
    return np.mean(results), np.sum(np.array(results) > 1)/len(b), np.sum(np.array(results) == 5)/len(b), results


def grid_search_lsh(train_data, test_data, grid_bucket_length, grid_num_hash_tables, targets, limit=300):
    results = []  
    for bucket_length in grid_bucket_length:
        for n_hash_table in grid_num_hash_tables: 
            brp = BucketedRandomProjectionLSH(inputCol="VectorSpaceIDF", 
                                              outputCol="hashes", 
                                              bucketLength=bucket_length,
                                              numHashTables=n_hash_table)
            # fit train
            model = brp.fit(train_data)
            print(f'Models params: \nbucket: {model.getBucketLength()}, n_ht: {model.getNumHashTables()}')
            
#             df_pipe = model.transform(train_data)
            looking_for = targets.VectorSpaceIDF.to_list()
            print(f'Calculating LSH for bucket_length={bucket_length} and numHashTables = {n_hash_table}')
            test = test_data.limit(limit)
            targ = looking_for[:limit]
            prediction = []
            
            
            for key in tqdm.tqdm_notebook(targ):
                neigh = find_neighbours(model, test, key, 6, 'hashes')
                prediction.append(neigh)
                
            score_a, score_b, score_c, num_neighb = compare_lists(prediction, targets['ground_truth'])
            print(f'Total score: {score_a}, {score_b}, {score_c}\n {num_neighb[:20]} \n{prediction[:20]}' )
            results.append([bucket_length, n_hash_table, score_a, score_b, score_c, model])
    return results


In [None]:
grid_bucket_length = [2, 10]
grid_num_hash_tables = [1, 10]

In [None]:
res = grid_search_lsh(df_pipe, test_prepared, grid_bucket_length,
                      grid_num_hash_tables, test_pd, limit=test_prepared.count())

Models params: 
bucket: 2.0, n_ht: 1
Calculating LSH for bucket_length=2 and numHashTables = 1


Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for key in tqdm.tqdm_notebook(targ):


HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=4065.0), HTML(value='')))


Total score: 0.5685116851168511, 0.13554735547355473, 0.0002460024600246002
 [0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1] 
[[39031875, 25927581, 24170185, 5197752, 42855696], [40060052, 41974762, 37520718, 8806793, 7173195], [39660659, 42355295, 4389803, 4691491, 23649134], [18540745, 37713410, 37554688, 34676896, 23477641], [26345440, 1391124, 39660659, 1070322, 41275665], [35598015, 6666031, 39623073, 6436766, 7301264], [34358699, 33727149, 45282768, 11685001, 21653709], [32213212, 8020706, 28803790, 2091401, 38035345], [15713824, 793232, 19320230, 44462767, 42020758], [45074881, 31818441, 29986591, 2288221, 27191542], [37713410, 37554688, 5407955, 17578576, 532775], [5695033, 29644147, 44160082, 6834341, 41545530], [22463569, 34117861, 23804717, 814202, 23511046], [12414795, 27797794, 12694296, 38694539, 15863971], [10208538, 25335202, 16891365, 39470732, 36896927], [29819343, 20258456, 38081807, 41394469, 43649088], [20437264, 18918174, 4957651, 34108603, 3203100],

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=4065.0), HTML(value='')))


Total score: 0.6346863468634686, 0.15670356703567034, 0.0004920049200492004
 [0, 0, 1, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1] 
[[4228483, 39031875, 25927581, 24170185, 5197752], [40060052, 41974762, 37520718, 8806793, 7173195], [1810923, 39753694, 39744593, 42600422, 39660659], [18540745, 37554688, 37713410, 34676896, 23477641], [26345440, 1391124, 39660659, 1070322, 41275665], [35598015, 6666031, 39623073, 43420312, 6436766], [34358699, 33727149, 45282768, 11685001, 21653709], [32213212, 6058351, 20597809, 8020706, 28803790], [15713824, 793232, 45282768, 24977031, 16385602], [43764033, 9626098, 45074881, 31818441, 36253888], [37713410, 37554688, 5407955, 17578576, 532775], [24662715, 5695033, 29644147, 44160082, 619728], [22463569, 40415475, 34117861, 23804717, 814202], [12414795, 39523332, 2117095, 20326192, 2476289], [10208538, 25335202, 16891365, 39470732, 36896927], [29819343, 20258456, 32546591, 2436237, 38081807], [20437264, 18918174, 4957651, 21722132, 34108603], [

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=4065.0), HTML(value='')))


Total score: 0.5724477244772448, 0.13628536285362855, 0.0002460024600246002
 [0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1] 
[[39031875, 25927581, 24170185, 5197752, 42855696], [40060052, 41974762, 37520718, 8806793, 7173195], [39660659, 42355295, 4389803, 4691491, 23649134], [18540745, 37713410, 37554688, 34676896, 23477641], [26345440, 1391124, 39660659, 1070322, 41275665], [35598015, 6666031, 39623073, 6436766, 7301264], [34358699, 33727149, 45282768, 11685001, 21653709], [32213212, 8020706, 28803790, 2091401, 38035345], [15713824, 793232, 19320230, 44462767, 42020758], [45074881, 31818441, 29986591, 2288221, 27191542], [37713410, 37554688, 5407955, 17578576, 532775], [5695033, 29644147, 44160082, 6834341, 41545530], [22463569, 34117861, 23804717, 814202, 23511046], [12414795, 27797794, 12694296, 38694539, 15863971], [10208538, 25335202, 16891365, 39470732, 36896927], [29819343, 20258456, 38081807, 41394469, 43649088], [20437264, 18918174, 4957651, 34108603, 3203100],

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=4065.0), HTML(value='')))

## Wikidata experiments

In [None]:
wiki_train = spark.read.parquet('train_test_data/wiki_train.pkt')
wiki_test = spark.read.parquet('train_test_data/wiki_test.pkt')

In [None]:
tokenizerWiki = Tokenizer(inputCol="Page", outputCol="CleanTokens")


In [None]:
pipeline = Pipeline(stages=[tokenizerWiki, stopwordsremover, hashingTF, idf])

pipelineWikiTrain = pipeline.fit(train)
pipelineWikiTest = pipeline.fit(test)

wiki_train_pipe = pipelineWikiTrain.transform(train)
test_prepared_wiki = pipelineWikiTest.transform(test)


In [None]:
test_pd_wiki = test_prepared_wiki.toPandas()
looking_for_wiki = test_pd_wiki.VectorSpaceIDF.to_list()

In [None]:
grid_bucket_length = [2, 10]
grid_num_hash_tables = [1, 10]

Trained on Barcelona data

In [None]:
res_barcelona_wiki = grid_search_lsh(df_pipe, test_prepared_wiki, grid_bucket_length,
                      grid_num_hash_tables, test_pd_wiki, limit=test_prepared_wiki.count())

Trained on Wiki

In [None]:
res_wiki = grid_search_lsh(wiki_train_pipe, test_prepared_wiki, grid_bucket_length,
                      grid_num_hash_tables, test_pd_wiki, limit=test_prepared_wiki.count())