# PySpark Exercise: Hands-on and Interview based

## Hands-on:

### 1. Creating DataFrames

In [380]:
# Import required modules
from pyspark.sql import SparkSession,functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("PySpark Exercise").getOrCreate()
# Create a DataFrame from a list of dictionaries (columns are inferred from the keys)
data = [
    {"id": 1, "name": "Alice", "age": 23},
    {"id": 2, "name": "Bob", "age": 27},
    {"id": 3, "name": "Cathy", "age": 22},
]
df = spark.createDataFrame(data)
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



### 2. Selecting Data

#### Select the name and age columns from the DataFrame.

In [381]:
df_with_name_and_age = df.select("name", "age")
df_with_name_and_age.show()

+-----+---+
| name|age|
+-----+---+
|Alice| 23|
|  Bob| 27|
|Cathy| 22|
+-----+---+



#### Add 5 years to the age column and create a new column named age_plus_5.

In [382]:
df.withColumn("age_plus_5", df.age + 5).show()

+---+---+-----+----------+
|age| id| name|age_plus_5|
+---+---+-----+----------+
| 23|  1|Alice|        28|
| 27|  2|  Bob|        32|
| 22|  3|Cathy|        27|
+---+---+-----+----------+



### 3. Renaming Columns

#### Rename the id column to user_id.

In [383]:
df = df.withColumnRenamed('id','user_id')
df.show()

+---+-------+-----+
|age|user_id| name|
+---+-------+-----+
| 23|      1|Alice|
| 27|      2|  Bob|
| 22|      3|Cathy|
+---+-------+-----+



### 4. Dropping Columns

#### Drop the user_id column from the DataFrame.

In [384]:
df = df.drop('user_id')
df.show()

+---+-----+
|age| name|
+---+-----+
| 23|Alice|
| 27|  Bob|
| 22|Cathy|
+---+-----+



### 5. Distinct Values

#### Find all distinct values in a column of your choice from the DataFrame.

In [385]:
# Show distinct values in the age column
df.select('age').distinct().show()

+---+
|age|
+---+
| 23|
| 27|
| 22|
+---+



### 6. Basic Column Operations

#### Create a new column that concatenates the name column with the string "_student".

In [386]:

df.withColumn('name_with_student', F.concat(F.col('name'),F.lit('_student'))).show()

+---+-----+-----------------+
|age| name|name_with_student|
+---+-----+-----------------+
| 23|Alice|    Alice_student|
| 27|  Bob|      Bob_student|
| 22|Cathy|    Cathy_student|
+---+-----+-----------------+



## Data Manipulation:

### 7. Filtering Rows

#### Filter rows where age is greater than 25.

In [387]:
df.filter(df.age > 25).show()

+---+----+
|age|name|
+---+----+
| 27| Bob|
+---+----+



#### Filter rows where the name starts with the letter "A".

In [388]:
df.filter(df.name.like('A%')).show()

+---+-----+
|age| name|
+---+-----+
| 23|Alice|
+---+-----+



### 8. Aggregations

#### Count the total number of rows.

In [389]:
count = df.count()
print(count)

3


#### Group by age and count the number of records for each age.

In [390]:
df.groupBy('age').count().show()

+---+-----+
|age|count|
+---+-----+
| 23|    1|
| 27|    1|
| 22|    1|
+---+-----+



### 9. Sorting

#### Sort the DataFrame by name in ascending order.

In [391]:
df.orderBy(df.name.asc()).show()

+---+-----+
|age| name|
+---+-----+
| 23|Alice|
| 27|  Bob|
| 22|Cathy|
+---+-----+



#### Sort the DataFrame by age in descending order.

In [392]:
df.orderBy(df.age.desc()).show()

+---+-----+
|age| name|
+---+-----+
| 27|  Bob|
| 23|Alice|
| 22|Cathy|
+---+-----+



### 10. Working with Null Values

#### Identify rows with null values in any column.

In [415]:
# Create a new DataFrame; keys missing from a dictionary become nulls
data = [
    {"id": 1, "name": "Alice", "age": 23},
    {"name": "Bob", "age": 27},
    {"id": 3, "name": "Cathy"},
    {"id": 4, "name": "David", "age": 20},
    {"name": "Emma"},
]
df = spark.createDataFrame(data)
df.show()


# Keep only the rows that have a null value in at least one column
df.filter(F.col("id").isNull() | F.col("age").isNull() | F.col("name").isNull()).show()

+----+----+-----+
| age|  id| name|
+----+----+-----+
|  23|   1|Alice|
|  27|NULL|  Bob|
|NULL|   3|Cathy|
|  20|   4|David|
|NULL|NULL| Emma|
+----+----+-----+

+----+----+-----+
| age|  id| name|
+----+----+-----+
|  27|NULL|  Bob|
|NULL|   3|Cathy|
|NULL|NULL| Emma|
+----+----+-----+



#### Replace null values in the age column with the average age.

In [416]:
# Calculate the average age
average_age = df.agg(F.avg('age')).collect()[0][0]

# Replace null values in the 'age' column with the average age
# (age is a long column, so the average of about 23.33 is truncated to 23)
df = df.fillna({'age': average_age})
df.show()

+---+----+-----+
|age|  id| name|
+---+----+-----+
| 23|   1|Alice|
| 27|NULL|  Bob|
| 23|   3|Cathy|
| 20|   4|David|
| 23|NULL| Emma|
+---+----+-----+



### 11. Joining DataFrames

In [417]:
# Given two DataFrames:
df1_data = [(1, "Alice"), (2, "Bob"), (3, "Cathy")]
df1_columns = ["id", "name"]
df1 = spark.createDataFrame(df1_data, df1_columns)

df2_data = [(1, "A"), (2, "B"), (4, "D")]
df2_columns = ["id", "initial_first_name"]
df2 = spark.createDataFrame(df2_data, df2_columns)

# Show them
df1.show()
df2.show()

+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
|  3|Cathy|
+---+-----+

+---+------------------+
| id|initial_first_name|
+---+------------------+
|  1|                 A|
|  2|                 B|
|  4|                 D|
+---+------------------+



#### Inner join

In [418]:
df1.join(df2,df1.id == df2.id,"inner").show()

+---+-----+---+------------------+
| id| name| id|initial_first_name|
+---+-----+---+------------------+
|  1|Alice|  1|                 A|
|  2|  Bob|  2|                 B|
+---+-----+---+------------------+



#### Left join

In [419]:
df1.join(df2,df1.id == df2.id,"left").show()

+---+-----+----+------------------+
| id| name|  id|initial_first_name|
+---+-----+----+------------------+
|  1|Alice|   1|                 A|
|  2|  Bob|   2|                 B|
|  3|Cathy|NULL|              NULL|
+---+-----+----+------------------+



#### Full outer join

In [420]:
df1.join(df2,df1.id == df2.id,"fullouter").show()

+----+-----+----+------------------+
|  id| name|  id|initial_first_name|
+----+-----+----+------------------+
|   1|Alice|   1|                 A|
|   2|  Bob|   2|                 B|
|   3|Cathy|NULL|              NULL|
|NULL| NULL|   4|                 D|
+----+-----+----+------------------+



#### Left semi join

In [421]:
df1.join(df2,df1.id == df2.id,"leftsemi").show()

+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+



#### Left anti join

In [422]:
df1.join(df2,df1.id == df2.id,"leftanti").show()

+---+-----+
| id| name|
+---+-----+
|  3|Cathy|
+---+-----+



### 12. Pivot Tables

#### Use pivoting to calculate the average age grouped by gender and occupation.

In [423]:
# Create a DataFrame
data = [
    {"id": 1, "name": "Alice", "age": 23, "gender": "F", "occupation": "Engineer"},
    {"id": 2, "name": "Bob", "age": 27, "gender": "M", "occupation": "Engineer"},
    {"id": 3, "name": "Cathy", "age": 22, "gender": "F", "occupation": "Teacher"},
    {"id": 4, "name": "David", "age": 20, "gender": "M", "occupation": "Sales"},
    {"id": 5, "name": "Emma", "age": 32, "gender": "F", "occupation": "Teacher"},
]
df = spark.createDataFrame(data)

# Pivot
df.groupBy('occupation').pivot("gender", ["M", "F"]).avg("age").show()



+----------+----+----+
|occupation|   M|   F|
+----------+----+----+
|     Sales|20.0|NULL|
|   Teacher|NULL|27.0|
|  Engineer|27.0|23.0|
+----------+----+----+



### 13. Exploding Columns

#### Given a column with lists, explode the list into individual rows:

In [424]:
data = [{"id": 1, "values": [10, 20, 30]}, {"id": 2, "values": [40, 50]}]
df = spark.createDataFrame(data)
df.select(F.explode(df.values).alias("value"), df.id).show()

+-----+---+
|value| id|
+-----+---+
|   10|  1|
|   20|  1|
|   30|  1|
|   40|  2|
|   50|  2|
+-----+---+



### 14. Union DataFrames

#### Combine two DataFrames vertically using union. Ensure both DataFrames have the same schema.

In [425]:
# Create two DataFrames to combine
data1 = [
    {"id": 1, "name": "Alice", "age": 23},
    {"id": 2, "name": "Bob", "age": 27},
    {"id": 3, "name": "Cathy", "age": 22},
]
df1 = spark.createDataFrame(data1)
data2 = [
    {"id": 4, "name": "David", "age": 20},
    {"id": 5, "name": "Emma", "age": 32},
]
df2 = spark.createDataFrame(data2)

# Ensure that the schemas are the same
df1.printSchema()
df2.printSchema()

# Union (columns are matched by position, not by name)
union_df = df1.union(df2)
union_df.show()


root
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

root
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

+---+---+-----+
|age| id| name|
+---+---+-----+
| 23|  1|Alice|
| 27|  2|  Bob|
| 22|  3|Cathy|
| 20|  4|David|
| 32|  5| Emma|
+---+---+-----+



## Interview based:

### 1. What is PySpark? How does it differ from Pandas?

PySpark is the Python API for Apache Spark, a powerful tool for big data processing.  
It allows you to analyze and process large datasets using distributed computing across multiple machines.

- Scale:  
PySpark: Handles huge datasets across clusters (big data).  
Pandas: Limited to data that fits in your computer’s memory.

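- Speed:  
PySpark: Faster on large datasets because work runs in parallel across cores and machines, with intermediate data kept in memory.  
Pandas: Usually faster on small datasets (no distributed overhead), but slows down or runs out of memory as data grows.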

- Use Case:  
PySpark: Ideal for big data and parallel processing.  
Pandas: Best for smaller datasets and local analysis.

### 2. What are the advantages of using PySpark over traditional Python frameworks?

1. Handles Big Data:  
PySpark processes massive datasets across multiple machines, unlike pandas, which is limited to single-machine memory.
2. Faster Processing:  
PySpark runs work in parallel and can keep intermediate data in memory, which makes it much faster than single-process Python tools on large datasets.
3. Scalable:  
It works seamlessly for small to huge datasets by scaling across clusters.
4. Supports Many Data Sources:  
PySpark connects easily to systems like Hadoop, databases, and cloud storage.
5. Fault-Tolerant:  
When a task or node fails, Spark reruns the affected work and recomputes the lost partitions.

### 3. Explain the concept of Resilient Distributed Dataset (RDD). How does it differ from DataFrames?

A Resilient Distributed Dataset (RDD) is the basic data structure in Apache Spark. It is:

- Immutable: Once created, it cannot be changed.
- Distributed: Spread across multiple machines in a cluster.
- Fault-Tolerant: Automatically recovers from failures.
- Low-Level: You can control data partitioning and transformations directly.

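DataFrames add a schema (named, typed columns) on top of this model, which lets Spark's Catalyst optimizer plan and optimize queries. RDDs are more flexible and can hold arbitrary Python objects, but they are harder to use and usually slower than DataFrames for structured data tasks.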

### 4. What is lazy evaluation in PySpark? Why is it useful?

Lazy evaluation means PySpark doesn’t execute your operations immediately.  
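Instead, it builds a logical plan of all transformations and waits until an action (like collect(), count(), or a write) is called to execute them.  
This is useful because Spark sees the whole pipeline before running it: it can combine steps, push filters closer to the data source, and skip work whose result is never used.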

### 5. Explain the difference between transformations and actions in PySpark.

Transformations
- Definition: Operations that define how data is modified (e.g., map(), filter()), creating a new RDD/DataFrame without executing it.
- Lazy: Only executed when an action is called.

Actions
- Definition: Operations that trigger computation and return results (e.g., collect(), count()).
- Execution: Forces PySpark to process transformations.

### 6. How is the join() operation optimized in PySpark? What are the different types of joins PySpark supports?

PySpark optimizes joins by:
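- Broadcast Join: When one side is small, Spark copies it to every executor so the large side can be joined locally without being shuffled.
- Shuffle Join: When both sides are large, Spark repartitions both by the join key so that matching rows end up in the same partition (for example, a sort-merge join).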

Types of Joins in PySpark:
- Inner Join
- Left Outer Join
- Right Outer Join
- Full Outer Join
- Cross Join
- Left Semi Join
- Left Anti Join


### 7. What are the key differences between PySpark DataFrames and SQL tables? Can you use SQL queries on PySpark DataFrames?

Execution:
- PySpark DataFrames: Processed in a distributed manner using Spark’s engine.
- SQL Tables: Typically stored in a database and processed using the database engine.

Storage:
- PySpark DataFrames: Can be created from various sources like HDFS, CSV, or Parquet.
- SQL Tables: Stored in relational databases like MySQL, PostgreSQL, etc.

API:
- PySpark DataFrames: Can be manipulated using PySpark’s Python API.
- SQL Tables: Accessed through SQL queries.

You can use SQL queries on PySpark DataFrames by registering them as temporary views and querying them with spark.sql().

### 8. Explain the role of SparkSession in PySpark. Why is it needed?

Role of SparkSession in PySpark:
- Entry Point: It is the main entry point for all Spark functionality.
- Initialization: It initializes the Spark application and provides access to Spark’s APIs for reading data, creating DataFrames, and performing SQL operations.

Why is it needed?
- Simplifies API Access: Combines multiple contexts (SparkContext, SQLContext, etc.) into one unified interface.
- Manages Resources: Handles the configuration of Spark applications and resource management.

In short, SparkSession is essential for interacting with Spark in PySpark.

### 9. What is the difference between SparkContext and SparkSession? When to use each of them?

SparkContext:
- Low-level entry point for Spark functionality, primarily used for working with RDDs and managing the cluster.
- Essential for handling distributed data processing.

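SparkSession:
- Higher-level entry point, introduced in Spark 2.0, that includes SparkContext and provides additional functionality for DataFrames, SQL, and other Spark features.
- Simplifies accessing Spark's various APIs.

When to use each:
- Use SparkSession by default in Spark 2.0 and later; it covers DataFrame and SQL work.
- Use SparkContext when you need the RDD API or other low-level features. You do not have to create it separately, since it is available as spark.sparkContext.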