In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import col, lower, regexp_replace

# Create (or reuse an existing) Spark session; read_data uses it and the final
# cell stops it.
spark = (
    SparkSession.builder
    .appName("RobustWordCount")
    .getOrCreate()
)

In [None]:
def read_data(file_path):
    """Load the text file(s) at ``file_path`` with ``spark.read.text``.

    Args:
        file_path: Path or glob understood by Spark (e.g. an HDFS URI).

    Returns:
        A DataFrame whose string column ``value`` holds the raw text lines
        (the column ``preprocess_text`` reads).
    """
    reader = spark.read
    return reader.text(file_path)

# Preprocessing: strip punctuation, lowercase, tokenize, and remove stopwords
def preprocess_text(df):
    """Clean and tokenize the raw text lines in ``df``.

    Appends three columns:

    * ``cleaned_text``: ``value`` with every character that is not an ASCII
      letter or whitespace removed, then lowercased.
    * ``words``: ``cleaned_text`` split on runs of whitespace, with empty
      tokens dropped.
    * ``filtered_words``: ``words`` minus the StopWordsRemover default
      stop-word list.

    Args:
        df: DataFrame with a string column ``value`` (as produced by
            ``read_data``).

    Returns:
        The input DataFrame with the three columns above added.
    """
    # Imported locally so this cell does not depend on a change to the
    # top-level import cell.
    from pyspark.ml.feature import RegexTokenizer

    # NOTE: the character class is ASCII-only, so accented letters are removed
    # ("café" -> "caf"). Switch to "[^\\p{L}\\s]" if non-English text matters.
    cleaned = df.withColumn(
        "cleaned_text",
        lower(regexp_replace(col("value"), "[^a-zA-Z\\s]", "")),
    )

    # The plain Tokenizer splits on a single "\s", so blank lines, leading
    # whitespace and runs of spaces (e.g. left behind when " - " is stripped)
    # produce "" tokens. Those are not stop words, so they would be counted as
    # a word. Splitting on "\s+" with minTokenLength=1 discards them.
    tokenizer = RegexTokenizer(
        inputCol="cleaned_text",
        outputCol="words",
        pattern="\\s+",
        gaps=True,
        minTokenLength=1,
    )
    tokenized = tokenizer.transform(cleaned)

    remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
    return remover.transform(tokenized)

In [None]:
def count_word_frequencies(df):
    """Count how many times each token appears in ``filtered_words``.

    Args:
        df: DataFrame with a list-like column ``filtered_words`` (as produced
            by ``preprocess_text``).

    Returns:
        An RDD of ``(word, count)`` tuples, one per distinct word.
    """
    def tokens_of(record):
        return record["filtered_words"]

    def with_one(token):
        return (token, 1)

    def add(left, right):
        return left + right

    token_rdd = df.select("filtered_words").rdd.flatMap(tokens_of)
    return token_rdd.map(with_one).reduceByKey(add)

# Persist word-count results as CSV (HDFS or any Spark-supported destination)
def save_word_counts(word_counts, output_path):
    """Write ``(word, count)`` pairs to ``output_path`` as CSV.

    Args:
        word_counts: RDD of ``(word, count)`` tuples, as returned by
            ``count_word_frequencies``.
        output_path: Destination directory. Existing contents are replaced
            (write mode ``"overwrite"``).
    """
    (
        word_counts
        .toDF(["word", "count"])
        .write
        .mode("overwrite")
        .csv(output_path)
    )

In [None]:
# Pipeline entry point: read -> preprocess -> count -> save
def main(file_path, output_path):
    """Run the word-count pipeline end to end.

    Args:
        file_path: Input path/glob passed to ``read_data``.
        output_path: Output directory passed to ``save_word_counts``.
    """
    counts = count_word_frequencies(preprocess_text(read_data(file_path)))
    save_word_counts(counts, output_path)
    print(f"Word count results saved to: {output_path}")


# NOTE(review): these look like placeholders; set the real locations before running.
input_file_path = "hdfs://path/*.txt"  # Input path/glob of text files
output_file_path = "hdfs://path/word_counts"  # Output directory (overwritten on each run)

# Run the pipeline. Stop the Spark session in `finally` so it is released even
# if any stage raises; previously a failure skipped spark.stop().
try:
    main(input_file_path, output_file_path)
finally:
    spark.stop()