# **Unlock the Potential of PySpark DataFrames: Hands-on Tips and Personalizations**

`University of East London, Docklands Campus, 2023-24`

`module name`: **`Machine Learning on Big Data (CN7030) - MSc AI&DS`**

`Author`: **`Dr Amin Karami (PG Academic Lead in CDT School)`**

`E`: **`a.karami@uel.ac.uk`**

`W`: **`http://www.aminkarami.com/`**

---

**DataFrame (DF)**: a schema (named, typed columns) plus a declarative query language. A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database and is very efficient for structured data.

data: https://drive.google.com/file/d/1HiP_TkWYClAmzhhOFzhXdOXTfZb9DoB_/view?usp=drive_link (641MB)

source: https://spark.apache.org/docs/latest/sql-programming-guide.html

source: https://spark.apache.org/docs/latest/api/python/reference/

# **Section 1: Initialize PySpark**

In [3]:
# !pip3 install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=032d433e72e1091fff73b69241a37d2a37cde8cc8f43794763fbe43766c0ea3e
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


# **System Check**

In [4]:
# !free -h

               total        used        free      shared  buff/cache   available
Mem:            12Gi       613Mi       4.9Gi       1.0Mi       7.1Gi        11Gi
Swap:             0B          0B          0B


# **Linking with Spark**

In [6]:
# Linking with Spark
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .appName("Tutorial2_CN7030") \
                    .master("local[*]") \
                    .config("spark.executor.memory", "3g") \
                    .config("spark.driver.memory", "2g") \
                    .config("spark.executor.cores", "2") \
                    .config("spark.sql.inMemoryColumnarStorage.compressed", "true") \
                    .getOrCreate()

spark

# **Connect to the Google Drive**

In [7]:
# from google.colab import drive
# drive.mount('/content/drive')

Mounted at /content/drive


# **Section 2: Create PySpark DataFrame from CSV**

In [8]:
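# header=True takes column names from the first row; inferSchema=True makes an extra pass over the data to detect column types
df = spark.read.format('csv').load('data.csv', header = True, inferSchema = True)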

df.show(truncate = True)

df.printSchema()

print(df.count())
print(len(df.columns))

+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|                  id|               age|salary|             score|sales|height|weight|gender|category|income|expenses|
+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|  0.8017532427858894| 79.71301351894658|     4| 37.10369234532471|   47|    62|     5|Female|       B|     8|      85|
|  0.6565552949992319| 24.22600602366628|     6| 79.75769828959885|    8|    30|    42|  Male|       A|    70|      85|
|  0.2515595782593636| 70.97364852287149|    17| 71.71375356646551|   82|     1|    72|  Male|       A|    66|      53|
|  0.2073428376111074| 45.09378549789149|    32|              NULL|   59|    12|    21|  Male|       B|    56|       2|
|  0.6392921379278927|30.357970527906065|    13|              NULL|   22|    69|    58|Female|       A|     0|      74|
|  0.8505582285081454|              NULL

# How many partitions?

In [9]:
df.rdd.getNumPartitions()

6

# Let's increase the number of partitions

In [10]:
df2 = df.repartition(8)
df2.rdd.getNumPartitions()

8

In [11]:
df3 = df.repartition('category')
df3.rdd.getNumPartitions()

2

In [12]:
df4 = df.repartition(6,'category')
df4.rdd.getNumPartitions()

6
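`repartition(n, column)` hash-partitions the rows: every row with the same `category` value is sent to the same partition. The data has only two distinct categories (`A` and `B`, as the group-by output in Section 3 shows), so at most two of the six partitions of `df4` can actually hold rows. The plain-Python sketch below illustrates the idea. It is not Spark code: `assign_partition` is a hypothetical helper, and `zlib.crc32` is only a reproducible stand-in for the hash function Spark uses internally.

```python
import zlib
from collections import defaultdict

def assign_partition(key, num_partitions):
    """Map a key to a partition index (stand-in for Spark's internal hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# Toy rows: (category, salary)
rows = [("A", 6), ("B", 4), ("A", 17), ("B", 32), ("A", 13)]

partitions = defaultdict(list)
for category, salary in rows:
    partitions[assign_partition(category, 6)].append((category, salary))

# Only partitions that received at least one row appear here.
non_empty = {idx: part for idx, part in partitions.items() if part}
print(len(non_empty), "of 6 partitions hold data")
```

Because the partition index depends only on the key, adding more partitions does not spread a low-cardinality column any further.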

In [13]:
df5 = df.repartitionByRange(10,'income')
df5.rdd.getNumPartitions()

10
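`repartitionByRange` assigns rows to partitions by value range rather than by hash, so each partition holds a contiguous slice of `income` values. Spark estimates the range boundaries by sampling the column, so the partition sizes are only approximately equal. The sketch below shows the assignment step in plain Python. The boundaries are hypothetical (they assume `income` falls between 0 and 99) and `range_partition` is an illustrative helper, not a Spark API.

```python
from bisect import bisect_left

def range_partition(value, upper_bounds):
    """Return the index of the first range whose upper bound is >= value."""
    return bisect_left(upper_bounds, value)

# Hypothetical boundaries: 9 upper bounds define 10 ranges (0-9, 10-19, ..., 90+).
upper_bounds = [9, 19, 29, 39, 49, 59, 69, 79, 89]

incomes = [8, 70, 66, 56, 0, 91, 39]
assignment = {income: range_partition(income, upper_bounds) for income in incomes}
print(assignment)
```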

# Write the DF to disk in a partitioned manner

In [14]:
df5.write.option('header', True) \
    .partitionBy('gender') \
    .mode('overwrite')\
    .csv('gender_df5')

In [15]:
df4.write.option('header', True) \
    .partitionBy('gender') \
    .mode('overwrite')\
    .csv('gender_df4')

In [16]:
df5.repartition(3).write.option('header', True) \
    .partitionBy('gender') \
    .mode('overwrite')\
    .csv('gender_df5_repartitioned')

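In [None]:
# Optionally cap the number of records per output file.
# Set this before calling write so that it applies to the files written afterwards.
# spark.conf.set("spark.sql.files.maxRecordsPerFile", "500000")

df4.write.option('header', True) \
    .partitionBy('gender') \
    .mode('overwrite')\
    .csv('gender_df4')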

In [17]:
# Reduce the number of in-memory partitions with coalesce (no full shuffle), then write partitioned by two columns

df5.coalesce(6).write.option('header', True) \
    .partitionBy('gender','category') \
    .mode('overwrite')\
    .csv('gender_df5_two_partitions')
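With `partitionBy('gender')`, Spark writes one subdirectory per value (for example `gender=Female/` and `gender=Male/`). Each in-memory partition writes its own part file into every subdirectory for which it holds rows. The number of files therefore depends on how the DataFrame was partitioned before the write, which is what the `repartition` and `coalesce` variants above change. The following plain-Python sketch counts the resulting files for toy data. `count_output_files` is a hypothetical helper, and the sketch ignores limits such as `spark.sql.files.maxRecordsPerFile`, which can split files further.

```python
from collections import defaultdict

def count_output_files(partitions):
    """Count part files per directory: one per (in-memory partition, gender) pair."""
    files_per_dir = defaultdict(int)
    for rows in partitions:
        for gender in sorted({row["gender"] for row in rows}):
            files_per_dir[f"gender={gender}"] += 1
    return dict(files_per_dir)

# Three hypothetical in-memory partitions of toy rows.
partitions = [
    [{"gender": "Female"}, {"gender": "Male"}],
    [{"gender": "Male"}],
    [{"gender": "Female"}, {"gender": "Female"}],
]

files = count_output_files(partitions)
print(files)
```

Fewer in-memory partitions before the write means fewer, larger files per directory.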

# **Section 3: DataFrame Operations and Transformations**

In [20]:
#  SELECT

df4.select("age","salary","gender").show(5)

+------------------+------+------+
|               age|salary|gender|
+------------------+------+------+
| 24.22600602366628|     6|  Male|
| 70.97364852287149|    17|  Male|
|30.357970527906065|    13|Female|
| 57.94047153077633|    37|Female|
|              NULL|     0|Female|
+------------------+------+------+
only showing top 5 rows



In [21]:
#  FILTER

df4.filter((df4.age > 30) & (df4.salary > 50)).show(5)

+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|                  id|               age|salary|             score|sales|height|weight|gender|category|income|expenses|
+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|0.054599328832547256|58.445521939730064|    84|29.619007900937078|   94|    72|    74|  Male|       A|    78|      87|
|  0.6455441593808561| 68.47061987702949|    75|              NULL|   62|    31|    44|Female|       A|     2|      42|
|  0.8316027808351703|   64.751100030111|    96|              NULL|   13|    24|    30|Female|       A|     5|      19|
| 0.28862497348283567| 34.91854369238886|    76|              NULL|   73|    43|    48|  Male|       A|    83|      39|
|  0.7408143015309203| 52.83048441274324|    95|46.769693535768866|   59|     4|    10|Female|       A|    19|      38|
+--------------------+------------------
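Rows whose `age` is NULL never pass a condition such as `df4.age > 30`. In Spark SQL a comparison with NULL evaluates to NULL (unknown), and `filter` keeps only the rows for which the condition is true. The sketch below models this three-valued logic in plain Python, using `None` for NULL. The helper names `greater_than` and `sql_and` are illustrative and not part of PySpark.

```python
def greater_than(value, threshold):
    """SQL-style comparison: returns None (unknown) when the value is missing."""
    return None if value is None else value > threshold

def sql_and(left, right):
    """SQL-style AND over True / False / None (unknown)."""
    if left is False or right is False:
        return False
    if left is None or right is None:
        return None
    return True

rows = [
    {"age": 58.4, "salary": 84},
    {"age": None, "salary": 96},  # unknown age: the condition is unknown, so the row is dropped
    {"age": 24.2, "salary": 75},  # age <= 30: the condition is false
]

kept = [
    r for r in rows
    if sql_and(greater_than(r["age"], 30), greater_than(r["salary"], 50)) is True
]
print(kept)
```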

In [28]:
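# Note: these imports shadow Python's built-in sum and round for the rest of the notebook
from pyspark.sql.functions import mean, sum, round
# GROUP BY with aggregates: mean and total salary per category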
df4.groupBy("category").agg(
    round(mean('salary'),2).alias("mean_salary"),
    sum('salary').alias("sum_salary")
    ).show(10)

+--------+-----------+----------+
|category|mean_salary|sum_salary|
+--------+-----------+----------+
|       A|      49.49| 247412615|
|       B|      49.51| 247582007|
+--------+-----------+----------+



In [29]:
#  SORT
df4.sort("height", "score", ascending=False).show(10)

+--------------------+------------------+------+-----------------+-----+------+------+------+--------+------+--------+
|                  id|               age|salary|            score|sales|height|weight|gender|category|income|expenses|
+--------------------+------------------+------+-----------------+-----+------+------+------+--------+------+--------+
| 0.16933902279650814|              NULL|    37|99.99842061516966|   63|    99|    78|Female|       B|    60|      36|
|  0.4414179206080756| 47.31491599580799|    17|99.99629934376922|   96|    99|    53|Female|       A|    13|      63|
|  0.6365465779061935| 75.97020871674081|    59|99.99605629873773|   56|    99|    82|  Male|       A|    36|      64|
|   0.137572774067943| 46.72427635576165|    16|99.99478454051689|   14|    99|    68|Female|       B|    20|      47|
|  0.7188334449042442|              NULL|    24|99.99452559017527|    5|    99|    66|Female|       B|    73|      92|
|  0.6186372186898206| 68.40377455302632|     9|

In [31]:
# RENAME COLUMNS

renamed_df = df4.withColumnRenamed("income", "annual_income")\
                .withColumnRenamed("score","annual_score")

renamed_df.show(10)

+-------------------+------------------+------+------------------+-----+------+------+------+--------+-------------+--------+
|                 id|               age|salary|      annual_score|sales|height|weight|gender|category|annual_income|expenses|
+-------------------+------------------+------+------------------+-----+------+------+------+--------+-------------+--------+
| 0.6565552949992319| 24.22600602366628|     6| 79.75769828959885|    8|    30|    42|  Male|       A|           70|      85|
| 0.2515595782593636| 70.97364852287149|    17| 71.71375356646551|   82|     1|    72|  Male|       A|           66|      53|
| 0.6392921379278927|30.357970527906065|    13|              NULL|   22|    69|    58|Female|       A|            0|      74|
| 0.7555506990689408| 57.94047153077633|    37| 99.05156945373632|   18|    67|    94|Female|       A|           91|      89|
|0.34380469538701885|              NULL|     0| 68.12994102604317|   50|    66|    29|Female|       A|           39|  

In [33]:
# DROP COLUMN
reduce_df = df4.drop("id","height")
reduce_df.show(10)

+------------------+------+------------------+-----+------+------+--------+------+--------+
|               age|salary|             score|sales|weight|gender|category|income|expenses|
+------------------+------+------------------+-----+------+------+--------+------+--------+
| 24.22600602366628|     6| 79.75769828959885|    8|    42|  Male|       A|    70|      85|
| 70.97364852287149|    17| 71.71375356646551|   82|    72|  Male|       A|    66|      53|
|30.357970527906065|    13|              NULL|   22|    58|Female|       A|     0|      74|
| 57.94047153077633|    37| 99.05156945373632|   18|    94|Female|       A|    91|      89|
|              NULL|     0| 68.12994102604317|   50|    29|Female|       A|    39|      10|
| 78.00738662272417|    44|57.377461450339794|   32|    68|  Male|       A|     3|      35|
| 5.739474360847097|    60|              NULL|   69|    59|Female|       A|    89|      94|
|29.232534249025754|    33| 72.17303870051973|   19|     5|Female|       A|     

# **Section 4: Working with Missing Data**

In [34]:
from pyspark.sql.functions import col, sum  # sum is Spark's aggregate here, not the Python built-in

# Count the null values in each column (one aggregate expression per column, built with a list comprehension)
missing_values = df4.select([sum(col(c).isNull().cast("int")).alias(c) for c in df4.columns])

missing_values.show()

+---+-------+------+-------+-----+------+------+------+--------+------+--------+
| id|    age|salary|  score|sales|height|weight|gender|category|income|expenses|
+---+-------+------+-------+-----+------+------+------+--------+------+--------+
|  0|4000499|     0|4001316|    0|     0|     0|     0|       0|     0|       0|
+---+-------+------+-------+-----+------+------+------+--------+------+--------+
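The expression above builds one aggregate per column: `isNull()` yields a boolean, `cast("int")` turns it into 0 or 1, and `sum` adds the flags up. The same counting logic in plain Python, on toy rows with `None` standing in for NULL, looks like this (a sketch for illustration only; here `sum` is the Python built-in):

```python
rows = [
    {"age": 79.7, "score": 37.1, "salary": 4},
    {"age": 45.1, "score": None, "salary": 32},
    {"age": None, "score": 68.1, "salary": 0},
    {"age": None, "score": None, "salary": 13},
]
columns = ["age", "score", "salary"]

# For each column: map "is missing" to 0/1 and add the flags up.
missing_counts = {c: sum(int(row[c] is None) for row in rows) for c in columns}
print(missing_counts)
```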



In [35]:
# DataFrames are immutable, so select("*") simply returns a new DataFrame; df4 is never modified
df_copy = df4.select("*")

# Drop every row that has a null in any column
df_cleaned = df_copy.dropna()
df_cleaned.show(30)
df_cleaned.count()

+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|                  id|               age|salary|             score|sales|height|weight|gender|category|income|expenses|
+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|  0.6565552949992319| 24.22600602366628|     6| 79.75769828959885|    8|    30|    42|  Male|       A|    70|      85|
|  0.2515595782593636| 70.97364852287149|    17| 71.71375356646551|   82|     1|    72|  Male|       A|    66|      53|
|  0.7555506990689408| 57.94047153077633|    37| 99.05156945373632|   18|    67|    94|Female|       A|    91|      89|
| 0.07531261247891552| 78.00738662272417|    44|57.377461450339794|   32|    83|    68|  Male|       A|     3|      35|
|   0.635481528615935|29.232534249025754|    33| 72.17303870051973|   19|    42|     5|Female|       A|     1|      11|
|  0.6088087252200041| 46.04202629270567

In [39]:
# Check again for null values, this time in df_cleaned
from pyspark.sql.functions import col, sum  # Spark's aggregate sum, not the Python built-in

# Count the null values in each column of df_cleaned
missing_values = df_cleaned.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_cleaned.columns])

missing_values.show()

+---+---+------+-----+-----+------+------+------+--------+------+--------+
| id|age|salary|score|sales|height|weight|gender|category|income|expenses|
+---+---+------+-----+-----+------+------+------+--------+------+--------+
|  0|  0|     0|    0|    0|     0|     0|     0|       0|     0|       0|
+---+---+------+-----+-----+------+------+------+--------+------+--------+



In [40]:
# Drop rows that have null values in selected columns

columns_to_check = ["age","salary"]
# DataFrames are immutable, so this new DataFrame leaves df4 unchanged
df_copy = df4.select("*")

# Drop rows where any of columns_to_check is null
df_cleaned = df_copy.dropna(subset=columns_to_check)
df_cleaned.show(30)
df_cleaned.count()

+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|                  id|               age|salary|             score|sales|height|weight|gender|category|income|expenses|
+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|  0.6565552949992319| 24.22600602366628|     6| 79.75769828959885|    8|    30|    42|  Male|       A|    70|      85|
|  0.2515595782593636| 70.97364852287149|    17| 71.71375356646551|   82|     1|    72|  Male|       A|    66|      53|
|  0.6392921379278927|30.357970527906065|    13|              NULL|   22|    69|    58|Female|       A|     0|      74|
|  0.7555506990689408| 57.94047153077633|    37| 99.05156945373632|   18|    67|    94|Female|       A|    91|      89|
| 0.07531261247891552| 78.00738662272417|    44|57.377461450339794|   32|    83|    68|  Male|       A|     3|      35|
|  0.6520025939987977| 5.739474360847097

5999501
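By default `dropna()` removes a row if any column is null, and `subset` restricts that check to the listed columns. The reported count is also consistent with the earlier null counts: `salary` has no nulls, so only the 4,000,499 rows with a null `age` are removed, and 5,999,501 + 4,000,499 = 10,000,000. This suggests the file holds ten million rows, although the total count is not visible in the output above. The plain-Python sketch below mimics both behaviours on toy rows; the `dropna` function here is a hypothetical stand-in, not the PySpark method.

```python
def dropna(rows, subset=None):
    """Keep rows with no missing value in the checked columns (how='any' behaviour)."""
    kept = []
    for row in rows:
        columns = subset if subset is not None else row.keys()
        if all(row[c] is not None for c in columns):
            kept.append(row)
    return kept

rows = [
    {"age": 24.2, "salary": 6, "score": 79.8},
    {"age": 30.4, "salary": 13, "score": None},
    {"age": None, "salary": 0, "score": 68.1},
]

all_columns = dropna(rows)                    # drops both rows that contain a None
age_salary = dropna(rows, ["age", "salary"])  # drops only the row with a missing age

# Consistency check on the counts reported in the notebook output.
rows_with_null_age = 4_000_499
rows_after_subset_drop = 5_999_501
implied_total = rows_with_null_age + rows_after_subset_drop
print(len(all_columns), len(age_salary), implied_total)
```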

In [41]:
# Fill missing values with a constant value

df_copy = df4.select("*")
df_filled = df_copy.fillna({"age":30,"score":50})
df_filled.show(40)

+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|                  id|               age|salary|             score|sales|height|weight|gender|category|income|expenses|
+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+
|  0.6565552949992319| 24.22600602366628|     6| 79.75769828959885|    8|    30|    42|  Male|       A|    70|      85|
|  0.2515595782593636| 70.97364852287149|    17| 71.71375356646551|   82|     1|    72|  Male|       A|    66|      53|
|  0.6392921379278927|30.357970527906065|    13|              50.0|   22|    69|    58|Female|       A|     0|      74|
|  0.7555506990689408| 57.94047153077633|    37| 99.05156945373632|   18|    67|    94|Female|       A|    91|      89|
| 0.34380469538701885|              30.0|     0| 68.12994102604317|   50|    66|    29|Female|       A|    39|      10|
| 0.07531261247891552| 78.00738662272417

In [43]:
# Fill missing values with a column statistic (Imputer supports the strategies 'mean', 'median', and 'mode')

from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=["age","score"],
                  outputCols=["age_imputed","score_imputed"],
                  strategy='mean')

df_imputed = imputer.fit(df4).transform(df4)
df_imputed.show(40)

+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+------------------+------------------+
|                  id|               age|salary|             score|sales|height|weight|gender|category|income|expenses|       age_imputed|     score_imputed|
+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+------------------+------------------+
|  0.6565552949992319| 24.22600602366628|     6| 79.75769828959885|    8|    30|    42|  Male|       A|    70|      85| 24.22600602366628| 79.75769828959885|
|  0.2515595782593636| 70.97364852287149|    17| 71.71375356646551|   82|     1|    72|  Male|       A|    66|      53| 70.97364852287149| 71.71375356646551|
|  0.6392921379278927|30.357970527906065|    13|              NULL|   22|    69|    58|Female|       A|     0|      74|30.357970527906065| 49.99490561227385|
|  0.7555506990689408| 57.94047153077633|    37| 99.
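With `strategy='mean'`, the `Imputer` computes each input column's mean over the non-missing values and writes a filled copy to the corresponding output column, leaving the original column untouched. In the output above, the missing `score` becomes 49.9949..., the mean of the observed scores. A minimal plain-Python sketch of the same idea, where `impute_mean` is a hypothetical helper and `None` stands in for NULL:

```python
def impute_mean(values):
    """Replace None with the mean of the observed (non-missing) values."""
    observed = [v for v in values if v is not None]
    mean = sum(observed) / len(observed)
    return [mean if v is None else v for v in values]

scores = [80.0, 70.0, None, 99.0, None]
scores_imputed = impute_mean(scores)
print(scores_imputed)
```

Missing entries do not contribute to the mean itself; they only receive it.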

In [44]:
from pyspark.sql.functions import format_number, col

# format_number rounds to d decimal places and returns a string column, so use it for display only
df_imputed = df_imputed.withColumn("age_imputed", format_number(col("age_imputed"),0))
df_imputed = df_imputed.withColumn("score_imputed", format_number(col("score_imputed"),3))

df_imputed.show(30)

+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+-----------+-------------+
|                  id|               age|salary|             score|sales|height|weight|gender|category|income|expenses|age_imputed|score_imputed|
+--------------------+------------------+------+------------------+-----+------+------+------+--------+------+--------+-----------+-------------+
|  0.6565552949992319| 24.22600602366628|     6| 79.75769828959885|    8|    30|    42|  Male|       A|    70|      85|         24|       79.758|
|  0.2515595782593636| 70.97364852287149|    17| 71.71375356646551|   82|     1|    72|  Male|       A|    66|      53|         71|       71.714|
|  0.6392921379278927|30.357970527906065|    13|              NULL|   22|    69|    58|Female|       A|     0|      74|         30|       49.995|
|  0.7555506990689408| 57.94047153077633|    37| 99.05156945373632|   18|    67|    94|Female|       A|    91|      89|     

In [52]:
#  Advanced: replace missing values with custom logic using UDF (User Defined Functions)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def custom_fill(value):
    # Return a string in both branches so the result matches the declared StringType
    if value is None:
        return "unknown"
    else:
        return str(value)

custom_fill_udf = udf(custom_fill, StringType()) # StringType is the UDF's return type

df_filled = df4.withColumn("score_filled", custom_fill_udf(df4.score))

df_filled.show(5)

+-------------------+------------------+------+-----------------+-----+------+------+------+--------+------+--------+-----------------+
|                 id|               age|salary|            score|sales|height|weight|gender|category|income|expenses|     score_filled|
+-------------------+------------------+------+-----------------+-----+------+------+------+--------+------+--------+-----------------+
| 0.6565552949992319| 24.22600602366628|     6|79.75769828959885|    8|    30|    42|  Male|       A|    70|      85|79.75769828959885|
| 0.2515595782593636| 70.97364852287149|    17|71.71375356646551|   82|     1|    72|  Male|       A|    66|      53|71.71375356646551|
| 0.6392921379278927|30.357970527906065|    13|             NULL|   22|    69|    58|Female|       A|     0|      74|          unknown|
| 0.7555506990689408| 57.94047153077633|    37|99.05156945373632|   18|    67|    94|Female|       A|    91|      89|99.05156945373632|
|0.34380469538701885|              NULL|     0|6
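A UDF wraps an ordinary Python function, so its logic can be checked in plain Python before it is registered with `udf`. The sketch below is a variant of `custom_fill` that converts every non-missing value to a string, so that the return value always matches a declared `StringType`. The sample values are made up for illustration.

```python
def custom_fill(value):
    """Return 'unknown' for missing values, otherwise the value as a string."""
    if value is None:
        return "unknown"
    return str(value)

samples = [79.75, None, 99.5]
filled = [custom_fill(v) for v in samples]
print(filled)
```

Python UDFs process rows in the Python worker rather than in the JVM, so they are usually slower than built-in column functions. Prefer a built-in where one exists.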