In [1]:
import findspark
findspark.init()

In [46]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import *
from pyspark.mllib.feature import StandardScaler, StandardScalerModel
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel
from pyspark import SparkContext
import numpy as np
import sys
from scipy.spatial.distance import mahalanobis
import get_outliers


In [62]:
spark = SparkSession \
        .builder \
        .appName("Nulls and Outliers Detection 1") \
        .getOrCreate()


In [63]:
spark.sparkContext

In [64]:
df_temp = df = spark.read.csv(path = 'GROUP7/bss9-579f_clean/part-00000-26b4c5d2-1b04-44e3-aff2-769b8351c897-c000.csv', header = True,inferSchema = True, sep='\t')

In [65]:
df_count = df_temp.count()

In [66]:
df_count

483

In [67]:
numeric_cols = []
for col,dtype in df_temp.dtypes:
    if 'string' not in dtype and col!='rid':
        numeric_cols.append(col)

In [68]:
numeric_cols_temp = numeric_cols[:2]

In [69]:
numeric_cols_temp

['comparable_rental_2_gross_sqft',
 'comparable_rental_3_estimated_gross_income']

In [70]:
#df_temp.select([numeric_cols[2],*numeric_cols_temp])
df_temp = df_temp.withColumn('rid', monotonically_increasing_id())

In [71]:
df_col_rdd = df_temp.select(['rid',*numeric_cols_temp]).rdd

In [None]:
def kmeans_multivariate(df, numeric_cols, k=3, maxIterations=100):
    def addclustercols(x):
        points = np.array(x[1].toArray()).astype(float)
        center = clusters.centers[0]
        mindist = euclidean(points, center)
        c1 = 0

        for i in range(1, len(clusters.centers)):
            center = clusters.centers[i]
            dist = euclidean(points, center)
            if dist < mindist:
                c1 = i
                mindist = dist
        return (int(x[0]), int(c1), float(mindist))

    cols = ['rid']
    cols.extend(numeric_cols)
    df_col_rdd = df[cols].rdd
    label = df_col_rdd.map(lambda x: x[0])
    vso = df_col_rdd.map(lambda x: np.array(x[1:]).astype(float))
    scaler = StandardScaler(withMean=True, withStd=True).fit(vso)
    vso = scaler.transform(vso)

    clusters = KMeans.train(vso, k, initializationMode='random', maxIterations=maxIterations)
    df_col_rdd = label.zip(vso).toDF().rdd
    print(df_col_rdd.collect())
    rdd_w_clusts = df_col_rdd.map(lambda x: addclustercols(x))
    cols = ['rid', 'c_no', 'dist_c']
    kmeans_df = rdd_w_clusts.toDF(cols)
    outlier_all, _ = iqr_outliers(kmeans_df.where(kmeans_df['c_no'] == 0), 'dist_c')
    for i in range(1, k):
        outlier_c, _ = iqr_outliers(kmeans_df.where(kmeans_df['c_no'] == i), 'dist_c')
        outlier_all = outlier_all.unionAll(outlier_c)
    #print_outlier_summary(outlier_all.count(), df.count(), "kMeans (multivariate)")
    return outlier_all

In [72]:
def getDistances(x):
    clust_center = x[0]
    rid = x[1][0]
    point = np.array(x[1][1].toArray()).astype(float)
    dist = mahalanobis(clust_center,point,sigmas_inv[clust_center])
    return (int(rid),int(clust_center),float(dist))
    

In [73]:
rid = df_col_rdd.map(lambda x:x[0])
features = df_col_rdd.map(lambda x: np.array(x[1:]).astype(float))
scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)
features = scaler2.transform(features)
zipped_col = rid.zip(features)
gmm = GaussianMixture.train(features,3)
labels = gmm.predict(features)
mus = []
sigmas = []
sigmas_inv = []
for i in range(3):
    mus.append(np.array(gmm.gaussians[i].mu.toArray()).astype(float))
    sigmas.append(np.array(gmm.gaussians[i].sigma.toArray()).astype(float))
    sigmas_inv.append(np.linalg.inv(sigmas[i]))
final_rdd = labels.zip(zipped_col)
rdd_w_clusts = final_rdd.map(lambda x: getDistances(x))

In [105]:
gmm_df.where(gmm_df['c_no'] == 0).show()

+---+----+------------------+
|rid|c_no|            dist_c|
+---+----+------------------+
|  1|   0|1.3417595880225692|
|  2|   0|0.6259355430408566|
|  3|   0| 1.969200743054647|
|  4|   0| 1.969200743054647|
|  6|   0|3.4587405022484377|
|  8|   0|1.2100683440665077|
|  9|   0| 0.570260286659337|
| 10|   0|1.7980582243233845|
| 11|   0|2.5274539774592917|
| 12|   0| 2.632781788454022|
| 13|   0|0.5937630548774238|
| 14|   0|0.5937630548774238|
| 21|   0|2.8198594109249964|
| 23|   0|0.6259355430408566|
| 24|   0|0.6259355430408566|
| 25|   0|1.2312494197128114|
| 28|   0|0.6124584041875907|
| 29|   0|0.6259355430408566|
| 31|   0|1.0798740296508085|
| 33|   0|1.0765750360500557|
+---+----+------------------+
only showing top 20 rows



In [103]:
cols = ['rid', 'c_no', 'dist_c']
gmm_df = rdd_w_clusts.toDF(cols)
outlier_all = get_outliers.iqr_outliers(gmm_df.where(gmm_df['c_no'] == 0), 'dist_c')
print(outlier_all.show())
for i in range(1, 3):
    outlier_c = get_outliers.iqr_outliers(gmm_df.where(gmm_df['c_no'] == i), 'dist_c')
    outlier_all = outlier_all.unionAll(outlier_c)

+---+-----------------+-----------------+
|rid|           dist_c|           dist_c|
+---+-----------------+-----------------+
|425|5.353605956998742|5.353605956998742|
|451|4.747783848259721|4.747783848259721|
|457|5.993677023233796|5.993677023233796|
+---+-----------------+-----------------+

None


In [75]:
outlier_all.count()

24

In [97]:
outlier_all.count()

23

In [101]:
outlier_all.show()

+---+------------------+------------------+
|rid|            dist_c|            dist_c|
+---+------------------+------------------+
|  5|0.8181668075756773|0.8181668075756773|
| 58|0.4197031162934129|0.4197031162934129|
| 93| 2.955095923872292| 2.955095923872292|
|116|1.6243447061841065|1.6243447061841065|
|159| 3.732245738548085| 3.732245738548085|
|214|0.8811070218418926|0.8811070218418926|
|215|0.8811070218418926|0.8811070218418926|
|220| 2.522640762227058| 2.522640762227058|
|228|0.9198827528155038|0.9198827528155038|
|279| 2.125132118809308| 2.125132118809308|
|285|1.1357407816111917|1.1357407816111917|
|340| 2.361567562202725| 2.361567562202725|
|348| 2.155426764956365| 2.155426764956365|
|389|0.8942177110847539|0.8942177110847539|
|420|1.2516598756087944|1.2516598756087944|
|425|1.1725064364430433|1.1725064364430433|
|428|1.2087188464618732|1.2087188464618732|
|451|0.9531737413850345|0.9531737413850345|
|452|3.5158242168139813|3.5158242168139813|
|457|  1.48224191173903|  1.4822

In [89]:
final_rdd = labels.zip(zipped_col)

In [36]:
labels.take(5)

[2, 0, 0, 0, 0]

In [30]:
mus = []
sigmas = []

In [31]:
for i in range(3):
    mus.append(np.array(gmm.gaussians[i].mu.toArray()).astype(float))
    sigmas.append(np.array(gmm.gaussians[i].sigma.toArray()).astype(float))

In [32]:
mus

[array([0.44592339, 0.43449696]),
 array([-0.46776086, -0.35164155]),
 array([3.15045825, 1.74866176])]

In [33]:
sigmas

[array([[0.47591458, 0.07829796],
        [0.07829796, 0.74783396]]), array([[0.06576417, 0.03679376],
        [0.03679376, 0.07849692]]), array([[ 2.13991533, -1.64727693],
        [-1.64727693,  8.41836254]])]

In [119]:
zipped_col.take(5)

[(0, DenseVector([3.2302, 0.1908])),
 (1, DenseVector([-0.8191, 0.402])),
 (2, DenseVector([0.3915, 1.683])),
 (3, DenseVector([0.3915, 1.683])),
 (4, DenseVector([0.3915, 1.683]))]

In [120]:
zipped_col = zipped_col.map(lambda x:x[0]+1)

In [121]:
zipped_col.take(5)

[1, 2, 3, 4, 5]

In [82]:
features = features.map(lambda x:x.tolist())

In [88]:
rid = rid.collect()

In [89]:
features.toDF().withColumn('rid',rid)

AssertionError: col should be Column