In [1]:
import pandas as pd
import numpy as np

# Load the data

In [33]:
# Load the parquet dataframe in pandas
pd_df = pd.read_parquet("../data/clean_data/df_trans_sent_comments.parquet")

# Display info of dataframe
pd_df.info()

# Drop Nan val# ues
pd_df.dropna(inplace=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 184673 entries, 0 to 184672
Data columns (total 12 columns):
 #   Column             Non-Null Count   Dtype              
---  ------             --------------   -----              
 0   comment_id         184673 non-null  object             
 1   author             184673 non-null  object             
 2   author_channel_id  184673 non-null  object             
 3   text               184673 non-null  object             
 4   like_count         184673 non-null  float64            
 5   published_at       184673 non-null  datetime64[ns, UTC]
 6   updated_at         184673 non-null  datetime64[ns, UTC]
 7   totalReplyCount    184673 non-null  float64            
 8   video_id           184673 non-null  object             
 9   translation        184670 non-null  object             
 10  clean_text         184670 non-null  object             
 11  sentiment          184670 non-null  float64            
dtypes: datetime64[ns, UTC](2), flo

# Sentiment Analysis using RoBERTa
* NLP Transformer-based Models used for Sentiment Analysis

* RoBERTa: Robustly Optimized BERT Approach
* BERT: Bidirectional Encoder Representations from Transformers

* Optimised using datasets from Huggingface: structure optimised for NLP

In [27]:

from datasets import Dataset
from transformers import pipeline

# Load the sentiment analysis model with GPU support if available
sentiment_model = pipeline(task="text-classification", model="SamLowe/roberta-base-go_emotions", top_k=None, device=0)

# Define a function to process a batch of texts and return the raw output without truncating the tokens
def apply_sentiment(batch):
    # Ensure 'clean_text' is a list of texts in the batch
    texts = batch['clean_text_truncated']
    
    # Process each text without truncating tokens
    sentiments = [sentiment_model(text) for text in texts]  # No token truncation, raw model output
    
    # Add the raw sentiment analysis result back to the batch
    batch['emotions'] = sentiments
    
    return batch

# Safe copy of the dataframe
df_test = pd_df.copy()

# Truncate the text up to 512 tokens
def truncate_text(text):
    return text[:512]
df_test["clean_text_truncated"] = df_test["clean_text"].apply(truncate_text)

# Convert the PySpark DataFrame to Pandas and then to a Hugging Face Dataset
dataset = Dataset.from_pandas(df_test)

# Apply the sentiment analysis model in batches
dataset = dataset.map(apply_sentiment, batched=True)

# Convert the dataset back to a pandas DataFrame if needed
df_result = dataset.to_pandas()



Map:   0%|          | 0/184670 [00:00<?, ? examples/s]

KeyboardInterrupt: 

In [None]:
# Save dataframe locally
df_result.to_parquet("../data/clean_data/df_comments_emotions_nlp.parquet")

# PySpark for Sentiment Analysis

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel

In [3]:
# Create a Spark sesion
spark = SparkSession.builder.getOrCreate()

your 131072x1 screen size is bogus. expect trouble
24/10/23 00:33:17 WARN Utils: Your hostname, DESKTOP-AEUBGUH resolves to a loopback address: 127.0.1.1; using 172.17.252.233 instead (on interface eth0)
24/10/23 00:33:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/23 00:33:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Amply the memory up to 8GB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 8000 * 1024 * 1024)

In [5]:
# Get the threshold value and remove the non-numeric part (the 'b' character for bytes)
threshold_value = spark.conf.get("spark.sql.autoBroadcastJoinThreshold").rstrip('b')

# Convert the cleaned value to an integer and calculate the size in MB
size = int(threshold_value) / (1024 * 1024)

# Print the result
print(f"Default size of broadcast table is {size} MB.")

Default size of broadcast table is 8000.0 MB.


In [None]:
# Verify the dataframe has been stored in cache
print("Memory Used : {0}".format(df.storageLevel.useMemory))
print("Disk Used : {0}".format(df.storageLevel.useDisk))

Memory Used : True
Disk Used : True


# Load PySpark Dataframe

In [None]:
# Load the dataframe in PySpark
df_spark = spark.createDataFrame(df_result)

# Save it in cache for later uses
df_spark.cache()

# Process Emotion Analysis data from RoBERTa

In [12]:
from pyspark.sql.functions import udf, col, slice
from pyspark.sql.types import FloatType

# Diccionario que mapea emociones a un valor entre -1 y 1
emotion_sentiment_map = {
    'admiration': 1,
    'approval': 0.8,
    'gratitude': 1,
    'neutral': 0,
    'optimism': 0.7,
    'disapproval': -0.7,
    'realization': 0.5,
    'curiosity': 0.4,
    'excitement': 0.9,
    'annoyance': -0.6,
    'joy': 1,
    'disappointment': -0.6,
    'confusion': -0.3,
    'surprise': 0.2,
    'love': 1,
    'pride': 0.9,
    'sadness': -0.8,
    'caring': 0.8,
    'amusement': 0.7,
    'anger': -0.9,
    'desire': 0.5,
    'disgust': -1,
    'relief': 0.6,
    'fear': -0.9,
    'remorse': -0.7,
    'grief': -0.9,
    'embarrassment': -0.4,
    'nervousness': -0.5
}

# Check whether the input is empty
def calculate_sentiment(emotions):
    if len(emotions) != 28:
        return 0.0

    # Create an initial list with all the emotion's scores
    initial_score_list = []
    for n in range(len(emotions)):
        score = emotions[n]['score']
        initial_score_list.append(score)

    # Create an initial list with all the emotion's scores
    # Sum the total initial score
    initial_score_total = sum(initial_score_list)

    # Multiply each percentage of the emotion per the value from the emotion_dictionary_values
    emotion_score_list = []
    for n in range(len(emotions)):
        label = emotions[n]['label']
        score = initial_score_list[n] / initial_score_total
        final_score = emotion_sentiment_map[label] * score
        emotion_score_list.append(final_score)

    # Sum the total value pondered
    return sum(emotion_score_list)

sentiment_score_udf = udf(calculate_sentiment, FloatType())
df_final = df_spark.withColumn("sentiment_score", sentiment_score_udf(df_spark["emotions"]))


In [28]:
df_final = df_final.select("video_id", "like_count", "sentiment_score")
# df_final.write.parquet("../data/final_sentiment_score.parquet")

# SQL Query

In [14]:
# Register the DataFrame as a temporary SQL view
df_final.createOrReplaceTempView("sentiment_data")

In [15]:
# Execute SQL query to apply the same logic as the PySpark code
df_weighted_sentiment_sum = spark.sql("""
    SELECT
        video_id,
        (SUM(like_count * sentiment_score) / SUM(like_count)) AS weighted_sentiment_score
    FROM
        sentiment_data
    GROUP BY
        video_id
""")

In [16]:
df_weighted_sentiment_sum.show(5)

ERROR:root:Exception while sending command.                      (0 + 12) / 100]
Traceback (most recent call last):
  File "/home/ivanseldasp/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ivanseldasp/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/ivanseldasp/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/ivanseldasp/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in sen

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 47186)
Traceback (most recent call last):
  File "/usr/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/ivanseldasp/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/home/ivanseldasp/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
  File "/home/ivanseldasp/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 271, in

Py4JError: An error occurred while calling o99.showString

# Plotting test

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Sample data for a bubble heatmap
data = np.random.choice([0, 1, 2], size=(10, 10))  # Random data with values 0, 1, 2
df = pd.DataFrame(data, columns=[f"Column {i+1}" for i in range(10)],
                  index=[f"Row {i+1}" for i in range(10)])

# Create the bubble heatmap
plt.figure(figsize=(6, 5))

for i in range(df.shape[0]):
    for j in range(df.shape[1]):
        value = df.iloc[i, j]
        if value != 0:
            plt.scatter(j, i, s=value * 300, color='blue' if value == 1 else 'purple', alpha=0.6)

# Customizing the heatmap
plt.xticks(np.arange(df.shape[1]), df.columns, rotation=90)
plt.yticks(np.arange(df.shape[0]), df.index)
plt.gca().invert_yaxis()  # To match the orientation

plt.grid(True, which='both', color='gray', linestyle='--', linewidth=0.5, alpha=0.7)

# Creating a custom legend
legend_elements = [plt.Line2D([0], [0], marker='o', color='w', label='Probably', markersize=10,
                               markerfacecolor='blue', alpha=0.6),
                   plt.Line2D([0], [0], marker='o', color='w', label='Could be', markersize=10,
                               markerfacecolor='purple', alpha=0.6)]

plt.legend(handles=legend_elements, title='Key', loc='upper right')

plt.title('Bubble Heatmap Example', fontsize=14)
plt.tight_layout()
plt.show()

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np

# Sample text data
documents = [
    "Data science is an interdisciplinary field.",
    "Machine learning is a subset of artificial intelligence.",
    "Python is widely used in data science and machine learning.",
    "TF-IDF is a statistical measure used in text mining.",
    "Visualization is key in presenting data analysis."
]

# Generate TF-IDF matrix
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform(documents)

# Create a DataFrame from the TF-IDF matrix
tfidf_df = pd.DataFrame(tfidf_matrix.toarray(), columns=vectorizer.get_feature_names_out())

# Define a threshold for low values
low_value_threshold = 0.1

# Creating a bubble map
plt.figure(figsize=(15, 10))

# Iterate over the DataFrame to create the bubble map with crosses for low values
for row_idx in range(tfidf_df.shape[0]):
    for col_idx in range(tfidf_df.shape[1]):
        value = tfidf_df.iloc[row_idx, col_idx]
        if value > 0:
            # Plot bubbles for non-zero values
            plt.scatter(col_idx, row_idx, s=value * 1000, alpha=0.6, color='blue')
            # Plot a cross for low TF-IDF values
            if value < low_value_threshold:
                plt.scatter(col_idx, row_idx, s=100, marker='x', color='red', alpha=0.8, linewidths=2)

# Customizing the bubble map
plt.xticks(np.arange(tfidf_df.shape[1]), tfidf_df.columns, rotation=90)
plt.yticks(np.arange(tfidf_df.shape[0]), [f"Doc {i+1}" for i in range(tfidf_df.shape[0])])
plt.gca().invert_yaxis()  # To match the heatmap orientation
plt.gca().set_aspect('equal', adjustable='box')


plt.grid(True, which='both', color='gray', linestyle='--', linewidth=0.5, alpha=0.7)

# Title and layout adjustments
plt.title('Bubble Map of TF-IDF Matrix with Crosses for Low Values', fontsize=14)
plt.tight_layout()
plt.show();

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sns

# Crear los datos de ejemplo similares al gráfico que compartiste
emotions = [
    "Abysmal", "Awful", "Terrible", "Very bad", "Really bad", "Rubbish", "Bad", "Poor",
    "Pretty bad", "Below average", "Mediocre", "Average", "Not bad", "Above average",
    "Pretty good", "Good", "Great", "Really good", "Very good", "Fantastic", "Superb",
    "Brilliant", "Incredible"
]
average_scores = np.linspace(1.2, 8.8, len(emotions))

# Creamos una lista de distribuciones ficticias para cada emoción
np.random.seed(42)
data = {
    'score': np.concatenate([np.random.normal(loc=avg, scale=0.4, size=500) for avg in average_scores]),
    'emotion': np.concatenate([[emotion] * 500 for emotion in emotions])
}

df = pd.DataFrame(data)
df['score'] = df['score'].clip(0, 10)  # Limitar valores de score entre 0 y 10

# Crear el gráfico estilo joyplot manualmente asegurando el desplazamiento vertical y la alineación con la cuadrícula
plt.figure(figsize=(10, 14))
colors = plt.cm.RdYlGn(np.linspace(0, 1, len(emotions)))

# Configuración del desplazamiento para permitir superposición y alineación
offset = 0
overlap_factor = 0.5  # Ajusta este factor para la superposición

# Generar los datos de densidad para cada emoción
for i, emotion in enumerate(reversed(emotions)):
    subset = df[df['emotion'] == emotion]
    density, bins = np.histogram(subset['score'], bins=150, range=(0, 10), density=True)
    bins_center = 0.5 * (bins[1:] + bins[:-1])

    # Dibujar el área sombreada con color y permitir superposición
    plt.fill_between(bins_center, offset, density + offset, color=colors[i], alpha=0.2, step='mid')

    # Dibujar la línea de la curva de densidad sobre el área coloreada
    plt.plot(bins_center, density + offset, color='black', linewidth=0.8)

    # Incrementar el offset para la siguiente línea permitiendo superposición y alineación con la cuadrícula
    offset += max(density) * overlap_factor

# Configuraciones del gráfico
plt.yticks(np.arange(0, offset, offset / len(emotions)), reversed(emotions))  # Asegurar alineación
plt.xticks(np.arange(0, 11, 1))  # Alinear los ticks del eje x con la cuadrícula
plt.xlabel("Emotional Intensity (0 = Very Negative, 10 = Very Positive)")
plt.ylabel("Emotion")
plt.title("Emotional Analysis of YouTube Video Comments", fontsize=14)
plt.xlim(0, 10)

# Mejorar la visualización de la cuadrícula
plt.grid(axis='x', linestyle='--', alpha=0.5)
plt.grid(axis='y', linestyle=':', alpha=0.2)

# Mostrar el gráfico
plt.tight_layout()
plt.show()