# 1. Import Required Libraries and Initialize SparkSession
Import `SparkSession` and create your Spark session.

In [1]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-17-openjdk-amd64'
os.environ['PATH'] = os.environ['JAVA_HOME'] + '/bin:' + os.environ['PATH']

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Chapter 5 - Basic Structured Operations").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/26 08:48:48 WARN Utils: Your hostname, codespaces-1164d4, resolves to a loopback address: 127.0.0.1; using 10.0.11.153 instead (on interface eth0)
25/07/26 08:48:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/26 08:48:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# 2. Basic DataFrame Creation and Schema
Load JSON data and examine its schema.

In [3]:
df = spark.read.format("json").load("../data/flight-data/json/2015-summary.json")
df.printSchema()
df.show(5)

                                                                                

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows


In [4]:
# Check the inferred schema
spark.read.format("json").load("../data/flight-data/json/2015-summary.json").schema

StructType([StructField('DEST_COUNTRY_NAME', StringType(), True), StructField('ORIGIN_COUNTRY_NAME', StringType(), True), StructField('count', LongType(), True)])

# 3. Manual Schema Definition
Define and apply a custom schema to the DataFrame.

In [5]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
  StructField("DEST_COUNTRY_NAME", StringType(), True),
  StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
  StructField("count", LongType(), False, metadata={"hello":"world"})
])
df = spark.read.format("json").schema(myManualSchema) \
  .load("../data/flight-data/json/2015-summary.json")
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



# 4. Working with Columns and Expressions
Explore different ways to reference columns and create expressions.

In [6]:
from pyspark.sql.functions import col, column, expr

# Different ways to reference columns
print("Column references:")
print(col("someColumnName"))
print(column("someColumnName"))

# Complex expressions
print("\nExpression example:")
print(expr("(((someCol + 5) * 200) - 6) < otherCol"))

Column references:
Column<'someColumnName'>
Column<'someColumnName'>

Expression example:
Column<'(((someCol + 5) * 200) - 6) < otherCol'>


# 5. Working with Rows
Create and access Row objects.

In [7]:
from pyspark.sql import Row

myRow = Row("Hello", None, 1, False)
print(f"First element: {myRow[0]}")
print(f"Third element: {myRow[2]}")

First element: Hello
Third element: 1


# 6. Create Temp View and Manual DataFrame
Register DataFrame as SQL table and create DataFrame from scratch.

In [8]:
# Create temp view for SQL queries
df.createOrReplaceTempView("dfTable")

# Create DataFrame from scratch
myManualSchema = StructType([
  StructField("some", StringType(), True),
  StructField("col", StringType(), True),
  StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

[Stage 3:>                                                          (0 + 1) / 1]

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|NULL|    1|
+-----+----+-----+



                                                                                

# 7. Selecting Columns
Various ways to select and manipulate columns.

In [9]:
# Simple column selection
df.select("DEST_COUNTRY_NAME").show(2)

# Multiple column selection
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows
+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows


In [10]:
# Different ways to select the same column
df.select(
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME")) \
  .show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows


# 8. Column Aliasing and SelectExpr
Rename columns and use SQL-like expressions.

In [11]:
# Column aliasing
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

# Nested aliasing
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME")) \
  .show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows
+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows


In [12]:
# SelectExpr for SQL-like operations
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)

# Add computed columns
df.selectExpr(
  "*", # all original columns
  "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry") \
  .show(2)

+-------------+-----------------+
|newColumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows


In [13]:
# Aggregations with selectExpr
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



# 9. Adding and Renaming Columns
Use withColumn and withColumnRenamed to modify DataFrames.

In [14]:
from pyspark.sql.functions import lit

# Add literal column using select
df.select(expr("*"), lit(1).alias("One")).show(2)

# Add literal column using withColumn
df.withColumn("numberOne", lit(1)).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows
+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows


In [15]:
# Add computed column
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")) \
  .show(2)

# Rename column
print("Columns after renaming:")
print(df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows
Columns after renaming:
['dest', 'ORIGIN_COUNTRY_NAME', 'count']


# 10. Working with Special Column Names
Handle columns with spaces or special characters.

In [16]:
# Create DataFrame with special column name
dfWithLongColName = df.withColumn(
    "This Long Column-Name",
    expr("ORIGIN_COUNTRY_NAME"))

# Use backticks for special column names
dfWithLongColName.selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`") \
  .show(2)

print("Column names:")
print(dfWithLongColName.select(expr("`This Long Column-Name`")).columns)

+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows
Column names:
['This Long Column-Name']


In [17]:
# Use backticks for special column names
dfWithLongColName.selectExpr(
    "This Long Column-Name",
    "`This Long Column-Name` as `new col`") \
  .show(2)

ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near 'Column'. SQLSTATE: 42601 (line 1, pos 10)

== SQL ==
This Long Column-Name
----------^^^


# 11. Filtering Data
Use where/filter to subset your data.

In [18]:
# Chain multiple where clauses
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia") \
  .show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows


# 12. Getting Unique Records
Count distinct values and get unique records.

In [19]:
# Count unique combinations
unique_routes = df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()
print(f"Unique origin-destination pairs: {unique_routes}")

# Count unique origins
unique_origins = df.select("ORIGIN_COUNTRY_NAME").distinct().count()
print(f"Unique origin countries: {unique_origins}")

Unique origin-destination pairs: 256
Unique origin countries: 125


# 13. Random Sampling and Splitting
Sample and split DataFrames randomly.

In [20]:
# Random sampling
seed = 5
withReplacement = False
fraction = 0.5
sample_count = df.sample(withReplacement, fraction, seed).count()
print(f"Sample size: {sample_count}")

# Random split
dataFrames = df.randomSplit([0.25, 0.75], seed)
print(f"First split: {dataFrames[0].count()}, Second split: {dataFrames[1].count()}")
print(f"First smaller than second: {dataFrames[0].count() < dataFrames[1].count()}")

Sample size: 138
First split: 71, Second split: 185
First smaller than second: True


# 14. Appending Rows (Union)
Add new rows to existing DataFrames.

In [21]:
# Create new rows
schema = df.schema
newRows = [
  Row("New Country", "Other Country", 5),
  Row("New Country 2", "Other Country 3", 1)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)

# Union and filter
df.union(newDF) \
  .where("count = 1") \
  .where(col("ORIGIN_COUNTRY_NAME") != "United States") \
  .show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



# 15. Sorting Data
Sort and order DataFrames.

In [22]:
# Simple sorting
print("Sort by count:")
df.sort("count").show(5)

print("\nOrder by multiple columns:")
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)

Sort by count:
+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

Order by multiple columns:
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows


In [31]:
from pyspark.sql.functions import desc, asc

# Explicit ascending/descending
print("Descending order:")
df.orderBy(expr("count desc")).show(2)

print("\nMixed ordering:")
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

Descending order:
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

Mixed ordering:
+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows


In [34]:
# 15. Sorting within partitions (for performance) – debug & load via explicit file list
import os, glob

pattern = "../data/flight-data/json/*-summary.json"
print("cwd:", os.getcwd())
print("glob pattern:", pattern)
matches = glob.glob(pattern)
print(f"Found {len(matches)} files:")
for f in matches: print(" ", f)

if not matches:
    print("⚠️  No files matched, falling back to single file")
    matches = ["../data/flight-data/json/2010-summary.json"]

sorted_within_partitions = (
    spark.read
         .format("json")
         .load(matches)          # now loading the explicit list
         .sortWithinPartitions("count")
)

print(f"Sorted within partitions: {sorted_within_partitions.count()} rows")

cwd: /workspaces/Spark-The-Definitive-Guide/notebooks
glob pattern: ../data/flight-data/json/*-summary.json
Found 6 files:
  ../data/flight-data/json/2014-summary.json
  ../data/flight-data/json/2010-summary.json
  ../data/flight-data/json/2015-summary.json
  ../data/flight-data/json/2011-summary.json
  ../data/flight-data/json/2012-summary.json
  ../data/flight-data/json/2013-summary.json
Sorted within partitions: 1502 rows


In [35]:
# Sort within partitions (for performance)
import glob


sorted_within_partitions = spark.read.format("json").load(glob.glob("../data/flight-data/json/*-summary.json")) \
  .sortWithinPartitions("count")
print(f"Sorted within partitions: {sorted_within_partitions.count()} rows")

Sorted within partitions: 1502 rows


# 16. Limiting Results
Limit the number of rows returned.

In [36]:
# Simple limit
df.limit(5).show()

# Limit with ordering
df.orderBy(expr("count desc")).limit(6).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
+--------------------+-------------------+-----+



# 17. Repartitioning and Coalescing
Control the physical layout of your data.

In [40]:
# Check current partitions
print(f"Current partitions: {df.rdd.getNumPartitions()}")

# Different repartitioning strategies
df_repart = df.repartition(5)
print(f"After repartition(5): {df_repart.rdd.getNumPartitions()}")

df_repart_col = df.repartition(col("DEST_COUNTRY_NAME"))
print(f"After repartition by column: {df_repart_col.rdd.getNumPartitions()}")

df_repart_both = df.repartition(5, col("DEST_COUNTRY_NAME"))
print(f"After repartition(5, column): {df_repart_both.rdd.getNumPartitions()}")

df_coalesced = df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)
print(f"After coalesce(2): {df_coalesced.rdd.getNumPartitions()}")

Current partitions: 1
After repartition(5): 5
After repartition by column: 1
After repartition(5, column): 5
After coalesce(2): 2
After repartition by column: 1
After repartition(5, column): 5
After coalesce(2): 2


# 18. Collecting Data to Driver
Bring data back to the driver for inspection.

In [None]:
collectDF = df.limit(10)

# Different ways to inspect data
print("Using take(5):")
print(collectDF.take(5))

print("\nUsing show():")
collectDF.show()

print("\nUsing show(5, False) - no truncation:")
collectDF.show(5, False)

print("\nUsing collect() - returns all rows as list:")
collected_data = collectDF.collect()
print(f"Collected {len(collected_data)} rows")
print(f"First row: {collected_data[0]}")

Using take(5):
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344), Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]

Using show():
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldo

In [42]:
print("\nUsing collect() - returns all rows as list:")
collected_data = df.collect()
print(f"Collected {len(collected_data)} rows")
print(f"First row: {collected_data[0]}")


Using collect() - returns all rows as list:
Collected 256 rows
First row: Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)


In [43]:
# Tạo một DataFrame giả lập với số lượng hàng lớn và cố tình repartition để có nhiều partition hơn
# (Để minh họa toLocalIterator tốt hơn, thường thì df sẽ có nhiều partition sẵn)
data = []
for i in range(1, 101): # 100 hàng dữ liệu
    country_name = f"Country_{i % 5}" # Chia thành 5 loại quốc gia để có 5 partition sau repartition
    data.append(Row(ID=i, Country=country_name, Value=i*10))

df_large = spark.createDataFrame(data)

# Repartition thành 5 partition để minh họa toLocalIterator duyệt qua từng partition
# (Mặc dù df.limit(10) trong ví dụ sách không repartition,
# nhưng để thấy rõ toLocalIterator hoạt động theo partition,
# chúng ta sẽ tạo một df có nhiều partition ở đây)
df_repartitioned = df_large.repartition(5, col("Country"))

print(f"DataFrame có {df_repartitioned.count()} dòng và {df_repartitioned.rdd.getNumPartitions()} partitions.")

# Lấy 10 dòng đầu tiên của DataFrame này để làm ví dụ như trong sách
# Lưu ý: limit() không thay đổi số lượng partition vật lý, nó chỉ giới hạn số dòng sau cùng
collectDF = df_repartitioned.limit(10)
print(f"collectDF (limit 10) có {collectDF.count()} dòng.")

DataFrame có 100 dòng và 5 partitions.
collectDF (limit 10) có 10 dòng.
collectDF (limit 10) có 10 dòng.


In [49]:
print("\nUsing toLocalIterator():")

# toLocalIterator() trả về một iterator
local_iterator = collectDF.toLocalIterator()

# Chúng ta duyệt qua từng hàng trong iterator
# Mỗi lần duyệt, Spark sẽ tải một partition dữ liệu về Driver (nếu chưa có)
# và trả về các hàng từ partition đó.
# Sau khi các hàng của một partition được xử lý, bộ nhớ có thể được giải phóng
# trước khi tải partition tiếp theo.
row_count = 0
for row in local_iterator:
    print(f"  Processing row: {row}")
    row_count += 1

print(f"\nSuccessfully iterated through at least {row_count} rows using toLocalIterator.")

# Bạn không thể duyệt lại iterator một lần nữa nếu không gọi lại toLocalIterator()
# try:
#     next(local_iterator)
# except StopIteration:
#     print("Iterator đã duyệt hết.")


Using toLocalIterator():
  Processing row: Row(ID=4, Country='Country_4', Value=40)
  Processing row: Row(ID=5, Country='Country_0', Value=50)
  Processing row: Row(ID=9, Country='Country_4', Value=90)
  Processing row: Row(ID=10, Country='Country_0', Value=100)
  Processing row: Row(ID=14, Country='Country_4', Value=140)
  Processing row: Row(ID=15, Country='Country_0', Value=150)
  Processing row: Row(ID=19, Country='Country_4', Value=190)
  Processing row: Row(ID=20, Country='Country_0', Value=200)
  Processing row: Row(ID=24, Country='Country_4', Value=240)
  Processing row: Row(ID=25, Country='Country_0', Value=250)

Successfully iterated through at least 10 rows using toLocalIterator.


In [50]:
local_iterator = collectDF.toLocalIterator()
try:
    next(local_iterator)
except StopIteration:
    print("Iterator đã duyệt hết.")