In [1]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
pd.set_option('display.width', 500)
pd.set_option('display.max_columns', 100)
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()

import numpy as np
from sklearn.decomposition import PCA
import os
from pylab import imread, imshow
%matplotlib inline
from PIL import Image
import matplotlib.pyplot as plt
from scipy.misc import imresize
import matplotlib.image as mpimg

In [9]:
# Read in the data (train and test are already split into separate files)
train = pd.read_csv('../data/train_data_with_sampling.csv')
test = pd.read_csv('../data/test_data.csv')

# Separate the TMDB ids (used to locate poster files) from the group label columns.
# DataFrame.as_matrix() is deprecated and was removed in pandas 1.0; .values is
# the equivalent and matches how the id column is extracted.
LABEL_COLS = ['group1', 'group2', 'group3', 'group4', 'group5', 'group6', 'group7']
X_train_ids = train[['tmdb_id']].values
y_train = train[LABEL_COLS].values
X_test_ids = test[['tmdb_id']].values
y_test = test[LABEL_COLS].values

In [21]:
############# Function to convert color to grayscale ################

def rgb2gray(rgb):
    """Convert an image array to a single-channel grayscale image.

    Uses the ITU-R BT.601 luma weights (0.299 R + 0.587 G + 0.114 B). Channels
    after the third (e.g. an alpha channel) are ignored.

    Args:
        rgb: array-like of shape (H, W, C) with C >= 3, or an already-grayscale
            array of shape (H, W).

    Returns:
        np.ndarray: float64 array of shape (H, W).
    """
    rgb = np.asarray(rgb)
    if rgb.ndim == 2:
        # Already single-channel: slicing [..., :3] would treat the first three
        # *columns* as colour channels and collapse the width axis.
        return rgb.astype(np.float64)
    return np.dot(rgb[..., :3], [0.299, 0.587, 0.114])

############# Read in posters (grayscale, resized, flattened) for a list of ids ################

def load_posters_from_ids(id_array, n=None, poster_dir='../posters/',
                          size=(741, 500), return_ids=False):
    """Load, grayscale, resize and flatten the posters for the first `n` ids.

    Each poster is read from ``<poster_dir><id>.jpg``. It is converted to grayscale
    with ``rgb2gray`` (skipped when the image is already single-channel), resized
    with ``imresize`` to ``size`` and flattened into one row of the result.

    Args:
        id_array: 2-D array with one id per row in column 0 (e.g. the
            ``train[['tmdb_id']].values`` array).
        n (int or None): number of leading ids to load; None loads all of them.
        poster_dir (str): path prefix the poster files live under.
        size (tuple): (rows, cols) each poster is resized to.
        return_ids (bool): when True, also return the id rows that were actually
            loaded, so rows of X can be re-aligned with their labels.

    Returns:
        np.ndarray of shape (num_loaded, rows * cols), or ``(X, ids)`` when
        ``return_ids`` is True. Posters that fail to load or resize are skipped
        and counted in the printed ``errors`` total, so ``num_loaded`` can be
        smaller than ``n``.
    """
    posters = []
    ids = []
    errors = 0
    for poster in id_array[:n]:
        path = poster_dir + str(int(poster[0])) + '.jpg'
        try:
            img = mpimg.imread(path)
            # rgb2gray expects a trailing channel axis; single-channel images
            # are used as-is.
            gray = img if img.ndim == 2 else rgb2gray(img)
            scaled = imresize(gray, size)
        except (IOError, OSError, ValueError):
            # Missing/unreadable files and images imresize rejects are counted
            # and skipped rather than aborting the whole load.
            errors += 1
            continue
        posters.append(np.asarray(scaled))
        ids.append(poster)

    if posters:
        X = np.asarray(posters).reshape((len(posters), -1))
    else:
        # reshape((0, -1)) is ambiguous for an empty array, so build it explicitly.
        X = np.empty((0, size[0] * size[1]), dtype=np.uint8)
    print('posters shape:  %s' % (X.shape,))
    print('errors:  %s' % errors)
    if return_ids:
        return X, ids
    return X

In [22]:
# Load the first 100 training posters and the first 50 test posters.
X_train, X_test = (load_posters_from_ids(ids, n=count)
                   for ids, count in ((X_train_ids, 100), (X_test_ids, 50)))

posters shape:  (100, 370500)
errors:  0
posters shape:  (50, 370500)
errors:  0


In [27]:
from pyspark.sql import SQLContext
from numpy.linalg import eigh
from pyspark.mllib.linalg import *
 
sqlsc = SQLContext(sc)

# Every flattened poster becomes a one-field row holding a dense vector, giving
# a single-column 'features' DataFrame.
data_train = list((Vectors.dense(row),) for row in X_train)
df_train = sqlsc.createDataFrame(data_train, ["features"])

data_test = list((Vectors.dense(row),) for row in X_test)
df_test = sqlsc.createDataFrame(data_test, ["features"])

In [24]:
def estimateCovariance(df):
    """Compute the covariance matrix for a given dataframe.

    Rows are centred by subtracting the column mean. The outer products of the
    centred rows are then summed and divided by the number of rows (the biased /
    population estimator, i.e. N rather than N - 1).

    Note:
        Every outer product and the result are dense d x d float64 matrices, so
        memory grows with d**2. For the flattened 741x500 posters (d = 370500) that
        is roughly 1.1 TB per matrix, which is the likely reason the recorded
        `pca(df_train)` run had to be interrupted. Reduce d first (e.g. a smaller
        resize), or work with the n x n Gram matrix when n is much smaller than d.

    Args:
        df: A Spark dataframe with a column named 'features', whose entries are
            vectors exposing ``toArray()`` (e.g. DenseVectors).

    Returns:
        np.ndarray: A (d, d) array, where d is the length of the feature vectors.

    Raises:
        ValueError: if `df` has no rows.
    """
    # DataFrame.map was removed in Spark 2.0, so work on the underlying RDD. Each
    # vector is converted to a numpy array so the arithmetic below is plain numpy.
    features = df.select('features').rdd.map(lambda row: row[0].toArray())
    features.cache()  # reused by count(), mean() and the outer-product sum
    try:
        n = features.count()
        if n == 0:
            raise ValueError('estimateCovariance: dataframe has no rows')
        mean = features.mean()
        centred = features.map(lambda x: x - mean)
        return centred.map(lambda x: np.outer(x, x)).sum() / n
    finally:
        features.unpersist()

In [25]:
def pca(df, k=2):
    """Computes the top `k` principal components, corresponding scores, and all eigenvalues.

    Note:
        All eigenvalues are returned sorted from largest to smallest. `eigh` returns each
        eigenvector as a column, and the returned components are columns as well.
        The covariance comes from `estimateCovariance`, whose memory cost grows with d**2.

    Args:
        df: A Spark dataframe with a 'features' column of vectors exposing ``toArray()``
            (e.g. DenseVectors).
        k (int): The number of principal components to return.

    Returns:
        tuple of (np.ndarray, DataFrame, np.ndarray): (eigenvectors, scores, eigenvalues).
            eigenvectors has shape (d, k), one principal component per column, where d is
            the feature length. The scores DataFrame has one row per row of `df` and a
            single 'pca_features' column holding a DenseVector of length `k`: each raw
            (not mean-centred) feature vector projected onto the components.
            eigenvalues is an array of length d.
    """
    cov = estimateCovariance(df)
    eigVals, eigVecs = np.linalg.eigh(cov)
    # eigh returns eigenvalues in ascending order; reverse so the largest comes first.
    order = np.argsort(eigVals)[::-1]
    eigVals = eigVals[order]
    components = eigVecs[:, order[:k]]  # (d, k): top-k eigenvectors as columns

    # DataFrame.map was removed in Spark 2.0, so project through the underlying RDD.
    scores = df.select('features').rdd.map(
        lambda row: (DenseVector(np.dot(row[0].toArray(), components)),))
    # Build the DataFrame from the RDD itself rather than a notebook-global SQL
    # context name. NOTE: RDD.toDF needs an active SQLContext/SparkSession (this
    # notebook creates `sqlsc` before calling pca).
    scoreDF = scores.toDF(['pca_features'])
    return components, scoreDF, eigVals


In [26]:
def varianceExplained(df, k=1):
    """Calculate the fraction of variance explained by the top `k` eigenvectors.

    Args:
        df: A Spark dataframe with a 'features' column of vectors exposing
            ``toArray()`` (e.g. DenseVectors).
        k: The number of principal components to consider.

    Returns:
        float: the sum of the `k` largest eigenvalues of the covariance matrix
            divided by the sum of all of them. This lies between 0 and 1 up to
            rounding, and is nan if the total variance is 0.
    """
    # Only eigenvalues are needed, so skip the eigenvectors and the score
    # DataFrame that pca() would also build.
    eigenvalues = np.sort(np.linalg.eigvalsh(estimateCovariance(df)))[::-1]
    return np.sum(eigenvalues[:k]) / np.sum(eigenvalues)

In [28]:
df_train.show(n=1)  # peek at the first row of the features DataFrame

+--------------------+
|            features|
+--------------------+
|[41.0,40.0,39.0,4...|
+--------------------+
only showing top 1 row



In [29]:
# NOTE: the recorded run of this cell was interrupted (KeyboardInterrupt); with
# 370500 features per poster the covariance step is extremely expensive.
comp, score, eigVals = pca(df_train, k=2)

KeyboardInterrupt: 

In [None]:
# Preview a few projected rows instead of pulling the whole DataFrame to the driver.
score.take(5)