# PySpark SQL agg() Function: Mastering Aggregation Operations

## Introduction to the `agg()` Function

The `agg()` function in PySpark computes one or more aggregate expressions in a single call. It is most often called on the result of `groupBy()` to produce per-group summary statistics (such as sum, average, and count) across different columns, and it can also be called directly on a DataFrame to aggregate over all rows.


## Basic Syntax

```
DataFrame.groupBy(*cols).agg(*expressions)
```

### Parameters

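- **`cols`**: The columns to group by, given as column names or `Column` objects.
- **`expressions`**: One or more aggregate `Column` expressions to compute, such as `sum("SALES")`, `avg("SALES")`, or `count("*")`, each optionally renamed with `.alias()`. A single dict mapping column names to aggregate function names is also accepted.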


## Why Use `agg()`?

- It lets you apply multiple aggregations in a single statement.
- You can name each result with an alias and apply different aggregate functions to different columns.
- It simplifies complex analytical queries by combining several aggregation expressions in one call.


## Practical Examples

### 1. Basic Aggregation with a Single Column

**Scenario**: You want to calculate the total sales per item from a sales DataFrame.

**Code Example**:

```python
# Assumes an active SparkSession named `spark`, as in most notebook environments.
# Note: this import shadows Python's built-in sum() for the rest of the session.
from pyspark.sql.functions import sum

df = spark.createDataFrame([
    ("ItemA", 10),
    ("ItemB", 20),
    ("ItemA", 30),
    ("ItemB", 40),
    ("ItemC", 50)
], ["ITEM", "SALES"])

# Aggregate total SALES per ITEM using sum()
df.groupBy("ITEM").agg(sum("SALES").alias("TOTAL_SALES")).show()
```

```
+-----+-----------+
| ITEM|TOTAL_SALES|
+-----+-----------+
|ItemA|         40|
|ItemB|         60|
|ItemC|         50|
+-----+-----------+
```



### 2. Performing Multiple Aggregations

**Scenario**: You want to calculate both the total and the average sales per item using `sum()` and `avg()`.

**Code Example**:

```python
from pyspark.sql.functions import sum, avg

# Apply multiple aggregations: sum and avg
df.groupBy("ITEM").agg(
    sum("SALES").alias("TOTAL_SALES"),
    avg("SALES").alias("AVERAGE_SALES")
).show()
```

```
+-----+-----------+-------------+
| ITEM|TOTAL_SALES|AVERAGE_SALES|
+-----+-----------+-------------+
|ItemA|         40|         20.0|
|ItemB|         60|         30.0|
|ItemC|         50|         50.0|
+-----+-----------+-------------+
```



### 3. Aggregating Multiple Columns

**Scenario**: You have a DataFrame with both sales and profit data, and you want to calculate the total sales and total profit per item.

**Code Example**:

```python
from pyspark.sql.functions import sum

df_multi = spark.createDataFrame([
    ("ItemA", 10, 2),
    ("ItemB", 20, 4),
    ("ItemA", 30, 6),
    ("ItemB", 40, 8),
    ("ItemC", 50, 10)
], ["ITEM", "SALES", "PROFIT"])

# Aggregate multiple columns: total SALES and total PROFIT
df_multi.groupBy("ITEM").agg(
    sum("SALES").alias("TOTAL_SALES"),
    sum("PROFIT").alias("TOTAL_PROFIT")
).show()
```

```
+-----+-----------+------------+
| ITEM|TOTAL_SALES|TOTAL_PROFIT|
+-----+-----------+------------+
|ItemA|         40|           8|
|ItemB|         60|          12|
|ItemC|         50|          10|
+-----+-----------+------------+
```



### 4. Using Built-in Functions in `agg()`

**Scenario**: You want to group by `ITEM` and calculate the maximum and minimum sales for each item.

**Code Example**:

```python
# Note: these imports shadow Python's built-in max() and min().
from pyspark.sql.functions import max, min

# Apply max() and min() aggregations
df.groupBy("ITEM").agg(
    max("SALES").alias("MAX_SALES"),
    min("SALES").alias("MIN_SALES")
).show()
```

```
+-----+---------+---------+
| ITEM|MAX_SALES|MIN_SALES|
+-----+---------+---------+
|ItemA|       30|       10|
|ItemB|       40|       20|
|ItemC|       50|       50|
+-----+---------+---------+
```



### 5. Aggregating Without Grouping (Global Aggregation)

**Scenario**: You want to calculate the total sales across the entire DataFrame, without grouping by any specific column.

**Code Example**:

```python
from pyspark.sql.functions import sum

# Global aggregation: calculate total SALES without grouping
df.agg(sum("SALES").alias("TOTAL_GLOBAL_SALES")).show()
```

```
+------------------+
|TOTAL_GLOBAL_SALES|
+------------------+
|               150|
+------------------+
```



### 6. Custom Aggregation Expressions

**Scenario**: You want to build an arithmetic expression around an aggregate to compute a derived metric, such as each item's percentage of total sales.

**Code Example**:

```python
from pyspark.sql.functions import sum

# Compute the grand total once and bring it to the driver as a Python number
total_sales = df.agg(sum("SALES").alias("TOTAL")).collect()[0]["TOTAL"]

# Calculate each item's percentage of total sales
df.groupBy("ITEM").agg(
    (sum("SALES") / total_sales * 100).alias("SALES_PERCENTAGE")
).show()
```

```
+-----+------------------+
| ITEM|  SALES_PERCENTAGE|
+-----+------------------+
|ItemA|26.666666666666668|
|ItemB|              40.0|
|ItemC| 33.33333333333333|
+-----+------------------+
```

