In [19]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType, FloatType
from pyspark.ml.feature import Word2Vec, Tokenizer
from pyspark.ml.linalg import Vectors


# Start (or reuse) a Spark session running locally, with the web UI on port 4050.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("Iniciando com Spark")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [13]:
json_file_path = "./capital.json"

# Spark's default (permissive) JSON reader keeps lines it cannot parse and
# stores their raw text in `_corrupt_record` instead of failing.
df = spark.read.json(json_file_path)

# Discard unparseable lines by testing `_corrupt_record` directly rather than
# by row position: ids from `monotonically_increasing_id()` are unique but not
# guaranteed to be consecutive, and hard-coded positions silently drop the
# wrong rows as soon as the file changes length.
# The column is only inferred when at least one line failed to parse, so guard
# against a fully clean file. `_corrupt_record` is kept in `df_final` so the
# schema seen by later cells is unchanged.
if "_corrupt_record" in df.columns:
    df_final = df.filter(F.col("_corrupt_record").isNull())
else:
    df_final = df


In [14]:
# Remove the JSON reader's bookkeeping column, leaving only the data columns.
corrupt_column = '_corrupt_record'
clean_df = df_final.drop(corrupt_column)

# Collect to pandas purely for a rich table display in the notebook.
clean_df.toPandas()

Unnamed: 0,answer,question
0,Paris,What is the capital of France?
1,Berlin,What is the capital of Germany?
2,Brasília,What is the capital of Brazil?
3,Tokyo,What is the capital of Japan?
4,Canberra,What is the capital of Australia?
5,Ottawa,What is the capital of Canada?
6,Moscow,What is the capital of Russia?
7,New Delhi,What is the capital of India?
8,Mexico City,What is the capital of Mexico?
9,Pretoria,What is the capital of South Africa?


In [15]:
# 2. Tokenize the questions
# Reads the "question" column and writes the resulting token list to "words".
tokenizer = Tokenizer().setInputCol("question").setOutputCol("words")
tokenized_data = tokenizer.transform(clean_df)

In [16]:
# 3. Train a Word2Vec model
# Word2Vec training is stochastic; pin the seed so that Restart & Run All
# reproduces the same embeddings (and therefore the same similarity scores).
word2vec = Word2Vec(
    vectorSize=100,  # dimensionality of each embedding
    minCount=0,      # keep every token in the vocabulary, however rare
    seed=42,
    inputCol="words",
    outputCol="word2vec_result",
)
model = word2vec.fit(tokenized_data)

# Adds "word2vec_result": one vector per row, computed from that row's "words".
word2vec_result = model.transform(tokenized_data)


23/08/16 12:24:22 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


In [17]:
# Print each question next to its full (untruncated) embedding vector.
embedding_preview = word2vec_result.select("question", "word2vec_result")
embedding_preview.show(truncate=False)


+------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [18]:
def cosine_similarity(v1, v2):
    """Return the cosine similarity of two vectors as a Python float.

    Both arguments must expose ``dot(other)`` and ``norm(p)`` methods.

    Args:
        v1: First vector.
        v2: Second vector.

    Returns:
        ``v1 . v2 / (|v1| * |v2|)``, or ``0.0`` when either vector has zero
        L2 norm. The angle is undefined in that case, and dividing anyway
        would yield NaN or raise ``ZeroDivisionError`` depending on the
        numeric type.
    """
    denominator = v1.norm(2) * v2.norm(2)
    if denominator == 0:
        # Zero-length vector: treat as "no similarity" instead of dividing by 0.
        return 0.0
    return float(v1.dot(v2) / denominator)

In [24]:
def search_with_word2vec(model, dataframe, question, top_k=1):
    """Return the ``top_k`` rows of ``dataframe`` most similar to ``question``.

    The question is tokenized with the notebook-level ``tokenizer``, embedded
    with ``model``, and compared by cosine similarity against the
    ``word2vec_result`` vector of every row in ``dataframe``.

    Args:
        model: Fitted model whose ``transform`` adds a ``word2vec_result`` column.
        dataframe: Spark DataFrame that already has a ``word2vec_result`` column.
        question: Query text.
        top_k: Maximum number of rows to return.

    Returns:
        A Spark DataFrame with the columns of ``dataframe`` plus a float
        ``similarity`` column, ordered from most to least similar. The result
        is empty when the query embeds to the zero vector (e.g. none of its
        tokens are in the model's vocabulary), because no meaningful ranking
        exists in that case.
    """
    # Transform the question through the same tokenizer + model pipeline
    # that produced the stored vectors.
    new_question = spark.createDataFrame([(question,)], ["question"])
    new_question = tokenizer.transform(new_question)
    new_vector = model.transform(new_question).head()["word2vec_result"]

    # A zero query vector makes every cosine similarity undefined (0/0 -> NaN).
    # Spark orders NaN above all other numbers, so a descending sort would
    # return arbitrary rows as the "best" matches. Return an empty result with
    # the same schema instead.
    if new_vector.norm(2) == 0:
        return dataframe.withColumn(
            "similarity", F.lit(None).cast(FloatType())
        ).limit(0)

    # UDF to compute similarity between a row vector and the query vector
    similarity_udf = F.udf(lambda x: cosine_similarity(x, new_vector), FloatType())

    # Compute similarity for each row
    similar = dataframe.withColumn("similarity", similarity_udf(F.col("word2vec_result")))

    # Return top k results
    return similar.orderBy("similarity", ascending=False).limit(top_k)


In [28]:
# Test the function
# Sample query; with the default top_k only the single best match is returned.
query = "What is the capital of Japan?"
top_similar = search_with_word2vec(
    model=model,
    dataframe=word2vec_result,
    question=query,
)
top_similar.select("question", "answer").show(truncate=False)

+-----------------------------+------+
|question                     |answer|
+-----------------------------+------+
|What is the capital of Japan?|Tokyo |
+-----------------------------+------+

