# Introduction to Spark
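These notes cover the basics of working with Apache Spark through PySpark: creating a session, reading data into DataFrames, cleaning and transforming data, joins, complex types, and UDFs.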

## Creating a Session
Create a `SparkSession`, the entry point for DataFrame operations, with the builder:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BootcampDay1").getOrCreate()

print(spark.version)
```
## Reading a `CSV` & DataFrames
Some of the most common DataFrame methods you will use include:
- `.select()`
- `.filter()`
- `.groupBy()`
- `.agg()`
  
To read a CSV file, call `spark.read.csv()`:
```python
df = spark.read.csv('x.csv', header=True, inferSchema=True)
df.show()
```
- To inspect the schema: `df.printSchema()`
- To count the number of rows: `row_count = df.count()`

### Aggregating Tables
You can use `groupBy()` to aggregate the table:
```python
df.groupBy('gender').agg({'salary': 'avg'}).show()
```

### Filtering a Table
```python
filtered_df = df.filter(df['age'] > 50).select('age', 'occupation')
```

## More DataFrames

To read CSV: `spark.read.csv('path/to/file.csv')`

To read JSON: `spark.read.json('path/to/file.json')`

To read Parquet: `spark.read.parquet('path/to/file.parquet')`

### Manually Configuring a Schema
- When defining a schema manually, specify each column's name, data type, and whether it is nullable:
```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType

# construct the schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("scores", ArrayType(IntegerType()), True),
])

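# sample rows matching the schema (illustrative values)
data = [(1, "Ana", [85, 90]), (2, "Ben", [78])]

# apply the schema when creating the DataFrame
df = spark.createDataFrame(data, schema=schema)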
```

### Filtering and Selection
- Use `.select()` to choose specific columns
- Use `.filter()` or `.where()` to filter rows based on conditions
- Use `.sort()` or `.orderBy()` to order rows by one or more columns

```python
df.select("name", "age").show()

df.filter(df['age'] > 30).show()
```

### Dropping Missing Values
`df.na.drop().show()`

# Diving Deeper into DataFrames & Processing

### 1\. Handling Null Values

#### *Dropping Null Values*

You can remove rows that contain null values using `df.na.drop()`. Note that this differs from `df.drop()`, which removes columns.

| Method | Description | Example |
| :--- | :--- | :--- |
| **`.drop()`** | Drops rows where *any* column is null. | `df.na.drop()` |
| **`.drop('any')`** | Alias for the default behavior. | `df.na.drop('any')` |
| **`.drop('all')`** | Drops rows only if *all* columns are null. | `df.na.drop('all')` |
| **`.drop(subset=['col1', 'col2'])`** | Drops rows where nulls appear in *specified* columns. | `df.na.drop(subset=['Age', 'Income'])` |

-----

#### *Filling Null Values*

You can replace null values using `df.na.fill()`. This is often preferred over dropping data, especially with numerical columns.

| Method | Description | Example |
| :--- | :--- | :--- |
| **`.fill(value)`** | Fills nulls in all columns of the same type as `value`. | `df.na.fill(0)` |
| **`.fill(value, subset=[...])`** | Fills nulls in specified columns with `value`. | `df.na.fill('Unknown', ['Name'])` |
| **Advanced Filling** | Fill nulls with a statistic such as the mean, median, or mode. Compute the statistic first (for example, the mean with `avg`), then pass the result to `.fill()`. | `mean_age = df.agg({'Age': 'avg'}).collect()[0][0]` <br> `df.na.fill(mean_age, ['Age'])` |

### 2\. Column Operations: `withColumn`, `withColumnRenamed`, and `drop`

These operations are used to transform, add, rename, and remove columns from a DataFrame.

-----

#### *Adding or Transforming Columns with `withColumn`*

The `.withColumn()` method is used to add a new column or replace an existing one. It requires a column name and a column expression (often created using the **`F.col`** or **`F.lit`** functions from `pyspark.sql.functions`).

```python
from pyspark.sql import functions as F

# Add a new column 'Salary_USD' by multiplying 'Salary' by a literal value
df = df.withColumn('Salary_USD', F.col('Salary') * 0.75)

# Create a new column 'Is_Senior' based on a condition
df = df.withColumn('Is_Senior', F.when(F.col('Age') > 45, 'Yes').otherwise('No'))
```

#### *Renaming Columns with `withColumnRenamed`*

The `.withColumnRenamed()` method is straightforward for changing a column's name.

```python
# Rename the column 'ID' to 'Employee_ID'
df = df.withColumnRenamed('ID', 'Employee_ID')
```

#### *Removing Columns with `drop`*

The `.drop()` method removes one or more specified columns.

```python
# Remove a single column
df = df.drop('Temp_Column')

# Remove multiple columns
df = df.drop('col1', 'col2', 'col3')
```

### 3\. Row Operations: `filter` and `groupBy`

These operations are crucial for selecting subsets of data and summarizing information.

-----

#### *Selecting Rows with `filter` (or `where`)*

The `.filter()` method (or its alias `.where()`) selects rows based on a specified condition. The condition is typically written using column expressions.

```python
# Filter rows where 'Age' is greater than 30
df_adults = df.filter(F.col('Age') > 30)

# Filter rows where 'Department' is 'Sales' AND 'Salary' is greater than 50000
df_sales_high = df.filter((F.col('Department') == 'Sales') & (F.col('Salary') > 50000))
```

**Note:** Use the `&` (AND) and `|` (OR) operators for combining multiple conditions, and ensure conditions are enclosed in parentheses.

#### *Grouping and Aggregating with `groupBy`*

The `.groupBy()` method groups the DataFrame by one or more columns, typically followed by an aggregation function to calculate summary statistics for each group.

```python
# Summarize each department: headcount, average salary, and maximum age
df_summary = df.groupBy('Department').agg(
    F.count('*').alias('Employee_Count'), # Counts the number of rows in each group
    F.avg('Salary').alias('Average_Salary'), # Calculates the mean salary
    F.max('Age').alias('Max_Age')
).orderBy('Average_Salary', ascending=False)
```

The aggregation function is applied using the `.agg()` method, which can handle multiple aggregations at once. Common aggregation functions include `F.count`, `F.sum`, `F.avg`, `F.min`, and `F.max`.

## Advanced DataFrame Operations

### 1\. Joining DataFrames

Joining DataFrames combines them based on common key columns. PySpark supports joins on shared column names or on explicit column expressions.

-----

#### Joining with the `on` Parameter (Recommended)

The `on` parameter is the standard and often clearer way to specify the join condition. It accepts either a **string** or a **list of strings** containing the names of the common column(s) in both DataFrames.

  * **Behavior:** When using `on`, PySpark automatically handles columns with the same name, resulting in only **one copy** of the join key column in the output.

```python
# Assuming both df_A and df_B have a column named 'ID'
df_joined = df_A.join(df_B, on='ID', how='inner')

# Joining on multiple columns: 'ID' and 'Dept'
df_joined_multi = df_A.join(df_B, on=['ID', 'Dept'], how='left')
```

#### Joining with an Equality Condition (`==`)

You can define a join condition using a standard **Boolean expression** formed by comparing columns from the two DataFrames. This is necessary when the join key columns have **different names**.

  * **Behavior:** When using an explicit condition, if the joining columns have the same name, they will appear **twice** in the output (e.g., `df_A.ID` and `df_B.ID`). If they have different names, they will both be present.

```python
from pyspark.sql import functions as F

# Joining df_A using 'A_Key' and df_B using 'B_Key'
df_joined_expr = df_A.join(
    df_B,
    df_A['A_Key'] == df_B['B_Key'],
    how='inner'
)
```

### 2\. Union (Combining DataFrames)

The `.union()` operation combines the rows of two DataFrames; chain calls to combine more. Columns are matched **by position**, so both DataFrames must have the same number of columns, and corresponding columns must have compatible data types.

-----

#### Simple Union with Identical Schema

If two DataFrames, `df1` and `df2`, have an identical column structure, you can simply use `.union()`:

```python
# union() matches columns by position, not by name, so column order and types must line up
df_combined = df1.union(df2)
```

#### Union by Name (`unionByName`)

If the DataFrames have the same columns but they are in a **different order**, use `.unionByName()`:

```python
# df1 has ['ColA', 'ColB'], df2 has ['ColB', 'ColA']
df_combined_by_name = df1.unionByName(df2)

# allowMissingColumns=False (the default) raises an error if the column sets differ
df_combined_strict = df1.unionByName(df2, allowMissingColumns=False)

# allowMissingColumns=True (Spark 3.1+) fills columns missing from either side with nulls
df_combined_lenient = df1.unionByName(df2, allowMissingColumns=True)
```

### 3\. Working with Complex Types: Arrays and Maps

PySpark supports complex nested data types like Arrays (lists of elements) and Maps (key-value pairs) which are useful for semi-structured data.

-----

#### Creating and Working with Arrays

An Array column holds a sequence of values of the same type.

**Example Setup:**

```python
from pyspark.sql import functions as F
# Create an array column named 'scores'
df = df.withColumn("scores", F.array(F.lit(85), F.lit(90), F.lit(78)))
```

| Operation | Function | Description | Example |
| :--- | :--- | :--- | :--- |
| **Explode** | `F.explode()` | Converts each element of an array into a separate row. | `df.withColumn('score', F.explode('scores'))` |
| **Size** | `F.size()` | Returns the number of elements in the array. | `df.withColumn('count', F.size('scores'))` |
| **Element Access** | `F.col()[index]` | Accesses an element by its zero-based index. | `df.withColumn('first', F.col('scores')[0])` |

#### Creating and Working with Maps

A Map column holds key-value pairs, similar to a Python dictionary.

**Example Setup:**

```python
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType, StructField, StructType

# Define a schema with a MapType column (string keys, string values)
map_schema = StructType([StructField('attributes', MapType(StringType(), StringType()), True)])

# Create a map column from alternating key and value expressions
df = df.withColumn("attributes", F.create_map(F.lit('color'), F.lit('red'), F.lit('size'), F.lit('large')))
```

| Operation | Function | Description | Example |
| :--- | :--- | :--- | :--- |
| **Key Access** | `F.col()['key']` | Accesses the value associated with a specific key. | `df.withColumn('color', F.col('attributes')['color'])` |
| **Keys/Values** | `F.map_keys()`, `F.map_values()` | Returns an array containing all keys or all values in the map. | `df.withColumn('keys', F.map_keys('attributes'))` |

### 4\. Working with Structs

A **StructType** (or **Struct**) is a way to group related columns into a single nested column, similar to a record or an object in other languages.

-----

#### Creating a Struct

You can create a Struct using the `F.struct()` function, which combines several columns into one.

```python
# Group 'firstName' and 'lastName' into a single 'Name' struct column
df_struct = df.withColumn('Name', F.struct('firstName', 'lastName'))

# The resulting schema: Name.firstName and Name.lastName
```

#### Accessing Elements in a Struct

Accessing data inside a Struct is done using **dot notation**.

```python
# Access the 'lastName' field inside the 'Name' struct
df_lastName = df_struct.withColumn('Last', F.col('Name.lastName'))

# You can also use select
df_select = df_struct.select(F.col('Name.*')) # Expands all fields in the struct
```

## User-Defined Functions (UDFs)

### 1\. Standard PySpark UDF (User-Defined Function)

A **standard UDF** applies a Python function **row-by-row** to a PySpark DataFrame.

| Feature | Description |
| :--- | :--- |
| **Execution** | **Row-by-Row.** The function is called for every single row in the DataFrame. |
| **Data Transfer** | **High Overhead.** Rows are **serialized** from Spark's internal (JVM) format into Python objects, and the results are **serialized back**, which adds significant communication overhead. |
| **Performance** | **Slow.** The serialization cost and single-row execution limit performance. It also hinders Spark's Catalyst Optimizer. |
| **Syntax** | Created with `udf` from `pyspark.sql.functions`. Specify the **return type** explicitly; if omitted, it defaults to `StringType`. |
| **Best Used For** | Logic that is too complex or impossible to express using built-in Spark functions, especially on **small-to-medium datasets** where the overhead is acceptable. |

#### Example

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# 1. Define the Python function
def custom_greet(name):
    if name is None:
        return None
    return "Hello, " + name.upper()

# 2. Convert to a PySpark UDF
greet_udf = udf(custom_greet, StringType())

# 3. Apply the UDF
df.withColumn("Greeting", greet_udf(df['Name'])).show()
```

-----

### 2\. Pandas UDF (Vectorized UDF)

A **Pandas UDF** applies a Python function **batch-by-batch** using Pandas Series or DataFrames. It is often referred to as a **Vectorized UDF**.

| Feature | Description |
| :--- | :--- |
| **Execution** | **Batch/Vectorized.** Spark splits the data into batches, and the function operates on the entire batch at once as a Pandas Series (column) or DataFrame. |
| **Data Transfer** | **Low Overhead.** Uses **Apache Arrow** for efficient, columnar data transfer between the Spark (JVM) and Python execution environments, significantly reducing serialization cost. |
| **Performance** | **Much Faster** than standard UDFs (speedups of up to 100x have been reported, depending on the workload). It leverages the **vectorized operations** and optimized C/C++ backend of Pandas and NumPy. |
| **Syntax** | Defined using the `@pandas_udf` decorator. Requires specifying the **input and output types** using Python type hints (e.g., `pd.Series`) and the return type in the decorator. |
| **Best Used For** | Complex logic that benefits from **Pandas/NumPy vectorized operations** and performing group-wise transformations on **large datasets**. |

#### Example (Series to Series - Scalar UDF)

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

# 1. Define the Python function using type hints
@pandas_udf(DoubleType())
def standardize_score(scores: pd.Series) -> pd.Series:
    # Operations are applied to a whole batch at once (vectorized).
    # Caveat: mean() and std() here are computed per batch, not over the full column.
    return (scores - scores.mean()) / scores.std()

# 2. Apply the UDF
df.withColumn("Standardized_Score", standardize_score(df['Score'])).show()
```

-----

### Performance Hierarchy

For optimal performance in PySpark, you should prioritize your custom logic implementation in this order:

1.  **Spark Built-in Functions** (`pyspark.sql.functions`): **Fastest.** Use these first, as they run inside the JVM without Python serialization overhead and are fully optimized by Spark.
2.  **Pandas UDF (Vectorized)**: **Much Faster** than standard UDFs. Use for complex logic that benefits from Pandas/NumPy.
3.  **Standard PySpark UDF**: **Slowest.** Use only as a last resort when the logic cannot be expressed using the other two methods.
