# PySpark Exercise: Hands-on and Interview-based

## Hands-on:

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, desc, explode, lit
from functools import reduce

In [2]:
spark = SparkSession.builder.appName("Hands-on").getOrCreate()



1. Creating DataFrames
Create a DataFrame from the following list of dictionaries:

```
data = [{"id": 1, "name": "Alice", "age": 23},
{"id": 2, "name": "Bob", "age": 27},
{"id": 3, "name": "Cathy", "age": 22}]
```

In [3]:
data = [
    {
        "id": 1,
        "name": "Alice",
        "age": 23,
    },
    {
        "id": 2,
        "name": "Bob",
        "age": 27,
    },
    {
        "id": 3,
        "name": "Cathy",
        "age": 22
    },
]

In [4]:
df = spark.createDataFrame(data)

In [5]:
df.show()

+---+---+-----+
|age| id| name|
+---+---+-----+
| 23|  1|Alice|
| 27|  2|  Bob|
| 22|  3|Cathy|
+---+---+-----+



* Display the schema of the DataFrame.

In [6]:
print(df.schema)

StructType([StructField('age', LongType(), True), StructField('id', LongType(), True), StructField('name', StringType(), True)])


2. Selecting Data
    * Select the name and age columns from the DataFrame.
    * Add 5 years to the age column and create a new column named age_plus_5.

In [7]:
df.select("name", "age").show()

+-----+---+
| name|age|
+-----+---+
|Alice| 23|
|  Bob| 27|
|Cathy| 22|
+-----+---+



In [8]:
df = df.withColumn("age_plus_5", col("age") + 5)

In [9]:
df.show()

+---+---+-----+----------+
|age| id| name|age_plus_5|
+---+---+-----+----------+
| 23|  1|Alice|        28|
| 27|  2|  Bob|        32|
| 22|  3|Cathy|        27|
+---+---+-----+----------+



3. Renaming Columns
    * Rename the id column to user_id.

In [10]:
df = df.withColumnRenamed("id", "user_id")

In [11]:
df.show()

+---+-------+-----+----------+
|age|user_id| name|age_plus_5|
+---+-------+-----+----------+
| 23|      1|Alice|        28|
| 27|      2|  Bob|        32|
| 22|      3|Cathy|        27|
+---+-------+-----+----------+



4. Dropping Columns
    * Drop the user_id column from the DataFrame.

In [12]:
df = df.drop("user_id")

In [13]:
df.show()

+---+-----+----------+
|age| name|age_plus_5|
+---+-----+----------+
| 23|Alice|        28|
| 27|  Bob|        32|
| 22|Cathy|        27|
+---+-----+----------+



5. Distinct Values
    * Find all distinct values in a column of your choice from the DataFrame.

In [14]:
df.select("age").distinct().show()

+---+
|age|
+---+
| 23|
| 27|
| 22|
+---+



6. Basic Column Operations
    * Create a new column that concatenates the name column with the string "_student".

In [15]:
df = df.withColumn("name_student", concat(col("name"), lit("_student")))

In [16]:
df.show()

+---+-----+----------+-------------+
|age| name|age_plus_5| name_student|
+---+-----+----------+-------------+
| 23|Alice|        28|Alice_student|
| 27|  Bob|        32|  Bob_student|
| 22|Cathy|        27|Cathy_student|
+---+-----+----------+-------------+



## Data Manipulation:

7. Filtering Rows
    * Filter rows where age is greater than 25.
    * Filter rows where the name starts with the letter "A".

In [17]:
df.filter(col("age") > 25).show()

+---+----+----------+------------+
|age|name|age_plus_5|name_student|
+---+----+----------+------------+
| 27| Bob|        32| Bob_student|
+---+----+----------+------------+



In [18]:
df.filter(col("name").startswith("A")).show()

+---+-----+----------+-------------+
|age| name|age_plus_5| name_student|
+---+-----+----------+-------------+
| 23|Alice|        28|Alice_student|
+---+-----+----------+-------------+



8. Aggregations
    * Count the total number of rows.
    * Group by age and count the number of records for each age.

In [19]:
df.count()

3

In [20]:
df.groupBy("age").count().show()

+---+-----+
|age|count|
+---+-----+
| 23|    1|
| 27|    1|
| 22|    1|
+---+-----+



9. Sorting
    * Sort the DataFrame by name in ascending order.
    * Sort the DataFrame by age in descending order.

In [21]:
df.orderBy("name").show()

+---+-----+----------+-------------+
|age| name|age_plus_5| name_student|
+---+-----+----------+-------------+
| 23|Alice|        28|Alice_student|
| 27|  Bob|        32|  Bob_student|
| 22|Cathy|        27|Cathy_student|
+---+-----+----------+-------------+



In [22]:
df.orderBy(desc("age")).show()

+---+-----+----------+-------------+
|age| name|age_plus_5| name_student|
+---+-----+----------+-------------+
| 27|  Bob|        32|  Bob_student|
| 23|Alice|        28|Alice_student|
| 22|Cathy|        27|Cathy_student|
+---+-----+----------+-------------+



10. Working with Null Values
    * Identify rows with null values in any column.
    * Replace null values in the age column with the average age.

In [23]:
df.filter(reduce(lambda a, b: a | b, [col(c).isNull() for c in df.columns])).show()

+---+----+----------+------------+
|age|name|age_plus_5|name_student|
+---+----+----------+------------+
+---+----+----------+------------+



In [24]:
average_age = df.select("age").na.drop().agg({"age": "avg"}).first()[0]

In [25]:
df = df.fillna({"age": average_age})

In [26]:
df.show()

+---+-----+----------+-------------+
|age| name|age_plus_5| name_student|
+---+-----+----------+-------------+
| 23|Alice|        28|Alice_student|
| 27|  Bob|        32|  Bob_student|
| 22|Cathy|        27|Cathy_student|
+---+-----+----------+-------------+



11. Joining DataFrames
Given two DataFrames:

`df1 = [(1, "Alice"), (2, "Bob"), (3, "Cathy")]`

`df2 = [(1, "A"), (2, "B"), (4, "D")]`

Perform:
- Inner join
- Left join
- Full outer join
- Left semi join
- Left anti join

In [27]:
df1 = spark.createDataFrame([(1, "Alice"), (2, "Bob"), (3, "Cathy")], ["id", "name"])

In [28]:
df2 = spark.createDataFrame([(1, "A"), (2, "B"), (4, "D")], ["id", "letter"])

In [29]:
df1.join(df2, on="id", how="inner").show()

+---+-----+------+
| id| name|letter|
+---+-----+------+
|  1|Alice|     A|
|  2|  Bob|     B|
+---+-----+------+



In [30]:
df1.join(df2, on="id", how="left").show()

+---+-----+------+
| id| name|letter|
+---+-----+------+
|  1|Alice|     A|
|  2|  Bob|     B|
|  3|Cathy|  NULL|
+---+-----+------+



In [31]:
df1.join(df2, on="id", how="outer").show()

+---+-----+------+
| id| name|letter|
+---+-----+------+
|  1|Alice|     A|
|  2|  Bob|     B|
|  3|Cathy|  NULL|
|  4| NULL|     D|
+---+-----+------+



In [32]:
df1.join(df2, on="id", how="left_semi").show()

+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+



In [33]:
df1.join(df2, on="id", how="left_anti").show()

+---+-----+
| id| name|
+---+-----+
|  3|Cathy|
+---+-----+



12. Pivot Tables
    * Use pivoting to calculate the average age grouped by gender and occupation.

In [34]:
data = [
    {"id": 1, "name": "Alice", "age": 23, "gender": "F", "occupation": "Engineer"},
    {"id": 2, "name": "Bob", "age": 27, "gender": "M", "occupation": "Doctor"},
    {"id": 3, "name": "Cathy", "age": 22, "gender": "F", "occupation": "Artist"},
]

In [35]:
df = spark.createDataFrame(data)

In [36]:
df.groupBy("gender").pivot("occupation").agg({"age": "avg"}).show()

+------+------+------+--------+
|gender|Artist|Doctor|Engineer|
+------+------+------+--------+
|     F|  22.0|  NULL|    23.0|
|     M|  NULL|  27.0|    NULL|
+------+------+------+--------+



13. Exploding Columns
    * Given a column with lists, explode the list into individual rows:

```
data = [{"id": 1, "values": [10, 20, 30]}, {"id": 2, "values": [40, 50]}]
```

In [37]:
data = [{"id": 1, "values": [10, 20, 30]}, {"id": 2, "values": [40, 50]}]

In [38]:
df = spark.createDataFrame(data)

In [39]:
df.select("id", explode(col("values")).alias("value")).show()

+---+-----+
| id|value|
+---+-----+
|  1|   10|
|  1|   20|
|  1|   30|
|  2|   40|
|  2|   50|
+---+-----+



14. Union DataFrames
    * Combine two DataFrames vertically using union. Ensure both DataFrames have the same schema.

In [40]:
df1 = spark.createDataFrame([(1, "Alice", 23), (2, "Bob", 27)], ["id", "name", "age"])
df2 = spark.createDataFrame([(3, "Cathy", 22), (4, "David", 30)], ["id", "name", "age"])

In [41]:
df1.union(df2).show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 23|
|  2|  Bob| 27|
|  3|Cathy| 22|
|  4|David| 30|
+---+-----+---+



## Interview-based:

1. What is PySpark? How does it differ from Pandas?
    * Discuss the differences in terms of scalability, distributed computing, and use cases.

PySpark is the Python API for Apache Spark, designed for big data processing and distributed computing, allowing users to handle large datasets across a cluster of machines. In contrast, Pandas is an in-memory data manipulation library suitable for smaller datasets, operating on a single machine. While PySpark excels in scalability and parallel processing for big data applications, Pandas is ideal for data analysis and exploration tasks on manageable datasets.

2. What are the advantages of using PySpark over traditional Python frameworks?
    * Mention aspects like fault tolerance, in-memory computation, and support for big data.

PySpark offers significant advantages over traditional Python frameworks, particularly for big data processing. It provides fault tolerance through Resilient Distributed Datasets (RDDs), enabling automatic recovery from failures. Its in-memory computation speeds up data processing, while its ability to handle large datasets across distributed systems enhances scalability. Additionally, PySpark integrates well with the Hadoop ecosystem and includes a user-friendly DataFrame API and MLlib for machine learning, making it a robust choice for data analysis and processing.

3. Explain the concept of Resilient Distributed Dataset (RDD). How does it differ from DataFrames?

Resilient Distributed Dataset (RDD) is a fundamental data structure in Apache Spark that represents an immutable, fault-tolerant collection of objects distributed across a cluster. RDDs allow for parallel processing and lazy evaluation but require more manual optimization and lack a schema. In contrast, DataFrames provide a higher-level abstraction with a defined schema, enabling more efficient execution through Spark's Catalyst optimizer and a more user-friendly API for structured data manipulation. Overall, DataFrames are generally preferred for most data processing tasks due to their performance and ease of use.

4. What is lazy evaluation in PySpark? Why is it useful?

Lazy evaluation in PySpark delays the execution of transformations until an action is called. Because Spark sees the whole chain of transformations before running anything, it can optimize the computation plan as a unit, for example by combining steps and skipping work whose results are never used. The recorded lineage of transformations also supports fault tolerance, since lost partitions can be recomputed from it.

5. Explain the difference between transformations and actions in PySpark.
    * Provide examples of each.

Transformations are operations that create a new RDD (Resilient Distributed Dataset) or DataFrame from an existing one. They are lazy: rather than computing results immediately, they add to a logical plan that runs only when an action is called. Common RDD transformations include map, filter, flatMap, groupByKey, and reduceByKey.

Example of a Transformation:
```python
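from pyspark import SparkContext

# Reuse the running SparkContext if one exists (for example, the one behind
# an active SparkSession); constructing a second one raises an error.
sc = SparkContext.getOrCreate()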

# Create an RDD
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Transformation: Map to square each element
squared_rdd = rdd.map(lambda x: x ** 2)

# Note: The computation is not executed yet.
```

Actions are operations that trigger the execution of the transformations and return a value to the driver program or write data to an external storage system. Actions include operations like collect, count, first, take, and saveAsTextFile.

Example of an Action:
```python
# Action: Collect the results to the driver
result = squared_rdd.collect()

# This will execute the transformations and return the squared values
print(result)  # Output: [1, 4, 9, 16, 25]
```

6. How is the join() operation optimized in PySpark? What are the different types of joins PySpark supports?

In PySpark, the join() operation is optimized through techniques like broadcast joins, partitioning, sort-merge joins, skew handling, and the Catalyst optimizer. It supports several types of joins, including inner, outer, left, right, cross, semi, and anti joins, allowing for flexible data manipulation. These optimizations and join types enhance performance and enable efficient handling of large datasets in distributed environments.

7. What are the key differences between PySpark DataFrames and SQL tables? Can you use SQL queries on PySpark DataFrames?

PySpark DataFrames are distributed collections of data organized into named columns, optimized for performance in a distributed computing environment, while SQL tables are structured data stored in relational databases. You can manipulate PySpark DataFrames using Python APIs, whereas SQL tables are accessed through SQL queries. Additionally, PySpark allows you to run SQL queries on DataFrames by registering them as temporary views, enabling a seamless integration of SQL with PySpark's capabilities.

8. Explain the role of SparkSession in PySpark. Why is it needed?

SparkSession in PySpark is the main entry point for working with Spark, providing a unified interface for accessing Spark's features, such as DataFrames and SQL queries. It simplifies configuration management and automatically creates a SparkContext, which is essential for connecting to a Spark cluster. Overall, SparkSession streamlines the development process by consolidating various functionalities into a single object.

9. What is the difference between SparkContext and SparkSession? When to use each of them?

SparkContext is the entry point for low-level Spark operations, primarily used for working with RDDs. In contrast, SparkSession is a higher-level entry point that encapsulates SparkContext and is designed for working with structured data using DataFrames and Datasets, as well as for executing SQL queries. It is recommended to use SparkSession for most applications due to its enhanced functionality and ease of use.