In [132]:
from google.cloud import storage
import tarfile
import numpy as np
from scipy.spatial.distance import euclidean
from scipy import exp
from scipy.linalg import eigh
from PIL import Image
import io
import time
import operator
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
from matplotlib import pylab as plt

In [2]:
# test spark
data = [1,2,3,4]
rdd = sc.parallelize(data)

In [3]:
bucket_name = "dataproc-staging-us-central1-759291875656-wohgf1sk"
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)

In [4]:
file_prefix = "data/"
blobs = bucket.list_blobs(prefix=file_prefix, delimiter = '/')

In [5]:
# list files in folder
for blob in blobs:
    print(blob.name)

data/
data/filelist_places365-standard.tar
data/test_data.tar
data/train_data.tar
data/val_data.tar


#### Download file from blob

In [6]:
file_name = "val_data.tar"
blob = bucket.get_blob(file_prefix + file_name)
blob.download_to_filename("small_data.tar")

In [7]:
data = []
tar_file = tarfile.open("small_data.tar")


for member in tar_file.getmembers():

    f = tar_file.extractfile(member)
    if f is not None:
        content = f.read()
        data.append(content)

tar_file.close()

In [7]:
# test small amount of images
TEST_NUM_IMAGES = 100

data = []
tar_file = tarfile.open("small_data.tar")

num = 0
for member in tar_file.getmembers():
    if num <= TEST_NUM_IMAGES:
        f = tar_file.extractfile(member)
        if f is not None:
            content = f.read()
            data.append(content)
        num += 1
    else:
        break
tar_file.close()

In [8]:
rdd = sc.parallelize(data)

### Resize, convert and flatten image data into array

In [104]:
def convert_img(img_data, dim_h):
    """ 
  Resize image so that it has height dim_h and flatten the image
  Args:
    img_data: (bytes) image data
    dim_h: (int) desired height
  Returns:
    img: (np array) the resized and flattened image
    """
    img = Image.open(io.BytesIO(img_data))
    img = img.convert(mode = 'L') # convert to grey scale
    img = img.resize((dim_h, dim_h), Image.ANTIALIAS)
    im_flatten = np.reshape(np.array(img),(-1,))
#     im_flatten = Vectors.dense(im_flatten)
    
    return im_flatten.tolist()

In [179]:
rdd_flatten = rdd.map(lambda x: convert_img(x, 64)).cache()

### Implement PCA

In [106]:
# use pca function provided by spark
# maximum number of columns: 65,535
mat = RowMatrix(rdd_flatten)

# Principal components are stored in a local dense matrix.
pc = mat.computePrincipalComponents(10)

In [107]:
pc

DenseMatrix(4096, 10, [-0.0265, -0.0262, -0.0261, -0.0263, -0.0253, -0.0252, -0.0249, -0.0249, ..., 0.0051, 0.0016, 0.0013, 0.0004, 0.001, 0.0004, 0.0053, 0.009], 0)

In [108]:
# Project the rows to the linear space spanned by the top 10 principal components.
projected = mat.multiply(pc)

In [52]:
def compute_covariance(data):
    """
    Compute covariance matrix for given RDD.
    Args:
    data: (RDD of np arrays) RDD representing the data
    Returns:
    covmat: (np array) covariance matrix of the RDD
    """
    n = data.count()
    mean = data.mean()
    data_0_mean = data.map(lambda m: m - mean)
    covmat = (data_0_mean
                    .map(lambda mat: np.outer(mat.T, mat))
                    .reduce(lambda x,y: x+y)) / float(n)
    return covmat

In [15]:
def pca(data, k=2):
    """
    Computes the top `k` principal components, their corresponding PCA scores, and eigenvalues.
    Args:
    data: (RDD of np arrays) RDD representing the data.
    k: (int) number of principal components to find
    Returns:
    (eigenvectors, scores, eigenvalues): (np.ndarray, RDD of np.ndarray, np.ndarray)
    """

    # Compute covariance matrix
    covmat = compute_covariance(data)

    # Compute eigenvalues & eigenvectors
    eig_vals, eig_vecs = eigh(covmat)

    # Sort the eigenvectors based on their eigenvalues
    inds = np.argsort(eig_vals)
    inds = inds[::-1]

    # Find the `k` principal components, `k` scores, and all eigenvalues
    components = eig_vecs[:,inds[:k]]
    eigenvalues = eig_vals[inds]
    scores = data.map(lambda m: m.dot(components))

    return (components, scores, eigenvalues)

In [181]:
# Run PCA on the actual data
top_comps, top_scores, top_eigenvals = pca(rdd_flatten.map(lambda x: Vectors.dense(x)), 10)

In [182]:
top_eigenvals.shape

(4096,)

In [183]:
top_scores.take(5)

[array([-5874.04451701,  3275.41228657,  1606.86812073,  -536.11883759,
         -687.56409978,   -64.52112675,  -217.51302467,   432.69275909,
          111.26081469,   226.90419953]),
 array([-7739.93590997,  1711.57899632,    56.73387892, -2114.27076937,
        -1391.27538992,   248.32092075,   465.58113685,    99.99948625,
         1027.39581905,   581.3017812 ]),
 array([-4955.50470509,  4228.33465538,  1806.05317013, -1495.06222458,
        -1686.02874393,  -885.38693382,  -135.29897658,  1212.23213987,
          -64.93972595,    34.14565319]),
 array([-7744.32929818,  1157.33397593,   -11.24212822,  -840.27731286,
          293.00488427, -1324.63214265,  -862.38506288,   330.76414178,
         1038.6267776 ,    28.46015111]),
 array([-7.17540601e+03,  3.10681140e+03,  3.42356090e+03,  2.40681560e+00,
        -3.12504738e+02, -1.06664435e+03,  4.13124193e+02, -4.92624694e+02,
         3.30710718e+02, -5.21468157e+01])]

In [184]:
top_comps.shape

(4096, 10)

### Implement KPCA

In [81]:
def sqeuclidean(s1,s2):
    return float(euclidean(s1,s2)**2)

In [146]:
# https://github.com/marty90/spark-distance-matrix/blob/master/spark_distance_matrix.py

PARTITIONS_1=100
PARTITIONS_2=1000

def compute_distance(data):
    """
    Compute distance matrix for given RDD.
    Args:
    data: (RDD of np arrays) RDD representing the data
    Returns:
    dist: (np array) distance matrix of the RDD
    """
    distance = sqeuclidean
    points = data.collect()[0]

    # Create Distributed Elements
    # Get a vector like:
    #
    # ( 1 , [e1, e2, ... ])
    # ( 2 , [e1, e2, ... ])
    #      ....
    points_rdd = sc.parallelize( range(len(points)), PARTITIONS_1)
    
    # Compute cartesian product, and filter only the upper triangle matrix
    couples_rdd = points_rdd.cartesian(points_rdd).filter(lambda t: t[0] >= t[1]).coalesce(PARTITIONS_2)

    # Compute distances for a batch of pairs
    def calc_distance (tups, points):

        for tup in tups:
            i1, i2 = tup
            p1 = points [i1]
            p2 = points [i2]
            d = distance(p1,p2)
            
            yield (i1, (i2, d))   
            # Emit distances for the lower triangle matrix
            if i1 != i2:
                yield (i2, (i1, d)) 
   
    couples_distance = couples_rdd.mapPartitions(lambda tups: calc_distance(tups, points) )
    
    # Create Distance Matrix
    # Get a vector like:
    #
    # ( 1 , [d1, d2, ... ])
    # ( 2 , [d1, d2, ... ])
    #      ....
    
    def get_distance_vector(t):
        i, dist_items = t
        dist_items_sorted = sorted (dist_items, key=operator.itemgetter(0))
        distances = [d for index, d in dist_items_sorted]
        return (i, distances)
    
    distributed_distance = couples_distance.groupByKey().map(get_distance_vector)
    
    return distributed_distance.map(lambda x: x[1])

In [173]:
def kpca(data, gamma, n_components):
    """
    Implementation of a RBF kernel PCA.

    Arguments:
        data: (RDD of np arrays) RDD representing the data
        gamma: A free parameter (coefficient) for the RBF kernel.
        n_components: The number of components to be returned.

    Returns the k eigenvectors (alphas) that correspond to the k largest
        eigenvalues (lambdas).

    """
    # Calculating the squared Euclidean distances for every pair of points
    # in the MxN dimensional dataset.
    sq_dists = np.array(compute_distance(data).collect())

    # Computing the MxM kernel matrix.
    K = exp(-gamma * sq_dists)

    # Centering the symmetric NxN kernel matrix.
    N = K.shape[0]
    one_n = np.ones((N,N)) / N
    K_norm = K - one_n.dot(K) - K.dot(one_n) + one_n.dot(K).dot(one_n)

    # Obtaining eigenvalues in descending order with corresponding
    # eigenvectors from the symmetric matrix.
    eigvals, eigvecs = eigh(K_norm)

    # Obtaining the i eigenvectors (alphas) that corresponds to the i highest eigenvalues (lambdas).
    alphas = np.column_stack((eigvecs[:,-i] for i in range(1,n_components+1)))
    lambdas = [eigvals[-i] for i in range(1,n_components+1)]

    return alphas, lambdas

In [174]:
alphas, lambdas = kpca(rdd_flatten, 10, 5)



In [176]:
alphas.shape

(100, 5)

In [178]:
lambdas

[5.768555561804704,
 3.000136201335044,
 3.0000817307766314,
 3.0000000211358326,
 3.0000000043937973]

In [None]:
# filename = 'val_flatten.pkl'
# rdd_flatten.saveAsPickleFile('gs://'+ bucket_name + '/' + file_prefix + filename)

In [16]:
# pickle_rdd = sc.pickleFile('gs://'+ bucket_name + '/' + file_prefix + filename ).collect()
# pickle_rdd