In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, RegexTokenizer, StopWordsRemover, StringIndexer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Baseline: bag-of-words token counts + Naive Bayes on twitter_training.csv.
# Trains on a seeded 80/20 split and prints accuracy on the held-out 20%.

# Create a Spark session
spark = SparkSession.builder \
    .appName("Sentiment Analysis with Naive Bayes") \
    .getOrCreate()

# Load the CSV file into a DataFrame (the file has no header row)
file_path = 'twitter_training.csv'
df = spark.read.csv(file_path, header=False, inferSchema=True)

# Define column names
df = df.withColumnRenamed('_c0', 'ID') \
       .withColumnRenamed('_c1', 'Topic') \
       .withColumnRenamed('_c2', 'Sentiment') \
       .withColumnRenamed('_c3', 'Text')

# Drop rows with null values
df = df.dropna()

# Prepare features and label
# Split the tweet text on non-word characters
tokenizer = RegexTokenizer(inputCol='Text', outputCol='words', pattern='\\W')
# Pin the locale used for case-insensitive stop-word matching. Left unset, the
# JVM default locale is used; on this machine it (en_MA) was unavailable and
# Spark fell back to en_US with a warning, and on other machines a different
# default locale could change which tokens match.
remover = StopWordsRemover(inputCol='words', outputCol='filtered_words', locale='en_US')
vectorizer = CountVectorizer(inputCol='filtered_words', outputCol='features')
# Encode the string sentiment labels as numeric class indices
indexer = StringIndexer(inputCol='Sentiment', outputCol='label')

# Split data into train and test sets (seeded for reproducibility)
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=42)

# Create a Naive Bayes classifier (additive smoothing of 1.0)
nb = NaiveBayes(featuresCol='features', labelCol='label', smoothing=1.0)

# Create a pipeline; every stage is fitted on the training split only
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, indexer, nb])

# Train the model
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Stop the Spark session
spark.stop()


24/05/04 21:59:38 WARN Utils: Your hostname, Abdelmajid-Macs-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.11.108 instead (on interface en0)
24/05/04 21:59:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/05/04 21:59:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/04 21:59:48 WARN StopWordsRemover: Default locale set was [en_MA]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
24/05/04 22:00:01 WARN DAGScheduler: Broadcasting large task binary with size 1296.4 KiB
                                                                                

Accuracy: 0.7506495282373855


In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, RegexTokenizer, StopWordsRemover, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Same bag-of-words features as the Naive Bayes cell, classified with a single
# depth-limited decision tree. Prints accuracy on the held-out 20% split.
# NOTE(review): the saved run scored ~0.37 accuracy and logged MemoryStore
# "Not enough space to cache" spills; a depth-5 tree over the full
# CountVectorizer vocabulary seems a poor fit for this feature space — consider
# limiting vocabulary size or dropping this model.

# Create a Spark session
spark = SparkSession.builder \
    .appName("Sentiment Analysis with Decision Tree") \
    .getOrCreate()

# Load the CSV file into a DataFrame (the file has no header row)
file_path = 'twitter_training.csv'
df = spark.read.csv(file_path, header=False, inferSchema=True)

# Define column names
df = df.withColumnRenamed('_c0', 'ID') \
       .withColumnRenamed('_c1', 'Topic') \
       .withColumnRenamed('_c2', 'Sentiment') \
       .withColumnRenamed('_c3', 'Text')

# Drop rows with null values
df = df.dropna()

# Prepare features and label
# Split the tweet text on non-word characters
tokenizer = RegexTokenizer(inputCol='Text', outputCol='words', pattern='\\W')
# Pin the stop-word matching locale instead of depending on the JVM default
# (which here was missing and fell back to en_US with a warning).
remover = StopWordsRemover(inputCol='words', outputCol='filtered_words', locale='en_US')
vectorizer = CountVectorizer(inputCol='filtered_words', outputCol='features')
# Encode the string sentiment labels as numeric class indices
indexer = StringIndexer(inputCol='Sentiment', outputCol='label')

# Split data into train and test sets (seeded for reproducibility)
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=42)

# Create a Decision Tree classifier
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=5)

# Create a pipeline; every stage is fitted on the training split only
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, indexer, dt])

# Train the model
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Stop the Spark session
spark.stop()


24/05/04 22:00:12 WARN StopWordsRemover: Default locale set was [en_MA]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
24/05/04 22:00:23 WARN DAGScheduler: Broadcasting large task binary with size 1023.3 KiB
24/05/04 22:00:24 WARN MemoryStore: Not enough space to cache rdd_51_2 in memory! (computed 106.4 MiB so far)
24/05/04 22:00:24 WARN MemoryStore: Not enough space to cache rdd_51_1 in memory! (computed 106.4 MiB so far)
24/05/04 22:00:24 WARN MemoryStore: Not enough space to cache rdd_51_0 in memory! (computed 106.4 MiB so far)
24/05/04 22:00:24 WARN BlockManager: Persisting block rdd_51_0 to disk instead.
24/05/04 22:00:24 WARN BlockManager: Persisting block rdd_51_1 to disk instead.
24/05/04 22:00:24 WARN BlockManager: Persisting block rdd_51_2 to disk instead.
24/05/04 22:00:28 WARN MemoryStore: Not enough space to cache rdd_51_2 in memory! (computed 246.1 MiB so far)
24/05/04 22:00:33 

Accuracy: 0.3674962395733625


In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, RegexTokenizer, StopWordsRemover, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Bag-of-words token counts + Logistic Regression on twitter_training.csv.
# Prints accuracy on the held-out 20% split and can optionally persist the
# fitted pipeline.

# Directory to save the fitted pipeline to; leave as None to skip saving.
MODEL_SAVE_PATH = None

# Create a Spark session
spark = SparkSession.builder \
    .appName("Sentiment Analysis") \
    .getOrCreate()

# Load the CSV file into a DataFrame (the file has no header row)
file_path = 'twitter_training.csv'
df = spark.read.csv(file_path, header=False, inferSchema=True)

# Define column names
df = df.withColumnRenamed('_c0', 'ID') \
       .withColumnRenamed('_c1', 'Topic') \
       .withColumnRenamed('_c2', 'Sentiment') \
       .withColumnRenamed('_c3', 'Text')

# Drop rows with null values
df = df.dropna()

# Prepare features and label
# Split the tweet text on non-word characters
tokenizer = RegexTokenizer(inputCol='Text', outputCol='words', pattern='\\W')
# Pin the stop-word matching locale instead of depending on the JVM default
# (which here was missing and fell back to en_US with a warning).
remover = StopWordsRemover(inputCol='words', outputCol='filtered_words', locale='en_US')
vectorizer = CountVectorizer(inputCol='filtered_words', outputCol='features')
# Encode the string sentiment labels as numeric class indices
indexer = StringIndexer(inputCol='Sentiment', outputCol='label')

# Split data into train and test sets (seeded for reproducibility)
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=42)

# Create a Logistic Regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Create a pipeline; every stage is fitted on the training split only
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, indexer, lr])

# Train the model
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Save the model.
# A fitted Spark PipelineModel wraps JVM-side objects, so joblib/pickle is not
# a supported way to persist it; use Spark ML's own writer instead. Reload
# later with PipelineModel.load(MODEL_SAVE_PATH).
if MODEL_SAVE_PATH:
    model.write().overwrite().save(MODEL_SAVE_PATH)

# Stop the Spark session
spark.stop()


24/05/04 22:01:03 WARN StopWordsRemover: Default locale set was [en_MA]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
24/05/04 22:01:07 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/05/04 22:01:07 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/05/04 22:01:21 WARN DAGScheduler: Broadcasting large task binary with size 1301.5 KiB


Accuracy: 0.8372760836865856


In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, RegexTokenizer, StopWordsRemover, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Logistic Regression with the English stop-word list passed explicitly.
# NOTE(review): loadDefaultStopWords('english') is the list StopWordsRemover
# uses by default, and the split/seed match the previous cell, so this cell
# is expected to reproduce the previous accuracy exactly (the saved outputs
# agree). It does not change how the ID column is handled.

# Create a Spark session
spark = SparkSession.builder \
    .appName("Sentiment Analysis") \
    .getOrCreate()

# Load the CSV file into a DataFrame (the file has no header row)
file_path = 'twitter_training.csv'
df = spark.read.csv(file_path, header=False, inferSchema=True)

# Define column names
df = df.withColumnRenamed('_c0', 'ID') \
       .withColumnRenamed('_c1', 'Topic') \
       .withColumnRenamed('_c2', 'Sentiment') \
       .withColumnRenamed('_c3', 'Text')

# Drop rows with null values
df = df.dropna()

# Prepare features and label
# Split the tweet text on non-word characters
tokenizer = RegexTokenizer(inputCol='Text', outputCol='words', pattern='\\W')
# Explicitly load Spark's built-in English stop-word list and pin the matching
# locale instead of depending on the JVM default (which here was missing and
# fell back to en_US with a warning).
stop_words = StopWordsRemover.loadDefaultStopWords('english')
stop_words_remover = StopWordsRemover(inputCol='words', outputCol='filtered_words',
                                      stopWords=stop_words, locale='en_US')
vectorizer = CountVectorizer(inputCol='filtered_words', outputCol='features')
# Encode the string sentiment labels as numeric class indices
indexer = StringIndexer(inputCol='Sentiment', outputCol='label')

# Split data into train and test sets (seeded for reproducibility)
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=42)

# Create a Logistic Regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Create a pipeline; every stage is fitted on the training split only
pipeline = Pipeline(stages=[tokenizer, stop_words_remover, vectorizer, indexer, lr])

# Train the model
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Stop the Spark session
spark.stop()


24/05/04 22:03:31 WARN StopWordsRemover: Default locale set was [en_MA]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
24/05/04 22:03:48 WARN DAGScheduler: Broadcasting large task binary with size 1301.5 KiB


Accuracy: 0.8372760836865856


In [5]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, RegexTokenizer, StopWordsRemover, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Logistic Regression trained on a DataFrame reduced to the Text and Sentiment
# columns before splitting.

# Create a Spark session
spark = SparkSession.builder \
    .appName("Sentiment Analysis") \
    .getOrCreate()

# Load the CSV file into a DataFrame (the file has no header row)
file_path = 'twitter_training.csv'
df = spark.read.csv(file_path, header=False, inferSchema=True)

# Define column names
df = df.withColumnRenamed('_c0', 'ID') \
       .withColumnRenamed('_c1', 'Topic') \
       .withColumnRenamed('_c2', 'Sentiment') \
       .withColumnRenamed('_c3', 'Text')

# Drop rows with null values (in any column, including ID and Topic)
df = df.dropna()

# Prepare features and label
# Split the tweet text on non-word characters
tokenizer = RegexTokenizer(inputCol='Text', outputCol='words', pattern='\\W')
# Explicit English stop-word list with a pinned matching locale, instead of
# depending on the JVM default (which here was missing and fell back to en_US).
stop_words = StopWordsRemover.loadDefaultStopWords('english')
stop_words_remover = StopWordsRemover(inputCol='words', outputCol='filtered_words',
                                      stopWords=stop_words, locale='en_US')
vectorizer = CountVectorizer(inputCol='filtered_words', outputCol='features')
# Encode the string sentiment labels as numeric class indices
indexer = StringIndexer(inputCol='Sentiment', outputCol='label')

# Keep only the columns the pipeline reads (this drops both 'ID' and 'Topic').
# NOTE(review): no pipeline stage reads 'ID' or 'Topic', so dropping them
# cannot change what the model learns. The accuracy gain over the previous
# cells is therefore likely due to randomSplit assigning rows differently once
# the column set changes, not to the dropped columns — confirm by comparing
# models on an identical split.
selected_cols = ['Text', 'Sentiment']
df_selected = df.select(selected_cols)

# Split data into train and test sets (seeded for reproducibility)
(train_data, test_data) = df_selected.randomSplit([0.8, 0.2], seed=42)

# Create a Logistic Regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Create a pipeline; every stage is fitted on the training split only
pipeline = Pipeline(stages=[tokenizer, stop_words_remover, vectorizer, indexer, lr])

# Train the model
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Stop the Spark session
spark.stop()


24/05/04 22:05:07 WARN StopWordsRemover: Default locale set was [en_MA]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
24/05/04 22:05:24 WARN DAGScheduler: Broadcasting large task binary with size 1301.5 KiB
                                                                                

Accuracy: 0.8441132230274853


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, StringIndexer, NGram, HashingTF, IDF
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Bigram TF-IDF features + Random Forest, tuned by 5-fold cross-validation on
# accuracy, then scored on the held-out 20% split.
# NOTE(review): the saved run of this cell never finished (execution count
# "None"). The grid below means 3 x 2 param maps x 5 folds = 30 forest fits
# (up to 200 trees of depth 10 each) plus a final refit; shrink the grid or
# the fold count for quick iterations.

# Single seed for every stochastic step (data split, forest sampling, CV folds)
SEED = 42

# Create a Spark session
spark = SparkSession.builder \
    .appName("Sentiment Analysis with RandomForest") \
    .getOrCreate()

# Load the CSV file into a DataFrame (the file has no header row)
file_path = 'twitter_training.csv'
df = spark.read.csv(file_path, header=False, inferSchema=True)

# Define column names
df = df.withColumnRenamed('_c0', 'ID') \
       .withColumnRenamed('_c1', 'Topic') \
       .withColumnRenamed('_c2', 'Sentiment') \
       .withColumnRenamed('_c3', 'Text')

# Drop rows with null values
df = df.dropna()

# Prepare features and label
# Split the tweet text on non-word characters
tokenizer = RegexTokenizer(inputCol='Text', outputCol='words', pattern='\\W')
# Pin the stop-word matching locale instead of depending on the JVM default
# (which here was missing and fell back to en_US with a warning).
stop_words_remover = StopWordsRemover(inputCol='words', outputCol='filtered_words', locale='en_US')
# Encode the string sentiment labels as numeric class indices
indexer = StringIndexer(inputCol='Sentiment', outputCol='label')

# Add NGram feature extraction.
# NOTE(review): with n=2 only bigrams reach HashingTF — single words are
# discarded. TODO: combine unigram and bigram features if that is not intended.
ngram = NGram(n=2, inputCol='filtered_words', outputCol='ngrams')
hashingTF = HashingTF(inputCol='ngrams', outputCol='rawFeatures')
idf = IDF(inputCol='rawFeatures', outputCol='features')

# Random Forest classifier; seeded so its row/feature subsampling is reproducible
rf = RandomForestClassifier(featuresCol='features', labelCol='label', seed=SEED)

# Create a pipeline; CrossValidator refits every stage on each training fold
pipeline = Pipeline(stages=[tokenizer, stop_words_remover, indexer, ngram, hashingTF, idf, rf])

# Split data into train and test sets (seeded for reproducibility)
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=SEED)

# Set up parameter grid and cross-validation
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [50, 100, 200])
             .addGrid(rf.maxDepth, [5, 10])
             .build())

evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

# Seed the fold assignment too, otherwise the selected params can vary run to run
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5,
                          seed=SEED)

# Train the model (grid search, then refit of the best params on all of train_data)
cvModel = crossval.fit(train_data)

# Make predictions with the best model
predictions = cvModel.transform(test_data)

# Evaluate the model
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Stop the Spark session
spark.stop()


24/05/04 22:05:52 WARN StopWordsRemover: Default locale set was [en_MA]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
24/05/04 22:05:53 WARN StopWordsRemover: Default locale set was [en_MA]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
24/05/04 22:05:53 WARN StopWordsRemover: Default locale set was [en_MA]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
24/05/04 22:05:57 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/05/04 22:05:57 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/05/04 22:05:58 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
24/05/04 22:06:43 WARN DAGScheduler: Broadcasting large task binary with size 1034.3 KiB
24/05/04 22:06

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, RegexTokenizer, StopWordsRemover, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Logistic Regression on bag-of-words counts, with the ID column dropped and a
# larger memory budget for the Spark JVM.

# Create a Spark session.
# In local mode the executors run inside the driver JVM, so
# spark.executor.memory alone does not enlarge the heap; spark.driver.memory
# does, but only if it is set before the JVM starts (i.e. this must be the
# first Spark session created in a fresh kernel). spark.executor.memory is
# kept for when this runs against a cluster.
spark = SparkSession.builder \
    .appName("Sentiment Analysis with Logistic Regression") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Load the CSV file into a DataFrame (the file has no header row)
file_path = 'twitter_training.csv'
df = spark.read.csv(file_path, header=False, inferSchema=True)

# Define column names and drop the ID column.
# NOTE(review): no pipeline stage reads 'ID', so dropping it cannot change what
# the model learns; a different accuracy from earlier cells likely reflects
# randomSplit assigning rows differently for a different column set.
df = df.withColumnRenamed('_c0', 'ID') \
       .withColumnRenamed('_c1', 'Topic') \
       .withColumnRenamed('_c2', 'Sentiment') \
       .withColumnRenamed('_c3', 'Text') \
       .drop('ID')  # Drop the ID column

# Drop rows with null values
df = df.dropna()

# Prepare features and label
# Split the tweet text on non-word characters
tokenizer = RegexTokenizer(inputCol='Text', outputCol='words', pattern='\\W')
# Pin the stop-word matching locale instead of depending on the JVM default
# (which here was missing and fell back to en_US with a warning).
remover = StopWordsRemover(inputCol='words', outputCol='filtered_words', locale='en_US')
vectorizer = CountVectorizer(inputCol='filtered_words', outputCol='features')
# Encode the string sentiment labels as numeric class indices
indexer = StringIndexer(inputCol='Sentiment', outputCol='label')

# Split data into train and test sets (seeded for reproducibility)
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=42)

# Create a Logistic Regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Create a pipeline; every stage is fitted on the training split only
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, indexer, lr])

# Train the model
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Stop the Spark session
spark.stop()


24/05/04 22:23:03 WARN Utils: Your hostname, Abdelmajid-Macs-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.11.108 instead (on interface en0)
24/05/04 22:23:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/05/04 22:23:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/04 22:23:12 WARN StopWordsRemover: Default locale set was [en_MA]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.
24/05/04 22:23:24 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/05/04 22:23:24 WARN BLAS: Failed to load implementation from: com.github.f

Accuracy: 0.8495145631067961
