<a href="https://colab.research.google.com/github/vagzikopis/SparkWordCount/blob/main/TweetAnalysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace, explode, split, desc, row_number, length
from pyspark.sql.window import Window
from pyspark.ml.feature import StopWordsRemover

# Initialize the Spark Session and use DataFrame
spark = SparkSession.builder.appName("AirlineAnalysis").getOrCreate()

# Load the dataset
df = spark.read.csv("tweets.csv", header=True, inferSchema=True)
# Extract all airline names
airline_names = [row.airline.lower() for row in df.select("airline").distinct().filter(col("airline").isNotNull()).collect()]
expanded_airlines = []
for name in airline_names:
    air_name = name.replace(" ", "")
    expanded_airlines.append(air_name)
    expanded_airlines.append(air_name)
    expanded_airlines.append(air_name + "air")
    expanded_airlines.append(air_name + "airline")
    expanded_airlines.append(air_name + "airlines")
    expanded_airlines.append(air_name + "airway")
    expanded_airlines.append(air_name + "airways")
# 3. Data Preprocessing & Cleaning
# Remove punctuation
# Convert all to lowercase
clean_df = df.withColumn("clean_text", lower(regexp_replace(col("text"), r'[^\w\s]', "")))

# Question 1: Top 5 words per Sentiment
# Filter out rows where 'clean_text' is null or empty before splitting,
# as StopWordsRemover can't process null or empty arrays.
clean_df = clean_df.filter(col("clean_text").isNotNull() & (length(col("clean_text")) > 0))

# Split the cleaned text into an array of words
tokens_df = clean_df.withColumn("words_array", split(col("clean_text"), " "))

# Stop Word Removal
# Use StopWordsRemover that filters out common stop words that don't carry sentiment meaning
remover = StopWordsRemover(inputCol="words_array", outputCol="filtered_words")
# Merge airline names with stop_words
custom_stop_words = remover.getStopWords() + expanded_airlines + ["flight", "flights"]
remover.setStopWords(custom_stop_words)

# Apply the transformation
df_filtered = remover.transform(tokens_df)

# Explode and Filter
# explode() turns the array of words into individual rows (one row per word)
# Filter for length > 1 to remove single characters that do not carry sentiment meaning
words_df = df_filtered.select("airline_sentiment", explode(col("filtered_words")).alias("word")) \
    .filter(length(col("word")) > 1)

# Word Count
# Group by sentiment type and the word itself to get the frequency
word_counts = words_df.groupBy("airline_sentiment", "word").count()

# Window Function for Ranking
# Partition by sentiment and order by count descending to find the top words for each group
sentiment_window = Window.partitionBy("airline_sentiment").orderBy(desc("count"))

top_5_words = word_counts.withColumn("rank", row_number().over(sentiment_window)) \
    .filter(col("rank") <= 5) \
    .select("airline_sentiment", "word", "count")

print("Question 1: Top 5 Words per Sentiment")
top_5_words.show()

# Question 2: Main Negative Reason per Airline
# Ignore rows with confidence <= 0.5 and skip null values
negative_df = df.filter((col("negativereason_confidence") > 0.5) & (col("negativereason").isNotNull()))

# Group by Airline and Reason and perform word count
reason_counts = negative_df.groupBy("airline", "negativereason").count()

# Window Function for Top Reason
# Partition by airline and pick the the highest count
airline_window = Window.partitionBy("airline").orderBy(desc("count"))

top_reasons = reason_counts.withColumn("rank", row_number().over(airline_window)) \
    .filter(col("rank") == 1) \
    .select("airline", "negativereason", "count")

print("Question 2: Main Negative Reason per Airline")
top_reasons.show(truncate=False)

Question 1: Top 5 Words per Sentiment
+-----------------+---------+-----+
|airline_sentiment|     word|count|
+-----------------+---------+-----+
|         negative|  jetblue|  990|
|         negative|      get|  934|
|         negative|cancelled|  864|
|         negative|  service|  696|
|         negative|    hours|  609|
|          neutral|  jetblue|  685|
|          neutral|      get|  227|
|          neutral|   please|  172|
|          neutral|     help|  150|
|          neutral|   thanks|  147|
|         positive|   thanks|  587|
|         positive|  jetblue|  569|
|         positive|    thank|  433|
|         positive|    great|  222|
|         positive|  service|  146|
+-----------------+---------+-----+

Question 2: Main Negative Reason per Airline
+--------------+----------------------+-----+
|airline       |negativereason        |count|
+--------------+----------------------+-----+
|American      |Customer Service Issue|654  |
|Delta         |Late Flight           |228  |
|S