In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("TextVectorization") \
    .getOrCreate()

24/12/19 19:42:26 WARN Utils: Your hostname, Anls-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 161.9.118.206 instead (on interface en0)
24/12/19 19:42:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/19 19:42:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df = spark.read.csv("text.csv", header=True, inferSchema=True)

                                                                                

In [3]:
from pyspark.sql.functions import regexp_replace, lower, split

df_clean = df.withColumn("clean_text", lower(regexp_replace("t1", "[^a-zA-Z\\s]", "")))
df_tokens = df_clean.withColumn("tokens", split("clean_text", "\\s+"))

In [4]:
from pyspark.ml.feature import HashingTF, IDF

hashing_tf = HashingTF(inputCol="tokens", outputCol="raw_features", numFeatures=10000)
featurized_data = hashing_tf.transform(df_tokens)

print(featurized_data.show())

idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

print(rescaled_data.select("features").show(truncate=False))

24/12/19 19:42:35 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: t1, 
 Schema: t1, _c1
Expected: _c1 but found: 
CSV file: file:///Users/anildervis/Documents/ITU/YZV411/Project/text.csv


+---------+----+----------+-----------+--------------------+
|       t1| _c1|clean_text|     tokens|        raw_features|
+---------+----+----------+-----------+--------------------+
|jaskldbsa|NULL| jaskldbsa|[jaskldbsa]|(10000,[6118],[1.0])|
| jaskdnsa|NULL|  jaskdnsa| [jaskdnsa]|(10000,[3561],[1.0])|
| ashjdlsa|NULL|  ashjdlsa| [ashjdlsa]|(10000,[5049],[1.0])|
+---------+----+----------+-----------+--------------------+

None


                                                                                

+-----------------------------------+
|features                           |
+-----------------------------------+
|(10000,[6118],[0.6931471805599453])|
|(10000,[3561],[0.6931471805599453])|
|(10000,[5049],[0.6931471805599453])|
+-----------------------------------+

None


In [5]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["features"], outputCol="vector")
dataset = assembler.transform(rescaled_data)

In [6]:
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

def compute_similarity(vector1, vector2):
    return cosine_similarity([vector1], [vector2])[0][0]

In [7]:
from pyspark.ml.linalg import Vectors
user_vector = Vectors.dense(np.random.rand(10000))
user_vector_broadcast = spark.sparkContext.broadcast(user_vector)

In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

similarity_udf = udf(lambda x: float(compute_similarity(user_vector_broadcast.value, x)), DoubleType())
df_similarities = dataset.withColumn("similarity", similarity_udf("vector"))

In [9]:
print(df_similarities.show())

24/12/19 19:42:38 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: t1, 
 Schema: t1, _c1
Expected: _c1 but found: 
CSV file: file:///Users/anildervis/Documents/ITU/YZV411/Project/text.csv


+---------+----+----------+-----------+--------------------+--------------------+--------------------+--------------------+
|       t1| _c1|clean_text|     tokens|        raw_features|            features|              vector|          similarity|
+---------+----+----------+-----------+--------------------+--------------------+--------------------+--------------------+
|jaskldbsa|NULL| jaskldbsa|[jaskldbsa]|(10000,[6118],[1.0])|(10000,[6118],[0....|(10000,[6118],[0....|0.010676062180195093|
| jaskdnsa|NULL|  jaskdnsa| [jaskdnsa]|(10000,[3561],[1.0])|(10000,[3561],[0....|(10000,[3561],[0....|0.014932668346377753|
| ashjdlsa|NULL|  ashjdlsa| [ashjdlsa]|(10000,[5049],[1.0])|(10000,[5049],[0....|(10000,[5049],[0....|0.012092973083839812|
+---------+----+----------+-----------+--------------------+--------------------+--------------------+--------------------+

None


                                                                                

In [10]:
top_matches = df_similarities.orderBy("similarity", ascending=False).limit(10)
print(top_matches)
top_matches.select("vector", "similarity").show()

DataFrame[t1: string, _c1: string, clean_text: string, tokens: array<string>, raw_features: vector, features: vector, vector: vector, similarity: double]
+--------------------+--------------------+
|              vector|          similarity|
+--------------------+--------------------+
|(10000,[3561],[0....|0.014932668346377753|
|(10000,[5049],[0....|0.012092973083839812|
|(10000,[6118],[0....|0.010676062180195093|
+--------------------+--------------------+

