In [1]:
from pyspark.sql import SparkSession
# Get the running Spark session, or create one, under the application name 'KMA'.
spark = (
    SparkSession.builder
    .appName('KMA')
    .getOrCreate()
)
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
import os
import time
import pandas as pd

import boto3
from io import BytesIO
import sagemaker.amazon.common as smac
from sagemaker import get_execution_role

In [2]:
'''
-----------------------------------------------------------------------------
--- Inputs
-----------------------------------------------------------------------------
'''
# Dataset selection: `Interp` and `Dimensions` become path components of the
# results location, `s3DataSet` is the S3 key of the CSV that gets clustered.
Interp = 'Interp'
s3DataSet = 'interpDataScaledL6.csv'
Dimensions = 180

# Cluster counts to try: every k from 2 to 100, then every fifth k from 105
# to 1000 (both upper bounds are exclusive in range(), hence 101 and 1001).
clusterRange1 = range(2, 101)
clusterRange2 = range(105, 1001, 5)

In [3]:
'''
-----------------------------------------------------------------------------
--- Definitions
-----------------------------------------------------------------------------
'''
# Components of the S3 key prefix under which result files are uploaded.
Results = 'Results'
Analysis = 'KMA'

# Builds '<Results>/<Interp>/<Analysis>/<Dimensions>/' (trailing slash included).
path_parts = (Results, Interp, Analysis, Dimensions)
resultsPath = ''.join('{}/'.format(part) for part in path_parts)

# Configuring S3
s3_bucket_name = 'jasper-ml-sagemaker'
# NOTE(review): `role` is not referenced anywhere else in the visible notebook;
# the call is kept so the cell's behaviour is unchanged.
role = get_execution_role()

# The low-level client is used to download the input data; the resource-level
# bucket handle is used to upload result files.
client = boto3.client('s3')
resource = boto3.resource('s3')
my_bucket = resource.Bucket(s3_bucket_name)

In [None]:
# -----------------------------------------------------------------------------
# --- K-means sweeps: one per dataset
# -----------------------------------------------------------------------------
# This cell used to contain three hand-copied pipelines that overwrote the
# notebook-level settings (Interp, s3DataSet, resultsPath, dataKey) as they
# went, so running the cell a second time started from the wrong dataset.
# The shared steps now live in two functions and the per-dataset settings in
# SWEEPS; nothing defined by the earlier cells is reassigned here.

# Iteration cap passed to every KMeans fit.
MAX_ITER = 2000


def load_feature_frame(s3_key):
    """Fetch a headerless CSV from S3 and return it as a Spark DataFrame.

    The object is parsed with pandas, written to a local file named after
    ``s3_key`` and read back with Spark. All columns are then assembled into
    a single 'features' vector column.

    Args:
        s3_key: Key of the CSV object inside ``s3_bucket_name``; also used as
            the local file name.

    Returns:
        The Spark DataFrame read from the CSV, with a 'features' column added.
    """
    local_csv = s3_key
    obj = client.get_object(Bucket=s3_bucket_name, Key=s3_key)
    raw = pd.read_csv(obj['Body'], header=None, index_col=None)

    # header=False is essential: pandas would otherwise write its generated
    # column labels (0, 1, 2, ...) as the first line, and Spark -- which reads
    # CSV without a header by default -- would treat that line as a data point
    # and cluster it together with the real observations.
    raw.to_csv(local_csv, index=False, header=False)

    # The local file is deliberately left in place: Spark may read it again
    # lazily each time the returned DataFrame is used.
    dataset = spark.read.csv(local_csv, inferSchema=True)

    assembler = VectorAssembler(inputCols=dataset.columns,
                                outputCol='features')
    return assembler.transform(dataset)


def run_kmeans_sweep(s3_key, results_prefix, cluster_counts):
    """Fit K-means for every requested k and upload each set of predictions.

    For each k, a model is fitted on the dataset behind ``s3_key``, the
    'prediction' column is written to '<k>results.csv', uploaded to
    ``results_prefix + '<k>results.csv'`` in ``my_bucket`` and the local copy
    is removed. The elapsed time per k is printed.

    Args:
        s3_key: S3 key of the input CSV.
        results_prefix: S3 key prefix (ending in '/') for the result files.
        cluster_counts: Iterable of cluster counts (k) to fit.
    """
    final_data = load_feature_frame(s3_key)

    for k in cluster_counts:
        timestart = time.time()
        model = KMeans(k=k, maxIter=MAX_ITER, featuresCol='features').fit(final_data)

        # Save and upload at each stage so only one result set is held at a time.
        results_file = '{}results.csv'.format(k)
        try:
            predictions = model.transform(final_data).select('prediction').toPandas()
            predictions.to_csv(results_file, index=False)
            my_bucket.upload_file(results_file, Key=results_prefix + results_file)
        finally:
            # Remove the temporary file even when the write or upload fails.
            if os.path.exists(results_file):
                os.remove(results_file)

        print('KMA with {} centres took {} seconds'.format(k, time.time() - timestart))


# Every k from both ranges defined in the Inputs cell, in the original order.
cluster_counts = list(clusterRange1) + list(clusterRange2)

# (input S3 key, results key prefix) for each dataset, in the original order.
# The first entry uses the settings from the Inputs / Definitions cells; the
# polar datasets add a coordinate-system level to the results path.
SWEEPS = [
    (s3DataSet, resultsPath),
    ('polarDataScaledL6.csv',
     '{}/{}/{}/{}/{}/'.format(Results, 'Polar', 'NoInterp', Analysis, Dimensions)),
    ('polarInterpDataScaledL6.csv',
     '{}/{}/{}/{}/{}/'.format(Results, 'Polar', 'Interp', Analysis, Dimensions)),
]

for sweep_key, sweep_prefix in SWEEPS:
    run_kmeans_sweep(sweep_key, sweep_prefix, cluster_counts)

KMA with 2 centres took 8.36699628829956 seconds
KMA with 3 centres took 9.013299226760864 seconds
KMA with 4 centres took 10.240257978439331 seconds
KMA with 5 centres took 9.501990795135498 seconds
KMA with 6 centres took 13.895456552505493 seconds
KMA with 7 centres took 13.68198037147522 seconds
KMA with 8 centres took 12.460870504379272 seconds
KMA with 9 centres took 14.792114973068237 seconds
KMA with 10 centres took 10.590485572814941 seconds
KMA with 11 centres took 11.44452166557312 seconds
KMA with 12 centres took 13.168226480484009 seconds
KMA with 13 centres took 11.657974481582642 seconds
KMA with 14 centres took 10.831020832061768 seconds
KMA with 15 centres took 11.954235553741455 seconds
KMA with 16 centres took 12.135224342346191 seconds
KMA with 17 centres took 13.722691535949707 seconds
KMA with 18 centres took 16.591112852096558 seconds
KMA with 19 centres took 15.139440536499023 seconds
KMA with 20 centres took 14.511483430862427 seconds
KMA with 21 centres took 1