# Initialization

## Checking the Environment

In [1]:
!java --version
!python --version

openjdk 11.0.18 2023-01-17
OpenJDK Runtime Environment (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1)
OpenJDK 64-Bit Server VM (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1, mixed mode, sharing)
Python 3.9.16


## Installing Apache Spark (PySpark)

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=4878912ee1680ed27ad94cec334ad7035aaa06345d0226acb871f80348ad60ec
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

## Initialize Apache Spark context

In [3]:
# Import Apache Spark SQL
from pyspark.sql import SparkSession

# Create the Spark session
# "local[*]" runs Spark on the local machine using all available CPU cores
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Hello Pyspark") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [4]:
# Check spark session
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f8108054400>


# Data Mining Task

A locality-sensitive hashing (LSH) task consists of three steps:


1. Convert the original data into vectors.
2. Compute the hashes using the MinHash algorithm.
3. Search for similar pairs using a k-nearest-neighbor search or a similarity join.
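The three steps can be sketched in plain Python, without Spark, to show what MinHash computes. This is a minimal illustration and not how Spark implements `MinHashLSH`: the helper names, the hash family `(a * x + b) % PRIME`, the use of `zlib.crc32` as a word ID, and the two sample sentences are all assumptions made for the example.

```python
import random
import zlib

PRIME = 2_147_483_647  # 2**31 - 1, used here as the hash modulus (an assumption)

def to_word_set(text):
    """Step 1: represent a document as the set of its lowercase words."""
    return set(text.lower().split())

def make_hash_params(num_hashes, seed=42):
    """Draw (a, b) coefficients for hash functions h(x) = (a * x + b) % PRIME."""
    rng = random.Random(seed)
    return [(rng.randrange(1, PRIME), rng.randrange(0, PRIME)) for _ in range(num_hashes)]

def minhash_signature(words, hash_params):
    """Step 2: for each hash function, keep the smallest hash over all words."""
    # zlib.crc32 gives a stable integer per word (the built-in hash() is salted per process)
    ids = [zlib.crc32(w.encode("utf-8")) for w in words]
    return [min((a * x + b) % PRIME for x in ids) for a, b in hash_params]

def estimated_similarity(sig_a, sig_b):
    """Step 3: the fraction of matching positions estimates the Jaccard similarity."""
    matches = sum(1 for x, y in zip(sig_a, sig_b) if x == y)
    return matches / len(sig_a)

def exact_jaccard(a, b):
    """Exact Jaccard similarity, for comparison with the estimate."""
    return len(a & b) / len(a | b)

params = make_hash_params(num_hashes=128)
doc_a = to_word_set("the american civil war began in 1861")
doc_b = to_word_set("the american civil war ended in 1865")
sig_a = minhash_signature(doc_a, params)
sig_b = minhash_signature(doc_b, params)

print("exact:", exact_jaccard(doc_a, doc_b))
print("estimated:", estimated_similarity(sig_a, sig_b))
```

The two sample documents share 5 of their 9 distinct words, so the exact similarity is 5/9. The estimate should land near that value and generally tightens as `num_hashes` grows.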



## Downloading the dataset

In [5]:
!pip install kaggle

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [7]:
!mkdir -p ~/.kaggle/

# PLEASE USE YOUR OWN KEY
# Download your own key according to these instructions: https://github.com/Kaggle/kaggle-api#api-credentials

import json
import os

# Placeholders: replace with the values from your own kaggle.json
api_token = {"username": "YOUR_KAGGLE_USERNAME", "key": "YOUR_KAGGLE_KEY"}

with open(os.path.expanduser("~/.kaggle/kaggle.json"), "w") as file:
    json.dump(api_token, file)

!chmod 600 ~/.kaggle/kaggle.json


In [8]:
# Download from https://www.kaggle.com/datasets/urbanbricks/wikipedia-promotional-articles

!kaggle datasets download -d urbanbricks/wikipedia-promotional-articles

Downloading wikipedia-promotional-articles.zip to /content
 94% 189M/201M [00:02<00:00, 94.8MB/s]
100% 201M/201M [00:02<00:00, 89.4MB/s]


In [9]:
!unzip wikipedia-promotional-articles.zip

Archive:  wikipedia-promotional-articles.zip
  inflating: good.csv                
  inflating: promotional.csv         


In [10]:
!ls -la

total 783144
drwxr-xr-x 1 root root      4096 Apr  7 11:54 .
drwxr-xr-x 1 root root      4096 Apr  7 11:47 ..
drwxr-xr-x 4 root root      4096 Apr  5 13:29 .config
-rw-r--r-- 1 root root 475685227 Oct 27  2019 good.csv
-rw-r--r-- 1 root root 115360355 Oct 27  2019 promotional.csv
drwxr-xr-x 1 root root      4096 Apr  5 13:30 sample_data
-rw-r--r-- 1 root root 210863294 Apr  7 11:53 wikipedia-promotional-articles.zip


## Read the dataset

In [11]:
# Read CSV
df = spark.read.option("header", True).csv("/content/good.csv")
df.printSchema()

root
 |-- text: string (nullable = true)
 |-- url: string (nullable = true)



In [12]:
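# Add an ID column to the dataset
# (the IDs are unique and increasing, but not guaranteed to be consecutive across partitions)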
from pyspark.sql.functions import monotonically_increasing_id

newsDF = df.withColumn("id", monotonically_increasing_id())
newsDF.show()

+--------------------+--------------------+---+
|                text|                 url| id|
+--------------------+--------------------+---+
|Nycticebus linglo...|https://en.wikipe...|  0|
|Oryzomys pliocaen...|https://en.wikipe...|  1|
|.hack dt hk is a ...|https://en.wikipe...|  2|
|The You Drive Me ...|https://en.wikipe...|  3|
|0 8 4 is the seco...|https://en.wikipe...|  4|
|I Corps is a corp...|https://en.wikipe...|  5|
|The 1982 Florida ...|https://en.wikipe...|  6|
|Tropical Depressi...|https://en.wikipe...|  7|
|Tropical Depressi...|https://en.wikipe...|  8|
|Tropical Depressi...|https://en.wikipe...|  9|
|On 1 November 194...|https://en.wikipe...| 10|
|1 1 is a song rec...|https://en.wikipe...| 11|
|1 Thing is a song...|https://en.wikipe...| 12|
|One Times Square,...|https://en.wikipe...| 13|
|1 vs. 100 is an A...|https://en.wikipe...| 14|
|One World Trade C...|https://en.wikipe...| 15|
|The A1 in London ...|https://en.wikipe...| 16|
|Tropical Depressi...|https://en.wikipe.

In [13]:
# Get the total rows
newsDF.count()

30279

## 1. Prepare the tokenizer

We split each article's text into lowercase word tokens, then convert the token lists into term-count vectors.
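As a rough picture of the tokenization step, the plain-Python sketch below lowercases a string and splits it on whitespace. It only approximates Spark's `Tokenizer` (for example, runs of consecutive spaces may be handled differently), and `simple_tokenize` is a hypothetical helper, not part of PySpark.

```python
def simple_tokenize(text):
    """Lowercase the text and split it on whitespace."""
    return text.lower().split()

# Illustrative input, taken from the start of one of the article texts
tokens = simple_tokenize("I Corps is a")
print(tokens)  # ['i', 'corps', 'is', 'a']
```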

In [14]:
# Prepare the tokenizer
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
wordsDF = tokenizer.transform(newsDF)

wordsDF.show()

+--------------------+--------------------+---+--------------------+
|                text|                 url| id|               words|
+--------------------+--------------------+---+--------------------+
|Nycticebus linglo...|https://en.wikipe...|  0|[nycticebus, ling...|
|Oryzomys pliocaen...|https://en.wikipe...|  1|[oryzomys, plioca...|
|.hack dt hk is a ...|https://en.wikipe...|  2|[.hack, dt, hk, i...|
|The You Drive Me ...|https://en.wikipe...|  3|[the, you, drive,...|
|0 8 4 is the seco...|https://en.wikipe...|  4|[0, 8, 4, is, the...|
|I Corps is a corp...|https://en.wikipe...|  5|[i, corps, is, a,...|
|The 1982 Florida ...|https://en.wikipe...|  6|[the, 1982, flori...|
|Tropical Depressi...|https://en.wikipe...|  7|[tropical, depres...|
|Tropical Depressi...|https://en.wikipe...|  8|[tropical, depres...|
|Tropical Depressi...|https://en.wikipe...|  9|[tropical, depres...|
|On 1 November 194...|https://en.wikipe...| 10|[on, 1, november,...|
|1 1 is a song rec...|https://en.w

In [15]:
# Vectorize the dataset
from pyspark.ml.feature import CountVectorizer

vocabSize=1000

# Train the CountVectorizer Model using our data
cvModel = CountVectorizer(inputCol="words", outputCol="features", vocabSize=vocabSize, minDF=10).fit(wordsDF)

# Transform our data into vector
vectorizedDF = cvModel.transform(wordsDF)
vectorizedDF.show()

+--------------------+--------------------+---+--------------------+--------------------+
|                text|                 url| id|               words|            features|
+--------------------+--------------------+---+--------------------+--------------------+
|Nycticebus linglo...|https://en.wikipe...|  0|[nycticebus, ling...|(1000,[0,1,2,3,4,...|
|Oryzomys pliocaen...|https://en.wikipe...|  1|[oryzomys, plioca...|(1000,[0,1,2,3,4,...|
|.hack dt hk is a ...|https://en.wikipe...|  2|[.hack, dt, hk, i...|(1000,[0,1,2,3,4,...|
|The You Drive Me ...|https://en.wikipe...|  3|[the, you, drive,...|(1000,[0,1,2,3,4,...|
|0 8 4 is the seco...|https://en.wikipe...|  4|[0, 8, 4, is, the...|(1000,[0,1,2,3,4,...|
|I Corps is a corp...|https://en.wikipe...|  5|[i, corps, is, a,...|(1000,[0,1,2,3,4,...|
|The 1982 Florida ...|https://en.wikipe...|  6|[the, 1982, flori...|(1000,[0,1,2,3,4,...|
|Tropical Depressi...|https://en.wikipe...|  7|[tropical, depres...|(1000,[0,1,2,3,4,...|
|Tropical 

## 2. Fit/train an LSH Model

In [16]:
from pyspark.ml.feature import MinHashLSH

mh = MinHashLSH(inputCol="features", outputCol="hashValues", numHashTables=3)
LSHmodel = mh.fit(vectorizedDF)

LSHmodel.transform(vectorizedDF).show()

+--------------------+--------------------+---+--------------------+--------------------+--------------------+
|                text|                 url| id|               words|            features|          hashValues|
+--------------------+--------------------+---+--------------------+--------------------+--------------------+
|Nycticebus linglo...|https://en.wikipe...|  0|[nycticebus, ling...|(1000,[0,1,2,3,4,...|[[1081553.0], [36...|
|Oryzomys pliocaen...|https://en.wikipe...|  1|[oryzomys, plioca...|(1000,[0,1,2,3,4,...|[[5.4265679E7], [...|
|.hack dt hk is a ...|https://en.wikipe...|  2|[.hack, dt, hk, i...|(1000,[0,1,2,3,4,...|[[1081553.0], [36...|
|The You Drive Me ...|https://en.wikipe...|  3|[the, you, drive,...|(1000,[0,1,2,3,4,...|[[1081553.0], [36...|
|0 8 4 is the seco...|https://en.wikipe...|  4|[0, 8, 4, is, the...|(1000,[0,1,2,3,4,...|[[1081553.0], [1....|
|I Corps is a corp...|https://en.wikipe...|  5|[i, corps, is, a,...|(1000,[0,1,2,3,4,...|[[1081553.0], [36...|
|

## 3. Searching for similar items for a given key

In [17]:
# Look up a word's position in the vocabulary, e.g. "united" and "states"
print(cvModel.vocabulary.index("united"))
print(cvModel.vocabulary.index("states"))

92
165


In [18]:
# Build a search key from the words "civil" and "war"

from pyspark.ml.linalg import Vectors


# Convert the two input words into a sparse vector of size vocabSize (1000)
# Each word's vocabulary index gets the value 1.0; every other position is 0.0
# Note: vocabulary.index() raises ValueError if a word is not in the vocabulary
# Final result: key = [0, 0, ... , 1.0, ..., 1.0, ....]

key = Vectors.sparse(vocabSize, {cvModel.vocabulary.index("civil"): 1.0, cvModel.vocabulary.index("war"): 1.0})
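Because looking up a word with `list.index` raises `ValueError` when the word is missing, a query containing an out-of-vocabulary word would fail. The sketch below shows one possible way to build the index-to-value mapping while skipping unknown words. `build_key_entries` and `toy_vocabulary` are hypothetical names used only for this example; in the notebook the vocabulary would be `cvModel.vocabulary`.

```python
def build_key_entries(words, vocabulary):
    """Map each known word to {vocabulary index: 1.0}; collect unknown words."""
    index_of = {term: i for i, term in enumerate(vocabulary)}
    entries = {}
    missing = []
    for word in words:
        term = word.lower()
        if term in index_of:
            entries[index_of[term]] = 1.0
        else:
            missing.append(word)
    return entries, missing

# Toy vocabulary standing in for cvModel.vocabulary (an assumption for this example)
toy_vocabulary = ["the", "of", "war", "civil"]
entries, missing = build_key_entries(["civil", "war", "zeppelin"], toy_vocabulary)
print(entries)  # {3: 1.0, 2: 1.0}
print(missing)  # ['zeppelin']
```

The resulting dictionary could then be passed as `Vectors.sparse(vocabSize, entries)`. It is worth checking that `entries` is not empty first, since MinHash needs at least one non-zero entry in the vector.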

In [23]:
# Define the number of neighbours
k = 40

# Search inside LSH model that we already trained
resultDF = LSHmodel.approxNearestNeighbors(vectorizedDF, key, k)
resultDF.show()

+----+---+---+-----+--------+----------+-------+
|text|url| id|words|features|hashValues|distCol|
+----+---+---+-----+--------+----------+-------+
+----+---+---+-----+--------+----------+-------+
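For `MinHashLSH`, the `distCol` column holds the Jaccard distance between the key and each returned row, computed over the sets of non-zero vector indices. The plain-Python sketch below shows that calculation. The function name and the index sets are illustrative assumptions, not values taken from the dataset.

```python
def jaccard_distance(a, b):
    """Jaccard distance: 1 - |A ∩ B| / |A ∪ B| over two sets of indices."""
    a, b = set(a), set(b)
    union = a | b
    if not union:
        raise ValueError("at least one set must be non-empty")
    return 1.0 - len(a & b) / len(union)

# Hypothetical non-zero indices of a two-word key and of a short document
key_indices = {92, 165}
doc_indices = {0, 1, 92, 165}
print(jaccard_distance(key_indices, doc_indices))  # 0.5
```

A two-word key compared with an article containing hundreds of distinct terms has a distance close to 1 even when both words occur in the article, because the union of the two sets is dominated by the article's terms.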



In [25]:
# Save the result to a CSV file (toPandas() requires pandas to be installed)
data = resultDF.toPandas()
data.to_csv("result.csv")