# Lab 2
In this lab, we use PySpark to implement p-stable locality-sensitive hashing (LSH) for nearest-neighbor search in Euclidean space.

### Import modules

In [1]:
import random
import math
from pyspark import SparkConf, SparkContext

### Create a Spark Context

In [2]:
conf = SparkConf().setMaster("local[*]").setAppName("lab2")
sc = SparkContext(conf = conf)

### Data Preparation
We use `random.randint` to generate the raw data and the query.

In [3]:
n, d = 500, 32 # n is the number of data points, d is the number of dimensions
data = [(i, [random.randint(0, 5) for _ in range(d)]) for i in range(n)]
query = [random.randint(0, 5) for _ in range(d)]

### Hash Function Generation
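We draw the entries of each projection vector `a` from a standard normal distribution and each offset `b` uniformly from [0, w], as the p-stable scheme requires.

In [4]:
L, K = 5, 20 # L hash tables, each keyed by K concatenated hash values
w = 100 # w is the user-specified bucket width
a = [[[random.normalvariate(0, 1) for _ in range(d)] for _ in range(K)] for _ in range(L)]
# The offset b must be uniform on [0, w], so w has to be defined first
b = [[random.uniform(0, w) for _ in range(K)] for _ in range(L)]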
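
To get a feel for how `w`, `K` and `L` interact, the sketch below evaluates the closed-form collision probability of a single Gaussian p-stable hash for two points at Euclidean distance `c`, and from it the chance that the pair collides in at least one of the `L` tables. It assumes `b` is uniform on [0, w]; the function names are illustrative and are not used elsewhere in this lab.

```python
import math

def normal_cdf(z):
    """Standard normal CDF, computed from the error function."""
    return 0.5 * (1.0 + math.erf(z / math.sqrt(2.0)))

def collision_probability(c, w):
    """Probability that one hash h(x) = floor((a.x + b) / w) puts two points
    at Euclidean distance c > 0 into the same bucket (assumes b ~ U[0, w])."""
    r = w / c
    return (1.0 - 2.0 * normal_cdf(-r)
            - 2.0 / (math.sqrt(2.0 * math.pi) * r) * (1.0 - math.exp(-r * r / 2.0)))

def candidate_probability(c, w, K, L):
    """Probability that the two points share a bucket in at least one of
    L tables, where each table key concatenates K independent hashes."""
    p = collision_probability(c, w)
    return 1.0 - (1.0 - p ** K) ** L
```

For example, `candidate_probability(c, 100, 20, 5)` estimates how likely a point at distance `c` from the query is to end up in the candidate set under this lab's parameters. A larger `w` or `L` admits more candidates, while a larger `K` admits fewer.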

### Generate data rdd

In [5]:
data_rdd = sc.parallelize(data)
# data_rdd is reused in every iteration below, so we cache() it to keep the RDD in memory.
data_rdd.cache()

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

### Generate L Hash Tables

In [6]:
# Compute one hash value floor((x . y + z) / w) and return it as a string.
# x is a data vector, y a projection vector from a, z an offset from b.
project = lambda x, y, z: str(math.floor((sum([(x[i] * y[i]) for i in range(d)]) + z) / w))
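
As a standalone illustration of what `project` computes, here is a minimal sketch of one hash function on a two-dimensional toy input. The name `pstable_hash` and the toy values are assumptions made for this example only. It also shows why `math.floor` is used rather than `int()`: negative projections must round down, not toward zero.

```python
import math

def pstable_hash(x, a_vec, b_val, w):
    """Bucket index floor((a_vec . x + b_val) / w) for a single hash function."""
    dot = sum(xi * ai for xi, ai in zip(x, a_vec))
    return math.floor((dot + b_val) / w)

# (1*0.5 + 2*(-1.0) + 3.0) / 4 = 0.375, which floors to 0
print(pstable_hash([1, 2], [0.5, -1.0], 3.0, 4))  # 0

# (1*0.5 + 2*(-1.0) + 0.0) / 4 = -0.375, which floors to -1 (int() would give 0)
print(pstable_hash([1, 2], [0.5, -1.0], 0.0, 4))  # -1
```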

In [7]:
# Map a record (data_id, vector) to (data_hash, data_id) for table l.
# data_hash is the K hash values joined into a single string, so it can serve as a key.
hash_table = lambda x, l:(" ".join([project(x[1], a[l][k], b[l][k]) for k in range(K)]), x[0])

In [8]:
# This function computes the squared L2 distance to the query; we skip the sqrt because it does not change the ordering.
cal_distance = lambda x:(sum((i - j) ** 2 for i, j in zip(data[x][1], query)), x)
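
Skipping the square root is safe because `sqrt` is monotonically increasing, so ranking by squared distance gives the same order as ranking by true distance. A small sketch on made-up points (the names `squared_l2`, `toy_points` and `toy_query` are hypothetical):

```python
import math

def squared_l2(u, v):
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((ui - vi) ** 2 for ui, vi in zip(u, v))

toy_query = [0, 0]
toy_points = {"p": [3, 4], "q": [1, 1], "r": [0, 6]}

# Rank the toy points by squared distance and by true distance.
by_squared = sorted(toy_points, key=lambda k: squared_l2(toy_points[k], toy_query))
by_true = sorted(toy_points, key=lambda k: math.sqrt(squared_l2(toy_points[k], toy_query)))

print(by_squared)  # ['q', 'p', 'r']
print(by_true)     # ['q', 'p', 'r']
```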

#### Transform Query into Query Hash

In [9]:
query_hash = [" ".join([project(query, a[l][k], b[l][k]) for k in range(K)]) for l in range(L)]

### Main Function

In [10]:
# Create an empty rdd to store candidate id
union_rdd = sc.emptyRDD()

# We compute hash table for all the data and get candidate set
for l in range(L):
    # Get data hash
    hash_rdd = data_rdd.map(lambda x:hash_table(x, l))
    # Combine by key and form (key, {id_1, id_2, ...}) to generate hash_table
    hash_table_rdd = hash_rdd.combineByKey(lambda x:{x}, lambda x, y: x | {y}, lambda x, y:x | y)
    # Generate candidate set if H_l(q) == H_l(o)
    cand_rdd = hash_table_rdd.filter(lambda x:x[0] == query_hash[l]).values().flatMap(lambda x:x)
    # Union candidates
    union_rdd = union_rdd.union(cand_rdd).distinct()

# Count the number of candidates
count = union_rdd.count()

# Handle no candidate situation
if count == 0:
    print("None of data_hash collide with query_hash.")
else:
    print("nb_of_candidates:", count)
    # For each candidate, compute the actual (squared) distance to the query and sort by it
    sort_rdd = union_rdd.map(cal_distance).sortByKey()
    # first() returns the candidate nearest to the query
    closest_point = sort_rdd.first()[1]
    print("Most nearest point id:", closest_point)

nb_of_candidates: 29
Most nearest point id: 124
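
LSH is approximate, so the nearest candidate is not guaranteed to be the true nearest neighbor. One way to check a result is an exhaustive scan over all points. The sketch below is plain Python and uses a hypothetical helper `brute_force_nearest`, which expects `(id, vector)` pairs in the same layout as `data`.

```python
def brute_force_nearest(points, q):
    """Return the id of the point closest to q, found by scanning every point.
    points is a list of (id, vector) pairs."""
    def squared_l2(v):
        return sum((vi - qi) ** 2 for vi, qi in zip(v, q))
    best_id, _ = min(points, key=lambda item: squared_l2(item[1]))
    return best_id

# Toy check: squared distances to [2, 2] are 8, 18 and 1, so id 2 wins.
toy_points = [(0, [0, 0]), (1, [5, 5]), (2, [2, 1])]
print(brute_force_nearest(toy_points, [2, 2]))  # 2
```

In the notebook, `brute_force_nearest(data, query)` can be compared with `closest_point`. The two ids may differ when the true nearest neighbor never collides with the query in any of the L tables, or when several points are equally close.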


In [11]:
# close spark context
sc.stop()

### Question

Currently, we use a `for` loop to build and probe the L hash tables one at a time. In this implementation, looking up a bucket in a hash table is not an O(1) operation: each lookup needs a `filter` transformation over the whole table.

Can you find a more efficient approach that reduces the number of transformations and avoids the `for` loop?

**Hint**: try to generate all L hash tables in a single transformation.