# Ngày 5: Shared Variable và Spark UI

#### I. Shard Variable
Spark theo mặc định không hỗ trợ thay đổi trạng thái biến được chia sẻ trên các Worker Node. Share variables giúp giải quyết một số giới hạn của kiến trúc này.

**1. Broadcast Variable**
   
*Khái niệm*: Broadcast variables dùng để chia sẻ một biến chỉ đọc đến tất cả worker node, nhằm giảm việc truyền dữ liệu lặp đi lặp lại.

*Sử dụng:* 
- Broadcast một từ điển lớn hoặc dữ liệu tham khỏa để tất cả worker node có thể truy cập mà không phải tải lại từ đầu.
- Lợi ích của Broadcast variables là giảm băng thông và cải thiện hiệu suất.

*Cú pháp và ví dụ:*
```python
    broadcast_val = sc.broadcast(large_dict)
    rdd.map(lambda x : some_function(x , broadcast_var.value))
```

![BroadCast](image_1/broadcast_2.png)

**2. Accumulators:**

*Khái niệm:* Accumulators là các biến có thể đếm hoặc tổng hợp dữ liệu trên các worker node, thường dùng để theo dõi số liệu hoặc lỗi mà không ảnh hưởng đến trạng thái ứng dụng.

*Sử dụng:* Theo dõi số lượng bản ghi lỗi hoặc số liệu thống kê

*Cú pháp và ví dụ:* 

```python
    accumulator = sc.accumulator(0)
    
    rdd.foreach( lambda x: accumulator.add(1) if x == 'error' else None)
```

![Accumulator](image_1/accumulator_2.png)


### II. Spark UI

Ba thành phần chính của Spark UI: **JOB  -  STAGE - TASK**

- Một JOB có nhiều STAGE
- Một STAGE có nhiều TASK

![Spark UI](image_1/SparkUI.png)

<p style = "color:green;">Vậy JOB, STAGE, TASK được hình thành như thế nào ?</p>

1. Sự hình thành JOB

Bản chất 1 JOB trong spark UI là 1 Actions khi ta thực hiện một Application trên RDD.

1 JOB = 1 ACTION

2. Sự hình thành của STAGE

- Một JOB bao gồm nhiều Stage, đại diện cho các bước trong quá trình thực thi.
- Ở đây ta lưu ý với hiện tượng Shuffle, Wide transformation có suffle còn narrow transformation thì không.
- Một Job có nhiều Stage với:
  - Số lượng Stage = Số lượng Wide Transformations + 1

3. Sự hình thành TASK

Số lượng **TASK** = Số lượng **PARTITION**

_Vậy câu hỏi được đặt ra là, làm thế nào để biết số lượng PARTITION mà SPARK sử lí._

Trước tiên phải hiểu về Task và Partition trong kiến trúc Cluster của Spark

![executor](image_1/executor.png)

Các **task** bên trong các **Executor** là các **Partition**



_Vậy SPARK chia partition như thế nào ?_

Về kiến thức, việc chia ra các **Executor và Partition** thuộc phạm vi **tối ưu** khi làm việc với Spark

- Ở Executor: thường người ta chia ra trong 1 Executor sẽ có 5 Core CPU, mỗi Core CPU sẽ đảm nhiệm 1 task và thực hiện sử lí song song các task đó trong cùng 1 Executor

- Ở Partiton(task):
    - Mặc định: Spark có cơ chế tự động phân tách Partition và tự động tối ưu số lượng Partion
        ```python
            from pyspark.sql import SparkSesstion
            spark = SparkSession.\
                builder.\
                master("local[4]").\
                appName("nb_partition").\
                getOrCreate()   

            orders_schema = "order_id long, order_date date, customer_id long, order_status string"

            orders_df = spark.read \
                .format("csv") \
                .schema(orders_schema) \
                .load("C:/data/orders_1gb.csv")

            spark.conf.get("spark.sql.shuffle.partitions")
            # Kết quả mặc định là 200 -> Tức là Mặc định khi load dữ liệu vào Spark sẽ chia 200 partition

            spark.conf.get("spark.sql.adaptive.enabled")
            # Tuy nhiên, khi xem spark UI sẽ thấy số lượng Partiton nhỏ hơn rất nhiều
            # Nguyên nhân là Spark có cơ chế tối ưu số lượng Partion

            spark.conf.set("spark.sql.adaptive.enabled", "false")
            # Tắt tối ưu sẽ thấy số lượng partion là 200 trong Spark UI

        ```
    - Tuy nhiên, là một Data Engineer, bạn không thể phụ thuộc hoàn toàn vào Spark, tùy dự án cụ thể mà 
bạn là người data engineer phải tính ra số lượng Partion phù hợp, số executor phù hợp với dự án.

    - Theo kinh nghiệm, ta có thể tính số lượng Partition theo công thức sau:
        - Nếu 1 File : SL Partition = MAX[ (File_Size)/128MB ,  Số lượng Core]
        - Nếu nhiều File: SL Partition = MAX [Số lượng File/(128/file_size_average  +  4MB)  ,  Số lượng Core]

![SL Partion](image_1/partition_2.png)

### Bài tập Vận Dụng:

**Bài 1**: Sử dụng broadcast variables để chia sẻ một từ điển lớn cho tất cả các worker node. Xây dựng một hệ thống dịch thuật sơ khai nhất.

In [1]:
from pyspark.sql import SparkSession
import json

In [2]:
# Khởi tạo Spark Session
spark = SparkSession.builder \
    .appName("TranslationSystem") \
    .getOrCreate()

24/11/17 23:22:15 WARN Utils: Your hostname, ubunchuu-Test resolves to a loopback address: 127.0.1.1; using 10.0.230.156 instead (on interface wlo1)
24/11/17 23:22:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/17 23:22:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
sc = spark.sparkContext

In [4]:
# Tạo từ điển dịch thuật lớn

translation_dict = {
    'hello' :  'xin chào',
    'world' : 'thế giới',
    'python' : 'ngôn ngữ lập trình Python',
    'big data': 'dữ liệu lớn',
    'machine learning': 'học máy',
    'deep learning': 'học sâu',
    'artificial intelligence': 'trí tuệ nhân tạo',
    'data science': 'khoa học dữ liệu',
    'data engineer': 'kỹ sư dữ liệu',
    'is' : 'là',
    'and' : 'và',
    'or' : 'hoặc',
    # Thêm các từ khác
}

# Ở trên ta chỉ tạo một từ điển đơn giản, thực tế ta có thể tải từ điển dịch từ file hoặc database


In [5]:
# Broadcast từ điển dịch
broadcast_dict = sc.broadcast(translation_dict)

In [6]:
# Nguồn dữ liệu
text = [
    'hello world',
    'python is amazing',
    'big data and machine learning',
    'welcome to technology'
]

# Thực tế ta có thể đọc dữ liệu từ file hoặc database

In [7]:
# Tạo RDD từ nguồn dữ liệu

text_rdd = sc.parallelize(text)


In [8]:
# Viết hàm dịch thuật và áp dụng vào MAP

def translate_text(text):
    # Chia câu thành từng từ
    words = text.lower().split()
    
    # Dịch từng từ
    translated_words = []
    for word in words:
        # Sử dụng broadcast dictionary
        translated_word = broadcast_dict.value.get(word, word)
        translated_words.append(translated_word)
    
    # Ghép lại câu
    return ' '.join(translated_words)

# Áp dụng hàm dịch thuật vào RDD
translated_rdd = text_rdd.map(translate_text)

In [9]:
# In kết quả
print("Bản Dịch:")
for original, translated in zip(text_rdd.collect(), translated_rdd.collect()):
    print(f"Gốc:    {original}")
    print(f"Dịch:   {translated}")
    print("---")

Bản Dịch:




Gốc:    hello world
Dịch:   xin chào thế giới
---
Gốc:    python is amazing
Dịch:   ngôn ngữ lập trình Python is amazing
---
Gốc:    big data and machine learning
Dịch:   big data and machine learning
---
Gốc:    welcome to technology
Dịch:   welcome to technology
---


                                                                                

- Xem Spark UI:

Nhận xét:
- Có 2 JOB thành công
- Mỗi JOB có một Stage
- Một Stage có 16 task, ở đây dữ liệu quá nhỏ nên số lượng Partition nó lấy chính bằng số lượng Core

In [10]:
# Dừng Spark Session
spark.stop()

**Bài 2:** Kiểm Tra Chất Lượng Dữ Liệu với Accumulators

In [11]:
from pyspark.sql import SparkSession
import re

In [12]:
# Khởi tạo Spark Session
spark = SparkSession.builder \
    .appName("DataQualityCheck") \
    .getOrCreate()
sc = spark.sparkContext

In [13]:
# Tạo Accumulators
total_records = sc.accumulator(0)
invalid_email_records = sc.accumulator(0)
invalid_age_records = sc.accumulator(0)
empty_name_records = sc.accumulator(0)

In [14]:
# Dữ liệu mẫu
raw_data = [
    "John,john@example.com,25",
    "Alice,,30",
    "Bob,bob@invalid,22",
    "Charlie,charlie@example.com,-5",
    "David,david@example.com,40"
]

In [15]:
# RDD dữ liệu
data_rdd = sc.parallelize(raw_data)

In [16]:
# Hàm kiểm tra chất lượng
def validate_record(record):
    # Tăng tổng số bản ghi
    total_records.add(1)
    
    # Tách thông tin
    parts = record.split(',')
    
    # Kiểm tra tên rỗng
    if not parts[0].strip():
        empty_name_records.add(1)
        return False
    
    # Kiểm tra email
    if not re.match(r"[^@]+@[^@]+\.[^@]+", parts[1]):
        invalid_email_records.add(1)
        return False
    
    # Kiểm tra tuổi
    try:
        age = int(parts[2])
        if age <= 0 or age > 120:
            invalid_age_records.add(1)
            return False
    except ValueError:
        invalid_age_records.add(1)
        return False
    
    return True


# Lọc dữ liệu hợp lệ
valid_records = data_rdd.filter(validate_record)

In [17]:
# Thực thi và in kết quả
valid_records.collect()

print("Báo Cáo Chất Lượng Dữ Liệu:")
print(f"Tổng số bản ghi: {total_records.value}")
print(f"Bản ghi email không hợp lệ: {invalid_email_records.value}")
print(f"Bản ghi tuổi không hợp lệ: {invalid_age_records.value}")
print(f"Bản ghi tên rỗng: {empty_name_records.value}")
print(f"Bản ghi hợp lệ: {valid_records.count()}")

                                                                                

Báo Cáo Chất Lượng Dữ Liệu:
Tổng số bản ghi: 5
Bản ghi email không hợp lệ: 2
Bản ghi tuổi không hợp lệ: 1
Bản ghi tên rỗng: 0
Bản ghi hợp lệ: 2


In [18]:
# Dừng Spark Session
spark.stop()