In [25]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lit, explode, posexplode, row_number, expr
from pyspark.sql.functions import sum, max, countDistinct
from pyspark.sql import Row
from pyspark.sql.types import ArrayType, StringType, DoubleType, IntegerType
from pyspark.sql.window import Window
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, StopWordsRemover
from sklearn.decomposition import LatentDirichletAllocation
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_extraction.text import CountVectorizer as CV

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

def get_top_n_terms_tfidf(tfidf_dtm, n=250):
    term_scores = tfidf_dtm.sum(axis=0)
    term_series = pd.Series(term_scores, index=tfidf_dtm.columns)
    top_terms = term_series.sort_values(ascending=False).head(n)
    return top_terms

def extract_terms():
    token_pattern = r'\b\w{3,}\b'
    vectorizer = TfidfVectorizer(token_pattern=token_pattern)
    global_tfidf = pd.DataFrame(vectorizer.fit_transform(contents.select(col("content")).toPandas()["content"]).todense(), columns=vectorizer.get_feature_names_out())
    global_tfidf['content_topic'] = contents.toPandas()['content_topic']
    display(global_tfidf)
    tfidf_per_content_topic = { content_topic: tfidf_per_topic.drop(columns='content_topic') for content_topic, tfidf_per_topic in global_tfidf.groupby('content_topic') }
    top_terms_per_content_topic = {content_topic: get_top_n_terms_tfidf(content_topic_tfidf) for content_topic, content_topic_tfidf in tfidf_per_content_topic.items()}

    flat_top_terms_per_content_topic = pd.concat(top_terms_per_content_topic, axis=1)

    flat_top_terms_per_content_topic.reset_index(inplace=True)
    melted_top_terms_per_content_topic = pd.melt(flat_top_terms_per_content_topic, id_vars=['index'], var_name='content_topic', value_name='score')

    return spark.createDataFrame(melted_top_terms_per_content_topic).select(
        #lit(context["project_id"]).alias("project_id"),
        #lit(context["import_id"]).alias("content_group"),
        col("content_topic").cast('INTEGER'),
        col("index").alias("term"),
        col("score").cast("DOUBLE")
    ).where("score IS NOT NULL")

"""extract terms using spark"""
def extract_terms_spark():
    tokenizer = Tokenizer(inputCol="content", outputCol="words")
    TokenData = tokenizer.transform(contents)
    
    def extract_indices_from_vector(vector):
        return vector.indices.tolist()
    
    def extract_values_from_vector(vector):
        return vector.values.tolist()
    
    def filter_words(words):
        return [word for word in words if len(word) >= 3]
    
    extract_indices_udf = udf(extract_indices_from_vector, ArrayType(IntegerType()))
    extract_values_udf = udf(extract_values_from_vector, ArrayType(DoubleType()))
    filter_words_udf = udf(filter_words, ArrayType(StringType()))
    
    data = TokenData.withColumn("filtered_words", filter_words_udf("words")).drop("words")
    
    cv = CountVectorizer(inputCol="filtered_words", outputCol="tfidf_count_vector")
    cvModel = cv.fit(data)
    data = cvModel.transform(data).drop("filtered_words")
    
    idf = IDF(inputCol="tfidf_count_vector", outputCol="features_tfidf")
    idfModel = idf.fit(data)
    data = idfModel.transform(data).drop("tfidf_count_vector")

    data = data.withColumn("values", extract_values_udf("features_tfidf")) \
               .withColumn("indices", extract_indices_udf("features_tfidf")) \
               .drop("features_tfidf")
    data.show(truncate=False)

    print(cvModel.vocabulary)
    
    explodedIndices = data.select("id","content","content_topic",posexplode(col("indices")).alias("pos","idx"))
    
    explodedValues = data.select("id","content","content_topic",posexplode(col("values")).alias("pos","value"))
    
    data = explodedIndices.join(explodedValues, (explodedValues.id == explodedIndices.id) & \
        (explodedIndices.pos == explodedValues.pos)) \
        .select(explodedIndices.id, explodedIndices.content, explodedIndices.content_topic, \
                explodedIndices.idx.alias("index"), explodedValues.value.alias("score"))
   
    grouped_data = data.groupBy('content_topic', 'index').agg(sum('score').alias('score'))
    window_spec = Window.partitionBy('content_topic').orderBy(col('score').desc())
    ranked_data = grouped_data.withColumn('rank', row_number().over(window_spec)).filter(col('rank') <= 250).drop('rank')

    vocab_df = spark.createDataFrame([(i, term) for i, term in enumerate(cvModel.vocabulary)], ["index", "term"])
    return ranked_data.join(vocab_df, on="index").drop('index').orderBy(['content_topic', 'score'], ascending=[True, False]) \
        .select(
            #lit(context["project_id"]).alias("project_id"),
            #lit(context["import_id"]).alias("content_group"),
            col("content_topic").cast('INTEGER'),
            col("term"),
            col("score").cast("DOUBLE")
        ).where("score IS NOT NULL")

def minmax_tf():
    tokenizer = Tokenizer(inputCol="content", outputCol="words")
    TokenData = tokenizer.transform(contents)
    
    def extract_indices_from_vector(vector):
        return vector.indices.tolist()
    
    def extract_values_from_vector(vector):
        return vector.values.tolist()
    
    def filter_words(words):
        return [word for word in words if len(word) >= 3]
    
    extract_indices_udf = udf(extract_indices_from_vector, ArrayType(IntegerType()))
    extract_values_udf = udf(extract_values_from_vector, ArrayType(DoubleType()))
    filter_words_udf = udf(filter_words, ArrayType(StringType()))
    
    data = TokenData.withColumn("filtered_words", filter_words_udf("words")).drop("words")

    data.show(truncate=False)
    # Remove stop words
    remover = StopWordsRemover(inputCol="filtered_words", outputCol="terms", stopWords=StopWordsRemover.loadDefaultStopWords("english"))
    data = remover.transform(data).drop('filtered_words')
    data.show(truncate=False)

    exploded = data.withColumn("term", explode(col("terms")))
    term_doc_count = exploded.select("term", "id").distinct() \
                         .groupBy("term") \
                         .agg(countDistinct("id").alias("doc_count"))

    cv = CountVectorizer(inputCol="terms", outputCol="tfidf_count_vector")
    cvModel = cv.fit(data)
    data = cvModel.transform(data) \
               .withColumn("values", extract_values_udf("tfidf_count_vector")) \
               .withColumn("indices", extract_indices_udf("tfidf_count_vector")) \
               .drop("features_tfidf") \
               .drop('content') \
               .withColumn("nterms",expr("aggregate(values, 0D, (acc, x) -> acc + x)"))
    data.show(truncate=False)

    explodedIndices = data.select("id","content_topic","nterms",posexplode(col("indices")).alias("pos","idx"))
    
    explodedValues = data.select("id",posexplode(col("values")).alias("pos","value"))
    
    data = explodedIndices.join(explodedValues, (explodedValues.id == explodedIndices.id) & \
        (explodedIndices.pos == explodedValues.pos)) \
        .select(explodedIndices.content_topic, explodedIndices.idx.alias("index"), \
                (explodedValues.value / explodedIndices.nterms).alias("tf")) \
        .groupBy('content_topic', 'index').agg(max('tf').alias('max_tf'))
    vocab_df = spark.createDataFrame([(i, term) for i, term in enumerate(cvModel.vocabulary)], ["index", "term"])
    window_spec = Window.partitionBy('content_topic').orderBy(col('content_topic').asc(),col('max_tf').asc(),col('doc_count').asc())
    data=data.join(vocab_df, on="index").drop('index').join(term_doc_count, on="term") \
            .withColumn('rank', row_number().over(window_spec)).filter(col('rank') <= 30).drop('rank')
    data.show(truncate=False)

def maxtf():
    tokenizer = Tokenizer(inputCol="content", outputCol="words")
    TokenData = tokenizer.transform(contents)
    
    def extract_indices_from_vector(vector):
        return vector.indices.tolist()
    
    def extract_values_from_vector(vector):
        return vector.values.tolist()
    
    def filter_words(words):
        return [word for word in words if len(word) >= 3]
    
    extract_indices_udf = udf(extract_indices_from_vector, ArrayType(IntegerType()))
    extract_values_udf = udf(extract_values_from_vector, ArrayType(DoubleType()))
    filter_words_udf = udf(filter_words, ArrayType(StringType()))
    
    data = TokenData.withColumn("terms", filter_words_udf("words")).drop("words")

    # Remove stop words (need to determine language of docs)
    #remover = StopWordsRemover(inputCol="filtered_words", outputCol="terms", stopWords=StopWordsRemover.loadDefaultStopWords("english"))
    #data = remover.transform(data).drop("filtered_words")

    exploded = data.withColumn("term", explode(col("terms")))
    term_doc_count = exploded.select("term", "id").distinct() \
                         .groupBy("term") \
                         .agg(countDistinct("id").alias("doc_count"))

    cv = CountVectorizer(inputCol="terms", outputCol="tfidf_count_vector")
    cvModel = cv.fit(data)
    data = cvModel.transform(data) \
            .withColumn("values", extract_values_udf("tfidf_count_vector")) \
            .withColumn("indices", extract_indices_udf("tfidf_count_vector")) \
            .drop("features_tfidf") \
            .drop('content') \
            .withColumn("nterms",expr("aggregate(values, 0D, (acc, x) -> acc + x)"))

    explodedIndices = data.select("id", "content_topic","nterms", posexplode(col("indices")).alias("pos","idx"))
    
    explodedValues = data.select("id", posexplode(col("values")).alias("pos","value"))
    
    data = explodedIndices.join(explodedValues, (explodedValues.id == explodedIndices.id) & \
        (explodedIndices.pos == explodedValues.pos)) \
        .select(explodedIndices.content_topic, explodedIndices.idx.alias("index"), \
                (explodedValues.value / explodedIndices.nterms).alias("tf")) \
        .groupBy('content_topic', 'index').agg(max('tf').alias('max_tf'))
    vocab_df = spark.createDataFrame([(i, term) for i, term in enumerate(cvModel.vocabulary)], ["index", "term"])
    window_spec = Window.partitionBy('content_topic').orderBy(col('content_topic').asc(), col('score').desc())
    data = data.join(vocab_df, on="index").drop('index').join(term_doc_count, on="term") \
                .select('content_topic', 'term', (col('max_tf') / col('doc_count')).alias('score')) \
                .withColumn('rank', row_number().over(window_spec)).filter(col('rank') <= 100).drop('rank')
    data.show(truncate=False)

#contents = spark.read.csv("/home/taoufik/Downloads/export.csv", header=True)

# Sample data
contents = spark.createDataFrame([
        Row(id=1, content="The cat is on the mat", content_topic=1),
        Row(id=2, content="My dog and cat are the best", content_topic=1),
        Row(id=3, content="The locals are playing", content_topic=1)
    ])
contents.show(truncate=False)

# Sample code for topic modeling using LDA
vectorizer = CV(max_df=1.0, min_df=1.0, stop_words='english') #CV(max_df=0.9, min_df=2, stop_words='english')
doc_term_matrix = vectorizer.fit_transform(contents.select("content").toPandas())

lda = LatentDirichletAllocation(n_components=5, random_state=42)
lda.fit(doc_term_matrix)
topics = lda.components_
topics
#extract_terms().show()
#extract_terms_spark().show()
#minmax_tf()

+---+---------------------------+-------------+
|id |content                    |content_topic|
+---+---------------------------+-------------+
|1  |The cat is on the mat      |1            |
|2  |My dog and cat are the best|1            |
|3  |The locals are playing     |1            |
+---+---------------------------+-------------+



array([[0.20842222],
       [0.20842266],
       [1.16627871],
       [0.208448  ],
       [0.2084284 ]])