# In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from sklearn.cluster import KMeans
from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import GaussianMixture
from pyspark.sql import functions
from pyspark.ml.clustering import KMeansModel

# Reuse the active Spark session, or create one if none exists yet.
session = SparkSession.builder.getOrCreate()

# Load the '^'-separated, gzip-compressed sales segments file with a header
# row. The file is read exactly once, from the path the rest of the script
# relies on. No schema is supplied or inferred, so every column is a string.
df = session.read\
    .option('header', 'true')\
    .option('sep', '^')\
    .csv('sales_segments.csv.gz')

# Per-booking KPIs: revenue per booking and fuel surcharge per booking
# (exposed as 'tax'). Spark casts the string operands to double for '/'.
# The result is cached because it is reused by the feature assembly below.
# NOTE(review): rows with bookings_seg == 0 presumably yield null here under
# Spark's default (non-ANSI) division -- confirm the data has none, since
# VectorAssembler rejects nulls by default.
kpis = df.select((df['revenue_amount_seg'] / df['bookings_seg']).alias('revenue'),
                 (df['fuel_surcharge_amount_seg'] / df['bookings_seg']).alias('tax'))\
         .cache()


# Print the VectorAssembler API documentation (interactive exploration aid).
help(VectorAssembler)

# Pack the two KPI columns into a single vector column named 'features',
# which is the default features column of the pyspark.ml clustering
# estimators.
assembler = VectorAssembler(inputCols=['revenue', 'tax'], outputCol='features')

newdf = assembler.transform(kpis)

# NOTE: apply the assembler to `kpis` only. The raw `df` was read without a
# schema, so its columns are strings, which VectorAssembler does not accept;
# calling assembler.transform(df) raises and would abort the script here.

# k-means with 8 clusters. `KMeans` here is pyspark.ml's estimator: its
# import comes after (and therefore shadows) the sklearn one.
km_estimator = KMeans().setK(8)
km_model = km_estimator.fit(newdf)

# Score the same rows the model was fitted on; the cluster index of each row
# lands in the 'prediction' column.
predicted = km_model.transform(newdf)

# Print how many rows fell into each cluster.
cluster_sizes = predicted.groupBy('prediction').count()
cluster_sizes.show()


# Alternative clustering of the same feature vectors: a Gaussian mixture
# with 8 components, fitted and then applied to the rows it was trained on.
gm_estimator = GaussianMixture().setK(8)
gm_model = gm_estimator.fit(newdf)
predicted_by_gm = gm_model.transform(newdf)


# Per-cluster population mean and standard deviation of each KPI, computed
# from the k-means assignments.
stats = predicted.groupby('prediction').agg(
    functions.mean('revenue').alias('rev_mean'),
    functions.stddev_pop('revenue').alias('rev_std'),
    functions.mean('tax').alias('tax_mean'),
    functions.stddev_pop('tax').alias('tax_std'))

# Attach each row's cluster statistics. Joining on the column *name* keeps a
# single 'prediction' column; an expression join of `predicted` against the
# aggregate derived from it is an ambiguous self-join and must be avoided.
annotated = predicted.join(stats, on='prediction')

# Standardise both KPIs against their own cluster. Only scalar columns are
# selected, so the 'features' vector column is dropped at this point.
# NOTE(review): a cluster whose standard deviation is 0 presumably yields
# null z-values under Spark's default division -- confirm that is acceptable.
zs = annotated.select(annotated['prediction'].alias('cluster'),
                     'revenue',
                     'tax',
                     ((annotated['revenue'] - annotated['rev_mean']) / annotated['rev_std']).alias('z-rev'),
                     ((annotated['tax'] - annotated['tax_mean']) / annotated['tax_std']).alias('z-tax'))

# Combined score: Euclidean norm of the two per-KPI z-values.
scored = zs.withColumn('z-score',
                      functions.sqrt(zs['z-rev'] ** 2 + zs['z-tax'] ** 2))

# Flag rows whose combined score exceeds 3.
outliered = scored.withColumn('outlier', scored['z-score'] > 3)

# Collect the scored rows to the driver as a pandas DataFrame.
pandas_df = outliered.toPandas()

# Persist the fitted k-means model. Go through the writer with overwrite()
# so that re-running the script replaces the model from a previous run;
# a plain save() raises when the target path already exists.
km_model.write().overwrite().save('my_kmeans_model')

# Reload the model from disk and apply it to the same features as a
# round-trip check (the resulting DataFrame is not kept).
loaded_model = KMeansModel.load('my_kmeans_model')

loaded_model.transform(newdf)

# Export the flagged rows as '^'-separated CSV with a header, replacing any
# previous output directory.
outliered.write.mode('overwrite').csv('outliered', header=True, sep='^')