In [None]:
%%configure -f
{"driverMemory": "4g", "executorMemory":"8960m"}

In [None]:
# Packages to download the corpus and pandas
import urllib.request
#import pandas as pd
# This block is for sparknlp stuff
import sparknlp
from sparknlp.annotator import *
from sparknlp.base import *
# This block is for pyspark stuff
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import BucketedRandomProjectionLSH, BucketedRandomProjectionLSHModel

In [None]:
# Obtain the Spark session through Spark NLP's start() helper; `spark` is used
# below to read the dataset and to build one-row query DataFrames.
spark = sparknlp.start()

In [None]:
# Load the blog-post listing (JSON) from S3 into a DataFrame.
# Later cells read its `title`, `link` and `postExcerpt` columns.
urls = spark.read.json("s3://your-bucket/datasets/aws-blogs/json/list.json")

In [None]:
# Show the columns and types Spark derived from the JSON file.
urls.printSchema()

In [None]:
# Build the text that will be embedded: "<title> <postExcerpt>".
# concat_ws is used rather than concat because concat yields NULL as soon as
# any input is NULL, which would throw away the title of every post that has
# no excerpt; concat_ws simply skips NULL inputs.
# NOTE: the variable name is kept because later cells reference it; the data
# is the blog listing loaded into `urls`.
medlineplusDF = urls.withColumn(
    "text",
    F.concat_ws(" ", F.col("title"), F.col("postExcerpt")),
).select("title", F.col("link").alias("url"), "text")

# Preview a single row, truncating each cell to 1000 characters.
medlineplusDF.show(n=1, truncate=1000)


In [None]:
# Stage 1: wrap the raw "text" column into Spark NLP "document" annotations.
docass = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Stage 2: split every document into sentences.
# NOTE: despite the "_dl" suffix, this is the plain SentenceDetector annotator.
sentence_detector_dl = (
    SentenceDetector()
    .setInputCols(["document"])
    .setOutputCol("sentence")
)

# Stage 3: embed each sentence with the default pretrained
# UniversalSentenceEncoder model.
emb_use = (
    UniversalSentenceEncoder.pretrained()
    .setInputCols("sentence")
    .setOutputCol("use_embeddings")
)


In [None]:
# Chain document assembly, sentence splitting and sentence embedding into a
# single pipeline.
pipeline_use = Pipeline(stages=[docass, sentence_detector_dl, emb_use])

# Fit (or prepare) the pipeline into a model
model_use = pipeline_use.fit(medlineplusDF)

# Persist the fitted pipeline. write().overwrite() is used because a plain
# save() raises when the target path already exists, which made every re-run
# of this cell fail after the first one.
model_use.write().overwrite().save("hdfs:///user/diego/model_use.model")

# Apply the model to our medlineplusDF content and save in a new dataframe
medlineplusSentencesDF_init = model_use.transform(medlineplusDF)

In [None]:
# Flatten the per-document annotation arrays into one row per sentence:
#   1. zip the parallel sentence / begin / end / embedding arrays element-wise,
#   2. explode the zipped array so every element becomes its own row,
#   3. read the zipped struct's fields by position and give them column names.
medlineplusSentencesDF = (
    medlineplusSentencesDF_init
    .select(
        "title",
        "url",
        F.arrays_zip(
            F.col("sentence.result").alias("sentence"),
            F.col("sentence.begin").alias("begin"),
            F.col("sentence.end").alias("end"),
            F.col("use_embeddings.embeddings"),
        ).alias("zip"),
    )
    .select(
        "title",
        "url",
        F.explode(F.col("zip")).alias("zip"),
    )
    .select(
        "title",
        "url",
        F.col("zip")["0"].alias("sentence"),
        F.col("zip")["1"].alias("begin"),
        F.col("zip")["2"].alias("end"),
        F.col("zip")["3"].alias("embeddings"),
    )
    .repartition(100)
)

In [None]:
def avg_vectors(bert_vectors):
  """Return the element-wise mean of a collection of embedding vectors.

  Args:
    bert_vectors: sequence of records, each exposing its vector under the
      "embeddings" key (e.g. dicts or Spark Rows). All vectors are expected
      to have the same length as the first one.

  Returns:
    A list of floats with one entry per vector dimension, or an empty list
    when `bert_vectors` is empty or None.
  """
  # Nothing to average: avoid indexing into an empty/None input.
  if not bert_vectors:
    return []

  count = len(bert_vectors)
  dims = len(bert_vectors[0]["embeddings"])
  # Float accumulator so the result is always a list of floats.
  totals = [0.0] * dims

  for vec in bert_vectors:
    for i, x in enumerate(vec["embeddings"]):
      totals[i] += x

  # Divide every component by the number of vectors (not by the vector
  # dimension, and not just the last component).
  return [total / count for total in totals]

import pyspark.sql.types as T

# Spark UDF around avg_vectors, declared to return an array of doubles.
avg_vectors_udf = F.udf(
    avg_vectors,
    returnType=T.ArrayType(T.DoubleType()),
)

# Spark UDF that turns an array column into a Spark ML dense vector.
myudf = F.udf(
    lambda values: Vectors.dense(values),
    returnType=VectorUDT(),
)

# NOTE(review): avg_vectors reads an "embeddings" field from each element of
# its input, while the column passed here looks like a single flat vector per
# sentence — confirm the intended input before relying on "doc_vector".
df_doc_vec = medlineplusSentencesDF.withColumn(
    "doc_vector",
    avg_vectors_udf(F.col("embeddings")),
)

In [None]:
# Keep the sentence-level columns and replace the raw embedding array with a
# Spark ML vector, the type the LSH stage consumes via inputCol="embeddings".
medlineplusSentencesDF_dv = df_doc_vec.select(
    *["title", "url", "sentence", "begin", "end"],
    myudf(F.col("embeddings")).alias("embeddings"),
)

In [None]:
def get_key(query, model):
  """Embed a free-text query and return the vector used as the search key.

  Args:
    query: text to embed.
    model: fitted pipeline whose output contains the `sentence` and
      `use_embeddings` annotation columns.

  Returns:
    The embedding of the first sentence found in `query`, converted to a
    Spark ML vector by `myudf`.

  Raises:
    ValueError: if the pipeline produces no sentence for `query`
      (for example an empty string), so there is nothing to embed.
  """
  # One-row DataFrame holding the query in the "text" column the pipeline reads.
  queryDF = spark.createDataFrame([[query]]).toDF("text")
  annotatedDF = model.transform(queryDF)

  # One row per detected sentence; only the embedding (zip field '1') is kept.
  embeddingsDF = annotatedDF.select(
    F.explode(
      F.arrays_zip(
        F.col("sentence.result"),
        F.col("use_embeddings.embeddings")
      )
    ).alias("zip")
  ).select(
    myudf(F.col("zip")['1']).alias("embeddings")
  )

  # first() returns None on an empty result instead of raising IndexError.
  first_row = embeddingsDF.first()
  if first_row is None:
    raise ValueError("No sentence could be extracted from query: %r" % (query,))
  return first_row.embeddings

In [None]:
# Smoke test: embed a sample query and display the resulting key vector.
get_key("How to stream from Hbase?", model_use)

In [None]:
# Locality-sensitive hashing over the sentence vectors.
brp = BucketedRandomProjectionLSH(
    inputCol="embeddings",
    outputCol="hashes",
    bucketLength=5,
    numHashTables=10,
)

# Fit the LSH model for our content
brp_model = brp.fit(medlineplusSentencesDF_dv)
# write().overwrite() instead of save(): a plain save() raises when the path
# already exists, which broke every re-run of this notebook.
brp_model.write().overwrite().save("hdfs:///user/diego/brp_model.model")

# Apply the model to our content and get the new reduced
# representation; persisted because it is both written out here and queried
# by the search cells below.
hashesDF = brp_model.transform(medlineplusSentencesDF_dv).persist()
# Overwrite mode for the same reason: the default mode errors out when the
# output directory already exists.
hashesDF.write.mode("overwrite").parquet("hdfs:///user/diego/output")

In [None]:
def find_close_sentences(query, emb_model, brp_model, hashesDF, k):
  """Return the approximate nearest neighbours of `query` in `hashesDF`.

  Args:
    query: free-text search string.
    emb_model: fitted embedding pipeline, passed to get_key to embed `query`.
    brp_model: fitted LSH model used for the neighbour search.
    hashesDF: DataFrame searched by `brp_model`.
    k: number of neighbours requested.

  Returns:
    The DataFrame produced by `brp_model.approxNearestNeighbors`.
  """
  query_vector = get_key(query, emb_model)
  return brp_model.approxNearestNeighbors(hashesDF, query_vector, k)

In [None]:
# Example search: show title, distance and text of the 5 closest sentences.
find_close_sentences(
    "how to monitoring spark streaming?",
    model_use,
    brp_model,
    hashesDF,
    5,
).select(
    "title",
    "distCol",
    "sentence",
).show(truncate=False)