<a href="https://colab.research.google.com/github/jiaxinli980115/ETL/blob/main/MinHashSparkDemo_(1).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Example use of MinHash for deduplication

### Load apple dataset from json

In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 66kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 20.5MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=10b8a8f12d6276cd9e25dbe91def4ecbf5db9b6b740cb389faf52f71bba0ee22
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [2]:
import json

boeing_data = []
with open('webhose_boeing.json', 'r') as f:
    for line in f.readlines():
        boeing_data.append(json.loads(line))

boeing_titles = [a['title'] for a in boeing_data]

In [3]:
%env PYSPARK_PYTHON=python3

env: PYSPARK_PYTHON=python3


### Set comes with duplicates:

In [4]:
len(boeing_titles), len(set(boeing_titles))

(7484, 4866)

### This is a toy example, and exact duplicates can be found with python's set object. But what if it was bigger? 

### We can use spark's MinHash over character n-grams
#### We create a dataframe containing the title as a string in one column and a list of characters in another

In [5]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession(sc)
df = spark.createDataFrame([
    (k, t, list(t)) for k, t in enumerate(boeing_titles) if len(list(t)) >=3],
    ['id', 'title', 'title_characters'])
df.select('title').show()

+--------------------+
|               title|
+--------------------+
|FAA chief called ...|
|FAA chief called ...|
|Cathay Pacific fi...|
|B-2 to test GBU-5...|
|Boeing rejected 7...|
|Boeing files arbi...|
|Meet The Oldest B...|
|Five Reasons The ...|
|FAA chief called ...|
|Why Is Flying In ...|
|FAA chief called ...|
|FAA chief called ...|
|FAA chief called ...|
|FAA chief called ...|
|FAA chief called ...|
|FAA chief called ...|
|FAA chief called ...|
|FAA chief called ...|
|FAA chief called ...|
|Today in History,...|
+--------------------+
only showing top 20 rows



#### Now we use spark to select character n-grams (shingles)

In [6]:
from pyspark.ml.feature import NGram

ngram = NGram(n=3, inputCol='title_characters', outputCol='ngrams')
ngram_df = ngram.transform(df)
ngram_df.select('ngrams').show()

+--------------------+
|              ngrams|
+--------------------+
|[F A A, A A  , A ...|
|[F A A, A A  , A ...|
|[C a t, a t h, t ...|
|[B - 2, - 2  , 2 ...|
|[B o e, o e i, e ...|
|[B o e, o e i, e ...|
|[M e e, e e t, e ...|
|[F i v, i v e, v ...|
|[F A A, A A  , A ...|
|[W h y, h y  , y ...|
|[F A A, A A  , A ...|
|[F A A, A A  , A ...|
|[F A A, A A  , A ...|
|[F A A, A A  , A ...|
|[F A A, A A  , A ...|
|[F A A, A A  , A ...|
|[F A A, A A  , A ...|
|[F A A, A A  , A ...|
|[F A A, A A  , A ...|
|[T o d, o d a, d ...|
+--------------------+
only showing top 20 rows



#### And transform those in-grams into binary vectors we can use in MinHash

In [7]:
from pyspark.ml.feature import CountVectorizer 

count_vectorizer = CountVectorizer(inputCol='ngrams', outputCol='vector', binary=True)
model = count_vectorizer.fit(ngram_df)
cv_df = model.transform(ngram_df)

# the vectors are displayed in 'sparse' format, 
# i.e. the numbers shown are the indices i of vector x where x[i]=1, 
# and x[k]=0 for all other k
cv_df.select('vector').show()

+--------------------+
|              vector|
+--------------------+
|(14930,[0,1,3,4,5...|
|(14930,[0,1,3,4,5...|
|(14930,[1,2,24,29...|
|(14930,[8,9,15,21...|
|(14930,[0,1,2,3,4...|
|(14930,[0,1,2,3,4...|
|(14930,[0,1,3,4,5...|
|(14930,[12,14,15,...|
|(14930,[0,1,3,4,5...|
|(14930,[0,1,38,54...|
|(14930,[0,1,3,4,5...|
|(14930,[0,1,3,4,5...|
|(14930,[0,1,3,4,5...|
|(14930,[0,1,3,4,5...|
|(14930,[0,1,3,4,5...|
|(14930,[0,1,3,4,5...|
|(14930,[0,1,3,4,5...|
|(14930,[0,1,3,4,5...|
|(14930,[0,1,3,4,5...|
|(14930,[6,10,73,2...|
+--------------------+
only showing top 20 rows



#### Now we can apply MinHash

In [8]:
from pyspark.ml.feature import MinHashLSH

min_hash = MinHashLSH(inputCol='vector', outputCol='minHash', seed=0, numHashTables=10)
model = min_hash.fit(cv_df)
hash_df = model.transform(cv_df)

# We now have the min hash values for the dataset of article titles
hash_df.select('minHash').show(1)

+--------------------+
|             minHash|
+--------------------+
|[[6333835.0], [23...|
+--------------------+
only showing top 1 row



#### We can now use these values to search for duplicates for a given title: everything with Jaccard similarity above a given threshold is probably similar

In [9]:
joined_rows = model.approxSimilarityJoin(cv_df, cv_df, threshold=0.05, distCol='jaccard_distance')
# the returned dataframe will be pairs of rows where each pair has an estimated distance of at most the threshold
joined_rows.show()

+--------------------+--------------------+----------------+
|            datasetA|            datasetB|jaccard_distance|
+--------------------+--------------------+----------------+
|{5974, REPORT: Ma...|{5974, REPORT: Ma...|             0.0|
|{6859, Virgin Atl...|{6866, Virgin Atl...|             0.0|
|{6866, Virgin Atl...|{6886, Virgin Atl...|             0.0|
|{6868, Virgin Atl...|{6875, Virgin Atl...|             0.0|
|{6872, Virgin Atl...|{6895, Virgin Atl...|             0.0|
|{6876, Virgin Atl...|{6864, Virgin Atl...|             0.0|
|{6883, Virgin Atl...|{6887, Virgin Atl...|             0.0|
|{6889, Virgin Atl...|{6882, Virgin Atl...|             0.0|
|{6895, Virgin Atl...|{6880, Virgin Atl...|             0.0|
|{1043, Boeing Cut...|{1295, Boeing Cut...|             0.0|
|{1266, Boeing Cut...|{1284, Boeing Cut...|             0.0|
|{1278, Boeing Cut...|{1294, Boeing Cut...|             0.0|
|{1279, Boeing Cut...|{1273, Boeing Cut...|             0.0|
|{1280, Boeing Cut...|{1

#### We can now see all duplicates of a title

In [10]:
from pyspark.sql.functions import col
joined_rows.filter(joined_rows.datasetA.id == 2467).select(col('datasetA.title'), col('datasetB.title')).show()

+--------------------+--------------------+
|               title|               title|
+--------------------+--------------------+
|Boeing prepares t...|Boeing prepares t...|
+--------------------+--------------------+



#### And we can deduplicate the whole dataset if we want to

In [11]:
# this will return the first id for each title in the dataset
deduplicated_df = joined_rows.groupby(col('datasetA.title')).min('datasetA.id')
deduplicated_df.show(5)

+--------------------+------------------------+
|      datasetA.title|min(datasetA.id AS `id`)|
+--------------------+------------------------+
|Warren Buffett is...|                    5137|
|FAA outlines refo...|                    3415|
|Lawmakers Push fo...|                    3235|
|NASA Chief “All I...|                    1711|
|Boeing expects to...|                    6374|
+--------------------+------------------------+
only showing top 5 rows



In [12]:
deduplicated_titles = deduplicated_df.toPandas()['datasetA.title']

# we can check that the resulting list is the right length
# or almost the same length, it's not exact
len(deduplicated_titles), len(boeing_titles), len(set(boeing_titles))

(4866, 7484, 4866)

In [13]:
deduplicated_df = deduplicated_df.toPandas()
deduplicated_df.rename(columns={'datasetA.title':'title','min(datasetA.id AS `id`)':'id'}, inplace = True)
unique_id = deduplicated_df['id'].values
boeing = list(boeing_data[i] for i in unique_id)
with open('deduplicated_titles','w') as f:
            json.dump(boeing,f)