<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"></ul></div>

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

In [2]:
# Create (or reuse, if one is already running in this kernel) a Spark session
# named "LogAnalyst" that runs against the local master.
sparkSession = (
    SparkSession.builder
    .appName("LogAnalyst")
    .master("local")
    .getOrCreate()
)
# Bare expression so the notebook renders the session summary.
sparkSession

In [3]:
# Load the task-event CSV files from HDFS. The files carry no header row, so
# Spark assigns positional column names (_c0, _c1, ...).
df = sparkSession.read.csv("hdfs://localhost:9000/data/task-event", header=False)

In [4]:
# Peek at the first 10 raw rows to see which columns are populated.
df.show(n=10)

+---+---+----+-------+---+------------+---+--------------------+---+---+-----+-------+--------------------+----+
|_c0|_c1| _c2|    _c3|_c4|         _c5|_c6|                 _c7|_c8|_c9| _c10|   _c11|                _c12|_c13|
+---+---+----+-------+---+------------+---+--------------------+---+---+-----+-------+--------------------+----+
|  0|  0| 2.0|3418309|  0|4155527081.0|  0|70s3v5qRyCO/1PCdI...|  3|  9| null|   null|                null|null|
|  1|  0| 2.0|3418309|  1| 329150663.0|  0|70s3v5qRyCO/1PCdI...|  3|  9| null|   null|                null|null|
|  2|  0|null|3418314|  0|3938719206.0|  0|70s3v5qRyCO/1PCdI...|  3|  9|0.125|0.07446|0.000424399999999...| 0.0|
|  3|  0|null|3418314|  1| 351618647.0|  0|70s3v5qRyCO/1PCdI...|  3|  9|0.125|0.07446|0.000424399999999...| 0.0|
|  4|  0| 2.0|3418319|  0| 431052910.0|  0|70s3v5qRyCO/1PCdI...|  3|  9| null|   null|                null|null|
|  5|  0| 2.0|3418319|  1| 257348783.0|  0|70s3v5qRyCO/1PCdI...|  3|  9| null|   null|          

In [None]:
from pyspark.sql.functions import col

In [7]:
# Keep only columns _c10.._c12 (the columns later used as clustering features)
# and discard every row where any of them is null.
# NOTE(review): these look like the per-task resource-request columns --
# confirm against the dataset schema.
df_selected = df.select(['_c10', '_c11', '_c12'])
df_dropna = df_selected.na.drop()
df_dropna.show(n=5)

+-------+-------+--------------------+
|   _c10|   _c11|                _c12|
+-------+-------+--------------------+
|  0.125|0.07446|0.000424399999999...|
|  0.125|0.07446|0.000424399999999...|
|0.03125|0.08691|0.000454899999999...|
|0.03125|0.08691|0.000454899999999...|
|0.03125|0.08691|0.000454899999999...|
+-------+-------+--------------------+
only showing top 5 rows



In [28]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col

# The CSV was read without a schema, so the selected columns are presumably
# still strings; convert all of them to double so they can be assembled into
# a numeric feature vector.
#
# The cast is done in ONE select() rather than a withColumn() loop: each
# withColumn() call adds another projection to the query plan, and the PySpark
# documentation recommends a single select() when several columns are
# rewritten at once. alias() keeps the original column names, so downstream
# cells that refer to _c10/_c11/_c12 are unaffected. Re-running this cell is
# harmless (double -> double is a no-op cast).
df_dropna = df_dropna.select(
    *[col(name).cast(DoubleType()).alias(name) for name in df_dropna.columns]
)

# df_dropna.show(3)

+-------+-------+--------------------+
|   _c10|   _c11|                _c12|
+-------+-------+--------------------+
|  0.125|0.07446|4.243999999999999...|
|  0.125|0.07446|4.243999999999999...|
|0.03125|0.08691|4.548999999999999...|
+-------+-------+--------------------+
only showing top 3 rows



In [8]:
from pyspark.ml.feature import StandardScaler, VectorAssembler

In [38]:
# Pipeline definition: a single VectorAssembler that packs every column of
# df_dropna into one vector column named 'features'.
# NOTE(review): the features are assembled unscaled. An earlier draft sketched
# a per-column StandardScaler here; presumably a scaler would have to run on
# the assembled vector column instead -- confirm before re-adding it.
num_cols = df_dropna.columns
feature_cols = list(num_cols)
vector_assembler = VectorAssembler(outputCol='features', inputCols=feature_cols)
stages = [vector_assembler]

In [40]:
from pyspark.ml import Pipeline

# Fit the stage list on the cleaned frame, then apply the fitted pipeline to
# the same frame; the result gains the assembled 'features' column.
pipeline = Pipeline().setStages(stages)
transformer = pipeline.fit(df_dropna)
new_df = transformer.transform(df_dropna)
new_df.show(n=5)

+------+-------+--------+--------------------+
|  _c10|   _c11|    _c12|            features|
+------+-------+--------+--------------------+
|0.1125|0.03802|3.033E-4|[0.1125,0.03802,3...|
|0.1125|0.03802|3.033E-4|[0.1125,0.03802,3...|
|0.1125|0.03802|3.033E-4|[0.1125,0.03802,3...|
|0.1125|0.03802|3.033E-4|[0.1125,0.03802,3...|
|0.1125|0.03802|3.033E-4|[0.1125,0.03802,3...|
+------+-------+--------+--------------------+
only showing top 5 rows



In [55]:
from pyspark.ml.clustering import GaussianMixture

# Two-component Gaussian mixture with a fixed seed so the fit is repeatable.
gmm = GaussianMixture(k=2, seed=538009335)
model = gmm.fit(new_df)

# One record per fitted component (mean vector and covariance matrix),
# printed vertically and untruncated so the full matrices are visible.
print("Gaussians shown as a DataFrame: ")
model.gaussiansDF.show(vertical=True, truncate=False)

Gaussians shown as a DataFrame: 
-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 mean | [0.076264,0.037794054054054074,1.507426486486487E-4]                                                                                                                                                           
 cov  | 0.001016320106810814  2.924396334054038E-4   3.844277061837835E-6   
2.924396334054038E-4  7.925422894024818E-4   1.5342560171541173E-6  
3.844277061837835E-6  1.5342560171541173E-6  1.6731754307633302E-8   
-RECORD 1--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 mean | [0.03125,0.049620000000000226,3.1660000000000016E-4]                                           

In [47]:
# Score every row with the fitted mixture model and keep only the assigned
# component index.
clustered_df = model.transform(new_df)
labels = clustered_df.select("prediction")

In [54]:
# Summary statistics of the assigned component index (count/mean/stddev/min/max).
labels.describe("prediction").show()

+-------+------------------+
|summary|        prediction|
+-------+------------------+
|  count|               259|
|   mean|0.8571428571428571|
| stddev|0.3506046035634262|
|    min|                 0|
|    max|                 1|
+-------+------------------+

