In [21]:
# General Imports
import glob
import shutil

# Setup Spark Session
from pyspark.sql import SparkSession

# Get Spark Functions Needed
from pyspark.sql.functions import col, udf, split, explode, lit

# Get Datatypes needed for DataFrame manipulation
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, StringType

# Setup Spark Session
sc = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("data_clean_up") \
        .getOrCreate()

# Print Spark Version being run
print("Spark V: ", sc.version)

Spark V:  3.3.2


In [22]:
# *********************************
# *** English Data Preparations ***
# *********************************

# Load English CSV File into a Dataframe
df_english = sc.read.csv("data/english.csv", header=True, inferSchema=True)
print(f'Count (raw): {df_english.count()}')

# Print Columns
print(f'English Source Columns: {df_english.columns}\n')

# Filter only lavbels that we mathc with the German counter parts
filter_values = ["love", "hate", "neutral", "anger", "happiness", "surprise", "sadness", "worry", "enthusiasm"]
df_english_filtered = df_english.filter(col("sentiment").isin(filter_values))

# Remove unnecessary column
df_english_filtered = df_english_filtered.drop("tweet_id")

# Rename Columns
df_english_filtered = df_english_filtered.withColumnRenamed("sentiment", "emotion_en")
df_english_filtered = df_english_filtered.withColumnRenamed("content", "sentence_en")

# Make sure the column order to the same for both german and english csv files
df_english_filtered = df_english_filtered.select('sentence_en', 'emotion_en')

print(f'Count (filtered): {df_english_filtered.count()}')

# Group By for Details & Count
df_english_grouped = df_english_filtered.groupBy('emotion_en').count()

# Show Groupings and Respetive Counts
print('\nGrouped & Count by "emotion"')
df_english_grouped.show(truncate=0)


# Save Dataframe to CSV
directory_path = 'data/spark_data_parts'
df_english_filtered.coalesce(1).write.csv(directory_path, header=True, mode="overwrite")

file_pattern = 'part-00000*.csv'
file_path = glob.glob(directory_path + '/' + file_pattern)[0]

shutil.move(file_path, './data/data_en.csv')

Count (raw): 40000
English Source Columns: ['tweet_id', 'sentiment', 'content']

Count (filtered): 35692

Grouped & Count by "emotion"
+----------+-----+
|emotion_en|count|
+----------+-----+
|love      |3842 |
|hate      |1323 |
|neutral   |8638 |
|anger     |110  |
|happiness |5209 |
|surprise  |2187 |
|sadness   |5165 |
|worry     |8459 |
|enthusiasm|759  |
+----------+-----+



'./data/data_en.csv'

In [23]:
# ********************************
# *** German Data Preparations ***
# ********************************

# Load German JSON File into a Dataframe
df_german = sc.read.json("data/german.json")
print(f'Count (raw): {df_german.count()}')

# Print Columns
print(f'German Source Columns: {df_german.columns}\n')


# We Need to "split" the "artice_emotion" column since the ETH team listed
# multiple emotions in one column
# In order to "explode" the column to it's distinct rows
df_german_exploded = df_german.select('*', explode('article_emotion').alias('emotion_de'))

# Rename Column
df_german_exploded = df_german_exploded.withColumnRenamed("title", "sentence_de")

# Remove unnecessary column
df_german_exploded = df_german_exploded.drop("article_id")
df_german_exploded = df_german_exploded.drop("article_stance")
df_german_exploded = df_german_exploded.drop("paragraphs")
df_german_exploded = df_german_exploded.drop("source")
df_german_exploded = df_german_exploded.drop("article_emotion")
df_german_exploded = df_german_exploded.drop("snippet")

# Make sure the column order to the same for both german and english csv files
df_german_exploded = df_german_exploded.select('sentence_de', 'emotion_de')

print(f'Count (filtered): {df_german_exploded.count()}')

# Group By for Details & Count
df_german_grouped = df_german_exploded.groupBy('emotion_de').count()

# Show Groupings and Respetive Counts
print('\nGrouped & Count by "emotion"')
df_german_grouped.show(truncate=0)


# Save Dataframe to CSV
directory_path = 'data/spark_data_parts'
df_german_exploded.coalesce(1).write.csv(directory_path, header=True, mode="overwrite")

file_pattern = 'part-00000*.csv'
file_path = glob.glob(directory_path + '/' + file_pattern)[0]

shutil.move(file_path, './data/data_de.csv')


Count (raw): 1970
German Source Columns: ['article_emotion', 'article_id', 'article_stance', 'paragraphs', 'snippet', 'source', 'title']

Count (filtered): 2568

Grouped & Count by "emotion"
+------------+-----+
|emotion_de  |count|
+------------+-----+
|Vertrauen   |316  |
|Freude      |140  |
|Ärger       |226  |
|Überraschung|369  |
|Traurigkeit |184  |
|Antizipation|774  |
|Unklar      |314  |
|Angst       |154  |
|Ekel        |29   |
|Keine       |62   |
+------------+-----+



'./data/data_de.csv'

In [26]:
# **************************************
# *** English <-> German Emotion Key ***
# **************************************


from pyspark.sql.functions import col, udf, split, explode, lit
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, StringType

# Emotion Dictionary English <-> German
# This key/value setup was previsoulsy created for linking purposes
emotion_key = {
    "boredom"    : "---",
    "love"       : "Vertrauen",
    "relief"     : "---",
    "fun"        : "---",
    "hate"       : "Ekel",
    "neutral"    : "Unklar",
    "anger"      : "Ärger",
    "happiness"  : "Freude",
    "surprise"   : "Überraschung", 
    "sadness"    : "Traurigkeit", 
    "worry"      : "Angst",
    "enthusiasm" : "Antizipation", 
    "empty "     : "---",
    "---"        : "Keine",
}

# Create the schema for the DataFrame
schema = StructType([
    StructField("emotion_en", StringType(), nullable=False),
    StructField("emotion_de", StringType(), nullable=False)
])

# Convert the dictionary to a list of tuples
data = [(key, value) for key, value in emotion_key.items()]

# Create the PySaprk DataFrame
df_emotion_key = sc.createDataFrame(data, schema)

df_emotion_key.show()

# Extend the English and German onto the datasets
df_german_exploded = df_german_exploded.join(df_emotion_key, on="emotion_de", how="left")
df_english_filtered = df_english_filtered.join(df_emotion_key, on="emotion_en", how="left")

# New sentence column
df_german_exploded = df_german_exploded.withColumn("sentence_en", lit(None))
df_english_filtered = df_english_filtered.withColumn("sentence_de", lit(None))

df_german_exploded.show()
df_english_filtered.show()


# Using Deep Translator to proxy to Google Translate
from deep_translator import GoogleTranslator


# Define Tranlation Functions
def translate_en_to_de(row):
    translated = GoogleTranslator(source='en', target='de').translate(row["sentence_en"])
    row["sentence_de"] = translated

def translate_de_to_en(row):
    translated = GoogleTranslator(source='de', target='en').translate(row["sentence_de"])
    row["sentence_en"] = translated
    

df_german_exploded.foreach(translate_de_to_en)  


+----------+------------+
|emotion_en|  emotion_de|
+----------+------------+
|   boredom|         ---|
|      love|   Vertrauen|
|    relief|         ---|
|       fun|         ---|
|      hate|        Ekel|
|   neutral|      Unklar|
|     anger|       Ärger|
| happiness|      Freude|
|  surprise|Überraschung|
|   sadness| Traurigkeit|
|     worry|       Angst|
|enthusiasm|Antizipation|
|    empty |         ---|
|       ---|       Keine|
+----------+------------+

+------------+--------------------+----------+----------+-----------+----------+
|  emotion_de|         sentence_de|emotion_en|emotion_en|sentence_en|emotion_en|
+------------+--------------------+----------+----------+-----------+----------+
|   Vertrauen|Adoptiert zu sein...|      love|      love|       null|      love|
|      Freude|Österreichs Verfa...| happiness| happiness|       null| happiness|
|      Freude|Visana gewinnt er...| happiness| happiness|       null| happiness|
|       Ärger|Masern: Erste Kin...|     anger

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 260.0 failed 1 times, most recent failure: Lost task 0.0 in stage 260.0 (TID 659) (richards-mbp.localdomain executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/pyspark/rdd.py", line 540, in func
    return f(iterator)
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/pyspark/rdd.py", line 1160, in processPartition
    f(x)
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/var/folders/8c/5_2vy2pn72x609w4xvq0yjzc0000gn/T/ipykernel_11323/2316904776.py", line 64, in translate_de_to_en
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/deep_translator/google.py", line 57, in translate
    if is_input_valid(text, max_chars=5000):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/deep_translator/validate.py", line 39, in is_input_valid
    raise NotValidPayload(text)
deep_translator.exceptions.NotValidPayload: None --> text must be a valid text with maximum 5000 character,otherwise it cannot be translated

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:758)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:740)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2278)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1589)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/pyspark/rdd.py", line 540, in func
    return f(iterator)
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/pyspark/rdd.py", line 1160, in processPartition
    f(x)
  File "/opt/homebrew/Cellar/apache-spark/3.3.2/libexec/python/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/var/folders/8c/5_2vy2pn72x609w4xvq0yjzc0000gn/T/ipykernel_11323/2316904776.py", line 64, in translate_de_to_en
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/deep_translator/google.py", line 57, in translate
    if is_input_valid(text, max_chars=5000):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/deep_translator/validate.py", line 39, in is_input_valid
    raise NotValidPayload(text)
deep_translator.exceptions.NotValidPayload: None --> text must be a valid text with maximum 5000 character,otherwise it cannot be translated

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:758)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:740)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2278)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more
