# Add Project Root to `sys.path`

In [35]:
# Automatically reload modules when they change
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


Insert the project root folder into `sys.path`

In [36]:
import os
import sys

def find_project_root(start_path=None, markers=(".git", "pyproject.toml", "requirements.txt")):
    """
    Walks up from start_path until it finds one of the marker files/folders.
    Returns the path of the project root.
    """
    if start_path is None:
        start_path = os.getcwd()

    current_path = os.path.abspath(start_path)

    while True:
        # check if any marker exists in current path
        if any(os.path.exists(os.path.join(current_path, marker)) for marker in markers):
            return current_path

        new_path = os.path.dirname(current_path)  # parent folder
        if new_path == current_path:  # reached root of filesystem
            raise FileNotFoundError(f"None of the markers {markers} found above {start_path}")
        current_path = new_path

project_root = find_project_root()
print("Project root:", project_root)

if project_root not in sys.path:
    sys.path.insert(0, project_root)


Project root: c:\ds_analytics_projects\darshil_course\apache-pyspark\darshil-pyspark


# Import Libraries

Import packages

In [37]:
import pandas as pd
import numpy as np
from pathlib import Path

Project-local import (resolvable because the project root is on `sys.path`)

In [38]:
from utils.file_utils import get_project_path

# Apache Spark Data Sources

---

## Overview

This notebook covers Spark's unified data source API, exploring how to read and write data across different formats and systems with consistent syntax and powerful optimization features.

## Setup and Initialization

In [39]:
# Initialize Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("DataSourcesDemo") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

print(f"Spark Version: {spark.version}")
print(f"Spark Context: {spark.sparkContext.appName}")

Spark Version: 3.5.6
Spark Context: DataSourcesDemo


## 🔎 Step 1: What are Spark Data Sources?

Spark provides a **unified API** for reading and writing data across multiple formats and systems:

### Six Core Built-in Sources:
1. **CSV** - Comma-separated values
2. **JSON** - JavaScript Object Notation
3. **Parquet** - Columnar storage (default format)
4. **ORC** - Optimized Row Columnar
5. **JDBC/ODBC** - Database connections
6. **Plain-text files** - Raw text data

### Community Connectors:
- Avro, Delta Lake, MongoDB, Cassandra, Elasticsearch, and hundreds more!

## 🔎 Step 2: The General API Structure

### Read API Pattern
```python
DataFrameReader.format(...).option("key", "value").schema(...).load()
```

### Write API Pattern  
```python
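DataFrameWriter.format(...).option(...).partitionBy(...).save()
# Bucketed output must be registered as a table instead of saved to a path:
DataFrameWriter.format(...).bucketBy(...).sortBy(...).saveAsTable(...)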
```

In [40]:
# Let's create some sample data to work with throughout this notebook
from pyspark.sql.types import *

# Create sample flight data
schema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", IntegerType(), True)
])

data = [
    ("United States", "Romania", 15),
    ("United States", "Croatia", 1),
    ("United States", "Ireland", 344),
    ("Egypt", "United States", 15),
    ("United States", "India", 62),
    ("United States", "Singapore", 1),
    ("United States", "Grenada", 62),
    ("Costa Rica", "United States", 588),
    ("Senegal", "United States", 40),
    ("Moldova", "United States", 1)
]

sample_df = spark.createDataFrame(data, schema)
sample_df.show()
# print(f"Sample data created with {sample_df.count()} rows")

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+



## 🔎 Step 3: Read Modes (for handling bad data)

Spark provides three modes to handle malformed records:

- **`permissive`** (default): Sets malformed fields to `null` and stores the full record in `_corrupt_record` when that string column is included in the schema
- **`dropMalformed`**: Drops bad rows entirely
- **`failFast`**: Stops immediately when encountering malformed record
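
To make the three modes concrete, here is a minimal plain-Python sketch that applies them to the same kind of malformed CSV used in the next cell. It is a conceptual model only, not Spark's parser: the `SCHEMA` list, `parse_line`, and `read_csv` are hypothetical names, and the rule that a row with too few or too many tokens counts as malformed is an assumption chosen to mirror the output observed in this notebook.

```python
import csv
import io

RAW = """id,name,age
1,Alice,30
2,Bob,not_a_number
3,Charlie,25
4,Dora
5,Eve,45,extra_column
"""

# Hypothetical schema: (column name, converter) pairs
SCHEMA = [("id", int), ("name", str), ("age", int)]


def parse_line(tokens):
    """Return (row, is_malformed); fields that cannot be parsed become None."""
    malformed = len(tokens) != len(SCHEMA)  # assumption: wrong token count is malformed
    row = {}
    for i, (name, convert) in enumerate(SCHEMA):
        try:
            row[name] = convert(tokens[i])
        except (IndexError, ValueError):
            row[name] = None
            malformed = True
    return row, malformed


def read_csv(text, mode="PERMISSIVE"):
    """Parse CSV text under one of the three read modes."""
    lines = text.strip().splitlines()[1:]  # skip the header line
    rows = []
    for line in lines:
        tokens = next(csv.reader(io.StringIO(line)))
        row, malformed = parse_line(tokens)
        if malformed and mode == "FAILFAST":
            raise ValueError(f"Malformed record: {line!r}")
        if malformed and mode == "DROPMALFORMED":
            continue  # drop the bad row entirely
        # PERMISSIVE keeps the row and remembers the raw text of bad records
        row["_corrupt_record"] = line if malformed else None
        rows.append(row)
    return rows


permissive_rows = read_csv(RAW, "PERMISSIVE")
dropped_rows = read_csv(RAW, "DROPMALFORMED")
print(len(permissive_rows), len(dropped_rows))
```

In this model, permissive mode keeps all five rows (with `None` for unparseable fields), while drop-malformed mode keeps only the two clean rows.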

In [41]:
# Demonstrate read modes with intentionally malformed CSV data
malformed_csv_data = """id,name,age
1,Alice,30
2,Bob,not_a_number
3,Charlie,25
4,Dora
5,Eve,45,extra_column
"""

# Write malformed data to temp location
malformed_csv_path = get_project_path('lessons', '05_io', 'temp', 'malformed_data.csv')
with open(malformed_csv_path, 'w') as f:
    f.write(malformed_csv_data)

# Create manualSchema
manual_schema = StructType(
    [
        StructField(name="id", dataType=LongType(), nullable=True),
        StructField(name="name", dataType=StringType(), nullable=True),
        StructField(name="age", dataType=LongType(), nullable=True),
    ]
)

print("\n=== PERMISSIVE Mode (default) ===")
permissive_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("mode", "PERMISSIVE") \
    .load(malformed_csv_path, schema=manual_schema)

permissive_df.show()
print(f"Rows in permissive mode: {permissive_df.count()}")

print("\n=== DROPMALFORMED Mode ===")
drop_malformed_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("mode", "DROPMALFORMED") \
    .load(malformed_csv_path, schema=manual_schema)

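drop_malformed_df.show()
# Note: count() requests no columns, so with CSV column pruning (enabled by default
# since Spark 2.4) rows are not fully parsed and malformed rows can still be counted.
# That is likely why the count below differs from the rows shown above.
print(f"Rows in dropMalformed mode: {drop_malformed_df.count()}")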

# print("\n=== FAILFAST Mode ===")
# fail_fast_df = spark.read.format("csv") \
#     .option("header", "true") \
#     .option("mode", "FAILFAST") \
#     .load(malformed_csv_path, schema=manual_schema)

# fail_fast_df.show()  # raises on the first malformed record
# print(f"Rows in failFast mode: {fail_fast_df.count()}")


=== PERMISSIVE Mode (default) ===
+---+-------+----+
| id|   name| age|
+---+-------+----+
|  1|  Alice|  30|
|  2|    Bob|NULL|
|  3|Charlie|  25|
|  4|   Dora|NULL|
|  5|    Eve|  45|
+---+-------+----+

Rows in permissive mode: 5

=== DROPMALFORMED Mode ===
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 30|
|  3|Charlie| 25|
+---+-------+---+

Rows in dropMalformed mode: 5


## 🔎 Step 4: Save Modes (when writing data)

- **`append`** → Add new data to existing
- **`overwrite`** → Replace existing data completely  
- **`errorIfExists`** (default) → Fail if data already exists
- **`ignore`** → Skip writing if data already exists
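
The four modes differ only in what happens when the target path already exists. The sketch below models that decision in plain Python on a temporary directory. It is an illustration of the semantics, not Spark's writer: `save`, `load`, and the `part-*.json` file naming are hypothetical stand-ins.

```python
import json
import shutil
import tempfile
from pathlib import Path

MODES = {"append", "overwrite", "errorifexists", "ignore"}


def save(rows, path, mode="errorifexists"):
    """Write rows as one JSON-lines part file; return True if data was written."""
    if mode not in MODES:
        raise ValueError(f"Unknown save mode: {mode}")
    target = Path(path)
    if target.exists():
        if mode == "errorifexists":
            raise FileExistsError(f"Path already exists: {target}")
        if mode == "ignore":
            return False  # silently keep the existing data
        if mode == "overwrite":
            shutil.rmtree(target)  # drop the old data before writing
        # "append" falls through and adds another part file
    target.mkdir(parents=True, exist_ok=True)
    part_number = len(list(target.glob("part-*.json")))
    part_file = target / f"part-{part_number:05d}.json"
    part_file.write_text("\n".join(json.dumps(row) for row in rows))
    return True


def load(path):
    """Read every part file under path back into a list of rows."""
    rows = []
    for part_file in sorted(Path(path).glob("part-*.json")):
        rows.extend(json.loads(line) for line in part_file.read_text().splitlines())
    return rows


out_dir = Path(tempfile.mkdtemp()) / "save_modes_demo"
save([{"count": 15}, {"count": 1}], out_dir)         # first write succeeds
save([{"count": 344}], out_dir, mode="append")       # three rows in total
rows_after_append = load(out_dir)
skipped = not save([{"count": 62}], out_dir, mode="ignore")
save([{"count": 588}], out_dir, mode="overwrite")    # one row remains
print(len(rows_after_append), skipped, load(out_dir))
```

The demo cell that follows exercises `append` and `overwrite` against real Parquet output; `errorIfExists` and `ignore` behave as the first two branches above suggest.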

In [42]:
# Demonstrate save modes
test_path = str(Path(get_project_path('lessons', '05_io', 'temp', 'save_modes')))

print("=== First write (should succeed) ===")
sample_df.limit(3).write.format("parquet").mode("overwrite").save(test_path)
print("Initial data written successfully")

print("\n=== Reading back the data ===")
initial_read = spark.read.format("parquet").load(test_path)
initial_read.show()
print(f"Initial count: {initial_read.count()}")

print("\n=== Append mode ===")
sample_df.limit(2).write.format("parquet").mode("append").save(test_path)
appended_read = spark.read.format("parquet").load(test_path)
print(f"After append count: {appended_read.count()}")
appended_read.show()

print("\n=== Overwrite mode ===")
sample_df.limit(1).write.format("parquet").mode("overwrite").save(test_path)
overwritten_read = spark.read.format("parquet").load(test_path)
print(f"After overwrite count: {overwritten_read.count()}")
overwritten_read.show()

=== First write (should succeed) ===
Initial data written successfully

=== Reading back the data ===
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+

Initial count: 3

=== Append mode ===
After append count: 5
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+


=== Overwrite mode ===
After overwrite count: 1
+-----------------+-------------------+-----+
|DEST

## 🔹 15.1 CSV Files

CSV is the most common format for data exchange, though not the most efficient for analytics.

In [43]:
# === CSV Reading Examples ===
csv_path = str(Path(get_project_path('data', 'darshil-data', 'flight-data', 'csv', '2010-summary.csv')))
out_dir_tsv = str(Path(get_project_path('lessons', '05_io', 'temp', 'tsv')))

print("\n=== Reading CSV with various options ===")
csv_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("mode", "FAILFAST") \
    .option("inferSchema", "true") \
    .load(csv_path)

csv_df.show(5)
csv_df.printSchema()

print("\n=== Writing CSV with custom separator (TSV) ===")
csv_df.write.format("csv") \
    .mode("overwrite") \
    .option("sep", "\t") \
    .option("header", "true") \
    .save(out_dir_tsv)

print("TSV file created successfully")


=== Reading CSV with various options ===
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)


=== Writing CSV with custom separator (TSV) ===
TSV file created successfully


## 🔹 15.2 JSON Files

JSON is excellent for semi-structured data with nested fields and varying schemas.

In [44]:
# === JSON Reading and Writing ===
json_path = str(Path(get_project_path('data', 'darshil-data', 'flight-data', 'json', '2010-summary.json')))
out_dir_json = str(Path(get_project_path('lessons', '05_io', 'temp', 'json')))

print("\n=== Reading JSON with options ===")
# The JSON reader always infers the schema when none is supplied,
# so no "inferSchema" option is needed
json_df = spark.read.format("json") \
    .option("mode", "FAILFAST") \
    .load(json_path)

json_df.show(5)
json_df.printSchema()

print("\n=== Writing JSON ===")
json_df.write.format("json") \
    .mode("overwrite") \
    .save(out_dir_json)

print("JSON file created successfully")


=== Reading JSON with options ===
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)


=== Writing JSON ===
JSON file created successfully


## 🔹 15.3 Parquet Files

**Parquet is Spark's default format** - columnar storage optimized for analytics workloads.

### Benefits:
- **Columnar storage** → efficient compression and querying
- **Schema evolution** → add/remove columns over time
- **Predicate pushdown** → filter data at file level
- **Complex types** → arrays, maps, structs supported

In [45]:
from pyspark.sql.functions import struct, col, array

# === Parquet Operations ===
parquet_path = str(Path(get_project_path('data', 'darshil-data', 'flight-data', 'parquet', '2010-summary.parquet')))
out_dir_parquet = str(Path(get_project_path('lessons', '05_io', 'temp', 'parquet')))

print("\n=== Reading from Parquet ===")
# Parquet files carry their own schema, so no "mode" or "inferSchema" option is needed
parquet_df = spark.read.format("parquet").load(parquet_path)

print("\n=== Add Cols to Parquet file ===")
parquet_df = parquet_df.select("*",
                               array("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").alias("ORIGIN_DEST_ARRAY"),
                               struct("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME", "count").alias("ROW_STRUCT")
                            )

print("\n=== Parquet file info ===")
print(f"Row count: {parquet_df.count()}")
print("Schema:")
parquet_df.printSchema()

print("\n=== Writing Parquet ===")
parquet_df.write.format("parquet") \
    .mode("overwrite") \
    .save(out_dir_parquet)

print("Parquet file created successfully")


=== Reading from Parquet ===

=== Add Cols to Parquet file ===

=== Parquet file info ===
Row count: 255
Schema:
root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)
 |-- ORIGIN_DEST_ARRAY: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- ROW_STRUCT: struct (nullable = false)
 |    |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |    |-- DEST_COUNTRY_NAME: string (nullable = true)
 |    |-- count: long (nullable = true)


=== Writing Parquet ===
Parquet file created successfully


## 🔹 15.4 ORC Files

**ORC (Optimized Row Columnar)** is optimized for Hadoop ecosystems and great for large sequential reads.

In [46]:
# === ORC Operations ===
orc_path = str(Path(get_project_path('data', 'darshil-data', 'flight-data', 'orc', '2010-summary.orc')))
out_dir_orc = str(Path(get_project_path('lessons', '05_io', 'temp', 'orc')))

print("\n=== Reading from ORC ===")
orc_df = spark.read.format("orc").load(orc_path)
orc_df.show(5)
orc_df.printSchema()

print("=== Writing to ORC format ===")
orc_df.write.format("orc").mode("overwrite").save(out_dir_orc)

print(f"\nORC file successfully processed {orc_df.count()} rows")


=== Reading from ORC ===
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)

=== Writing to ORC format ===

ORC file successfully processed 255 rows


## 🔹 15.5 Text Files

Plain text files are useful for unstructured data processing and custom parsing logic.

In [47]:
# === Text File Operations ===
out_dir_txt = str(Path(get_project_path('lessons', '05_io', 'temp', 'txt')))

print("=== Reading text file (CSV as raw text) ===")
text_df = spark.read.text(csv_path)
text_df.show(5, truncate=False)

print("\n=== Processing text data with custom parsing ===")
from pyspark.sql.functions import split, col

# Skip header and parse CSV manually
parsed_text = text_df.filter(~col("value").contains("DEST_COUNTRY_NAME")) \
    .select(split(col("value"), ",").alias("rows")) \
    .select(
        col("rows")[0].alias("destination"),
        col("rows")[1].alias("origin"), 
        col("rows")[2].cast("int").alias("count")
    )

parsed_text.show()

print("\n=== Writing selected column as text ===")
parsed_text.select("destination").write.mode("overwrite").text(out_dir_txt)
print("Text file written successfully")

=== Reading text file (CSV as raw text) ===
+-------------------------------------------+
|value                                      |
+-------------------------------------------+
|DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count|
|United States,Romania,1                    |
|United States,Ireland,264                  |
|United States,India,69                     |
|Egypt,United States,24                     |
+-------------------------------------------+
only showing top 5 rows


=== Processing text data with custom parsing ===
+--------------------+----------------+-----+
|         destination|          origin|count|
+--------------------+----------------+-----+
|       United States|         Romania|    1|
|       United States|         Ireland|  264|
|       United States|           India|   69|
|               Egypt|   United States|   24|
|   Equatorial Guinea|   United States|    1|
|       United States|       Singapore|   25|
|       United States|         Grenada|   54|
|      

## 🔹 15.6 JDBC Connections

Spark can connect to relational databases for reading and writing data.

**Note:** This example shows the syntax. In a real environment, you'd need:
- JDBC driver JAR files
- Running database instance
- Proper network connectivity

In [48]:
# === JDBC Examples (syntax demonstration) ===
print("=== JDBC Read Syntax ===")
# Raw strings keep the backslash line continuations visible when printed
jdbc_read_example = r"""
# Reading from database
jdbcDF = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/mydb") \
    .option("dbtable", "employees") \
    .option("user", "root") \
    .option("password", "mypassword") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()
"""
print(jdbc_read_example)

print("\n=== JDBC Write Syntax ===")
jdbc_write_example = r"""
# Writing to database
df.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/mydb") \
    .option("dbtable", "employees_backup") \
    .option("user", "root") \
    .option("password", "mypassword") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .mode("overwrite") \
    .save()
"""
print(jdbc_write_example)

print("\n=== Advanced JDBC Options ===")
advanced_options = """
Key JDBC options:
- numPartitions: Control parallelism
- fetchsize: Rows fetched per round trip
- batchsize: Rows inserted per batch
- isolationLevel: Transaction isolation
- sessionInitStatement: Setup SQL per session
"""
print(advanced_options)

=== JDBC Read Syntax ===

# Reading from database
jdbcDF = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/mydb") \
    .option("dbtable", "employees") \
    .option("user", "root") \
    .option("password", "mypassword") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()


=== JDBC Write Syntax ===

# Writing to database
df.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/mydb") \
    .option("dbtable", "employees_backup") \
    .option("user", "root") \
    .option("password", "mypassword") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .mode("overwrite") \
    .save()


=== Advanced JDBC Options ===

Key JDBC options:
- numPartitions: Control parallelism
- fetchsize: Rows fetched per round trip
- batchsize: Rows inserted per batch
- isolationLevel: Transaction isolation
- sessionInitStatement: Setup SQL per session
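
The options above can be collected in a dict and handed to the reader in one call. The following is a sketch under stated assumptions: `jdbc_read_options` and the `JDBC_URL`, `JDBC_USER`, and `JDBC_PASSWORD` environment variable names are hypothetical, and `partitionColumn`, `lowerBound`, and `upperBound` are Spark JDBC options that do not appear in the list above but accompany `numPartitions` when a read is split across partitions.

```python
import os


def jdbc_read_options(table, partition_column=None, lower_bound=None,
                      upper_bound=None, num_partitions=None, fetch_size=1000):
    """Build the option dict for a JDBC read; env var names are hypothetical."""
    options = {
        "url": os.environ.get("JDBC_URL", "jdbc:mysql://localhost:3306/mydb"),
        "dbtable": table,
        "user": os.environ.get("JDBC_USER", "root"),
        # Read the password from the environment instead of hard-coding it
        "password": os.environ.get("JDBC_PASSWORD", ""),
        "driver": "com.mysql.cj.jdbc.Driver",
        "fetchsize": str(fetch_size),
    }
    if partition_column is not None:
        # These four options are supplied together for a partitioned read
        options.update({
            "partitionColumn": partition_column,
            "lowerBound": str(lower_bound),
            "upperBound": str(upper_bound),
            "numPartitions": str(num_partitions),
        })
    return options


opts = jdbc_read_options("employees", "id", 1, 100000, 8)
print(sorted(opts))

# With a reachable database and the driver JAR available to Spark:
# jdbcDF = spark.read.format("jdbc").options(**opts).load()
```

Keeping credentials out of the notebook source is the main design choice here; the commented-out last line shows where the dict would plug into the read pattern from the cell above.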



## 🔹 15.7 Advanced I/O Concepts

### 1. Writing in Parallel
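Control write parallelism through the number of partitions: Spark writes one file per non-empty partition, so the file count can be lower than the partition count (the 5-partition write below yields 4 files).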

In [49]:
# === Parallel Writing ===
print(f"=== Original partitions: {sample_df.rdd.getNumPartitions()} ===")

print("\n=== Writing with 5 partitions ===")
parallel_path = "/tmp/parallel_write"
sample_df.repartition(5).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save(parallel_path)

# Check number of files created
import os
files = [f for f in os.listdir(parallel_path) if f.endswith('.csv')]
print(f"Number of CSV files created: {len(files)}")
print(f"Files: {files[:3]}...")  # Show first 3 files

print("\n=== Writing with 1 partition (single file) ===")
single_file_path = "/tmp/single_file"
sample_df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save(single_file_path)

single_files = [f for f in os.listdir(single_file_path) if f.endswith('.csv')]
print(f"Number of CSV files created: {len(single_files)}")

=== Original partitions: 12 ===

=== Writing with 5 partitions ===
Number of CSV files created: 4
Files: ['part-00000-0616e57a-5d97-4a8a-b78e-bd46ce7578c1-c000.csv', 'part-00001-0616e57a-5d97-4a8a-b78e-bd46ce7578c1-c000.csv', 'part-00003-0616e57a-5d97-4a8a-b78e-bd46ce7578c1-c000.csv']...

=== Writing with 1 partition (single file) ===
Number of CSV files created: 1


## 🔹 15.7 Advanced I/O Concepts

This section covers advanced techniques for optimizing data I/O performance in Spark through parallel writing, partitioning, and bucketing strategies.

In [50]:
# Setup - Create sample data for demonstrations
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col
import os

# Initialize Spark if not already done
spark = SparkSession.builder \
    .appName("AdvancedIOConcepts") \
    .getOrCreate()

# Create sample flight data (csvFile equivalent)
schema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", IntegerType(), True)
])

data = [
    ("United States", "Romania", 15),
    ("United States", "Croatia", 1),
    ("United States", "Ireland", 344),
    ("Egypt", "United States", 15),
    ("United States", "India", 62),
    ("United States", "Singapore", 1),
    ("United States", "Grenada", 62),
    ("Costa Rica", "United States", 588),
    ("Senegal", "United States", 40),
    ("Moldova", "United States", 1),
    ("Egypt", "Romania", 25),
    ("India", "United States", 125),
    ("Germany", "United States", 89),
    ("France", "United States", 156),
    ("Japan", "United States", 78)
]

csvFile = spark.createDataFrame(data, schema)
print("Sample data created:")
csvFile.show()
print(f"Total rows: {csvFile.count()}")
print(f"Current partitions: {csvFile.rdd.getNumPartitions()}")

Sample data created:
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
|            Egypt|            Romania|   25|
|            India|      United States|  125|
|          Germany|      United States|   89|
|           France|      United States|  156|
|            Japan|      United States|   78|
+-----------------+-------------------+-----+

Total rows: 15
Current partitions: 12


### **1. Writing in Parallel**

**Key Concept**: One file per non-empty partition.

Controlling the number of partitions determines how many files are written in parallel, which affects both write performance and file organization.

In [51]:
# === Writing in Parallel Demo ===
print("=== 1. Writing with 5 partitions (5 files) ===")
out_dir_csv = str(Path(get_project_path('lessons', '05_io', 'temp', 'csv')))

# Repartition to 5 partitions and write
csv_df.repartition(5).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save(out_dir_csv)

=== 1. Writing with 5 partitions (5 files) ===


In [52]:
# Check the files created
csv_files = [f for f in os.listdir(out_dir_csv) if f.endswith('.csv')]
print(f"Number of CSV files created: {len(csv_files)}")
print(f"Files: {csv_files}")

Number of CSV files created: 5
Files: ['part-00000-c606de9b-8c4b-4bdd-9ea9-f2b4feab6a34-c000.csv', 'part-00001-c606de9b-8c4b-4bdd-9ea9-f2b4feab6a34-c000.csv', 'part-00002-c606de9b-8c4b-4bdd-9ea9-f2b4feab6a34-c000.csv', 'part-00003-c606de9b-8c4b-4bdd-9ea9-f2b4feab6a34-c000.csv', 'part-00004-c606de9b-8c4b-4bdd-9ea9-f2b4feab6a34-c000.csv']


In [53]:
# Show file sizes
for file in csv_files:
    file_path = Path(out_dir_csv)/f"{file}"
    size = os.path.getsize(file_path)
    print(f"  {file}: {size} bytes")

  part-00000-c606de9b-8c4b-4bdd-9ea9-f2b4feab6a34-c000.csv: 1538 bytes
  part-00001-c606de9b-8c4b-4bdd-9ea9-f2b4feab6a34-c000.csv: 1566 bytes
  part-00002-c606de9b-8c4b-4bdd-9ea9-f2b4feab6a34-c000.csv: 1480 bytes
  part-00003-c606de9b-8c4b-4bdd-9ea9-f2b4feab6a34-c000.csv: 1491 bytes
  part-00004-c606de9b-8c4b-4bdd-9ea9-f2b4feab6a34-c000.csv: 1482 bytes


### **2. Partitioning (Directory-based Organization)**

**Key Concept**: Create directory structure based on column values for efficient querying.

**Benefits**:
- **Partition pruning**: Only read relevant directories
- **Faster filtering**: Skip entire partitions during queries
- **Organized storage**: Logical data organization
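
Partition pruning can be pictured as filtering directory names before any file is opened. The sketch below models that idea in plain Python. It is a conceptual illustration, not Spark's planner: `partition_dir` and `prune` are hypothetical helpers, and `urllib.parse.quote` is a stand-in for Spark's own path escaping, which follows its own rules.

```python
from urllib.parse import quote, unquote


def partition_dir(column, value):
    """Build a Hive-style `column=value` directory name."""
    return f"{column}={quote(str(value), safe='')}"


def prune(directories, column, wanted):
    """Keep only the directories whose partition value equals `wanted`."""
    kept = []
    for name in directories:
        key, _, raw_value = name.partition("=")
        if key == column and unquote(raw_value) == wanted:
            kept.append(name)
    return kept


countries = ["Costa Rica", "Egypt", "Equatorial Guinea", "Senegal", "United States"]
dirs = [partition_dir("DEST_COUNTRY_NAME", country) for country in countries]

# A filter on the partition column only needs to touch one directory
selected = prune(dirs, "DEST_COUNTRY_NAME", "United States")
print(selected)
```

Because the partition value is encoded in the directory name, a filter on `DEST_COUNTRY_NAME` can skip the other four directories without reading their Parquet files.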

In [55]:
# === Directory-based Partitioning ===
print("=== 2. Partitioning by DEST_COUNTRY_NAME ===")
out_dir_partition_parquet = str(Path(get_project_path('lessons', '05_io', 'temp', 'partition_parquet')))

# Partition by destination country
csv_df.limit(10).write.mode("overwrite") \
    .partitionBy("DEST_COUNTRY_NAME") \
    .format("parquet") \
    .save(out_dir_partition_parquet)

print("\n👉 **Directory structure created:**")

# Show the directory structure
def show_directory_tree(path, prefix="", max_files=3):
    """Display directory tree structure"""
    try:
        items = sorted(os.listdir(path))
        dirs = [item for item in items if os.path.isdir(os.path.join(path, item))]
        files = [item for item in items if os.path.isfile(os.path.join(path, item)) and not item.startswith('.')]
        
        # Show directories (partitions)
        for i, dir_name in enumerate(dirs):
            is_last_dir = i == len(dirs) - 1 and len(files) == 0
            print(f"{prefix}{'└── ' if is_last_dir else '├── '}{dir_name}/")
            
            # Show files in each partition directory
            subpath = os.path.join(path, dir_name)
            if os.path.isdir(subpath):
                subfiles = [f for f in os.listdir(subpath) if not f.startswith('.')]
                for j, filename in enumerate(subfiles[:max_files]):
                    is_last_file = j == len(subfiles[:max_files]) - 1
                    extension = "    " if is_last_dir else "│   "
                    print(f"{prefix}{extension}{'└── ' if is_last_file else '├── '}{filename}")
                if len(subfiles) > max_files:
                    extension = "    " if is_last_dir else "│   "
                    print(f"{prefix}{extension}└── ... ({len(subfiles) - max_files} more files)")
        
        # Show root-level files
        for i, filename in enumerate(files[:max_files]):
            is_last = i == len(files[:max_files]) - 1
            print(f"{prefix}{'└── ' if is_last else '├── '}{filename}")
            
    except Exception as e:
        print(f"{prefix}Error reading directory: {e}")

show_directory_tree(out_dir_partition_parquet)

=== 2. Partitioning by DEST_COUNTRY_NAME ===

👉 **Directory structure created:**
├── DEST_COUNTRY_NAME=Costa%20Rica/
│   └── part-00000-d40f60ce-5339-488b-b1ea-bf07b16dd907.c000.snappy.parquet
├── DEST_COUNTRY_NAME=Egypt/
│   └── part-00000-d40f60ce-5339-488b-b1ea-bf07b16dd907.c000.snappy.parquet
├── DEST_COUNTRY_NAME=Equatorial%20Guinea/
│   └── part-00000-d40f60ce-5339-488b-b1ea-bf07b16dd907.c000.snappy.parquet
├── DEST_COUNTRY_NAME=Senegal/
│   └── part-00000-d40f60ce-5339-488b-b1ea-bf07b16dd907.c000.snappy.parquet
├── DEST_COUNTRY_NAME=United%20States/
│   └── part-00000-d40f60ce-5339-488b-b1ea-bf07b16dd907.c000.snappy.parquet
└── _SUCCESS


### **3. Bucketing (Table-based Organization)**

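**Key Concepts**:
- Groups rows into a fixed number of **buckets** by hashing a column
- Helps reduce shuffle in future joins
- Requires writing with `saveAsTable`, because bucket metadata is stored in the catalog (a plain `save()` to a path does not support `bucketBy`)
- Pre-distributes data for optimal join performance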
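
The reason bucketing helps joins is that equal keys always land in the same bucket, so matching rows can be found bucket by bucket. The sketch below shows that property in plain Python. It is a simplified model: `bucket_id` and `bucketize` are hypothetical helpers, and CRC32 is only a stand-in for the hash function Spark uses internally.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 10


def bucket_id(key, num_buckets=NUM_BUCKETS):
    """Map a key to a bucket with a deterministic hash (CRC32 as a stand-in)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets


def bucketize(rows, key_index):
    """Group rows by the bucket of the value at key_index."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_id(row[key_index])].append(row)
    return buckets


flights = [
    ("United States", "Romania", 15),
    ("Egypt", "United States", 15),
    ("United States", "India", 62),
]
labels = [(15, "low"), (62, "medium")]

flight_buckets = bucketize(flights, key_index=2)
label_buckets = bucketize(labels, key_index=0)

# Join bucket by bucket: rows in different buckets can never match
joined = [
    (flight, label)
    for bucket in flight_buckets.keys() & label_buckets.keys()
    for flight in flight_buckets[bucket]
    for label in label_buckets[bucket]
    if flight[2] == label[0]
]
print(len(joined))
```

When both sides are bucketed the same way on the join key, each bucket pair can be processed independently, which is the shuffle the bucketed table layout aims to avoid.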

In [58]:
# === Bucketing Demonstration ===
print("=== 3. Bucketing by 'count' column ===")
out_dir_bucket_parquet = str(Path(get_project_path('lessons', '05_io', 'temp', 'bucket_parquet')))

# Create bucketed table
csv_df.write.format("parquet") \
    .mode("overwrite") \
    .bucketBy(10, "count") \
    .sortBy("count") \
    .option("path", out_dir_bucket_parquet) \
    .saveAsTable("bucketedFiles")

print("✅ Bucketed table 'bucketedFiles' created with 10 buckets")

# Read the bucketed table
bucketed_df = spark.table("bucketedFiles")
print("\nBucketed table contents:")
# bucketed_df.orderBy("count").show()

print(f"\n📊 Table info:")
print(f"   Rows: {bucketed_df.count()}")
print(f"   Partitions: {bucketed_df.rdd.getNumPartitions()}")

# Show table metadata
print("\n🔧 **Table Metadata:**")
spark.sql("DESCRIBE EXTENDED bucketedFiles").filter(
    col("col_name").isin(["Num Buckets", "Bucket Columns", "Sort Columns"])
).show(truncate=False)

=== 3. Bucketing by 'count' column ===
✅ Bucketed table 'bucketedFiles' created with 10 buckets

Bucketed table contents:

📊 Table info:
   Rows: 255
   Partitions: 10

🔧 **Table Metadata:**
+--------------+---------+-------+
|col_name      |data_type|comment|
+--------------+---------+-------+
|Num Buckets   |10       |       |
|Bucket Columns|[`count`]|       |
|Sort Columns  |[`count`]|       |
+--------------+---------+-------+



## 🔎 **Step 5: Why This Matters?**

File format, partition layout, and bucketing determine how much data a job has to scan and shuffle, so these choices directly affect the speed and cost of production pipelines.

## ✅ **In Simple Words**:

**Spark's unified API lets you read/write many formats with consistent syntax:**

🔧 **Formats**:
- **CSV/JSON**: Flexible for data exchange
- **Parquet/ORC**: Efficient for analytics
- **JDBC**: Connects to databases

⚡ **Optimizations**:
- **Partitioning**: Organize data in directories → faster filtering
- **Bucketing**: Pre-distribute data → faster joins
- **Parallel Writing**: Control the number of output files through the partition count → balanced write parallelism and file sizes

🎯 **Result**: Scalable, high-performance data pipelines for real-world analytics!