# 1. Databricks and Apache Spark

- **Row-oriented data on disk (CSV):** data is stored row by row (each record in turn, with its fields in column order)
- **Column-oriented data on disk (Parquet, Delta, ORC):** data is stored column by column (all values of column 1, then all values of column 2, ..., each ordered by row)
  - When a query needs only columns 1, 2 and 3, only those 3 columns are fetched, not every column as in a row-oriented format
  - These formats also keep statistics and metadata, and have compression built in
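
As a rough illustration of the two layouts, the sketch below models them in plain Python with made-up records. It is only a toy model: `row_store`, `column_store` and the two reader functions are hypothetical names, and real formats such as Parquet add encoding, compression and statistics on top.

```python
# Made-up records used only for illustration: (id, color, qty).
records = [(1, "red", 3), (2, "blue", 5), (3, "red", 7)]
columns = ("id", "color", "qty")

# Row-oriented: one entry per record, all fields of a record stored together.
row_store = [dict(zip(columns, rec)) for rec in records]

# Column-oriented: one entry per column, values ordered by row.
column_store = {name: [rec[i] for rec in records] for i, name in enumerate(columns)}


def read_from_row_store(store, wanted):
    """Every full record is touched, even if only some fields are kept."""
    fields_touched = sum(len(rec) for rec in store)
    result = {name: [rec[name] for rec in store] for name in wanted}
    return result, fields_touched


def read_from_column_store(store, wanted):
    """Only the requested columns are touched."""
    fields_touched = sum(len(store[name]) for name in wanted)
    result = {name: list(store[name]) for name in wanted}
    return result, fields_touched


row_result, row_touched = read_from_row_store(row_store, ["qty"])
col_result, col_touched = read_from_column_store(column_store, ["qty"])
```

Both readers return the same `qty` values, but in this toy model the row-oriented reader touches every field of every record while the column-oriented reader touches only the one requested column.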

In [0]:
dbutils.widgets.text("Text", "Hello World!")
dbutils.widgets.dropdown("Dropdown", "1", [str(x) for x in range(1, 10)])
dbutils.widgets.combobox("Combobox", "A", ["A", "B", "C"])
dbutils.widgets.multiselect("Multiselect", "Yes", ["Yes", "No", "Maybe"])

In [0]:
%sql
CREATE WIDGET TEXT state DEFAULT "CA"


# 2. Apache Spark Core

## SparkSQL
- It is a **module** for structured data processing with multiple interfaces.
- It works with any object that has a schema or structure, including SQL tables and the DataFrame API for Python, Scala, Java and R

## Transformations
- DataFrame transformations are **lazily** evaluated (no Job starts until an **Action** is called)
  - The schema is eagerly evaluated by the Driver, but no Job is spawned
  - Benefit of "Lazy Evaluation": Spark can make optimization decisions after it has looked at the whole DAG (Directed Acyclic Graph)
- Actions are methods that trigger the computation
  - A Job is spawned
  - Examples: `df.count()`, `df.collect()`, `df.show()`, `display(df)`
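
The difference between transformations and actions can be sketched without a cluster. The `LazyFrame` class below is a hypothetical toy stand-in, not Spark's API: its transformations only record a plan, and nothing runs until the `collect()` action is called.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: transformations only extend a recorded plan."""

    jobs_started = 0  # counts how many times an action actually ran the plan

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan

    def filter(self, predicate):
        # Transformation: returns a new LazyFrame, runs nothing.
        return LazyFrame(self._rows, self._plan + (("filter", predicate),))

    def select(self, func):
        # Transformation: returns a new LazyFrame, runs nothing.
        return LazyFrame(self._rows, self._plan + (("select", func),))

    def collect(self):
        # Action: this is the only place where the recorded plan is executed.
        LazyFrame.jobs_started += 1
        out = list(self._rows)
        for kind, fn in self._plan:
            if kind == "filter":
                out = [row for row in out if fn(row)]
            else:
                out = [fn(row) for row in out]
        return out


df = LazyFrame([1, 2, 3, 4, 5, 6])
evens_squared = df.filter(lambda x: x % 2 == 0).select(lambda x: x * x)
jobs_before_action = LazyFrame.jobs_started  # no job yet
result = evens_squared.collect()             # the action triggers the work
jobs_after_action = LazyFrame.jobs_started
```

Because the whole plan is known before anything runs, an engine built this way has the chance to reorder or combine steps first, which is the benefit the notes attribute to lazy evaluation.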

## DataFrameReader
- Interface used to load a DataFrame from external storage
  - ```spark.read.csv("/Filestore/tables/LifeExp_headers.csv")```
- Explicit vs Implicit vs Infer Schema
  1. **Explicitly** define Schema _**without reading**_ data files
      ```
      DDL_schema = "country STRING, lifeexp DOUBLE, region STRING"
      userDF = spark.read.option("header", True).schema(DDL_schema).csv("/Filestore/tables/LifeExp_headers.csv")
      ```
  2. **Implicitly** use default column names (`_c0`, `_c1`, ...) and the string data type _**without reading**_ through the data files
      ```
      df1 = spark.read.load("/Filestore/tables/LifeExp_headers.csv", format = "csv", header = False)
      display(df1)
      ```
  3. **Infer** column names and data types _**by reading**_ data files
      ```
      df2 = spark.read.load("/Filestore/tables/LifeExp_headers.csv", format = "csv", header = True, inferSchema = True)
      display(df2)
      ```

## DataFrameWriter
- Write DataFrame to external storage
    ```
    (df.write
       .format("delta")
       .mode("append")
       .save(outPath))
    ```
- Write as SQL table
    ```
    (df.write
       .mode("overwrite")
       .saveAsTable("events_p"))
    ```

## Query Execution
We can express the same query using any interface. The Spark SQL engine generates the same query plan used to optimize and execute on our Spark cluster.

![query execution engine](https://files.training.databricks.com/images/aspwd/spark_sql_query_execution_engine.png)

<img src="https://files.training.databricks.com/images/icon_note_32.png" alt="Note"> Resilient Distributed Datasets (RDDs) are the low-level representation of datasets processed by a Spark cluster. In early versions of Spark, you had to write <a href="https://spark.apache.org/docs/latest/rdd-programming-guide.html" target="_blank">code manipulating RDDs directly</a>. In modern versions of Spark you should instead use the higher-level DataFrame APIs, which Spark automatically compiles into low-level RDD operations.

# 4. Spark Architecture and Performance

### Clusters
- Example: a Cluster with 1 Driver and 6 Workers. Each Worker has 1 Executor with 2 Cores (12 Cores in total).
  - Driver: the brain of the cluster, which allocates tasks and data to the Worker nodes
  - Worker: receives tasks and data, performs the work and returns the result to the Driver

## Transformation

### Narrow vs Wide Transformation
- Narrow Transformation: 1-to-1 partition mapping
  - select, filter, cast, union
  - Start with 1 memory partition, apply the transformations, and the data stays within the **same** memory partition
- Wide Transformation: causes a Shuffle/Exchange
  - distinct, groupBy, sort, join
  - Redistributes data and then creates **new** memory partitions
  - Redistributing (re-partitioning) means the data is grouped differently across partitions
    - Depending on data size, we may need to decrease/increase the number of Shuffle partitions via ```spark.sql.shuffle.partitions```

![Narrow vs Wide Transformation](./images/Narrow_And_Wide_Transformation.png)

#### Process of Narrow Transformation
Example: Filter color != "brown". We have 16 memory partitions of data and 12 Cores
- Step 1: Driver splits the data files into 16 equal-sized partitions
- Step 2: Driver allocates 12 partitions to 12 Cores, each Core gets 1 partition --> Oops, we still have 4 partitions left
- Step 3: 4 Cores finish early and return their results to the Driver. The other Cores are still processing
- Step 4: Driver allocates the 4 remaining partitions to those 4 Cores for another iteration. The other Cores finish
- Step 5: The 4 Cores finish the 2nd iteration and return their results to the Driver
- Step 6: Driver collects the results and delivers them to the client
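
The steps above (12 partitions in the first pass, 4 left over for a second pass) can be sketched as a simple wave calculation. This is a simplified model with a hypothetical helper `schedule_waves`: a real scheduler hands out tasks as individual Cores free up rather than in strict waves.

```python
def schedule_waves(num_partitions, num_cores):
    """Group partition ids into waves of at most num_cores tasks each."""
    waves = []
    for start in range(0, num_partitions, num_cores):
        end = min(start + num_cores, num_partitions)
        waves.append(list(range(start, end)))
    return waves


# 12 partitions run first on the 12 Cores, then the 4 that are left over.
waves = schedule_waves(num_partitions=16, num_cores=12)
wave_sizes = [len(wave) for wave in waves]
idle_cores_in_last_wave = 12 - wave_sizes[-1]
```

In this model the second wave keeps only 4 of the 12 Cores busy, which is why a partition count that is a multiple of the Core count tends to use the cluster more evenly.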

![Process of Narrow Transformation](./images/Narrow_Transformation_Example.png)

#### Process of Wide Transformation
Example: Count total rows for each color. We have **19.5 MiB** of data, with 6 initial partitions
- Stage 1: Local Count
  - Step 1: Driver allocates 6 partitions to 6 Cores, each Core gets 1 partition
  - Step 2: The 6 Cores finish and **write** their results to disk as key:value dictionaries, so the output is only **568B**. For example:
    - Core 2: Red:3, Blue:5, Yellow:7
    - Core 3: Red:4, Blue:4, Yellow:8
    - ...
    - Core 12: Red:7, Blue:5, Yellow:5
- Stage 2: Global Count
  - Step 1: Driver assigns Core 7 to read the Local Counts and do the "Global Count"
  - Step 2: Core 7 **reads** and sums the Local Counts and returns the result to the Driver
  - Step 3: Driver collects the result and delivers it to the client
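
The two stages can be mimicked in plain Python with `collections.Counter`. The partition contents below are made up for illustration, and the sketch ignores the disk write between the stages.

```python
from collections import Counter

# Made-up partition contents; in the notes each Core holds one partition.
partitions = [
    ["red", "blue", "yellow", "red"],
    ["blue", "blue", "yellow"],
    ["red", "yellow", "yellow"],
]

# Stage 1 (Local Count): each partition is counted independently.
local_counts = [Counter(partition) for partition in partitions]

# Stage 2 (Global Count): a single task sums the small local results.
global_count = sum(local_counts, Counter())
```

Only the small per-partition dictionaries cross the stage boundary, not the original rows, which is why the shuffled output in the example is so much smaller than the input.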

![Process of Wide Transformation](./images/Wide_Transformation_Example.png)

## Performance and Query Optimization
There are 5 plans:

**Input: Query, no matter the programming language -->**
1. Unresolved Logical Plan
    - Driver reviews the query and confirms: **```schema correct?```**

2. Analyzed Logical Plan
    - Driver looks at the **[Metadata Catalog] for ANALYSIS** and **```adds data types```** to the columns

3. Optimized Logical Plan
    - Driver looks at the **[Catalyst Catalog] for LOGICAL OPTIMIZATION** and applies a number of rule-based optimizations (Filter, Join, etc.), for example deciding whether to **```move Filter before Join?```**

4. Physical Plan
    - Driver comes up with several candidate plans for the query during **PHYSICAL PLANNING**, deciding **```which Join strategy to use, Data Skipping, Predicate Pushdown?```**

5. Selected Physical Plan
    - Driver runs the candidate plans through the **[Cost Model]** to see which plan has the lowest cost = the best
    - The selected plan goes through **WHOLE-STAGE CODE GENERATION** and becomes **```Java bytecode that is sent to Executors```**

**--> Output: RDD, no matter whether the source is a DataFrame, SQL table or view**

![Query Optimization](./images/Query_Optimization.png)

### Explain
![Explain](./images/Explain.png)

For each step, read from bottom to top

**Step 1: Parsed Logical Plan**
- Shown as the script was written: Inner Join, then Filter
- Checks the schema, e.g. lastname, firstname, dept... (no data types yet)

**Step 2: Analyzed Logical Plan**
- Adds a data type to each column of the schema from step 1
- Adds a CAST of ```dept``` to double, as it is a string

Explain does not display all possible plans, only the optimized plan

**Step 3: Optimized Logical Plan**
- Moves the Filter before the Join, so fewer rows need to be joined

**Step 4: Physical Plan**
- Picks the best plan: Filter before Join, with the Inner Join executed as a "Sort Merge Join"
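
Why moving the Filter before the Join helps can be shown with a small plain-Python sketch. The tables and helper names below are made up for illustration; the point is only that both orders return the same rows while the optimized order feeds fewer rows into the join.

```python
# Made-up tables: employees as (name, dept_id), departments as dept_id -> name.
employees = [("Ann", 10), ("Bob", 20), ("Cid", 10), ("Dee", 30)]
departments = {10: "Eng", 20: "Ops", 30: "HR"}


def inner_join(emps, depts):
    """Inner join on dept_id."""
    return [(name, dept_id, depts[dept_id]) for name, dept_id in emps if dept_id in depts]


def join_then_filter(wanted_dept):
    # As written in the script: join everything, then filter.
    joined = inner_join(employees, departments)
    rows_into_join = len(employees)
    return [row for row in joined if row[1] == wanted_dept], rows_into_join


def filter_then_join(wanted_dept):
    # As in the optimized plan: filter first, then join the smaller input.
    filtered = [emp for emp in employees if emp[1] == wanted_dept]
    rows_into_join = len(filtered)
    return inner_join(filtered, departments), rows_into_join


as_written, rows_joined_as_written = join_then_filter(10)
optimized, rows_joined_optimized = filter_then_join(10)
```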


### Cost Based Optimization (CBO): Smaller Tables Join
![Cost Based Optimization (CBO)](./images/CBO.png)

- Enable configurations: CBO, join reorder
- In the script, (2) we join the **```large```** table to the **```medium```** table, then (1) we join the **```small```** table
- In the CBO step, Spark chooses to:
  - (1) join the **```small```** table with the **```medium```** table first, then join the **```large```** table last
  - (2) do the shuffle/exchange afterward




### Adaptive Query Execution (AQE)
![Adaptive Query Execution (AQE)](./images/AQE.png)
- After the RDDs are created, Spark looks at the runtime statistics and shuffle sizes to see how big they are. It then goes back to the Analyzed Logical Plan to see whether it can fine-tune the number of shuffle partitions.


![With And Without AQE](./images/With_Without_AQE.png)
- **Without AQE:** we (1) start with 4 memory partitions and (2) end up with 200 memory partitions. **_But why shuffle into 200 partitions for only 568B?_** That is fewer than 3B per partition --> far too small
- **With AQE:** we (1) start with 4 memory partitions and (2) end up with 1 partition, which is much faster even without parallelism


![Number of Jobs of AQE](./images/AQE_Number_Of_Jobs.png)
- Number of jobs **increases** from 1 to 3 (job #3, #4, #5)
  - Job #3 filters on the WHERE clause to reduce size.
  - Job #4, with AQE, needs 1 Shuffle
- Hence AQE **has a cost overhead**, but it pays off in the long run

### Predicate Pushdown on RDDs
- Predicate Pushdown is when the data source itself **limits the number** of rows returned to the Spark reader, via SELECT/WHERE/FILTER
- Predicate Pushdown filters the data in the database query,
  - Reducing the number of entries retrieved from the source database and
  - Improving query performance
  - By default, the Spark Dataset API will automatically push down valid WHERE clauses to the database
- A **Cast** function cannot be pushed down

![Predicate Pushdown](./images/Predicate_Pushdown.png)

### Caching - Best Practices
- Don't cache unless you're sure a DataFrame will be **used multiple times**
  - e.g. EDA (Exploratory Data Analysis), ML training dataset
- Omit unneeded columns to reduce the storage footprint
- After calling `.cache()`, which is a **lazy transformation**, ensure all partitions are accessed with an **Action**
  - e.g. `count()` - puts the RDD into the cache
- Manually evict the cache when it is no longer needed
  - e.g. `unpersist()` - removes the RDD from the cache

## Memory Partitioning

### Guidelines
- Err on the side of **too many small** Memory Partitions rather than too few large ones. If a partition is too large, the Core does not have enough memory for it, with 2 possible consequences:
  - Spill to disk: data is written out while waiting for more RAM, then brought back
  - OOM: out-of-memory error
- Sweet spot for the initial partition size: between **128MB and 1GB**
- Calculate the number of Shuffle partitions by dividing the Shuffle stage input size (e.g. 4TB) by the target partition size (e.g. 200MB).
  - e.g. 4TB / 200MB = 20,000 Shuffle partitions
  - By default it is 200: `spark.conf.get("spark.sql.shuffle.partitions")`
- The number of Shuffle Partitions can be set manually on a case-by-case basis
  - `spark.conf.set("spark.sql.shuffle.partitions", "20000")`
  - This setting is local to **1 session** only.
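
The sizing rule above is plain arithmetic, sketched here with a hypothetical helper `shuffle_partition_count`. It assumes decimal units (1 MB = 10^6 bytes, 1 TB = 10^12 bytes), which is what makes 4TB / 200MB come out at exactly 20,000.

```python
MB = 10**6   # decimal units, an assumption of this sketch
TB = 10**12


def shuffle_partition_count(shuffle_input_bytes, target_partition_bytes):
    """Ceiling division: a partial final partition still counts as one partition."""
    return -(-shuffle_input_bytes // target_partition_bytes)


recommended = shuffle_partition_count(4 * TB, 200 * MB)

# The reverse check for the 568B shuffle spread over the default 200 partitions.
avg_bytes_with_default = 568 / 200

# With a live session the result would be applied as in the notes:
# spark.conf.set("spark.sql.shuffle.partitions", str(recommended))
```

The same division explains the AQE example: 568B over 200 partitions is under 3 bytes each, so a single partition is the sensible choice there.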

![200_Default_Partitions](./images/200_Default_Partitions.png)
- Example: 
  - It starts with 8 partitions, then spawns 200 shuffle partitions
  - But there are only 42KB (almost nothing) to write
  - Some of the tasks (one per memory partition) even have 0 bytes to process (empty)
  - It looks like AQE is turned off --> turn it on

### Cores in Cluster
- Initially, the Driver determines the number of Memory Partitions and their size. It decides based on:
  - Number of Cores in the Cluster. 
  - More Cores, more Partitions.
- Get the number of Cores in 2 ways:
  - `sc.defaultParallelism` or `spark.sparkContext.defaultParallelism`
  - Spark UI -> Cores
  
  ![Spark UI](./images/Spark_UI_Cores.png)


### No. of Memory Partition for DataFrame
- If Memory Partitions are too large (> 1GB), we can manually increase the No. of Partitions to bring them into a more reasonable size range (**128MB** to **1GB**)
- AQE can resolve some partitioning issues
  - e.g. for a small dataset, AQE won't create the default 200 Shuffle Partitions, but a far lower number
- We need to **convert the DataFrame into an RDD** to get the number of Partitions used for the DataFrame
  - `df.rdd.getNumPartitions()`

### Re-Partition a DataFrame
There are 2 ways:
1. **`coalesce(int)`:**
    - Returns new DF with exactly N partitions when N < current No. of Partitions
    - **Narrow** transformation
    - Pros:
        - Retain sort order
        - No shuffle
    - Cons:
        - Only decrease No. of Partitions
        - Unevenly balanced partition sizes

2. **`repartition(int, [col])`:**
    - Returns new DF with exactly N partitions
    - **Wide** transformation
    - Pros:
        - Evenly balanced partition sizes
        - Can both increase and decrease No. of Partitions
    - Cons:
        - Does not retain sort order
        - Requires a Shuffle

**Notes:**
  - More Partitions, smaller size each
  - Fewer Partitions, larger size each
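
The trade-offs listed above can be illustrated with a toy model in plain Python. The two functions below only share their names with the Spark methods and are assumptions of this sketch: this `coalesce` merges neighbouring partitions without moving individual rows, and this `repartition` redistributes every row round-robin. Spark's real placement logic is more involved.

```python
def coalesce(partitions, n):
    """Merge whole neighbouring partitions; can only reduce the partition count."""
    if n >= len(partitions):
        return [list(part) for part in partitions]
    merged = [[] for _ in range(n)]
    for index, part in enumerate(partitions):
        merged[index * n // len(partitions)].extend(part)
    return merged


def repartition(partitions, n):
    """Redistribute every row round-robin; works for any target count."""
    rows = [row for part in partitions for row in part]
    out = [[] for _ in range(n)]
    for index, row in enumerate(rows):
        out[index % n].append(row)
    return out


# Made-up, deliberately skewed input partitions.
skewed = [[1, 2, 3, 4, 5, 6], [7], [8], [9, 10]]

coalesced = coalesce(skewed, 2)       # order kept, sizes stay uneven
rebalanced = repartition(skewed, 2)   # sizes even, original order lost
grown = repartition(skewed, 5)        # increasing the count is possible
```

In this model `coalesced` keeps rows 1 to 10 in order but ends up with sizes 7 and 3, while `rebalanced` has two partitions of 5 rows whose rows are interleaved.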

# 5. Structured Streaming

## Streaming Query

### Sources:
- Kafka, Files, Event Hubs, Kinesis
- DataFrame
  - ```
    df = (spark.readStream
              .option("maxFilesPerTrigger", 1)
              .format("delta")
              .load(DA.paths.events)
            )
    df.isStreaming
    ```
- SQL Views & Tables
  - ```
    df.createOrReplaceTempView("v_event")
    spark.readStream.format("delta").table("v_event")
    ```

### Sinks
- **Where** to write data: Kafka, Files, Event Hubs/EventGrid, Foreach(Batch) for custom logic to store data
- **What** data to write (Output Modes)
  - APPEND: **add new** records only
  - UPDATE: **update changed** records in place
    - Only rows updated since the last trigger are written
    - Differs from **Complete** mode in that **Update** mode outputs only the rows changed since the last trigger
    - If the query does not contain aggregations, **Update** is the same as **Append** mode
  - COMPLETE: **rewrite** full output
  - Example:
  
    ![Output Modes](./images/Output_Modes.png)
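
The difference between the output modes can be sketched in plain Python with a running count per key. This is a toy model with made-up micro-batches and a hypothetical `run_micro_batches` helper: the `append` entry shows what a query without aggregation would emit (just the new rows), while `update` and `complete` show the aggregated counts.

```python
from collections import Counter


def run_micro_batches(batches):
    """Return what each output mode would emit after every micro-batch."""
    state = Counter()   # running count per key across all batches
    outputs = []
    for batch in batches:
        changed = Counter(batch)
        state.update(changed)
        outputs.append({
            "append": list(batch),                         # new rows only (no aggregation)
            "update": {key: state[key] for key in changed},  # only keys changed in this batch
            "complete": dict(state),                       # the full result table
        })
    return outputs


outputs = run_micro_batches([["a", "b", "a"], ["b", "c"]])
second_trigger = outputs[1]
```

After the second micro-batch, `update` contains only `b` and `c` because the count for `a` did not change, whereas `complete` rewrites all three keys.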

### Trigger Types
- Default: Process each micro-batch as soon as previous one has been processed (or 500ms). *No coding required for this*. **Not recommended**
- Fixed interval: Micro-batch processing kicked off at the **user-specified interval** 
  - `.trigger(processingTime="1 second")` = every 1 second, bring new data
- One-time: Process **all** available data as **a single micro-batch** and then automatically stop the query 
  - `.trigger(once=True)` to trigger manually
- AvailableNow: Like One-time, all available data is processed before the query stops, but in **multiple batches** instead of one 
  - `.trigger(availableNow=True)` **--> Recommended**
- ContinuousProcessing: Long-running tasks that **continuously read, process and write** data as soon as events are available 
  - `.trigger(continuous="1 second")`

### End-to-end fault tolerance
Guaranteed in Structured Streaming by:
1. Checkpointing: the Directed Acyclic Graph (DAG) of all DStream transformations is stored in reliable storage (along with optional State)
    - `dbutils.fs.ls(checkpointPath)`
2. Write-ahead logs: to commit offsets
    - Before it reads data into RAM, it writes the data to disk. Then it commits the offsets. Hence it knows where it left off if there is an interruption, and when the stream is turned back on it resumes from that point.
3. Idempotent sinks: write a given row only once, even if it is sent multiple times
4. Replayable data sources: the job is allowed to poll the data again
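
How these pieces combine can be sketched with a toy simulation in plain Python. Everything here is hypothetical (`IdempotentSink`, `run_stream`, the `checkpoint` dictionary) and heavily simplified: a committed offset plays the role of the checkpoint and log, a list that can be re-sliced plays the replayable source, and a sink keyed by batch makes a replayed write harmless.

```python
class IdempotentSink:
    """Toy sink keyed by (batch_id, position): a replayed batch overwrites itself."""

    def __init__(self):
        self.rows = {}

    def write(self, batch_id, rows):
        for position, row in enumerate(rows):
            self.rows[(batch_id, position)] = row


def run_stream(source, sink, checkpoint, batch_size, crash_on_batch=None):
    """Process from the committed offset; optionally fail before committing a batch."""
    while checkpoint["offset"] < len(source):
        start = checkpoint["offset"]
        batch = source[start:start + batch_size]
        batch_id = start // batch_size
        sink.write(batch_id, batch)
        if batch_id == crash_on_batch:
            raise RuntimeError("simulated failure before the offset was committed")
        checkpoint["offset"] = start + len(batch)  # commit: this batch is done


source = [f"event-{i}" for i in range(10)]  # replayable: can be read again from any offset
sink = IdempotentSink()
checkpoint = {"offset": 0}

try:
    run_stream(source, sink, checkpoint, batch_size=3, crash_on_batch=1)
except RuntimeError:
    offset_after_crash = checkpoint["offset"]  # batch 1 was written but never committed

run_stream(source, sink, checkpoint, batch_size=3)  # restart: batch 1 is replayed
delivered = [sink.rows[key] for key in sorted(sink.rows)]
```

After the restart the sink holds each event exactly once, even though batch 1 was sent to it twice.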

![Checkpointing](./images/Checkpointing.png)


### Example: Complete Stream Query
![Complete Streaming Query](./images/Complete_Streaming_Query.png)

**Step 1:** Read Stream (lazy)
  - `.option("maxFilesPerTrigger", 1)` - how much data to read per trigger, in this case 1 file at a time
    - It helps avoid overrunning resources
    - We can also use `.option("maxBytesPerTrigger", 1000)`
  - All of the data read will be held in RAM

**Step 2:** Transformation

**Step 3:** Write Stream (lazy)
  - `emailTrafficDF.writeStream` writes the DF from step 2 to disk
  - `queryName("email_traffic")` sets the name of this query. It is good practice to name the query.
  - `option("checkpointLocation", checkpointPath)` is **mandatory**
  - `start(outputPath)` writes into the directory. `start` is the **Action**

### Streaming Best Practices
1. Set an explicit trigger interval rather than leaving the default, to avoid unintended cost
2. Prefer ADLS Gen2 over Blob storage on Azure
3. Name the Streaming job so it's easily identifiable
4. Don't run multiple Streams on the same Driver. Multiplexing on the same cluster is generally not recommended
5. Tune `maxFilesPerTrigger` or `maxBytesPerTrigger` to achieve Partition sizes of around **128MB - 200MB** (for best latency and throughput)
6. Consider converting `SortMergeJoin` to `BroadcastHashJoin`. This may require increasing the auto-broadcast hash join threshold
7. If there is a Shuffle, consider setting the number of Shuffle Partitions manually (since AQE is disabled in Streaming) to match the number of Cores or 2x the number of Cores
8. Turn off Stats collection on initial Stream to decrease latency
9. If possible, Auto-Optimize initial Stream to coalesce tiny files
10. Use compute-optimized workers and RocksDB state store

## Streaming Aggregates