In [1]:
from pyspark import SparkContext
from pyspark import SparkConf
import io
from tifffile import TiffFile 
import numpy as np
import zipfile
from PIL import Image 
import hashlib
from scipy import linalg 

In [2]:
conf = SparkConf().setAppName("SatelliteProject").setMaster("local").set("spark.executor.memory", "3g")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

### Part - 1

In [3]:
rdd = sc.binaryFiles('/Users/jatingarg/Desktop/Satellite_BigData/a2_small_sample/')
fileNameList = rdd.map(lambda key: key[0].split("/")).flatMap(lambda key: key).\
                filter(lambda key: str(key).__contains__(".zip")).collect()
zipName_broadcast = sc.broadcast(set(fileNameList))
for fileName in zipName_broadcast.value:
    print(fileName)

3677453_2025190.zip
3677454_2025195.zip
3677500_2035190.zip
3677501_2035195.zip
3677502_2035200.zip


In [4]:
if "3677502_2035200.zip" in zipName_broadcast.value:
    print(True)

True


In [5]:
ques1_lis = ['3677454_2025195.zip-0', '3677454_2025195.zip-1', '3677454_2025195.zip-18', '3677454_2025195.zip-19']

In [300]:
rdd_1 = sc.binaryFiles('/Users/jatingarg/Desktop/Satellite_BigData/a2_small_sample/')
imageRDD = rdd_1.map(lambda key: (key[0],getOrthoTif(key[1]))).map(lambda key: divideImages(key)).\
            flatMap(lambda key: key)
filesList = imageRDD.filter(lambda key: key[0] in ques1_lis).collect()

In [10]:
len(filesList)

4

In [11]:
print(type(filesList[0][1]))

<class 'numpy.ndarray'>


In [7]:
def divideImages(key):
    fileName = key[0].split("/")
    name = str()
    for file in fileName:
        if file in zipName_broadcast.value:
            name = file
            break
    imageArray = key[1]
    tupList = list()
    tupList.clear()
    for i in range(0,len(imageArray),len(imageArray)//5):
        for j in range(0,len(imageArray[0]),len(imageArray[0])//5):
            tempArr = [[0 for i in range(500)] for j in range(500)]
            for p in range(i,i+500,1):
                for q in range(j,j+500,1):
                    tempArr[p%500][q%500] = imageArray[p][q]
            tupList.append(np.array(tempArr))
    nameImageList = list()
    for i in range(len(tupList)):
        nameImageList.append((name+"-"+str(i),tupList[i]))
    return nameImageList

In [8]:
def getOrthoTif(zfBytes):
#given a zipfile as bytes (i.e. from reading from a binary file),
# return a np array of rgbx values for each pixel
    bytesio = io.BytesIO(zfBytes)
    zfiles = zipfile.ZipFile(bytesio, "r")
    #find tif:
    for fn in zfiles.namelist():
        if fn[-4:] == '.tif':#found it, turn into array:
            tif = TiffFile(io.BytesIO(zfiles.open(fn).read()))
    return tif.asarray()

### Result - Part -1

In [12]:
lis = ['3677454_2025195.zip-0', '3677454_2025195.zip-1', '3677454_2025195.zip-18', '3677454_2025195.zip-19']
for tup in filesList:
    if tup[0] in lis:
        print(tup[0],tup[1][0][0])

3677454_2025195.zip-0 [114 111 109 114]
3677454_2025195.zip-1 [ 54  53  57 117]
3677454_2025195.zip-18 [ 79  70  66 123]
3677454_2025195.zip-19 [61 57 63 84]


In [13]:
filesList[0][1][0][0]

array([114, 111, 109, 114], dtype=uint8)

### Part - 2

In [303]:
imageRdd_1 = imageRDD.map(lambda key: (key[0],calculateIntensity(key[1])))

In [304]:
print(lis[0][1][0][0])

6


In [305]:
def calculateIntensity(matrix):
    temp = [[0 for i in range(len(matrix[0]))] for j in range(len(matrix))]
    for i in range(len(matrix)):
        for j in range(len(matrix[0])):
            pixel = matrix[i][j]
            intensity = getIntensityFromRGBI(pixel)
            temp[i][j] = intensity
    return temp

In [306]:
def getIntensityFromRGBI(pixel):
    r = pixel[0]
    g = pixel[1]
    b = pixel[2]
    I = pixel[3]
    rgb_mean = (r+g+b)/3
    intensity = int(rgb_mean * (I/100))
    return intensity

In [307]:
imageRdd_2 = imageRdd_1.map(lambda key: (key[0],reductionResolution(key[1],10)))

In [308]:
def reductionResolution(matrix,factor):
    matrix = np.array(matrix)
    temp = [[0 for i in range(len(matrix[0])//factor)] for j in range(len(matrix)//factor)]
    for i in range(0,len(matrix),factor):
        for j in range(0,len(matrix[0]),factor):
            temp[i//factor][j//factor] = getMeanOverFactor(matrix[i:i+factor , j:j+factor])
    return temp

In [309]:
def getMeanOverFactor(matrix):
    return np.mean(matrix)

In [310]:
imageRdd_3 = imageRdd_2.map(lambda key: (key[0],row_diff(key[1]),col_diff(key[1]))).\
                map(lambda key: (key[0],getFeature(key[1],key[2])))

In [311]:
arr = [[1,2],[4,5]]
arr = np.array(arr)
np.diff(arr,axis=1)

array([[1],
       [1]])

In [312]:
def row_diff(matrix):
    matrix = np.array(matrix)
    matrix = np.diff(matrix,axis=1)
    matrix = np.where(np.logical_or(matrix < -1, matrix > 1), matrix, 0)
    matrix = np.clip(matrix, -1, 1)
#     temp = [[0 for i in range(len(matrix[0])-1)] for j in range(len(matrix))]
#     for i in range(len(matrix)):
#         for j in range(len(matrix[0])-1):
#             temp[i][j] = matrix[i][j+1] - matrix[i][j]
#             temp[i][j] = float(temp[i][j])
#             if temp[i][j] < float(-1):
#                 temp[i][j] = -1
#             elif temp[i][j] > float(1):
#                 temp[i][j] = 1
#             else:
#                 temp[i][j] = 0
    return matrix

In [313]:
def col_diff(matrix):
    matrix = np.array(matrix)
    matrix = np.diff(matrix,axis=0)
    matrix = np.where(np.logical_or(matrix < -1, matrix > 1), matrix, 0)
    matrix = np.clip(matrix, -1, 1)
#     matrix[matrix < -1] = -1
#     matrix[matrix > 1] = 1
#     matrix[matrix >= -1 & matrix <= 1] = 0
#     temp = [[0 for i in range(len(matrix[0]))] for j in range(len(matrix)-1)]
#     for i in range(len(matrix)-1):
#         for j in range(len(matrix[0])):
#             temp[i][j] = matrix[i+1][j] - matrix[i][j]
#             temp[i][j] = float(temp[i][j])
#             if temp[i][j] < float(-1):
#                 temp[i][j] = -1
#             elif temp[i][j] > float(1):
#                 temp[i][j] = 1
#             else:
#                 temp[i][j] = 0
    return matrix

In [314]:
a = 6
type(np.int64(a))

numpy.int64

In [315]:
def getFeature(row_diff,col_diff):
#     feature = list()
    row_diff = np.array(row_diff)
    col_diff = np.array(col_diff)
    return np.append(row_diff.flatten(),col_diff.flatten())
#     for i in range(len(row_diff)):
#         for j in range(len(row_diff[0])):
#             feature.append(row_diff[i][j])
#     for i in range(len(col_diff)):
#         for j in range(len(col_diff[0])):
#             feature.append(col_diff[i][j])
#     return feature

In [316]:
ques_2lis = ["3677454_2025195.zip-1", "3677454_2025195.zip-18"]

In [317]:
ques_2lis_broadcast = sc.broadcast(set(ques_2lis))

In [318]:
featuresList = imageRdd_3.filter(lambda key: key[0] in ques_2lis_broadcast.value).collect()

### Result - Part - 2

In [319]:
lis = ["3677454_2025195.zip-1", "3677454_2025195.zip-18"]
for tup in featuresList:
    if tup[0] in lis:
        print(tup[0],np.array(tup[1]))

3677454_2025195.zip-1 [ 0.  1. -1. ...,  1. -1.  1.]
3677454_2025195.zip-18 [-1. -1.  0. ...,  1. -1. -1.]


In [None]:
3677454_2025195.zip-1 [ 0  1 -1 ...,  1 -1  1]
3677454_2025195.zip-18 [-1 -1  0 ...,  1 -1 -1]

In [37]:
lengthBroadCast = sc.broadcast(len(featuresList[0][1]))

### Part - 3

In [75]:
lis = imageRdd_3.map(lambda key: (key[0],key[1],getMD5HashForFeatures(key[1],128))).collect()

In [38]:
def getMD5HashForFeatures(feature,factor):
    feature = np.array(feature)
    hashcode = str()
    increment = len(feature)//factor
    for arr in np.array_split(np.array(feature), 128):
        hexdigest = hashlib.md5(arr).hexdigest()
        binstr = bin(int(hexdigest,16))
        bit = hexdigest[len(hexdigest)//2]
        hashcode += bit
    return hashcode

In [77]:
lis[0][2]

'f44b4267ccf0108e345819ca7856720212136826d220966c63b023a17b3f9fd303cae68e9b5a42d534b74a718bf132a61f737b077a7c7810218b63f10aab6de4'

In [78]:
lis1 = ["3677454_2025195.zip-1", "3677454_2025195.zip-18"]
for tup in lis:
    if tup[0] in lis1:
        print(tup[0],len(tup[2]))

3677454_2025195.zip-1 128
3677454_2025195.zip-18 128


In [178]:
lshRdd = imageRdd_3.map(lambda key: (key[0],key[1],getMD5HashForFeatures(key[1],128)))

In [179]:
lshRdd_1 = lshRdd.map(lambda key: (key[0],key[1],getBucketsMapped(key[2],8,491)))

In [180]:
def getBucketsMapped(signatureVector,bands,prime):
#     bands = 16
    rowsPBand = len(signatureVector)//bands
#     prime = 157
    bucketList = list()
    count = 0
    for i in range(0,len(signatureVector),rowsPBand):
        bucketNo = hash(signatureVector[i : i + rowsPBand]) % prime
        bucketList.append((count,bucketNo))
        count += 1
        if count == bands:
            break
    return bucketList

In [181]:
questionList = ['3677454_2025195.zip-0', '3677454_2025195.zip-1', '3677454_2025195.zip-18', '3677454_2025195.zip-19']

In [190]:
templshRdd_1 = lshRdd_1.map(lambda key: (key[0],key[2])).flatMapValues(lambda key: key).\
                map(lambda key: (key[1],key[0]))

In [None]:
((0, 330), '3677454_2025195.zip-0'),
 ((1, 120), '3677454_2025195.zip-0'),
 ((2, 217), '3677454_2025195.zip-0'),
 ((3, 14), '3677454_2025195.zip-0'),
 ((4, 227), '3677454_2025195.zip-0'),
 ((5, 458), '3677454_2025195.zip-0'),
 ((6, 204), '3677454_2025195.zip-0'),
 ((7, 95), '3677454_2025195.zip-0'),

In [183]:
def appendMatcheddNames(commonlist):
    temp = list()
    for name in questionList:
        if name in commonlist:
            for each in commonlist:
                if name != each:
                    temp.append((name,each))

    return temp  

In [195]:
templshRdd_2 = templshRdd_1.groupByKey().map(lambda key: (key[1])).map(lambda x:list(x)).collect()

In [196]:
templshRdd_2

[['3677453_2025190.zip-0'],
 ['3677453_2025190.zip-0'],
 ['3677453_2025190.zip-0'],
 ['3677453_2025190.zip-0', '3677502_2035200.zip-8'],
 ['3677453_2025190.zip-0'],
 ['3677453_2025190.zip-0'],
 ['3677453_2025190.zip-0', '3677500_2035190.zip-17'],
 ['3677453_2025190.zip-0'],
 ['3677453_2025190.zip-1'],
 ['3677453_2025190.zip-1'],
 ['3677453_2025190.zip-1'],
 ['3677453_2025190.zip-1'],
 ['3677453_2025190.zip-1'],
 ['3677453_2025190.zip-1'],
 ['3677453_2025190.zip-1'],
 ['3677453_2025190.zip-1'],
 ['3677453_2025190.zip-2'],
 ['3677453_2025190.zip-2'],
 ['3677453_2025190.zip-2', '3677502_2035200.zip-12'],
 ['3677453_2025190.zip-2'],
 ['3677453_2025190.zip-2'],
 ['3677453_2025190.zip-2'],
 ['3677453_2025190.zip-2'],
 ['3677453_2025190.zip-2'],
 ['3677453_2025190.zip-3'],
 ['3677453_2025190.zip-3'],
 ['3677453_2025190.zip-3'],
 ['3677453_2025190.zip-3', '3677453_2025190.zip-20'],
 ['3677453_2025190.zip-3'],
 ['3677453_2025190.zip-3', '3677453_2025190.zip-21'],
 ['3677453_2025190.zip-3'],
 ['

In [185]:
templshRdd_3 = templshRdd_2.map(lambda key: appendMatcheddNames(key)).flatMap(lambda key: key)

In [186]:
candidatesListCheck = templshRdd_3.groupByKey().map(lambda x: (x[0],set(x[1]))).\
        map(lambda x:(x[0],list(x[1]))).collect()

In [187]:
candidatesListCheck

[('3677454_2025195.zip-18', ['3677453_2025190.zip-13']),
 ('3677454_2025195.zip-0',
  ['3677501_2035195.zip-2',
   '3677502_2035200.zip-21',
   '3677501_2035195.zip-9']),
 ('3677454_2025195.zip-19',
  ['3677501_2035195.zip-2', '3677500_2035190.zip-19'])]

In [177]:
rdd_3b

[('3677454_2025195.zip-1',
  {'3677453_2025190.zip-0',
   '3677453_2025190.zip-12',
   '3677453_2025190.zip-16',
   '3677454_2025195.zip-10',
   '3677454_2025195.zip-20',
   '3677500_2035190.zip-14',
   '3677500_2035190.zip-19',
   '3677500_2035190.zip-23',
   '3677500_2035190.zip-24',
   '3677500_2035190.zip-4',
   '3677500_2035190.zip-5',
   '3677501_2035195.zip-4',
   '3677502_2035200.zip-12',
   '3677502_2035200.zip-20'}),
 ('3677454_2025195.zip-18',
  {'3677453_2025190.zip-4',
   '3677453_2025190.zip-8',
   '3677500_2035190.zip-12',
   '3677500_2035190.zip-20',
   '3677500_2035190.zip-3',
   '3677500_2035190.zip-6',
   '3677501_2035195.zip-17',
   '3677501_2035195.zip-24',
   '3677501_2035195.zip-3',
   '3677502_2035200.zip-10',
   '3677502_2035200.zip-13',
   '3677502_2035200.zip-14',
   '3677502_2035200.zip-17',
   '3677502_2035200.zip-21',
   '3677502_2035200.zip-22',
   '3677502_2035200.zip-4',
   '3677502_2035200.zip-5'}),
 ('3677454_2025195.zip-0',
  {'3677453_2025190.zip-11

In [197]:
rdd_3b = lshRdd.map(lambda x : ( x[0], getBucketsMapped(x[2],8,491) )).flatMapValues(lambda x : x).\
            map(lambda x : (x[1],x[0])).groupByKey().filter(lambda x : isIn3b(x[1])).\
                filter(lambda x : len(x[1])>1).map(lambda x : x[1]).map(lambda x : makeTuple(x)).\
                    flatMap(lambda x : x).groupByKey().map(lambda x : (x[0], set(x[1]))).collect()

In [156]:
def isIn3b(lis):
    list_3b = ["3677454_2025195.zip-0","3677454_2025195.zip-1", '3677454_2025195.zip-18', "3677454_2025195.zip-19"]
    for ele in lis:
        if ele in list_3b:
            return True
    return False

In [158]:
def makeTuple(lis):
    list_3b = [
        "3677454_2025195.zip-0", "3677454_2025195.zip-1",
        '3677454_2025195.zip-18', "3677454_2025195.zip-19"
    ]
    res = list()
    for candidate in list_3b:
        if candidate in lis:
            for item in lis:
                if item != candidate:
                    res.append((candidate, item))
    return res

In [172]:
candidatesListCheck[0][0]

'3677454_2025195.zip-18'

In [198]:
for individualSet in rdd_3b:
    print(individualSet[0],len(individualSet[1]))

3677454_2025195.zip-18 1
3677454_2025195.zip-0 3
3677454_2025195.zip-19 2


In [110]:
questionList3B = ['3677454_2025195.zip-1', '3677454_2025195.zip-18']
for i in range(len(candidatesListCheck)):
    if candidatesListCheck[i][0] in questionList3B:
        print(candidatesListCheck[i][0],"==== Candidate List =====",candidatesListCheck[i][1])

3677454_2025195.zip-1 ==== Candidate List ===== ['3677502_2035200.zip-12', '3677453_2025190.zip-0', '3677500_2035190.zip-19', '3677453_2025190.zip-12', '3677502_2035200.zip-20', '3677500_2035190.zip-4', '3677454_2025195.zip-20', '3677454_2025195.zip-10', '3677500_2035190.zip-5', '3677501_2035195.zip-4', '3677500_2035190.zip-24', '3677500_2035190.zip-23', '3677500_2035190.zip-14', '3677453_2025190.zip-16']
3677454_2025195.zip-18 ==== Candidate List ===== ['3677500_2035190.zip-20', '3677502_2035200.zip-10', '3677502_2035200.zip-21', '3677502_2035200.zip-17', '3677500_2035190.zip-12', '3677502_2035200.zip-5', '3677453_2025190.zip-8', '3677500_2035190.zip-3', '3677501_2035195.zip-3', '3677502_2035200.zip-22', '3677502_2035200.zip-14', '3677502_2035200.zip-13', '3677453_2025190.zip-4', '3677500_2035190.zip-6', '3677501_2035195.zip-24', '3677501_2035195.zip-17', '3677502_2035200.zip-4']


In [90]:
vectorsToBeTakenList = list()
for i in range(len(candidatesListCheck)):
    if candidatesListCheck[i][0] in questionList3B:
        vectorsToBeTakenList.extend(candidatesListCheck[i][1])
vectorsToBeTakenList.extend(questionList3B)
vectorsToBeTakenList = set(vectorsToBeTakenList)
vectorsToBeTakenList = list(vectorsToBeTakenList)

### SVD

In [55]:
def makePair(nameSVD):
    name, vector = nameSVD
    temp = []
    i = 0
    for ele in name:
        temp.append((ele,vector[i]))
        i += 1
    return temp

In [56]:
def getNameVector(namevector1,namevector2,featureLength):
    name1 , vector1 = namevector1
    name2 , vector2 = namevector2
    a = np.array(vector1)
    b = np.array(vector2)
    return np.append(name1,name2),np.reshape(np.append(a,b),(-1,featureLength))

In [57]:
def getTag(st):
    a = st.split("-")[0]
    return a

In [58]:
def SVD(image,dimensions):
    mean = np.mean(image, axis=0)
    stDev = np.std(image, axis=0)
#     np.nan_to_num(stDev)
    stDev[stDev == 0] = 1
    
    img_zscore = (image - mean) / stDev

    U, s, Vh = linalg.svd(img_zscore, full_matrices=1)

#     img_zscore_lowdim = U[:,0:dimensions]
    
    return Vh[:,0:dimensions]

In [59]:
def SVDFromCommonV(image,vBroadCast):
    mean = np.mean(image, axis=0)
    stDev = np.std(image, axis=0)
#     np.nan_to_num(stDev)
    stDev[stDev == 0] = 1
    
    img_zscore = (image - mean) / stDev
    img_zscore = np.array(img_zscore)
    img_zscore_lowdim = np.matmul(img_zscore , vBroadCast.value)
    
    return img_zscore_lowdim

In [96]:
# rdd = sc.parallelize([("abc-1",[1,2,3,4]),("def-2",[5,6,7,8]),("def-2",[15,16,17,18]),("abc-1",[0,9,8,7]),("def-2",[25,26,27,28]),("abc-1",[1,2,3,4])])
# rdd2 = rdd.map(lambda x:(getTag(x[0]),(x[0],x[1]))).reduceByKey(lambda a,b: getNameVector(a,b,4))
# rdd3 = rdd2.map(lambda x:(x[1][0],SVD(x[1][1],2)))
# abb = rdd3.\
# map(lambda x : makePair(x)).\
# flatMap(lambda x:x).collect()

In [98]:
abb[0]

('abc-1', array([-0.5,  0.5]))

In [63]:
lshRdd_3 = lshRdd_1.map(lambda key:(getTag(key[0]),(key[0],key[1]))).reduceByKey(lambda a,b: getNameVector(a,b,4900))
lshRdd_4 = lshRdd_3.map(lambda key:(key[1][0],SVD(key[1][1], 10)))
vCollect = lshRdd_4.take(1)

In [64]:
vCollect[0][1].shape

(4900, 10)

In [65]:
vBroadCast = sc.broadcast(vCollect[0][1])

In [66]:
# arr1 = [[1,2],[3,4]]
# arr1 = np.array(arr1)
# arr2 = [[1,0],[1,1]]
# arr2 = np.array(arr2)
# np.matmul(arr1 , arr2)

In [100]:
lshRdd_3 = lshRdd_1.map(lambda key:(getTag(key[0]),(key[0],key[1]))).reduceByKey(lambda a,b: getNameVector(a,b,4900))
lshRdd_4 = lshRdd_3.map(lambda key:(key[1][0],SVDFromCommonV(key[1][1],vBroadCast))).map(lambda key : makePair(key)).flatMap(lambda x:x)
svdList = lshRdd_4.filter(lambda x: x[0] in vectorsToBeTakenList).collect()

In [107]:
svdList[1]

('3677453_2025190.zip-4',
 array([ -3.56789561e+00,  -3.19502577e+00,   5.32611362e+00,
          2.61382641e+00,  -6.88707359e+01,  -9.54627949e-01,
         -2.44733200e-01,  -1.83744038e+00,   1.68935723e-01,
         -1.21716331e-02]))

In [108]:
def getVectors(name):
    for i in range(len(svdList)):
        if svdList[i][0] == name:
            return svdList[i][1]

In [112]:
ques_3C_List = ['3677454_2025195.zip-1', '3677454_2025195.zip-18']
for i in range(len(candidatesListCheck)):
    if candidatesListCheck[i][0] in ques_3C_List:
        parentName = candidatesListCheck[i][0]
        vectorParent = getVectors(parentName)
        print("Distance Between " + str(parentName) + " and its candidate pairs sorted from least to highest:")
        print()
        temp = list()
        for candidateName in candidatesListCheck[i][1]:
            candidateVector = getVectors(candidateName)
            temp.append(((candidateName) , \
                         float(np.linalg.norm(np.array(vectorParent)-np.array(candidateVector)))))
        lis = sorted(temp,key=lambda x:x[1])
        for x in lis:
            print(x[0],"       ",x[1])
        print("\n")

Distance Between 3677454_2025195.zip-1 and its candidate pairs sorted from least to highest:

3677500_2035190.zip-14         3.3227992230697865
3677502_2035200.zip-20         4.6586010114006715
3677501_2035195.zip-4         4.693895246574818
3677454_2025195.zip-20         4.697569940712177
3677502_2035200.zip-12         4.839827413819539
3677500_2035190.zip-23         4.952445345204258
3677454_2025195.zip-10         4.994445546102303
3677500_2035190.zip-5         5.086496507687073
3677500_2035190.zip-24         5.332599497712025
3677500_2035190.zip-4         5.880446082000745
3677500_2035190.zip-19         7.039475711803563
3677453_2025190.zip-12         12.200370968457863
3677453_2025190.zip-16         14.797913807537558
3677453_2025190.zip-0         63.711480587737924


Distance Between 3677454_2025195.zip-18 and its candidate pairs sorted from least to highest:

3677500_2035190.zip-6         3.852610590858557
3677502_2035200.zip-10         3.8532543756566633
3677502_2035200.zip-14  

In [113]:
imageRdd_1.persist()

PythonRDD[49] at RDD at PythonRDD.scala:48

In [311]:
def extraCreditDifferentFactor(factor):
    imageRdd_2 = imageRdd_1.map(lambda key: (key[0],reductionResolution(key[1],factor)))
    imageRdd_3 = imageRdd_2.map(lambda key: (key[0],row_diff(key[1]),col_diff(key[1]))).\
                    map(lambda key: (key[0],getFeature(key[1],key[2])))
        
    ques_2lis = ["3677454_2025195.zip-1", "3677454_2025195.zip-18"]
    ques_2lis_broadcast = sc.broadcast(set(ques_2lis))
    featuresList = imageRdd_3.filter(lambda key: key[0] in ques_2lis_broadcast.value).collect()
    lis = ["3677454_2025195.zip-1", "3677454_2025195.zip-18"]
    for tup in featuresList:
        if tup[0] in lis:
            print(tup[0],np.array(tup[1]))
    lengthBroadCast = sc.broadcast(len(featuresList[0][1]))
    print(lengthBroadCast.value)
    lshRdd = imageRdd_3.map(lambda key: (key[0],key[1],getMD5HashForFeatures(key[1],128)))
    lshRdd_1 = lshRdd.map(lambda key: (key[0],key[1],getBucketsMapped(key[2],64,101)))
    bucketsMappedList = lshRdd_1.collect()
    
    allCandidateSet = set()
    for i in range(len(bucketsMappedList)):
        name,feature,bucket = bucketsMappedList[i]
        candidateSet = set()
        if name in questionList:
            countdic = dict()
            for j in range(len(bucket)):
                for k in range(len(bucketsMappedList)):
                    if i != k:
                        candidatename,candidatefeature,candidatebuckets = bucketsMappedList[k]
                        if candidatebuckets[j] == bucket[j] and candidatename != name:
                            ctuple = (name,candidatename)
                            if countdic.__contains__(ctuple) == False:
                                countdic[ctuple] = 1
                            else:
                                candidateSet.add(candidatename)
            allCandidateSet.add((name,tuple(candidateSet)))
    allCandidateSet_broadcast = sc.broadcast(allCandidateSet)
#     print(allCandidateSet_broadcast.value)
    final = list()
    for individualSet in allCandidateSet_broadcast.value:
        temp = list()
        for val in individualSet[1]:
            temp.append(val)
        final.append(temp)
        print(individualSet[0],len(temp))
        
    lshRdd_2 = lshRdd_1.map(lambda key: (key[0],key[1],getCandidateList(key[0],allCandidateSet_broadcast)))
    candidatesListCheck = lshRdd_2.filter(lambda key: key[0] in questionList).collect()

    questionList3B = ['3677454_2025195.zip-1', '3677454_2025195.zip-18']
    for i in range(len(candidatesListCheck)):
        if candidatesListCheck[i][0] in questionList3B:
            print(candidatesListCheck[i][0],"==== Candidate List =====",candidatesListCheck[i][2])
            
    lshRdd_3 = lshRdd_2.map(lambda key:(getTag(key[0]),(key[0],key[1]))).reduceByKey(lambda a,b: getNameVector(a,b,lengthBroadCast.value))
    lshRdd_4 = lshRdd_3.map(lambda key:(key[1][0],SVD(key[1][1], 10)))
    vCollect = lshRdd_4.take(1)

    vBroadCast = sc.broadcast(vCollect[0][1])
    lshRdd_3 = lshRdd_2.map(lambda key:(getTag(key[0]),(key[0],key[1]))).reduceByKey(lambda a,b: getNameVector(a,b,lengthBroadCast.value))
    lshRdd_4 = lshRdd_3.map(lambda key:(key[1][0],SVDFromCommonV(key[1][1],vBroadCast))).map(lambda key : makePair(key)).flatMap(lambda x:x)
    svdList = lshRdd_4.collect()
    
    ques_3C_List = ['3677454_2025195.zip-1', '3677454_2025195.zip-18']
    for i in range(len(candidatesListCheck)):
        if candidatesListCheck[i][0] in ques_3C_List:
            parentName = candidatesListCheck[i][0]
            vectorParent = getVectors(parentName)
            print("Distance Between " + str(parentName) + " and its candidate pairs sorted from least to highest:")
            print()
            temp = list()
            for candidateName in candidatesListCheck[i][2]:
                candidateVector = getVectors(candidateName)
                temp.append(((candidateName) , \
                             float(np.linalg.norm(np.array(vectorParent)-np.array(candidateVector)))))
            lis = sorted(temp,key=lambda x:x[1])
            for x in lis:
                print(x[0],"       ",x[1])
            print("\n")

In [312]:
extraCreditDifferentFactor(10)

3677454_2025195.zip-1 [ 0  1 -1 ...,  1 -1  1]
3677454_2025195.zip-18 [-1 -1  0 ...,  1 -1 -1]
4900
3677454_2025195.zip-0 25
3677454_2025195.zip-19 21
3677454_2025195.zip-1 16
3677454_2025195.zip-18 16
3677454_2025195.zip-1 ==== Candidate List ===== ['3677500_2035190.zip-4', '3677500_2035190.zip-14', '3677454_2025195.zip-3', '3677501_2035195.zip-12', '3677501_2035195.zip-6', '3677501_2035195.zip-4', '3677501_2035195.zip-17', '3677453_2025190.zip-11', '3677502_2035200.zip-11', '3677501_2035195.zip-14', '3677501_2035195.zip-23', '3677501_2035195.zip-11', '3677500_2035190.zip-24', '3677500_2035190.zip-11', '3677500_2035190.zip-23', '3677453_2025190.zip-5']
3677454_2025195.zip-18 ==== Candidate List ===== ['3677453_2025190.zip-8', '3677502_2035200.zip-16', '3677500_2035190.zip-20', '3677453_2025190.zip-15', '3677502_2035200.zip-8', '3677501_2035195.zip-16', '3677453_2025190.zip-1', '3677502_2035200.zip-22', '3677454_2025195.zip-4', '3677502_2035200.zip-9', '3677502_2035200.zip-23', '367750

In [207]:
hash("eewknk")

-832409904158055332

In [358]:
temp = [[j for j in range(500)] for i in range(500)]

In [359]:
temp = np.array(temp)
temp = np.array(np.split(temp,indices_or_sections=50,axis=1))
temp = np.array(np.split(temp,indices_or_sections=50,axis=1))
temp = np.mean(temp,axis=(2,3))

In [360]:
temp.shape

(50, 50)

In [361]:
temp

array([[   4.5,   14.5,   24.5, ...,  474.5,  484.5,  494.5],
       [   4.5,   14.5,   24.5, ...,  474.5,  484.5,  494.5],
       [   4.5,   14.5,   24.5, ...,  474.5,  484.5,  494.5],
       ..., 
       [   4.5,   14.5,   24.5, ...,  474.5,  484.5,  494.5],
       [   4.5,   14.5,   24.5, ...,  474.5,  484.5,  494.5],
       [   4.5,   14.5,   24.5, ...,  474.5,  484.5,  494.5]])

In [390]:
temp = [[[1,2,4,500],[3,4,5,700]],
        [[5,7,8,900],[1,3,5,700]]]

In [391]:
temp = np.array(temp)

In [392]:
temp = ((np.mean(temp[:,:,:3],axis=2))*(temp[:,:,3]/100))
temp = temp.astype(int)

In [393]:
print(temp)

[[11 28]
 [60 21]]
