In [16]:
from pyspark.sql import SparkSession

# Create the SparkSession used by every cell below (getOrCreate() returns the
# already-running session if this kernel has one). The app name is what the
# Spark web UI shows for this application.
spark = (
    SparkSession.builder
    .appName("SparkApp-DF-Transformations")
    .getOrCreate()
)

In [19]:
spark  # display the active SparkSession as this cell's output

In [41]:
# Load the employee CSV: the header row supplies column names, and inferSchema
# lets Spark type ID and Salary as integers (see printSchema below).
df = spark.read.csv(
    "resources/in/employee/employee_data_1.csv",
    header=True,
    inferSchema=True,
)

In [42]:
df.show(n=5)  # preview the first five rows

+---+------------+-----------+------+
| ID|        Name| Department|Salary|
+---+------------+-----------+------+
|  1|    John Doe|Engineering| 50000|
|  2|  Jane Smith|  Marketing| 45000|
|  3|   Jim Brown|      Sales| 40000|
|  4|Jackie White|         HR| 42000|
|  5| Emily Davis|Engineering| 60000|
+---+------------+-----------+------+
only showing top 5 rows



In [43]:
# Column names, inferred types and nullability of the loaded CSV.
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)



### select()

In [49]:
# "*" selects every column, so this is equivalent to df itself.
df.select("*").show(n=3)

+---+----------+-----------+------+
| ID|      Name| Department|Salary|
+---+----------+-----------+------+
|  1|  John Doe|Engineering| 50000|
|  2|Jane Smith|  Marketing| 45000|
|  3| Jim Brown|      Sales| 40000|
+---+----------+-----------+------+
only showing top 3 rows



In [45]:
# Bracket access df["Name"] and attribute access df.Name refer to the same column.
df.select(df["Name"]).show(n=3)

+----------+
|      Name|
+----------+
|  John Doe|
|Jane Smith|
| Jim Brown|
+----------+
only showing top 3 rows



In [50]:
# select() takes columns either as separate arguments, e.g.
# df.select(df.Name, df["Department"]), or as a single list as below.
df.select([df["Name"], df.Department]).show(n=3)

+----------+-----------+
|      Name| Department|
+----------+-----------+
|  John Doe|Engineering|
|Jane Smith|  Marketing|
| Jim Brown|      Sales|
+----------+-----------+
only showing top 3 rows



### withColumn()

Applies a transformation to a single column of a DataFrame and returns a new DataFrame (the original DataFrame is left unchanged). Typical uses:

1. Convert the data type of a column
2. Create a new column or replace an existing one
3. Transform all the values of a column
4. Concatenate columns, etc.

In [60]:
# Replace the integer Salary column with a float version of itself.
df = df.withColumn("Salary", df["Salary"].cast("float"))

In [61]:
df.show(n=5)  # Salary now prints with a decimal point

+---+------------+-----------+-------+
| ID|        Name| Department| Salary|
+---+------------+-----------+-------+
|  1|    John Doe|Engineering|50000.0|
|  2|  Jane Smith|  Marketing|45000.0|
|  3|   Jim Brown|      Sales|40000.0|
|  4|Jackie White|         HR|42000.0|
|  5| Emily Davis|Engineering|60000.0|
+---+------------+-----------+-------+
only showing top 5 rows



In [62]:
# New column: a bonus worth 25% of the salary.
df = df.withColumn("Bonus_Salary", df["Salary"] * 0.25)

In [63]:
df.show(n=5)

+---+------------+-----------+-------+------------+
| ID|        Name| Department| Salary|Bonus_Salary|
+---+------------+-----------+-------+------------+
|  1|    John Doe|Engineering|50000.0|     12500.0|
|  2|  Jane Smith|  Marketing|45000.0|     11250.0|
|  3|   Jim Brown|      Sales|40000.0|     10000.0|
|  4|Jackie White|         HR|42000.0|     10500.0|
|  5| Emily Davis|Engineering|60000.0|     15000.0|
+---+------------+-----------+-------+------------+
only showing top 5 rows



In [64]:
# New column combining two existing columns row by row.
df = df.withColumn("Total_Salary", df["Salary"] + df["Bonus_Salary"])

In [65]:
df.show(n=5)

+---+------------+-----------+-------+------------+------------+
| ID|        Name| Department| Salary|Bonus_Salary|Total_Salary|
+---+------------+-----------+-------+------------+------------+
|  1|    John Doe|Engineering|50000.0|     12500.0|     62500.0|
|  2|  Jane Smith|  Marketing|45000.0|     11250.0|     56250.0|
|  3|   Jim Brown|      Sales|40000.0|     10000.0|     50000.0|
|  4|Jackie White|         HR|42000.0|     10500.0|     52500.0|
|  5| Emily Davis|Engineering|60000.0|     15000.0|     75000.0|
+---+------------+-----------+-------+------------+------------+
only showing top 5 rows



### withColumnRenamed()

Renames an existing column and returns a new DataFrame.

In [66]:
# Arguments are (existing column name, new column name).
df = df.withColumnRenamed("Name", "Emp_Name")

In [67]:
df.show(n=3)

+---+----------+-----------+-------+------------+------------+
| ID|  Emp_Name| Department| Salary|Bonus_Salary|Total_Salary|
+---+----------+-----------+-------+------------+------------+
|  1|  John Doe|Engineering|50000.0|     12500.0|     62500.0|
|  2|Jane Smith|  Marketing|45000.0|     11250.0|     56250.0|
|  3| Jim Brown|      Sales|40000.0|     10000.0|     50000.0|
+---+----------+-----------+-------+------------+------------+
only showing top 3 rows



### drop()
Deletes (removes) a column and returns a new DataFrame.

In [68]:
# drop() accepts a Column (as here) or a column name, e.g. df.drop("Bonus_Salary").
df = df.drop(df["Bonus_Salary"])

In [69]:
df.show(n=3)

+---+----------+-----------+-------+------------+
| ID|  Emp_Name| Department| Salary|Total_Salary|
+---+----------+-----------+-------+------------+
|  1|  John Doe|Engineering|50000.0|     62500.0|
|  2|Jane Smith|  Marketing|45000.0|     56250.0|
|  3| Jim Brown|      Sales|40000.0|     50000.0|
+---+----------+-----------+-------+------------+
only showing top 3 rows



### Array Functions
1. explode()
2. split()
3. array()
4. array_contains()

In [70]:
# (order_id, list of item names) pairs; order 103 has a single item.
data = [
    (100, ["PC", "Monitor", "Keyboard"]),
    (101, ["Laptop", "Speaker"]),
    (102, ["Mouse", "Adapter"]),
    (103, ["Headphone"]),
]

In [71]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

# Build the schema incrementally: a non-null int order_id and a non-null
# array<string> items column (the array elements themselves may be null).
schema = (
    StructType()
    .add("order_id", IntegerType(), False)
    .add("items", ArrayType(StringType()), False)
)

df = spark.createDataFrame(data=data, schema=schema)

df.printSchema()

root
 |-- order_id: integer (nullable = false)
 |-- items: array (nullable = false)
 |    |-- element: string (containsNull = true)



In [74]:
df.show(n=20, truncate=False)  # full array contents, no truncation

+--------+-----------------------+
|order_id|items                  |
+--------+-----------------------+
|100     |[PC, Monitor, Keyboard]|
|101     |[Laptop, Speaker]      |
|102     |[Mouse, Adapter]       |
|103     |[Headphone]            |
+--------+-----------------------+



#### explode

In [75]:
from pyspark.sql.functions import explode

# explode() emits one row per array element (rows with an empty or null array
# are dropped; explode_outer keeps them). The original array column is removed.
df = df.withColumn("item", explode(df["items"])).drop("items")

df.show()

+--------+---------+
|order_id|     item|
+--------+---------+
|     100|       PC|
|     100|  Monitor|
|     100| Keyboard|
|     101|   Laptop|
|     101|  Speaker|
|     102|    Mouse|
|     102|  Adapter|
|     103|Headphone|
+--------+---------+



In [78]:
# Same orders, but items stored as one comma-separated string per row.
data = [
    (100, "PC,Monitor,Keyboard"),
    (101, "Laptop,Speaker"),
    (102, "Mouse,Adapter"),
    (103, "Headphone"),
]
df = spark.createDataFrame(data=data, schema=["order_id", "items"])

df.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- items: string (nullable = true)



In [None]:
df.show(n=20)

+--------+-------------------+
|order_id|              items|
+--------+-------------------+
|     100|PC,Monitor,Keyboard|
|     101|     Laptop,Speaker|
|     102|      Mouse,Adapter|
|     103|          Headphone|
+--------+-------------------+



#### split

In [81]:
from pyspark.sql.functions import split

# Turn "a,b,c" into ["a", "b", "c"]; the second argument is a regex pattern.
df = df.withColumn("items", split(df["items"], ","))

In [83]:
df.show(n=20, truncate=False)

+--------+-----------------------+
|order_id|items                  |
+--------+-----------------------+
|100     |[PC, Monitor, Keyboard]|
|101     |[Laptop, Speaker]      |
|102     |[Mouse, Adapter]       |
|103     |[Headphone]            |
+--------+-----------------------+



In [85]:
data = [
    (100, ["US", "UK", "AUS"]),
    (101, ["US", "UK"]),
    (102, ["IND", "UK"]),
    (103, ["IND"]),
]

# With only column names given, Spark infers id as long and country_codes as array<string>.
df = spark.createDataFrame(data=data, schema=["id", "country_codes"])

df.printSchema()

root
 |-- id: long (nullable = true)
 |-- country_codes: array (nullable = true)
 |    |-- element: string (containsNull = true)



#### array_contains

In [None]:
from pyspark.sql.functions import array_contains, col

# isLocal is true when the array holds "US". Passing df.country_codes instead
# of col("country_codes") is equivalent.
df = df.withColumn("isLocal", array_contains(col("country_codes"), value="US"))

df.show()

+---+-------------+-------+
| id|country_codes|isLocal|
+---+-------------+-------+
|100|[US, UK, AUS]|   true|
|101|     [US, UK]|   true|
|102|    [IND, UK]|  false|
|103|        [IND]|  false|
+---+-------------+-------+



In [87]:
data = [
    (100, "Paul", "Brandon"),
    (101, "John", "Doe"),
    (102, "Tina", "Nailor"),
]

df = spark.createDataFrame(data=data, schema=["id", "first_name", "last_name"])

df.show()

+---+----------+---------+
| id|first_name|last_name|
+---+----------+---------+
|100|      Paul|  Brandon|
|101|      John|      Doe|
|102|      Tina|   Nailor|
+---+----------+---------+



#### array

In [88]:
from pyspark.sql.functions import array

# array() packs several columns (here given by name) into one array column.
df = df.withColumn("full_name", array("first_name", "last_name"))
df.show()

+---+----------+---------+---------------+
| id|first_name|last_name|      full_name|
+---+----------+---------+---------------+
|100|      Paul|  Brandon|[Paul, Brandon]|
|101|      John|      Doe|    [John, Doe]|
|102|      Tina|   Nailor| [Tina, Nailor]|
+---+----------+---------+---------------+



### Map Functions
1. explode()
2. map_keys()
3. map_values()

In [94]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType

data = [
    (1, {"name": "Paul", "gender": "male"}),
    (2, {"name": "Tina", "gender": "female"}),
    (3, {"name": "John", "gender": "male"}),
]

# users_info is a map<string, string>. StructType/StructField are imported in
# this cell as well, so it no longer depends on an import from an earlier cell.
schema = StructType([
    StructField(name="user_id", dataType=IntegerType()),
    StructField(name="users_info", dataType=MapType(keyType=StringType(), valueType=StringType())),
])

df = spark.createDataFrame(data=data, schema=schema)
df.show(truncate=False)

+-------+--------------------------------+
|user_id|users_info                      |
+-------+--------------------------------+
|1      |{gender -> male, name -> Paul}  |
|2      |{gender -> female, name -> Tina}|
|3      |{gender -> male, name -> John}  |
+-------+--------------------------------+



In [95]:
# Look up the "gender" key of each row's map (same as df.users_info.gender).
df = df.withColumn("user_gender", df.users_info.getItem("gender"))
df.show(truncate=False)

+-------+--------------------------------+-----------+
|user_id|users_info                      |user_gender|
+-------+--------------------------------+-----------+
|1      |{gender -> male, name -> Paul}  |male       |
|2      |{gender -> female, name -> Tina}|female     |
|3      |{gender -> male, name -> John}  |male       |
+-------+--------------------------------+-----------+



In [98]:
df.select(df.user_id, df.user_gender).show()

+-------+-----------+
|user_id|user_gender|
+-------+-----------+
|      1|       male|
|      2|     female|
|      3|       male|
+-------+-----------+



In [99]:
from pyspark.sql.functions import explode

# Exploding a map produces one row per entry, split into "key" and "value" columns.
df.select(df.user_id, explode(df["users_info"])).show()

+-------+------+------+
|user_id|   key| value|
+-------+------+------+
|      1|gender|  male|
|      1|  name|  Paul|
|      2|gender|female|
|      2|  name|  Tina|
|      3|gender|  male|
|      3|  name|  John|
+-------+------+------+



In [100]:
df.show(n=20, truncate=True)  # default: cells longer than 20 chars are truncated

+-------+--------------------+-----------+
|user_id|          users_info|user_gender|
+-------+--------------------+-----------+
|      1|{gender -> male, ...|       male|
|      2|{gender -> female...|     female|
|      3|{gender -> male, ...|       male|
+-------+--------------------+-----------+



In [101]:
from pyspark.sql.functions import map_keys

# map_keys() returns each row's map keys as an array.
df.withColumn("info_keys", map_keys("users_info")).show()

+-------+--------------------+-----------+--------------+
|user_id|          users_info|user_gender|     info_keys|
+-------+--------------------+-----------+--------------+
|      1|{gender -> male, ...|       male|[gender, name]|
|      2|{gender -> female...|     female|[gender, name]|
|      3|{gender -> male, ...|       male|[gender, name]|
+-------+--------------------+-----------+--------------+



In [102]:
from pyspark.sql.functions import map_values

# map_values() returns each row's map values as an array.
df.withColumn("info_values", map_values("users_info")).show()

+-------+--------------------+-----------+--------------+
|user_id|          users_info|user_gender|   info_values|
+-------+--------------------+-----------+--------------+
|      1|{gender -> male, ...|       male|  [male, Paul]|
|      2|{gender -> female...|     female|[female, Tina]|
|      3|{gender -> male, ...|       male|  [male, John]|
+-------+--------------------+-----------+--------------+



### when() and otherwise()
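Similar to if / else if / else: `when()` conditions are evaluated in order and the first match wins; `otherwise()` supplies the default value (rows matching no condition get null if `otherwise()` is omitted).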

In [3]:
data = [
    (1, "Paul", 32),
    (2, "Tina", 45),
    (3, "John", 28),
    (4, "Mike", 36),
]

df = spark.createDataFrame(data=data, schema=["id", "name", "age"])
df.show()

+---+----+---+
| id|name|age|
+---+----+---+
|  1|Paul| 32|
|  2|Tina| 45|
|  3|John| 28|
|  4|Mike| 36|
+---+----+---+



In [6]:
# Python ints are inferred as long when no explicit schema is given.
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [11]:
from pyspark.sql.functions import when

# Branches are checked in order, so the first matching condition wins;
# ages below 30 match neither and fall through to otherwise().
df.select(
    df.id,
    df.name,
    when(condition=df.age >= 40, value="Lead")
    .when(condition=(df.age >= 30) & (df.age < 40), value="Senior")
    .otherwise(value="Associate")
    .alias("position"),
).show()

+---+----+---------+
| id|name| position|
+---+----+---------+
|  1|Paul|   Senior|
|  2|Tina|     Lead|
|  3|John|Associate|
|  4|Mike|   Senior|
+---+----+---------+



### alias()

In [15]:
# alias() renames a column only in the result of this select.
df.select(
    df["id"].alias("emp_id"),
    df["name"].alias("emp_name"),
    df["age"].alias("age_in_yrs"),
).show()

+------+--------+----------+
|emp_id|emp_name|age_in_yrs|
+------+--------+----------+
|     1|    Paul|        32|
|     2|    Tina|        45|
|     3|    John|        28|
|     4|    Mike|        36|
+------+--------+----------+



### cast()

In [22]:
# id and age are long before the cast below.
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [24]:
from pyspark.sql.types import IntegerType

# cast() takes either a DataType instance or its string name ("int").
df_new = df.select(df.id.cast(IntegerType()), df.name, df.age.cast("int"))
df_new.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



### filter() or where()
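The two are equivalent: `where()` is an alias for `filter()`. Both keep only the rows that satisfy the given condition.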

In [40]:
data = [
    (1, "Paul", 32, "HR"),
    (2, "Tina", 45, "HR"),
    (3, "John", 28, "IT"),
    (4, "Mike", 36, "IT"),
    (5, "David", 34, "Sales"),
]

df = spark.createDataFrame(data=data, schema=["id", "name", "age", "department"])
df.show()

+---+-----+---+----------+
| id| name|age|department|
+---+-----+---+----------+
|  1| Paul| 32|        HR|
|  2| Tina| 45|        HR|
|  3| John| 28|        IT|
|  4| Mike| 36|        IT|
|  5|David| 34|     Sales|
+---+-----+---+----------+



In [32]:
df.show(n=20)

+---+-----+---+----------+
| id| name|age|department|
+---+-----+---+----------+
|  1| Paul| 32|        HR|
|  2| Tina| 45|        HR|
|  3| John| 28|        IT|
|  4| Mike| 36|        IT|
|  5|David| 34|     Sales|
+---+-----+---+----------+



In [33]:
# Keep rows with age >= 35, then project all four columns.
df.filter(df["age"] >= 35).select("id", "name", "age", "department").show()

+---+----+---+----------+
| id|name|age|department|
+---+----+---+----------+
|  2|Tina| 45|        HR|
|  4|Mike| 36|        IT|
+---+----+---+----------+



In [34]:
# Filter first so the condition's column (department) is still present, then
# project the columns to display.
df.filter(df.department == "IT").select(df.id, df.name, df.age).show()

+---+----+---+
| id|name|age|
+---+----+---+
|  3|John| 28|
|  4|Mike| 36|
+---+----+---+



In [36]:
# Combine conditions with & (and) / | (or); each comparison needs its own parentheses.
senior_hr = (df.age >= 35) & (df.department == "HR")
df.filter(senior_hr).select(df.id, df.name, df.age).show()

+---+----+---+
| id|name|age|
+---+----+---+
|  2|Tina| 45|
+---+----+---+



### like()

In [38]:
# SQL LIKE: "_" matches exactly one character and "%" any sequence,
# so "_i%" means the second letter is "i".
df.filter(df.name.like("_i%")).select(df.id, df.name).show()

+---+----+
| id|name|
+---+----+
|  2|Tina|
|  4|Mike|
+---+----+



### distinct()

Returns only the distinct (unique) rows, comparing the values of all columns; duplicate rows are removed.

In [41]:
# Same employees plus exact duplicates of rows 5 and 2.
data = [
    (1, "Paul", 32, "HR"),
    (2, "Tina", 45, "HR"),
    (3, "John", 28, "IT"),
    (4, "Mike", 36, "IT"),
    (5, "David", 34, "Sales"),
    (5, "David", 34, "Sales"),
    (2, "Tina", 45, "HR"),
]

df = spark.createDataFrame(data=data, schema=["id", "name", "age", "department"])
df.show()

+---+-----+---+----------+
| id| name|age|department|
+---+-----+---+----------+
|  1| Paul| 32|        HR|
|  2| Tina| 45|        HR|
|  3| John| 28|        IT|
|  4| Mike| 36|        IT|
|  5|David| 34|     Sales|
|  5|David| 34|     Sales|
|  2| Tina| 45|        HR|
+---+-----+---+----------+



In [42]:
df.distinct().show(n=20)  # row order of the result is not guaranteed

+---+-----+---+----------+
| id| name|age|department|
+---+-----+---+----------+
|  1| Paul| 32|        HR|
|  2| Tina| 45|        HR|
|  3| John| 28|        IT|
|  4| Mike| 36|        IT|
|  5|David| 34|     Sales|
+---+-----+---+----------+



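### dropDuplicates() / drop_duplicates()

Returns only the distinct rows based on the subset of columns passed, removing duplicates. If no columns are passed, all columns are considered (same as `distinct()`).
When a subset is given, which row is kept from each group of duplicates is not guaranteed.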

In [43]:
# drop_duplicates is an alias of dropDuplicates; with no subset it matches distinct().
df.drop_duplicates().show()

+---+-----+---+----------+
| id| name|age|department|
+---+-----+---+----------+
|  1| Paul| 32|        HR|
|  2| Tina| 45|        HR|
|  3| John| 28|        IT|
|  4| Mike| 36|        IT|
|  5|David| 34|     Sales|
+---+-----+---+----------+



In [47]:
# One row per department. Which row survives for each department is not guaranteed.
df.dropDuplicates(subset=["department"]).show()

+---+-----+---+----------+
| id| name|age|department|
+---+-----+---+----------+
|  1| Paul| 32|        HR|
|  3| John| 28|        IT|
|  5|David| 34|     Sales|
+---+-----+---+----------+



### sort() or orderBy()

In [48]:
data = [
    (1, "Paul", 32, "HR"),
    (2, "Tina", 45, "HR"),
    (3, "John", 28, "IT"),
    (4, "Mike", 36, "IT"),
    (5, "David", 34, "Sales"),
]

df = spark.createDataFrame(data=data, schema=["id", "name", "age", "department"])
df.show()

+---+-----+---+----------+
| id| name|age|department|
+---+-----+---+----------+
|  1| Paul| 32|        HR|
|  2| Tina| 45|        HR|
|  3| John| 28|        IT|
|  4| Mike| 36|        IT|
|  5|David| 34|     Sales|
+---+-----+---+----------+



### asc() and desc()

In [49]:
# Ascending is the default, so df.sort(df.age) returns the same order.
df.sort(df["age"].asc()).show()

+---+-----+---+----------+
| id| name|age|department|
+---+-----+---+----------+
|  3| John| 28|        IT|
|  1| Paul| 32|        HR|
|  5|David| 34|     Sales|
|  4| Mike| 36|        IT|
|  2| Tina| 45|        HR|
+---+-----+---+----------+



In [50]:
df.sort(df["age"].desc()).show()

+---+-----+---+----------+
| id| name|age|department|
+---+-----+---+----------+
|  2| Tina| 45|        HR|
|  4| Mike| 36|        IT|
|  5|David| 34|     Sales|
|  1| Paul| 32|        HR|
|  3| John| 28|        IT|
+---+-----+---+----------+



In [56]:
# Sort by department descending; ties within a department are broken by age descending.
df.sort(df["department"].desc(), df["age"].desc()).show()

+---+-----+---+----------+
| id| name|age|department|
+---+-----+---+----------+
|  5|David| 34|     Sales|
|  4| Mike| 36|        IT|
|  3| John| 28|        IT|
|  2| Tina| 45|        HR|
|  1| Paul| 32|        HR|
+---+-----+---+----------+



### union() and unionAll()
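1. Combine the rows of two DataFrames.
2. Duplicates are not removed (like SQL `UNION ALL`); chain `.distinct()` to remove them.
3. `union()` and `unionAll()` behave identically.
4. Columns are matched by position, not by name, so both DataFrames must have the same number of columns, in the same order, with compatible types.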

In [27]:
# Row (3, "John", 28, "IT") appears in both inputs to show how duplicates are handled.
data1 = [(1, "Paul", 32, "HR"), (2, "Tina", 45, "HR"), (3, "John", 28, "IT")]
data2 = [(3, "John", 28, "IT"), (4, "Mike", 36, "IT"), (5, "David", 34, "Sales")]

schema = ["id", "name", "age", "department"]

df1, df2 = (spark.createDataFrame(data=rows, schema=schema) for rows in (data1, data2))

In [28]:
for label, frame in (("df1:", df1), ("df2:", df2)):
    print(label)
    frame.show()

df1:
+---+----+---+----------+
| id|name|age|department|
+---+----+---+----------+
|  1|Paul| 32|        HR|
|  2|Tina| 45|        HR|
|  3|John| 28|        IT|
+---+----+---+----------+

df2:
+---+-----+---+----------+
| id| name|age|department|
+---+-----+---+----------+
|  3| John| 28|        IT|
|  4| Mike| 36|        IT|
|  5|David| 34|     Sales|
+---+-----+---+----------+



In [29]:
df1.union(other=df2).show()  # the shared John row appears twice

+---+-----+---+----------+
| id| name|age|department|
+---+-----+---+----------+
|  1| Paul| 32|        HR|
|  2| Tina| 45|        HR|
|  3| John| 28|        IT|
|  3| John| 28|        IT|
|  4| Mike| 36|        IT|
|  5|David| 34|     Sales|
+---+-----+---+----------+



In [30]:
df1.unionAll(other=df2).show()  # same result as union()

+---+-----+---+----------+
| id| name|age|department|
+---+-----+---+----------+
|  1| Paul| 32|        HR|
|  2| Tina| 45|        HR|
|  3| John| 28|        IT|
|  3| John| 28|        IT|
|  4| Mike| 36|        IT|
|  5|David| 34|     Sales|
+---+-----+---+----------+



In [31]:
# Follow union() with distinct() to get SQL UNION (deduplicated) semantics.
df1.union(other=df2).distinct().show()

+---+-----+---+----------+
| id| name|age|department|
+---+-----+---+----------+
|  1| Paul| 32|        HR|
|  2| Tina| 45|        HR|
|  3| John| 28|        IT|
|  4| Mike| 36|        IT|
|  5|David| 34|     Sales|
+---+-----+---+----------+



### unionByName()

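1. Combines the rows of two DataFrames by matching columns by name rather than by position.
2. If the DataFrames have different sets of columns, pass `allowMissingColumns=True` (Spark 3.1+); columns missing on one side are filled with null.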

In [34]:
# df1 has age but no department; df2 has department but no age.
data1 = [(1, "Paul", 32), (2, "Tina", 45), (3, "John", 28)]
data2 = [(3, "John", "IT"), (4, "Mike", "HR"), (5, "David", "Sales")]

schema1 = ["id", "name", "age"]
schema2 = ["id", "name", "department"]

df1 = spark.createDataFrame(data=data1, schema=schema1)
df2 = spark.createDataFrame(data=data2, schema=schema2)

for label, frame in (("df1:", df1), ("df2:", df2)):
    print(label)
    frame.show()

df1:
+---+----+---+
| id|name|age|
+---+----+---+
|  1|Paul| 32|
|  2|Tina| 45|
|  3|John| 28|
+---+----+---+

df2:
+---+-----+----------+
| id| name|department|
+---+-----+----------+
|  3| John|        IT|
|  4| Mike|        HR|
|  5|David|     Sales|
+---+-----+----------+



In [None]:
# Deliberately wrong: union() pairs columns by position, so df2's department
# values end up in df1's "age" column, as the output below shows.
df1.union(df2).show()

+---+-----+-----+
| id| name|  age|
+---+-----+-----+
|  1| Paul|   32|
|  2| Tina|   45|
|  3| John|   28|
|  3| John|   IT|
|  4| Mike|   HR|
|  5|David|Sales|
+---+-----+-----+



In [37]:
# Columns are matched by name; columns missing on one side are filled with NULL.
df1.unionByName(other=df2, allowMissingColumns=True).show()

+---+-----+----+----------+
| id| name| age|department|
+---+-----+----+----------+
|  1| Paul|  32|      NULL|
|  2| Tina|  45|      NULL|
|  3| John|  28|      NULL|
|  3| John|NULL|        IT|
|  4| Mike|NULL|        HR|
|  5|David|NULL|     Sales|
+---+-----+----+----------+



### groupBy() and agg()

In [4]:
# multiLine=True lets Spark parse a JSON document spread over several lines
# instead of expecting one JSON object per line.
df = spark.read.json(
    path="resources/in/product_inventory/product_inventory_multiline.json",
    multiLine=True,
)
df.show()

+--------------+-----+---------+--------------------+-----+
|      Category|Price|ProductID|         ProductName|Stock|
+--------------+-----+---------+--------------------+-----+
|    Smartphone|  999|     P001|     Apple iPhone 15|   50|
|    Smartphone|  899|     P002|  Samsung Galaxy S23|   30|
|    Headphones|  299|     P003|     Sony WH-1000XM5|  100|
|        Laptop| 1199|     P004|         Dell XPS 13|   20|
|        Laptop| 1399|     P005|     HP Spectre x360|   15|
|    Headphones|  329|     P006|Bose QuietComfort 45|   80|
|      Wearable|  499|     P007|Apple Watch Series 9|   60|
|      Wearable|  399|     P008|Samsung Galaxy Wa...|   70|
|      E-Reader|  129|     P009|   Kindle Paperwhite|  120|
|   Accessories|   99|     P010|Logitech MX Master 3|   90|
|   Accessories|   69|     P011| Razer DeathAdder V2|  100|
|    Smartphone|  799|     P012|      Google Pixel 8|   40|
|        Laptop| 1499|     P013|      Lenovo Yoga 9i|   25|
|        Camera| 1999|     P014|        

In [11]:
# Number of products in each category.
df.groupBy(df["Category"]).count().show()

+--------------+-----+
|      Category|count|
+--------------+-----+
|      E-Reader|    1|
|      Wearable|    2|
|        Laptop|    3|
|   Accessories|    2|
|        Camera|    2|
|        Tablet|    2|
|         Drone|    1|
|    Smartphone|    3|
|    Headphones|    2|
|Gaming Console|    2|
+--------------+-----+



In [17]:
# Dict form of agg(): {column: aggregate function name}; the result column is named count(Category).
df.groupBy("Category").agg({"Category": "count"}).show()

+--------------+---------------+
|      Category|count(Category)|
+--------------+---------------+
|      E-Reader|              1|
|      Wearable|              2|
|        Laptop|              3|
|   Accessories|              2|
|        Camera|              2|
|        Tablet|              2|
|         Drone|              1|
|    Smartphone|              3|
|    Headphones|              2|
|Gaming Console|              2|
+--------------+---------------+



In [21]:
# Dict form of agg(): {column: aggregate function name}. Use the exact column
# name from the schema ("Price"); lowercase "price" only resolves while
# spark.sql.caseSensitive is false.
df.groupBy("Category").agg({"Price": "sum"}).show()

+--------------+----------+
|      Category|sum(price)|
+--------------+----------+
|      E-Reader|       129|
|      Wearable|       898|
|        Laptop|      4097|
|   Accessories|       168|
|        Camera|      4498|
|        Tablet|      1848|
|         Drone|       799|
|    Smartphone|      2697|
|    Headphones|       628|
|Gaming Console|       998|
+--------------+----------+



In [22]:
import pyspark.sql.functions as F

# Use the module alias instead of `from ... import sum`, which would shadow
# Python's builtin sum(). "Price" matches the schema's exact column name.
df.groupBy("Category").agg(F.sum("Price").alias("total_price")).show()

+--------------+-----------+
|      Category|total_price|
+--------------+-----------+
|      E-Reader|        129|
|      Wearable|        898|
|        Laptop|       4097|
|   Accessories|        168|
|        Camera|       4498|
|        Tablet|       1848|
|         Drone|        799|
|    Smartphone|       2697|
|    Headphones|        628|
|Gaming Console|        998|
+--------------+-----------+



In [23]:
# Highest stock level per category. "Stock" matches the schema's exact column
# name, so this also works with spark.sql.caseSensitive=true.
df.groupBy("Category").agg({"Stock": "max"}).show()

+--------------+----------+
|      Category|max(stock)|
+--------------+----------+
|      E-Reader|       120|
|      Wearable|        70|
|        Laptop|        25|
|   Accessories|       100|
|        Camera|        10|
|        Tablet|        20|
|         Drone|        15|
|    Smartphone|        50|
|    Headphones|       100|
|Gaming Console|        50|
+--------------+----------+



In [25]:
import pyspark.sql.functions as F

# Several aggregates per category in one pass. The F alias avoids shadowing
# Python's builtin min()/max(), and the column names use the schema's exact
# case ("Stock", "Price").
df.groupBy("Category").agg(
    F.count("Category").alias("total_count"),
    F.min("Stock").alias("min_stock"),
    F.max("Stock").alias("max_stock"),
    F.avg("Price").alias("avg_price"),
).show()

+--------------+-----------+---------+---------+------------------+
|      Category|total_count|min_stock|max_stock|         avg_price|
+--------------+-----------+---------+---------+------------------+
|      E-Reader|          1|      120|      120|             129.0|
|      Wearable|          2|       60|       70|             449.0|
|        Laptop|          3|       15|       25|1365.6666666666667|
|   Accessories|          2|       90|      100|              84.0|
|        Camera|          2|        8|       10|            2249.0|
|        Tablet|          2|       18|       20|             924.0|
|         Drone|          1|       15|       15|             799.0|
|    Smartphone|          3|       30|       50|             899.0|
|    Headphones|          2|       80|      100|             314.0|
|Gaming Console|          2|       45|       50|             499.0|
+--------------+-----------+---------+---------+------------------+



### join()
1. inner join
2. left join
3. right join
4. full join
5. self join
6. leftsemi and leftanti joins

In [6]:
emp_data = [
    (101, "Paul", 32, 2),
    (102, "Tina", 45, 3),
    (103, "John", 28, 2),
    (104, "Mike", 36, 1),
    (105, "David", 34, 2),
    (106, "Wright", 25, 5),  # dept 5 does not exist in dept_data
]
dept_data = [
    (1, "HR"),
    (2, "IT"),
    (3, "Sales"),
    (4, "Marketing"),  # no employee has dept_id 4
]

emp_df = spark.createDataFrame(data=emp_data, schema=["id", "emp_name", "age", "dept_id"])
dept_df = spark.createDataFrame(data=dept_data, schema=["id", "dept_name"])

emp_df.show()
dept_df.show()

+---+--------+---+-------+
| id|emp_name|age|dept_id|
+---+--------+---+-------+
|101|    Paul| 32|      2|
|102|    Tina| 45|      3|
|103|    John| 28|      2|
|104|    Mike| 36|      1|
|105|   David| 34|      2|
|106|  Wright| 25|      5|
+---+--------+---+-------+

+---+---------+
| id|dept_name|
+---+---------+
|  1|       HR|
|  2|       IT|
|  3|    Sales|
|  4|Marketing|
+---+---------+



In [7]:
# Only employees whose dept_id exists in dept_df. Both inputs have an "id"
# column, so the result contains two columns named id.
emp_df.join(dept_df, emp_df.dept_id == dept_df.id, "inner").show()

+---+--------+---+-------+---+---------+
| id|emp_name|age|dept_id| id|dept_name|
+---+--------+---+-------+---+---------+
|104|    Mike| 36|      1|  1|       HR|
|101|    Paul| 32|      2|  2|       IT|
|103|    John| 28|      2|  2|       IT|
|105|   David| 34|      2|  2|       IT|
|102|    Tina| 45|      3|  3|    Sales|
+---+--------+---+-------+---+---------+



In [8]:
# Every employee; department columns are NULL where there is no match (Wright).
emp_df.join(dept_df, emp_df.dept_id == dept_df.id, "left").show()

+---+--------+---+-------+----+---------+
| id|emp_name|age|dept_id|  id|dept_name|
+---+--------+---+-------+----+---------+
|101|    Paul| 32|      2|   2|       IT|
|102|    Tina| 45|      3|   3|    Sales|
|103|    John| 28|      2|   2|       IT|
|104|    Mike| 36|      1|   1|       HR|
|105|   David| 34|      2|   2|       IT|
|106|  Wright| 25|      5|NULL|     NULL|
+---+--------+---+-------+----+---------+



In [9]:
# Every department; employee columns are NULL where there is no match (Marketing).
emp_df.join(dept_df, emp_df.dept_id == dept_df.id, "right").show()

+----+--------+----+-------+---+---------+
|  id|emp_name| age|dept_id| id|dept_name|
+----+--------+----+-------+---+---------+
| 104|    Mike|  36|      1|  1|       HR|
| 105|   David|  34|      2|  2|       IT|
| 103|    John|  28|      2|  2|       IT|
| 101|    Paul|  32|      2|  2|       IT|
| 102|    Tina|  45|      3|  3|    Sales|
|NULL|    NULL|NULL|   NULL|  4|Marketing|
+----+--------+----+-------+---+---------+



In [10]:
# Unmatched rows from both sides are kept, padded with NULLs.
emp_df.join(dept_df, emp_df.dept_id == dept_df.id, "full").show()

+----+--------+----+-------+----+---------+
|  id|emp_name| age|dept_id|  id|dept_name|
+----+--------+----+-------+----+---------+
| 104|    Mike|  36|      1|   1|       HR|
| 101|    Paul|  32|      2|   2|       IT|
| 103|    John|  28|      2|   2|       IT|
| 105|   David|  34|      2|   2|       IT|
| 102|    Tina|  45|      3|   3|    Sales|
|NULL|    NULL|NULL|   NULL|   4|Marketing|
| 106|  Wright|  25|      5|NULL|     NULL|
+----+--------+----+-------+----+---------+



#### leftsemi join
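Similar to an inner join, but returns only the rows of the left DataFrame that have a match in the right DataFrame, with only the left DataFrame's columns. Each left row appears at most once, even if it matches several right rows.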

In [11]:
emp_df.join(dept_df, emp_df.dept_id == dept_df.id, "leftsemi").show()

+---+--------+---+-------+
| id|emp_name|age|dept_id|
+---+--------+---+-------+
|104|    Mike| 36|      1|
|101|    Paul| 32|      2|
|103|    John| 28|      2|
|105|   David| 34|      2|
|102|    Tina| 45|      3|
+---+--------+---+-------+



#### leftanti join
Opposite of the leftsemi join. Returns only the rows of the left DataFrame that have no match in the right DataFrame, with only the left DataFrame's columns.

In [12]:
emp_df.join(dept_df, emp_df.dept_id == dept_df.id, "leftanti").show()

+---+--------+---+-------+
| id|emp_name|age|dept_id|
+---+--------+---+-------+
|106|  Wright| 25|      5|
+---+--------+---+-------+



#### self join

In [13]:
# manager_id refers to another row's id; no row has id 100, so Tina and David
# have no manager within this table.
emp_data = [
    (101, "Paul", 32, 102),
    (102, "Tina", 45, 100),
    (103, "John", 28, 105),
    (104, "Mike", 36, 102),
    (105, "David", 34, 100),
    (106, "Wright", 25, 105),
]

emp_df = spark.createDataFrame(data=emp_data, schema=["id", "emp_name", "age", "manager_id"])

emp_df.show()

+---+--------+---+----------+
| id|emp_name|age|manager_id|
+---+--------+---+----------+
|101|    Paul| 32|       102|
|102|    Tina| 45|       100|
|103|    John| 28|       105|
|104|    Mike| 36|       102|
|105|   David| 34|       100|
|106|  Wright| 25|       105|
+---+--------+---+----------+



In [33]:
from pyspark.sql.functions import col

# Aliases disambiguate the two sides of the self-join.
(
    emp_df.alias("emp")
    .join(emp_df.alias("manager"), col("emp.manager_id") == col("manager.id"), "inner")
    .select(
        col("emp.id").alias("emp_id"),
        col("emp.emp_name").alias("emp_name"),
        col("manager.emp_name").alias("manager_name"),
    )
    .show()
)

+------+--------+------------+
|emp_id|emp_name|manager_name|
+------+--------+------------+
|   101|    Paul|        Tina|
|   104|    Mike|        Tina|
|   103|    John|       David|
|   106|  Wright|       David|
+------+--------+------------+



In [34]:
# Left self-join keeps employees whose manager is not in the table (manager_name is NULL).
(
    emp_df.alias("emp")
    .join(emp_df.alias("manager"), col("emp.manager_id") == col("manager.id"), "left")
    .select(
        col("emp.id").alias("emp_id"),
        col("emp.emp_name").alias("emp_name"),
        col("manager.emp_name").alias("manager_name"),
    )
    .show()
)

+------+--------+------------+
|emp_id|emp_name|manager_name|
+------+--------+------------+
|   101|    Paul|        Tina|
|   102|    Tina|        NULL|
|   103|    John|       David|
|   104|    Mike|        Tina|
|   105|   David|        NULL|
|   106|  Wright|       David|
+------+--------+------------+



### pivot()
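Rotate the distinct values of a (discrete/categorical) column into individual columns. Passing the list of values explicitly (second example) saves Spark an extra pass to compute the distinct values.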

In [3]:
data = [
    (1, "Paul", 32000, "HR"),
    (2, "Tina", 45000, "HR"),
    (3, "John", 28000, "IT"),
    (4, "Mike", 36000, "IT"),
    (5, "David", 34000, "Sales"),
]

df = spark.createDataFrame(data, ["id", "name", "salary", "department"])
df.show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  1| Paul| 32000|        HR|
|  2| Tina| 45000|        HR|
|  3| John| 28000|        IT|
|  4| Mike| 36000|        IT|
|  5|David| 34000|     Sales|
+---+-----+------+----------+



In [4]:
df.groupBy("department").pivot("department").count().show()

+----------+----+----+-----+
|department|  HR|  IT|Sales|
+----------+----+----+-----+
|        HR|   2|NULL| NULL|
|        IT|NULL|   2| NULL|
|     Sales|NULL|NULL|    1|
+----------+----+----+-----+



In [6]:
df.groupBy("department").pivot("department", values=["HR", "IT", "Sales"]).max("salary").show()

+----------+-----+-----+-----+
|department|   HR|   IT|Sales|
+----------+-----+-----+-----+
|        HR|45000| NULL| NULL|
|        IT| NULL|36000| NULL|
|     Sales| NULL| NULL|34000|
+----------+-----+-----+-----+



### unpivot()
- Available from PySpark 3.4.0 onwards (also exposed as the alias `melt()`)
- Rotates columns into rows (the inverse of `pivot()`)

In [16]:
data = [(1, 100, 200, 300), (2, 400, 500, 600)]
schema = ["ID", "Month1", "Month2", "Month3"]

df = spark.createDataFrame(data, schema)
df.show()

+---+------+------+------+
| ID|Month1|Month2|Month3|
+---+------+------+------+
|  1|   100|   200|   300|
|  2|   400|   500|   600|
+---+------+------+------+



In [17]:
df.unpivot(
    ids="ID",
    values=["Month1", "Month2", "Month3"],
    variableColumnName="Month",
    valueColumnName="Value",
).show()

+---+------+-----+
| ID| Month|Value|
+---+------+-----+
|  1|Month1|  100|
|  1|Month2|  200|
|  1|Month3|  300|
|  2|Month1|  400|
|  2|Month2|  500|
|  2|Month3|  600|
+---+------+-----+



### na.fill() or fillna()

In [10]:
# Sample data with missing salary / department values.
data = [
    (1, "Paul", 32000, None),
    (2, "Tina", 45000, "HR"),
    (3, "John", 28000, None),
    (4, "Mike", None, "IT"),
    (5, "David", 34000, "Sales"),
]

df = spark.createDataFrame(data, ["id", "name", "salary", "department"])
df.show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  1| Paul| 32000|      NULL|
|  2| Tina| 45000|        HR|
|  3| John| 28000|      NULL|
|  4| Mike|  NULL|        IT|
|  5|David| 34000|     Sales|
+---+-----+------+----------+



In [19]:
df.fillna(value="IT", subset=["department"]).show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  1| Paul| 32000|        IT|
|  2| Tina| 45000|        HR|
|  3| John| 28000|        IT|
|  4| Mike|  NULL|        IT|
|  5|David| 34000|     Sales|
+---+-----+------+----------+



In [20]:
df.fillna({"salary": 0}).show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  1| Paul| 32000|      NULL|
|  2| Tina| 45000|        HR|
|  3| John| 28000|      NULL|
|  4| Mike|     0|        IT|
|  5|David| 34000|     Sales|
+---+-----+------+----------+



In [33]:
df.na.fill(value=0, subset=["salary"]).show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  1| Paul| 32000|      NULL|
|  2| Tina| 45000|        HR|
|  3| John| 28000|      NULL|
|  4| Mike|     0|        IT|
|  5|David| 34000|     Sales|
+---+-----+------+----------+



In [53]:
from pyspark.sql.functions import avg

df.agg(avg("salary").alias("avg_salary")).show()

+----------+
|avg_salary|
+----------+
|   34750.0|
+----------+



In [54]:
from pyspark.sql.functions import avg

# A global aggregate yields exactly one row; pull the scalar out of it.
avg_salary = df.agg(avg("salary").alias("avg_salary")).first()["avg_salary"]
avg_salary

34750.0

In [55]:
df.na.fill(value=avg_salary, subset=["salary"]).show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  1| Paul| 32000|      NULL|
|  2| Tina| 45000|        HR|
|  3| John| 28000|      NULL|
|  4| Mike| 34750|        IT|
|  5|David| 34000|     Sales|
+---+-----+------+----------+



### sample()
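- Returns a random subset of records from the dataframe
- `fraction` is the approximate fraction of records to return: each row is kept independently with that probability, so the number of rows varies between runs (compare the two unseeded runs below)
- `seed` makes the sample reproducible: the same seed on the same data (and partitioning) returns the same subset each time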

In [7]:
spark.range(1, 20, 2).show()

+---+
| id|
+---+
|  1|
|  3|
|  5|
|  7|
|  9|
| 11|
| 13|
| 15|
| 17|
| 19|
+---+



In [11]:
df.sample(withReplacement=False, fraction=0.2).show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  1| Paul| 32000|      NULL|
|  3| John| 28000|      NULL|
|  5|David| 34000|     Sales|
+---+-----+------+----------+



In [12]:
df.sample(withReplacement=False, fraction=0.2).show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  5|David| 34000|     Sales|
+---+-----+------+----------+



In [13]:
df.sample(withReplacement=False, fraction=0.2, seed=123).show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  3| John| 28000|      NULL|
|  5|David| 34000|     Sales|
+---+-----+------+----------+



In [14]:
df.sample(withReplacement=False, fraction=0.2, seed=123).show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  3| John| 28000|      NULL|
|  5|David| 34000|     Sales|
+---+-----+------+----------+



### collect()

- Returns all rows/records of the dataframe.
- The output type is a list of Row() objects.
- collect() gathers the data from all partitions into the driver's memory, so don't use it on large dataframes; prefer show(), take(n) or limit(n) there.

In [15]:
df.show(n=20, truncate=True)

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  1| Paul| 32000|      NULL|
|  2| Tina| 45000|        HR|
|  3| John| 28000|      NULL|
|  4| Mike|  NULL|        IT|
|  5|David| 34000|     Sales|
+---+-----+------+----------+



In [17]:
records = df.select("id", "name", "salary").collect()
records

[Row(id=1, name='Paul', salary=32000),
 Row(id=2, name='Tina', salary=45000),
 Row(id=3, name='John', salary=28000),
 Row(id=4, name='Mike', salary=None),
 Row(id=5, name='David', salary=34000)]

In [18]:
print(records[0]["id"], records[2]["name"], records[4]["salary"])

1 John 34000


In [19]:
# Row fields can also be read by key, like a dict.
for row in records:
    print(f"Id: {row['id']}, Name: {row['name']}, Salary: {row['salary']}")

Id: 1, Name: Paul, Salary: 32000
Id: 2, Name: Tina, Salary: 45000
Id: 3, Name: John, Salary: 28000
Id: 4, Name: Mike, Salary: None
Id: 5, Name: David, Salary: 34000


### transform()

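Apply a custom transformation to the dataframe: `DataFrame.transform(func)` calls `func` with the dataframe and returns the new dataframe that `func` produces, so reusable transformations can be chained.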

In [21]:
data = [
    (1, "Paul", 32000, "HR"),
    (2, "Tina", 45000, "HR"),
    (3, "John", 28000, "IT"),
    (4, "Mike", 36000, "IT"),
    (5, "David", 34000, "Sales"),
]

df = spark.createDataFrame(data, ["id", "name", "salary", "department"])
df.show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  1| Paul| 32000|        HR|
|  2| Tina| 45000|        HR|
|  3| John| 28000|        IT|
|  4| Mike| 36000|        IT|
|  5|David| 34000|     Sales|
+---+-----+------+----------+



In [None]:
# Built-in transformation: apply Spark's upper() function to the name column
from pyspark.sql.functions import upper

df.withColumn("name", upper("name")).show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  1| PAUL| 32000|        HR|
|  2| TINA| 45000|        HR|
|  3| JOHN| 28000|        IT|
|  4| MIKE| 36000|        IT|
|  5|DAVID| 34000|     Sales|
+---+-----+------+----------+



In [None]:
# Custom transformation function: takes a DataFrame and returns a new DataFrame,
# so it can be passed to DataFrame.transform().
def increment_salary(df, pct=0.5):
    """Return ``df`` with its ``salary`` column raised by the fraction ``pct``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input frame; assumed to have a numeric ``salary`` column.
    pct : float, default 0.5
        Fractional increase (0.5 -> +50%, 0.1 -> +10%). The default keeps the
        original +50% behaviour. Extra arguments can be forwarded through
        ``df.transform(increment_salary, 0.1)`` on Spark 3.3+.

    Returns
    -------
    pyspark.sql.DataFrame
        New DataFrame whose ``salary`` is ``salary + salary * pct``; null
        salaries stay null.
    """
    return df.withColumn("salary", df.salary + (df.salary * pct))

df.transform(func=increment_salary).show()

+---+-----+-------+----------+
| id| name| salary|department|
+---+-----+-------+----------+
|  1| Paul|48000.0|        HR|
|  2| Tina|67500.0|        HR|
|  3| John|42000.0|        IT|
|  4| Mike|54000.0|        IT|
|  5|David|51000.0|     Sales|
+---+-----+-------+----------+



### Column transform()
- Import as: from pyspark.sql.functions import transform
- Applies a function to each element of an array-type column and returns the resulting array

In [28]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

In [30]:
data = [
    (100, ["PC", "Monitor", "Keyboard"]),
    (101, ["Laptop", "Speaker"]),
    (102, ["Mouse", "Adapter"]),
    (103, ["Headphone"]),
]

# Explicit schema: non-nullable order id plus a non-nullable array of strings.
schema = StructType(
    [
        StructField("order_id", IntegerType(), False),
        StructField("items", ArrayType(StringType()), False),
    ]
)

df = spark.createDataFrame(data, schema)
df.show(truncate=False)

+--------+-----------------------+
|order_id|items                  |
+--------+-----------------------+
|100     |[PC, Monitor, Keyboard]|
|101     |[Laptop, Speaker]      |
|102     |[Mouse, Adapter]       |
|103     |[Headphone]            |
+--------+-----------------------+



In [33]:
from pyspark.sql.functions import transform

# The lambda is applied to every element of the items array.
df.select(
    "order_id",
    transform("items", lambda item: upper(item)).alias("items"),
).show(truncate=False)

+--------+-----------------------+
|order_id|items                  |
+--------+-----------------------+
|100     |[PC, MONITOR, KEYBOARD]|
|101     |[LAPTOP, SPEAKER]      |
|102     |[MOUSE, ADAPTER]       |
|103     |[HEADPHONE]            |
+--------+-----------------------+



### createOrReplaceTempView()
- Creates or replaces a temporary view from the dataframe
- Can be used to execute raw SQL queries
- Available only within the current Spark session

In [29]:
data = [
    (1, "Paul", 32000, "HR"),
    (2, "Tina", 45000, "HR"),
    (3, "John", 28000, "IT"),
    (4, "Mike", 36000, "IT"),
    (5, "David", 34000, "Sales"),
]

df = spark.createDataFrame(data, ["id", "name", "salary", "department"])
df.show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  1| Paul| 32000|        HR|
|  2| Tina| 45000|        HR|
|  3| John| 28000|        IT|
|  4| Mike| 36000|        IT|
|  5|David| 34000|     Sales|
+---+-----+------+----------+



In [30]:
df.createOrReplaceTempView(name="employee")

In [31]:
spark.sql(sqlQuery="SELECT * FROM employee;").show()

+---+-----+------+----------+
| id| name|salary|department|
+---+-----+------+----------+
|  1| Paul| 32000|        HR|
|  2| Tina| 45000|        HR|
|  3| John| 28000|        IT|
|  4| Mike| 36000|        IT|
|  5|David| 34000|     Sales|
+---+-----+------+----------+



In [32]:
spark.sql(sqlQuery="SELECT id, name, department FROM employee;").show()

+---+-----+----------+
| id| name|department|
+---+-----+----------+
|  1| Paul|        HR|
|  2| Tina|        HR|
|  3| John|        IT|
|  4| Mike|        IT|
|  5|David|     Sales|
+---+-----+----------+



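### createOrReplaceGlobalTempView()
- Available across all Spark sessions of the same Spark application (sessions sharing one SparkContext); the view is dropped when the application stops
- Global temporary views live in the system database `global_temp` and must be qualified with it, e.g. `global_temp.employee_global`.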

In [37]:
df.createOrReplaceGlobalTempView(name="employee_global")

In [38]:
spark.sql(sqlQuery="SELECT id, name, department FROM global_temp.employee_global;").show()

+---+-----+----------+
| id| name|department|
+---+-----+----------+
|  1| Paul|        HR|
|  2| Tina|        HR|
|  3| John|        IT|
|  4| Mike|        IT|
|  5|David|     Sales|
+---+-----+----------+



In [39]:
from pyspark.sql import SparkSession

# SparkSession.builder...getOrCreate() returns the session that is already
# running in this application instead of creating a new one, so it would hand
# back `spark` itself and the cross-session demo would prove nothing.
# newSession() creates a genuinely separate SparkSession (its own temp views,
# SQL conf and UDFs) on the same SparkContext. That is exactly the setting in
# which a global temp view stays visible while a regular temp view does not.
spark_2 = spark.newSession()

In [40]:
spark_2.sql(sqlQuery="SELECT id, name, department FROM global_temp.employee_global;").show()

+---+-----+----------+
| id| name|department|
+---+-----+----------+
|  1| Paul|        HR|
|  2| Tina|        HR|
|  3| John|        IT|
|  4| Mike|        IT|
|  5|David|     Sales|
+---+-----+----------+



In [41]:
# NOTE: SparkSession.stop() stops the underlying SparkContext. spark_2 runs on
# the same SparkContext as `spark`, so this call also shuts down `spark`.
# Run it only once the notebook is finished.
spark_2.stop()

In [None]:
#spark.stop()