# Filtering

In [1]:
from pyspark.sql import SparkSession

# getOrCreate() returns the already-running session when there is one.
spark = SparkSession.builder.appName("Example").getOrCreate()

# Rows are plain tuples; the column names are supplied separately as the schema.
data = [
    (1, "Alice", 29),
    (2, "Bob", 35),
]
df = spark.createDataFrame(data, schema=["id", "name", "age"])
df.show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 29|
|  2|  Bob| 35|
+---+-----+---+



In [53]:
# Rebuild the example DataFrame with two extra fields (department, salary).
# The session is obtained again via getOrCreate(), so this cell can be run
# without the first cell having been executed.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Example").getOrCreate()
# Each row is a dict, so the schema is inferred from the keys. In the output
# recorded below the columns come back in alphabetical order, not in the
# order the keys are written here.
data = [
    {"id": 1, "name": "Alice", "age": 29, "department": "sales", "salary": 50000},
    {"id": 2, "name": "Bob", "age": 35, "department": "engineering", "salary": 60000},
    {"id": 3, "name": "Charlie", "age": 22, "department": "sales", "salary": 45000},
    {"id": 4, "name": "Diana", "age": 40, "department": "engineering", "salary": 70000}
]
df = spark.createDataFrame(data)
df.show()

+---+-----------+---+-------+------+
|age| department| id|   name|salary|
+---+-----------+---+-------+------+
| 29|      sales|  1|  Alice| 50000|
| 35|engineering|  2|    Bob| 60000|
| 22|      sales|  3|Charlie| 45000|
| 40|engineering|  4|  Diana| 70000|
+---+-----------+---+-------+------+



In [8]:
# Basic filtering: keep only the rows where age is greater than 30.
# The two assignments below reference the same column in two ways
# (attribute access vs. bracket access) and build the same filter;
# the second assignment simply overwrites the result of the first.
df_filtered = df.filter(df.age > 30)
df_filtered = df.filter(df['age'] > 30)

df_filtered.show()

+---+-----------+---+-----+
|age| department| id| name|
+---+-----------+---+-----+
| 35|engineering|  2|  Bob|
| 40|engineering|  4|Diana|
+---+-----------+---+-----+



In [5]:
# Same filter written with col(): the column is referred to by name only,
# so the condition is not tied to a particular DataFrame variable.
from pyspark.sql.functions import col

age_over_30 = col("age") > 30
df_filtered = df.filter(age_over_30)
df_filtered.show()

+---+---+----+
|age| id|name|
+---+---+----+
| 35|  2| Bob|
+---+---+----+



In [9]:
# Filter with multiple conditions: AND (&)
# Each comparison is built on its own line. When comparisons are written
# inline they must be parenthesised, because & binds tighter than > and ==.
older_than_25 = df.age > 25
in_engineering = df.department == "engineering"

df_filtered = df.filter(older_than_25 & in_engineering)
df_filtered.show()

+---+-----------+---+-----+
|age| department| id| name|
+---+-----------+---+-----+
| 35|engineering|  2|  Bob|
| 40|engineering|  4|Diana|
+---+-----------+---+-----+



In [11]:
# Filter with multiple conditions: OR (|)
# A row is kept when at least one side is true. No department in the example
# data equals 'Finance', so only the age test contributes rows here.
younger_than_30 = df.age < 30
in_finance = df.department == 'Finance'

df_filtered = df.filter(younger_than_30 | in_finance)
df_filtered.show()

+---+----------+---+-------+
|age|department| id|   name|
+---+----------+---+-------+
| 29|     sales|  1|  Alice|
| 22|     sales|  3|Charlie|
+---+----------+---+-------+



# String Filters

In [14]:
# filter rows where department equals 'sales' (exact string comparison)
df_filtered = df.filter(df.department == 'sales')
df_filtered.show()

+---+----------+---+-------+
|age|department| id|   name|
+---+----------+---+-------+
| 29|     sales|  1|  Alice|
| 22|     sales|  3|Charlie|
+---+----------+---+-------+



In [15]:
# String matching is case sensitive: the stored value is 'sales', so the
# upper-case pattern 'SALES' matches no rows (see the empty result below).
# The pattern contains no % or _ wildcards, so like() has to match the
# whole value.
df_filtered = df.filter(col('department').like('SALES'))
df_filtered.show()

+---+----------+---+----+
|age|department| id|name|
+---+----------+---+----+
+---+----------+---+----+



In [17]:
# Substring match: keep rows whose department contains "engineer" anywhere.
mentions_engineer = col('department').contains("engineer")

df_filtered = df.filter(mentions_engineer)
df_filtered.show()

+---+-----------+---+-----+
|age| department| id| name|
+---+-----------+---+-----+
| 35|engineering|  2|  Bob|
| 40|engineering|  4|Diana|
+---+-----------+---+-----+



In [18]:
# Prefix match: keep rows whose name begins with 'A'.
name_starts_with_a = col("name").startswith("A")
df.filter(name_starts_with_a).show()

+---+----------+---+-----+
|age|department| id| name|
+---+----------+---+-----+
| 29|     sales|  1|Alice|
+---+----------+---+-----+



In [19]:
# Suffix match: keep rows whose name finishes with 'e'.
name_ends_with_e = col("name").endswith("e")
df.filter(name_ends_with_e).show()

+---+----------+---+-------+
|age|department| id|   name|
+---+----------+---+-------+
| 29|     sales|  1|  Alice|
| 22|     sales|  3|Charlie|
+---+----------+---+-------+



In [20]:
# Regular-expression match: "^A.*" selects names that begin with a capital A.
name_matches_pattern = col("name").rlike("^A.*")
df.filter(name_matches_pattern).show()

+---+----------+---+-----+
|age|department| id| name|
+---+----------+---+-----+
| 29|     sales|  1|Alice|
+---+----------+---+-----+



# Null Functions

In [22]:
# Keep only the rows whose department is null.
# The example data has a department on every row, so the result is empty.
department_missing = df.department.isNull()

df_filtered = df.filter(department_missing)
df_filtered.show()

+---+----------+---+----+
|age|department| id|name|
+---+----------+---+----+
+---+----------+---+----+



In [23]:
# Keep only the rows whose department is present (not null).
# Every example row has a department, so all rows pass.
department_present = df.department.isNotNull()

df_filtered = df.filter(department_present)
df_filtered.show()

+---+-----------+---+-------+
|age| department| id|   name|
+---+-----------+---+-------+
| 29|      sales|  1|  Alice|
| 35|engineering|  2|    Bob|
| 22|      sales|  3|Charlie|
| 40|engineering|  4|  Diana|
+---+-----------+---+-------+



# Filter from a list

In [24]:
# Membership test: keep rows whose department is one of the listed values.
# "finance" does not occur in the example data, so only "engineering" matches.
departments = ["engineering", "finance"]
in_listed_departments = col("department").isin(departments)

df_filtered = df.filter(in_listed_departments)
df_filtered.show()

+---+-----------+---+-----+
|age| department| id| name|
+---+-----------+---+-----+
| 35|engineering|  2|  Bob|
| 40|engineering|  4|Diana|
+---+-----------+---+-----+



In [25]:
# Negated membership test: ~ inverts the isin() condition, keeping rows whose
# department is NOT in `departments` (the list built in the preceding cell).
not_in_listed_departments = ~col("department").isin(departments)

df_filtered = df.filter(not_in_listed_departments)
df_filtered.show()

+---+----------+---+-------+
|age|department| id|   name|
+---+----------+---+-------+
| 29|     sales|  1|  Alice|
| 22|     sales|  3|Charlie|
+---+----------+---+-------+



# Data Cleaning

In [27]:
# 1. Drop all fully duplicate rows
# Removes rows where all columns match exactly.
# Note: the result is assigned back to `df`, so every later cell that reads
# `df` sees the de-duplicated frame rather than the one created above.
df = df.dropDuplicates()
# A bare DataFrame as the last expression displays only its schema (see the
# output below); call .show() to print the rows.
df

DataFrame[age: bigint, department: string, id: bigint, name: string]

In [30]:
# 2. Drop duplicates based on specific columns
# Keeps a single row for each unique id.
# NOTE(review): this comment previously said the *first* row is kept; confirm
# that row order is actually guaranteed before relying on which row survives.
df = df.dropDuplicates(["id"])
df

DataFrame[age: bigint, department: string, id: bigint, name: string]

In [32]:
# 3. Get only distinct rows (same as SELECT DISTINCT)
# Removes duplicates across all columns; the result replaces `df`.
df = df.distinct()
df

DataFrame[age: bigint, department: string, id: bigint, name: string]

In [34]:
# 4. Drop rows with any null values
# Removes rows with even a single null field; the result replaces `df`.
df = df.dropna()
df

DataFrame[age: bigint, department: string, id: bigint, name: string]

In [39]:
# 5. Drop rows with nulls in specific columns
# Only keeps rows where both 'department' and 'age' are not null;
# nulls in any other column are ignored by this call.
df = df.dropna(subset=["department", "age"])
df

DataFrame[age: bigint, department: string, id: bigint, name: string]

In [38]:
# 6. Fill missing values with a default string
# Replaces nulls with "N/A". Because the fill value is a string, only string
# columns are affected; nulls in numeric columns (age and id are bigint in
# the schema shown below) are left as they are.
df = df.fillna("N/A")
df

DataFrame[age: bigint, department: string, id: bigint, name: string]

In [41]:
# 7. Fill missing values for specific columns
# The dict maps column name -> replacement value: a null age becomes 0,
# and columns not named in the dict are left untouched.
df = df.fillna({"age": 0})

# Aggregations

In [43]:
# Aggregate functions used by the cells that follow.
# WARNING: importing sum, min and max by name shadows Python's built-ins of
# the same name for the rest of the notebook; after this cell, sum(...),
# min(...) and max(...) refer to the Spark column functions.
from pyspark.sql.functions import count, sum, avg, min, max, countDistinct

In [45]:
# Basic aggregations, starting with a row count.
# count() returns the number of rows as a plain number (4 in the run below),
# not a DataFrame, so there is no .show() here.
df.count()

4

In [46]:
# Number of distinct values in one column.
# The name is spelled "Department" here while the column is "department";
# the lookup still succeeds in the run recorded below, and the output header
# echoes the spelling used in this call.
distinct_departments = countDistinct("Department")
df.select(distinct_departments).show()

+--------------------------+
|count(DISTINCT Department)|
+--------------------------+
|                         2|
+--------------------------+



In [48]:
# Sum of a numeric column.
# `sum` here is the Spark function imported from pyspark.sql.functions.
total_age = sum("age")
df.select(total_age).show()

+--------+
|sum(age)|
+--------+
|     126|
+--------+



In [49]:
# Several aggregations in a single select(): smallest and largest age.
youngest = min("age")
oldest = max("age")
df.select(youngest, oldest).show()

+--------+--------+
|min(age)|max(age)|
+--------+--------+
|      22|      40|
+--------+--------+



# Aggregations with Grouping

In [54]:
# Group by a single column: total salary per department.
by_department = df.groupBy("Department")
by_department.sum("Salary").show()

+-----------+-----------+
| Department|sum(Salary)|
+-----------+-----------+
|      sales|      95000|
|engineering|     130000|
+-----------+-----------+



In [57]:
# Group by two columns: one output row per (department, name) combination.
by_department_and_name = df.groupBy("Department", "name")
by_department_and_name.sum("salary").show()

+-----------+-------+-----------+
| Department|   name|sum(salary)|
+-----------+-------+-----------+
|      sales|  Alice|      50000|
|engineering|    Bob|      60000|
|      sales|Charlie|      45000|
|engineering|  Diana|      70000|
+-----------+-------+-----------+



In [59]:
# Group by one column and compute several aggregates at once.
# alias() gives each aggregate a readable column name.
department_stats = [
    count("name").alias("Employee_Count"),
    avg("salary").alias("Average_Salary"),
    max("salary").alias("Max_Salary"),
]

# No .show() here: the cell's value is the DataFrame itself, so only its
# schema is displayed.
df.groupBy("Department").agg(*department_stats)

DataFrame[Department: string, Employee_Count: bigint, Average_Salary: double, Max_Salary: bigint]

In [61]:
# Filter after aggregation (the DataFrame counterpart of SQL HAVING):
# total the salaries per department, then keep totals above 100000.
salary_by_department = df.groupBy("Department").agg(
    sum("salary").alias("Total_Salary")
)
salary_by_department.filter("Total_Salary > 100000").show()

+-----------+------------+
| Department|Total_Salary|
+-----------+------------+
|engineering|      130000|
+-----------+------------+



# Joins

In [62]:
# Basic Join (syntax only)
# NOTE: df1 and df2 are not defined until the next cell, so running this
# cell first raises the NameError recorded below. The same join is executed
# with real data at the end of the next cell.
df1.join(df2, on="id", how="inner")

NameError: name 'df1' is not defined

In [63]:
from pyspark.sql import SparkSession

# The `spark` session used below is not created in this cell; the line that
# would create one is left commented out.
# spark = SparkSession.builder.appName("JoinExample").getOrCreate()

# Left-hand DataFrame: id, name, department.
data1 = [
    (1, "Alice", "Sales"),
    (2, "Bob", "Engineering"),
    (3, "Charlie", "HR"),
]
columns1 = ["id", "name", "department"]
df1 = spark.createDataFrame(data1, schema=columns1)
df1.show()

# Right-hand DataFrame: id, city.
data2 = [
    (1, "New York"),
    (2, "London"),
    (4, "Paris"),
]
columns2 = ["id", "city"]
df2 = spark.createDataFrame(data2, schema=columns2)
df2.show()

# Inner join on "id": only ids present on both sides survive
# (1 and 2; id 3 has no city row and id 4 has no name row).
inner_joined = df1.join(df2, on="id", how="inner")
inner_joined.show()

+---+-------+-----------+
| id|   name| department|
+---+-------+-----------+
|  1|  Alice|      Sales|
|  2|    Bob|Engineering|
|  3|Charlie|         HR|
+---+-------+-----------+

+---+--------+
| id|    city|
+---+--------+
|  1|New York|
|  2|  London|
|  4|   Paris|
+---+--------+

+---+-----+-----------+--------+
| id| name| department|    city|
+---+-----+-----------+--------+
|  1|Alice|      Sales|New York|
|  2|  Bob|Engineering|  London|
+---+-----+-----------+--------+



In [65]:
# Left join with the join keys given as a list.
# Only one key ("id") is listed here; to join on multiple columns, add more
# column names to the list. Because this is a left join, every df1 row is
# kept and city is NULL where df2 has no matching id (see id 3 below).
df1.join(df2, on=["id"], how="left").show()

+---+-------+-----------+--------+
| id|   name| department|    city|
+---+-------+-----------+--------+
|  1|  Alice|      Sales|New York|
|  3|Charlie|         HR|    NULL|
|  2|    Bob|Engineering|  London|
+---+-------+-----------+--------+



In [67]:
# Conditional join: the join condition is an arbitrary boolean expression.
# Because the match is expressed as df1.id == df2.id rather than on="id",
# the result carries both id columns (see the output below).
ids_match = df1.id == df2.id
city_is_new_york = df2.city == "New York"

df1.join(df2, ids_match & city_is_new_york, how="inner").show()

+---+-----+----------+---+--------+
| id| name|department| id|    city|
+---+-----+----------+---+--------+
|  1|Alice|     Sales|  1|New York|
+---+-----+----------+---+--------+



In [69]:
# Keep every column from df1 but only `city` from df2 (handy after a left join).
# The result is only assigned here; nothing is displayed by this cell.
left_joined = df1.join(df2, on="id", how="left")
result = left_joined.select(df1["*"], df2["city"])

In [70]:
# Broadcast join for small DataFrames: broadcast() marks df2 as small enough
# to be used in a broadcast join.
from pyspark.sql.functions import broadcast

small_side = broadcast(df2)
# No .show(): the cell's value is the joined DataFrame, so only its schema
# is displayed.
df1.join(small_side, on="id", how="inner")

DataFrame[id: bigint, name: string, department: string, city: string]