# INTRODUCTION

In [0]:
A Catalog is the top-level container for organizing and managing data assets in Databricks (Unity Catalog).
Catalog
 └── Schema (Database)
      └── Tables / Views / Volumes
Example from this workspace:
learning                                  (catalog)
 └── learning                             (schema)
      ├── employees                       (table)
      └── learning                        (volume)
           └── employees_parquet_single/  (folder of Parquet files)

In [0]:
Schema
1. A logical database inside a catalog.
2. Schema = Database (the two terms mean the same thing).
Volume
A Volume is a governed storage location in Unity Catalog used to store files (not tables).
It is the Unity Catalog replacement for storing files in DBFS.
We can store:
Parquet files
JSON / CSV files
Images

Hierarchy
Catalog
 └── Schema
      └── Volume
           └── Files (parquet, json, csv, etc.)
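
Files in a volume are addressed by a path of the form `/Volumes/<catalog>/<schema>/<volume>/<file>`, which is why the paths later in this notebook start with `/Volumes/learning/learning/learning/`. Below is a minimal plain-Python sketch of how the hierarchy maps onto that path. `volume_path` is a hypothetical helper for illustration, not a Databricks API.

```python
def volume_path(catalog: str, schema: str, volume: str, *parts: str) -> str:
    """Build a Unity Catalog volume path (hypothetical helper, not a Databricks API)."""
    # Each level of the hierarchy becomes one path segment under /Volumes
    segments = [catalog, schema, volume, *parts]
    for segment in segments:
        if not segment or "/" in segment:
            raise ValueError(f"invalid path segment: {segment!r}")
    return "/Volumes/" + "/".join(segments)


print(volume_path("learning", "learning", "learning", "samples_data.csv"))
# /Volumes/learning/learning/learning/samples_data.csv
```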

In [0]:
Volume vs Table (Very important)
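Feature             Volume                          Table
Stores              Files                           Structured data (rows and columns)
Data format         Any (CSV, JSON, Parquet, ...)   Delta Lake by default (Parquet files underneath)
Queryable by SQL    ❌ (files must be read first)    ✅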


In [0]:
%sql
SHOW CATALOGS;

catalog
demo1
learning
learning2
learning3
samples
spark4
spark_4
system
workspace


In [0]:
%sql
SHOW SCHEMAS IN spark_4;

databaseName
default
file_handling
information_schema


In [0]:
%sql
SHOW VOLUMES IN learning.learning;

database,volume_name
learning,learning


# MODULE_4

In [0]:
# General pattern
# Read:  spark.read.format("csv" / "json" / "parquet").option("key", "value").load("path")
# Write: df.write.format("csv" / "json" / "parquet").option("key", "value").save("path")
#
# e.g. df = spark.read \
#          .format("csv") \
#          .option("header", "true") \
#          .load("/Workspace/Users/renugoel89@gmail.com/Drafts/samples_data.csv")
#
# e.g. df = spark.read \
#          .option("header", "true") \
#          .option("inferSchema", "true") \
#          .csv("/Workspace/Users/renugoel89@gmail.com/Drafts/samples_data.csv")


In [0]:
# write mode
df.write.format("csv").mode("overwrite").save("path/students")
df.write.mode("overwrite").option("header", "true").csv("/Volumes/learning/learning/learning/students.csv")
# df.write → starts the writer.
# .format("csv") → saves in CSV format.
# .mode("overwrite") → replaces existing data if present.
# .save("path") → actually writes to the given location (a folder).
# .csv("path") is a shortcut for .format("csv").save("path").

In [0]:
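.mode()
Defines what Spark does if the output path already exists.
Options:
overwrite = replace the existing data (like replacing an old notebook with a new one).
append = add new files next to the existing data (like adding extra pages to an existing notebook).
ignore = do nothing if data already exists.
error / errorifexists = fail if data already exists (this is the default).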
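
To make the save modes concrete, here is a plain-Python sketch that mimics Spark's modes (overwrite, append, ignore, errorifexists) on a local folder. `save_with_mode` and `read_all` are hypothetical helpers. Real Spark writes many part files per task and uses a commit protocol; this only imitates the folder-level behaviour.

```python
import os
import tempfile


def save_with_mode(folder: str, lines: list[str], mode: str = "errorifexists") -> None:
    """Mimic Spark save-mode semantics on a local folder (hypothetical helper)."""
    exists = os.path.isdir(folder) and len(os.listdir(folder)) > 0
    if exists:
        if mode in ("error", "errorifexists"):
            raise FileExistsError(f"path {folder} already exists")
        if mode == "ignore":
            return  # leave existing data untouched, write nothing
        if mode == "overwrite":
            for name in os.listdir(folder):
                os.remove(os.path.join(folder, name))
        elif mode != "append":
            raise ValueError(f"unknown mode: {mode}")
    os.makedirs(folder, exist_ok=True)
    # Like Spark, each write adds a new part file instead of editing old ones
    part = os.path.join(folder, f"part-{len(os.listdir(folder)):05d}.txt")
    with open(part, "w") as f:
        f.write("\n".join(lines))


def read_all(folder: str) -> list[str]:
    """Read every part file in the folder, in name order."""
    rows = []
    for name in sorted(os.listdir(folder)):
        with open(os.path.join(folder, name)) as f:
            rows.extend(f.read().splitlines())
    return rows


with tempfile.TemporaryDirectory() as tmp:
    out = os.path.join(tmp, "students")
    save_with_mode(out, ["Alice", "Bob"])             # first write: folder is new
    save_with_mode(out, ["Eva"], mode="append")       # adds a second part file
    print(read_all(out))                              # ['Alice', 'Bob', 'Eva']
    save_with_mode(out, ["Frank"], mode="overwrite")  # old part files removed
    print(read_all(out))                              # ['Frank']
    save_with_mode(out, ["Grace"], mode="ignore")     # silently skipped
    print(read_all(out))                              # ['Frank']
```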



In [0]:
.option("key", "value")
Used to provide extra settings while reading or writing.
Common examples:
.option("header", "true") → use first row as column names.
.option("inferSchema", "true") → automatically detect column data types.
.option("sep", "\t") → set column delimiter (e.g., tab).
.option("multiline", "true") → read multi-line JSON.
Options = customize how Spark handles the file.
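
To see what `header`, `inferSchema` and `sep` do, here is a plain-Python approximation using the standard `csv` module. The first row becomes the column names, and each column gets `int` if every value parses as an integer, otherwise `str`. Spark's real inference also detects doubles, booleans, timestamps and more. `read_csv` and `infer_column_type` are hypothetical helpers. The `_c0, _c1, ...` names match what Spark uses when `header` is false.

```python
import csv
import io


def infer_column_type(values):
    """Return int if every value parses as an int, else str (simplified inferSchema)."""
    try:
        for v in values:
            int(v)
        return int
    except ValueError:
        return str


def read_csv(text, header=True, infer_schema=False, sep=","):
    rows = list(csv.reader(io.StringIO(text), delimiter=sep))
    # header=True: first row supplies column names; otherwise use _c0, _c1, ...
    names = rows[0] if header else [f"_c{i}" for i in range(len(rows[0]))]
    data = rows[1:] if header else rows
    types = [str] * len(names)  # without inferSchema, every column stays a string
    if infer_schema:
        types = [infer_column_type([r[i] for r in data]) for i in range(len(names))]
    return names, [[t(v) for t, v in zip(types, r)] for r in data]


sample = "id,name,age\n1,Alice,25\n2,Bob,30\n"
print(read_csv(sample))
# (['id', 'name', 'age'], [['1', 'Alice', '25'], ['2', 'Bob', '30']])
print(read_csv(sample, infer_schema=True))
# (['id', 'name', 'age'], [[1, 'Alice', 25], [2, 'Bob', 30]])
```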


In [0]:
.save("path")
Writes the DataFrame to the given path (e.g. path/students). The path becomes a folder containing one or more part files, not a single file.


# CSV FILE

In [0]:
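# header=true: the first row becomes column names. Without inferSchema, every column is read as a string.
df = spark.read \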
    .format("csv") \
    .option("header", "true") \
    .load("/Volumes/learning/learning/learning/samples_data.csv")

In [0]:
df.show()

+---+-------+---+-------+-------+----------+----------+-----+
| id|   name|age| gender| salary| join_date|department|score|
+---+-------+---+-------+-------+----------+----------+-----+
|  1|  Alice| 25|      F|  50000|10-05-2021|     Sales|   88|
|  2|    Bob| 30|      M|  62000|15-03-2020|        HR|   92|
|  3|Charlie| 30|      M|  58000|15-07-2019|        IT|   79|
|  4|  david| 45|   male|  45000|        IT|        IT|   45|
|  5|    Eve| -3|      F|  72000|2022-13-01|        IT|   65|
|  5|    Eve|200|      F|1000000|20-11-2018|   Finance|  300|
|  7|  Frank| 33|      M|  54000|wrong_date|   finance|   73|
|  8|   Renu| 29| FEMALE|  51000|01-09-2021|     Sales|   85|
|  9| Grace | 30|      F|  49000|01-12-2020|     SALES|   90|
| 10|  Henry| 41|Unknown|  34000|05-05-2017|        IT|   82|
+---+-------+---+-------+-------+----------+----------+-----+



In [0]:
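# inferSchema=true: Spark scans the data to detect column types instead of reading everything as strings
df = spark.read \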
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/Volumes/learning/learning/learning/samples_data.csv")

In [0]:
df.show()

+---+-------+---+-------+-------+----------+----------+-----+
| id|   name|age| gender| salary| join_date|department|score|
+---+-------+---+-------+-------+----------+----------+-----+
|  1|  Alice| 25|      F|  50000|10-05-2021|     Sales|   88|
|  2|    Bob| 30|      M|  62000|15-03-2020|        HR|   92|
|  3|Charlie| 30|      M|  58000|15-07-2019|        IT|   79|
|  4|  david| 45|   male|  45000|        IT|        IT|   45|
|  5|    Eve| -3|      F|  72000|2022-13-01|        IT|   65|
|  5|    Eve|200|      F|1000000|20-11-2018|   Finance|  300|
|  7|  Frank| 33|      M|  54000|wrong_date|   finance|   73|
|  8|   Renu| 29| FEMALE|  51000|01-09-2021|     Sales|   85|
|  9| Grace | 30|      F|  49000|01-12-2020|     SALES|   90|
| 10|  Henry| 41|Unknown|  34000|05-05-2017|        IT|   82|
+---+-------+---+-------+-------+----------+----------+-----+



# JSON FILE

In [0]:
df1 = spark.read \
    .json("/Volumes/learning/learning/learning/employees.json")
# "header" and "inferSchema" are CSV options; the JSON reader ignores them
# and infers the schema on its own unless one is given with .schema(...)

In [0]:
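# What the error means
This output means Spark could not parse your data correctly.
What does _corrupt_record mean?
Spark tried to read a file (CSV / JSON), but one or more records could not be parsed.
In the default PERMISSIVE mode Spark does not fail; it stores the raw, unparsed text
of each bad record in a column named _corrupt_record.
If no record parses at all, schema inference finds nothing except _corrupt_record.

This usually happens when:
Wrong file format
Incorrect delimiter
Multiline JSON without multiLine=true
Header mismatch
Invalid data

Possible solutions:
Provide the schema explicitly (best practice)
For pretty-printed (multi-line) JSON, add .option("multiLine", "true")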
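
By default Spark's JSON reader expects JSON Lines: one complete JSON object per line. A pretty-printed object spans several lines, so no single line parses on its own, and those lines end up as corrupt records. The plain-Python sketch below imitates that line-by-line behaviour. `parse_json_lines` is a hypothetical helper, not Spark's parser.

```python
import json


def parse_json_lines(text):
    """Parse one JSON object per line, like Spark's default JSON reader.

    Lines that are not valid JSON on their own are kept as corrupt records.
    """
    good, corrupt = [], []
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        try:
            good.append(json.loads(line))
        except json.JSONDecodeError:
            corrupt.append(line)  # Spark would put this text in _corrupt_record
    return good, corrupt


json_lines = '{"id": 1, "name": "Alice"}\n{"id": 2, "name": "Bob"}\n'
pretty = '{\n  "id": 1,\n  "name": "Alice"\n}\n'

print(parse_json_lines(json_lines))  # ([{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}], [])
good, corrupt = parse_json_lines(pretty)
print(len(good), len(corrupt))       # 0 4
# Parsing the whole text as one document (what multiLine=true enables) works:
print(json.loads(pretty))            # {'id': 1, 'name': 'Alice'}
```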

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", IntegerType(), True)
])

In [0]:
df1 = spark.read \
    .schema(schema) \
    .json("/Volumes/learning/learning/learning/employees.json")
# With .schema(schema) Spark skips inference, so "inferSchema" (and the CSV-only "header") are not needed

In [0]:
df1.show()

+---+-------+----------+------+
| id|   name|department|salary|
+---+-------+----------+------+
|  1|  Alice|        IT| 75000|
|  2|    Bob|        HR| 65000|
|  3|Charlie|   Finance| 80000|
+---+-------+----------+------+



In [0]:
df.show()

+---+-------+---+-------+-------+----------+----------+-----+
| id|   name|age| gender| salary| join_date|department|score|
+---+-------+---+-------+-------+----------+----------+-----+
|  1|  Alice| 25|      F|  50000|10-05-2021|     Sales|   88|
|  2|    Bob| 30|      M|  62000|15-03-2020|        HR|   92|
|  3|Charlie| 30|      M|  58000|15-07-2019|        IT|   79|
|  4|  david| 45|   male|  45000|        IT|        IT|   45|
|  5|    Eve| -3|      F|  72000|2022-13-01|        IT|   65|
|  5|    Eve|200|      F|1000000|20-11-2018|   Finance|  300|
|  7|  Frank| 33|      M|  54000|wrong_date|   finance|   73|
|  8|   Renu| 29| FEMALE|  51000|01-09-2021|     Sales|   85|
|  9| Grace | 30|      F|  49000|01-12-2020|     SALES|   90|
| 10|  Henry| 41|Unknown|  34000|05-05-2017|        IT|   82|
+---+-------+---+-------+-------+----------+----------+-----+



# FILTER THE DATA AND WRITE IT TO ANOTHER FILE

In [0]:
from pyspark.sql.functions import col, lower, trim

it_df = df.filter(
    lower(trim(col("department"))) == "it"
)

it_df.show()

+---+-------+---+-------+------+----------+----------+-----+
| id|   name|age| gender|salary| join_date|department|score|
+---+-------+---+-------+------+----------+----------+-----+
|  3|Charlie| 30|      M| 58000|15-07-2019|        IT|   79|
|  4|  david| 45|   male| 45000|        IT|        IT|   45|
|  5|    Eve| -3|      F| 72000|2022-13-01|        IT|   65|
| 10|  Henry| 41|Unknown| 34000|05-05-2017|        IT|   82|
+---+-------+---+-------+------+----------+----------+-----+
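
The `lower(trim(...))` normalization matters because the department column mixes spellings ("Sales" vs "SALES", "Finance" vs "finance"). Here is a plain-Python sketch of the same idea, using the department values from the `df.show()` output above.

```python
from collections import Counter

# Department values copied from the df.show() output above
departments = ["Sales", "HR", "IT", "IT", "IT", "Finance", "finance", "Sales", "SALES", "IT"]

raw_counts = Counter(departments)
# Same normalization as lower(trim(col("department"))) in the cell above
normalized_counts = Counter(d.strip().lower() for d in departments)

print(raw_counts["Sales"], normalized_counts["sales"])  # 2 3
print(normalized_counts["it"])                          # 4
```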



# WRITE MODE CSV

In [0]:

it_df.write.mode("overwrite").option("header", "true").csv("/Volumes/learning/learning/learning/it_df.csv")


In [0]:
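# LET'S CHECK: only one file was created
Spark writes one CSV file per partition, so the number of files depends on the number of partitions in it_df.
✅ Reason: it_df has only ONE partition
# Common reasons why it_df has 1 partition
1️⃣ Small dataset
Spark reads a small file (like samples_data.csv) as a single partition, and filter() keeps that partition count.

2️⃣ DataFrame created manually?
Usually not the cause: spark.createDataFrame(data) on a Python list typically spreads
the rows across several partitions (based on the cluster's default parallelism).
You can check with it_df.rdd.getNumPartitions() (where the RDD API is available).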

In [0]:
it_df = it_df.repartition(4)

it_df.write.mode("overwrite") \
    .option("header", "true") \
    .csv("/Volumes/learning/learning/learning/it_df_repartition")

In [0]:
# Let's check

In [0]:
it_df = it_df.coalesce(1)
it_df.write.mode("overwrite") \
    .option("header", "true") \
    .csv("/Volumes/learning/learning/learning/it_df_coalesce")

# check

In [0]:
df1.show()

+---+-------+----------+------+
| id|   name|department|salary|
+---+-------+----------+------+
|  1|  Alice|        IT| 75000|
|  2|    Bob|        HR| 65000|
|  3|Charlie|   Finance| 80000|
+---+-------+----------+------+



In [0]:

#Create new rows as a DataFrame
new_data = [
    (4, "David", "IT", 72000),
    (5, "Eva", "HR", 68000),
    (6, "Frank", "Finance", 83000)
]

new_df = spark.createDataFrame(
    new_data,
    ["id", "name", "department", "salary"]
)

In [0]:
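# union() matches columns by position, so both DataFrames must list columns in the same order
# (unionByName() matches by column name instead)
final_df = df1.union(new_df)
final_df.show()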

+---+-------+----------+------+
| id|   name|department|salary|
+---+-------+----------+------+
|  1|  Alice|        IT| 75000|
|  2|    Bob|        HR| 65000|
|  3|Charlie|   Finance| 80000|
|  4|  David|        IT| 72000|
|  5|    Eva|        HR| 68000|
|  6|  Frank|   Finance| 83000|
+---+-------+----------+------+



# WRITE MODE JSON

In [0]:
final_df.write \
    .mode("overwrite") \
    .json("/Volumes/learning/learning/learning/total_employees.json")

In [0]:
# LET'S SEE

In [0]:
final_df.coalesce(1) \
    .write \
    .mode("overwrite") \
    .json("/Volumes/learning/learning/learning/total_employees1.json")

In [0]:
# LET'S SEE

# parquet

In [0]:
Parquet is a columnar file format designed for big data analytics (used heavily with Spark, Hive, Databricks, etc.).
❌ Row-based storage (CSV, JSON)

Data is stored row by row:

1, Alice, IT, 75000
2, Bob, HR, 65000
3, Charlie, Finance, 80000
👉 Reading only salary still scans entire rows

✅ Column-based storage (Parquet)

Data is stored column by column:

id       → 1 | 2 | 3
name     → Alice | Bob | Charlie
dept     → IT | HR | Finance
salary   → 75000 | 65000 | 80000
👉 Reading only salary scans only that column

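1. Reads only the required columns (faster Spark jobs)
2. No need to infer the schema every time (the schema is stored in the file)
3. Avoids type-guessing errors (column types are fixed when the file is written)
4. Works well with analytic queries that touch only a few columns, such as:
SELECT salary
GROUP BY department
WHERE department = 'IT'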
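
Here is a toy plain-Python illustration of the row vs column idea, using the three employees above. Real Parquet files are binary, compressed and organized into row groups, so this only shows why reading one column no longer means scanning every row.

```python
# The same three employees, laid out two ways
rows = [
    (1, "Alice", "IT", 75000),
    (2, "Bob", "HR", 65000),
    (3, "Charlie", "Finance", 80000),
]
names = ["id", "name", "dept", "salary"]

# Row-based (CSV-like): one line per record, all fields together
row_file = "".join(",".join(str(v) for v in r) + "\n" for r in rows)

# Column-based (the Parquet idea): one chunk of text per column
column_file = {
    name: ",".join(str(r[i]) for r in rows) for i, name in enumerate(names)
}

# To get only salaries, the row file must be scanned end to end...
salaries_row = [int(line.split(",")[3]) for line in row_file.splitlines()]
chars_read_row = len(row_file)

# ...while the column layout lets us read just the salary chunk
salaries_col = [int(v) for v in column_file["salary"].split(",")]
chars_read_col = len(column_file["salary"])

print(salaries_row == salaries_col)    # True
print(chars_read_row, chars_read_col)  # 56 17
```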

# csv,json,parquet

In [0]:
# csv 
id,name,salary
1,Alice,5000
2,Bob,6000

# json
{
  "id": 1,
  "name": "Alice",
  "address": { "city": "Delhi", "pin": 110001 }
}
# parquet (conceptual view; the real file is binary and stored column by column)
id:      [1,2,3]
name:    [Alice,Bob,Carol]
salary:  [5000,6000,7000]

# DATA 

In [0]:
data = [
    (1, "Alice", "IT", 75000),
    (2, "Bob", "HR", 65000),
    (3, "Charlie", "Finance", 80000),
    (4, "Diana", "IT", 72000)
]

columns = ["id", "name", "department", "salary"]

df = spark.createDataFrame(data, columns)
df.show()

+---+-------+----------+------+
| id|   name|department|salary|
+---+-------+----------+------+
|  1|  Alice|        IT| 75000|
|  2|    Bob|        HR| 65000|
|  3|Charlie|   Finance| 80000|
|  4|  Diana|        IT| 72000|
+---+-------+----------+------+



# WRITE MODE PARQUET


In [0]:
df.write \
  .mode("overwrite") \
  .parquet("/Volumes/learning/learning/learning/employees_parquet")

In [0]:
# LET'S SEE

In [0]:
df.coalesce(1) \
  .write \
  .mode("overwrite") \
  .parquet("/Volumes/learning/learning/learning/employees_parquet_single")

In [0]:
In Spark (and therefore in Databricks), writing CSV, JSON, or Parquet produces a folder, not a single file.
employees_parquet/
 ├── part-00000-8a9c.parquet
 ├── part-00001-8a9c.parquet
 ├── part-00002-8a9c.parquet
 └── _SUCCESS
Why multiple part-*.parquet files?

Spark works in a distributed manner:

1. Each partition of a DataFrame is written by a separate task.
2. Each task writes its own part file(s).
3. All files in the folder together form the dataset.


# READ MODE PARQUET



In [0]:
parquet_df = spark.read.parquet("/Volumes/learning/learning/learning/employees_parquet_single")
parquet_df.show()

+---+-------+----------+------+
| id|   name|department|salary|
+---+-------+----------+------+
|  1|  Alice|        IT| 75000|
|  2|    Bob|        HR| 65000|
|  3|Charlie|   Finance| 80000|
|  4|  Diana|        IT| 72000|
+---+-------+----------+------+



In [0]:
display(parquet_df)

id,name,department,salary
1,Alice,IT,75000
2,Bob,HR,65000
3,Charlie,Finance,80000
4,Diana,IT,72000


# Benefits of Parquet

In [0]:
1. Parquet stores data column-wise, not row-wise.
👉 Spark reads only the required columns, not the full table.
e.g. SELECT name, salary FROM employees;
✔ Reads only the name and salary columns
❌ Skips the other columns
2. Smaller file size (compression)
👉 Often 70–90% smaller than CSV/JSON, though the exact saving depends on the data
3. Better query performance
👉 Queries usually run much faster than on CSV/JSON
4. Schema enforcement (no need to set inferSchema=True)
e.g. ✔ salary stays numeric
❌ No accidental strings like "five thousand"
5. Splittable files and parallel processing
Files can be split
Spark reads them in parallel
👉 Better cluster utilization
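
Part of Parquet's size reduction comes from encodings such as dictionary encoding, which work well because values inside one column tend to repeat. Below is a simplified sketch of dictionary encoding for a department column. `dictionary_encode` and `dictionary_decode` are hypothetical helpers. Parquet's real on-disk format is more involved: it also bit-packs and run-length encodes the codes, then applies a compression codec.

```python
def dictionary_encode(values):
    """Replace each value with a small integer index into a list of distinct values."""
    dictionary, index, codes = [], {}, []
    for v in values:
        if v not in index:
            index[v] = len(dictionary)
            dictionary.append(v)
        codes.append(index[v])
    return dictionary, codes


def dictionary_decode(dictionary, codes):
    """Turn the integer codes back into the original values."""
    return [dictionary[c] for c in codes]


departments = ["IT", "HR", "Finance", "IT", "HR", "Finance", "IT", "HR"]
dictionary, codes = dictionary_encode(departments)
print(dictionary)  # ['IT', 'HR', 'Finance']
print(codes)       # [0, 1, 2, 0, 1, 2, 0, 1]
print(dictionary_decode(dictionary, codes) == departments)  # True
```

Each repeated string is stored once, and every row only needs a small integer.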

# Update a value and store it in a Parquet file again

In [0]:
df.show()

+---+-------+----------+------+
| id|   name|department|salary|
+---+-------+----------+------+
|  1|  Alice|        IT| 75000|
|  2|    Bob|        HR| 65000|
|  3|Charlie|   Finance| 80000|
|  4|  Diana|        IT| 72000|
+---+-------+----------+------+



In [0]:
# Update the department column for the row where name == "Alice"
from pyspark.sql.functions import when, col
updated_df = df.withColumn(
    "department",
    when(col("name") == "Alice", "Finance")
    .otherwise(col("department"))
)

updated_df.show()

+---+-------+----------+------+
| id|   name|department|salary|
+---+-------+----------+------+
|  1|  Alice|   Finance| 75000|
|  2|    Bob|        HR| 65000|
|  3|Charlie|   Finance| 80000|
|  4|  Diana|        IT| 72000|
+---+-------+----------+------+



In [0]:
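# Write the updated data back (Parquet / JSON)
# Parquet (recommended)
# Overwriting this path is safe here because updated_df is built from an in-memory
# DataFrame, not read from the same folder it is overwriting.
updated_df.write.mode("overwrite").parquet("/Volumes/learning/learning/learning/employees_parquet_single")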

#JSON
#updated_df.write.mode("overwrite").json("output/employees_json")


In [0]:
# LET'S SEE

# partition, repartition, coalesce




In [0]:
data = [
    (1, "Alice",   "IT",       75000),
    (2, "Bob",     "HR",       65000),
    (3, "Charlie", "Finance",  80000),
    (4, "David",   "IT",       72000),
    (5, "Eva",     "HR",       68000),
    (6, "Frank",   "Finance",  83000),
    (7, "Grace",   "IT",       77000),
    (8, "Henry",   "HR",       69000)
]

df = spark.createDataFrame(
    data,
    ["id", "name", "department", "salary"]
)

df.show()

+---+-------+----------+------+
| id|   name|department|salary|
+---+-------+----------+------+
|  1|  Alice|        IT| 75000|
|  2|    Bob|        HR| 65000|
|  3|Charlie|   Finance| 80000|
|  4|  David|        IT| 72000|
|  5|    Eva|        HR| 68000|
|  6|  Frank|   Finance| 83000|
|  7|  Grace|        IT| 77000|
|  8|  Henry|        HR| 69000|
+---+-------+----------+------+



In [0]:
df.write.mode("overwrite").json(
    "/Volumes/learning/learning/learning/employees_no_repartition"
)

In [0]:
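# Result
One file per partition
Many small JSON files (this tiny DataFrame is spread over several partitions)
Not efficient: many tiny files are slow to list and read back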

In [0]:

df_rep = df.repartition(2)

In [0]:
df_rep.write.mode("overwrite").json(
    "/Volumes/learning/learning/learning/employees_repartition"
)

In [0]:
What happens here
Spark reshuffles data
Evenly distributes rows
2 output JSON files

In [0]:
#Use coalesce() (no shuffle – faster)
df_coal = df.coalesce(2)

In [0]:
df_coal.write.mode("overwrite").json(
    "/Volumes/learning/learning/learning/employees_coalesce"
)

In [0]:
What happens
No full shuffle
Faster than repartition
2 output files
Possible uneven data

In [0]:
Common operations that cause a shuffle:
repartition()
groupBy()
join() (unless Spark broadcasts the smaller table)
distinct()
orderBy()
reduceByKey() (RDD API)

In [0]:
repartition(2) (WITH shuffle)
What happens

Spark reshuffles data
Can increase OR decrease partitions
Rows are evenly redistributed
Result (illustration; actual row placement can differ)
Partition 1 → Alice, Charlie, Eva, Grace
Partition 2 → Bob, David, Frank, Henry

Good for large datasets
✔ Balanced partitions
✔ Good parallelism
❌ Expensive (shuffle)

When to use

Before writing large data
To increase parallelism
After heavy filtering / joins
When data is skewed

In [0]:
coalesce(2) (NO shuffle)
What happens

Reduces the number of partitions only
It simply merges existing partitions
After coalesce(2) (illustration):

Partition 1 → Alice, Bob, Charlie, David, Eva
Partition 2 → Frank, Grace, Henry

✔ No network cost
✅ Very fast
❌ Uneven partitions possible (data skew)
❌ Cannot increase partitions
⚠️ coalesce(1) puts all the data into a single task

When to use
Reducing the number of output files, e.g. writing a single output file for small data
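
Here is a plain-Python model of the difference. It assumes a made-up starting layout of 4 uneven partitions. `simulate_repartition` deals every row out round-robin, roughly what `repartition(n)` without columns does. `simulate_coalesce` only merges neighbouring partitions without splitting them, which simplifies how Spark groups partitions. Both helpers are hypothetical, and real Spark placement can differ.

```python
def simulate_repartition(partitions, n):
    """Full shuffle: every row may move; rows are dealt out round-robin."""
    out = [[] for _ in range(n)]
    all_rows = [row for part in partitions for row in part]
    for i, row in enumerate(all_rows):
        out[i % n].append(row)
    return out


def simulate_coalesce(partitions, n):
    """No shuffle: whole existing partitions are merged into n groups."""
    if n >= len(partitions):
        return partitions  # coalesce cannot increase the partition count
    out = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        # adjacent partitions land in the same group
        out[i * n // len(partitions)].extend(part)
    return out


# Uneven starting point: 4 partitions of very different sizes
start = [["Alice", "Bob", "Charlie", "David", "Eva"], ["Frank"], ["Grace"], ["Henry"]]

print([len(p) for p in simulate_repartition(start, 2)])  # [4, 4]  balanced
print([len(p) for p in simulate_coalesce(start, 2)])     # [6, 2]  uneven: partitions merged, not split
```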

In [0]:
What does shuffle really mean? (Simple)

Spark processes data in partitions.
When Spark needs data in a new order or new grouping, it must:
1. Break up the existing partitions
2. Send data over the network
3. Re-create new partitions

👉 This whole process is called a shuffle
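
A shuffle for something like `groupBy("department")` can be pictured as hash partitioning. Each row is sent to partition `hash(key) % n`, so all rows with the same key end up in the same partition. Here is a plain-Python sketch. `zlib.crc32` stands in for Spark's own hash function (Spark uses Murmur3), and `hash_shuffle` is a hypothetical helper.

```python
import zlib


def hash_shuffle(partitions, key, n):
    """Send every row to partition crc32(key) % n, gathering equal keys together."""
    out = [[] for _ in range(n)]
    for part in partitions:           # each existing partition...
        for row in part:              # ...sends each row to its target ("over the network")
            target = zlib.crc32(row[key].encode("utf-8")) % n
            out[target].append(row)
    return out


before = [
    [{"name": "Alice", "department": "IT"}, {"name": "Bob", "department": "HR"}],
    [{"name": "Charlie", "department": "Finance"}, {"name": "David", "department": "IT"}],
]
after = hash_shuffle(before, "department", 3)

# After the shuffle, each department lives in exactly one partition
for dept in ("IT", "HR", "Finance"):
    holders = [i for i, part in enumerate(after) if any(r["department"] == dept for r in part)]
    print(dept, len(holders))  # prints 1 for every department
```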

# APIs

In [0]:
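import requests

url = "https://jsonplaceholder.typicode.com/posts"
response = requests.get(url, timeout=10)  # fail instead of hanging forever
response.raise_for_status()               # raise an error for 4xx/5xx responses
data = response.json()                    # list of dicts, one per post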

In [0]:
df = spark.createDataFrame(data)
df.show(truncate=False)  # truncate=False: show full column values instead of cutting them off
                         # (by default, show() truncates strings longer than 20 characters)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+--------------------------------------------------------------------------+------+
|body                                                                                                                                                                                                                                |id |title                                                                     |userId|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+--------------------------------------------------------------------------+------+
|quia et suscipit\nsuscipit recusandae consequunt

In [0]:
Column   Value
id       1
userId   1
title    "sunt aut facere repellat provident occaecati excepturi optio reprehenderit"
body     a long text paragraph

In [0]:
df.printSchema()

root
 |-- body: string (nullable = true)
 |-- id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- userId: long (nullable = true)



# post - create

In [0]:
# POST sends JSON data to the server to create (add) a new record.

In [0]:
#POST – Create data
import requests

post_data = {
    "userId": 1,
    "title": "Learning PySpark API",
    "body": "This is a POST request example"
}

response = requests.post(
    "https://jsonplaceholder.typicode.com/posts",
    json=post_data
)

print(response.status_code)
print(response.json())

201
{'userId': 1, 'title': 'Learning PySpark API', 'body': 'This is a POST request example', 'id': 101}


In [0]:
The response contains id: 101 because the server creates the id, not you.
When the server receives a POST request:
It creates a new record
It automatically generates a unique ID
Then it sends the created object back
(JSONPlaceholder only simulates this: the new post is not actually stored.)

In [0]:
What happens here:
requests.post() → makes a POST request
URL /posts → endpoint to create new data
json=post_data:
  Converts the Python dict → JSON
  Sends it in the request body
  Sets Content-Type: application/json
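
You can check what `json=` does without contacting any server by preparing the request locally with `requests.Request(...).prepare()`. This sketch only builds the same POST request as above and never sends it.

```python
import json

import requests

post_data = {
    "userId": 1,
    "title": "Learning PySpark API",
    "body": "This is a POST request example",
}

# Build (but do not send) the same POST request as in the cell above
prepared = requests.Request(
    "POST", "https://jsonplaceholder.typicode.com/posts", json=post_data
).prepare()

print(prepared.method)                         # POST
print(prepared.headers["Content-Type"])        # application/json
print(json.loads(prepared.body) == post_data)  # True: the dict was serialized to JSON
```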

# put - update


In [0]:
#PUT – Update the SAME data

#We update post id = 1

update_data = {
    "userId": 1,
    "title": "Updated Title",
    "body": "This data is updated using PUT"
}

response = requests.put(
    "https://jsonplaceholder.typicode.com/posts/1",  # the post with id = 1 (not userId)
    json=update_data
)

print(response.status_code)
print(response.json())

200
{'userId': 1, 'title': 'Updated Title', 'body': 'This data is updated using PUT', 'id': 1}


In [0]:
Real-world APIs (Critical concept)

In real production APIs:

id → ❌ never changeable

userId → ❌ often NOT changeable (ownership field)

title, body → ✅ changeable

In [0]:
PUT vs PATCH (interview favorite)
PUT
Replaces the entire object
Usually requires all fields
PATCH
Updates only specific fields
requests.patch("https://jsonplaceholder.typicode.com/posts/1", json={"title": "New title"})
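
Here is the difference modelled on a plain Python dict standing in for the stored post. PUT replaces the whole resource, while PATCH merges in only the fields sent. `put` and `patch` are hypothetical helpers, the "old" values are made up, and the server is assumed to keep the `id` fixed. That matches the JSONPlaceholder responses in this notebook, where PATCH kept the original body.

```python
def put(stored, new_fields):
    """PUT: the new representation replaces the old one (the server keeps the id)."""
    return {"id": stored["id"], **new_fields}


def patch(stored, changes):
    """PATCH: only the fields sent are changed; everything else is kept."""
    return {**stored, **changes}


post = {"userId": 1, "id": 1, "title": "old title", "body": "old body"}

print(put(post, {"userId": 1, "title": "Updated Title"}))
# {'id': 1, 'userId': 1, 'title': 'Updated Title'}   <- body is gone
print(patch(post, {"title": "New title"}))
# {'userId': 1, 'id': 1, 'title': 'New title', 'body': 'old body'}
```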

In [0]:
response=requests.patch("https://jsonplaceholder.typicode.com/posts/1", json={"title": "New title"})
print(response.status_code)
print(response.json())

200
{'userId': 1, 'id': 1, 'title': 'New title', 'body': 'quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto'}


In [0]:
#DELETE – Remove data
import requests

response = requests.delete(
    "https://jsonplaceholder.typicode.com/posts/1"  # delete post with id = 1
)

print(response.status_code)
print(response.text)

200
{}


In [0]:
200 OK → request succeeded (for DELETE: deleted, with a response body)
201 Created → resource successfully created (e.g., after POST)
204 No Content → request succeeded with no response body (e.g., after DELETE)
404 Not Found → requested resource doesn't exist
503 Service Unavailable → server temporarily unavailable (e.g., maintenance)
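
Python's standard library already knows these codes: `http.HTTPStatus` gives the official phrase for each one, which is handy when logging API responses. `describe` is a hypothetical helper.

```python
from http import HTTPStatus


def describe(status_code):
    """Return e.g. '404 Not Found' and whether the code means success (2xx)."""
    status = HTTPStatus(status_code)
    return f"{status.value} {status.phrase}", 200 <= status.value < 300


for code in (200, 201, 204, 404, 503):
    print(describe(code))
# ('200 OK', True)
# ('201 Created', True)
# ('204 No Content', True)
# ('404 Not Found', False)
# ('503 Service Unavailable', False)
```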

# THANK YOU