In [3]:
# Display the SparkContext handle as the cell output. `sc` is not defined in the
# visible cells; presumably the kernel or an earlier cell provides it — TODO confirm.
sc

In [4]:
# Obtain (or reuse) the SparkSession for this notebook under the app name below.
session_builder = SparkSession.builder.appName('deep_learning')
spark = session_builder.getOrCreate()

In [5]:
# Load fer2013.csv from the '/user1' folder (on Hadoop, per the original note).
# header=True takes column names from the first line; inferSchema=True lets Spark detect column types.

df = spark.read.csv('/user1/fer2013.csv', header=True, inferSchema=True)

                                                                                

In [6]:
# Print the inferred schema (column names, types, nullability) as a tree
df.printSchema()

root
 |-- emotion: integer (nullable = true)
 |-- pixels: string (nullable = true)
 |-- Usage: string (nullable = true)



In [7]:
# Preview the first five raw rows
df.show(n=5)

+-------+--------------------+--------+
|emotion|              pixels|   Usage|
+-------+--------------------+--------+
|      0|70 80 82 72 58 58...|Training|
|      0|151 150 147 155 1...|Training|
|      2|231 212 156 164 1...|Training|
|      4|24 32 36 30 32 23...|Training|
|      6|4 0 0 0 0 0 0 0 0...|Training|
+-------+--------------------+--------+
only showing top 5 rows



In [8]:
# Report the dataset size as (rows, columns); count() triggers a Spark job
row_count = df.count()
column_count = len(df.columns)
print('Shape of dataset:', (row_count, column_count))

Shape of dataset: (35887, 3)


In [9]:
# Summary statistics (count / mean / stddev / min / max) for every column
summary_stats = df.describe()
summary_stats.show()



+-------+------------------+--------------------+-----------+
|summary|           emotion|              pixels|      Usage|
+-------+------------------+--------------------+-----------+
|  count|             35887|               35887|      35887|
|   mean|3.3232646919497313|                null|       null|
| stddev|1.8738187592999593|                null|       null|
|    min|                 0|0 0 0 0 0 0 0 0 0...|PrivateTest|
|    max|                 6|99 99 99 99 101 1...|   Training|
+-------+------------------+--------------------+-----------+



                                                                                

In [10]:
# List the column names as the cell output
df.columns

['emotion', 'pixels', 'Usage']

In [11]:
# Enumerate the distinct values of the Usage column (the dataset's split markers).
# The variable name keeps its original spelling so any other cell using it still works.
usage_column = df.select(df["Usage"])
uniqe_usages = usage_column.distinct()
uniqe_usages.show()

+-----------+
|      Usage|
+-----------+
|   Training|
| PublicTest|
|PrivateTest|
+-----------+



[Stage 9:>                                                          (0 + 2) / 3]                                                                                

In [12]:
from pyspark.sql import functions as F
from pyspark.sql.functions import udf  # retained: other cells may rely on this name
from pyspark.sql.types import StringType  # retained for the same reason

# Mapping of integer emotion codes to human-readable labels
emotion_mapping = {
    0: "Angry",
    1: "Disgust",
    2: "Fear",
    3: "Happy",
    4: "Sad",
    5: "Surprise",
    6: "Neutral"
}


def emotion_to_label_udf(emotion_col):
    """Translate an emotion-code column into its text label.

    Builds a native ``CASE WHEN ... ELSE 'Unknown'`` column expression from
    ``emotion_mapping`` instead of a Python UDF, so rows are not shipped to a
    Python worker just for a dictionary lookup. Codes missing from the mapping,
    and null codes, yield "Unknown" exactly as ``dict.get(code, "Unknown")`` did.

    The ``_udf`` suffix is kept only so existing call sites keep working; the
    returned object is an ordinary Column expression.

    Args:
        emotion_col: Column holding the integer emotion codes.

    Returns:
        Column of string labels.
    """
    label_expr = None
    for code, name in emotion_mapping.items():
        matches = emotion_col == code
        # The first branch starts the CASE chain; later branches extend it.
        label_expr = F.when(matches, name) if label_expr is None else label_expr.when(matches, name)
    if label_expr is None:
        # Empty mapping: every row falls through to the default label.
        return F.lit("Unknown")
    return label_expr.otherwise("Unknown")


# Create 'label' column based on 'emotion' column
df = df.withColumn("label", emotion_to_label_udf(df["emotion"]))

In [13]:
from pyspark.sql import functions as F

# Turn the space-separated pixel string into an array of integers.
# withColumn already names the result "pixels", so no alias is needed.
pixel_tokens = F.split(F.col("pixels"), " ")
df = df.withColumn("pixels", pixel_tokens.cast("array<int>"))

In [14]:
# Re-print the schema to confirm `pixels` is now an array and `label` was added
df.printSchema()

root
 |-- emotion: integer (nullable = true)
 |-- pixels: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- Usage: string (nullable = true)
 |-- label: string (nullable = true)



In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

@udf(ArrayType(FloatType()))
def normalize_pixels(pixels):
    """Divide every pixel value by 255.0, returning an array of floats.

    The ``pixels`` column is nullable and its elements may be null (see the
    schema printed after the split/cast step), so nulls are passed through
    instead of raising ``TypeError`` inside the Python worker.

    Args:
        pixels: List of integer pixel values for one row, or None.

    Returns:
        List of floats (null elements preserved as None), or None when the
        whole array is null.
    """
    if pixels is None:
        return None
    return [None if pixel is None else float(pixel) / 255.0 for pixel in pixels]

df = df.withColumn("pixels", normalize_pixels(df["pixels"]))

In [16]:
# Preview the normalized rows (show()'s defaults spelled out: 20 rows, truncated cells)
df.show(n=20, truncate=True)

+-------+--------------------+--------+--------+
|emotion|              pixels|   Usage|   label|
+-------+--------------------+--------+--------+
|      0|[0.27450982, 0.31...|Training|   Angry|
|      0|[0.5921569, 0.588...|Training|   Angry|
|      2|[0.90588236, 0.83...|Training|    Fear|
|      4|[0.09411765, 0.12...|Training|     Sad|
|      6|[0.015686275, 0.0...|Training| Neutral|
|      2|[0.21568628, 0.21...|Training|    Fear|
|      4|[0.078431375, 0.0...|Training|     Sad|
|      3|[0.3019608, 0.305...|Training|   Happy|
|      3|[0.33333334, 0.32...|Training|   Happy|
|      2|[1.0, 0.99607843,...|Training|    Fear|
|      0|[0.11764706, 0.09...|Training|   Angry|
|      6|[0.15294118, 0.29...|Training| Neutral|
|      6|[0.85882354, 0.83...|Training| Neutral|
|      6|[0.5803922, 0.564...|Training| Neutral|
|      3|[0.015686275, 0.0...|Training|   Happy|
|      5|[0.41960785, 0.41...|Training|Surprise|
|      3|[0.05490196, 0.05...|Training|   Happy|
|      2|[1.0, 1.0, 

In [17]:
# Fetch one row's pixel array and report how many values it holds
first_row = df.select("pixels").head()
sample_pixels = first_row["pixels"]
pixel_count = len(sample_pixels)
print(pixel_count)

2304


Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


In [18]:
# Preview the first five rows after labelling and normalization
df.show(n=5)

+-------+--------------------+--------+-------+
|emotion|              pixels|   Usage|  label|
+-------+--------------------+--------+-------+
|      0|[0.27450982, 0.31...|Training|  Angry|
|      0|[0.5921569, 0.588...|Training|  Angry|
|      2|[0.90588236, 0.83...|Training|   Fear|
|      4|[0.09411765, 0.12...|Training|    Sad|
|      6|[0.015686275, 0.0...|Training|Neutral|
+-------+--------------------+--------+-------+
only showing top 5 rows



In [19]:
# Partition the data by the Usage column: Training rows for fitting,
# PrivateTest rows for the final test set, PublicTest rows for validation.
usage = df.Usage
train_df = df.where(usage == "Training")
test_df = df.where(usage == "PrivateTest")
val_df = df.where(usage == "PublicTest")

In [20]:
# Report (rows, columns) for each split; every count() runs a Spark job
for split_name, split_df in (("train_df", train_df), ("test_df", test_df), ("val_df", val_df)):
    print(f'Shape of {split_name}:', (split_df.count(), len(split_df.columns)))

Shape of train_df: (28709, 4)
Shape of test_df: (3589, 4)
Shape of val_df: (3589, 4)


In [21]:
import numpy as np

# Collect the training split to the driver ONCE and build features and labels
# from the same row list. Two separate collect() calls would run the Spark job
# twice, and Spark does not guarantee identical row order across them, so
# pixels and labels could end up misaligned.
train_rows = train_df.select("pixels", "emotion").collect()
X_train = np.array([row["pixels"] for row in train_rows])
y_train = np.array([row["emotion"] for row in train_rows])


                                                                                

In [22]:

# Collect the test split once so features and labels come from the same rows
# in the same order (and the Spark job runs a single time).
test_rows = test_df.select("pixels", "emotion").collect()
X_test = np.array([row["pixels"] for row in test_rows])
y_test = np.array([row["emotion"] for row in test_rows])


                                                                                

In [23]:

# Collect the validation split once so features and labels come from the same
# rows in the same order (and the Spark job runs a single time).
val_rows = val_df.select("pixels", "emotion").collect()
X_val = np.array([row["pixels"] for row in val_rows])
y_val = np.array([row["emotion"] for row in val_rows])

                                                                                

In [29]:
# Reshape each flat pixel vector into a 48x48 single-channel image,
# giving arrays of shape (samples, 48, 48, 1) as the model's input expects.
image_shape = (48, 48, 1)
X_train = X_train.reshape((len(X_train), *image_shape))
X_test = X_test.reshape((len(X_test), *image_shape))
X_val = X_val.reshape((len(X_val), *image_shape))

In [31]:
import tensorflow as tf

# Small CNN: two conv + max-pool stages, then a dense classifier head with
# seven softmax outputs (one per emotion code).
layer_stack = [
    tf.keras.layers.Conv2D(32, (3,3), activation='relu', input_shape=(48,48,1)),
    tf.keras.layers.MaxPooling2D((2,2)),
    tf.keras.layers.Conv2D(64, (3,3), activation='relu'),
    tf.keras.layers.MaxPooling2D((2,2)),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(7, activation='softmax'),
]
model = tf.keras.Sequential(layer_stack)

# Labels are integer class ids, hence the sparse categorical loss.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Train for 10 epochs, monitoring the validation split after each epoch.
validation_pair = (X_val, y_val)
model.fit(X_train, y_train, validation_data=validation_pair, epochs=10)

# Final evaluation on the held-out test split.
evaluation = model.evaluate(X_test, y_test)
test_loss, test_accuracy = evaluation
print(f"Test accuracy: {test_accuracy}")

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Test accuracy: 0.5380328893661499
