In [131]:
##Loading the data 

path="/home/affine/Downloads/train_1.csv"

#creating a dataframe by loading the path of inputfile by using databricks library for csv files
train = sqlCtx.read.format('com.databricks.spark.csv').options(header='true', inferschema='true',delimiter=',').load(path)


In [2]:
## removing duplicates
train = train.dropDuplicates()

In [3]:
##saving the data 
df = train.cache()

In [4]:
##addition of extra columns (derived from the column present in the data)
from pyspark.sql.functions import month,datediff

df1 = df.withColumn("month", month(df.date_time))
df1 = df1.withColumn("los",datediff(df1.srch_co,df1.srch_ci))
df1 = df1.withColumn("bw",datediff(df1.srch_ci,df1.date_time))

In [5]:
##removing unnecessary cols
df1 = df1.select([col for col in df1.columns if col not in {"hotel_market","user_location_region","user_location_city","hotel_cluster","srch_destination_id","user_id","cnt","srch_ci","srch_co","date_time"}])

In [6]:
df1.count()

4999

In [7]:
##removing records with null values
df1 = df1.dropna()

In [8]:
df1.count()

2867

In [10]:
##changing the data type to double of all the columns
from pyspark.sql.types import *
df2 = df1.withColumn("site_name_1",df1.site_name.cast(DoubleType())).\
        withColumn("posa_continent_1",df1.posa_continent.cast(DoubleType())).\
        withColumn("user_location_country_1",df1.user_location_country.cast(DoubleType())).\
        withColumn("is_mobile_1",df1.is_mobile.cast(DoubleType())).\
        withColumn("is_package_1",df1.is_package.cast(DoubleType())).\
        withColumn("channel_1",df1.channel.cast(DoubleType())).\
        withColumn("srch_adults_cnt_1",df1.srch_adults_cnt.cast(DoubleType())).\
        withColumn("srch_children_cnt_1",df1.srch_children_cnt.cast(DoubleType())).\
        withColumn("srch_rm_cnt_1",df1.srch_rm_cnt.cast(DoubleType())).\
        withColumn("srch_destination_type_id_1",df1.srch_destination_type_id.cast(DoubleType())).\
        withColumn("is_booking_1",df1.is_booking.cast(DoubleType())).\
        withColumn("hotel_continent_1",df1.hotel_continent.cast(DoubleType())).\
        withColumn("hotel_country_1",df1.hotel_country.cast(DoubleType())).\
        withColumn("month_1",df1.month.cast(DoubleType())).\
        withColumn("los_1",df1.los.cast(DoubleType())).\
        withColumn("bw_1",df1.bw.cast(DoubleType())).\
        withColumn("orig_destination_distance_1",df1.orig_destination_distance.cast(DoubleType()))


In [11]:
##removing repetative columns
df2 = df2.select([cols for cols in df2.columns if cols not in {"site_name","posa_continent","user_location_country","is_mobile","is_package","channel","srch_adults_cnt","srch_children_cnt","srch_rm_cnt","srch_destination_type_id","is_booking","hotel_continent","hotel_country","month","los","bw","orig_destination_distance"}])

In [34]:
##Changing spark dataframe to pandas to convert it to matrix
df_pandas = df2.toPandas()
df_matrix = df_pandas.as_matrix()

In [44]:
df_matrix

array([[  2.00000000e+00,   3.00000000e+00,   6.60000000e+01, ...,
          4.00000000e+00,   1.60000000e+01,   2.23426410e+03],
       [  2.00000000e+00,   3.00000000e+00,   6.60000000e+01, ...,
          5.00000000e+00,   1.06000000e+02,   9.13625900e+02],
       [  2.00000000e+00,   3.00000000e+00,   6.60000000e+01, ...,
          5.00000000e+00,   1.06000000e+02,   9.11514200e+02],
       ..., 
       [  2.00000000e+00,   3.00000000e+00,   6.60000000e+01, ...,
          3.00000000e+00,   5.00000000e+01,   8.51538400e+02],
       [  2.00000000e+00,   3.00000000e+00,   6.60000000e+01, ...,
          2.00000000e+00,   6.00000000e+01,   1.06299200e+02],
       [  2.00000000e+00,   3.00000000e+00,   6.60000000e+01, ...,
          2.00000000e+00,   6.00000000e+01,   1.04421200e+02]])

In [51]:
##Making clustering model

clusters = KMeans.train(sc.parallelize(df_matrix), 6, maxIterations=10,
        runs=10, initializationMode="random")

  "Support for runs is deprecated in 1.6.0. This param will have no effect in 1.7.0.")


In [52]:
##number of clusters
clusters.k

6

In [117]:
##cluster indexes for the data to be predicted
cluster_ind = clusters.predict(sc.parallelize(df_matrix))
x = cluster_ind.collect()

In [109]:
##size of each cluster
cluster_sizes = cluster_ind.countByValue().items()
cluster_sizes

dict_items([(0, 67), (1, 635), (2, 175), (3, 811), (4, 318), (5, 861)])

In [112]:
##calculating the within set sum of square error (helps in tuning the model)
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = sc.parallelize(df_matrix).map(lambda point: error(point)).reduce(lambda x, y: x + y)

WSSSE

819049.6719431345

In [120]:
##centroid coordinates of clusters
Cluster_Centers=clusters.clusterCenters
Cluster_Centers[0]

array([  8.80597015e+00,   3.25373134e+00,   1.16507463e+02,
         1.94029851e-01,   5.97014925e-02,   4.17910448e+00,
         2.04477612e+00,   4.32835821e-01,   1.11940299e+00,
         2.53731343e+00,   1.79104478e-01,   2.64179104e+00,
         6.19850746e+01,   8.07462687e+00,   2.22388060e+00,
         7.82388060e+01,   8.10413414e+03])

In [130]:
##saving the model
path_save = "/home/affine/Downloads"
clusters.save(sc, path_save)

###command for retreiving the model
Model = KMeansModel.load(sc, path_save)

In [128]:
###creating final matrix with cluster assigned to them

import numpy as np
import pandas as pd
clust_ind = np.asarray(x).reshape(df_matrix.shape[0],1)

final = np.append(df_matrix,clust_ind , axis=1)

In [104]:
##converting matrix to dataframe
index = list(range(df_matrix.shape[0]))
columns = df2.columns + ['cluster']   
values = final

df_final = pd.DataFrame(values, index=index, columns=columns)
df_spark = sqlContext.createDataFrame(df_final)