1. Finding Nearest Image
2. TC for LSH
3. TC for Linear Search
4. Error Graphs

# Finding Nearest Image using PYSPARK

In [1]:
#!/usr/bin/env python
# coding: utf-8

# In[1]:


# Authors: Jessica Su, Wanzi Zhou, Pratyaksh Sharma, Dylan Liu, Ansh Shukla
# Modified by Evan Stene for CSCI 5702/7702
import findspark
import re
import sys
from pyspark import SparkContext, SparkConf
from IPython.display import display
import numpy as np
import random
import time
import pdb
import unittest
from PIL import Image
from pyspark import SparkContext

from pyspark.sql import functions as F
from pyspark.sql import types as T
from functools import reduce
from pyspark.sql import SQLContext


findspark.init()

 # Call this only after findspark.init() 
from pyspark.context import SparkContext 
from pyspark.sql.session import SparkSession 
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

from pyspark.sql import functions as F
from pyspark.sql import types as T
from functools import reduce

def l1(u,v):
    return int(np.sum(np.absolute(np.array(u)-np.array(v))))

class LSH:
    def __init__(self, filename, k, L):
        """
        Initializes the LSH object
        A - dataframe to be searched
        k - number of thresholds in each function
        L - number of functions
        """
        # do not edit this function!
        #self.sc = SparkContext()
        self.sc = sc
        self.k = k
        self.L = L
        self.A = self.load_file(filename)
        self.functions = self.create_functions()
        self.hashed_A = self.hash_data()
        
    # TODO: Implement this
    def l1(self, u, v):
        """
        Finds the L1 distance between two vectors
        u and v are 1-dimensional Row objects
        l1_distance = |x1-x2|+|y1-y2|
        pandas udf in pyspark
        """
        return int(np.sum(np.absolute(np.array(u)-np.array(v))))
        # raise NotImplementedError

    # TODO: Implement this
    def load_data(self, filename):
        """
        Loads the data into a spark DataFrame, where each row corresponds to
        an image patch -- this step is sort of slow.
        Each row in the data is an image, and there are 400 columns.
        """
        try:
            self.A = spark.read.csv(filename)
            print("Succesfully loaded.")
        except:
            print("Error in opening file.")
            
        return
        raise NotImplementedError

    # TODO: Implement this
    def create_function(self, dimensions, thresholds):
        """
        Creates a hash function from a list of dimensions and thresholds.
        """
        def f(v):
            # inside_vector = [v[idx] for idx in dimensions]
            returnable = []
            for jdx in range(len(dimensions)):
                returnable.append(1 if v[dimensions[jdx]] >= thresholds[jdx] else 0)
            return returnable
            # [c1,c2,c3....,c400]
            # [c1,c2,c3,c250]
            
        # this function is returning a k-bit vector of 0s and 1s
        udf_f = F.udf(f,T.ArrayType(T.IntegerType()))
        return udf_f

    def create_functions(self, num_dimensions=400, min_threshold=0, max_threshold=255):
        """
        Creates the LSH functions (functions that compute L K-bit hash keys).
        Each function selects k dimensions (i.e. column indices of the image matrix)
        at random, and then chooses a random threshold for each dimension, between 0 and
        255.  For any image, if its value on a given dimension is greater than or equal to
        the randomly chosen threshold, we set that bit to 1.  Each hash function returns
        a length-k bit string of the form "0101010001101001...", and the L hash functions 
        will produce L such bit strings for each image.
        """
        functions = []
        for i in range(self.L):
            dimensions = np.random.randint(low = 0, 
                                    high = num_dimensions,
                                    size = self.k)
            dimensions = [int(x) for x in list(dimensions)]
            thresholds = np.random.randint(low = min_threshold, 
                                    high = max_threshold + 1, 
                                    size = self.k)
            thresholds = [int(x) for x in list(dimensions)]
            functions.append(self.create_function(dimensions, thresholds))
        return functions

    # TODO: Implement this
    def hash_vector(self,v):
        """
        Hashes an individual vector (i.e. image).  This produces an array with L
        entries, where each entry is a string of k bits.
        """
        # you will need to use self.functions for this method
        # the expected input is an array of floats, this is the vector v
        vector_v = spark.createDataFrame([v]).withColumn("vector",F.array(*[F.col(name).cast("float") for name in df_columns]))
        
        for idx in range(len(self.functions)):
            vector_v = vector_v.withColumn("_hashed_vector_"+str(idx),self.functions[idx](F.col("vector")))
        return vector_v.collect()[0]
        
        # raise NotImplementedError

    # TODO: Implement this
    def hash_data(self):
        """
        Hashes the data in A, where each row is a datapoint, using the L
        functions in 'self.functions'
        """
        """ apply a function on each row of self.A"""
        """ for each row in my dataframe self.A, return hash_vector(r)"""
        
        # first we transform the dataframe into a processable shape
        df_columns = list(self.A.columns)
        tmp_df = self.A.withColumn("vector",F.array(*[F.col(name).cast("float") for name in df_columns]))
        
        # defining and applying the udf hash_vector function to our data
        hashed_tmp_df = tmp_df
        for idx in range(len(self.functions)):
             hashed_tmp_df = hashed_tmp_df.withColumn("_hashed_vector_"+str(idx),self.functions[idx](F.col("vector")))
        self.hashed_A = hashed_tmp_df
        return hashed_tmp_df
        # you will need to use self.A for this method
        raise NotImplementedError

    # TODO: Implement this
    def get_candidates(self,hashed_point, query_index):
        """
        Retrieve all of the points that hash to one of the same buckets 
        as the query point.  Do not do any random sampling (unlike what the first
        part of this problem prescribes).
        Don't retrieve a point if it is the same point as the query point.
        """
        
        """
        I want to retrieve all the points that were hashed into the same bucket as the point located in the same hashed_point bucket
        and then select the 3L nearest neighbours. We will execute a linear search over this bucket, comparing the real values
        using l1 distance. The query_index point 
        """
        idx = query_index
        query_point = hashed_point
        bucket = query_point["_hashed_vector_"+str(idx)]
        query_points_bucket = self.hashed_A.filter("CAST(_hashed_vector_"+str(idx)+" as string) = '"+str(bucket)+"'")
        #query_points_bucket.collect()
        # we want to compare our query_point[vector] to each of the query_points_bucket
        # we will give them n l1 score
        query_points_bucket = query_points_bucket.withColumn("query_vector_point",F.array(*[F.lit(x) for x in query_point["vector"]]))    # based on that score, we will retrieving the top 3L
        # query_points_bucket.collect()
        udf_l1 = F.udf(l1,T.IntegerType())
        scored_query_points = query_points_bucket.filter(F.col("vector")!=F.col("query_vector_point")).withColumn("score",udf_l1("query_vector_point","vector"))
        candidates = scored_query_points.sort("score",ascending=False)[["vector","score"]].collect()[:3*self.L]
        return candidates
        
        # you will need to use self.hashed_A for this method
        # raise NotImplementedError

    # TODO: Implement this
    def lsh_search(self,query_index, num_neighbors = 10):
        """
        Run the entire LSH algorithm. Query_Index is just a way to select a point that will be query itself.
        """
        query_point = self.hashed_A.limit(query_index).collect()[-1]
        """
        query_point will be composed of the vector and its resulting hashes with each function in self.functions
        """
        result_vector = []
        for idx in range(self.L):
            result_vector.append(self.get_candidates(query_point,idx))
            sqlContext.clearCache()

        flattened_results = reduce(lambda x,y:x+y,result_vector)
        sorted_results = sorted(flattened_results,key=lambda x:x["score"],reverse=False)

        # we will return the top 10 values.     
        # query_index is a vector of values
        # num_neighbors = self.L
        return sorted_results[:num_neighbors]
        # raise NotImplementedError

# Plots images at the specified rows and saves them each to files.
def plot(A, row_nums, base_filename):
    for row_num in row_nums:
        patch = np.reshape(A[row_num, :], [20, 20])
        im = Image.fromarray(patch)
        if im.mode != 'RGB':
            im = im.convert('RGB')
        im.save(base_filename + "-" + str(row_num) + ".png")

# Finds the nearest neighbors to a given vector, using linear search.
# TODO: Implement this
def linear_search(A, query_index, num_neighbors):
    query_point = A.limit(query_index).withColumn("vector",F.array(*[F.col(name).cast("float") for name in df_columns])).collect()[-1]
    # sqlContext.clearCache()
    tmp_df = A.withColumn("query_point",F.array(*[F.lit(x) for x in query_point["vector"]]))
    udf_l1 = F.udf(l1,T.IntegerType())
    top_n = tmp_df.withColumn("score",udf_l1("vector","query_point")).sort("score",ascending=False).limit(num_neighbors).collect()
    return top_n
    # raise NotImplementedError

# Write a function that computes the error measure
# TODO: Implement this
def lsh_error():
    raise NotImplementedError

#### TESTS #####

class TestLSH(unittest.TestCase):
    def test_l1(self):
        u = np.array([1, 2, 3, 4])
        v = np.array([2, 3, 2, 3])
        self.assertEqual(l1(u, v), 4)

    def test_hash_data(self):
        f1 = lambda v: sum(v)
        f2 = lambda v: sum([x * x for x in v])
        A = np.array([[1, 2, 3], [4, 5, 6]])
        self.assertEqual(f1(A[0,:]), 6)
        self.assertEqual(f2(A[0,:]), 14)

        functions = [f1, f2]
        self.assertTrue(np.array_equal(hash_vector(functions, A[0, :]), np.array([6, 14])))
        self.assertTrue(np.array_equal(hash_data(functions, A), np.array([[6, 14], [15, 77]])))

    ### TODO: Write your tests here (they won't be graded, 
    ### but you may find them helpful)


if __name__ == '__main__':
#    unittest.main() ### TODO: Uncomment this to run tests
    # create an LSH object using lsh = LSH(k=16, L=10)
    lsh_model = LSH(A=spark.read.csv("patches.csv"),k=16,L=10)
    t_results=lsh_model.lsh_search(1)
   
    """
    Your code here
    """







TypeError: __init__() got an unexpected keyword argument 'A'

# Result Diagnosis

In [7]:
lsh_model = LSH(A=spark.read.csv("patches.csv"), k=24, L=10)

In [9]:
t_results=lsh_model.lsh_search(1)

In [12]:
t_results

[Row(vector=[134.0, 144.0, 129.0, 144.0, 138.0, 164.0, 159.0, 59.0, 7.0, 9.0, 21.0, 19.0, 11.0, 9.0, 2.0, 2.0, 15.0, 19.0, 7.0, 2.0, 152.0, 149.0, 134.0, 142.0, 139.0, 166.0, 149.0, 47.0, 7.0, 10.0, 21.0, 20.0, 11.0, 8.0, 2.0, 4.0, 19.0, 18.0, 6.0, 2.0, 153.0, 154.0, 140.0, 140.0, 144.0, 170.0, 133.0, 31.0, 6.0, 11.0, 21.0, 20.0, 11.0, 5.0, 2.0, 9.0, 24.0, 16.0, 5.0, 2.0, 144.0, 151.0, 142.0, 140.0, 152.0, 171.0, 112.0, 15.0, 7.0, 14.0, 20.0, 19.0, 11.0, 3.0, 2.0, 13.0, 27.0, 13.0, 4.0, 2.0, 146.0, 143.0, 139.0, 142.0, 161.0, 163.0, 87.0, 5.0, 9.0, 17.0, 18.0, 17.0, 10.0, 2.0, 4.0, 17.0, 27.0, 10.0, 3.0, 2.0, 147.0, 138.0, 137.0, 147.0, 167.0, 144.0, 58.0, 4.0, 13.0, 21.0, 16.0, 15.0, 10.0, 2.0, 6.0, 19.0, 23.0, 8.0, 1.0, 2.0, 145.0, 139.0, 138.0, 153.0, 168.0, 118.0, 32.0, 3.0, 17.0, 25.0, 14.0, 12.0, 9.0, 2.0, 9.0, 19.0, 18.0, 5.0, 0.0, 2.0, 148.0, 142.0, 139.0, 157.0, 167.0, 100.0, 15.0, 4.0, 19.0, 27.0, 13.0, 10.0, 9.0, 3.0, 10.0, 19.0, 14.0, 4.0, 0.0, 2.0, 140.0, 140.0, 140.0, 157

In [14]:
spark_df = spark.read.csv("patches.csv")

In [16]:
rows_to_test = [100,200,300,400,500,600,700,800,900,1000]
for row in rows_to_test:
    print("Starting linear search on row",row)
    start_time = time.time()
    ls_results = linear_search(spark_df,row,3)
    end_time = time.time()
    total_time = end_time - start_time
    sqlContext.clearCache()
    ls_distance_output = sum([x["score"] for x in ls_results])
    result_dict = {
        "row":row,
        "time":total_time,
        "NN":ls_results,
        "distance":ls_distance_output
    }
    with open("./assignment01_row_"+str(row)+"_linearsearch.pickle","wb") as f:
        pickle.dump(result_dict,f)
    print("Finished running linear search on row",row)
    print("===================================================")

Starting linear search on row 100


NameError: name 'df_columns' is not defined

In [17]:
lsh_model = LSH(A=spark.read.csv("patches.csv"),k=24,L=10)

# Time Complexity for LSH

# TC for Linear search