In [1]:
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
from pyspark.ml.feature import StopWordsRemover, Tokenizer, CountVectorizer, IDF
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.clustering import LDA
from nltk.stem import WordNetLemmatizer
from pyspark.sql.functions import udf, col, expr, monotonically_increasing_id
import nltk
nltk.download('wordnet')

[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/j.rahnama/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [2]:
spark = SparkSession.builder \
    .appName("LDA on Instagram Posts") \
    .getOrCreate()

24/11/27 17:38:22 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Load the data into a DataFrame
# Display the schema and count of the DataFrame
df = spark.read.csv("/Users/j.rahnama/Desktop/BU8/usecase/topicExtraction/twitter_xsmall.csv", header=True, inferSchema=True)

df.printSchema()
print(f"Total records: {df.count()}")

                                                                                

root
 |-- TweetID: string (nullable = true)
 |-- Weekday: string (nullable = true)
 |-- Hour: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Lang: string (nullable = true)
 |-- IsReshare: integer (nullable = true)
 |-- Reach: string (nullable = true)
 |-- RetweetCount: string (nullable = true)
 |-- Likes: string (nullable = true)
 |-- Klout: string (nullable = true)
 |-- Sentiment: double (nullable = true)
 |-- text: string (nullable = true)
 |-- LocationID12: integer (nullable = true)
 |-- UserID13: string (nullable = true)
 |-- UserID14: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- LocationID16: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- State: string (nullable = true)
 |-- StateCode: string (nullable = true)
 |-- City: string (nullable = true)

Total records: 1016


In [4]:
# Select only the text column from the DataFrame
single_column_df = df.select("text")

In [5]:
# Sample x% of the data (or adjust as needed)
sampled_df = single_column_df.sample(fraction=1.0, seed=42)

In [6]:
# Create a lemmatizer
lemmatizer = WordNetLemmatizer()

# Function to remove emojis from a text column
def remove_emojis(column):
    """
    Removes emojis from the specified text column using a regex pattern.
    """
    emoji_pattern = (
        "[\U0001F600-\U0001F64F"   # Emoticons
        "\U0001F300-\U0001F5FF"    # Symbols & Pictographs
        "\U0001F680-\U0001F6FF"    # Transport & Map Symbols
        "\U0001F700-\U0001F77F"    # Alchemical Symbols
        "\U0001F800-\U0001F8FF"    # Supplemental Arrows-C
        "\U0001F900-\U0001F9FF"    # Supplemental Symbols and Pictographs
        "\U0001FA00-\U0001FA6F"    # Chess Symbols
        "\U00002700-\U000027BF"    # Dingbats
        "\U000024C2-\U0001F251"    # Enclosed Characters
        "\U0001F1E6-\U0001F1FF]+"  # Flags
    )
    return F.regexp_replace(column, emoji_pattern, "")

# Function to preprocess text
def preprocess_text(column):
    """
    Preprocesses a text column by removing URLs, converting to lowercase,
    removing punctuation (except hashtags), and removing emojis.
    """
    column = F.regexp_replace(column, r'http\S+', '')  # Remove URLs
    column = F.lower(column)  # Convert to lowercase
    # column = F.regexp_replace(column, r'[^a-zA-Z\s#]', '')  # Remove punctuation except hashtags
    column = F.regexp_replace(column, r'[^a-zA-Z\s]', '')
    column = remove_emojis(column)  # Remove emojis
    return column

# Function to apply preprocessing
def apply_preprocessing(dataframe, text_column_name, cleaned_column_name):
    """
    Applies text preprocessing to a DataFrame.
    """
    df_temp =  dataframe.withColumn(cleaned_column_name, preprocess_text(F.col(text_column_name)))
    # Filter out null or empty rows
    return filter_null_text_string(df_temp, cleaned_column_name)

# Function to tokenize text
def tokenize_text(dataframe, input_column, output_column):
    """
    Tokenizes the text in the specified column.
    """
    tokenizer = Tokenizer(inputCol=input_column, outputCol=output_column)
    temp_df = tokenizer.transform(dataframe)
    
    # Filter out null or empty rows
    return filter_null_text_array(temp_df, output_column)

# Function to remove stopwords
def remove_stopwords(dataframe, input_column, output_column, custom_stopwords=None):
    """
    Removes stopwords from the tokenized words column.
    """
    english_stopwords = StopWordsRemover.loadDefaultStopWords('english')
    german_stopwords = StopWordsRemover.loadDefaultStopWords('german') # could be extended for other languages
    all_stopwords = english_stopwords + german_stopwords + (custom_stopwords or [])
    
    # Broadcast the stopwords for efficiency
    broadcast_stopwords = spark.sparkContext.broadcast(all_stopwords)
    
    remover = StopWordsRemover(
        inputCol=input_column,
        outputCol=output_column,
        stopWords=broadcast_stopwords.value
    )
    temp_df = remover.transform(dataframe)
    
    #  Filter out null or empty rows
    return filter_null_text_array(temp_df, output_column)

# Function to filter out null or empty text rows
def filter_null_text_array(dataframe, column_name):
    """
    Filters out rows with null or empty values in the specified column.
    """
    return dataframe.filter(F.col(column_name).isNotNull() | (F.size(F.col(column_name)) > 0))

# Function to filter out null or empty text rows
def filter_null_text_string(dataframe, column_name):
    """
    Filters out rows with null or empty values in the specified column.
    """
    return dataframe.filter(F.col(column_name).isNotNull() | (F.length(F.col(column_name)) > 0))

# Define a UDF for lemmatization
@udf(ArrayType(StringType()))
def lemmatize_words(tokens):
    return [lemmatizer.lemmatize(word) for word in tokens]

# Filter out words with length < 3
def filter_short_words(dataframe, input_column, output_column, min_length=3):
    return dataframe.withColumn(output_column, expr(f"filter({input_column}, word -> length(word) >= {min_length})"))


In [7]:
# Main preprocessing pipeline
def main_pipeline(input_dataframe, text_column_name):
    """
    Executes the text preprocessing pipeline.
    """
    
    # Step 1: Preprocess text
    preprocessed_df = apply_preprocessing(input_dataframe, text_column_name, "basic_cleaned")
    

    
    # Step 2: Tokenize text
    tokenized_df = tokenize_text(preprocessed_df, "basic_cleaned", "tokens")



    # Step 3: Remove stopwords
    no_stopwords_df = remove_stopwords(tokenized_df, "tokens", "without_stopwords_tokens", custom_stopwords=[
        'lol', 'idk', 'tbh', 'insta', 'gram', 'post'  # Extend as needed
    ])


    # Step 4: Apply lemmatizer
    lemmatized_df = no_stopwords_df.withColumn("lemmatized_tokens", lemmatize_words(F.col("without_stopwords_tokens")))

    # Step 5: Filter out words with length < 3
    filtered_df = filter_short_words(lemmatized_df, "lemmatized_tokens", "cleaned_words")
    
    return filtered_df



In [8]:
# # a sample DataFrame only for preprocessing step
# sample_data = [
#     {"text": "Hello! This is a test post. 🚀 Visit http://example.com"},
#     {"text": "LOL! IDK what to post. 🤔"},
#     {"text": "#DataScience is amazing. #AI rocks!"},
#     {"text": "Random text with emoji 🎨"},
#     {"text": "Weekend vibes! 🌞🏖️ #Relax #BeachLife"},
#     {"text": "Can't believe how delicious this brunch was! 🥞☕ #Foodie #Yum"},
#     {"text": "Throwback to the best vacation ever! 🌍✈️ #TravelGoals"},
#     {"text": "When life gives you lemons... make lemonade 🍋😂 #Positivity"},
#     {"text": "Just finished an intense workout 💪🔥 #FitnessJourney"},
#     {"text": "Feeling grateful for all the love and support 💕🙏 #Blessed"},
#     {"text": "Exploring the city with my besties 🏙️👯‍♀️ #UrbanAdventures"},
#     {"text": "Late-night thoughts... 🌌✨ #DeepTalks"},
#     {"text": "OMG this dress is everything! 👗😍 #Fashionista"},
#     {"text": "Chasing sunsets 🌅🌸 #NatureLover"},
#     {"text": "Current mood: coffee and coding ☕💻 #ProgrammerLife"},
#     {"text": "Best concert ever! 🎶🎤 #LiveMusic #Memories"},
#     {"text": "Baking cookies for the first time! 🍪👩‍🍳 #HomeChef"},
#     {"text": "Happy birthday to me! 🎉🎂🎁 #AnotherYearOlder"},
#     {"text": "Rainy day reads 📚☔ #BookLover"},
#     {"text": "Just adopted the cutest puppy 🐶❤️ #FurBaby"},
# ]

# sampled_df = spark.createDataFrame(sample_data, columns)

# Apply the pipeline
processed_df = main_pipeline(sampled_df, "text")

# processed_df.printSchema()

# Apply these to remove unnecessary columns
#final_processed_df = processed_df.select("cleaned_words")
final_processed_df = processed_df.withColumn("id", monotonically_increasing_id())
# Show results
final_processed_df.select("id","text","basic_cleaned", "tokens", "without_stopwords_tokens", "lemmatized_tokens", 
                    "cleaned_words").show(5, truncate=True)

24/11/27 17:38:31 WARN StopWordsRemover: Default locale set was [en_DE]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
[Stage 5:>                                                          (0 + 1) / 1]

+---+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+
| id|                text|       basic_cleaned|              tokens|without_stopwords_tokens|   lemmatized_tokens|       cleaned_words|
+---+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+
|  0|"""New AWS servic...|new aws service h...|[new, aws, servic...|    [new, aws, servic...|[new, aws, servic...|[new, aws, servic...|
|  1|Use a highly avai...|use a highly avai...|[use, a, highly, ...|    [use, highly, ava...|[use, highly, ava...|[use, highly, ava...|
|  2|New usage example...|new usage example...|[new, usage, exam...|    [new, usage, exam...|[new, usage, exam...|[new, usage, exam...|
|  3|Copy AMIs from an...|copy amis from an...|[copy, amis, from...|    [copy, amis, anot...|[copy, amis, anot...|[copy, amis, anot...|
|  4|New usage example...|new usage example...|[

                                                                                

In [9]:
# Create a CountVectorizer and TF
num_of_topics = 20
TF_input_col = "cleaned_words"
TF_output_col = "tf_features"
IDF_input_col = "tf_features"
IDF_ouput_col = "tfidf_features"
LDA_feature_col = "tfidf_features" ## Change this to tf_features/tfidf_features if you want to apply LDA on TF features/ITF features. 

# TF
count_vectorizer = CountVectorizer(inputCol=TF_input_col, outputCol=TF_output_col, vocabSize=20000, minDF=10.0) # needs to be tuned!
cv_model = count_vectorizer.fit(final_processed_df)
vectorized_df = cv_model.transform(final_processed_df)

# Apply IDF to get the TF-IDF representation
idf = IDF(inputCol=IDF_input_col, outputCol=IDF_ouput_col)
idf_model = idf.fit(vectorized_df)
tfidf_df = idf_model.transform(vectorized_df)

# Apply LDA
corpus = tfidf_df.select(LDA_feature_col)
lda = LDA(k=num_of_topics, maxIter=100, featuresCol= LDA_feature_col)  # Adjust k for the number of desired topics
lda_model = lda.fit(corpus)

# Display topics
topics = lda_model.describeTopics(maxTermsPerTopic=5)  # Display top 5 terms for each topic
vocab = cv_model.vocabulary

# for topic in topics.collect():
#     print(f"Topic {topic[0]}: ", end="")
#     print(", ".join([vocab[i] for i in topic[1]]))

# Print topics and top-weighted terms
ListOfIndexToWords = udf(lambda wl: list([vocab[w] for w in wl]))
FormatNumbers = udf(lambda nl: ["{:1.4f}".format(x) for x in nl])

# topics.select(ListOfIndexToWords(topics.termIndices).alias('words')).show(truncate=False, n=num_of_topics)
# topics.select(FormatNumbers(topics.termWeights).alias('weights')).show(truncate=False, n=num_of_topics)
toptopics = topics.select((topics.topic + 1).alias('topic'),
                          ListOfIndexToWords(topics.termIndices).alias('words'),
                          FormatNumbers(topics.termWeights).alias('weights'))
toptopics.show(truncate=False, n=num_of_topics)
print('Topics:', num_of_topics, 'Vocabulary:', len(vocab))

24/11/27 17:38:45 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


+-----+---------------------------------------------------------+----------------------------------------+
|topic|words                                                    |weights                                 |
+-----+---------------------------------------------------------+----------------------------------------+
|1    |[faster, resource, cloudwatch, week, next]               |[0.1164, 0.1090, 0.1057, 0.1037, 0.0955]|
|2    |[migration, service, cloud, data, tool]                  |[0.7651, 0.0142, 0.0115, 0.0055, 0.0039]|
|3    |[instance, cluster, aurora, mysql, web]                  |[0.2391, 0.1222, 0.1141, 0.1137, 0.0977]|
|4    |[cloudformation, api, use, read, gateway]                |[0.1320, 0.0987, 0.0974, 0.0931, 0.0858]|
|5    |[security, partner, check, solution, config]             |[0.1776, 0.1600, 0.1520, 0.1019, 0.0721]|
|6    |[reinvent, cost, dont, registration, miss]               |[0.1172, 0.1155, 0.1103, 0.0829, 0.0827]|
|7    |[elastic, certificate, beansta

In [11]:
# Identify topics for each documentation
topic_distribution = lda_model.transform(tfidf_df)

# an UDF to extract the top topics based on their distribution
def get_top_topics(topic_distribution_vector, n=3):
    # Get the indices of the top n topics
    top_topic_indices = topic_distribution_vector.argsort()[-n:][::-1]
    return top_topic_indices.tolist()
get_top_topics_udf = udf(get_top_topics, ArrayType(StringType()))

# Add a new column ('topicDistribution') with the top topics to the DataFrame
result_df = topic_distribution.withColumn("top_topics", get_top_topics_udf(col("topicDistribution")))
result_df.select("text", "top_topics").show(5, truncate=False)
                                                    
# result_df.printSchema()
# result_df.select("topicDistribution").show(5, truncate=False)

[Stage 215:>                                                        (0 + 1) / 1]

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|text                                                                                                                                                                                                                                                                                                       |top_topics |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|"""New AWS service helps companies to move their #apps to

                                                                                

In [15]:
# Specify the path where you want to save the CSV file
output_path = "./labeledTwitterSmall.csv"  # Replace with your desired output path

# Convert the array columns to a string
result_df = result_df.withColumn("lemmatized_tokens", F.concat_ws(", ", F.col("lemmatized_tokens")))
result_df = result_df.withColumn("cleaned_words", F.concat_ws(", ", F.col("cleaned_words")))
result_df = result_df.withColumn("top_topics", F.concat_ws(", ", F.col("top_topics")))

result_df.show(2, truncate=False)
# Save the DataFrame to a CSV file
result_df.select("text", "lemmatized_tokens", "cleaned_words", "top_topics").write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(output_path)
print(" results have been saved to", output_path)

                                                                                

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+---+------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------

[Stage 219:>                                                        (0 + 1) / 1]

 results have been saved to ./labeledTwitterSmall.csv


24/11/28 09:48:52 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 926839 ms exceeds timeout 120000 ms
24/11/28 09:48:52 WARN SparkContext: Killing executors is not supported by current scheduler.
24/11/28 09:49:00 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o

In [13]:
# Create a list to hold the rows for the DataFrame
rows = []

# Iterate through the topics and their terms
for topic in topics.collect():
    topic_number = topic[0]  # Topic index
    terms = [vocab[i] for i in topic[1]]  # Get the corresponding terms
    # Create a row with topic number and terms joined by comma
    rows.append(Row(topic=f"Topic {topic_number}", terms=", ".join(terms)))

print(rows)
# Create a DataFrame from the list of rows
topics_df = spark.createDataFrame(rows)
topics_df = topics_df.coalesce(1)

# Specify the output path for the CSV file
output_path = "./topics_terms.csv"  # Replace with your desired output path

# Save the DataFrame to a CSV file
topics_df.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(output_path)
print("Topics and their terms have been saved to", output_path)

[Row(topic='Topic 0', terms='faster, resource, cloudwatch, week, next'), Row(topic='Topic 1', terms='migration, service, cloud, data, tool'), Row(topic='Topic 2', terms='instance, cluster, aurora, mysql, web'), Row(topic='Topic 3', terms='cloudformation, api, use, read, gateway'), Row(topic='Topic 4', terms='security, partner, check, solution, config'), Row(topic='Topic 5', terms='reinvent, cost, dont, registration, miss'), Row(topic='Topic 6', terms='elastic, certificate, beanstalk, update, cloudcomputing'), Row(topic='Topic 7', terms='blog, new, public, cloud, sector'), Row(topic='Topic 8', terms='awssummit, free, startup, lab, chicago'), Row(topic='Topic 9', terms='device, apps, part, app, farm'), Row(topic='Topic 10', terms='analytics, deploy, detail, platform, docker'), Row(topic='Topic 11', terms='support, cloudcomputing, iot, apn, awslaunch'), Row(topic='Topic 12', terms='awsidentity, access, ready, securely, follow'), Row(topic='Topic 13', terms='using, meetup, today, start, we

[Stage 217:>                                                        (0 + 1) / 1]

Topics and their terms have been saved to ./topics_terms.csv


                                                                                