# Databricks Examples - Complete Beginner's Guide

This notebook teaches you key Databricks & PySpark concepts through runnable examples. Each section has explanations designed for beginners.

**What you'll learn:**
- DataFrames: distributed tables that scale
- Delta Lake: reliable, ACID-compliant storage
- User-defined functions, database connections, partitioning, and schema evolution
- Advanced features like MERGE and time travel


In [None]:
# Create Spark session (Databricks already provides `spark`)
try:
    spark
except NameError:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("local-demo").getOrCreate()
print('Spark session ready')

In [None]:
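# 1) DataFrame transformation example
from pyspark.sql.functions import col, when
df = spark.createDataFrame([(1, 'alice', 30), (2, 'bob', 17)], ['id', 'name', 'age'])
# Transformation 1: flag adults (age >= 18); transformation 2: keep only flagged rows
df2 = (
    df.withColumn('adult', when(col('age') >= 18, True).otherwise(False))
    .filter(col('adult'))
)
# Action: nothing is computed until show() runs
df2.show()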

## 1. DataFrame Transformations

### What is a DataFrame?
A **DataFrame** is like an Excel spreadsheet or SQL table in Spark—it has rows and columns. You can filter rows, add/remove columns, and combine data.

### Why use it?
- Works across distributed machines (scales to petabytes of data)
- SQL-like operations are optimized automatically by Spark's Catalyst optimizer before they run
- Familiar syntax if you know SQL or pandas

### How it works:
1. **Create** a DataFrame from data (Python list, CSV, Parquet, etc.)
2. **Transform** it: filter rows, add columns, group, join
3. **Execute** an action to see results (e.g., `.show()`, `.count()`)

**Think of it like a recipe:** transformations such as `filter` and `withColumn` are lazy instructions that only build a plan; an action such as `.show()` makes Spark actually run the recipe.
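To make the recipe idea concrete, here is a tiny pure-Python model of lazy evaluation. It is not how Spark is implemented: the `LazyTable` class and its `filter`/`with_column`/`collect` methods are hypothetical and only mimic the idea that transformations are recorded and nothing runs until an action is called.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, object]
Step = Callable[[List[Row]], List[Row]]


class LazyTable:
    """Hypothetical toy model: records transformations and runs them only on an action."""

    def __init__(self, rows: List[Row], steps: Optional[List[Step]] = None):
        self._rows = rows
        self._steps: List[Step] = steps or []
        self.executions = 0  # how many times the recorded plan has actually run

    def filter(self, predicate: Callable[[Row], bool]) -> "LazyTable":
        # Transformation: return a NEW table with one more recorded step; nothing runs yet
        step: Step = lambda rows: [r for r in rows if predicate(r)]
        return LazyTable(self._rows, self._steps + [step])

    def with_column(self, name: str, fn: Callable[[Row], object]) -> "LazyTable":
        # Transformation: record "add a column computed by fn"
        step: Step = lambda rows: [{**r, name: fn(r)} for r in rows]
        return LazyTable(self._rows, self._steps + [step])

    def collect(self) -> List[Row]:
        # Action: replay every recorded step over the source rows
        self.executions += 1
        rows = self._rows
        for step in self._steps:
            rows = step(rows)
        return rows


people = LazyTable([
    {"id": 1, "name": "alice", "age": 30},
    {"id": 2, "name": "bob", "age": 17},
])
plan = people.with_column("adult", lambda r: r["age"] >= 18).filter(lambda r: r["adult"])
print(plan.executions)  # 0: so far only a plan exists
print(plan.collect())   # [{'id': 1, 'name': 'alice', 'age': 30, 'adult': True}]
```

Calling `collect()` twice replays the steps twice in this model, which mirrors why Spark recomputes a DataFrame on every action unless you `cache()` it.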

In [None]:
# 2) Delta write/read example (requires Delta runtime)
path = '/tmp/delta_demo_table'
try:
    df2.write.format('delta').mode('overwrite').save(path)
    df_read = spark.read.format('delta').load(path)
    df_read.show()
except Exception as e:
    print('Delta write/read skipped (runtime may not support Delta):', e)

## 2. Delta Lake Read/Write

### What is Delta Lake?
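**Delta Lake** is an open-source storage layer that sits on top of object storage (S3, ADLS, GCS) or other file systems. It stores data as Parquet files plus a transaction log, so it's like a "super" Parquet:
- **ACID transactions**: A write either commits completely or not at all, so failed or concurrent jobs don't leave half-written data visible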
- **Schema enforcement**: Prevents bad data from entering your table
- **Time travel**: Read data from a past version (e.g., before a mistake)
- **Transaction log** (`_delta_log/`): Git-like commit history for your data

### Why use it?
- Reliability: A failed job can't leave a partially written table behind
- Easy auditing: See exactly what changed and when
- Data quality: Enforce column types and constraints

### How it works:
When you write to Delta, Spark:
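1. Validates the schema (does the new data match the table?)
2. Writes the new data as Parquet files
3. Atomically commits a new JSON file in `_delta_log/` listing the files that were added (and removed). Readers only see the new data once this commit exists.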
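The commit step can be illustrated with a simplified pure-Python model. It borrows Delta's layout idea (zero-padded, per-version JSON files under `_delta_log/` containing `add`/`remove` actions) but it is not Delta's real log format or code: the `commit` and `live_files` helpers are hypothetical, data files are only named rather than written, and atomicity is approximated with write-then-rename. The point it shows is that a reader's snapshot is built only from committed log entries.

```python
import json
import os
import tempfile


def commit(table_dir, version, add=(), remove=()):
    """Hypothetical: record one table version as a JSON-lines file of add/remove actions."""
    log_dir = os.path.join(table_dir, "_delta_log")
    os.makedirs(log_dir, exist_ok=True)
    actions = [{"add": {"path": p}} for p in add] + [{"remove": {"path": p}} for p in remove]
    commit_path = os.path.join(log_dir, f"{version:020d}.json")
    # Write a temp file, then rename it, so the commit appears all at once or not at all
    tmp_path = commit_path + ".tmp"
    with open(tmp_path, "w") as f:
        f.write("\n".join(json.dumps(a) for a in actions))
    os.replace(tmp_path, commit_path)


def live_files(table_dir):
    """Replay commits in version order to find the files in the current snapshot."""
    log_dir = os.path.join(table_dir, "_delta_log")
    files = set()
    for name in sorted(n for n in os.listdir(log_dir) if n.endswith(".json")):
        with open(os.path.join(log_dir, name)) as f:
            for line in f:
                action = json.loads(line)
                if "add" in action:
                    files.add(action["add"]["path"])
                elif "remove" in action:
                    files.discard(action["remove"]["path"])
    return sorted(files)


table = tempfile.mkdtemp()
commit(table, 0, add=["part-0000.parquet"])
commit(table, 1, add=["part-0001.parquet"])
# An overwrite: remove the old files and add the new one in a single commit
commit(table, 2, add=["part-0002.parquet"], remove=["part-0000.parquet", "part-0001.parquet"])
# Simulate a writer that crashed after writing a data file but before committing
open(os.path.join(table, "part-0003.parquet"), "w").close()
print(live_files(table))  # ['part-0002.parquet']
```

The orphaned `part-0003.parquet` is on disk but invisible to readers, which is the property that keeps failed writes from corrupting the table.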

In [None]:
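# 3) UDF example (row-at-a-time Python UDF)
from pyspark.sql.functions import udf, col
import pyspark.sql.types as T
# `age` was inferred as a 64-bit integer, so declare a LongType return type
@udf(T.LongType())
def add_one(x):
    # UDFs receive None for SQL NULL values
    return x + 1 if x is not None else None
df.withColumn('age_plus_one', add_one(col('age'))).show()
# Built-in equivalent (preferred: no Python round trip, fully optimizable)
df.withColumn('age_plus_one', col('age') + 1).show()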

## 3. User-Defined Functions (UDFs)

### What is a UDF?
A **UDF** (User-Defined Function) is your own custom function that runs on each row of data. Use it when built-in functions don't do what you need.

### Example use cases:
- Custom string cleaning (e.g., extract middle name from "John Q. Smith")
- Domain-specific calculations (e.g., mortgage amortization)
- Call external APIs (slower, but sometimes necessary)
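As an illustration of the first use case, here is the kind of plain Python logic you might wrap in a UDF. The function name `extract_middle_name` and its rule (the middle name is every token between the first and the last) are assumptions for this sketch.

```python
from typing import Optional


def extract_middle_name(full_name: Optional[str]) -> Optional[str]:
    """Return the token(s) between first and last name, or None if there are none."""
    if full_name is None:
        return None  # UDFs receive None for SQL NULL, so handle it explicitly
    parts = full_name.split()
    if len(parts) < 3:
        return None
    return " ".join(parts[1:-1])


print(extract_middle_name("John Q. Smith"))  # Q.
print(extract_middle_name("Ada Lovelace"))   # None
```

In PySpark you could then wrap it with `udf(extract_middle_name, T.StringType())` and apply it to a column the same way `add_one` is applied in the cell above.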

### ⚠️ Performance trade-off:
UDFs are slower than built-in Spark functions because:
- Spark can't optimize them (it doesn't know what they do)
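- Every row is serialized in the JVM, shipped to a separate Python worker process, and the result is shipped back

**Best practice**: Use built-in functions when possible, and only use UDFs when you really need custom logic. If you do need Python, vectorized pandas UDFs (`pandas_udf`) move data in batches via Apache Arrow and are usually much faster than row-at-a-time UDFs.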

In [None]:
# 4) JDBC read example (placeholder credentials)
jdbc_url = 'jdbc:postgresql://<host>:5432/<db>'
jdbc_props = {
    'user': '<username>',
    'password': '<password>',
    'driver': 'org.postgresql.Driver'
}
# Placeholders only: load real credentials from Databricks Secrets or environment variables
try:
    # Parentheses let the method chain span multiple lines
    df_jdbc = (
        spark.read.format('jdbc')
        .option('url', jdbc_url)
        .option('dbtable', 'public.my_table')
        .option('user', jdbc_props['user'])
        .option('password', jdbc_props['password'])
        .option('driver', jdbc_props['driver'])
        .load()
    )
    df_jdbc.show(5)
except Exception as e:
    print('JDBC example skipped (fill credentials or run in environment with driver):', e)

## 4. JDBC (Connecting to Databases)

### What is JDBC?
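**JDBC** (Java Database Connectivity) is the standard Java API for talking to relational databases. Spark's JDBC data source uses it to read from and write to databases like PostgreSQL, MySQL, Oracle, SQL Server, etc., as long as the database's JDBC driver is available on the cluster.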

### Why use it?
- Read data from existing databases into Spark DataFrames
- Write Spark results back to databases
- Useful for ETL: extract from DB → transform in Spark → load back

### Security note:
**Never hardcode passwords!** Use:
- Databricks Secrets (recommended)
- Environment variables
- Cloud secret managers (e.g., AWS Secrets Manager, Azure Key Vault)
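A minimal sketch of reading credentials at runtime instead of hardcoding them. On Databricks, `dbutils.secrets.get(scope=..., key=...)` is the secrets API; outside Databricks this falls back to environment variables. The scope name `jdbc`, the secret keys `user`/`password`, and the variable names `JDBC_USER`/`JDBC_PASSWORD` are hypothetical choices for this example.

```python
import os


def get_db_credentials(scope="jdbc", env_prefix="JDBC"):
    """Return (user, password) from Databricks secrets if available, else env vars."""
    try:
        # `dbutils` is predefined only inside Databricks notebooks
        user = dbutils.secrets.get(scope=scope, key="user")  # noqa: F821
        password = dbutils.secrets.get(scope=scope, key="password")  # noqa: F821
    except NameError:
        # Outside Databricks, fall back to environment variables
        user = os.environ.get(f"{env_prefix}_USER")
        password = os.environ.get(f"{env_prefix}_PASSWORD")
    if not user or not password:
        raise RuntimeError("Database credentials are not configured")
    return user, password
```

Usage: `user, password = get_db_credentials()`, then pass the values to `.option('user', user)` and `.option('password', password)` on the JDBC reader. Failing loudly when nothing is configured avoids silently connecting with placeholder values.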

### How it works:
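1. By default, Spark reads the whole table through a single JDBC connection (one partition)
2. If you set `partitionColumn`, `lowerBound`, `upperBound`, and `numPartitions`, Spark opens one connection per partition, and each reads a range of `partitionColumn` in parallel
3. Results come back as a Spark DataFrame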
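Spark only splits a JDBC read into parallel queries when given a numeric, date, or timestamp `partitionColumn` together with `lowerBound`, `upperBound`, and `numPartitions`. The sketch below approximates the per-partition `WHERE` clauses; it loosely follows Spark's stride logic but is a simplified model, and `jdbc_partition_predicates` is a hypothetical helper. Note that the bounds only set the stride: rows outside them are still read, by the first and last partitions.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the WHERE clause each parallel JDBC query would use."""
    if num_partitions < 2:
        return [None]  # a single query with no WHERE clause
    stride = (upper_bound - lower_bound) // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lo, hi = current, current + stride
        if i == 0:
            # First partition also picks up values below lowerBound and NULLs
            predicates.append(f"{column} < {hi} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # Last partition is open-ended, so values above upperBound are still read
            predicates.append(f"{column} >= {lo}")
        else:
            predicates.append(f"{column} >= {lo} AND {column} < {hi}")
        current = hi
    return predicates


for p in jdbc_partition_predicates("id", 0, 100, 4):
    print(p)
# id < 25 OR id IS NULL
# id >= 25 AND id < 50
# id >= 50 AND id < 75
# id >= 75
```

On a real reader the same settings go in as options, e.g. `.option('partitionColumn', 'id').option('lowerBound', 0).option('upperBound', 100).option('numPartitions', 4)`. Keep `numPartitions` modest, since each partition is a concurrent connection to the database.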

In [None]:
# 5) Metastore-registered (external) Delta table: create and write
table_name = 'default.demo_table'
path = '/tmp/demo_table_path'
try:
    # LOCATION makes this an external table; the schema must match df2 (id, name, age, adult)
    spark.sql(
        f"CREATE TABLE IF NOT EXISTS {table_name} "
        f"(id BIGINT, name STRING, age BIGINT, adult BOOLEAN) USING DELTA LOCATION '{path}'"
    )
    # Append the DataFrame into the registered table
    df2.write.format('delta').mode('append').saveAsTable(table_name)
    spark.sql(f"SELECT * FROM {table_name} LIMIT 5").show()
except Exception as e:
    print('Metastore table example skipped or failed:', e)

## 5. Metastore Tables (Registering Data)

### What is the Metastore?
The **metastore** is a catalog that keeps track of all your tables—like a library index.
- Stores table names, schemas, and file locations
- Allows you to query tables by name (e.g., `SELECT * FROM my_table`)
- Multiple users can find and reuse tables

### Managed vs. External tables?
- **Managed** (default): Databricks manages both the data AND the location. Dropping the table deletes the data.
- **External** (with LOCATION): You manage the data; Databricks just points to it. Dropping the table keeps the data.
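To make the drop behavior concrete, here is a toy catalog in pure Python. `ToyCatalog` and the table names are hypothetical; the model only captures the rule above (dropping a managed table deletes its directory, dropping an external table only forgets the name), not how a real metastore stores metadata.

```python
import os
import shutil
import tempfile


class ToyCatalog:
    """Hypothetical model of a metastore: maps table names to storage locations."""

    def __init__(self, warehouse_dir):
        self.warehouse_dir = warehouse_dir
        self.tables = {}  # name -> (location, is_managed)

    def create_table(self, name, location=None):
        if location is None:
            # Managed: the catalog chooses a location inside its own warehouse
            location = os.path.join(self.warehouse_dir, name)
            self.tables[name] = (location, True)
        else:
            # External: the user supplies the location (like CREATE TABLE ... LOCATION)
            self.tables[name] = (location, False)
        os.makedirs(location, exist_ok=True)
        return location

    def drop_table(self, name):
        location, is_managed = self.tables.pop(name)
        if is_managed:
            shutil.rmtree(location)  # managed: the data is deleted with the table
        # external: only the catalog entry is removed; files stay on disk


catalog = ToyCatalog(tempfile.mkdtemp())
managed_dir = catalog.create_table("sales")
external_dir = catalog.create_table("raw_events", location=tempfile.mkdtemp())
catalog.drop_table("sales")
catalog.drop_table("raw_events")
print(os.path.exists(managed_dir), os.path.exists(external_dir))  # False True
```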

### Why use it?
- **Discovery**: See all available data in one place
- **Reusability**: Write once, query many times without reloading
- **SQL interface**: Run SQL queries like traditional databases

In [None]:
# 6) Partitioning and basic compaction pattern
df_big = spark.createDataFrame([(1,'alice','us',30),(2,'bob','us',17),(3,'cara','eu',25)], ['id','name','region','age'])
out_path = '/tmp/delta_partitioned'
# write partitioned by region
try:
    df_big.write.format('delta').mode('overwrite').partitionBy('region').save(out_path)
    # read back partitioned table
    spark.read.format('delta').load(out_path).show()
    # Fewer output files: coalesce(1) routes all rows through one task, so each region gets one file (small data only)
    df_big.coalesce(1).write.format('delta').mode('overwrite').partitionBy('region').save(out_path + '_compacted')
    print('Wrote partitioned and compacted (coalesce) datasets')
except Exception as e:
    print('Partitioning example skipped or failed:', e)

## 6. Partitioning & Compaction

### What is Partitioning?
**Partitioning** means organizing files by column values (e.g., by region, date, customer_id).

**Without partitioning:**
```
/data/table/part-0001.parquet  (1GB, mixed regions)
/data/table/part-0002.parquet  (1GB, mixed regions)
```
To read US data, Spark reads ALL files (2GB).

**With partitioning by region:**
```
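/data/table/region=us/part-0001.parquet (1GB, US rows only)
/data/table/region=eu/part-0002.parquet (1GB, EU rows only)
```
To read US data, Spark reads only the `region=us` folder (1GB): half as much data to scan. Less I/O usually means faster queries, though the actual speedup depends on the query and the cluster.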
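Partition pruning can be sketched in a few lines of Python: parse the Hive-style `key=value` directory names and keep only files whose partition values match the filter. The helper names are hypothetical, and this is a simplification: Delta records each file's partition values in its transaction log rather than parsing paths, but the effect on which files get read is the same.

```python
from typing import Dict, List


def partition_values(path: str) -> Dict[str, str]:
    """Extract Hive-style partition values, e.g. 'region=us' -> {'region': 'us'}."""
    values = {}
    for segment in path.split("/")[:-1]:  # skip the file name itself
        if "=" in segment:
            key, value = segment.split("=", 1)
            values[key] = value
    return values


def prune(paths: List[str], **filters: str) -> List[str]:
    """Keep only files whose partition values match every equality filter."""
    return [
        p for p in paths
        if all(partition_values(p).get(k) == v for k, v in filters.items())
    ]


files = [
    "/data/table/region=us/part-0001.parquet",
    "/data/table/region=eu/part-0002.parquet",
]
print(prune(files, region="us"))  # ['/data/table/region=us/part-0001.parquet']
```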

### The Small-File Problem:
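Partitioning on a column with many distinct values, or many small writes, can leave thousands of tiny files, and opening each file has overhead. **Compaction** combines small files into fewer, larger ones:
- Use `.coalesce(n)` or `.repartition(...)` before writing to control how many files a write produces (`.coalesce(1)` yields at most one file per partition value, but funnels all data through a single task)
- Run `OPTIMIZE` on a Delta table to rewrite existing small files into larger ones (Databricks can also compact automatically with auto compaction)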
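Compaction is essentially bin-packing: group many small files into fewer files close to a target size. The greedy sketch below only illustrates that idea and is not Delta's `OPTIMIZE` algorithm; `plan_compaction`, the 128 MB target, and the file sizes are arbitrary assumptions.

```python
from typing import List


def plan_compaction(file_sizes_mb: List[int], target_mb: int = 128) -> List[List[int]]:
    """Greedily group small files into bins whose total stays within target_mb."""
    bins: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for size in sorted(file_sizes_mb):
        if current and current_size + size > target_mb:
            bins.append(current)  # this bin is full; start a new output file
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    return bins


small_files = [10] * 30  # thirty 10 MB files
compaction_plan = plan_compaction(small_files)
print(len(small_files), "files ->", len(compaction_plan), "files")  # 30 files -> 3 files
```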

In [None]:
# 7) Schema evolution: mergeSchema pattern (append new column)
base = spark.createDataFrame([(1,'alice')], ['id','name'])
path_schema = '/tmp/delta_schema_demo'
try:
    base.write.format('delta').mode('overwrite').save(path_schema)
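    # new data has an extra column 'age'
    new_df = spark.createDataFrame([(2, 'bob', 28)], ['id', 'name', 'age'])
    # Without mergeSchema, Delta's schema enforcement rejects the extra column
    try:
        new_df.write.format('delta').mode('append').save(path_schema)
    except Exception as schema_err:
        print('Append without mergeSchema rejected:', type(schema_err).__name__)
    # With mergeSchema, Delta adds 'age' to the table schema; the existing row gets NULL
    new_df.write.format('delta').mode('append').option('mergeSchema', 'true').save(path_schema)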
    spark.read.format('delta').load(path_schema).show()
except Exception as e:
    print('Schema evolution example skipped or failed:', e)

## 7. Schema Evolution

### What is Schema Evolution?
**Schema** = table structure (column names & types).
**Schema evolution** = changing that structure over time (most often by adding columns) as requirements change.

### Example:
- **v1 of data**: `id, name` (deployed 6 months ago)
- **v2 of data**: `id, name, age` (new requirement this week)

Question: Can we append v2 data to the same table?

### The Challenge:
Delta enforces schema by default—v2 data with an extra `age` column won't match v1 schema → **write fails**.

### Solution: `mergeSchema=true`
- Tells Delta: "Accept this new column, add it to the schema"
- Old rows get `NULL` for the new `age` column
- New rows fill in `age` values
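The `mergeSchema` behavior described above can be modeled in plain Python: new columns are appended to the schema, existing columns must keep their type, and stored rows missing a column read as `None` (NULL). This is a simplified sketch with hypothetical helper names, not Delta's implementation; in particular, real Delta also allows a few widening type changes that this model rejects.

```python
from typing import Dict, List

Schema = Dict[str, str]  # column name -> type name; dicts keep insertion order


def merge_schema(table: Schema, incoming: Schema) -> Schema:
    """Add columns that are new; reject type changes on existing columns."""
    merged = dict(table)
    for name, dtype in incoming.items():
        if name not in merged:
            merged[name] = dtype  # new columns go at the end
        elif merged[name] != dtype:
            raise TypeError(f"column {name!r}: table has {merged[name]}, data has {dtype}")
    return merged


def read_rows(schema: Schema, stored_rows: List[dict]) -> List[dict]:
    """Project stored rows onto the current schema; missing columns read as None."""
    return [{name: row.get(name) for name in schema} for row in stored_rows]


v1 = {"id": "long", "name": "string"}
v2 = {"id": "long", "name": "string", "age": "long"}
schema = merge_schema(v1, v2)
stored = [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob", "age": 28}]
print(schema)                     # {'id': 'long', 'name': 'string', 'age': 'long'}
print(read_rows(schema, stored))  # [{'id': 1, 'name': 'alice', 'age': None}, {'id': 2, 'name': 'bob', 'age': 28}]
```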

### ⚠️ Caution:
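`mergeSchema=true` is convenient, but it can silently change your schema: a typo in a column name or an unexpected upstream change becomes a permanent new column. For planned changes, prefer an explicit migration such as `ALTER TABLE ... ADD COLUMNS`.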

In [None]:
# 8) MERGE INTO and time travel (VERSION AS OF) demo
from pyspark.sql import Row
merge_table = 'default.merge_demo'
merge_path = '/tmp/merge_demo_path'
try:
    # Initial data (overwrite so reruns start from the same contents)
    init = spark.createDataFrame([Row(id=1, name='alice'), Row(id=2, name='bob')])
    init.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').save(merge_path)
    spark.sql(f"CREATE TABLE IF NOT EXISTS {merge_table} USING DELTA LOCATION '{merge_path}'")
    # Remember the version before MERGE (it is only version 0 on the very first run)
    pre_merge_version = spark.sql(f"DESCRIBE HISTORY {merge_table}").agg({'version': 'max'}).first()[0]
    # Upserts: id=2 exists (update), id=3 is new (insert)
    updates = spark.createDataFrame([Row(id=2, name='robert'), Row(id=3, name='cara')])
    # Register updates as a temp view so SQL MERGE can reference it
    updates.createOrReplaceTempView('updates_view')
    spark.sql(f"""
        MERGE INTO {merge_table} t
        USING updates_view s
        ON t.id = s.id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
    print('After MERGE:')
    spark.sql(f"SELECT * FROM {merge_table} ORDER BY id").show()
    # Time travel: read the table as it was just before the MERGE
    try:
        print(f'Time travel - version {pre_merge_version}:')
        spark.read.format('delta').option('versionAsOf', pre_merge_version).load(merge_path).show()
    except Exception as te:
        print('Time travel may not be supported in this runtime:', te)
except Exception as e:
    print('MERGE/time-travel example skipped or failed:', e)

## 8. MERGE INTO (Upsert) & Time Travel

### What is MERGE INTO?
**MERGE INTO** is a SQL command for upserts: "update the row if it exists, insert it if it doesn't," applied as one atomic operation.

**Example:** You have a customer table and new updates come in:
- Customer 2 (existing): Update their name from "bob" → "robert"
- Customer 3 (new): Insert a new customer

Without MERGE: complex logic (check if exists, then UPDATE or INSERT).
With MERGE: one statement handles both.
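The upsert logic that `WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *` expresses can be modeled with a dictionary keyed on the join column. This is a conceptual sketch of the semantics only (Spark executes MERGE as a distributed join and commits the result as one new table version); `upsert` is a hypothetical helper.

```python
from typing import Dict, List


def upsert(target: List[dict], source: List[dict], key: str = "id") -> List[dict]:
    """Matched keys are replaced by the source row; unmatched source rows are inserted."""
    by_key: Dict[object, dict] = {row[key]: dict(row) for row in target}
    for row in source:
        # WHEN MATCHED THEN UPDATE SET *  /  WHEN NOT MATCHED THEN INSERT *
        by_key[row[key]] = dict(row)
    return sorted(by_key.values(), key=lambda r: r[key])


target = [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]
changes = [{"id": 2, "name": "robert"}, {"id": 3, "name": "cara"}]
print(upsert(target, changes))
# [{'id': 1, 'name': 'alice'}, {'id': 2, 'name': 'robert'}, {'id': 3, 'name': 'cara'}]
```

One difference to keep in mind: if several source rows match the same target row, Delta's MERGE fails instead of picking one, whereas this dictionary would silently keep the last.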

### What is Time Travel?
**Time travel** lets you read data from a past version of your table.

**Why useful?**
- Recover from accidental deletes/overwrites
- Audit changes: "What did this table look like last week?"
- Debug: "When did this bad data appear?"

**How:** Use the `versionAsOf` (by version number) or `timestampAsOf` (by time) read options, or `VERSION AS OF` / `TIMESTAMP AS OF` in SQL.
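How a timestamp resolves to a version can be sketched as: pick the latest commit made at or before the requested time. The commit history below is made up for illustration, and `version_as_of` is a hypothetical helper, not Delta's code.

```python
import bisect
from datetime import datetime
from typing import List, Tuple

# (version, commit timestamp) pairs in commit order; illustrative data only
history: List[Tuple[int, datetime]] = [
    (0, datetime(2024, 1, 1, 9, 0)),
    (1, datetime(2024, 1, 2, 9, 0)),
    (2, datetime(2024, 1, 3, 9, 0)),
]


def version_as_of(history: List[Tuple[int, datetime]], ts: datetime) -> int:
    """Return the latest version committed at or before ts."""
    timestamps = [t for _, t in history]
    idx = bisect.bisect_right(timestamps, ts) - 1
    if idx < 0:
        raise ValueError("timestamp is before the first commit")
    return history[idx][0]


print(version_as_of(history, datetime(2024, 1, 2, 12, 0)))  # 1
```

In PySpark the corresponding read would be along the lines of `spark.read.format('delta').option('timestampAsOf', '2024-01-02').load(path)`.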

### Analogy:
Think of Delta's transaction log like Git version control for your data. MERGE is a commit with multiple changes. Time travel is checking out an old commit to see history.

## Summary & Next Steps

You've learned:
1. **DataFrames**: Distributed tables that scale across clusters
2. **Delta Lake**: Reliable storage with ACID, schema enforcement, and time travel
3. **UDFs**: Custom functions (use sparingly; prefer built-ins)
4. **JDBC**: Connect to databases for ETL pipelines
5. **Metastore**: Catalog of tables for discoverability
6. **Partitioning**: Organize data for faster queries
7. **Schema Evolution**: Add new columns with `mergeSchema` (or an explicit `ALTER TABLE`)
8. **MERGE INTO**: Upsert logic in one SQL statement
9. **Time Travel**: Read past versions for auditing/recovery

### Next Steps:
- Run this notebook in Databricks or a local PySpark environment (install and configure the `delta-spark` package for the Delta examples)
- Experiment: modify data, try different filters, add your own transformations
- Read [Databricks Documentation](https://docs.databricks.com) for advanced topics
- Try streaming (readStream/writeStream) for real-time data
- Explore advanced optimizations (Z-ordering, OPTIMIZE, VACUUM)