# Spark context

In [1]:
import findspark

In [2]:
findspark.init()

In [3]:
import os

os.environ['PYSPARK_PYTHON'] = 'python'

In [4]:
from pyspark.sql import SparkSession

# Khởi tạo SparkSession
# spark.master: địa chỉ và giao thức kết nối của cluster manager mà các worker và client dùng để submit job và chỉ dùng để submit job.
    # Do cụm spark này được xây trên docker compose không có cluster manager bên ngoài nên sử dụng cluster manager mặc định trong spark => Giao thức kết nối là spark
# spark.driver.host: địa chỉ ip chính xác mà các client và excutor dùng để kết nối đến spark driver. 
    # Do đoạn code này được submit lên cluster manager nằm trong chính contianer driver nên có thể để mặc định là localhost. 
    # Tuy nhiên trong môi trường k8s, code được submit lên api server container chứ không phải spark-driver container nên phải chỉ rõ. Tốt nhất là chỉ rõ trong mọi trường hợp.
# spark.driver.bindaddress: địa chỉ mà spark-driver chấp nhận gói tin ip. (cài này thêm vào thì lỗi)
    # Nếu bạn muốn chỉ đính danh chỉ 1 máy có thể kết nối đến spark-driver thì cấu hình chính xác địa chỉ ip của máy đó
    # Nếu để là 0.0.0.0 thì spark-driver chấp nhận gói tin ip của tất cả các máy (hiển nhiên vẫn phải mở thêm port)
# .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
spark = SparkSession.builder \
    .appName("test-1") \
    .config("spark.master", "spark://spark-master:7077") \
    .config("spark.driver.bindAddress", "0.0.0.0") \
    .getOrCreate()

In [5]:
spark

In [6]:
spark.sparkContext.getConf().getAll()

[('spark.driver.host', 'fe00ffc222b2'),
 ('spark.driver.port', '35547'),
 ('spark.executor.id', 'driver'),
 ('spark.app.submitTime', '1726116014205'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'),
 ('

# How to create RDD

## Ví dụ

In [66]:
numbers = [1, 2, 3, 4, 5]
# Tạo RDD bằng cách phân mảnh mảng numbers
rdd = spark.sparkContext.parallelize(numbers)

In [67]:
rdd.collect()

[1, 2, 3, 4, 5]

In [68]:
# Create an RDD from a list of tuples
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("Alice", 40)]
rdd = spark.sparkContext.parallelize(data)

In [69]:
# Collect action: Retrieve all elements of the RDD
print("ALl elements of the RDD: ", rdd.collect())

ALl elements of the RDD:  [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('Alice', 40)]


## RDDs Operation: actions

In [70]:
# RDD support rất nhiều action để có thể show kết quả trong quá trình xử lý tương tác
# count(): Đếm số phần tử trong RDD
count = rdd.count()
print("The total number of elements in rdd: ", count)

The total number of elements in rdd:  4


In [71]:
# first(): lấy phần tử đầu tiên của RDD
first_element = rdd.first()
print("The first element of the rdd: ", first_element)

The first element of the rdd:  ('Alice', 25)


In [72]:
# take(n): lấy n phần tử đầu tiên từ RDD
print("The first two elements of the rdd: ", rdd.take(2))

The first two elements of the rdd:  [('Alice', 25), ('Bob', 30)]


In [74]:
# foreach(func**): cho phép apply 1 function lên các phần tử trong RDD
rdd.foreach(lambda x: print(x))  # In từng phần tử x ra

## RDDs Operation: transformations

In [75]:
# Map transformations: convert name ra in hoa
mapped_rdd  = rdd.map(lambda x: (x[0].upper(), x[1]))

In [76]:
result = mapped_rdd.collect()
print("rdd with uppercase name: ", result)

rdd with uppercase name:  [('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('ALICE', 40)]


In [77]:
# Lọc các phần tử rừ RDD cũ sang RDD mới: do RDD là không thay đổi mỗi khi được tạo ra
fillted_rdd = rdd.filter(lambda x:x[1] > 30)
fillted_rdd.collect()

[('Charlie', 35), ('Alice', 40)]

In [78]:
# reduceByKey(): transform groups elements có cùng 1 key vào
# Ví dụ đếm số người có cùng tuổi trong rdd trên
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
reduced_rdd.collect()

[('Alice', 65), ('Bob', 30), ('Charlie', 35)]

In [79]:
# SortBy transformation: Sort the RDD by age in descending order
sorted_rdd = rdd.sortBy(lambda x: x[1], ascending=False)
sorted_rdd.collect()

[('Alice', 40), ('Charlie', 35), ('Bob', 30), ('Alice', 25)]

## Save RDDs to text file

In [62]:
# Cẩn thận khi sử dụng lệnh saveAsTextFile và sử dụng đường dẫn tại local, spark sẽ lưu nó vào cụm của mình. 
# Spark chỉ dùng để xử lý dữ liệu không dùng để chứa dữ liệu sau xử lý. Ở đây làm vậy vì đường dẫn này là volumn của spark
# Lưu RDD vào thư mục output
rdd.saveAsTextFile("/home/jovyan/notebooks/output")

Py4JJavaError: An error occurred while calling o439.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:106)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1620)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1620)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1606)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1606)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:564)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:563)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 86.0 failed 4 times, most recent failure: Lost task 1.3 in stage 86.0 (TID 112) (172.18.0.5 executor 0): ExitCodeException exitCode=1: chmod: changing permissions of '/home/jovyan/notebooks/output/_temporary/0/_temporary/attempt_202409120326286597368956697215441_0240_m_000001_3': Operation not permitted

	at org.apache.hadoop.util.Shell.runCommand(Shell.java:1007)
	at org.apache.hadoop.util.Shell.run(Shell.java:900)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:513)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1081)
	at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:113)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.initWriter(SparkHadoopWriter.scala:238)
	at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:126)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2451)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:83)
	... 51 more
Caused by: ExitCodeException exitCode=1: chmod: changing permissions of '/home/jovyan/notebooks/output/_temporary/0/_temporary/attempt_202409120326286597368956697215441_0240_m_000001_3': Operation not permitted

	at org.apache.hadoop.util.Shell.runCommand(Shell.java:1007)
	at org.apache.hadoop.util.Shell.run(Shell.java:900)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:513)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1081)
	at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:113)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.initWriter(SparkHadoopWriter.scala:238)
	at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:126)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)


# Using Dataframes

## Read csv file to dataframe

In [7]:
%%bash
head -10 ./data/products.csv

id,name,category,quantity,price
1,iPhone 12,Electronics,10,899.99
2,Nike Air Max 90,Clothing,25,119.99
3,KitchenAid Stand Mixer,Home Appliances,5,299.99
4,The Great Gatsby,Books,50,12.99
5,L'Oreal Paris Mascara,Beauty,100,9.99
6,Yoga Mat,Sports,30,29.99
7,Samsung 4K Smart TV,Electronics,8,799.99
8,Levi's Jeans,Clothing,15,49.99
9,Dyson Vacuum Cleaner,Home Appliances,3,399.99


In [8]:
# csv_file_path = './data/products.csv'
# df = spark.read.csv(csv_file_path, header=True)
df = spark.read.csv("hdfs://namenode:9000/data/products.csv", header=True, inferSchema=True)

In [9]:
df.printSchema()

df.show(5)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
+---+--------------------+---------------+--------+------+
only showing top 5 rows



In [11]:
# Không ai để schema là auto cả mà thường phải set
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [12]:
# Define the schema
schema = StructType([
    StructField(name='id', dataType=IntegerType(), nullable=True),
    StructField(name='name', dataType=StringType(), nullable=True),
    StructField(name='category', dataType=StringType(), nullable=True),
    StructField(name='quantity', dataType=IntegerType(), nullable=True),
    StructField(name='price', dataType=DoubleType(), nullable=True)
])

In [13]:
df.write.json("hdfs://namenode:9000/output/products.json", mode='overwrite')

In [14]:
csv_file_path = './data/products.csv'
df = spark.read.csv(csv_file_path, header=True, schema=schema)

df.printSchema()
df.show(5)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
+---+--------------------+---------------+--------+------+
only showing top 5 rows



In [15]:
# Để spark dự đoán loại dữ liệu tự động
csv_file_path = './data/products.csv'
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

df.printSchema()
df.show(5)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
+---+--------------------+---------------+--------+------+
only showing top 5 rows



## Read JSON file to Dataframe

### Single Line JSON

In [16]:
%%bash
head -10 data/products_singleline.json

{"id":1,"name":"iPhone 12","category":"Electronics","quantity":10,"price":899.99}
{"id":2,"name":"Nike Air Max 90","category":"Clothing","quantity":25,"price":119.99}
{"id":3,"name":"KitchenAid Stand Mixer","category":"Home Appliances","quantity":5,"price":299.99}
{"id":4,"name":"The Great Gatsby","category":"Books","quantity":50,"price":12.99}
{"id":5,"name":"L'Oreal Paris Mascara","category":"Beauty","quantity":100,"price":9.99}
{"id":6,"name":"Yoga Mat","category":"Sports","quantity":30,"price":29.99}
{"id":7,"name":"Samsung 4K Smart TV","category":"Electronics","quantity":8,"price":799.99}
{"id":8,"name":"Levi's Jeans","category":"Clothing","quantity":15,"price":49.99}
{"id":9,"name":"Dyson Vacuum Cleaner","category":"Home Appliances","quantity":3,"price":399.99}
{"id":10,"name":"Harry Potter Series","category":"Books","quantity":20,"price":15.99}


In [17]:
json_file_path = './data/products_singleline.json'
df = spark.read.json(json_file_path)

In [18]:
df.printSchema()

df.show(5)

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
+---------------+---+--------------------+------+--------+
only showing top 5 rows



### Multi line JSON

In [19]:
%%bash
head -20 data/products_multiline.json

[
  {
    "id": 1,
    "name": "iPhone 12",
    "category": "Electronics",
    "quantity": 10,
    "price": 899.99
  },
  {
    "id": 2,
    "name": "Nike Air Max 90",
    "category": "Clothing",
    "quantity": 25,
    "price": 119.99
  },
  {
    "id": 3,
    "name": "KitchenAid Stand Mixer",
    "category": "Home Appliances",
    "quantity": 5,


In [20]:
json_file_path='./data/products_multiline.json'
df = spark.read.json(json_file_path, multiLine=True)

In [21]:
df.printSchema()

df.show()

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
|         Sports|  6|            Yoga Mat| 29.99|      30|
|    Electronics|  7| Samsung 4K Smart TV|799.99|       8|
|       Clothing|  8|        Levi's Jeans| 49.99|      15|
|Home Appliances|  9|Dyson Vacuum Cleaner|399.99|       3|
|          Books| 10| Harry Potter Series| 15.99|      20|
|         Beauty| 11|        MAC Lipstick| 1

## Read parquet file into dataframe

In [20]:
parquet_file_path = './data/products.parquet/'

In [21]:
df = spark.read.parquet(parquet_file_path)

df.printSchema()
df.show(5)

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
+---------------+---+--------------------+------+--------+
only showing top 5 rows



# Dataframe Operator

In [22]:
data_file_path = './data/stocks.txt'
df = spark.read.csv(data_file_path, header=True, inferSchema=True)

# Display schema of Dataframe
df.printSchema()

# Show the initial Dataframe
print('Initial Dataframe: ')
df.show(10)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

Initial Dataframe: 
+---+----------------+-----------+--------+-------+
| id|            name|   category|quantity|  price|
+---+----------------+-----------+--------+-------+
|  1|          iPhone|Electronics|      10| 899.99|
|  2|         Macbook|Electronics|       5|1299.99|
|  3|            iPad|Electronics|      15| 499.99|
|  4|      Samsung TV|Electronics|       8| 799.99|
|  5|           LG TV|Electronics|      10| 699.99|
|  6|      Nike Shoes|   Clothing|      30|  99.99|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|
|  8| Sony Headphones|Electronics|      12| 149.99|
|  9|Beats Headphones|Electronics|      20| 199.99|
| 10|    Dining Table|  Furniture|      10| 249.99|
+---+----------------+-----------+--------+-------+
only showing top 10 rows



In [23]:
# Select specific columns
selected_columns = df.select('id', 'name', 'price')
print("Selected Columns: ")
selected_columns.show(10)

Selected Columns: 
+---+----------------+-------+
| id|            name|  price|
+---+----------------+-------+
|  1|          iPhone| 899.99|
|  2|         Macbook|1299.99|
|  3|            iPad| 499.99|
|  4|      Samsung TV| 799.99|
|  5|           LG TV| 699.99|
|  6|      Nike Shoes|  99.99|
|  7|    Adidas Shoes|  89.99|
|  8| Sony Headphones| 149.99|
|  9|Beats Headphones| 199.99|
| 10|    Dining Table| 249.99|
+---+----------------+-------+
only showing top 10 rows



In [24]:
# Filter: apply 1 điều kiện gì đó để lọc row)
filtered_data = df.filter(df.quantity > 20)
print("Filtered_data: ", filtered_data.count())
filtered_data.show()

Filtered_data:  12
+---+--------------+-----------+--------+-----+
| id|          name|   category|quantity|price|
+---+--------------+-----------+--------+-----+
|  6|    Nike Shoes|   Clothing|      30|99.99|
|  7|  Adidas Shoes|   Clothing|      25|89.99|
| 12|        Apples|       Food|     100|  0.5|
| 13|       Bananas|       Food|     150| 0.25|
| 14|       Oranges|       Food|     120| 0.75|
| 15|Chicken Breast|       Food|      50| 3.99|
| 16| Salmon Fillet|       Food|      30| 5.99|
| 24|    Laptop Bag|Accessories|      25|29.99|
| 25|      Backpack|Accessories|      30|24.99|
| 28|         Jeans|   Clothing|      30|59.99|
| 29|       T-shirt|   Clothing|      50|14.99|
| 30|      Sneakers|   Clothing|      40|79.99|
+---+--------------+-----------+--------+-----+



In [25]:
# GroupBy and Aggregations(tập hợp)
grouped_data = df.groupby('category').agg({'quantity': 'sum', 'price': 'avg'})
print('Grouped and Aggregated Data: ')
grouped_data.show()

Grouped and Aggregated Data: 
+-----------+-------------+------------------+
|   category|sum(quantity)|        avg(price)|
+-----------+-------------+------------------+
|       Food|          450|2.2960000000000003|
|     Sports|           35|             34.99|
|Electronics|           98| 586.6566666666665|
|   Clothing|          200|  99.2757142857143|
|  Furniture|           41|            141.99|
|Accessories|           55|             27.49|
+-----------+-------------+------------------+



In [26]:
# Join Dataframes
df2 = df.select('id', 'category').limit(10)
joined_data = df.join(df2, 'id', 'inner')
print('Joined Data: ')
joined_data.show()

Joined Data: 
+---+----------------+-----------+--------+-------+-----------+
| id|            name|   category|quantity|  price|   category|
+---+----------------+-----------+--------+-------+-----------+
|  1|          iPhone|Electronics|      10| 899.99|Electronics|
|  2|         Macbook|Electronics|       5|1299.99|Electronics|
|  3|            iPad|Electronics|      15| 499.99|Electronics|
|  4|      Samsung TV|Electronics|       8| 799.99|Electronics|
|  5|           LG TV|Electronics|      10| 699.99|Electronics|
|  6|      Nike Shoes|   Clothing|      30|  99.99|   Clothing|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|   Clothing|
|  8| Sony Headphones|Electronics|      12| 149.99|Electronics|
|  9|Beats Headphones|Electronics|      20| 199.99|Electronics|
| 10|    Dining Table|  Furniture|      10| 249.99|  Furniture|
+---+----------------+-----------+--------+-------+-----------+



In [27]:
# Sort: Arrange rows based on one or more columns
sorted_data = df.orderBy('price')
print('Sorted Data: ')
sorted_data.show(10)

from pyspark.sql.functions import col, desc
sorted_data = df.orderBy(col('price').desc(), col('id').desc())
print('Sorted Data Streaming: ')
sorted_data.show(10)

Sorted Data: 
+---+--------------+-----------+--------+-----+
| id|          name|   category|quantity|price|
+---+--------------+-----------+--------+-----+
| 13|       Bananas|       Food|     150| 0.25|
| 12|        Apples|       Food|     100|  0.5|
| 14|       Oranges|       Food|     120| 0.75|
| 15|Chicken Breast|       Food|      50| 3.99|
| 16| Salmon Fillet|       Food|      30| 5.99|
| 29|       T-shirt|   Clothing|      50|14.99|
| 19|      Yoga Mat|     Sports|      20|19.99|
| 25|      Backpack|Accessories|      30|24.99|
| 24|    Laptop Bag|Accessories|      25|29.99|
| 20|  Dumbbell Set|     Sports|      15|49.99|
+---+--------------+-----------+--------+-----+
only showing top 10 rows

Sorted Data Streaming: 
+---+----------------+-----------+--------+-------+
| id|            name|   category|quantity|  price|
+---+----------------+-----------+--------+-------+
|  2|         Macbook|Electronics|       5|1299.99|
|  1|          iPhone|Electronics|      10| 899.99|
|  4

In [28]:
# Distinct: Get unique rows
distinct_rows = df.select('category').distinct()
print('Distinct Product Categories: ')
distinct_rows.show()

Distinct Product Categories: 
+-----------+
|   category|
+-----------+
|       Food|
|     Sports|
|Electronics|
|   Clothing|
|  Furniture|
|Accessories|
+-----------+



In [29]:
# Drop: Remove specified columns
dropped_columns = df.drop("quantity", "category")
print("Dropped Columns: ")
dropped_columns.show(10)

Dropped Columns: 
+---+----------------+-------+
| id|            name|  price|
+---+----------------+-------+
|  1|          iPhone| 899.99|
|  2|         Macbook|1299.99|
|  3|            iPad| 499.99|
|  4|      Samsung TV| 799.99|
|  5|           LG TV| 699.99|
|  6|      Nike Shoes|  99.99|
|  7|    Adidas Shoes|  89.99|
|  8| Sony Headphones| 149.99|
|  9|Beats Headphones| 199.99|
| 10|    Dining Table| 249.99|
+---+----------------+-------+
only showing top 10 rows



In [30]:
# WithColumn: Add new calculated columns
df_with_new_column = df.withColumn("revenue", df.quantity * df.price)
print('Dataframe with new column: ')
df_with_new_column.show(10)

Dataframe with new column: 
+---+----------------+-----------+--------+-------+-------+
| id|            name|   category|quantity|  price|revenue|
+---+----------------+-----------+--------+-------+-------+
|  1|          iPhone|Electronics|      10| 899.99| 8999.9|
|  2|         Macbook|Electronics|       5|1299.99|6499.95|
|  3|            iPad|Electronics|      15| 499.99|7499.85|
|  4|      Samsung TV|Electronics|       8| 799.99|6399.92|
|  5|           LG TV|Electronics|      10| 699.99| 6999.9|
|  6|      Nike Shoes|   Clothing|      30|  99.99| 2999.7|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|2249.75|
|  8| Sony Headphones|Electronics|      12| 149.99|1799.88|
|  9|Beats Headphones|Electronics|      20| 199.99| 3999.8|
| 10|    Dining Table|  Furniture|      10| 249.99| 2499.9|
+---+----------------+-----------+--------+-------+-------+
only showing top 10 rows



In [31]:
# Alias
df_with_alias = df.withColumnRenamed('price', 'product_price')
print('Dataframe with Aliased Column: ')
df_with_alias.show(10)

Dataframe with Aliased Column: 
+---+----------------+-----------+--------+-------------+
| id|            name|   category|quantity|product_price|
+---+----------------+-----------+--------+-------------+
|  1|          iPhone|Electronics|      10|       899.99|
|  2|         Macbook|Electronics|       5|      1299.99|
|  3|            iPad|Electronics|      15|       499.99|
|  4|      Samsung TV|Electronics|       8|       799.99|
|  5|           LG TV|Electronics|      10|       699.99|
|  6|      Nike Shoes|   Clothing|      30|        99.99|
|  7|    Adidas Shoes|   Clothing|      25|        89.99|
|  8| Sony Headphones|Electronics|      12|       149.99|
|  9|Beats Headphones|Electronics|      20|       199.99|
| 10|    Dining Table|  Furniture|      10|       249.99|
+---+----------------+-----------+--------+-------------+
only showing top 10 rows



# Spark SQL

In [32]:
# spark sql là module của spark tạo điều kiện để quert dữ liệu có cấu trúc, bán cấu trúc bằng việc sử dụng lệnh sql thay vì khả năng spark dataframe
# module này cung cấp khả năng xử lý theo batch và streaming cùng 1 lúc
data_file_path = './data/persons.csv'
df = spark.read.csv(data_file_path, header=True, inferSchema=True)

df.printSchema()

print('Initial Dataframe: ')
df.show(10)

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

Initial Dataframe: 
+------------------+---+------+------+
|              name|age|gender|salary|
+------------------+---+------+------+
|          John Doe| 30|  Male| 50000|
|        Jane Smith| 25|Female| 45000|
|     David Johnson| 35|  Male| 60000|
|       Emily Davis| 28|Female| 52000|
|    Michael Wilson| 40|  Male| 75000|
|       Sarah Brown| 32|Female| 58000|
|        Robert Lee| 29|  Male| 51000|
|       Lisa Garcia| 27|Female| 49000|
|    James Martinez| 38|  Male| 70000|
|Jennifer Rodriguez| 26|Female| 47000|
+------------------+---+------+------+
only showing top 10 rows



In [33]:
# Register the Dataframe as a Temporary Table
# Dùng để tạo 1 view tạm thời từ dataframe dùng cho truy vấn sql
# Việc bắt buộc phải làm nêu muốn truy vấn bằng sql
df.createOrReplaceTempView('my_table')

In [34]:
# Lấy tất cả person có tuổi lớn hơn bằng 25
result = spark.sql('SELECT * FROM my_table WHERE age > 25')
result.show()

+------------------+---+------+------+
|              name|age|gender|salary|
+------------------+---+------+------+
|          John Doe| 30|  Male| 50000|
|     David Johnson| 35|  Male| 60000|
|       Emily Davis| 28|Female| 52000|
|    Michael Wilson| 40|  Male| 75000|
|       Sarah Brown| 32|Female| 58000|
|        Robert Lee| 29|  Male| 51000|
|       Lisa Garcia| 27|Female| 49000|
|    James Martinez| 38|  Male| 70000|
|Jennifer Rodriguez| 26|Female| 47000|
|  William Anderson| 33|  Male| 62000|
|   Karen Hernandez| 31|Female| 55000|
|Christopher Taylor| 37|  Male| 69000|
|     Matthew Davis| 36|  Male| 67000|
|    Patricia White| 29|Female| 50000|
|     Daniel Miller| 34|  Male| 64000|
| Elizabeth Jackson| 30|Female| 52000|
|     Joseph Harris| 28|  Male| 53000|
|      Linda Martin| 39|Female| 71000|
+------------------+---+------+------+



In [35]:
# Tính toán lương trung bình
avg_salary_by_gender = spark.sql("SELECT gender, AVG(salary) FROM my_table GROUP BY gender")
avg_salary_by_gender.show()

+------+-----------+
|gender|avg(salary)|
+------+-----------+
|Female|    52300.0|
|  Male|    62100.0|
+------+-----------+



In [36]:
# Tạo temporary view
df.createOrReplaceTempView('people')
# Query temporary view
result = spark.sql('SELECT * FROM people WHERE age > 25')
result.show() 

+------------------+---+------+------+
|              name|age|gender|salary|
+------------------+---+------+------+
|          John Doe| 30|  Male| 50000|
|     David Johnson| 35|  Male| 60000|
|       Emily Davis| 28|Female| 52000|
|    Michael Wilson| 40|  Male| 75000|
|       Sarah Brown| 32|Female| 58000|
|        Robert Lee| 29|  Male| 51000|
|       Lisa Garcia| 27|Female| 49000|
|    James Martinez| 38|  Male| 70000|
|Jennifer Rodriguez| 26|Female| 47000|
|  William Anderson| 33|  Male| 62000|
|   Karen Hernandez| 31|Female| 55000|
|Christopher Taylor| 37|  Male| 69000|
|     Matthew Davis| 36|  Male| 67000|
|    Patricia White| 29|Female| 50000|
|     Daniel Miller| 34|  Male| 64000|
| Elizabeth Jackson| 30|Female| 52000|
|     Joseph Harris| 28|  Male| 53000|
|      Linda Martin| 39|Female| 71000|
+------------------+---+------+------+



In [37]:
# Check nếu temporary view tồn tại
view_exits = spark.catalog.tableExists("people")
view_exits

True

In [38]:
# Drop 1 temporary view
spark.catalog.dropTempView("people")
# Check nếu temporary view tồn tại
view_exits = spark.catalog.tableExists("people")
view_exits

False

# Subquries

In [39]:
# Create Dataframe
employee_data = [
    (1, "John"), (2, "Alice"), (3, "Bob"), (4, "Emily"),
    (5, "David"), (6, "Sarah"), (7, "Michael"), (8, "Lisa"), (9, "William")
]

employees = spark.createDataFrame(employee_data, ["id", "name"])

salary_data = [
    ("HR", 1, 60000), ("HR", 2, 55000), ("HR", 3, 58000), 
    ("IT", 4, 70000), ("IT", 5, 72000), ("IT", 6, 68000), 
    ("Sales", 7, 75000), ("Sales", 8, 78000), ("Sales", 9, 77000)
]

salaries = spark.createDataFrame(salary_data, ["department", "id", "salary"])

employees.show()
salaries.show()

+---+-------+
| id|   name|
+---+-------+
|  1|   John|
|  2|  Alice|
|  3|    Bob|
|  4|  Emily|
|  5|  David|
|  6|  Sarah|
|  7|Michael|
|  8|   Lisa|
|  9|William|
+---+-------+

+----------+---+------+
|department| id|salary|
+----------+---+------+
|        HR|  1| 60000|
|        HR|  2| 55000|
|        HR|  3| 58000|
|        IT|  4| 70000|
|        IT|  5| 72000|
|        IT|  6| 68000|
|     Sales|  7| 75000|
|     Sales|  8| 78000|
|     Sales|  9| 77000|
+----------+---+------+



In [40]:
# Register as temporary view
employees.createOrReplaceTempView("employees")
salaries.createOrReplaceTempView("salaries")

In [41]:
# Subquery to find employees with salaries above average
result = spark.sql(
    """
        SELECT name
        FROM employees
        WHERE id IN (
            SELECT id
            FROM salaries
            WHERE salary > (SELECT AVG(salary) FROM salaries)
        )
    """
)
result.show()

+-------+
|   name|
+-------+
|  Emily|
|  David|
|Michael|
|   Lisa|
|William|
+-------+



# Window Function

In [42]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [43]:
# Subquery to find employees with salaries above average
employee_salary = spark.sql(
    """
        SELECT salaries.*, employees.name
        FROM salaries
        LEFT JOIN employees on salaries.id = employees.id
    """
)
employee_salary.show()

+----------+---+------+-------+
|department| id|salary|   name|
+----------+---+------+-------+
|     Sales|  7| 75000|Michael|
|        IT|  6| 68000|  Sarah|
|     Sales|  9| 77000|William|
|        IT|  5| 72000|  David|
|     Sales|  8| 78000|   Lisa|
|        HR|  1| 60000|   John|
|        HR|  3| 58000|    Bob|
|        HR|  2| 55000|  Alice|
|        IT|  4| 70000|  Emily|
+----------+---+------+-------+



In [44]:
# Create a window specification
# Windown func giúp thực hiện query trên 1 cửa sổ windown gồm nhiều row thay vì thực hiện trên từng row
# Dataframe được chia thành các windown - partition
windown_spec = Window.partitionBy("department").orderBy(F.desc("salary"))

In [45]:
employee_salary.withColumn("rank", F.rank().over(windown_spec)).show()

+----------+---+------+-------+----+
|department| id|salary|   name|rank|
+----------+---+------+-------+----+
|        HR|  1| 60000|   John|   1|
|        HR|  3| 58000|    Bob|   2|
|        HR|  2| 55000|  Alice|   3|
|        IT|  5| 72000|  David|   1|
|        IT|  4| 70000|  Emily|   2|
|        IT|  6| 68000|  Sarah|   3|
|     Sales|  8| 78000|   Lisa|   1|
|     Sales|  9| 77000|William|   2|
|     Sales|  7| 75000|Michael|   3|
+----------+---+------+-------+----+



# Đóng spark session sau khi hoàn thành xong công việc

In [22]:
spark.stop()