In [1]:
!pip install pyspark 
!pip install -U -q PyDrive
!apt update
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 62kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 51.9MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=8feeb4999b00b8042ec64ae32e10c59d370755f66bfcdeae972466a665e4c647
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1
Ign:

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import os
cur_path = "/content/drive/MyDrive/insider-risk-in-spark/"
os.chdir(cur_path)
!pwd

/content/drive/MyDrive/insider-risk-in-spark


In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('proj').getOrCreate()

In [5]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from pyspark.ml.feature import Tokenizer, StopWordsRemover, NGram, CountVectorizer, StandardScaler


In [6]:
email = spark.read.csv( '/content/drive/MyDrive/insider-risk-in-spark/data/email.csv',inferSchema=True,header=True)

In [7]:
email.printSchema()
email.show(5)

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- user: string (nullable = true)
 |-- pc: string (nullable = true)
 |-- to: string (nullable = true)
 |-- cc: string (nullable = true)
 |-- bcc: string (nullable = true)
 |-- from: string (nullable = true)
 |-- size: integer (nullable = true)
 |-- attachments: integer (nullable = true)
 |-- content: string (nullable = true)

+--------------------+-------------------+-------+-------+--------------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|                  id|               date|   user|     pc|                  to|                  cc|                 bcc|                from| size|attachments|             content|
+--------------------+-------------------+-------+-------+--------------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|{R3I7-S4TX96FG-82...|01/02/2010 07:11:45|LAP0338

In [8]:
tokenizer = Tokenizer(inputCol="content", outputCol="words")
wordsData = tokenizer.transform(email)

remover = StopWordsRemover(inputCol="words", outputCol="clean_words")
wordsData = remover.transform(wordsData)

In [15]:
wordsData.show()

+--------------------+-------------------+-------+-------+--------------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+--------------------+--------------------+
|                  id|               date|   user|     pc|                  to|                  cc|                 bcc|                from| size|attachments|             content|               words|         clean_words|
+--------------------+-------------------+-------+-------+--------------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+--------------------+--------------------+
|{R3I7-S4TX96FG-82...|01/02/2010 07:11:45|LAP0338|PC-5758|Dean.Flynn.Hines@...|Nathaniel.Hunter....|                null|Lynn.Adena.Pratt@...|25830|          0|middle f2 systems...|[middle, f2, syst...|[middle, f2, syst...|
|{R0R9-E4GL59IK-29...|01/02/2010 07:12:16|MOH0273|PC-6699|Odonnell-Gage@bel...|                null|    

In [9]:
cv = CountVectorizer(inputCol="clean_words", outputCol="features", vocabSize=1000, minDF=2.0)

model = cv.fit(wordsData)

wordsCV = model.transform(wordsData)


In [12]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors


all0vector = Vectors.dense([0]*1000) 

# Filter the empty Sparse Vector
def no_empty_vector(value):
    if value != all0vector:
        return True
    else:
        return False


no_empty_vector_udf = udf(no_empty_vector, BooleanType())
wordsCV = wordsCV.filter(no_empty_vector_udf('features'))


In [13]:
wordsCV.show()

+--------------------+-------------------+-------+-------+--------------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+--------------------+--------------------+--------------------+
|                  id|               date|   user|     pc|                  to|                  cc|                 bcc|                from| size|attachments|             content|               words|         clean_words|            features|
+--------------------+-------------------+-------+-------+--------------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+--------------------+--------------------+--------------------+
|{R3I7-S4TX96FG-82...|01/02/2010 07:11:45|LAP0338|PC-5758|Dean.Flynn.Hines@...|Nathaniel.Hunter....|                null|Lynn.Adena.Pratt@...|25830|          0|middle f2 systems...|[middle, f2, syst...|[middle, f2, syst...|(1000,[28,59,105,...|
|{R0R9-E4GL59IK-29..

In [14]:
from pyspark.ml.feature import MinHashLSH


mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=20)
model = mh.fit(wordsCV)
wordsHash = model.transform(wordsCV)

In [15]:
wordsHash.show()

+--------------------+-------------------+-------+-------+--------------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                  id|               date|   user|     pc|                  to|                  cc|                 bcc|                from| size|attachments|             content|               words|         clean_words|            features|              hashes|
+--------------------+-------------------+-------+-------+--------------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|{R3I7-S4TX96FG-82...|01/02/2010 07:11:45|LAP0338|PC-5758|Dean.Flynn.Hines@...|Nathaniel.Hunter....|                null|Lynn.Adena.Pratt@...|25830|          0|middle f2 systems...|[middle, f2, syst...|

In [16]:
id_hash = wordsHash.select('id', 'hashes')

In [17]:
sc = spark.sparkContext

numAttrs = 20
attrs = sc.parallelize(["hash_" + str(i) for i in range(numAttrs)]).zipWithIndex().collect()
for name, index in attrs:
    id_hash = id_hash.withColumn(name, id_hash['hashes'].getItem(index))

In [18]:
id_hash.show()

+--------------------+--------------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|                  id|              hashes|        hash_0|        hash_1|        hash_2|        hash_3|        hash_4|        hash_5|        hash_6|        hash_7|        hash_8|        hash_9|       hash_10|       hash_11|       hash_12|       hash_13|       hash_14|       hash_15|       hash_16|       hash_17|       hash_18|       hash_19|
+--------------------+--------------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------

In [19]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

udf_getNumber = udf(lambda x: int(x[0]), LongType())

In [20]:
for col_num in range(20):
    id_hash = id_hash.withColumn('hash_'+str(col_num), udf_getNumber('hash_'+str(col_num)))

In [21]:
id_hash.show()

+--------------------+--------------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|                  id|              hashes|   hash_0|   hash_1|   hash_2|   hash_3|   hash_4|   hash_5|   hash_6|   hash_7|   hash_8|   hash_9|  hash_10|  hash_11|  hash_12|  hash_13|  hash_14|  hash_15|  hash_16|  hash_17|  hash_18|  hash_19|
+--------------------+--------------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|{R3I7-S4TX96FG-82...|[[5.4521502E7], [...| 54521502| 33808616| 18945747|171614095| 27552661|  9893546|341744119| 28534000| 21465049|100532720|162038291|189345170|161975902|164155346|138306094| 42985656| 24640161| 43713524| 98871159|  4832531|
|{R0R9-E4GL59IK-29...|[[

In [None]:
from pyspark.ml.feature import VectorAssembler

hash_cols = ['hash_'+str(i) for i in range(20)]

assembler = VectorAssembler(inputCols=hash_cols, outputCol="features")
id_hash = assembler.transform(id_hash)

In [24]:
id_hash.show()

+--------------------+--------------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------------+
|                  id|              hashes|   hash_0|   hash_1|   hash_2|   hash_3|   hash_4|   hash_5|   hash_6|   hash_7|   hash_8|   hash_9|  hash_10|  hash_11|  hash_12|  hash_13|  hash_14|  hash_15|  hash_16|  hash_17|  hash_18|  hash_19|            features|
+--------------------+--------------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------------------+
|{R3I7-S4TX96FG-82...|[[5.4521502E7], [...| 54521502| 33808616| 18945747|171614095| 27552661|  9893546|341744119| 28534000| 21465049|100532720|162038291|189345170|161975902|164155346|138306094| 42985656| 2

In [25]:
id_hash_trans = id_hash.select('id','features')
id_hash_trans.show()

+--------------------+--------------------+
|                  id|            features|
+--------------------+--------------------+
|{R3I7-S4TX96FG-82...|[5.4521502E7,3.38...|
|{R0R9-E4GL59IK-29...|[2.09044447E8,1.9...|
|{G2B2-A8XY58CP-28...|[1.29704246E8,3.3...|
|{A3A9-F4TH89AA-83...|[2.68081987E8,3.1...|
|{E8B7-C8FZ88UF-29...|[1.95670881E8,3.5...|
|{X8T7-A6BT54FP-72...|[2.2231094E7,5.80...|
|{H5J6-G2RS59KI-83...|[1.68681437E8,5.0...|
|{D9T8-M1HJ89XP-63...|[1.39162667E8,3.1...|
|{V3L7-L2RB92RV-91...|[4.9220538E7,1.65...|
|{D5K9-P0IJ71WK-63...|[3.0303696E7,1.45...|
|{R0A5-U4YQ17EA-34...|[3.0303696E7,2.28...|
|{Y8Z6-X5HU72BM-73...|[2.3616913E7,3539...|
|{K3B8-S0RJ27BU-68...|[5.47609107E8,3.7...|
|{J7Y1-G7KD78BQ-41...|[4.9220538E7,1.58...|
|{D7P4-Z0PP26KM-17...|[6.5365742E7,5.41...|
|{P6J4-Y0XJ63II-57...|[5.729314E7,2.184...|
|{K7Y5-V5IP47OA-83...|[3556564.0,4.0148...|
|{R9V2-W5OA43XS-14...|[7471709.0,5.8095...|
|{X4R4-F1BP75UA-02...|[3.0303696E7,5.80...|
|{N4L7-S2MN81EJ-50...|[9.2112874

In [26]:
hash_trans = id_hash_trans.select('features')
hash_trans.show()

+--------------------+
|            features|
+--------------------+
|[5.4521502E7,3.38...|
|[2.09044447E8,1.9...|
|[1.29704246E8,3.3...|
|[2.68081987E8,3.1...|
|[1.95670881E8,3.5...|
|[2.2231094E7,5.80...|
|[1.68681437E8,5.0...|
|[1.39162667E8,3.1...|
|[4.9220538E7,1.65...|
|[3.0303696E7,1.45...|
|[3.0303696E7,2.28...|
|[2.3616913E7,3539...|
|[5.47609107E8,3.7...|
|[4.9220538E7,1.58...|
|[6.5365742E7,5.41...|
|[5.729314E7,2.184...|
|[3556564.0,4.0148...|
|[7471709.0,5.8095...|
|[3.0303696E7,5.80...|
|[9.2112874E7,1.77...|
+--------------------+
only showing top 20 rows



In [27]:
id_hash_sub = id_hash_trans.sample(withReplacement=False, fraction=0.0001, seed=42)

In [28]:
id_hash_sub.count()

2682

In [154]:
from pyspark.ml.clustering import KMeans

kmeansmodel = KMeans().setK(k).setFeaturesCol('features').setPredictionCol('prediction').fit(hash_trans)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1212, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 49768)
Traceback (most recent call last):
  File "/usr/lib/python3.7/socketserver.py", line 316, in _handle_request_noblock
    self.pr

Py4JError: ignored

In [105]:
errors = []
results = []
centers = []

for k in range(2,10):
    kmeansmodel = KMeans().setK(k).setFeaturesCol('features').setPredictionCol('prediction').fit(hash_trans)

    print("With K={}".format(k))
    
    #带有预测簇标签的数据集
    kmeans_results = kmeansmodel.transform(hash_trans).collect()
    results.append(kmeans_results)
    for item in kmeans_results:
        print(str(item[0])+' is predcted as cluster'+ str(item[1]))
    
    #获取到模型的所有聚类中心情况
    kmeans_centers = kmeansmodel.clusterCenters()
    centers.append(kmeans_centers)
    center_seq = 0
    for item in kmeans_centers:
        print("Cluster" +  str(center_seq) + "  Center" + str(item))
        center_seq = center_seq + 1
    
    #计算集合内误差平方和（Within Set Sum of Squared Error, WSSSE)
    WSSSE = kmeansmodel.computeCost(hash_trans)
    errors.append(WSSSE)
    print("Within Set Sum of Squared Error = " + str(WSSSE))
    
    print('--'*30 + '\n')

Py4JJavaError: ignored

In [103]:
from pyspark.ml.clustering import KMeans

# Loads data.

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(hash_trans)

Py4JJavaError: ignored

In [None]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(id_hash_trans)

# Normalize each feature to have unit standard deviation.
id_hash_scaled = scalerModel.transform(id_hash_trans)
id_hash_scaled.show()

In [71]:
from pyspark.mllib.clustering import KMeans

k = 10 # Try different numbers
maxIterations = 20 # Try different numbers
data_rdd = id_hash_trans.select('id','hashes_trans') # An RDD containing all the data
training_rdd = id_hash_trans.select('hashes_trans').rdd # An RDD containing, for each row, a tuple with all the values (except the row key)

In [72]:
model = KMeans.train(training_rdd, k, maxIterations=maxIterations)
preds = model.predict(training_rdd)

Py4JJavaError: ignored

In [None]:
id_hash[['hashes']].take(1)

[Row(hashes=[DenseVector([226851327.0]), DenseVector([9165640.0]), DenseVector([104254652.0]), DenseVector([51684106.0]), DenseVector([23531578.0]), DenseVector([97738272.0]), DenseVector([182732379.0]), DenseVector([87645642.0]), DenseVector([178486040.0]), DenseVector([11915428.0]), DenseVector([14771039.0]), DenseVector([431549604.0]), DenseVector([3911444.0]), DenseVector([20282941.0]), DenseVector([82063745.0]), DenseVector([191832596.0]), DenseVector([26629019.0]), DenseVector([30056425.0]), DenseVector([123643097.0]), DenseVector([32825298.0]), DenseVector([67394723.0]), DenseVector([13859855.0]), DenseVector([106305736.0]), DenseVector([2331316.0]), DenseVector([38654927.0]), DenseVector([57297570.0]), DenseVector([84493334.0]), DenseVector([81757051.0]), DenseVector([11834842.0]), DenseVector([85011795.0]), DenseVector([39371781.0]), DenseVector([52023116.0]), DenseVector([87166582.0]), DenseVector([25994221.0]), DenseVector([32690537.0]), DenseVector([8307600.0]), DenseVector

In [None]:
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.sparse(6, [1, 3], [1.0, 1.0])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()

The hashed dataset where hashed values are stored in the column 'hashes':
+---+--------------------+--------------------+
| id|            features|              hashes|
+---+--------------------+--------------------+
|  0|(6,[0,1,2],[1.0,1...|[[2.026114E7], [1...|
|  1|(6,[2,3,4],[1.0,1...|[[2.90294305E8], ...|
|  2|(6,[0,2,4],[1.0,1...|[[2.026114E7], [3...|
+---+--------------------+--------------------+

Approximately joining dfA and dfB on distance smaller than 0.6:
+---+---+---------------+
|idA|idB|JaccardDistance|
+---+---+---------------+
|  0|  5|            0.5|
|  2|  5|            0.5|
|  1|  5|            0.5|
|  1|  4|            0.5|
+---+---+---------------+

Approximately searching dfA for 2 nearest neighbors of the key:
+---+--------------------+--------------------+-------+
| id|            features|              hashes|distCol|
+---+--------------------+--------------------+-------+
|  0|(6,[0,1,2],[1.0,1...|[[2.026114E7], [1...|   0.75|
+---+--------------------+--

In [None]:
dfA.show()

+---+--------------------+
| id|            features|
+---+--------------------+
|  0|(6,[0,1,2],[1.0,1...|
|  1|(6,[2,3,4],[1.0,1...|
|  2|(6,[0,2,4],[1.0,1...|
+---+--------------------+



In [None]:
from pyspark_iforest.ml.iforest import IForest, IForestModel



In [None]:
scaler = StandardScaler(inputCol="hashes", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(resultHash)

# Normalize each feature to have unit standard deviation.
resultHashScaled = scalerModel.transform(resultHash)


IllegalArgumentException: ignored

In [None]:
resultHashScaled.show()

+--------------------+-------------------+-------+-------+--------------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                  id|               date|   user|     pc|                  to|                  cc|                 bcc|                from| size|attachments|             content|               words|         clean_words|              ngrams|            features|              hashes|
+--------------------+-------------------+-------+-------+--------------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|{R3I7-S4TX96FG-82...|2010-01-02 07:11:45|LAP0338|PC-5758|Dean.Flynn.Hines@...|Nathaniel.Hunter....|                null|Lynn.Adena.Pratt@.

In [None]:
word2Vec = Word2Vec(vectorSize=20, minCount=0, inputCol="words", outputCol="word_vector")
model = word2Vec.fit(wordsData)

result = model.transform(wordsData)

In [None]:
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.sparse(6, [1, 3], [1.0, 1.0])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()

The hashed dataset where hashed values are stored in the column 'hashes':
+---+--------------------+--------------------+
| id|            features|              hashes|
+---+--------------------+--------------------+
|  0|(6,[0,1,2],[1.0,1...|[[7.76477275E8], ...|
|  1|(6,[2,3,4],[1.0,1...|[[1.194544211E9],...|
|  2|(6,[0,2,4],[1.0,1...|[[7.76477275E8], ...|
+---+--------------------+--------------------+

Approximately joining dfA and dfB on distance smaller than 0.6:
+---+---+---------------+
|idA|idB|JaccardDistance|
+---+---+---------------+
|  2|  5|            0.5|
|  1|  4|            0.5|
|  1|  5|            0.5|
|  0|  5|            0.5|
+---+---+---------------+

Approximately searching dfA for 2 nearest neighbors of the key:
+---+--------------------+--------------------+-------+
| id|            features|              hashes|distCol|
+---+--------------------+--------------------+-------+
|  1|(6,[2,3,4],[1.0,1...|[[1.194544211E9],...|   0.75|
+---+--------------------+--

In [None]:
file = file.withColumn('date', to_timestamp(file.date, 'MM/dd/yyyy HH:mm:ss'))

file.printSchema()
file.show(5)

root
 |-- id: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- user: string (nullable = true)
 |-- pc: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- content: string (nullable = true)

+--------------------+-------------------+-------+-------+------------+--------------------+
|                  id|               date|   user|     pc|    filename|             content|
+--------------------+-------------------+-------+-------+------------+--------------------+
|{L9G8-J9QE34VM-28...|2010-01-02 07:23:14|MOH0273|PC-6699|EYPC9Y08.doc|D0-CF-11-E0-A1-B1...|
|{H0W6-L4FG38XG-98...|2010-01-02 07:26:19|MOH0273|PC-6699|N3LTSU3O.pdf|25-50-44-46-2D ca...|
|{M3Z0-O2KK89OX-57...|2010-01-02 08:12:03|HPH0075|PC-2417|D3D3WC9W.doc|D0-CF-11-E0-A1-B1...|
|{E1I4-S4QS61TG-36...|2010-01-02 08:17:00|HPH0075|PC-2417|QCSW62YS.doc|D0-CF-11-E0-A1-B1...|
|{D4R7-E7JL45UX-00...|2010-01-02 08:24:57|HSB0196|PC-8001|AU75JV6U.jpg|               FF-D8|
+--------------------+------

In [None]:
logon = logon.withColumn('date', to_timestamp(logon.date, 'MM/dd/yyyy HH:mm:ss'))

logon.printSchema()
logon.show(5)

root
 |-- id: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- user: string (nullable = true)
 |-- pc: string (nullable = true)
 |-- activity: string (nullable = true)

+--------------------+-------------------+-------+-------+--------+
|                  id|               date|   user|     pc|activity|
+--------------------+-------------------+-------+-------+--------+
|{X1D9-S0ES98JV-53...|2010-01-02 06:49:00|NGF0157|PC-6056|   Logon|
|{G2B3-L6EJ61GT-22...|2010-01-02 06:50:00|LRR0148|PC-4275|   Logon|
|{U6Q3-U0WE70UA-37...|2010-01-02 06:53:04|LRR0148|PC-4124|   Logon|
|{I0N5-R7NA26TG-62...|2010-01-02 07:00:00|IRM0931|PC-7188|   Logon|
|{D1S0-N6FH62BT-53...|2010-01-02 07:00:00|MOH0273|PC-6699|   Logon|
+--------------------+-------------------+-------+-------+--------+
only showing top 5 rows



In [None]:
psychometric.printSchema()
psychometric.show(5)

root
 |-- employee_name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- O: integer (nullable = true)
 |-- C: integer (nullable = true)
 |-- E: integer (nullable = true)
 |-- A: integer (nullable = true)
 |-- N: integer (nullable = true)

+--------------------+-------+---+---+---+---+---+
|       employee_name|user_id|  O|  C|  E|  A|  N|
+--------------------+-------+---+---+---+---+---+
|    Calvin Edan Love|CEL0561| 40| 39| 36| 19| 40|
|Christine Reagan ...|CRD0624| 26| 22| 17| 39| 32|
|Jade Felicia Cald...|JFC0557| 22| 16| 23| 40| 33|
|Aquila Stewart De...|ASD0577| 40| 48| 36| 14| 37|
|   Micah Abdul Rojas|MAR0955| 36| 44| 23| 44| 25|
+--------------------+-------+---+---+---+---+---+
only showing top 5 rows

