In [None]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for the whole notebook. The Spark NLP
# package is requested through `spark.jars.packages`; Kryo serialization with
# a large buffer is configured alongside it.
spark = (
    SparkSession.builder
    .appName("StructuredStreaming")
    .config('spark.executor.memory', '16g')
    .config('spark.driver.memory', '16g')
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer.max", "2000M")
    .config("spark.driver.maxResultSize", "0")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.5.2")
    .getOrCreate()
)

In [None]:
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF, HashingTF
from pyspark.ml import feature
import spacy
import  numpy as np

### Нормалізація та попередня обробка даних

In [None]:
from pyspark.sql import DataFrame, functions as F

class UkrainianProcessor:
    """Normalise Ukrainian text columns and derive bag-of-words / TF-IDF features.

    The processor keeps the intermediate DataFrames as attributes
    (`clean_text`, `filtered`, `lemmatized`, `bag_of_words`, `tf_idf`) so each
    stage can be inspected after `preprocessing` has run.
    """

    def __init__(self, raw_text: DataFrame):
        """Store the raw DataFrame; it must expose `Title` and `Body` columns."""
        # Every character matching this pattern is stripped by `cleanup_data`.
        self.regexp_letters = r'[^\w а-яїєіґ]'
        self.raw_text = raw_text

    def cleanup_data(self):
        """Lower-case `Title`/`Body` and strip characters matching `regexp_letters`.

        Shows the first five cleaned rows and returns the cleaned DataFrame
        (also stored as `self.clean_text`).
        """
        self.clean_text = self.raw_text \
            .withColumn("title", F.regexp_replace(F.lower(F.col("Title")), self.regexp_letters, "")) \
            .withColumn("body", F.regexp_replace(F.lower(F.col("Body")), self.regexp_letters, ""))

        self.clean_text.show(5)
        return self.clean_text

    def tokenize_text(self, clean_text, input_column):
        """Split `input_column` into tokens and drop Ukrainian stop words.

        Adds `<input_column>_tokens` and `<input_column>_filtered` columns,
        stores the result in `self.filtered` and returns `self` for chaining.
        """
        # Local import: only the stop-word list is needed here, not a loaded model.
        from spacy.lang.uk.stop_words import STOP_WORDS

        self.input_column = input_column
        tokens_column = f"{self.input_column}_tokens"

        tokenizer = feature.Tokenizer(inputCol=self.input_column, outputCol=tokens_column)
        tokenized = tokenizer.transform(clean_text)

        # StopWordsRemover defaults to the English stop-word list; the locale
        # only controls case-insensitive matching. The Ukrainian list has to be
        # supplied explicitly, otherwise nothing Ukrainian is filtered.
        stopwords_remover = StopWordsRemover() \
            .setLocale('uk_UA') \
            .setStopWords(sorted(STOP_WORDS)) \
            .setInputCol(tokens_column) \
            .setOutputCol(f"{self.input_column}_filtered")
        self.filtered = stopwords_remover.transform(tokenized)

        print('Locale:', stopwords_remover.getLocale())
        print('Tokenized & Filtered:')
        self.filtered.printSchema()
        return self

    def lemmatize_text(self):
        """Lemmatize the filtered tokens with spaCy's `uk_core_news_sm` model.

        Adds a `lemmatized_<input_column>` array column (stop-word lemmas are
        dropped), stores the result in `self.lemmatized` and returns `self`.
        """
        nlp = spacy.load("uk_core_news_sm")
        self.stop_words = nlp.Defaults.stop_words

        # A frozenset gives constant-time membership tests inside the UDF;
        # the single space is excluded as well, as before.
        stop_words = frozenset(self.stop_words) | {' '}

        @F.udf("array<string>")
        def lemmatize_udf(tokens):
            # Null or empty token arrays produce no lemmas instead of an error.
            if not tokens:
                return []
            return [token.lemma_ for token in nlp(" ".join(tokens)) if token.lemma_ not in stop_words]

        self.lemmatized = self.filtered.withColumn(
            f"lemmatized_{self.input_column}",
            lemmatize_udf(F.col(f"{self.input_column}_filtered"))
        )

        print('Lemmatized:')
        self.lemmatized.printSchema()
        return self

    def create_bow(self, sample_size=100):
        """Fit a CountVectorizer on the lemmas and return the bag-of-words DataFrame.

        Args:
            sample_size: number of rows used for both fitting and transforming;
                pass None to use every row. Defaults to 100.

        Returns:
            DataFrame with an extra `<input_column>_features` column. The
            fitted vocabulary is stored in `self.words`.
        """
        count_vectorizer = CountVectorizer(
            inputCol=f"lemmatized_{self.input_column}", outputCol=f"{self.input_column}_features")

        # Take the sample once and cache it: two independent `limit` calls are
        # not guaranteed to return the same rows, and each would re-run the
        # spaCy UDF.
        sample = self.lemmatized if sample_size is None else self.lemmatized.limit(sample_size)
        sample = sample.cache()

        cv_model = count_vectorizer.fit(sample)
        self.bag_of_words = cv_model.transform(sample)
        self.words = cv_model.vocabulary

        return self.bag_of_words

    def measure_tf_idf(self, bow, input_column, min_doc_freq=10):
        """Weight the `<input_column>_features` counts by inverse document frequency.

        Args:
            bow: DataFrame holding the `<input_column>_features` column.
            input_column: prefix of the feature column.
            min_doc_freq: value passed to IDF's `minDocFreq`. Defaults to 10.

        Returns:
            DataFrame with an extra `<input_column>_tf_idf` column.
        """
        idf = IDF(inputCol=f"{input_column}_features",
                  outputCol=f"{input_column}_tf_idf", minDocFreq=min_doc_freq)
        self.tf_idf = idf.fit(bow).transform(bow)
        return self.tf_idf

    def save_intermediate_results(self):
        """Write the lemmatized tokens and the bag of words to CSV files.

        The paths are fixed, so every call overwrites the files written by the
        previous one.
        """
        self.lemmatized.toPandas().to_csv('../data/lemmatized_tokens.csv')
        self.bag_of_words.toPandas().to_csv('../data/bag_of_words.csv')

    def preprocessing(self, input_column):
        """Run cleanup, tokenization, lemmatization and bag-of-words for one column.

        Intermediate results are saved to disk; the bag-of-words DataFrame is
        returned.
        """
        clean = self.cleanup_data()
        embedded = self.tokenize_text(clean, input_column).lemmatize_text().create_bow()
        self.save_intermediate_results()

        return embedded

        
        

In [None]:
basepath = './data/'

# Read the UTF-8 CSV (first line is the header) and keep only the two text
# columns used by the rest of the notebook.
ukr_text_data = (
    spark.read
    .options(encoding="UTF-8", header="true")
    .csv(f"{basepath}/ukr_text.csv")
    .select('Title', 'Body')
)

ukr_text_data.printSchema()

In [None]:
# Full preprocessing chain for the article bodies, followed by TF-IDF weighting
# of the resulting bag of words.
ukr_pr = UkrainianProcessor(ukr_text_data)
preprocessed_body = ukr_pr.preprocessing('body')
tf_idf_body = ukr_pr.measure_tf_idf(bow=preprocessed_body, input_column='body')

tf_idf_body.show()

In [None]:
# Vocabulary fitted by the body CountVectorizer; position i names feature i.
words_list = ukr_pr.words


@F.udf("double")
def get_popular_score(tf_idf):
    """Return the largest TF-IDF weight in one document vector.

    Returns None for a null or zero-length vector. The UDF is declared as
    `double` so the score never round-trips through a string.
    """
    if tf_idf is None:
        return None
    values = tf_idf.toArray()
    if values.size == 0:
        return None
    return float(values.max())


@F.udf("string")
def get_popular_word(tf_idf, tf_idf_score):
    """Return the vocabulary word carrying the largest TF-IDF weight.

    `tf_idf_score` is kept for call compatibility; the position is taken with
    `argmax` (first maximum), which avoids comparing floats for equality.
    """
    if tf_idf is None or tf_idf_score is None:
        return None
    values = tf_idf.toArray()
    if values.size == 0:
        return None
    return words_list[int(values.argmax())]


# One row per word with its best score, sorted so that a following `limit`
# really yields the highest-scoring words.
words_rating = (
    tf_idf_body
    .select('lemmatized_body', 'body_tf_idf')
    .withColumn('word_score', get_popular_score('body_tf_idf'))
    .withColumn('word', get_popular_word('body_tf_idf', 'word_score'))
    .groupBy('word')
    .agg(F.max('word_score').alias('word_score'))
    .orderBy(F.desc('word_score'))
)

words_rating.printSchema()
words_rating.select('word', 'word_score').limit(10).show()
    


In [None]:
'''
Term Frequency: is a scoring of the frequency of the word in the current document.
Inverse Document Frequency: is a scoring of how rare the word is across documents.
'''

# Feed the body results into a second processor so the title features end up
# next to the body TF-IDF column in the same DataFrame.
body_columns = ['title', 'body', 'body_tf_idf']
processed_body = tf_idf_body.select(*body_columns)
ukr_pr_title = UkrainianProcessor(processed_body)

preprocessed_title = ukr_pr_title.preprocessing('title')
tf_idf_title = ukr_pr_title.measure_tf_idf(bow=preprocessed_title, input_column='title')

all_processed = tf_idf_title.select('title', 'title_tf_idf', 'body', 'body_tf_idf')
all_processed.show()

In [None]:
# Cosine similarity between the `title` and `body` TF-IDF vectors.
# NOTE(review): the two vectors come from separately fitted CountVectorizer
# vocabularies, so position i need not be the same word in both — confirm
# that comparing them index by index is intended.

@F.udf("double")
def cosine_similarity(v1, v2):
    """Cosine similarity of two ML vectors, zero-padded to a common length.

    Returns None when either vector is null or has zero norm. The UDF is
    declared as `double`: with the default string return type the later
    `orderBy` sorts lexicographically rather than numerically.
    """
    if v1 is None or v2 is None:
        return None

    first = v1.toArray()
    second = v2.toArray()

    size = max(first.size, second.size)
    vec1_array = np.zeros(size)
    vec2_array = np.zeros(size)
    vec1_array[:first.size] = first
    vec2_array[:second.size] = second

    denominator = np.linalg.norm(vec1_array) * np.linalg.norm(vec2_array)
    if denominator == 0:
        # Undefined for an all-zero vector; null is easier to filter than NaN.
        return None
    return float(np.dot(vec1_array, vec2_array) / denominator)


similarity = all_processed.withColumn(
    "cosine_similarity", cosine_similarity(F.col("body_tf_idf"), F.col("title_tf_idf")))

(similarity
    .where(F.col('cosine_similarity').isNotNull() & ~F.isnan('cosine_similarity'))
    .select('cosine_similarity', 'title', 'body')
    .orderBy(F.desc('cosine_similarity'))
    .show(truncate=False))

### Інтелектуальний аналіз текстів з використанням LLM моделей

In [None]:

from sparknlp.base import *
from sparknlp.annotator import Tokenizer, StopWordsCleaner
from sparknlp.annotator import WordEmbeddingsModel
from pyspark.ml import Pipeline

def make_embedding(input_data, column, clean_stop_words=False):
    """Annotate `column` with pretrained Ukrainian word embeddings.

    Args:
        input_data: DataFrame containing the text column.
        column: name of the text column to embed.
        clean_stop_words: when True, tokens are passed through the pretrained
            `stopwords_iso` cleaner before the embeddings are looked up.

    Returns:
        `input_data` with `<column>_document`, `<column>_token` and
        `<column>_embeddings` annotation columns added.
    """
    document_col = f"{column}_document"
    token_col = f"{column}_token"

    document_assembler = DocumentAssembler() \
        .setInputCol(column) \
        .setOutputCol(document_col)

    tokenizer = Tokenizer() \
        .setInputCols([document_col]) \
        .setOutputCol(token_col)

    stages = [document_assembler, tokenizer]

    if clean_stop_words:
        # Only fetch the stop-words model when it is actually part of the
        # pipeline. It writes back to the token column so the embeddings stage
        # reads the same column either way.
        stop_words_cleaner = StopWordsCleaner.pretrained("stopwords_iso", "uk") \
            .setInputCols([token_col]) \
            .setOutputCol(token_col)
        stages.append(stop_words_cleaner)

    word_embeddings = WordEmbeddingsModel.pretrained("w2v_cc_300d", "uk") \
        .setInputCols([document_col, token_col]) \
        .setOutputCol(f"{column}_embeddings")
    stages.append(word_embeddings)

    model = Pipeline(stages=stages).fit(input_data)
    return model.transform(input_data)



In [None]:
cleaned_text = UkrainianProcessor(ukr_text_data).cleanup_data()

embedded_body = make_embedding(cleaned_text, 'body')
# Run the title pipeline on top of the body result instead of joining two
# separately embedded frames on `title`: a join multiplies rows whenever a
# title occurs more than once and pairs bodies with foreign title embeddings.
embedded_title = make_embedding(embedded_body, 'title')

# Rows without a title are dropped, as the former inner join on `title` did.
joined = (
    embedded_title
    .where(F.col('title').isNotNull())
    .select(
        'title',
        'body',
        F.col('body_embeddings.embeddings').alias('body_embeddings'),
        F.col('title_embeddings.embeddings').alias('title_embeddings'),
    )
)
joined.show()

In [None]:
# Cosine similarity between the `title` and `body` fields, based on the word
# embeddings of their tokens.

@F.udf("float")
def cosine_similarity(v1, v2):
    """Cosine similarity of two fields given their per-token embedding arrays.

    Each argument is a list with one embedding vector per token. The token
    vectors are averaged into a single field vector before comparison, so the
    whole field is compared rather than only its first token. Returns None
    when a field has no tokens or its averaged vector has zero norm.
    """
    if not v1 or not v2:
        return None

    doc1 = np.asarray(v1, dtype=float).mean(axis=0)
    doc2 = np.asarray(v2, dtype=float).mean(axis=0)

    # Zero-pad to a common length, as the original per-token comparison did.
    size = max(doc1.size, doc2.size)
    vec1_array = np.zeros(size)
    vec2_array = np.zeros(size)
    vec1_array[:doc1.size] = doc1
    vec2_array[:doc2.size] = doc2

    denominator = np.linalg.norm(vec1_array) * np.linalg.norm(vec2_array)
    if denominator == 0:
        return None
    return float(np.dot(vec1_array, vec2_array) / denominator)


similarity_nlp = joined.withColumn(
    "cosine_similarity", cosine_similarity(F.col("body_embeddings"), F.col("title_embeddings")))

similarity_nlp = similarity_nlp.filter(
    ~F.isnan("cosine_similarity") & ~F.isnull("cosine_similarity")
).orderBy(F.desc('cosine_similarity'))
similarity_nlp.select('cosine_similarity', 'title', 'body').show(truncate=False)

### Find duplicates

In [None]:
# Count the duplicates in the dataset for several cosine-similarity thresholds
# (for example > 0.7, > 0.99).
def count_duplicates(dataframe: DataFrame, thresholds):
    """Print, for each threshold, how many rows have `cosine_similarity` above it.

    Args:
        dataframe: DataFrame exposing a `cosine_similarity` column.
        thresholds: iterable of numeric lower bounds (exclusive).
    """
    for lower_bound in thresholds:
        above = dataframe.filter(f"cosine_similarity > {lower_bound}")
        total = above.count()
        print(f"Кількість дублікатів (cosine > {lower_bound}): {total}")


count_duplicates(similarity_nlp, [0.7, 0.9])

# Show the top-10 unique data points: rows whose title/body similarity is
# below 0.01, least similar first. The explicit ordering matters — a `limit`
# after `dropDuplicates` alone returns an arbitrary ten rows.
(similarity_nlp
    .select('cosine_similarity', 'title', 'body')
    .filter(F.col("cosine_similarity") < 0.01)
    .dropDuplicates()
    .orderBy(F.asc('cosine_similarity'))
    .limit(10)
    .show(truncate=False))


### Build LDA

In [None]:
from pyspark.ml.linalg import VectorUDT, SparseVector
from pyspark.ml.clustering import LDA
from pyspark.ml.linalg import VectorUDT
import re


embedded_lda = make_embedding(cleaned_text, 'body', clean_stop_words=True)


def collect_vocabulary(embedded_df, column="body_embeddings"):
    """Return the sorted list of distinct tokens found in an embeddings column.

    Only the token strings are brought to the driver (deduplicated in Spark),
    not the embedding vectors themselves, and every token appears once so the
    list length equals the real vocabulary size.

    Args:
        embedded_df: DataFrame produced by `make_embedding`.
        column: name of the embeddings annotation column.
    """
    token_rows = (
        embedded_df
        .select(F.explode(f"{column}.metadata").alias("metadata"))
        .select(F.col("metadata")["token"].alias("token"))
        .where(F.col("token").isNotNull())
        .distinct()
        .collect()
    )
    return sorted(row["token"] for row in token_rows)


vocabulary = collect_vocabulary(embedded_lda)
# token -> position lookup; replaces a linear `list.index` scan per token.
vocabulary_index = {token: position for position, token in enumerate(vocabulary)}


# Prepare the word vectors for LDA.
@F.udf(VectorUDT())
def array_of_struct_to_vector(features):
    """Turn one document's embedding annotations into a sparse vocabulary vector.

    Every token that does not start with a digit contributes the sum of its
    embedding components at the token's vocabulary position.

    NOTE(review): LDA treats feature values as term counts; sums of embedding
    components can be negative — confirm this weighting is what is wanted.
    """
    words_dict = {}

    for struct in features or []:
        token = struct['metadata']['token']
        if token is not None and re.match(r'[^\d]+', token):
            words_dict[vocabulary_index[token]] = float(np.sum(struct['embeddings']))

    return SparseVector(len(vocabulary_index), words_dict)


processed_lda = embedded_lda.withColumn(
    "features", array_of_struct_to_vector(embedded_lda["body_embeddings"]))


In [None]:

def build_lda(num_topics, max_iterations, vocabulary, vectorized_docs, features_col="features", seed=None):
    """Fit an EM-optimised LDA model and print the words describing each topic.

    Args:
        num_topics: number of topics (`k`).
        max_iterations: maximum number of LDA iterations.
        vocabulary: list mapping a term index to its word.
        vectorized_docs: DataFrame holding the document vectors.
        features_col: name of the vector column in `vectorized_docs`.
        seed: optional random seed for reproducible topics; when None the
            estimator's own default is used.
    """
    lda = LDA(k=num_topics, maxIter=max_iterations, optimizer="em") \
        .setFeaturesCol(features_col) \
        .setTopicDistributionCol("topic_distribution")
    if seed is not None:
        lda = lda.setSeed(seed)

    lda_model = lda.fit(vectorized_docs)

    # Declared as array<string>: an untyped UDF would stringify the list.
    @F.udf("array<string>")
    def parse_words(topic):
        return [vocabulary[i] for i in topic]

    topics = lda_model.describeTopics()
    topics_with_words = topics.withColumn('words', parse_words('termIndices'))
    topics_with_words.select('topic', 'words').show(truncate=False)
    
# LDA over the Word2Vec-derived document vectors (100-row sample).
build_lda(
    num_topics=5,
    max_iterations=10,
    vocabulary=vocabulary,
    vectorized_docs=processed_lda.limit(100),
)

# LDA over the CountVectorizer bag of words built for the article bodies.
build_lda(
    num_topics=5,
    max_iterations=10,
    vocabulary=ukr_pr.words,
    vectorized_docs=preprocessed_body,
    features_col='body_features',
)
