In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import RegexTokenizer
import re

In [26]:
# Create the notebook-wide SparkSession (the imports below are needed by this and later cells)
from pyspark.sql import SparkSession
import pymysql
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
# getOrCreate() returns the active session if one exists, otherwise it starts a new one
spark = SparkSession.builder.getOrCreate()


In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import RegexTokenizer
import re
# Initialize SparkSession (getOrCreate() reuses a session that is already running
# rather than starting a second one)
spark = SparkSession.builder.getOrCreate()

# Define preprocessing functions
def lower_case(text):
    """Convert text to lowercase.

    Args:
        text: The string to convert, or None.

    Returns:
        The lowercased string, or None when ``text`` is None. Spark hands SQL
        NULLs to a Python UDF as None; passing them through keeps one missing
        value from failing the whole job with AttributeError.
    """
    if text is None:
        return None
    return text.lower()

def remove_stopwords(text, stopwords=('stopword1', 'stopword2', 'stopword3')):
    """Remove stopwords from text.

    The text is split on whitespace, every token found in ``stopwords`` is
    dropped, and the survivors are re-joined with single spaces. Matching is
    exact and case-sensitive.

    Args:
        text: The string to filter, or None.
        stopwords: Iterable of tokens to drop. The default values are
            placeholders -- supply a real list for meaningful filtering.

    Returns:
        The filtered string, or None when ``text`` is None (Spark passes SQL
        NULLs to a Python UDF as None).
    """
    if text is None:
        return None
    # A set gives constant-time membership checks instead of scanning a list per word
    blocked = frozenset(stopwords)
    return ' '.join(word for word in text.split() if word not in blocked)

def stem_text(text):
    """Stemming hook.

    No stemmer is wired in: the input is handed back unchanged, so the
    "stemmed" value is always identical to what was passed in.
    """
    stemmed = text
    return stemmed

def remove_special_characters(text):
    """Remove emojis and special characters from text.

    Only ASCII letters, digits and whitespace are kept; everything else
    (punctuation, emojis, accented or otherwise non-ASCII characters) is
    deleted.

    Args:
        text: The string to clean, or None.

    Returns:
        The cleaned string, or None when ``text`` is None (Spark passes SQL
        NULLs to a Python UDF as None).
    """
    if text is None:
        return None
    # A dedicated emoji pass is unnecessary: every emoji is already outside the
    # kept set below. It was also fragile, because a raw-string pattern with
    # \U / \u escapes only works when the `re` module itself decodes them
    # (Python 3.3+); older interpreters read them as plain characters.
    return re.sub(r'[^a-zA-Z0-9\s]', '', text)

def remove_url_hashtags(text):
    """Remove URLs and hashtags from text.

    A URL is anything starting with ``http`` up to the next whitespace; a
    hashtag is ``#`` followed by word characters. Surrounding whitespace is
    left in place.

    Args:
        text: The string to clean, or None.

    Returns:
        The cleaned string, or None when ``text`` is None (Spark passes SQL
        NULLs to a Python UDF as None).
    """
    if text is None:
        return None
    # URLs go first so a "#fragment" inside a link disappears with the link
    without_urls = re.sub(r'http\S+', '', text)
    # \w already includes the underscore, so no extra character class is needed
    return re.sub(r'#\w+', '', without_urls)

# Wrap each preprocessing function as a Spark UDF producing a string column
lower_case_udf = udf(lower_case, returnType=StringType())
remove_stopwords_udf = udf(remove_stopwords, returnType=StringType())
stem_text_udf = udf(stem_text, returnType=StringType())
remove_special_chars_udf = udf(remove_special_characters, returnType=StringType())
remove_url_hashtags_udf = udf(remove_url_hashtags, returnType=StringType())

In [28]:
from pyspark.sql import SparkSession
import pandas as pd
# Create a SparkSession (reuses the one already running, if any)
spark = SparkSession.builder.getOrCreate()

# Read the CSV with pandas first, then hand the pandas frame to Spark
pandas_df = pd.read_csv('SocialDeilemma.csv')
# Shape of the full file
print(pandas_df.shape)
# Optional subsample for quick experiments -- uncomment to keep only the first 1000 rows
#pandas_df = pandas_df.iloc[:1000,:]
# Shape after the optional subsample (identical to the line above while it stays commented out)
print(pandas_df.shape)
spark_df = spark.createDataFrame(pandas_df)

# Show the DataFrame
spark_df.show()

(10000, 3)
(10000, 3)
+--------------------+---------+--------------------+
|                text|Sentiment|                Date|
+--------------------+---------+--------------------+
|Is this a white m...|  Neutral|2022-07-10 08:44:...|
|Make sure to see ...| Positive|2023-01-31 08:44:...|
|@Gunntwitt @netfl...| Positive|2022-09-29 08:44:...|
|âFake news on T...| Negative|2022-06-13 08:44:...|
|I don't want to w...|  Neutral|2022-07-21 08:44:...|
|#TheSocialDilemma...| Positive|2022-12-16 08:44:...|
|#TheSocialDilemma...|  Neutral|2022-10-30 08:44:...|
|Watched this last...|  Neutral|2023-03-02 08:44:...|
|Fake news on twit...| Negative|2023-01-19 08:44:...|
|I hate to #tweet ...| Negative|2022-08-19 08:44:...|
|If you havenât ...|  Neutral|2022-12-05 08:44:...|
|Just finished wat...| Positive|2022-07-16 08:44:...|
|This is the kind ...| Positive|2022-08-16 08:44:...|
|Expert tip: never...|  Neutral|2022-09-18 08:44:...|
|The number of pre...|  Neutral|2022-06-11 08:44:...|
|Recom

In [29]:
from pyspark.ml.feature import StringIndexer

# Create StringIndexer transformer: maps each distinct "Sentiment" string to a
# numeric index written to a new "Label" column
indexer = StringIndexer(inputCol="Sentiment", outputCol="Label")

# Fit and transform the data
# NOTE(review): spark_df is reassigned in place, so re-running this cell operates on a
# frame that already has "Label" and is expected to fail -- re-run from the load cell instead
spark_df = indexer.fit(spark_df).transform(spark_df)


In [30]:
spark_df.show()

+--------------------+---------+--------------------+-----+
|                text|Sentiment|                Date|Label|
+--------------------+---------+--------------------+-----+
|Is this a white m...|  Neutral|2022-07-10 08:44:...|  1.0|
|Make sure to see ...| Positive|2023-01-31 08:44:...|  0.0|
|@Gunntwitt @netfl...| Positive|2022-09-29 08:44:...|  0.0|
|âFake news on T...| Negative|2022-06-13 08:44:...|  2.0|
|I don't want to w...|  Neutral|2022-07-21 08:44:...|  1.0|
|#TheSocialDilemma...| Positive|2022-12-16 08:44:...|  0.0|
|#TheSocialDilemma...|  Neutral|2022-10-30 08:44:...|  1.0|
|Watched this last...|  Neutral|2023-03-02 08:44:...|  1.0|
|Fake news on twit...| Negative|2023-01-19 08:44:...|  2.0|
|I hate to #tweet ...| Negative|2022-08-19 08:44:...|  2.0|
|If you havenât ...|  Neutral|2022-12-05 08:44:...|  1.0|
|Just finished wat...| Positive|2022-07-16 08:44:...|  0.0|
|This is the kind ...| Positive|2022-08-16 08:44:...|  0.0|
|Expert tip: never...|  Neutral|2022-09-

In [31]:
# Apply preprocessing steps on text column; each step reads the previous step's output column.
# Only three of the five registered UDFs are used here -- remove_special_chars_udf and
# remove_url_hashtags_udf are not applied in this cell.
spark_df = spark_df.withColumn("lowercased_text", lower_case_udf("text"))
spark_df = spark_df.withColumn("stopwords_removed_text", remove_stopwords_udf("lowercased_text"))
# stem_text returns its input unchanged, so this column duplicates stopwords_removed_text
spark_df = spark_df.withColumn("stemmed_text", stem_text_udf("stopwords_removed_text"))

In [32]:
spark_df.show()

+--------------------+---------+--------------------+-----+--------------------+----------------------+--------------------+
|                text|Sentiment|                Date|Label|     lowercased_text|stopwords_removed_text|        stemmed_text|
+--------------------+---------+--------------------+-----+--------------------+----------------------+--------------------+
|Is this a white m...|  Neutral|2022-07-10 08:44:...|  1.0|is this a white m...|  is this a white m...|is this a white m...|
|Make sure to see ...| Positive|2023-01-31 08:44:...|  0.0|make sure to see ...|  make sure to see ...|make sure to see ...|
|@Gunntwitt @netfl...| Positive|2022-09-29 08:44:...|  0.0|@gunntwitt @netfl...|  @gunntwitt @netfl...|@gunntwitt @netfl...|
|âFake news on T...| Negative|2022-06-13 08:44:...|  2.0|âfake news on t...|  âfake news on t...|âfake news on t...|
|I don't want to w...|  Neutral|2022-07-21 08:44:...|  1.0|i don't want to w...|  i don't want to w...|i don't want to w...|


In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, avg, when

# Reuse the active SparkSession
spark = SparkSession.builder.getOrCreate()

# Fold "Neutral" and "Positive" into a single class; every other label is kept as it is
is_neutral_or_positive = col("Sentiment").isin("Neutral", "Positive")
spark_df = spark_df.withColumn(
    "Sentiment",
    when(is_neutral_or_positive, "neutral_positive").otherwise(col("Sentiment")),
)

# Cut the timestamp down to its calendar day
spark_df = spark_df.withColumn("Date", date_format(col("Date"), "yyyy-MM-dd"))

spark_df.show()

+--------------------+----------------+----------+-----+--------------------+----------------------+--------------------+
|                text|       Sentiment|      Date|Label|     lowercased_text|stopwords_removed_text|        stemmed_text|
+--------------------+----------------+----------+-----+--------------------+----------------------+--------------------+
|Is this a white m...|neutral_positive|2022-07-10|  1.0|is this a white m...|  is this a white m...|is this a white m...|
|Make sure to see ...|neutral_positive|2023-01-31|  0.0|make sure to see ...|  make sure to see ...|make sure to see ...|
|@Gunntwitt @netfl...|neutral_positive|2022-09-29|  0.0|@gunntwitt @netfl...|  @gunntwitt @netfl...|@gunntwitt @netfl...|
|âFake news on T...|        Negative|2022-06-13|  2.0|âfake news on t...|  âfake news on t...|âfake news on t...|
|I don't want to w...|neutral_positive|2022-07-21|  1.0|i don't want to w...|  i don't want to w...|i don't want to w...|
|#TheSocialDilemma...|ne

In [34]:
# Encode the merged sentiment explicitly: "Negative" -> 1.0, "neutral_positive" -> 0.0.
# A StringIndexer would hand out indices by descending label frequency, so which class
# ends up as 0 or 1 would silently depend on the class balance of the data. A fixed
# mapping keeps the meaning of "Sop" (and of any average taken over it) stable.
from pyspark.sql.functions import col, when

# Rows whose Sentiment is neither label (including NULL) get a NULL "Sop"
spark_df = spark_df.withColumn(
    "Sop",
    when(col("Sentiment") == "Negative", 1.0)
    .when(col("Sentiment") == "neutral_positive", 0.0),
)

spark_df.show()

+--------------------+----------------+----------+-----+--------------------+----------------------+--------------------+---+
|                text|       Sentiment|      Date|Label|     lowercased_text|stopwords_removed_text|        stemmed_text|Sop|
+--------------------+----------------+----------+-----+--------------------+----------------------+--------------------+---+
|Is this a white m...|neutral_positive|2022-07-10|  1.0|is this a white m...|  is this a white m...|is this a white m...|0.0|
|Make sure to see ...|neutral_positive|2023-01-31|  0.0|make sure to see ...|  make sure to see ...|make sure to see ...|0.0|
|@Gunntwitt @netfl...|neutral_positive|2022-09-29|  0.0|@gunntwitt @netfl...|  @gunntwitt @netfl...|@gunntwitt @netfl...|0.0|
|âFake news on T...|        Negative|2022-06-13|  2.0|âfake news on t...|  âfake news on t...|âfake news on t...|1.0|
|I don't want to w...|neutral_positive|2022-07-21|  1.0|i don't want to w...|  i don't want to w...|i don't want to w.

In [43]:
# Group by date and calculate average sentiment.
# "Sop" is 0.0 for neutral_positive rows and 1.0 for Negative rows (see the table printed
# by the cell that creates it), so this mean is the share of Negative rows on each Date.
result = spark_df.groupBy("Date").agg(avg("Sop").alias("Average_Sentiment"))

# Show the result
result.show()

+----------+-------------------+
|      Date|  Average_Sentiment|
+----------+-------------------+
|2023-05-18|               0.25|
|2023-05-01|0.17647058823529413|
|2023-01-21|0.18181818181818182|
|2022-10-05|0.30434782608695654|
|2023-04-28|0.13333333333333333|
|2023-04-17| 0.1724137931034483|
|2023-04-21|0.15384615384615385|
|2022-10-07|0.13333333333333333|
|2023-02-10|0.12903225806451613|
|2023-03-11|0.27586206896551724|
|2023-03-10| 0.1935483870967742|
|2023-05-04|0.14285714285714285|
|2023-04-26|0.19047619047619047|
|2023-02-23|            0.28125|
|2022-07-04|              0.125|
|2022-07-08| 0.2857142857142857|
|2023-03-17|0.11538461538461539|
|2022-07-30| 0.2222222222222222|
|2022-09-03|0.13793103448275862|
|2023-05-14|0.18181818181818182|
+----------+-------------------+
only showing top 20 rows



In [37]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession
spark = SparkSession.builder.getOrCreate()

# Count rows per Date.
# Kept under its own name instead of `result`: `result` holds the per-Date averages that
# are later exported via `df = result.toPandas()`, and rebinding it here would make a
# top-to-bottom run export these counts instead.
date_counts = spark_df.groupBy("Date").count()

# Show the counts
date_counts.show()

+----------+-----+
|      Date|count|
+----------+-----+
|2023-05-18|   40|
|2023-05-01|   17|
|2023-01-21|   22|
|2022-10-05|   23|
|2023-04-28|   30|
|2023-04-17|   29|
|2023-04-21|   26|
|2022-10-07|   30|
|2023-02-10|   31|
|2023-03-11|   29|
|2023-03-10|   31|
|2023-05-04|   28|
|2023-04-26|   42|
|2023-02-23|   32|
|2022-07-04|   24|
|2022-07-08|   28|
|2023-03-17|   26|
|2022-07-30|   27|
|2022-09-03|   29|
|2023-05-14|   33|
+----------+-----+
only showing top 20 rows



In [36]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession
spark = SparkSession.builder.getOrCreate()

# Count rows per Sentiment class.
# Kept under its own name instead of `result`: `result` holds the per-Date averages that
# are later exported via `df = result.toPandas()`, and rebinding it here would make a
# top-to-bottom run export these counts instead.
sentiment_counts = spark_df.groupBy("Sentiment").count()

# Show the counts
sentiment_counts.show()

+----------------+-----+
|       Sentiment|count|
+----------------+-----+
|neutral_positive| 8229|
|        Negative| 1771|
+----------------+-----+



In [47]:
# Display `result` again; the output below is the per-Date Average_Sentiment table,
# i.e. this cell expects `result` to still be bound to that aggregation
result.show()

+----------+-------------------+
|      Date|  Average_Sentiment|
+----------+-------------------+
|2023-05-18|               0.25|
|2023-05-01|0.17647058823529413|
|2023-01-21|0.18181818181818182|
|2022-10-05|0.30434782608695654|
|2023-04-28|0.13333333333333333|
|2023-04-17| 0.1724137931034483|
|2023-04-21|0.15384615384615385|
|2022-10-07|0.13333333333333333|
|2023-02-10|0.12903225806451613|
|2023-03-11|0.27586206896551724|
|2023-03-10| 0.1935483870967742|
|2023-05-04|0.14285714285714285|
|2023-04-26|0.19047619047619047|
|2023-02-23|            0.28125|
|2022-07-04|              0.125|
|2022-07-08| 0.2857142857142857|
|2023-03-17|0.11538461538461539|
|2022-07-30| 0.2222222222222222|
|2022-09-03|0.13793103448275862|
|2023-05-14|0.18181818181818182|
+----------+-------------------+
only showing top 20 rows



In [44]:
# Convert the aggregated Spark DataFrame in `result` into a pandas DataFrame
df = result.toPandas()


In [48]:
# Write the pandas frame to CSV; index=False leaves the row index out of the file
df.to_csv('preprocessedSD.csv', index=False)

In [45]:
# result.write.format("csv").option("header", "true").save("processedSocialDilemma.csv")


# Save to Cassandra

In [49]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *


In [65]:
from cassandra.cluster import Cluster
# Connect to a Cassandra node on localhost.
# NOTE(review): 9046 is not Cassandra's usual native-transport port (9042); confirm it
# matches the local node's configuration -- a refused connection on this port shows up
# in the traceback at the end of the notebook.
cluster = Cluster(["127.0.0.1"], port = 9046)
session = cluster.connect()


In [70]:
# Create the keyspace if it is missing (SimpleStrategy, replication factor 1).
# Note: this rebinds `result`, which earlier cells use for a Spark DataFrame.
result  = session.execute("CREATE KEYSPACE IF NOT EXISTS SD WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")

In [71]:
# Make "sd" the default keyspace for this session (unquoted CQL identifiers such as
# the SD in CREATE KEYSPACE are case-insensitive and stored in lower case)
session.set_keyspace("sd")

In [72]:

# Create the target table if it is missing: one row per Date (the primary key).
# Average_Sentiment is declared TEXT, so the number is stored and read back as a string.
result = session.execute("CREATE TABLE IF NOT EXISTS SocialDilemma ( Date TEXT PRIMARY KEY, Average_Sentiment TEXT);")

In [76]:

def Insert(date, avg_sent):
    """Write one (Date, Average_Sentiment) row to sd.SocialDilemma.

    Date is the table's primary key, so inserting an existing date replaces
    that row. Both columns are TEXT, so callers pass strings. The values are
    bound as query parameters by the driver, not formatted into the CQL text.
    Relies on the module-level Cassandra ``session``.

    Args:
        date: Date string used as the primary key.
        avg_sent: Average sentiment, already converted to a string.
    """
    session.execute("INSERT INTO sd.SocialDilemma (Date, Average_Sentiment) VALUES (%s, %s)", (date, avg_sent))

# Walk the frame by position. .iloc is positional, while df.index yields labels; the two
# only coincide for a default 0..n-1 index, so iterating over positions stays correct
# even if the frame was filtered, sorted or re-indexed beforehand.
for pos in range(len(df)):
    Insert(str(df.iloc[pos, 0]), str(df.iloc[pos, 1]))

In [None]:
# Read everything back from the table and print each stored row
rows = session.execute('SELECT * from sd.SocialDilemma')
for row in rows:
    print(row)

Row(date=u'2023-02-13', average_sentiment=u'0.20689655172413793')
Row(date=u'2022-12-18', average_sentiment=u'0.19230769230769232')
Row(date=u'2022-05-28', average_sentiment=u'0.047619047619047616')
Row(date=u'2022-08-08', average_sentiment=u'0.21739130434782608')
Row(date=u'2022-12-31', average_sentiment=u'0.0')
Row(date=u'2022-09-19', average_sentiment=u'0.23809523809523808')
Row(date=u'2022-08-15', average_sentiment=u'0.1724137931034483')
Row(date=u'2022-11-29', average_sentiment=u'0.25')
Row(date=u'2022-10-18', average_sentiment=u'0.2222222222222222')
Row(date=u'2023-01-18', average_sentiment=u'0.16666666666666666')
Row(date=u'2022-09-29', average_sentiment=u'0.08571428571428572')
Row(date=u'2023-02-22', average_sentiment=u'0.18518518518518517')
Row(date=u'2022-06-19', average_sentiment=u'0.2')
Row(date=u'2022-07-16', average_sentiment=u'0.08695652173913043')
Row(date=u'2023-03-14', average_sentiment=u'0.11538461538461539')
Row(date=u'2023-02-11', average_sentiment=u'0.133333333333

Traceback (most recent call last):
  File "c:\Python27\lib\site-packages\cassandra\cluster.py", line 3522, in _reconnect_internal
    return self._try_connect(host)
  File "c:\Python27\lib\site-packages\cassandra\cluster.py", line 3544, in _try_connect
    connection = self._cluster.connection_factory(host.endpoint, is_control_connection=True)
  File "c:\Python27\lib\site-packages\cassandra\cluster.py", line 1620, in connection_factory
    return self.connection_class.factory(endpoint, self.connect_timeout, *args, **kwargs)
  File "c:\Python27\lib\site-packages\cassandra\connection.py", line 831, in factory
    conn = cls(endpoint, *args, **kwargs)
  File "c:\Python27\lib\site-packages\cassandra\io\asyncorereactor.py", line 344, in __init__
    self._connect_socket()
  File "c:\Python27\lib\site-packages\cassandra\connection.py", line 899, in _connect_socket
    ([a[4] for a in addresses], sockerr.strerror or sockerr))
error: [Errno 10061] Tried connecting to [('127.0.0.1', 9046)]. Las