In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF,IDF, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes 
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [3]:
spark = SparkSession.builder.appName("BinaryStringClassification").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/19 20:37:46 WARN Utils: Your hostname, Omars-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.35 instead (on interface en0)
25/08/19 20:37:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/19 20:38:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
df = spark.read.csv('sentiment_analysis.csv', header=True)

# Analysis 

In [9]:
df = df.withColumn("label", col("label").cast("double"))

In [11]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- label: double (nullable = true)
 |-- tweet: string (nullable = true)



In [13]:
df.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 5894|
|  1.0| 2026|
+-----+-----+



In [15]:
from pyspark.sql.functions import col, when, count

df.select(
    count(when(col("tweet").isNull() | (col("tweet") == ""), "tweet")).alias("null_or_empty_tweets")
).show()


+--------------------+
|null_or_empty_tweets|
+--------------------+
|                   0|
+--------------------+



In [17]:
from pyspark.sql.functions import col, lower, regexp_replace, trim

df = df.withColumn(
    "tweet",
    trim(
        regexp_replace(
            regexp_replace(
                lower(col("tweet")),             # lowercase
                r"http\S+|@\w+|#\w+|[^a-zA-Z\s]", ""  # remove URLs, mentions, hashtags, special chars
            ),
            r"\bpictwitter\w+\b", ""             # remove 'pictwitter...'
        )
    )
)

df = df.withColumn("tweet", regexp_replace(col("tweet"), r"\s{2,}", " "))


In [19]:
df.select('tweet').show()

+--------------------+
|               tweet|
+--------------------+
|                test|
|finally a transpa...|
|we love this woul...|
|im wired i know i...|
|what amazing serv...|
|iphone software u...|
|        happy for us|
|new type c charge...|
|bout to go shoppi...|
|               photo|
|hey when you make...|
|ha not heavy mach...|
|contemplating giv...|
|i just made anoth...|
|the battery is so...|
|        from towards|
|like and share if...|
|            go crazy|
|the reason i dont...|
|how is the apple ...|
+--------------------+
only showing top 20 rows


In [21]:
df.select('tweet').show(truncate=False)

+------------------------------------------------------------------------------------------------------------------+
|tweet                                                                                                             |
+------------------------------------------------------------------------------------------------------------------+
|test                                                                                                              |
|finally a transparant silicon case thanks to my uncle                                                             |
|we love this would you go                                                                                         |
|im wired i know im george i was made that way                                                                     |
|what amazing service apple wont even talk to me about a question i have unless i pay them for their stupid support|
|iphone software update fucked up my phone big time stupid iphon

In [23]:
df.show()

+---+-----+--------------------+
| id|label|               tweet|
+---+-----+--------------------+
|  1|  0.0|                test|
|  2|  0.0|finally a transpa...|
|  3|  0.0|we love this woul...|
|  4|  0.0|im wired i know i...|
|  5|  1.0|what amazing serv...|
|  6|  1.0|iphone software u...|
|  7|  0.0|        happy for us|
|  8|  0.0|new type c charge...|
|  9|  0.0|bout to go shoppi...|
| 10|  0.0|               photo|
| 11|  1.0|hey when you make...|
| 12|  1.0|ha not heavy mach...|
| 13|  1.0|contemplating giv...|
| 14|  0.0|i just made anoth...|
| 15|  1.0|the battery is so...|
| 16|  0.0|        from towards|
| 17|  0.0|like and share if...|
| 18|  0.0|            go crazy|
| 19|  1.0|the reason i dont...|
| 20|  1.0|how is the apple ...|
+---+-----+--------------------+
only showing top 20 rows


In [None]:
avg=df.withColumn('leanght',a)

In [25]:
train_df, test_df = df.randomSplit([0.7, 0.3], seed=42)

In [29]:
train_df = train_df.filter(col("tweet").isNotNull() & (col("tweet") != ""))


tokenizer = RegexTokenizer(inputCol="tweet", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
hashing = HashingTF(inputCol="filtered", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[tokenizer, remover, hashing,idf, lr])

model = pipeline.fit(train_df)

25/08/19 20:40:47 WARN StopWordsRemover: Default locale set was [en_EG]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
25/08/19 20:40:49 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:40:49 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/08/19 20:40:49 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:40:49 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:40:50 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:40:50 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:40:50 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:40:50 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:40:50 WARN DAGScheduler: Broadcasting large task binary with size 

In [35]:
predictions = model.transform(test_df)  # or train_df
predictions.select("tweet", "label", "prediction").show(10)


+--------------------+-----+----------+
|               tweet|label|prediction|
+--------------------+-----+----------+
|   bado con with and|  0.0|       0.0|
|overrated waste o...|  1.0|       1.0|
|       like a geisha|  0.0|       0.0|
|so i can log in t...|  1.0|       1.0|
|effect created st...|  0.0|       0.0|
|my apple was so h...|  0.0|       0.0|
|johnson will he s...|  0.0|       0.0|
|i might have to v...|  1.0|       0.0|
|get off when you ...|  0.0|       0.0|
|get off apple itu...|  0.0|       0.0|
+--------------------+-----+----------+
only showing top 10 rows


25/08/19 20:43:48 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB


In [37]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# Parameter Grid (changing regParam + threshold)
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 1.0])
             .addGrid(lr.threshold, [0.3, 0.5, 0.7])   
             .build())

# CrossValidator
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5)  


In [39]:
# Fit cross-validator
cvModel = cv.fit(train_df)

# Best model
bestModel = cvModel.bestModel
print("Best Threshold:", bestModel.stages[-1]._java_obj.getThreshold())

# Evaluate on test set
predictions = bestModel.transform(test_df)
auc = evaluator.evaluate(predictions)
print("Test AUC:", auc)


25/08/19 20:44:38 WARN StopWordsRemover: Default locale set was [en_EG]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
25/08/19 20:44:38 WARN StopWordsRemover: Default locale set was [en_EG]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
25/08/19 20:44:38 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:44:38 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:44:38 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:44:38 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:44:39 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:44:39 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:44:39 WARN DAGScheduler: Broadcasting large ta

Best Threshold: 0.3
Test AUC: 0.9159451356050862


25/08/19 20:45:49 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB


In [41]:

cvModel = cv.fit(train_df)


bestModel = cvModel.bestModel


lrModel = bestModel.stages[-1]  

print("Best regParam: ", lrModel._java_obj.getRegParam())
print("Best threshold: ", lrModel._java_obj.getThreshold())


25/08/19 20:50:42 WARN StopWordsRemover: Default locale set was [en_EG]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
25/08/19 20:50:42 WARN StopWordsRemover: Default locale set was [en_EG]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
25/08/19 20:50:43 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:50:43 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:50:43 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:50:43 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:50:43 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:50:43 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
25/08/19 20:50:43 WARN DAGScheduler: Broadcasting large ta

Best regParam:  1.0
Best threshold:  0.3


25/08/19 20:51:49 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


In [43]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy:.4f}")

# Confusion matrix
pred_rdd = predictions.select("prediction", "label").rdd.map(lambda x: (float(x[0]), float(x[1])))
metrics = MulticlassMetrics(pred_rdd)
print("Confusion Matrix:\n", metrics.confusionMatrix().toArray())


25/08/19 20:55:51 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB


Accuracy: 0.8578


25/08/19 20:55:51 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
25/08/19 20:55:51 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB


Confusion Matrix:
 [[1554.  163.]
 [ 160.  394.]]


In [33]:
predictions.show()

25/08/19 17:35:32 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB


+----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  id|label|               tweet|               words|            filtered|        raw_features|            features|       rawPrediction|         probability|prediction|
+----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
| 100|  0.0|   bado con with and|[bado, con, with,...|         [bado, con]|(262144,[115902,1...|(262144,[115902,1...|[253.993337306530...|           [1.0,0.0]|       0.0|
|1003|  1.0|overrated waste o...|[overrated, waste...|[overrated, waste...|(262144,[41809,83...|(262144,[41809,83...|[-807.50979447664...|           [0.0,1.0]|       1.0|
|1005|  0.0|       like a geisha|   [like, a, geisha]|      [like, geisha]|(262144,[133592,2...|(262144,[133592,2...|[390.615270439185...|       

In [41]:
output_df = predictions.select("id", "tweet", "label", "prediction", "probability").show(truncate=True)

+----+--------------------+-----+----------+--------------------+
|  id|               tweet|label|prediction|         probability|
+----+--------------------+-----+----------+--------------------+
| 100|   bado con with and|  0.0|       0.0|           [1.0,0.0]|
|1003|overrated waste o...|  1.0|       1.0|           [0.0,1.0]|
|1005|       like a geisha|  0.0|       0.0|           [1.0,0.0]|
|1006|so i can log in t...|  1.0|       1.0|[3.65903609817918...|
| 101|effect created st...|  0.0|       0.0|           [1.0,0.0]|
|1010|my apple was so h...|  0.0|       0.0|           [1.0,0.0]|
|1011|johnson will he s...|  0.0|       0.0|           [1.0,0.0]|
|1015|i might have to v...|  1.0|       0.0|           [1.0,0.0]|
|1017|get off when you ...|  0.0|       0.0|           [1.0,0.0]|
|1019|get off apple itu...|  0.0|       0.0|           [1.0,0.0]|
| 102|finalllllly my no...|  0.0|       0.0|           [1.0,0.0]|
|1023|mini gb wifi now ...|  0.0|       0.0|           [1.0,0.0]|
|1024|    

25/08/19 17:45:29 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB


In [71]:
#Naive bayes MODEL#
tokenizern = Tokenizer(inputCol="tweet", outputCol="words")


removern = StopWordsRemover(inputCol="words", outputCol="filtered_words")


hashingTFn = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=1000)

idfn = IDF(inputCol="raw_features", outputCol="features")

nv=NaiveBayes(featuresCol="features", labelCol="label")
pipeline1 = Pipeline(stages=[tokenizern, removern, hashingTFn, idfn,nv])


model1 = pipeline1.fit(train_df)       # fit = learn IDF weights
prediction1 = model1.transform(test_df)    # transform = apply all stages

# Show results
prediction1.select("tweet", "label", "prediction").show(10)


evaluator1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy1 = evaluator.evaluate(prediction1)
print(f"Accuracy: {accuracy:.4f}")

pred_rdd1 = prediction1.select("prediction", "label").rdd.map(lambda x: (float(x[0]), float(x[1])))
metrics1 = MulticlassMetrics(pred_rdd1)
print("Confusion Matrix:\n", metrics1.confusionMatrix().toArray())


25/08/18 15:26:45 WARN StopWordsRemover: Default locale set was [en_EG]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.


+--------------------+-----+----------+
|               tweet|label|prediction|
+--------------------+-----+----------+
|   bado con with and|  0.0|       0.0|
|overrated waste o...|  1.0|       0.0|
|       like a geisha|  0.0|       0.0|
|so i can log in t...|  1.0|       1.0|
|effect created st...|  0.0|       0.0|
|my apple was so h...|  0.0|       0.0|
|johnson will he s...|  0.0|       0.0|
|i might have to v...|  1.0|       0.0|
|get off when you ...|  0.0|       0.0|
|get off apple itu...|  0.0|       0.0|
+--------------------+-----+----------+
only showing top 10 rows
Accuracy: 0.7635




Confusion Matrix:
 [[1316.  401.]
 [ 136.  418.]]


In [112]:
# Convert to Pandas
pdf = predictions.select("tweet", "label", "prediction", "probability").toPandas()


In [114]:
csv_path = "predictions.csv"
pdf.to_csv(csv_path, index=False)


In [130]:

try:
    response = s3.list_buckets()
    print("✅ Connected to S3!")
    print("Your Buckets:")
    for bucket in response["Buckets"]:
        print(f" - {bucket['Name']}")
except Exception as e:
    print("❌ Not connected to S3:", e)


✅ Connected to S3!
Your Buckets:
 - omar93tweetdata


In [132]:
bucket_name = "omar93tweetdata"
s3.upload_file(csv_path, bucket_name, "predictions.csv")

print("✅ File uploaded to S3 successfully!")


✅ File uploaded to S3 successfully!


In [59]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- label: double (nullable = true)
 |-- tweet: string (nullable = true)
 |-- length: integer (nullable = true)
 |-- classWeight: double (nullable = false)



In [None]:
from pyspark.sql.functions import col, when

majority = df.filter(col("label") == 0.0).count()
minority = df.filter(col("label") == 1.0).count()
balancing_ratio = minority / (majority + minority)

df = df.withColumn(
    "classWeight",
    when(col("label") == 0.0, balancing_ratio).otherwise(1 - balancing_ratio))