In [1]:
# import required libraries
from pyspark.mllib.linalg import Matrices, SparseMatrix
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import *
from operator import add
import numpy as np
import math
import cv2

In [2]:
# Start (or reuse, via getOrCreate) a Spark session running in local mode;
# `sc` is its SparkContext, which the cells below use to build RDDs.
appName = "PySpark Example - Python Array/List to Spark Data Frame"
master = "local"
spark = (
    SparkSession.builder
    .appName(appName)
    .master(master)
    .getOrCreate()
)
sc = spark.sparkContext

In [3]:
# i- length j- height
def findKurnelsForPixel(imgDim, kurnelDim, pixelNum):    
    K_i_min,K_j_min = 0,0
    K_i_max = imgDim["length"]-(kurnelDim["length"])
    K_j_max = imgDim["height"]-(kurnelDim["height"])

    i = (pixelNum%imgDim["length"])
    j = (math.floor(pixelNum/imgDim["length"]))
    
    K_ij_max = [j,i]
    K_ij_min = [j-(kurnelDim["height"]-1),i-(kurnelDim["length"]-1)]

    output = []
    for i in range(K_ij_min[0], K_ij_max[0]+1):
        for j in range(K_ij_min[1], K_ij_max[1]+1):
            n = (imgDim["length"]*i+j)
            if(n<0 or i<0 or j<0 or j>K_j_max or i>K_i_max):continue
            output.append(n)
    return output

In [4]:
# Helper functions for the RDD-based 2-D convolution below

def bundleWithKurnelData(kurnelArray, data, index):
    """Pair one pixel with each kernel window that contains it.

    Args:
        kurnelArray: iterable of kernel-window identifiers for this pixel.
        data: the pixel value.
        index: the pixel's flat index.

    Returns:
        A list with one ``[window, index, data]`` entry per window, in the
        order the windows were given.
    """
    return [[window, index, data] for window in kurnelArray]
def multiplyByKurnel(x, kurnelMap):
    """Weight one bundled pixel by a kernel cell chosen from its position.

    Args:
        x: pair ``(bundled, position)`` where ``bundled`` is
            ``[window, pixelIndex, pixelValue]`` and ``position`` is an integer
            sequence number (e.g. from ``zipWithIndex``).
        kurnelMap: flattened sequence of kernel weights.

    Returns:
        ``[window, pixelValue * kurnelMap[position % len(kurnelMap)]]``.
    """
    bundled, position = x[0], x[1]
    weight = kurnelMap[position % len(kurnelMap)]
    return [bundled[0], bundled[2] * weight]

def convolve(imgRdd, kurnel, imgShape=None):
    """Apply a 2-D kernel to a flat grayscale image RDD ("valid" mode).

    Each output pixel is the sum of kernel weight times the image pixel at the
    same offset inside the kernel window (the kernel is not flipped).

    Args:
        imgRdd: RDD of pixel values in row-major order.
        kurnel: 2-D kernel given as a list of equal-length rows.
        imgShape: ``(height, length)`` of the image that ``imgRdd`` flattens,
            e.g. ``img.shape``. If omitted, falls back to the notebook-global
            ``img`` for backward compatibility. Pass it explicitly to avoid
            depending on hidden notebook state.

    Returns:
        ``(image, newImageDims)``: an RDD of output pixel values in row-major
        order, and a dict with the output "height" and "length".
    """
    if imgShape is None:
        # Legacy behaviour: the shape used to be read from the global `img`.
        imgShape = img.shape
    imageDim = {"height": imgShape[0],      # rows
                "length": imgShape[1]}      # columns
    kurnelDim = {"height": len(kurnel),     # rows
                 "length": len(kurnel[0])}  # columns

    # Row-major flattening: kernel cell (dRow, dCol) lives at dRow * length + dCol.
    kurnelMap = np.array(kurnel).flatten()
    imgLength = imageDim["length"]
    kLength = kurnelDim["length"]

    def weighPixel(entry):
        # entry = [window top-left flat index, pixel flat index, pixel value].
        # The pixel's offset inside the window selects the kernel weight
        # directly, so no global sort / zipWithIndex is needed.
        windowIdx, pixelIdx, value = entry
        dRow = pixelIdx // imgLength - windowIdx // imgLength
        dCol = pixelIdx % imgLength - windowIdx % imgLength
        return windowIdx, value * kurnelMap[dRow * kLength + dCol]

    image = (imgRdd
             .zipWithIndex()  # -> (value, flat pixel index)
             .flatMap(lambda x: bundleWithKurnelData(
                 findKurnelsForPixel(imageDim, kurnelDim, x[1]), x[0], x[1]))
             .map(weighPixel)
             .reduceByKey(lambda a, b: int(a + b))
             # reduceByKey makes no ordering guarantee; window indices sort
             # into row-major output order.
             .sortByKey()
             .map(lambda x: x[1]))
    newImageDims = {"height": imageDim["height"] - kurnelDim["height"] + 1,
                    "length": imageDim["length"] - kurnelDim["length"] + 1}

    return image, newImageDims

def saveImageRddToStorage(imageRdd, imageDims, imageName):
    """Collect a flat pixel RDD, rebuild its rows, and write it with cv2.imwrite.

    Pixels are grouped into rows of ``imageDims["length"]`` values in collection
    order; any trailing pixels that do not fill a complete row are dropped.
    ``imageDims["height"]`` is not consulted.

    Returns:
        The boolean result of ``cv2.imwrite``.
    """
    pixels = imageRdd.collect()
    rows, pending = [], []
    for position, pixel in enumerate(pixels, start=1):
        pending.append(pixel)
        if position % imageDims["length"] == 0:
            rows.append(pending)
            pending = []
    return cv2.imwrite(imageName, np.array(rows))

In [5]:
# Load the test image as a single-channel grayscale array (rows x columns).
# cv2.imread returns None instead of raising when the file cannot be read,
# which would otherwise surface as an obscure cvtColor error.
rawImg = cv2.imread("check.png")
if rawImg is None:
    raise FileNotFoundError("could not read image file: check.png")
img = cv2.cvtColor(rawImg, cv2.COLOR_BGR2GRAY)

# NOTE: compared with the usual Sobel convention, `sobalX` here is the
# y-gradient kernel (responds to horizontal edges) and `sobalY` is the
# x-gradient kernel; the names are kept for the output file names below.
sobalX = [[1,2,1],
          [0,0,0],
          [-1,-2,-1]]
sobalY = [[-1,0,1],
          [-2,0,2],
          [-1,0,1]]

# Flatten rows into one row-major pixel stream. It is consumed by two
# convolutions, so cache it rather than recomputing it each time.
imgRdd01 = sc.parallelize(img).flatMap(lambda x: x).cache()

image, imageDims = convolve(imgRdd01, sobalX)
saveImageRddToStorage(image, imageDims, "checkSobalX.png")

image, imageDims = convolve(imgRdd01, sobalY)
saveImageRddToStorage(image, imageDims, "checkSobalY.png")

True

In [42]:
def loadGrayScale(path):
    """Read an image file and convert it to a single-channel grayscale array."""
    return cv2.cvtColor(cv2.imread(path), cv2.COLOR_BGR2GRAY)


def toIndexedPixelRdd(image):
    """Flatten a 2-D image row-major into an RDD of [flatIndex, pixelValue]."""
    return (sc.parallelize(image)
            .flatMap(lambda row: row)
            .zipWithIndex()
            .map(lambda x: [x[1], x[0]]))


img01 = loadGrayScale("checkSobalX.png")
img02 = loadGrayScale("checkSobalY.png")

img01rdd = toIndexedPixelRdd(img01)
img02rdd = toIndexedPixelRdd(img02)

# Per-pixel (sum, count). aggregateByKey shuffles by key itself, so no sort is
# needed first. Pixels are cast to Python int because uint8 values can wrap
# around when summed under NumPy 2 (NEP 50) promotion rules.
sumCountRdd = img01rdd.union(img02rdd).aggregateByKey(
    (0, 0),
    lambda acc, px: (acc[0] + int(px), acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]))

# Integer mean per pixel, emitted in flat-index (row-major) order.
avgRdd = (sumCountRdd
          .mapValues(lambda v: int(v[0] / v[1]))
          .sortByKey()
          .map(lambda x: x[1]))

# Take the output size from the loaded image itself instead of `imageDims`
# left over from another cell.
avgDims = {"height": img01.shape[0], "length": img01.shape[1]}
saveImageRddToStorage(avgRdd, avgDims, "checkFinal.png")

True

In [None]:
def AddAndAverageImageRdds():
    