Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

# Clustering in Spark using spaCy and KMeans



## This cell configures the spark session - Do not change

In [None]:
%%configure -f
{
"conf": {
     "spark.rpc.message.maxSize": 1024,
     "spark.kryoserializer.buffer.max": "256m",
     "spark.driver.maxResultSize": "8g"
   }
}

StatementMeta(, 36, -1, Finished, Available)

## These are the parameters that need to be changed to your values

In [None]:
# The blob account url - https://[accountname].blob.core.windows.net
# Used as the prefix of every per-file blob URL and for the Blob service client.
account_url = "https://[accountname].blob.core.windows.net"
# The blob account name = [accountname]
account_name = ''
# The blob account key (e.g. [iufquq34r423r2==]) - used to sign the directory SAS token.
# NOTE(review): this is a secret; consider loading it from a secret store rather than
# committing it to the notebook.
account_key = ''

# The input CSV file (read with a header row). Later cells read its "text" and
# "filename" columns.
input_filename = 'abfss://share@[accountname].dfs.core.windows.net/sport_articles.csv'
# The number of clusters (k for KMeans) - this can be automated or start with a guesstimate
number_of_clusters = 5
# The output directory where the output file will be written to
output_directory = 'abfss://share@[accountname].dfs.core.windows.net/sport_articles/output/'
# The name of the output file (joined onto output_directory)
output_filename = 'cosine_spacy_max_++_cosine.csv'

# The name of the primary ADLS share (file system) - used for the SAS token and blob URLs
file_system_name="share"
# The directory folders where your files reside - used for the SAS token and blob URLs
directory_name='bbc'  # bbc/videos/

# If set to True the scatterplot PCA keeps 3 components (X, Y, Z), otherwise 2 (X, Y)
SCATTER_PLOT_3D = False
# If True the result is written as Spark partition files, and the Coalesce notebook will
# need to be run to merge them into a single file. If False the result is coalesced
# into a single partition before writing.
LOW_MEMORY_MODE = True

# Concept Graph - number of highest-count co-occurrence pairs to highlight (coloured "red")
number_of_connected_nodes = 5

# Azure SubscriptionId
subscription_id=""
# AzureML Workspace Resource Group
resource_group=""
# AzureML Workspace Name
workspace_name=""

StatementMeta(small, 36, 10, Finished, Available)

## Track the Experiment in Azure ML

In [None]:
from azureml.core import Workspace, Experiment, Run
import mlflow

# Point MLflow at the Azure ML workspace so the run is tracked there.
ws = Workspace(subscription_id=subscription_id, resource_group=resource_group, workspace_name=workspace_name)
mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())

# The experiment is named "(<notebook name>_<job id>)"; later cells reuse this name.
experiment_name = f"({mssparkutils.runtime.context['notebookname']}_{str(mssparkutils.env.getJobId())})"
mlflow.set_experiment(experiment_name)

# Log the user-supplied notebook parameters one by one (account_key is deliberately absent).
for param_name, param_value in (
    ("input_filename", input_filename),
    ("number_of_clusters", number_of_clusters),
    ("output_directory", output_directory),
    ("output_filename", output_filename),
    ("account_url", account_url),
    ("account_name", account_name),
    ("file_system_name", file_system_name),
    ("directory_name", directory_name),
    ("SCATTER_PLOT_3D", SCATTER_PLOT_3D),
    ("LOW_MEMORY_MODE", LOW_MEMORY_MODE),
):
    mlflow.log_param(param_name, param_value)

# Log the Synapse runtime context in a single batch.
params = {
    context_key: mssparkutils.runtime.context[context_key]
    for context_key in ("sparkpool", "workspace", "notebookname", "isForPipeline", "pipelinejobid")
}

mlflow.log_params(params)
mlflow.pyspark.ml.autolog()

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer, StopWordsRemover, PCA, RegexTokenizer
from pyspark.ml.clustering import LDA, KMeans, BisectingKMeans
from pyspark.ml import Pipeline
from pyspark import SparkContext, SparkConf
import sys
from pyspark.sql.functions import udf, col, size
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession
import ntpath
import os

from pyspark.ml.feature import StopWordsRemover, RFormula
from spacy.lang.en.stop_words import STOP_WORDS
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import spacy
from string import punctuation
from pyspark.sql.functions import udf,col,lit
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import ArrayType,StringType, FloatType

from itertools import combinations
from operator import itemgetter

from graphframes import *
from pyspark.sql.functions import monotonically_increasing_id, lit

# NOTE: `global` statements at module (notebook cell) scope have no effect in
# Python; the names below are module-level regardless.
global custom_stopwords
global stopwords
global nlp


# Alias for spaCy's English stop-word set.
stopwords = STOP_WORDS
# Placeholder for user-defined stop words; not referenced by the visible tokenizer,
# which filters on STOP_WORDS only.
custom_stopwords = []
# Loaded once here; the tokenizer/vectorizer functions below call this object.
nlp = spacy.load("en_core_web_lg") # Using the large model

import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer

# replace this with your folder data
# Reads the input CSV with a header row into the DataFrame used by all later cells.
df = spark.read.load(input_filename, header=True, format='csv')


StatementMeta(small, 36, 11, Finished, Available)

# Using spaCy for Tokenisation, Vectorisation via custom functions and udf

In [None]:
# Cleaning and Tokenising with spaCy
def spacy_tokenizer(sentence):
    """Lemmatise *sentence* and return the kept tokens joined by single spaces.

    Each token is replaced by its lower-cased, stripped lemma (or by its
    lower-cased text when the lemma is the "-PRON-" placeholder). Tokens found
    in STOP_WORDS, or found in ``string.punctuation``, are dropped.
    """
    kept_tokens = []
    for token in nlp(sentence):
        if token.lemma_ != "-PRON-":
            normalised = token.lemma_.lower().strip()
        else:
            normalised = token.lower_
        # `in punctuation` is a substring test on the punctuation string, so
        # an empty string (e.g. a stripped whitespace token) is dropped as well.
        if normalised not in STOP_WORDS and normalised not in punctuation:
            kept_tokens.append(normalised)
    return " ".join(kept_tokens)


# Converting spaCy processed text into the tokens first and a feature matrix
def spacy_vectorizer_token(doc):
    """Return a matrix with one spaCy vector per token of *doc*.

    The previous implementation iterated over the *characters* of the input
    string and re-parsed the whole document for each one, producing one
    identical document-average row per character. The text is now parsed once
    and each token contributes its own vector.

    Args:
        doc: Text to parse with the module-level ``nlp`` pipeline.

    Returns:
        A 2-D NumPy array of shape (number of tokens, vector width). For text
        that yields no tokens the array has zero rows.
    """
    parsed = nlp(doc)
    vec_list = [token.vector.reshape(1, -1) for token in parsed]
    if not vec_list:
        # np.concatenate rejects an empty sequence; return an empty matrix
        # whose width matches the document vector instead.
        # NOTE(review): assumes Doc.vector is a 1-D array even for an empty
        # document - confirm against the installed spaCy version.
        return np.empty((0, parsed.vector.shape[0]))
    feature_matrix = np.concatenate(vec_list)
    return feature_matrix


# Converting spaCy processed text into a feature array - spaCy generates the average vector
def spacy_vectorizer_sent(doc, vector_size=300):
    """Return the spaCy document vector of *doc* as a dense Spark ML vector.

    Args:
        doc: Text to parse with the module-level ``nlp`` pipeline.
        vector_size: Length of the fallback vector used when spaCy reports no
            vector for the document. It must match the vector width of the
            loaded language model; the default of 300 is the value this
            notebook used for the large model.

    Returns:
        ``Vectors.dense`` of the document vector, or of an all-zero vector of
        length *vector_size* when the document has no vector.
    """
    # Parse once; the document is reused for the check and for the vector.
    parsed = nlp(doc)
    if parsed.has_vector:
        vec_list = parsed.vector
    else:
        # np.zeros, not np.empty: np.empty returns uninitialised memory, which
        # would feed arbitrary (possibly NaN/inf) values into PCA and KMeans.
        vec_list = np.zeros(vector_size)
    return Vectors.dense(vec_list)

# Wrap the spaCy helpers as UDFs: the tokenizer yields a string column, the
# vectoriser yields a Spark ML vector column.
udf_spacy = udf(spacy_tokenizer, StringType())
udf_spacy_vectoriser = udf(spacy_vectorizer_sent, VectorUDT()) #StringType())

# First clean the "text" column (stop words and punctuation removed), then
# vectorise the cleaned text into the "features" column.
cleaned_text_column = "spacy_preprocessed_text_no_stopwords_no_punct"
df = (
    df
    .withColumn(cleaned_text_column, udf_spacy(col("text")))
    .withColumn("features", udf_spacy_vectoriser(col(cleaned_text_column)))
)


StatementMeta(small, 36, 12, Finished, Available)

# Apply PCA and Kmeans clustering

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

# Apply PCA and Kmeans in a pipeline
# First PCA: reduce the "features" column to 20 components in "pca_features".
pca = PCA(k=20, inputCol="features", outputCol="pca_features")

# Second PCA: reduce "pca_features" to plot coordinates (3 components for a
# 3D scatterplot, otherwise 2).
pca_2 = PCA(
    k=3 if SCATTER_PLOT_3D else 2,
    inputCol="pca_features",
    outputCol="pca_scatterplot_features",
)

# NOTE(review): no featuresCol is given, so KMeans presumably falls back to its
# default input column ("features") and clusters on the raw spaCy vectors, not
# on "pca_features" - confirm this is intended.
kmeans = KMeans(k=number_of_clusters, seed=42, initMode="k-means||", distanceMeasure="euclidean")

# Fit all three stages on df, then apply them to the same data.
pipeline = Pipeline(stages=[pca, kmeans, pca_2])
model = pipeline.fit(df)
df_coords = model.transform(df)

StatementMeta(small, 36, 14, Finished, Cancelled)

## Retrieve Data from Blob Storage

In [None]:
from azure.identity import DefaultAzureCredential
from datetime import datetime, timedelta
from azure.storage.blob import BlobServiceClient, generate_container_sas, BlobSasPermissions
# Blob service client authenticated with the default Azure credential chain.
token_credential = DefaultAzureCredential()
blob_service_client = BlobServiceClient(account_url=account_url, credential=token_credential)

from azure.storage.filedatalake import DataLakeServiceClient, generate_directory_sas

# Read-only SAS for the data directory, signed with the account key and
# expiring 100 days from now (naive UTC timestamp).
sas_expiry = datetime.utcnow() + timedelta(days=100)
SAS = generate_directory_sas(
    account_name=account_name,
    file_system_name=file_system_name,
    directory_name=directory_name,
    credential=account_key,
    permission=BlobSasPermissions(read=True),
    expiry=sas_expiry,
)

# Query-string form of the token, ready to append to a blob URL.
SAS_key = f"?{SAS}"

storage_path = os.path.join(account_url, file_system_name, directory_name)
SAS_path = []

def build_sas_path(row):
    """Return the SAS-signed blob URL for the file named in *row*.

    Only the base name of *row* is used; it is placed under
    ``account_url/file_system_name/directory_name`` and the SAS query string
    is appended.
    """
    file_name = ntpath.basename(row)
    url_segments = (account_url, file_system_name, directory_name, file_name)
    return "/".join(url_segments) + SAS_key

# Spark UDF wrapper producing a string column.
udf_build_sas_path = udf(build_sas_path, StringType())

def get_X(row):
    """Return the first entry of ``row.values`` converted to a string."""
    x_coordinate = row.values[0]
    return str(x_coordinate)

def get_Y(row):
    """Return the second entry of ``row.values`` converted to a string."""
    y_coordinate = row.values[1]
    return str(y_coordinate)

def get_Z(row):
    """Return the third entry of ``row.values`` converted to a string."""
    z_coordinate = row.values[2]
    return str(z_coordinate)

def join_text(row):
    """Concatenate the items of *row* with no separator between them."""
    no_separator = ""
    return no_separator.join(row)

def remove_separator(row):
    """Return *row* with every comma and every backslash removed."""
    # One pass that deletes both characters.
    unwanted_characters = str.maketrans("", "", ",\\")
    return row.translate(unwanted_characters)

# Wrap each row helper as a Spark UDF that returns a string column.
udf_get_X, udf_get_Y, udf_get_Z, udf_join_text, udf_remove_separator = (
    udf(row_helper, StringType())
    for row_helper in (get_X, get_Y, get_Z, join_text, remove_separator)
)


# Resolve the input columns against the pipeline output once; every derived
# column below is computed from these original columns.
pipeline_output = df_coords
scatter_features = pipeline_output.pca_scatterplot_features

# Columns common to both plot modes: SAS-signed blob URL plus X and Y.
enriched = (
    pipeline_output
    .withColumn("blob_path", udf_build_sas_path(pipeline_output.filename))
    .withColumn("X", udf_get_X(scatter_features).cast('string'))
    .withColumn("Y", udf_get_Y(scatter_features).cast('string'))
)

# The third coordinate exists only when the scatterplot PCA kept 3 components.
if SCATTER_PLOT_3D:
    enriched = enriched.withColumn("Z", udf_get_Z(scatter_features).cast('string'))

# Add the processed text and overwrite "text" with commas/backslashes removed.
df_coords = (
    enriched
    .withColumn(
        "processed_text",
        udf_join_text(pipeline_output.spacy_preprocessed_text_no_stopwords_no_punct).cast('string'),
    )
    .withColumn("text", udf_remove_separator(pipeline_output.text))
)

# Keep the full frame for the concept graph, then drop the intermediate
# columns from the frame that gets written out.
df_graph = df_coords
df_coords = df_coords.drop('pca_features', 'pca_scatterplot_features', 'features', 'spacy_preprocessed_text_no_stopwords_no_punct')

StatementMeta(, , , Cancelled, )

## Save the outcome with two operations (usual or coalesce)

In [None]:
# Both modes overwrite the same location and write CSV with a header row.
output_path = os.path.join(output_directory, output_filename)

if LOW_MEMORY_MODE:
    # Write one file per partition; merge them afterwards with the Coalesce notebook.
    df_coords.write.mode('overwrite').options(header='true').csv(output_path)
else:
    # Drop rows whose blob_path equals the literal 'blob_path', then collapse
    # to a single partition so a single CSV part file is produced.
    df1 = df_coords.filter((df_coords.blob_path != 'blob_path'))
    df1.coalesce(1).write.mode('overwrite').options(header='true').csv(output_path)

# End the active MLflow run through the public API rather than via the
# `mlflow` name that happens to be reachable inside mlflow.pyspark.ml.
mlflow.end_run()

StatementMeta(, , , Cancelled, )

# Optional: Add Azure Cognitive Search

## Add Search Parameters

In [None]:
# Azure Search Admin Key - passed as the subscription key when writing the index.
# NOTE(review): this is a secret; avoid committing a real value to the notebook.
search_admin_key = ""
# The name of the search service - used when writing the index and to build the
# https://<name>.search.windows.net query URLs.
search_service_name = ""
# The Azure Search Query Key - sent as the "api-key" header on search requests.
search_query_key = ""

In [None]:
from synapse.ml.cognitive import *
from pyspark.sql.functions import monotonically_increasing_id, lit

# Remove the "_c0" column before indexing.
df = df.drop("_c0")

# Give every row a string key and tag it with the "upload" action.
search_documents = (
    df.withColumn("key", monotonically_increasing_id().cast("string"))
    .withColumn("SearchAction", lit("upload"))
)

# The index is named after the MLflow experiment ("(<notebook name>_<job id>)").
# NOTE(review): Azure Cognitive Search index names are believed to be restricted
# to lowercase letters, digits and dashes - confirm experiment_name is accepted.
search_documents.writeToAzureSearch(
    subscriptionKey=search_admin_key,
    actionCol="SearchAction",
    serviceName=search_service_name,
    indexName=experiment_name,
    keyCol="key",
)

## Search the generated Azure Search Index

In [None]:
import requests
from IPython.display import display, Markdown

# Free-text term to look up in the index written by the previous cell.
term_to_search_for = "covid"

url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2019-05-06".format(
    search_service_name, experiment_name
)
# A timeout keeps the notebook from hanging on an unreachable service.
response = requests.post(
    url,
    json={"search": term_to_search_for},
    headers={"api-key": search_query_key},
    timeout=30,
)
# Fail with the HTTP error (bad key, unknown index, ...) instead of a later
# KeyError on a response body that has no 'value' entry.
response.raise_for_status()
jdata = response.json()

# Render each hit: score and file name, followed by the document text.
for doc in jdata.get('value', []):
    display(Markdown(f'**Search Score {doc["@search.score"]}** Document {doc["filename"]}'))
    display(Markdown(f'{doc["text"]}'))


## Implement Semantic Search

1) [Enable Semantic Search](https://docs.microsoft.com/en-us/azure/search/semantic-search-overview#enable-semantic-search) on your search instance

2) [Configure Semantic Search](https://docs.microsoft.com/en-us/azure/search/semantic-how-to-query-request?tabs=semanticConfiguration%2Cportal#create-a-semantic-configuration)

In [None]:
# Natural-language question for the semantic ranker.
term_to_search_for = "Whose thumb was fractured?"

url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2021-04-30-Preview".format(
    search_service_name, experiment_name
)
# "semanticConfiguration" must name a semantic configuration that exists on the index.
semantic_query = {
    "search": term_to_search_for,
    "queryType": "semantic",
    "semanticConfiguration": "config",
    "queryLanguage": "en-us",
    "answers": "extractive|count-3",
    "captions": "extractive|highlight-true",
    "highlightPreTag": "<mark>",
    "highlightPostTag": "</mark>",
}
# A timeout keeps the notebook from hanging on an unreachable service.
response = requests.post(
    url,
    json=semantic_query,
    headers={"api-key": search_query_key},
    timeout=30,
)
# Fail with the HTTP error (semantic search not enabled, bad key, ...) instead
# of a later KeyError on a response body that has no 'value' entry.
response.raise_for_status()
jdata = response.json()

# Render each hit: both scores and the file name, followed by the captions.
for doc in jdata.get('value', []):
    display(Markdown(f'**Search Score {doc["@search.score"]}** **Search rerankerScore Score {doc["@search.rerankerScore"]}** Document {doc["filename"]}'))
    display(Markdown(f'@search.captions {doc["@search.captions"]}'))


# Optional:  Build the Concept Graph using GraphFrames

## Optional: Amend this section to build your concept graph

In [None]:
# Collect the pre-processed text of every document onto the driver.
# Each element is one space-joined string of tokens.
lst_text = df_graph.select('spacy_preprocessed_text_no_stopwords_no_punct').rdd.flatMap(lambda x: x).collect()

lst_source_node = []
lst_source_node_weight = []
lst_source_node_label = []
lst_target_node = []
lst_target_node_weight = []
lst_target_node_label = []
lst_source_url = []
lst_target_url = []
lst_edge_weight = []
lst_edge_colour_weight = []

lst_g_nodes = []
lst_g_edges = []

# Co-occurrence count per token pair, keyed "<a>_<b>" in the orientation in
# which the pair was first seen.
dict_nodes = {}

from itertools import combinations

for row in lst_text:
    # The row is a string, so it must be split into tokens first; running
    # combinations() on the string itself would pair individual characters.
    tokens = row.split() if row else []
    # NOTE(review): this is quadratic in the number of tokens per document and
    # everything is held in driver memory - consider de-duplicating or capping
    # tokens for long documents.
    combos = list(combinations(tokens, 2))

    for c in combos:
        # First update edge weights
        forward_key = c[0] + "_" + c[1]
        backward_key = c[1] + "_" + c[0]
        if forward_key in dict_nodes:
            dict_nodes[forward_key] += 1
        elif backward_key in dict_nodes:
            dict_nodes[backward_key] += 1
        else:
            dict_nodes[forward_key] = 1  # initialise and create first combo

    for c in combos:
        forward_key = c[0] + "_" + c[1]
        backward_key = c[1] + "_" + c[0]
        lst_source_node.append(c[0])
        lst_g_nodes.append((c[0], c[0]))
        lst_target_node.append(c[1])
        lst_g_nodes.append((c[1], c[1]))
        if forward_key in dict_nodes:
            weight = dict_nodes[forward_key]
            lst_g_edges.append((c[0], c[1], "related"))
        else:
            # NOTE(review): pairs stored in the opposite orientation get a row
            # in the concept-graph table but no GraphFrame edge - confirm intended.
            weight = dict_nodes[backward_key]
        # NOTE(review): the weight is the running count at the time this
        # document is processed, not the final count over all documents.
        lst_edge_weight.append(weight)
        lst_source_node_weight.append(weight)
        lst_target_node_weight.append(weight)

# The N pairs with the highest co-occurrence count.
topn = dict(sorted(dict_nodes.items(), key = itemgetter(1), reverse = True)[:number_of_connected_nodes])

# Assign edge weight colour: "red" for the top-N pairs, "black" otherwise.
for source_token, target_token in zip(lst_source_node, lst_target_node):
    if source_token + "_" + target_token in topn or target_token + "_" + source_token in topn:
        lst_edge_colour_weight.append("red")
    else:
        lst_edge_colour_weight.append("black")


# Create the concept-graph DataFrame (one row per pair occurrence).
columns = ['source', 'target', 'source_node_weight', 'target_node_weight', 'edge_weight', 'edge_colour']
df_concept_graph = spark.createDataFrame(zip(lst_source_node, lst_target_node, lst_source_node_weight, lst_target_node_weight, lst_edge_weight, lst_edge_colour_weight), columns)

# Create a Vertex DataFrame with unique ID column "id".
# lst_g_nodes holds one entry per pair occurrence, so de-duplicate it
# (keeping first-seen order) to make the ids actually unique.
v = sqlContext.createDataFrame(list(dict.fromkeys(lst_g_nodes)), ["id", "name"])
# Create an Edge DataFrame with "src" and "dst" columns
e = sqlContext.createDataFrame(lst_g_edges, ["src", "dst", "relationship"])

## Show degree connectivity

In [None]:
from graphframes import GraphFrame
# Build the graph from the vertex and edge DataFrames.
g = GraphFrame(v, e)

# In-degree of every vertex, shown with the highest in-degree first.
df_degree = g.inDegrees
ranked_by_in_degree = df_degree.sort(['inDegree'], ascending=False)
ranked_by_in_degree.show()

## Run PageRank 

In [None]:
# Query: Count the number of "related" connections in the graph.
# Edges are created with the relationship value "related"; the previous filter
# on 'relationship' matched nothing, and its result was never shown.
related_edge_count = g.edges.filter("relationship = 'related'").count()
print(f"Number of 'related' edges: {related_edge_count}")

# Run PageRank algorithm, and show results.
# NOTE(review): resetProbability=0.01 is unusually low for PageRank - confirm
# this value is intended.
results = g.pageRank(resetProbability=0.01, maxIter=20)
results.vertices.select("id", "pagerank").show()

# Save the concept graph

In [None]:
# Derive the file name by dropping the last four characters of output_filename
# and appending "concept_graph.csv".
# NOTE(review): this assumes a four-character extension such as ".csv" and
# inserts no separator before "concept_graph" - confirm the resulting name.
concept_graph_filename = output_filename[:-4] + "concept_graph.csv"
concept_graph_path = os.path.join(output_directory, concept_graph_filename)

# Overwrite any previous output; write CSV with a header row.
df_concept_graph.write.mode('overwrite').options(header='true').csv(concept_graph_path)