<a href="https://colab.research.google.com/github/parkerburchett/BigDataClass/blob/main/ParkerBurchett_Big_Data_Lab_2_LSH_in_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Colab 2: Implement Locality Sensitive Hashing using Spark

In this lab you will learn to use [Apache Spark](https://spark.apache.org) in a Colab environment to implement Locality Sensitive Hashing for document comparison.

## Setup

Let's set up Spark in your Colab environment. Run the cell below!

---



In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=3de11f0532c482dc15f04ec60baea4dd4999d0700c03ffe1467f2df072fce469
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1

Now we authenticate a Google Drive client to download the data files. Please follow the instructions to enter the authorization code.


In [2]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

## Data Sets

We will work with the NIPS dataset of the [Bag of Words Data Set](https://archive.ics.uci.edu/ml/datasets/bag+of+words). It consists of two files:


*   `docword.nips.txt` contains the shingles of each document, that is, the count of each word in each document, stored as `docID wordID count` triples after three header lines. Stop words were removed, and only words appearing more than 10 times were kept.
*   `vocab.nips.txt` contains the vocabulary, one word per line; the word on line *i* has `wordID` *i*.
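The `docword.nips.txt` file follows the UCI Bag of Words layout. Three header lines give the number of documents (D), the vocabulary size (W) and the number of non-zero entries (NNZ). They are followed by one `docID wordID count` triple per line, with IDs starting at 1. The sketch below parses a small hypothetical excerpt in that layout with plain Python. The sample numbers are invented for illustration and are not taken from the real file.

```python
# Hypothetical excerpt in the docword.*.txt layout (values invented for illustration):
# line 1 = D (documents), line 2 = W (vocabulary size), line 3 = NNZ (non-zero counts),
# then one "docID wordID count" triple per line, all IDs 1-based.
sample = """3
5
6
1 2 1
1 4 3
2 1 2
2 2 1
3 4 1
3 5 2
"""

lines = sample.strip().splitlines()
num_docs, vocab_size, nnz = (int(x) for x in lines[:3])

# Parse the triples that follow the three header lines
triples = [tuple(int(t) for t in line.split()) for line in lines[3:]]

assert len(triples) == nnz  # the header's NNZ matches the number of triples
print(num_docs, vocab_size, nnz)  # 3 5 6
print(triples[:2])                # [(1, 2, 1), (1, 4, 3)]
```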





In [3]:
id='1831W_1SpE3A04SqsRtNiGZXhGfzZGFcl'
downloaded = drive.CreateFile({'id': id}) 
downloaded.GetContentFile('vocab.nips.txt') 

id='1qjhteWSwdjIV7nbnDHa_0QgNEiM2YYI0'
downloaded = drive.CreateFile({'id': id}) 
downloaded.GetContentFile('docword.nips.txt') 


Now let's check whether the stop words were truly removed from the vocabulary, and look at the first 20 words of the vocabulary.

In [10]:
# read the vocabulary, one word per line (list index i = 0-based word index i)
with open("vocab.nips.txt") as fh:
    vocabulary = [line.strip() for line in fh]

# check some stop words
print('the' in vocabulary or 'a' in vocabulary or 'to' in vocabulary)

# first 20 words of the vocabulary
print(vocabulary[:20])

False
['a2i', 'aaa', 'aaai', 'aapo', 'aat', 'aazhang', 'abandonment', 'abbott', 'abbreviated', 'abcde', 'abe', 'abeles', 'abi', 'abilistic', 'abilities', 'ability', 'abl', 'able', 'ables', 'ablex']


## Locality Sensitive Hashing
Now let's implement locality sensitive hashing. Remember, it involves three steps:


1.   Shingling: Convert the documents into sets (of words). This was already done. Since the data matrix after shingling is sparse, the shingling result is stored in `docword.nips.txt` as triples (`docID`, `wordID`, `count`).
2.   Min-hashing: Use appropriate hash functions to convert the sets (of words) into signatures.
3.   LSH: Generate candidate pairs and calculate the similarity between the candidates.
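For reference, the quantity these steps approximate is the Jaccard similarity of two word sets $S_1$ and $S_2$. Min-hashing rests on a standard property: for a uniformly random permutation $\pi$ of the vocabulary, two sets share the same minimum permuted index with probability equal to their Jaccard similarity. The lab's `((a * x) + b) % len(vocabulary)` hash functions only approximate random permutations, so treat this as the idealized model:

$$J(S_1, S_2) = \frac{|S_1 \cap S_2|}{|S_1 \cup S_2|}$$

$$h_\pi(S) = \min_{x \in S} \pi(x), \qquad \Pr_\pi\big[h_\pi(S_1) = h_\pi(S_2)\big] = J(S_1, S_2)$$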






In [5]:
# Import Spark libraries and initialize the Spark context.
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark import SparkContext, SparkConf

# create the configuration (serve the Spark web UI on port 4050)
conf = SparkConf().set("spark.ui.port", "4050")

# create the context, then a session that reuses it
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

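### **Step 1: Shingling**
The documents were already shingled, so we only need to pre-process them.

*   `textFile()` reads the file line by line, and the first `map()` splits each line into whitespace-separated tokens.
*   The `filter()` keeps only lines with exactly 3 tokens. This drops the three single-number header lines at the top of the file and keeps the (`docID`, `wordID`, `count`) triples.
*   The next `map()` keeps only the first two tokens, because we don't need the word count. It also shifts the word index from 1-based (the convention in the data file) to 0-based, so that `vocabulary[i]` is the word with index `i`. The document ID stays a string.
*   The `groupByKey()` groups all the word indices associated with the same document into a `ResultIterable` object.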
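To see what each transformation does without a Spark cluster, the plain-Python sketch below applies the same split, filter, re-index and group steps to a few hypothetical lines in the `docword` format. Here `defaultdict(list)` stands in for `groupByKey()`. This illustrates the data flow only; it is not how Spark executes the job.

```python
from collections import defaultdict

# Hypothetical input lines: three header lines, then "docID wordID count" triples
raw_lines = ["2", "4", "4", "1 1 3", "1 3 1", "2 2 5", "2 4 1"]

# map: split each line into whitespace-separated tokens
tokens = [line.split() for line in raw_lines]

# filter: keep only the 3-token lines, which drops the single-number header lines
triples = [t for t in tokens if len(t) == 3]

# map: keep (docID, wordID), dropping the count and shifting wordID from 1-based to 0-based
pairs = [(t[0], int(t[1]) - 1) for t in triples]

# groupByKey: collect every word index that belongs to the same document
grouped = defaultdict(list)
for doc, word in pairs:
    grouped[doc].append(word)

print(dict(grouped))  # {'1': [0, 2], '2': [1, 3]}
```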



In [6]:
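docrdd = (
    sc.textFile("docword.nips.txt")            # one record per line of the file
    .map(lambda line: line.split())            # split each line into tokens
    .filter(lambda tokens: len(tokens) == 3)   # keep the triples, drop the header lines
    .map(lambda t: (t[0], int(t[1]) - 1))      # (docID, 0-based wordID); count dropped
    .groupByKey()                              # docID -> iterable of word indices
)

# to help you understand the structure of docrdd:
docrdd.map(lambda x: (x[0], list(x[1]))).take(2)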

[('1',
  [1, 38, 41, 76, 94, 95, 104, 107, 132, 136, 139, 148, 154, 157, 168, 171,
   315, 364, 388, 425, 427, 432, 436, 477, 517, 522, 531, 532, 539, 541,
   549, 551, 573, 578, 638, 652, 653, 672, 674, 675, 694, 696, 697, 785,
   821, 903, 936, 940, 953, 985, 986, 989, 1055, 1086, 1102, 1134, 1171,
   1187, 1212, 1221, 1269, 1281, 1392, 1394, 1397, 1417, 1425, 1481, 1482,
   1492, 1496, 1497, 1498, 1499, 1500, 1594, 1613, 1615, 1680, 1758, 1767,
   1768, 1771, 1782, 1813, 1823, 1825, 1871, 1872, 1926, 1939, 1942, 1950,
   1975, 1977, 2004, 2014, 2041, 2049, 2051, 2053, 2054, 2055, 2058, 2082,
   2083, 2086, 2093, 2094, 2110, 2117, 2143, 2148, 2152, 2161, 2174, 2175,
   ...

### **Step 2: Min-hashing**
Now you need to find the signature of each document using min-hash functions. Implement a `get_signature()` function that takes one element of `docrdd` (i.e., (`doc`, `list_of_words`)) as input and returns (`doc`, `signature`) as output. Each hash function should be defined as `((a * x) + b) % len(vocabulary)`, where `a` and `b` are random integers and `x` is a word's index. The min-hash value of a document under that function is the smallest hash value over all of the document's words. Create 100 hash functions using 100 random values of `a` and 100 random values of `b`. Each document's signature is therefore a list of 100 min-hash values, so the signature matrix has 100 rows (one per hash function) and one column per document.
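Here is a plain-Python sketch of the min-hash signature described above, using the same hash family `((a * x) + b) % n`. The vocabulary size `N_WORDS`, the two toy word sets and the seed are hypothetical. The point is that the fraction of positions where two signatures agree gives a rough estimate of the sets' Jaccard similarity. The estimate is only approximate, because this hash family does not produce true random permutations.

```python
import random

random.seed(0)          # fixed seed so the sketch is reproducible
N_WORDS = 1000          # hypothetical vocabulary size
SIGNATURE_SIZE = 100
A = random.sample(range(1, 1500), SIGNATURE_SIZE)
B = random.sample(range(1, 1500), SIGNATURE_SIZE)

def minhash_signature(words):
    """For each h_i(x) = (A[i]*x + B[i]) % N_WORDS, keep the minimum hash over the set."""
    return [min((a * x + b) % N_WORDS for x in words) for a, b in zip(A, B)]

def jaccard(s1, s2):
    return len(s1 & s2) / len(s1 | s2)

# Two hypothetical documents as sets of word indices that overlap in 300 words
doc1 = set(range(0, 400))
doc2 = set(range(100, 500))

sig1, sig2 = minhash_signature(doc1), minhash_signature(doc2)
agreement = sum(x == y for x, y in zip(sig1, sig2)) / SIGNATURE_SIZE

print(len(sig1))                      # 100 values, one per hash function
print(round(jaccard(doc1, doc2), 3))  # 0.6
print(agreement)                      # rough estimate of the Jaccard similarity; varies with the seed
```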


In [7]:
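import random

SIGNATURE_SIZE = 100
# A and B hold the coefficients of the hash functions h_i(x) = (A[i] * x + B[i]) % len(vocabulary),
# which stand in for random row permutations.
A = random.sample(range(1, 1500), SIGNATURE_SIZE)
B = random.sample(range(1, 1500), SIGNATURE_SIZE)
VOCAB_SIZE = len(vocabulary)  # plain int, so the function shipped to workers stays small

def get_signature(p):
    
    doc, words = p
    words = list(words)  # materialize the ResultIterable once
    
    # For each hash function, the min-hash value is the smallest hash over the document's words
    signature = [min((a * x + b) % VOCAB_SIZE for x in words) for a, b in zip(A, B)]
    
    return (doc, signature)  # return the signature for the current document doc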

### **Step 3: Generate Candidate Pairs**

Now use the banding technique to find candidate pairs. Implement a `hash_bands()` function that takes a (`doc`, `signature`) pair from the previous step as input. It should split the signature into `NUM_BANDS` chunks (bands) and apply Python's built-in `hash()` to each band, after converting the band to a tuple, because lists are not hashable. It then returns a list of ((`band_index`, `hash_value`), [`doc_id`]) key-value pairs. For example, ((3, 472648357823), ['1111']) indicates that in band 3, document 1111 has the hash value 472648357823. Why use (`band_index`, `hash_value`) as the key and the document ID as the value? Because later we use Spark's `reduceByKey()` to combine all the documents that have the same hash value within the same band into one list. The document ID is wrapped in a single-element list so that `+` concatenates lists of IDs. Adding two bare ID strings would instead join their characters into a single string.
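The banding step can be sketched in plain Python on hypothetical toy signatures. Here each 6-value signature is cut into 3 bands of 2 rows. The sketch emits `((band_index, hash(band)), [doc_id])` and emulates `reduceByKey(lambda a, b: a + b)` and the `filter()` with a dictionary. The signatures, sizes and the helper name `band_keys` are invented for illustration.

```python
from collections import defaultdict

NUM_BANDS = 3  # toy value; rows per band = signature length // NUM_BANDS

def band_keys(doc, sig):
    rows = len(sig) // NUM_BANDS
    out = []
    for i in range(NUM_BANDS):
        band = tuple(sig[i * rows:(i + 1) * rows])  # tuples are hashable, lists are not
        out.append(((i, hash(band)), [doc]))        # doc wrapped in a list for concatenation
    return out

# Hypothetical signatures: docs 'x' and 'y' agree only in band 1 (positions 2-3)
signatures = {"x": [5, 1, 7, 7, 0, 2], "y": [4, 1, 7, 7, 3, 2], "z": [9, 9, 8, 8, 6, 6]}

# Emulate flatMap + reduceByKey(lambda a, b: a + b) with a dictionary of lists
buckets = defaultdict(list)
for doc, sig in signatures.items():
    for key, docs in band_keys(doc, sig):
        buckets[key] = buckets[key] + docs

# Emulate filter(lambda v: len(v[1]) > 1): keep buckets holding more than one document
candidates = [(key, docs) for key, docs in buckets.items() if len(docs) > 1]
print([(key[0], docs) for key, docs in candidates])  # [(1, ['x', 'y'])]
```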

In [8]:
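NUM_BANDS = 10  # you can change it to a different value; leftover rows are ignored if it does not divide SIGNATURE_SIZE

def hash_bands(p):
    
    doc, sig = p
    rows = len(sig) // NUM_BANDS  # rows per band
    
    bands = []
    for band_index in range(NUM_BANDS):
        # tuple() makes the band hashable; lists cannot be passed to hash()
        band = tuple(sig[band_index * rows:(band_index + 1) * rows])
        bands.append(((band_index, hash(band)), [doc]))
    
    return bands  # here bands are key-value pairs: ((band_index, hash_value), [doc_id])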

## Put It All Together

Now you are ready to generate candidate pairs.

*   In the `map()` step, `get_signature` returns each document together with its signature, a list of `SIGNATURE_SIZE` min-hash values.
*   In the `flatMap()` step, `hash_bands` splits each signature into bands and applies Python's `hash()` to each band to get a hash value. It returns (key, value) pairs whose key is (`band_index`, `hash_value`) and whose value holds the `doc_id`. `flatMap()` then flattens these per-document lists into a single RDD of pairs.
*   In the `reduceByKey()` step, documents that share the same key are combined into one list. Since the key is (`band_index`, `hash_value`), sharing a key means the documents have identical signature values within that band (barring a rare hash collision).
*   The `filter()` step keeps only the (key, value) pairs whose value contains more than one document. These are the candidate groups.
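Each element of `candidates` is a bucket of documents, and a bucket can hold more than two documents or repeat across bands. A possible post-processing step, which is not part of the original lab, expands every bucket into unordered document pairs and removes duplicates. The `buckets` list below is hypothetical and only mimics the shape of the collected output.

```python
from itertools import combinations

# Hypothetical collected output: ((band_index, hash_value), [doc_ids]) for buckets with > 1 doc
buckets = [
    ((0, 1234), ["d1", "d2"]),
    ((4, 5678), ["d2", "d1"]),        # the same pair found again in another band
    ((7, 9012), ["d3", "d4", "d2"]),  # a bucket with three documents
]

pairs = set()
for _, docs in buckets:
    # sorted() makes (a, b) and (b, a) the same pair; combinations() yields every unordered pair
    for a, b in combinations(sorted(set(docs)), 2):
        pairs.add((a, b))

print(sorted(pairs))  # [('d1', 'd2'), ('d2', 'd3'), ('d2', 'd4'), ('d3', 'd4')]
```

Note that `docrdd` keeps document IDs as strings, so sorting them is lexicographic ('1493' sorts before '478'). That is fine for deduplication.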

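**Note: your results might differ from the ones listed below.** Why? The coefficients `A` and `B` are drawn at random on every run (no seed is set), so the signatures, and therefore the candidate pairs, change between runs. If you believe your code is correct but you get no candidate pairs, try tuning the `NUM_BANDS` variable. More bands means fewer rows per band, which makes it easier for a similar pair to match in at least one band.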
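The standard banding analysis gives a way to reason about `NUM_BANDS`. It assumes the hash functions behave like independent random permutations, which this lab's hash family only approximates. With b bands of r = SIGNATURE_SIZE / b rows, a pair whose signatures agree at each position with probability s becomes a candidate with probability 1 - (1 - s^r)^b. The curve rises steeply near s ≈ (1/b)^(1/r). The sketch tabulates this probability for a few band counts that divide 100. The similarity values are arbitrary examples, and the helper name `candidate_probability` is hypothetical.

```python
SIGNATURE_SIZE = 100

def candidate_probability(s, bands, size=SIGNATURE_SIZE):
    """Probability that a pair with per-position agreement s matches in at least one band."""
    rows = size // bands
    return 1 - (1 - s ** rows) ** bands

for bands in (10, 20, 50):
    rows = SIGNATURE_SIZE // bands
    threshold = (1 / bands) ** (1 / rows)  # approximate similarity where the S-curve is steepest
    probs = [round(candidate_probability(s, bands), 3) for s in (0.1, 0.3, 0.5, 0.8)]
    print(f"bands={bands:2d} rows={rows:2d} threshold~{threshold:.2f} P={probs}")
```

With 10 bands of 10 rows, pairs need a similarity of roughly 0.8 before they are likely to collide. Pairs as dissimilar as the one examined below only surface with many more, shorter bands, at the cost of more false positives.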


In [9]:
candidates = docrdd \
    .map(get_signature) \
    .flatMap(hash_bands) \
    .reduceByKey(lambda a, b: a + b) \
    .filter(lambda v: len(v[1]) > 1) \
    .collect()

candidates

Py4JJavaError: ignored

I got two candidate pairs. Now let's calculate the Jaccard similarity between candidate pairs. You may get more than two pairs; pick any pair you would like to test.

Get the word list in the first document.

In [None]:
a1 = list(docrdd.filter(lambda x: x[0] == '478').first()[1])  # change doc_id to one from your candidates

Get the word list in the second document.

In [None]:
a2 = list(docrdd.filter(lambda x: x[0] == '1493').first()[1])  # change doc_id to one from your candidates

Calculate the Jaccard similarity by computing the intersection and the union of the two word sets above.
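The set-based calculation described here can also be wrapped in a small reusable helper. This is a sketch: `jaccard` is a hypothetical name, and returning 0.0 for two empty inputs is a convention chosen here, not something the lab specifies. Converting to sets first means duplicate word indices are ignored.

```python
def jaccard(words1, words2):
    """Jaccard similarity: size of the intersection divided by size of the union."""
    s1, s2 = set(words1), set(words2)
    union = s1 | s2
    if not union:  # both inputs empty: define the similarity as 0.0
        return 0.0
    return len(s1 & s2) / len(union)

print(jaccard([1, 2, 3, 4], [3, 4, 5]))  # 0.4
```

With the word lists from the cells in this section, `jaccard(a1, a2)` returns the same value as `sim`.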

In [None]:
intersect = set(a1) & set(a2)
union = set(a1) | set(a2)  # same as set(a1 + a2)
sim = len(intersect) / len(union)
sim

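This pair of documents has a Jaccard similarity of 0.15539305301645337. If you picked the same pair, you should get exactly this value, because the Jaccard similarity of two fixed word sets does not depend on the random hash functions. A different pair will give a different value.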