In [None]:
#https://hpc.ucalgary.ca/arc/software/spark
import os
import atexit
import sys

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import findspark
from sparkhpc import sparkjob

#Exit handler to clean up the Spark cluster if the script exits or crashes
def exitHandler(sj,sc):
    """Best-effort shutdown of the Spark context and the Spark job.

    Meant to be registered with ``atexit`` so the cluster allocation is
    released when the script exits or crashes.  Each resource is stopped
    independently: a failure while stopping one is reported and does not
    prevent the attempt to stop the other.

    Args:
        sj: The Spark job object; its ``stop()`` method is called second.
        sc: The Spark context; its ``stop()`` method is called first.
    """
    # Stop the context before the job, same order as the original handler.
    for label, resource in (('Spark Context', sc), ('Spark Job', sj)):
        try:
            print('Trapped Exit cleaning up ' + label)
            resource.stop()
        except Exception as exc:
            # Cleanup stays best-effort (never raise out of an exit handler),
            # but say what went wrong instead of silently swallowing it.
            # Catching Exception rather than using a bare except lets
            # KeyboardInterrupt / SystemExit propagate.
            print('Failed to stop %s: %r' % (label, exc))

# Let findspark set up the environment for pyspark before the cluster is
# requested (see the findspark documentation for what init() configures).
findspark.init()

#Parameters for the Spark cluster
# Total cores requested below is nodes*tasks_per_node (3*8 = 24).
nodes=3
tasks_per_node=8
memory_per_task=1024 #1 gig per process, adjust accordingly
# Please estimate walltime carefully to keep unused Spark clusters from sitting
# idle so that others may use the resources.
walltime="1:00" #1 hour
# Set through the environment rather than passed to sparkjob below.
os.environ['SBATCH_PARTITION']='lattice' #Set the appropriate ARC partition

# Describe the cluster job: one executor per node's worth of cores, with
# memory sized per core.
sj = sparkjob.sparkjob(
     ncores=nodes*tasks_per_node,
     cores_per_executor=tasks_per_node,
     memory_per_core=memory_per_task,
     walltime=walltime
    )

# presumably blocks until the scheduler has started the job - confirm against
# the sparkhpc documentation.
sj.wait_to_start()
# sc is the SparkContext used by every later cell.
sc = sj.start_spark()

#Register the exit handler
# Registered only after both sj and sc exist, because exitHandler takes both.
# NOTE(review): if wait_to_start()/start_spark() raises, no cleanup handler is
# in place yet - confirm whether the job needs to be stopped manually then.
atexit.register(exitHandler,sj,sc)

#You need this line if you want to use SparkSQL
# NOTE(review): newer Spark releases favour SparkSession over SQLContext -
# confirm against the Spark version installed on the cluster.
sqlCtx=SQLContext(sc)


In [None]:
#imports and change to img directories
import numpy as np
from PIL import Image
# Relative path: resolved against whatever the working directory is when this
# cell runs, so re-running the cell moves one level deeper (or fails).
# The following cells list and open images relative to this directory.
os.chdir('imdb/00/')
# Bare expression: in a notebook cell this displays the resulting directory.
os.getcwd()

In [None]:
# Collect the names of all JPEG files in the current working directory.
files = os.listdir('.')
# Keep only entries whose name ends in '.jpg' (case-sensitive), preserving the
# order in which os.listdir returned them.
images = [entry for entry in files if entry.endswith('.jpg')]

In [None]:
#convert the first 50 images to a list of np arrays
npimg=[]
for img in images[:50]:
    # Open through a context manager so the image file is closed as soon as
    # its pixels have been converted, instead of leaving one open handle per
    # image for the garbage collector to clean up.
    with Image.open(img) as im:
        # The conversion happens inside the with-block, while the file is
        # still open, because Image.open itself does not read pixel data.
        npimg.append(np.asarray(im))

In [None]:
#load images into rdd
# npimg is a plain Python list of numpy arrays built in this process; it is
# handed to the SparkContext so that each array becomes one RDD element.
rdd = sc.parallelize(npimg)

In [None]:
def resizehalf(img, percent=50):
    """Return an image array scaled to ``percent`` of its original size.

    The width is scaled first and truncated to a whole pixel; the height is
    then scaled by the ratio of the truncated width to the original width,
    so the aspect ratio follows the width that is actually used.

    Args:
        img: Image data as a numpy array accepted by ``PIL.Image.fromarray``.
        percent: Target size as a percentage of the original, e.g. 50 halves
            each dimension and 140 enlarges by 40%.  Defaults to 50, which
            reproduces the original behaviour.

    Returns:
        A numpy array holding the resized image.

    Raises:
        ValueError: If ``percent`` is not positive.
    """
    # Imported inside the function so it is self-contained wherever it is
    # executed (it is passed to rdd.map elsewhere in this file).
    from PIL import Image
    import numpy as np

    if percent <= 0:
        raise ValueError('percent must be positive, got %r' % (percent,))

    im = Image.fromarray(img)
    width, height = im.size
    # Divide by 100.0 so this is a true division even under Python 2, where
    # 50/100 would evaluate to 0.
    scale = percent / 100.0
    # Clamp to 1 pixel: resize() rejects a zero-sized target, which tiny
    # inputs would otherwise produce after truncation.
    new_width = max(1, int(width * scale))
    wpercent = new_width / float(width)
    new_height = max(1, int(float(height) * wpercent))
    # Image.ANTIALIAS was removed in Pillow 10; it was an alias for LANCZOS,
    # so the filter (and the output) is unchanged.
    im = im.resize((new_width, new_height), Image.LANCZOS)
    return np.asarray(im)

In [None]:
#resize all img in rdd
# map() only describes the work (Spark transformations are lazy); the resize
# is actually executed when the result is collected in the save cell below.
resizedarr = rdd.map(resizehalf)

In [None]:
#change wd to prep saving images
# Moves one level up from the image directory entered earlier
# ('imdb/00/' -> 'imdb/'); the save cell writes into '<cwd>/resized/'.
# NOTE(review): this does not return to the directory the notebook started
# in - confirm that 'imdb/resized/' is the intended output location.
os.chdir('..')
# Bare expression: in a notebook cell this displays the resulting directory.
os.getcwd()

In [None]:
#saves resized images in folder resized
out_dir = os.path.join(os.getcwd(), 'resized')
# Image.save does not create missing directories, so make sure the target
# folder exists before writing the first file.
if not os.path.isdir(out_dir):
    os.makedirs(out_dir)

# The collected results come back in the same order as the list that was
# parallelized, so pairing them with `images` restores each original file name.
# `counter` ends up holding the number of images written, as before.
counter = 0
for counter, (name, arr) in enumerate(zip(images, resizedarr.collect()), 1):
    Image.fromarray(arr).save(os.path.join(out_dir, name))