In [1]:
#Install Java and PySpark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget  https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz

--2025-09-28 08:37:58--  https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 388407094 (370M) [application/x-gzip]
Saving to: ‘spark-3.4.0-bin-hadoop3.tgz’


2025-09-28 08:42:59 (1.23 MB/s) - ‘spark-3.4.0-bin-hadoop3.tgz’ saved [388407094/388407094]



In [2]:
!tar xf spark-3.4.0-bin-hadoop3.tgz
!pip install -q findspark

In [3]:
#Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"

In [4]:
!ls

Amazon_Responded_Oct05.csv  spark-3.4.0-bin-hadoop3
sample_data		    spark-3.4.0-bin-hadoop3.tgz


In [5]:
#Initialize PySpark
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Start a Spark session
spark = SparkSession.builder \
    .appName("Colab PySpark") \
    .master("local[*]") \
    .getOrCreate()

In [6]:
#ctrl+space to make suggestions
type(spark)

In [7]:
#all properties and methods
dir(spark)

['Builder',
 '__annotations__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_activeSession',
 '_convert_from_pandas',
 '_createFromLocal',
 '_createFromRDD',
 '_create_dataframe',
 '_create_from_pandas_with_arrow',
 '_create_shell_session',
 '_getActiveSessionOrCreate',
 '_get_numpy_record_dtype',
 '_inferSchema',
 '_inferSchemaFromList',
 '_instantiatedSession',
 '_jconf',
 '_jsc',
 '_jsparkSession',
 '_jvm',
 '_repr_html_',
 '_sc',
 'builder',
 'catalog',
 'conf',
 'createDataFrame',
 'getActiveSession',
 'newSession',
 'range',
 'read',
 'readStream',
 'sparkContext',
 'sql',
 'stop',
 'streams',
 'table',
 'udf',
 'v

In [8]:
from pyspark.sql.types import *

# Load and Clean The Data

In [10]:
import pandas as pd
# dataset path
csv_path = "/content/Amazon_Responded_Oct05.csv"

# Loading the data
use_cols = ["user_id_str", "user_followers_count", "text_"]
df = pd.read_csv(csv_path, usecols=use_cols, dtype={"user_id_str": str}, low_memory=False)
df

Unnamed: 0,user_id_str,user_followers_count,text_
0,,,
1,143515471,1503.0,@AmazonHelp Can you please DM me? A product I ...
2,85741735,149569.0,"@SeanEPanjab I'm sorry, we're unable to DM you..."
3,143515471,1503.0,@AmazonHelp It was purchased on https://t.co/g...
4,143515471,1503.0,"@AmazonHelp I am following you now, if it help..."
...,...,...,...
462024,,,
462025,60186579,234.0,@AmazonHelp do you guys ship to APO addresses?
462026,85741735,148850.0,@nat_says_ We do! You can find more informatio...
462027,60186579,234.0,@AmazonHelp Thank you


In [11]:
df.isnull().sum()

Unnamed: 0,0
user_id_str,83895
user_followers_count,83895
text_,83895


In [12]:
# Drop rows with missing values
df = df.dropna(subset=use_cols)

In [13]:
df.isnull().sum()

Unnamed: 0,0
user_id_str,0
user_followers_count,0
text_,0


In [15]:
# Convert followers to integer safely
df["user_followers_count"] = pd.to_numeric(df["user_followers_count"], errors="coerce").fillna(0).astype(int)
df

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["user_followers_count"] = pd.to_numeric(df["user_followers_count"], errors="coerce").fillna(0).astype(int)


Unnamed: 0,user_id_str,user_followers_count,text_
1,143515471,1503,@AmazonHelp Can you please DM me? A product I ...
2,85741735,149569,"@SeanEPanjab I'm sorry, we're unable to DM you..."
3,143515471,1503,@AmazonHelp It was purchased on https://t.co/g...
4,143515471,1503,"@AmazonHelp I am following you now, if it help..."
5,85741735,149569,@SeanEPanjab Please give us a call/chat so we ...
...,...,...,...
462023,85741735,148849,@Voltaireon Please send us the details here: h...
462025,60186579,234,@AmazonHelp do you guys ship to APO addresses?
462026,85741735,148850,@nat_says_ We do! You can find more informatio...
462027,60186579,234,@AmazonHelp Thank you


In [18]:
# Cleaning Text
import re

def clean_text(t):
    t = str(t).lower()                          # lowercase
    t = re.sub(r"http\S+|www\S+", " ", t)       # remove URLs
    t = re.sub(r"@\w+", " ", t)                 # remove mentions (@user)
    t = re.sub(r"[^a-z\s]", " ", t)             # remove everything not a-z or space
    t = re.sub(r"\s+", " ", t).strip()          # collapse multiple spaces
    return t

df["text_"] = df["text_"].apply(clean_text)
df["text_"] = df["text_"].astype(str).str.replace(r"[\r\n\t]+", " ", regex=True).str.strip()
df

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["text_"] = df["text_"].apply(clean_text)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["text_"] = df["text_"].astype(str).str.replace(r"[\r\n\t]+", " ", regex=True).str.strip()


Unnamed: 0,user_id_str,user_followers_count,text_
1,143515471,1503,can you please dm me a product i ordered last ...
2,85741735,149569,i m sorry we re unable to dm you was this orde...
3,143515471,1503,it was purchased on
4,143515471,1503,i am following you now if it helps to dm
5,85741735,149569,please give us a call chat so we can look into...
...,...,...,...
462023,85741735,148849,please send us the details here so we can look...
462025,60186579,234,do you guys ship to apo addresses
462026,85741735,148850,we do you can find more information here sb
462027,60186579,234,thank you


In [19]:
# Converting df into PySpark dataframe for distributed processing and selecting relevant columns
spark_df = spark.createDataFrame(df)
spark_df = spark_df.select("user_id_str", "user_followers_count", "text_")
spark_df.printSchema()
spark_df.show(5, truncate=120)

root
 |-- user_id_str: string (nullable = true)
 |-- user_followers_count: long (nullable = true)
 |-- text_: string (nullable = true)

+-----------+--------------------+-----------------------------------------------------------------------------------------+
|user_id_str|user_followers_count|                                                                                    text_|
+-----------+--------------------+-----------------------------------------------------------------------------------------+
|  143515471|                1503|                         can you please dm me a product i ordered last year never arrived|
|   85741735|              149569|i m sorry we re unable to dm you was this order purchased on or one of our other sites cl|
|  143515471|                1503|                                                                      it was purchased on|
|  143515471|                1503|                                                 i am following you now if it he

In [22]:
# Before Removing duplication
total_rows = spark_df.count()
distinct_users_before = spark_df.select("user_id_str").distinct().count()
print(f"Before deduplication: {total_rows:,} rows, {distinct_users_before:,} distinct users")

Before deduplication: 378,134 rows, 71,417 distinct users


In [23]:
# Handling duplicate columns
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Defining a window partitioned by user_id_str, ordered by followers descending
w = Window.partitionBy("user_id_str").orderBy(F.desc("user_followers_count"))

# Adding row number inside each user group
dedup_df = spark_df.withColumn("rn", F.row_number().over(w))

# Keep only rn == 1 (the row with max followers for that user)
dedup_df = dedup_df.filter(dedup_df.rn == 1).drop("rn")

#  After removing deduplication
total_rows_after = dedup_df.count()
distinct_users_after = dedup_df.select("user_id_str").distinct().count()
print(f"After deduplication: {total_rows_after:,} rows, {distinct_users_after:,} distinct users")
dedup_df.show(5, truncate=120)

After deduplication: 71,417 rows, 71,417 distinct users
+-----------+--------------------+------------------------------------------------------------------------------------------------------------------------+
|user_id_str|user_followers_count|                                                                                                                   text_|
+-----------+--------------------+------------------------------------------------------------------------------------------------------------------------+
| 1000033789|                  64|                                                                                really whatawaste environmentalawareness|
|  100008452|                  42|                   hi my item arrived all smashed up as shown in photos do i need to return damaged goods in order to ge|
|  100028695|                  11|worst experience you delivered me a box of cereals open stuck with gluegun on top of that the cerels were spoilt for ...|
| 100029

In [24]:
# Filtering users with more than 5000 followers
popular_df = dedup_df.filter(dedup_df.user_followers_count > 5000)

# Count how many popular users remain
popular_users_count = popular_df.count()
print(f"Number of popular users (followers > 5000): {popular_users_count:,}")

popular_df.show(5, truncate=120)


Number of popular users (followers > 5000): 2,130
+-----------+--------------------+------------------------------------------------------------------------------------------------------------------------+
|user_id_str|user_followers_count|                                                                                                                   text_|
+-----------+--------------------+------------------------------------------------------------------------------------------------------------------------+
| 1000295179|               70991|   so i have an safe place set up but this is the second time they opted not to use it and give it to a house doors away|
|  100272410|               78504|                      amazon in order gati courier has failed to deliver my package even though it has reached bokaro on|
| 1005304003|                5257|                                   i have one simple question and you guys had to transfer me three times is this a joke|
|  100614299| 

# Word Frequency Analysis



1.   Extract tweets (only text_).
2.   Clean each tweet: remove URLs, punctuation, numbers, make lowercase.
3.   Map: split into words.
4.   Reduce: count word frequencies.
5.   Get Top 10 most frequent words.



In [25]:
import re
# Cleaning Text
def clean_text(t):
    if not t:
        return ""
    t = str(t).lower()
    t = re.sub(r"http\S+|www\S+", " ", t)   # remove URLs
    t = re.sub(r"@\w+", " ", t)             # remove mentions
    t = re.sub(r"[^a-z\s]", " ", t)         # keep only letters and spaces
    t = re.sub(r"\s+", " ", t).strip()      # collapse multiple spaces
    return t

# Extracting tweets as an RDD
tweets_rdd = popular_df.select("text_").rdd.map(lambda row: row[0])

# Cleaning text and split into words (Map)
words_rdd = tweets_rdd.map(clean_text).flatMap(lambda txt: txt.split())

# Removing stopwords
stopwords = set([
    "the","and","to","a","of","in","is","for","that","on","it","with","this","be","are","as","at","an","by","from","or","was","i","you"
])
words_rdd = words_rdd.filter(lambda w: len(w) > 1 and w not in stopwords)

# Counting word frequencies (Reduce)
word_counts = words_rdd.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)

# Extracting Top 10 words
top10 = word_counts.takeOrdered(10, key=lambda x: -x[1])

print("Top 10 words used by popular users:")
for word, count in top10:
    print(f"{word}\t{count}")


Top 10 words used by popular users:
my	868
amazon	397
can	322
me	321
but	291
have	285
prime	280
not	272
order	267
your	239


In [26]:
# Saving outputs
print("Top 10 words used by popular users:")
for word, count in top10:
    print(f"{word}\t{count}")

# Save to text file
out_path = "/content/output.txt"
with open(out_path, "w", encoding="utf-8") as f:
    f.write("word\tcount\n")
    for word, count in top10:
        f.write(f"{word}\t{count}\n")

print(f"\nTop 10 words saved to {out_path}")


Top 10 words used by popular users:
my	868
amazon	397
can	322
me	321
but	291
have	285
prime	280
not	272
order	267
your	239

Top 10 words saved to /content/output.txt
