# Setting Up

## Init

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages databricks:tensorframes:0.2.7-s_2.11 pyspark-shell'

In [2]:
import seaborn as sns
import numpy as np
import pandas as pd
import collections # For frequency counting
import findspark
findspark.init("/home/canwill/spark2/")

import pyspark
from pyspark.sql import DataFrameNaFunctions
from pyspark.sql.functions import lit # Create columns of *literal* value
from pyspark.sql.functions import col # Returns a Column based on the 
                                      # given column name
from pyspark.ml.feature import StringIndexer #label encoding
from pyspark.ml import Pipeline

#sc = pyspark.SparkContext(appName="helloworld")

## SparkSession

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("TensorOnSpark") \
    .config("spark.executor.memory", "10g") \
    .getOrCreate()
    
spark.conf.set("spark.sql.caseSensitive", "false");
spark.sql("set spark.sql.caseSensitive=false");

## Data

In [None]:
trainDF = spark.read.csv("data/train.csv", header="true")
testDF = spark.read.csv("data/test.csv", header="true")

**Combine train and test data.**

In [None]:
## Add Survived column to test, and dataset name as a column
trainDF = trainDF.withColumn('Mark', lit('train'))
testDF = (testDF.withColumn('Survived',lit(0))
                .withColumn('Mark', lit('test')))
testDF = testDF[trainDF.columns]

## Append Test data to Train data
df = trainDF.unionAll(testDF)

In [None]:
df.createOrReplaceTempView("train")

# Steps in a Machine Learning Workflow

* Data Collection
* Data Preprocessing
* Feature Engineering
* Data format translation
* Modeling
* Evaluation and Selection

## Data Collection

* Combining Datasets

## Data Preprocessing

### Exploratory Data Analysis

* Statistical Summary
* Histograms
* Correlations

#### What is the schema?

In [None]:
df.columns

In [None]:
df.printSchema()

#### Which ones are numeric?

In [None]:
df.show(5)

Here are the variables which should be numeric (float or integer):

* PassengerId: Integer
* Pclass: Integer
* SibSp: Integer
* Parch: Integer
* Survived: Integer
* Age: Float
* Fare: Float

In [None]:
# Here is an example
df = df.withColumn("AgeTmp", df["Age"].cast("float")) \
    .drop("Age") \
    .withColumnRenamed("AgeTmp", "Age")

In [None]:
# Let's define function
def to_anytype(df, colnames, typename):
    """
    Cast each of the named columns of a DataFrame to one common type.

    Every column is replaced in place with ``withColumn``, which overwrites an
    existing column of the same name. Unlike the add-scratch-column / drop /
    rename approach, this keeps the column order of ``df`` and cannot clobber a
    real column that happens to be called "tmp".

    :param df: the DataFrame whose columns are to be cast
    :param colnames: iterable with the names of the columns to cast
    :param typename: target type handed to ``Column.cast`` (e.g. "integer")
    :return: a new DataFrame with the requested columns cast; ``df`` is not modified
    """
    for colname in colnames:
        df = df.withColumn(colname, df[colname].cast(typename))
    return df

In [None]:
intCols = ['PassengerId', 'Pclass', 'SibSp', 'Parch', 'Survived']
floatCols = ['Age', 'Fare']

df = to_anytype(df, intCols, "integer")
df = to_anytype(df, floatCols, "float")

In [None]:
df.printSchema()

#### Let's inspect data

In [None]:
df.take(5)

In [None]:
df.show(5)

#### Statistical Summary

In [None]:
df.describe('Age').show()

In [None]:
df.describe(['Age', 'Name']).show()

In [None]:
df.describe(trainDF.columns).show()

In [None]:
df.describe(trainDF.columns[1:4]).show()

In [None]:
df.describe(trainDF.columns[5:8]).show()

In [None]:
df.describe(trainDF.columns[9:12]).show()

#### Histograms

* We need the frequency count of various levels

In [None]:
age_hist = spark.sql(
    "SELECT Age AS age, \
            count(*) AS count \
    FROM train \
    GROUP BY Age \
    ORDER BY Age")
age_hist.show(n=age_hist.count())

In [None]:
age_hist = spark.sql(
    "SELECT bucket_floor, \
        CONCAT(bucket_floor, ' to ', bucket_ceiling) as bucket_name, \
        count(*) as count \
     FROM ( \
        SELECT floor(Age/5.00)*5 as bucket_floor, \
            floor(Age/5.00)*5 + 5 as bucket_ceiling \
        FROM train \
     ) a \
     GROUP BY 1, 2 \
     ORDER BY 1")

age_hist.show(n=age_hist.count())


In [None]:
def get_column(df, colname):
    """
    Collect one column of a DataFrame into a plain Python list.

    :param df: DataFrame to read from (its ``rdd`` is mapped and collected)
    :param colname: name of the column to extract from every row
    :return: list of the column values, with each ``None`` replaced by the string 'None'
    """
    def pick(record):
        return record[colname]

    values = []
    for value in df.rdd.map(pick).collect():
        # missing values are replaced by the literal string 'None'
        values.append('None' if value is None else value)
    return values

age = get_column(age_hist, "bucket_name")
count = get_column(age_hist, "count")

In [None]:
%matplotlib inline

# x and y are passed by keyword: seaborn releases from 0.12 on no longer accept
# them as positional arguments, while the keyword form works on older ones too.
barplt = sns.barplot(x=age, y=count)

In [None]:
%matplotlib inline

# x and y are passed by keyword: seaborn releases from 0.12 on no longer accept
# them as positional arguments, while the keyword form works on older ones too.
barplt = sns.barplot(x=age, y=count)
# rotate every x tick label by 45 degrees
for item in barplt.get_xticklabels():
    item.set_rotation(45)

#### Histogram Function

In [None]:
def get_column(df, colname):
    """
    Pull a single column out of a DataFrame as a Python list.

    :param df: DataFrame to read from (its ``rdd`` is mapped and collected)
    :param colname: name of the column to extract
    :return: list of values in which every ``None`` is replaced by the string 'None'
    """
    raw_values = df.rdd.map(lambda record: record[colname]).collect()
    return [value if value is not None else 'None' for value in raw_values]

def histplot(dfname, colname, binsize):
    """
    Plot a histogram of one column as a seaborn bar chart.

    The DataFrame is registered as the temporary view "tmpDF" (replacing any
    view of that name), the values are bucketed in Spark SQL with
    ``floor(col / binsize) * binsize`` and the bucket counts are drawn as bars.

    ``colname`` and ``binsize`` are interpolated directly into the SQL text, so
    they must come from trusted notebook code, not from external input.

    :param dfname: the DataFrame holding the column
    :param colname: name of the column (or SQL expression) to bucket
    :param binsize: width of each bucket; converted to text with ``str``
    :return: the object returned by ``sns.barplot``, with x tick labels rotated by 45 degrees
    """
    binsize = str(binsize)
    dfname.createOrReplaceTempView("tmpDF")
    # lower edge of the bucket a value falls into
    bucket_floor = "floor({col}/{size})*{size}".format(col=colname, size=binsize)
    hist_query = (
        "SELECT bucket_floor, "
        "CONCAT(bucket_floor, ' to ', bucket_ceiling) as bucket_name, "
        "count(*) as count "
        "FROM ( "
        "SELECT {floor} as bucket_floor, "
        "{floor} + {size} as bucket_ceiling "
        "FROM tmpDF "
        ") a "
        "GROUP BY 1, 2 "
        "ORDER BY 1"
    ).format(floor=bucket_floor, size=binsize)
    hist_data = spark.sql(hist_query)
    xvar = get_column(hist_data, "bucket_name")
    count = get_column(hist_data, "count")
    # keyword arguments: seaborn >= 0.12 rejects x/y passed positionally
    barplt = sns.barplot(x=xvar, y=count)
    for item in barplt.get_xticklabels():
        item.set_rotation(45)
    return barplt

In [None]:
type(df)

In [None]:
histplot(df, "Age", 5)

In [None]:
histplot(df, "Age", 10)

#### All Histograms

* Play with various binsizes

In [None]:
df.printSchema()

In [None]:
histplot(df, "Age", 5)

In [None]:
histplot(df, "Survived", 5)

In [None]:
histplot(df, "Survived", 1)

In [None]:
histplot(df, "Pclass", 1)

In [None]:
histplot(df, "SibSp", 1)

In [None]:
histplot(df, "Parch", 1)

In [None]:
histplot(df, "Age", 5)

In [None]:
histplot(df, "Fare", 5)

In [None]:
histplot(df, "Fare", 10)

Let's test with a categorical variable.

In [None]:
histplot(trainDF, "Embarked", 1)

In [None]:
def histplot_s(df, colname):
    """
    Plot the frequency of each distinct value of a column as a bar chart.

    The whole column is collected to the driver with ``get_column`` and counted
    with ``collections.Counter``, so bars appear in order of first occurrence.

    :param df: the DataFrame holding the column
    :param colname: name of the column whose values are counted
    :return: the object returned by ``sns.barplot``, with x tick labels rotated by 45 degrees
    """
    xvar = get_column(df, colname)
    counter = collections.Counter(xvar)
    # keyword arguments: seaborn >= 0.12 rejects x/y passed positionally
    barplt = sns.barplot(x=list(counter.keys()), y=list(counter.values()))
    for item in barplt.get_xticklabels():
        item.set_rotation(45)
    return barplt

In [None]:
histplot_s(df, "Sex")

In [None]:
histplot_s(df, "Embarked")

#### Correlations

In [None]:
df.corr("Age", "Fare")

In [None]:
df.corr("Age", "Survived")

In [None]:
df.corr("Fare", "Survived")

Currently, only *pearson* is supported.

### Missing Value Imputation

In [None]:
numVars = ['Survived','Age','SibSp','Parch','Fare']
stringVars = ['Cabin', 'Embarked', 'Pclass', 'Sex']

def countNull(df,var):
    """
    Count the rows of ``df`` whose ``var`` column is null.

    :param df: the DataFrame to inspect
    :param var: name of the column to test
    :return: number of rows for which ``df[var].isNull()`` holds
    """
    null_mask = df[var].isNull()
    null_rows = df.where(null_mask)
    return null_rows.count()

def countEmptyString(df,var):
    """
    Count the rows of ``df`` whose ``var`` column equals the empty string.

    :param df: the DataFrame to inspect
    :param var: name of the column to test
    :return: number of rows selected by ``df[var].isin("")``
    """
    is_empty = df[var].isin("")
    matching_rows = df[is_empty]
    return matching_rows.count()

def countZero(df,var):
    """
    Count the rows of ``df`` whose ``var`` column equals zero.

    :param df: the DataFrame to inspect
    :param var: name of the column to test
    :return: number of rows selected by ``df[var].isin(0)``
    """
    is_zero = df[var].isin(0)
    matching_rows = df[is_zero]
    return matching_rows.count()

In [None]:
missing = {var: countNull(df,var) for var in df.columns}
missing

In [None]:
missing = {var: countEmptyString(df, var) for var in df.columns}
missing

In [None]:
missing = {var: countZero(df, var) for var in df.columns}
missing

In [None]:
age_mean = df.groupBy().mean('Age').first()
age_mean

In [None]:
age_mean[0]

In [None]:
age_mean = df.groupBy().mean('Age').first()[0]
fare_mean = df.groupBy().mean('Fare').first()[0]
age_mean, fare_mean

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.functions import count, col 
def mode_spark(df, column):
    """
    Return the most frequent non-null value of a column.

    Nulls are excluded before counting because the result is used as a fill
    value for missing entries, and a null mode cannot be used for that.
    Ties are broken deterministically by taking the smallest value, instead of
    whichever row an unordered ``limit(1)`` happens to return.

    :param df: the DataFrame holding the column
    :param column: name of the column whose mode is wanted
    :return: the most frequent non-null value, or ``None`` when the column
             has no non-null values (including an empty DataFrame)
    """
    # Count the occurrences of each non-null value, most frequent first;
    # the secondary ascending sort on the value itself settles ties.
    ranked = (
        df.where(df[column].isNotNull())
        .groupBy(column)
        .count()
        .orderBy(F.desc('count'), F.asc(column))
    )

    # first() yields None when there is no row at all
    top = ranked.first()
    return None if top is None else top[column]

In [None]:
Embarked_mode = mode_spark(df, 'Embarked')

In [None]:
df = df.na.fill({'Age':age_mean,'Fare':fare_mean, 'Embarked':Embarked_mode})

**What is wrong with what I just did?**

### Outlier Treatment

* Univariate
    - Winsorization
* Multivariate

* Is it a good idea?
* Know your data

## Feature Engineering

### Applying Domain Expertise

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
 
## create user defined function to extract title
getTitle = udf(lambda name: name.split('.')[0].strip(), StringType())
df = df.withColumn('Title', getTitle(df['Name']))
 
df.select('Name','Title').show(5)

In [None]:
def _extract_title(name):
    """
    Extract the title from a name formatted like "Surname, Title. Given names".

    :param name: the raw name string, or None
    :return: the text between the first comma and the first period, stripped;
             the stripped text before the first period when there is no comma;
             None when ``name`` is None
    """
    if name is None:
        # a null name must not raise inside the UDF and fail the whole job
        return None
    before_period = name.split('.')[0]
    parts = before_period.split(',')
    if len(parts) > 1:
        return parts[1].strip()
    # no comma in front of the period: fall back to everything before the period
    return before_period.strip()

getTitle = udf(_extract_title, StringType())
df = df.withColumn('Title', getTitle(df['Name']))
 
df.select('Name','Title').show(5)

### Categorical Variable treatment

**Some algorithms can handle categorical variables directly, some can't.**

* Label Encoder
    - It is used to transform non-numerical labels to numerical labels (or nominal categorical variables)
    - Numerical labels are always between 0 and n_classes-1
    - May introduce spurious relationship
        * Age and City
* One Hot Encoding
    - Encodes categorical integer features using a one-hot aka one-of-K scheme
    - Preferable

#### Label Encoding (Indexing)

In [None]:
catVars = ['Pclass','Sex','Embarked','Title']

In [None]:
 
## index Pclass variable
si = StringIndexer(inputCol = 'Pclass', outputCol = 'Pclass_indexed')
df_indexed = si.fit(df).transform(df)

In [None]:
si = StringIndexer(inputCol = 'Sex', outputCol = 'Sex_indexed')
df_indexed = si.fit(df_indexed).transform(df_indexed)

In [None]:
si = StringIndexer(inputCol = 'Embarked', outputCol = 'Embarked_indexed')
df_indexed = si.fit(df_indexed).transform(df_indexed)

In [None]:
si = StringIndexer(inputCol = 'Title', outputCol = 'Title_indexed')
df_indexed = si.fit(df_indexed).transform(df_indexed)

In [None]:
df_indexed.select('Embarked','Embarked_indexed').show(3)

* The categorical features are indexed in resulting data
* Embarked is mapped S=>0, C=>1, Q=>2

#### StringIndexer

* Maps a string column of labels to a column of label indices
* If the input column is numeric, we cast it to string and index the string values
* The indices are in [0, numLabels), ordered by label frequencies
    - So the most frequent label gets index 0.

#### Transformer

* transform one dataset into another

#### Estimator

* fit models to data

#### Pipelines

 
* A Pipeline consists of a sequence of stages, each of which is either an Estimator or a Transformer
* When Pipeline.fit() is called, the stages are executed in order
    - If a stage is an Estimator, its Estimator.fit() method will be called on the input dataset to fit a model
        * Then the model, which is a transformer, will be used to transform the dataset as the input to the next stage
    - If a stage is a Transformer, its Transformer.transform() method will be called to produce the dataset for the next stage
* The fitted model from a Pipeline is a PipelineModel, which consists of fitted models and transformers, corresponding to the pipeline stages
* If there are no stages, the pipeline acts as an identity transformer.

### Timeseries Variable treatments

* Shattering
* No time/day variables here

## Data format translation

* In this step, we get the data in the format or data type expected by the algorithms
* In the case of Spark MLlib, this includes 
    - local vector
    - dense or sparse vectors
    - labeled points
    - local matrix
    - distributed matrix with row matrix
    - indexed row matrix
    - coordinate matrix
    - block matrix

In our case, we need to convert the features to Vectors (either SparseVector or DenseVector).

In [None]:
from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

In [None]:
catVarsIndexed = [i + '_indexed' for i in catVars]
catVarsIndexed

In [None]:
featuresCol = numVars + catVarsIndexed
featuresCol

In [None]:
featuresCol.remove('Survived')
featuresCol

In [None]:
labelCol = ['Mark','Survived']
labelCol

In [None]:
row = Row('mark','label','features') 
row

In [None]:
df_indexed.columns

In [None]:
df_indexed = df_indexed[labelCol + featuresCol]
df_indexed

In [None]:
# 0-mark, 1-label, 2-features
# map features to DenseVector
lf = df_indexed.rdd.map(lambda r: (row(r[0], r[1],DenseVector(r[2:])))).toDF()
lf.show()

In [None]:
# index label
# convert numeric label to categorical, which is required by
# decisionTree and randomForest
lf = StringIndexer(inputCol = 'label', outputCol='index').fit(lf).transform(lf)
 
lf.show(3)

### Split back into train/test data

In [None]:
train = lf.where(lf.mark =='train')
test = lf.where(lf.mark =='test')

In [None]:
# random split further to get train/validate
train, validate = train.randomSplit([0.7,0.3], seed =121)

In [None]:
print('Train Data Number of Row: '+ str(train.count()))
print('Validate Data Number of Row: '+ str(validate.count()))
print('Test Data Number of Row: '+ str(test.count()))

## Modeling

* ML is built based on DataFrame, while mllib is based on RDD
* We'll fit the logistic, decision tree and random forest models from ML packages

#### Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression
 
# regParam: regularisation strength. elasticNetParam is not set here; per the Spark ML
# documentation its default is 0.0, which makes this an L2 (ridge) penalty, not lasso (L1).
lr = LogisticRegression(maxIter = 100, regParam = 0.05, labelCol='index').fit(train)

In [None]:
# Evaluate model based on auc ROC(default for binary classification)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
def testModel(model, validate = validate):
    """
    Score a fitted model on a dataset with BinaryClassificationEvaluator.

    :param model: fitted model exposing ``transform``
    :param validate: dataset to score; defaults to the ``validate`` object that
                     exists when this function is defined
    :return: the value returned by ``evaluator.evaluate`` with ``labelCol='index'``
    """
    scorer = BinaryClassificationEvaluator(labelCol = 'index')
    scored = model.transform(validate)
    return scorer.evaluate(scored)

In [None]:
print('AUC ROC of Logistic Regression model is: ' + str(testModel(lr)))

In [None]:
print('AUC ROC of Logistic Regression model is: ' + str(testModel(lr, validate=test)))

In [None]:
pred_test = lr.transform(test)
pred_test.show(5)

#### More Models

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
 
dt = DecisionTreeClassifier(maxDepth = 3, labelCol ='index').fit(train)
rf = RandomForestClassifier(numTrees = 100, labelCol = 'index').fit(train)
gbt = GBTClassifier(maxIter = 10, labelCol = 'index').fit(train)


In [None]:
models = {'LogisticRegression':lr,
          'DecistionTree':dt,
          'RandomForest':rf}
 
modelPerf = {k:testModel(v) for k,v in models.items()}
print(modelPerf)

In [None]:
def model_acc(model, validate=validate):
    """
    Compute the classification accuracy of a fitted model on a dataset.

    Predictions are compared with the ``index`` column, i.e. the label column
    the models in this notebook are trained on, rather than with the raw
    ``label`` column, which only matches when the indexer mapping happens to be
    the identity. Both columns are fetched in a single ``collect`` so each
    prediction is compared with the label of the same row.

    :param model: fitted model exposing ``transform``
    :param validate: dataset to score; must contain the ``index`` column.
                     Defaults to the ``validate`` object that exists when this
                     function is defined
    :return: fraction of rows whose prediction equals the indexed label,
             or NaN when the dataset has no rows
    """
    pred = model.transform(validate)
    rows = pred.select("index", "prediction").collect()
    if not rows:
        return float('nan')
    hits = np.array([r["index"] == r["prediction"] for r in rows])
    # mean() of a boolean array is a true (floating point) ratio on Python 2 and 3
    return hits.mean()

In [None]:
model_acc(gbt)

In [None]:
models = {'LogisticRegression':lr,
          'DecistionTree':dt,
          'RandomForest':rf,
          'GradientBoostingMachines':gbt}

modelPerf = {k:model_acc(v) for k,v in models.items()}
print(modelPerf)

#### Tuning

In [None]:
for i in range(10):
    dt = DecisionTreeClassifier(maxDepth = i, labelCol ='index').fit(train)
    print('AUC ROC of Decision Tree model is' + '(for maxDepth= ' + str(i) + '): ' + str(testModel(dt)))

In [None]:
for i in range(5, 200):
    rf = RandomForestClassifier(numTrees = i, labelCol = 'index').fit(train)
    print('AUC ROC of Random Forest model is' + '(for numTrees= ' + str(i) + '): ' + str(testModel(rf)))

## Neural Nets

In [None]:
import tensorflow as tf
import tensorframes as tfs

In [None]:
tfs.print_schema(df)

In [None]:
train_x = tfs.analyze(train.select("features"))
train_x.show()

In [None]:
train_y = tfs.analyze(train.select("label"))
train_y.show()

In [None]:
from tensorflow.contrib import learn

In [None]:
#Define input values
x = tf.placeholder(shape=[None,8],dtype=tf.float32, name='x-input')
y_ = tf.placeholder(shape=[None,1],dtype=tf.float32, name='y-input')

#lets normalize all features along the columns
x_n = tf.nn.l2_normalize(x,1)

print('Input placeholders created')

In [None]:
x.consumers()

In [None]:
#Define Weights and Bias
W = tf.Variable(tf.zeros(shape=[8,1]), name="Weights")
b = tf.Variable(tf.zeros(shape=[1]),name="Bias")
print('Weight and Bias created')

In [None]:
W.value()

In [None]:
#Linear model output: x_n * W + b
y = tf.add(tf.matmul(x_n,W),b,name='output')

#Loss: mean squared difference between the output and the target placeholder y_
loss = tf.reduce_mean(tf.square(y-y_),name='Loss')

print('Output and loss Ops created')

In [None]:
#Lets define Gradient Descent Optimizer
train_op = tf.train.GradientDescentOptimizer(0.03).minimize(loss)

print('Optimizer is created. Graph building is completed.')

In [None]:
#Lets start graph Execution
# feed_dict values have to be array-like, not Spark DataFrames, so the training
# rows are collected to the driver once and converted to NumPy arrays. Selecting
# both columns in one query keeps each label on the same row as its features.
train_rows = train.select("features", "label").collect()
train_features = np.array([r["features"].toArray() for r in train_rows], dtype=np.float32)
# one label per row, shaped (n, 1) to match the y_ placeholder
train_labels = np.array([[r["label"]] for r in train_rows], dtype=np.float32)

with tf.Session() as sess:
    # variables need to be initialized before we can use them
    sess.run(tf.global_variables_initializer())
    
    #lets train
    training_epochs = 1000  #how many times data need to be shown to model
    
    for epoch in range(training_epochs):
        
        #Calculate train_op and loss
        train_model, train_loss = sess.run([train_op,loss],feed_dict={x:train_features, y_:train_labels})
        
        if epoch % 100 == 0:
            print ('Training loss at step: ', epoch, ' is ', train_loss)
    print (sess.run([W,b]))

In [4]:
"""
Implementation of the K-Means algorithm, while distributing the computations on a cluster.
Given a set of feature vectors, this algorithm runs the K-Means clustering algorithm starting
from a given set of centroids.
"""
from __future__ import print_function

import tensorflow as tf
import tensorframes as tfs
import numpy as np

In [5]:
def tf_compute_distances(points, start_centers):
    """
    Build the TF ops giving, for every point, a distance term to every centroid.

    The value computed is ||c||^2 + ||p||^2 - 2 * p.c for each point p and
    centroid c, i.e. the expanded form of the squared Euclidean distance.
    :param points: a 2d TF tensor of shape num_points x dim
    :param start_centers: a numpy array of shape num_centroid x dim
    :return: a TF tensor of shape num_points x num_centroids
    """
    with tf.variable_scope("distances"):
        # Number of centroids, known when the graph is built.
        (centroid_count, _) = np.shape(start_centers)
        # Number of points in the block, only known when the graph runs.
        point_count = tf.shape(points)[0]
        # The centroids are baked into the graph as a constant.
        centers = tf.constant(start_centers)
        # Squared norms of the points and of the centroids, and their dot products.
        point_norms = tf.reduce_sum(tf.square(points), reduction_indices=1)
        center_norms = tf.reduce_sum(tf.square(centers), reduction_indices=1)
        cross_terms = tf.matmul(points, centers, transpose_b = True)
        # Repeat the centroid norms along the rows (one row per point) ...
        center_term = tf.tile(tf.expand_dims(center_norms, 0),
                              tf.stack([point_count, 1]))
        # ... and the point norms along the columns (one column per centroid).
        point_term = tf.tile(tf.expand_dims(point_norms, 1),
                             tf.stack([1, centroid_count]))
        return center_term + point_term - 2 * cross_terms

In [6]:
def run_one_step(dataframe, start_centers):
    """
    Performs one iteration of K-Means.
    This function takes a dataframe with dense feature vectors, a set of centroids, and returns
    a new set of centroids along with the total distance of points to centroids.
    This function calculates for each point the closest centroid and then aggregates the newly
    formed clusters to find the new centroids.
    This function uses Spark to distribute the aggregation amongst the node.
    The per-point distances that are summed are the values produced by tf_compute_distances.
    :param dataframe: a dataframe containing a column of features (an array of doubles)
    :param start_centers: a k x m matrix with k the number of centroids and m the number of features
    :return: a tuple (new_centers, total_distances): a numpy array with one row per group found
             in the aggregated dataframe, and the sum of the per-point minimum distances
    """
    # The dimensions in the problem
    # (num_centroids is unpacked here but not used again in this function.)
    (num_centroids, num_features) = np.shape(start_centers)
    # For each feature vector, compute the nearest centroid and the distance to that centroid.
    # The index of the nearest centroid is stored in the 'indexes' column.
    # We also add a column of 1's that will be reduced later to count the number of elements in
    # each cluster.
    # The graph handle `g` is bound but never referenced below.
    with tf.Graph().as_default() as g:
        # The placeholder for the input: we use the block format
        points = tf.placeholder(tf.double, shape=[None, num_features], name='features')
        # The shape of the block is extracted as a TF variable.
        num_points = tf.stack([tf.shape(points)[0]], name="num_points")
        distances = tf_compute_distances(points, start_centers)
        # The outputs of the program.
        # The closest centroids are extracted.
        indexes = tf.argmin(distances, 1, name='indexes')
        # This could be done based on the indexes as well.
        min_distances = tf.reduce_min(distances, 1, name='min_distances')
        # A vector of ones with one entry per point in the block.
        counts = tf.tile(tf.constant([1]), num_points, name='count')
        # The tensors are named 'indexes', 'count' and 'min_distances'; the code below reads
        # columns with those names from df2.
        df2 = tfs.map_blocks([indexes, counts, min_distances], dataframe)
    # Perform the reduction: we regroup the points by their centroid indexes.
    gb = df2.groupBy("indexes")
    with tf.Graph().as_default() as g:
        # Look at the documentation of tfs.aggregate for the naming conventions of the placeholders.
        x_input = tfs.block(df2, "features", tf_name="features_input")
        count_input = tfs.block(df2, "count", tf_name="count_input")
        md_input = tfs.block(df2, "min_distances", tf_name="min_distances_input")
        # Each operation is just the sum.
        x = tf.reduce_sum(x_input, [0], name='features')
        count = tf.reduce_sum(count_input, [0], name='count')
        min_distances = tf.reduce_sum(md_input, [0], name='min_distances')
        df3 = tfs.aggregate([x, count, min_distances], gb)
    # Get the new centroids
    df3_c = df3.collect()
    # The new centroids.
    # NOTE(review): new_centers gets one row per row of df3_c, in collect() order. Presumably a
    # centroid that attracted no points produces no group, so fewer than k rows could come back,
    # and row i need not correspond to centroid index i -- TODO confirm.
    new_centers = np.array([np.array(row.features) / row['count'] for row in df3_c])
    total_distances = np.sum([row['min_distances'] for row in df3_c])
    return (new_centers, total_distances)

In [None]:
def run_one_step2(dataframe, start_centers):
    """
    Performs one iteration of K-Means.
    This function takes a dataframe with dense feature vectors, a set of centroids, and returns
    a new set of centroids along with the total distance of points to centroids.
    This function calculates for each point the closest centroid and then aggregates the newly
    formed clusters to find the new centroids.
    This function performs most of the aggregation in TensorFlow.
    The number of features is read from the shape of start_centers, so the function does not
    depend on a notebook-level num_features variable being defined.
    :param dataframe: a dataframe containing a column of features (an array of doubles)
    :param start_centers: a k x m matrix with k the number of centroids and m the number of features
    :return: a k x m matrix, and a positive double
    """
    # The dimensions in the problem
    (num_centroids, num_features) = np.shape(start_centers)
    # For each feature vector, compute the nearest centroid and the distance to that centroid.
    # The index of the nearest centroid is stored in the 'indexes' column.
    # We also add a column of 1's that will be reduced later to count the number of elements in
    # each cluster.
    with tf.Graph().as_default():
        # The placeholder for the input: we use the block format
        points = tf.placeholder(tf.double, shape=[None, num_features], name='features')
        # The distances
        distances = tf_compute_distances(points, start_centers)
        # The rest of this block performs a pre-aggregation step in TF, to limit the
        # communication between TF and Spark.
        # The closest centroids are extracted.
        indexes = tf.argmin(distances, 1, name='indexes')
        min_distances = tf.reduce_min(distances, 1, name='min_distances')
        num_points = tf.stack([tf.shape(points)[0]], name="num_points")
        counts = tf.tile(tf.constant([1]), num_points, name='count')
        # These compute the aggregate based on the indexes.
        block_points = tf.unsorted_segment_sum(points, indexes, num_centroids, name="block_points")
        block_counts = tf.unsorted_segment_sum(counts, indexes, num_centroids, name="block_counts")
        block_distances = tf.reduce_sum(min_distances, name="block_distances")
        # One leading dimension is added to express the fact that the previous elements are just
        # one row in the final dataframe.
        # The final dataframe has one row per block.
        agg_points = tf.expand_dims(block_points, 0, name="agg_points")
        agg_counts = tf.expand_dims(block_counts, 0, name="agg_counts")
        agg_distances = tf.expand_dims(block_distances, 0, name="agg_distances")
        # Using trimming to drop the original data (we are just returning one row of data per
        # block).
        df2 = tfs.map_blocks([agg_points, agg_counts, agg_distances],
                             dataframe, trim=True)
    # Now we simply collect and sum the elements
    with tf.Graph().as_default():
        # Look at the documentation of tfs.aggregate for the naming conventions of the placeholders.
        x_input = tf.placeholder(tf.double,
                                 shape=[None, num_centroids, num_features],
                                 name='agg_points_input')
        count_input = tf.placeholder(tf.int32,
                                     shape=[None, num_centroids],
                                     name='agg_counts_input')
        md_input = tf.placeholder(tf.double,
                                  shape=[None],
                                  name='agg_distances_input')
        # Each operation is just the sum.
        x = tf.reduce_sum(x_input, [0], name='agg_points')
        count = tf.reduce_sum(count_input, [0], name='agg_counts')
        min_distances = tf.reduce_sum(md_input, [0], name='agg_distances')
        (x_, count_, total_distances) = tfs.reduce_blocks([x, count, min_distances], df2)
    # The new centers: per-centroid sums divided by per-centroid counts. The small constant
    # keeps the division finite for a centroid whose count is zero.
    new_centers = (x_.T / (count_ + 1e-7)).T
    return (new_centers, total_distances)

In [7]:
def kmeanstf(dataframe, init_centers, num_iters = 5, tf_aggregate = True):
    """
    Runs the K-Means algorithm on a set of feature points.
    This function takes a dataframe with dense feature vectors, a set of centroids, and returns
    a new set of centroids along with the total distance of points to centroids.
    Iteration stops early as soon as a step returns exactly the same overall distance as the
    previous step; that repeated distance is not appended to the returned list.
    :param dataframe: a dataframe containing a column of features (an array of doubles)
    :param init_centers: the centers to start from
    :param num_iters:  the maximum number of iterations to run
    :param tf_aggregate: if true, each step is run with run_one_step2, otherwise with
                         run_one_step
    :return: a k x m matrix, and a list of positive doubles
    """
    step_fun = run_one_step2 if tf_aggregate else run_one_step
    c = init_centers
    # np.inf rather than the np.Inf alias, which NumPy 2.0 removed
    d = np.inf
    ds = []
    for i in range(num_iters):
        (c1, d1) = step_fun(dataframe, c)
        print("Step =", i, ", overall distance = ", d1)
        c = c1
        if d == d1:
            break
        d = d1
        ds.append(d1)
    return c, ds

In [8]:
# Here is an example of usage:
# `sc` is never defined in this notebook (its creation is commented out in the setup cell),
# so the former bare `except: pass` silently skipped this call. The SparkContext is taken
# from the active SparkSession instead, so the log level is actually applied.
spark.sparkContext.setLogLevel('INFO')

In [9]:
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml.linalg import VectorUDT, _convert_to_vector
from pyspark.sql.types import Row, StructField, StructType
import time

# Small vectors
num_features = 100
# The number of clusters
k = 10
num_points = 10000
num_iters = 10
FEATURES_COL = "features"

np.random.seed(2)
np_data = [x.tolist() for x in np.random.uniform(0.0, 1.0, size=(num_points, num_features))]
schema = StructType([StructField(FEATURES_COL, VectorUDT(), False)])
mllib_rows = [Row(_convert_to_vector(x)) for x in np_data]
mllib_df = spark.createDataFrame(mllib_rows, schema).coalesce(1).cache()

In [10]:
df = spark.createDataFrame([[r] for r in np_data]).toDF(FEATURES_COL).coalesce(1)
# For now, analysis is still required. We cache the output because we are going to perform
# multiple runs on the dataset.
df0 = tfs.analyze(df).cache()

In [11]:
mllib_df.count()
df0.count()

10000

In [12]:
np.random.seed(2)
init_centers = np.random.randn(k, num_features)
start_centers = init_centers
dataframe = df0

In [15]:
ta_0 = time.time()
kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol(FEATURES_COL).setInitMode(
        "random").setMaxIter(num_iters)
mod = kmeans.fit(mllib_df)
ta_1 = time.time()

In [13]:
tb_0 = time.time()
(centers, agg_distances) = kmeanstf(df0, init_centers, num_iters=num_iters, tf_aggregate=False)
tb_1 = time.time()

Step = 0 , overall distance =  1145400.75877
Step = 1 , overall distance =  82752.0755883
Step = 2 , overall distance =  82471.0331805
Step = 3 , overall distance =  82334.6093698
Step = 4 , overall distance =  82278.9850022
Step = 5 , overall distance =  82252.0602253
Step = 6 , overall distance =  82235.9084488
Step = 7 , overall distance =  82225.1000004
Step = 8 , overall distance =  82216.9000307
Step = 9 , overall distance =  82209.822798


In [14]:
tc_0 = time.time()
(centers, agg_distances) = kmeanstf(df0, init_centers, num_iters=num_iters, tf_aggregate=True)
tc_1 = time.time()

Step = 0 , overall distance =  1145400.75877
Step = 1 , overall distance =  82752.0755883
Step = 2 , overall distance =  82471.0331805
Step = 3 , overall distance =  82334.6093698
Step = 4 , overall distance =  82278.9850022
Step = 5 , overall distance =  82252.0602253
Step = 6 , overall distance =  82235.9084488
Step = 7 , overall distance =  82225.1000004
Step = 8 , overall distance =  82216.9000307
Step = 9 , overall distance =  82209.822798


In [16]:
mllib_dt = ta_1 - ta_0
tf_dt = tb_1 - tb_0
tf2_dt = tc_1 - tc_0

print("mllib:", mllib_dt, "tf+spark:",tf_dt, "tf:",tf2_dt)

mllib: 1.4775249958 tf+spark: 67.9400539398 tf: 2.10477209091
