In [13]:
from pyspark.sql import SparkSession
import time

spark = SparkSession.builder \
    .appName("CV compare") \
    .master("local[*]") \
    .config("spark.driver.memory", "7g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.network.timeout", "30000s") \
    .config("spark.sql.files.maxPartitionBytes", "64m")  \
    .config("spark.memory.fraction", "0.8")  \
    .config("spark.default.parallelism", "16") \
    .config("spark.executor.heartbeatInterval", "20000s") \
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
    .getOrCreate()


In [2]:
start_time_hdfs_rdd = time.time()
citizens_rdd_hdfs = spark.sparkContext.textFile("hdfs://172.30.2.147:9000/demo_directory/citizens_data.csv")
citizens_rdd_hdfs = citizens_rdd_hdfs.repartition(8)
header = citizens_rdd_hdfs.first()
data_citizens_rdd_hdfs = citizens_rdd_hdfs.filter(lambda line: line != header)

print(data_citizens_rdd_hdfs.filter(lambda x: x.split(",")[4] == 'French').collect())

end_time_hdfs_rdd = time.time()
time_hdfs_rdd = end_time_hdfs_rdd - start_time_hdfs_rdd
print("Thời gian chạy với HDFS: {:.2f} giây".format(time_hdfs_rdd))

[]
Thời gian chạy với HDFS: 724.73 giây


In [74]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_extract
import time

# Khởi tạo SparkSession với các cấu hình tối ưu
spark = SparkSession.builder \
    .appName("CV compare") \
    .master("local[4]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.network.timeout", "30000s") \
    .config("spark.sql.shuffle.partitions", "512") \
    .config("spark.sql.files.maxPartitionBytes", "128m") \
    .config("spark.rdd.compress", "true") \
    .config("spark.io.compression.codec", "snappy") \
    .config("spark.shuffle.file.buffer", "128k") \
    .config("spark.memory.fraction", "0.8")  \
    .config("spark.default.parallelism", "16") \
    .config("spark.executor.heartbeatInterval", "20000s") \
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=25 -XX:MaxGCPauseMillis=100") \
    .getOrCreate()

start_time_hdfs_rdd = time.time()

# Đường dẫn tới file Parquet
parquet_path = r"D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Talend\Sync Hadoop to DWH\citizens_data.parquet"

# Kiểm tra nếu tệp Parquet đã tồn tại
if not os.path.exists(parquet_path):
    # Đọc file CSV và chuyển thành DataFrame
    df = spark.read.csv(r"D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Talend\Sync Hadoop to DWH\citizens_data.csv", 
                        header=True, inferSchema=True)

    df.cache()

    # Ghi dữ liệu dưới dạng Parquet với nén snappy và partitioning theo country
    df.write.option("compression", "snappy") \
             .partitionBy("country") \
             .mode("overwrite") \
             .parquet(parquet_path)

# Đọc dữ liệu Parquet
df_parquet = spark.read.parquet(parquet_path)

# Giảm số partition nếu cần thiết
df_parquet = df_parquet.coalesce(16)

# Khởi tạo biến đếm tổng số lần xuất hiện của từ 'french'
total_count_french = 0

# Duyệt qua tất cả các cột
for column in df_parquet.columns:
    # Lọc các từ chứa "french" không phân biệt hoa/thường và đếm số lần xuất hiện
    count_french_in_column = df_parquet \
        .withColumn("french_count", 
                    regexp_extract(lower(col(column)), "(french)", 0)) \
        .filter(col("french_count") != "") \
        .count()
    
    total_count_french += count_french_in_column

# In kết quả
print(f"Số lần từ 'french' xuất hiện trong toàn bộ các cột: {total_count_french}")

# Thời gian thực thi
end_time_hdfs_rdd = time.time()
time_hdfs_rdd = end_time_hdfs_rdd - start_time_hdfs_rdd
print("Thời gian chạy với HDFS: {:.2f} giây".format(time_hdfs_rdd))


Số lần từ 'french' xuất hiện trong toàn bộ các cột: 1155146
Thời gian chạy với HDFS: 380.26 giây


In [8]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_extract
import time

# Khởi tạo SparkSession với các cấu hình tối ưu
spark = SparkSession.builder \
    .appName("CV compare") \
    .master("local[*]") \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.instances", "8") \
    .config("spark.network.timeout", "30000s") \
    .config("spark.sql.shuffle.partitions", "512") \
    .config("spark.sql.files.maxPartitionBytes", "128m") \
    .config("spark.rdd.compress", "true") \
    .config("spark.io.compression.codec", "snappy") \
    .config("spark.shuffle.file.buffer", "128k") \
    .config("spark.memory.fraction", "0.8")  \
    .config("spark.default.parallelism", "16") \
    .config("spark.executor.heartbeatInterval", "20000s") \
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=25 -XX:MaxGCPauseMillis=100") \
    .getOrCreate()

start_time_hdfs_rdd = time.time()

# Đường dẫn tới file Parquet
parquet_path = r"D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Talend\Sync Hadoop to DWH\citizens_data.parquet"

# Kiểm tra nếu tệp Parquet đã tồn tại
if not os.path.exists(parquet_path):
    # Đọc file CSV và chuyển thành DataFrame
    df = spark.read.csv(r"D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Talend\Sync Hadoop to DWH\citizens_data.csv", 
                        header=True, inferSchema=True)

    df.cache()

    # Ghi dữ liệu dưới dạng Parquet với nén snappy và partitioning theo country
    df.write.option("compression", "snappy") \
             .partitionBy("country") \
             .mode("overwrite") \
             .parquet(parquet_path)

# Đọc dữ liệu Parquet
df_parquet = spark.read.parquet(parquet_path)

# Giảm số partition nếu cần thiết
df_parquet = df_parquet.repartition(16)

# Chỉ kiểm tra trên 2 cột name và country
columns_to_check = ["name", "country"]

# Khởi tạo biến đếm tổng số lần xuất hiện của từ 'french'
total_count_french = 0

# Tính tổng số lần xuất hiện của từ 'french' trong các cột được chọn
for column in columns_to_check:
    french_count_in_column = df_parquet \
        .withColumn("french_count", 
                    regexp_extract(lower(col(column)), "(french)", 0)) \
        .filter(col("french_count") != "") \
        .count()
    
    total_count_french += french_count_in_column

# In kết quả
print(f"Số lần từ 'french' xuất hiện trong cột {', '.join(columns_to_check)}: {total_count_french}")

# Thời gian thực thi
end_time_hdfs_rdd = time.time()
time_hdfs_rdd = end_time_hdfs_rdd - start_time_hdfs_rdd
print("Thời gian chạy với HDFS: {:.2f} giây".format(time_hdfs_rdd))


Số lần từ 'french' xuất hiện trong cột name, country: 1155146
Thời gian chạy với HDFS: 26.19 giây


In [10]:
pip install "dask[distributed]" --upgrade

Collecting dask[distributed]
  Downloading dask-2024.11.2-py3-none-any.whl.metadata (3.7 kB)
Collecting distributed==2024.11.2 (from dask[distributed])
  Downloading distributed-2024.11.2-py3-none-any.whl.metadata (3.3 kB)
Collecting msgpack>=1.0.2 (from distributed==2024.11.2->dask[distributed])
  Downloading msgpack-1.1.0-cp310-cp310-win_amd64.whl.metadata (8.6 kB)
Collecting sortedcontainers>=2.0.5 (from distributed==2024.11.2->dask[distributed])
  Using cached sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Collecting tblib>=1.6.0 (from distributed==2024.11.2->dask[distributed])
  Using cached tblib-3.0.0-py3-none-any.whl.metadata (25 kB)
Collecting zict>=3.0.0 (from distributed==2024.11.2->dask[distributed])
  Using cached zict-3.0.0-py2.py3-none-any.whl.metadata (899 bytes)
Downloading distributed-2024.11.2-py3-none-any.whl (1.0 MB)
   ---------------------------------------- 0.0/1.0 MB ? eta -:--:--
   ---------- ----------------------------- 0.3/1.0 MB ? eta -:--:-

In [1]:
import os

# Xác định số CPU có trên hệ thống
num_cpus = os.cpu_count()

# Điều chỉnh số worker (mặc định là số CPU)
num_workers = max(1, num_cpus - 1)  # Trừ đi 1 để dành tài nguyên cho hệ thống
print(f"Number of CPUs: {num_cpus}")
print(f"Number of Workers: {num_workers}")


Number of CPUs: 8
Number of Workers: 7


In [None]:
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
import threading
import psutil
import time
import os
import logging
import gc

# Cấu hình logging
logging.basicConfig(
    filename=r"D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Hadoop\log_read_citizens.log",
    level=logging.INFO,
    format='[%(asctime)s] [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    force=True
)

def wait_for_memory_to_cool(threshold=95, cooldown=80, check_interval=10):
    """Chờ cho đến khi RAM hạ nhiệt xuống dưới mức cooldown."""
    while psutil.virtual_memory().percent >= threshold:
        logging.warning(f"Bộ nhớ RAM vẫn cao ({psutil.virtual_memory().percent}%). Chờ {check_interval} giây...")
        time.sleep(check_interval)
    logging.info(f"Bộ nhớ RAM đã hạ nhiệt ({psutil.virtual_memory().percent}%). Tiếp tục xử lý.")

def save_chunk_to_temp(chunk, temp_file_path, thread_id, chunk_id):
    """Lưu chunk đã xử lý tạm thời vào file CSV."""
    temp_dir = os.path.join("tmp")
    os.makedirs(temp_dir, exist_ok=True)
    temp_chunk_file = os.path.join(temp_dir, f"{temp_file_path}_thread_{thread_id}_chunk_{chunk_id}.csv")
    chunk.to_csv(temp_chunk_file, index=False)
    logging.info(f"Chunk {chunk_id} của thread {thread_id} đã được lưu vào file tạm: {temp_chunk_file}")
    return temp_chunk_file

def count_french_words(chunk):
    """Đếm số lần từ 'french' xuất hiện trong các cột 'name' và 'country'."""
    french_word_count = 0
    french_word = "french"
    
    # Kiểm tra trong cột 'name'
    if 'name' in chunk.columns:
        french_word_count += chunk['name'].apply(lambda x: str(x).lower().count(french_word)).sum()
    
    # Kiểm tra trong cột 'country'
    if 'country' in chunk.columns:
        french_word_count += chunk['country'].apply(lambda x: str(x).lower().count(french_word)).sum()
    
    return french_word_count

def process_chunk(chunk, thread_id, chunk_id, temp_file_path, processed_chunks):
    current_memory = psutil.virtual_memory().percent
    
    if current_memory >= 97:
        logging.warning(f"Memory usage đạt ngưỡng 97%. Lưu tạm chunk {chunk_id}.")
        temp_file = save_chunk_to_temp(chunk, temp_file_path, thread_id, chunk_id)
        return None

    processed_chunks.add((thread_id, chunk_id))
    processing_time = 0.1
    gc.collect()

    french_word_count = count_french_words(chunk)
    logging.info(f"Thread {threading.current_thread().name} đã xử lý chunk {chunk_id} và tìm thấy {french_word_count} từ 'french' trong {processing_time:.2f} giây.")
    
    return thread_id, chunk_id, chunk, processing_time, french_word_count

def read_and_process_csv(file_path, chunk_size, num_threads, temp_file_path):
    results = []
    total_start_time = time.time()
    processed_chunks = set()
    total_french_word_count = 0

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = []
        chunk_id = 0

        for chunk in pd.read_csv(file_path, usecols=['name', 'country'], chunksize=chunk_size):
            future = executor.submit(process_chunk, chunk, threading.get_ident(), chunk_id, temp_file_path, processed_chunks)
            futures.append(future)
            chunk_id += 1

        for future in futures:
            result = future.result()
            if result is not None:
                thread_id, chunk_id, processed_chunk, processing_time, french_word_count = result
                results.append((thread_id, chunk_id, processed_chunk, processing_time))
                total_french_word_count += french_word_count
            else:
                logging.warning(f"Một chunk đã tạm dừng xử lý do giới hạn bộ nhớ.")

    total_processing_time = time.time() - total_start_time
    # Kết hợp các chunk lại
    full_data = pd.concat([result[2] for result in sorted(results, key=lambda x: (x[0], x[1]))], ignore_index=True)
    

    logging.info(f"Tổng thời gian xử lý: {total_processing_time:.2f} giây")
    logging.info(f"Tổng số dòng đã đọc và xử lý: {len(full_data)}")
    logging.info(f"Tổng số từ 'french' đã tìm thấy: {total_french_word_count}")
    return full_data

# File CSV cần đọc
file_path = r"D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Talend\Sync Hadoop to DWH\citizens_data.csv"
chunk_size = 11250000
num_threads = 8
temp_file_path = "temp_citizens_data"

if __name__ == "__main__":
    try:
        full_data = read_and_process_csv(file_path, chunk_size, num_threads, temp_file_path)
    except Exception as e:
        logging.error(f"Lỗi trong quá trình xử lý: {e}")


In [2]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import time
import psutil
import os

def count_french_words(chunk):
    """Hàm đếm từ 'french' trong chunk"""
    try:
        french_word = "french"
        return (
            chunk['name'].str.lower().str.count(french_word).sum() +
            chunk['country'].str.lower().str.count(french_word).sum()
        )
    except Exception as e:
        print(f"Error in count_french_words: {e}")
        return 0

def process_chunk(chunk, chunk_index):
    """Xử lý từng chunk và thông báo chi tiết"""
    try:
        start_time = time.time()
        print(f"[Thread-{chunk_index}] Bắt đầu xử lý chunk với {len(chunk)} dòng...")
        
        # Đếm từ 'french' trong chunk
        count = count_french_words(chunk)
        processing_time = time.time() - start_time
        
        # Lấy thông tin tài nguyên (CPU, RAM) của thread đang xử lý
        cpu_percent, ram_percent = get_process_resources(os.getpid())
        print(f"[Thread-{chunk_index}] Đã hoàn thành xử lý trong {processing_time:.2f}s.")
        print(f"[Thread-{chunk_index}] CPU: {cpu_percent}% | RAM: {ram_percent}% | Số từ 'french': {count}")
        
        return len(chunk), count, processing_time
    except Exception as e:
        print(f"[Thread-{chunk_index}] Lỗi: {e}")
        return 0, 0, 0

def get_process_resources(pid=None):
    """Lấy thông tin tài nguyên của process (CPU, RAM)"""
    try:
        process = psutil.Process(pid)
        cpu_percent = process.cpu_percent(interval=0.1)  # % CPU
        memory_info = process.memory_info()  # RAM info (in bytes)
        memory_percent = (memory_info.rss / psutil.virtual_memory().total) * 100  # % RAM
        return cpu_percent, memory_percent
    except psutil.NoSuchProcess:
        return 0, 0

def read_and_process_csv(file_path, chunk_size, num_workers):
    total_count = 0
    total_rows = 0
    total_processing_time = 0

    start_time = time.time()

    with ThreadPoolExecutor(max_workers=num_workers) as executor:  # Sử dụng ThreadPoolExecutor
        futures = {}
        for chunk_index, chunk in enumerate(
            pd.read_csv(file_path, chunksize=chunk_size, usecols=['name', 'country'])
        ):
            try:
                # Gửi chunk tới executor
                future = executor.submit(process_chunk, chunk, chunk_index)
                futures[future] = chunk_index
            except Exception as e:
                print(f"Error while submitting chunk {chunk_index}: {e}")
                continue

        # Đợi kết quả từ tất cả các thread
        for future in as_completed(futures):
            chunk_index = futures[future]
            try:
                rows, count, processing_time = future.result()
                total_rows += rows
                total_count += count
                total_processing_time += processing_time
            except Exception as e:
                print(f"[Thread-{chunk_index}] Xử lý thất bại với lỗi: {e}")

    end_time = time.time()
    print(f"Tổng số dòng: {total_rows}, tổng số từ 'french': {total_count}")
    print(f"Tổng thời gian xử lý: {end_time - start_time:.2f} giây")
    print(f"Tổng thời gian thực tế của các chunks: {total_processing_time:.2f} giây")

# Thực thi
file_path = r"D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Talend\Sync Hadoop to DWH\citizens_data.csv"
chunk_size = 22_500_000  # Tăng kích thước chunk citizens_data.parquet
num_workers = 4         # Sử dụng 6 luồng (threads) để chạy song song

read_and_process_csv(file_path, chunk_size, num_workers)


[Thread-0] Bắt đầu xử lý chunk với 22500000 dòng...
[Thread-0] Đã hoàn thành xử lý trong 21.43s.
[Thread-0] CPU: 100.3% | RAM: 32.76767512146712% | Số từ 'french': 289490
[Thread-1] Bắt đầu xử lý chunk với 22500000 dòng...
[Thread-1] Đã hoàn thành xử lý trong 22.63s.
[Thread-1] CPU: 100.0% | RAM: 60.95120021691164% | Số từ 'french': 287879
[Thread-2] Bắt đầu xử lý chunk với 22500000 dòng...
[Thread-2] Đã hoàn thành xử lý trong 22.74s.
[Thread-2] CPU: 100.0% | RAM: 40.934407996627975% | Số từ 'french': 289141
[Thread-3] Bắt đầu xử lý chunk với 22500000 dòng...
[Thread-3] Đã hoàn thành xử lý trong 23.22s.
[Thread-3] CPU: 0.0% | RAM: 67.1897380229319% | Số từ 'french': 288636
Tổng số dòng: 90000000, tổng số từ 'french': 1155146
Tổng thời gian xử lý: 135.58 giây
Tổng thời gian thực tế của các chunks: 90.02 giây


In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import time
import psutil
import os
import pyarrow.parquet as pq
import pyarrow as pa

def count_french_words(chunk):
    """Đếm từ 'french' trong chunk"""
    try:
        french_word = "french"
        return (
            chunk['name'].str.lower().str.count(french_word).sum() +
            chunk['country'].str.lower().str.count(french_word).sum()
        )
    except Exception as e:
        print(f"Error in count_french_words: {e}")
        return 0

def process_chunk(chunk, chunk_index):
    """Xử lý chunk và trả kết quả"""
    try:
        start_time = time.time()
        
        # Đếm từ 'french' trong chunk
        count = count_french_words(chunk)
        processing_time = time.time() - start_time
        
        # Tránh log không cần thiết nếu không cần
        if chunk_index % 10 == 0:  # Chỉ in log mỗi 10 chunk để giảm tải
            cpu_percent, ram_percent = get_process_resources(os.getpid())
            print(f"[Thread-{chunk_index}] Thời gian xử lý: {processing_time:.2f}s, CPU: {cpu_percent}% | RAM: {ram_percent}% | Số từ 'french': {count}")
        
        return len(chunk), count, processing_time
    except Exception as e:
        print(f"[Thread-{chunk_index}] Lỗi: {e}")
        return 0, 0, 0

def get_process_resources(pid=None):
    """Lấy tài nguyên của process (CPU, RAM)"""
    try:
        process = psutil.Process(pid)
        cpu_percent = process.cpu_percent(interval=0.1)
        memory_info = process.memory_info()
        memory_percent = (memory_info.rss / psutil.virtual_memory().total) * 100
        return cpu_percent, memory_percent
    except psutil.NoSuchProcess:
        return 0, 0

def create_parquet_from_csv(csv_file, parquet_file):
    """Tạo file Parquet từ CSV nếu chưa có"""
    if not os.path.exists(parquet_file):
        print(f"File {parquet_file} chưa tồn tại, đang chuyển đổi...")
        df = pd.read_csv(csv_file, usecols=['name', 'country'])
        table = pa.Table.from_pandas(df)
        pq.write_table(table, parquet_file)
        print(f"File Parquet {parquet_file} đã được tạo.")
    else:
        print(f"File Parquet {parquet_file} đã tồn tại, sẽ đọc từ đó.")

def read_and_process_parquet(file_path, chunk_size, num_workers):
    total_count = 0
    total_rows = 0
    total_processing_time = 0

    start_time = time.time()

    # Đọc dữ liệu từ file Parquet
    parquet_file = pq.ParquetFile(file_path)
    
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = []
        
        # Đọc dữ liệu theo từng row group (không tải toàn bộ vào RAM)
        for chunk_index in range(parquet_file.num_row_groups):
            try:
                # Đọc 1 row group và chuyển đổi thành pandas DataFrame
                chunk = parquet_file.read_row_group(chunk_index).to_pandas()[['name', 'country']]
                
                # Gửi chunk vào xử lý song song
                future = executor.submit(process_chunk, chunk, chunk_index)
                futures.append(future)
            except Exception as e:
                print(f"Lỗi khi đọc row group {chunk_index}: {e}")
                continue

        # Chờ tất cả các thread hoàn thành và tổng hợp kết quả
        for future in as_completed(futures):
            try:
                rows, count, processing_time = future.result()
                total_rows += rows
                total_count += count
                total_processing_time += processing_time
            except Exception as e:
                print(f"Lỗi khi xử lý chunk: {e}")

    end_time = time.time()
    print(f"Tổng số dòng: {total_rows}, tổng số từ 'french': {total_count}")
    print(f"Tổng thời gian: {end_time - start_time:.2f} giây")
    print(f"Tổng thời gian thực tế của các chunks: {total_processing_time:.2f} giây")

# Thực thi
csv_file_path = r"D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Talend\Sync Hadoop to DWH\citizens_data.csv"
parquet_file_path = r"D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Talend\Sync Hadoop to DWH\citizens_data.parquet"
chunk_size = 50_00_000  # Kích thước chunk
num_workers = 32         # Thử tăng lên 32 luồng nếu hệ thống hỗ trợ

# Kiểm tra và tạo file Parquet nếu chưa có
create_parquet_from_csv(csv_file_path, parquet_file_path)

# Đọc và xử lý dữ liệu từ file Parquet
read_and_process_parquet(parquet_file_path, chunk_size, num_workers)


File Parquet D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Talend\Sync Hadoop to DWH\citizens_data.parquet đã tồn tại, sẽ đọc từ đó.
[Thread-0] Thời gian xử lý: 8.91s, CPU: 100.2% | RAM: 31.079261510940892% | Số từ 'french': 13517
[Thread-10] Thời gian xử lý: 16.68s, CPU: 101.4% | RAM: 19.739856397898983% | Số từ 'french': 13487
[Thread-20] Thời gian xử lý: 16.33s, CPU: 160.3% | RAM: 27.929192319426637% | Số từ 'french': 13577
[Thread-30] Thời gian xử lý: 16.74s, CPU: 99.9% | RAM: 32.996155056898075% | Số từ 'french': 13471


In [None]:
import pandas as pd
import pyarrow.parquet as pq
import time
import os
import multiprocessing

def count_french_words(chunk):
    """Đếm từ 'french' trong chunk"""
    french_word = "french"
    return (
        chunk['name'].str.lower().str.count(french_word).sum() +
        chunk['country'].str.lower().str.count(french_word).sum()
    )

def process_chunk(chunk):
    """Xử lý chunk và trả kết quả"""
    try:
        # Đếm từ 'french' trong chunk
        count = count_french_words(chunk)
        return len(chunk), count
    except Exception as e:
        return 0, 0

def create_parquet_from_csv(csv_file, parquet_file):
    """Tạo file Parquet từ CSV nếu chưa có"""
    if not os.path.exists(parquet_file):
        df = pd.read_csv(csv_file, usecols=['name', 'country'])
        table = pa.Table.from_pandas(df)
        pq.write_table(table, parquet_file)

def read_and_process_parquet(file_path, chunk_size, num_workers):
    total_count = 0
    total_rows = 0

    start_time = time.time()

    # Đọc dữ liệu từ file Parquet
    parquet_file = pq.ParquetFile(file_path)

    # Tạo pool đa luồng
    with multiprocessing.Pool(processes=num_workers) as pool:
        results = []

        # Đọc dữ liệu theo từng row group (không tải toàn bộ vào RAM)
        for chunk_index in range(parquet_file.num_row_groups):
            try:
                # Đọc 1 row group và chuyển đổi thành pandas DataFrame
                chunk = parquet_file.read_row_group(chunk_index).to_pandas()[['name', 'country']]
                
                # Gửi chunk vào xử lý song song
                results.append(pool.apply_async(process_chunk, (chunk,)))
            except Exception as e:
                continue

        # Chờ tất cả các chunk hoàn thành và tổng hợp kết quả
        for result in results:
            rows, count = result.get()
            total_rows += rows
            total_count += count

    end_time = time.time()
    print(f"Tổng thời gian xử lý: {end_time - start_time:.2f} giây")
    print(f"Tổng số từ 'french': {total_count} trong {total_rows} dòng")

# Thực thi
csv_file_path = r"D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Talend\Sync Hadoop to DWH\citizens_data.csv"
parquet_file_path = r"D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Talend\Sync Hadoop to DWH\citizens_data.parquet"
chunk_size = 50_00_000  # Kích thước chunk, bạn có thể điều chỉnh
num_workers = multiprocessing.cpu_count()  # Tự động dùng số worker theo số lõi CPU

# Kiểm tra và tạo file Parquet nếu chưa có
create_parquet_from_csv(csv_file_path, parquet_file_path)

# Đọc và xử lý dữ liệu từ file Parquet
read_and_process_parquet(parquet_file_path, chunk_size, num_workers)


In [3]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import time
import psutil
import os
import pyarrow.parquet as pq
import pyarrow as pa

def count_french_words(chunk):
    """Đếm từ 'french' trong chunk"""
    try:
        french_word = "french"
        # Calculate count in 'name' and 'country' columns using optimized string operations
        return (
            chunk['name'].str.lower().str.contains(french_word).sum() +
            chunk['country'].str.lower().str.contains(french_word).sum()
        )
    except Exception as e:
        print(f"Error in count_french_words: {e}")
        return 0

def process_chunk(chunk, chunk_index):
    """Xử lý chunk và trả kết quả"""
    try:
        start_time = time.time()
        
        # Đếm từ 'french' trong chunk
        count = count_french_words(chunk)
        processing_time = time.time() - start_time
        
        # Log each chunk's processing time every 10th chunk
        if chunk_index % 10 == 0:
            cpu_percent, ram_percent = get_process_resources(os.getpid())
            print(f"[Thread-{chunk_index}] Thời gian xử lý: {processing_time:.2f}s, CPU: {cpu_percent}% | RAM: {ram_percent}% | Số từ 'french': {count}")
        
        return len(chunk), count, processing_time
    except Exception as e:
        print(f"[Thread-{chunk_index}] Lỗi: {e}")
        return 0, 0, 0

def get_process_resources(pid=None):
    """Lấy tài nguyên của process (CPU, RAM)"""
    try:
        process = psutil.Process(pid)
        cpu_percent = process.cpu_percent(interval=0.1)
        memory_info = process.memory_info()
        memory_percent = (memory_info.rss / psutil.virtual_memory().total) * 100
        return cpu_percent, memory_percent
    except psutil.NoSuchProcess:
        return 0, 0

def create_parquet_from_csv(csv_file, parquet_file):
    """Tạo file Parquet từ CSV nếu chưa có"""
    if not os.path.exists(parquet_file):
        print(f"File {parquet_file} chưa tồn tại, đang chuyển đổi...")
        df = pd.read_csv(csv_file, usecols=['name', 'country'])
        table = pa.Table.from_pandas(df)
        pq.write_table(table, parquet_file)
        print(f"File Parquet {parquet_file} đã được tạo.")
    else:
        print(f"File Parquet {parquet_file} đã tồn tại, sẽ đọc từ đó.")

def read_and_process_parquet(file_path, chunk_size, num_workers):
    total_count = 0
    total_rows = 0
    total_processing_time = 0

    start_time = time.time()

    # Đọc dữ liệu từ file Parquet
    parquet_file = pq.ParquetFile(file_path)
    
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = []
        
        # Read row groups with a chunk of rows for efficient memory usage
        for chunk_index in range(parquet_file.num_row_groups):
            try:
                # Read a row group and convert it to pandas DataFrame (with specific columns)
                chunk = parquet_file.read_row_group(chunk_index).to_pandas()[['name', 'country']]
                
                # Send chunk for processing
                future = executor.submit(process_chunk, chunk, chunk_index)
                futures.append(future)
            except Exception as e:
                print(f"Lỗi khi đọc row group {chunk_index}: {e}")
                continue

        # Wait for all threads to complete and aggregate results
        for future in as_completed(futures):
            try:
                rows, count, processing_time = future.result()
                total_rows += rows
                total_count += count
                total_processing_time += processing_time
            except Exception as e:
                print(f"Lỗi khi xử lý chunk: {e}")

    end_time = time.time()
    print(f"Tổng số dòng: {total_rows}, tổng số từ 'french': {total_count}")
    print(f"Tổng thời gian: {end_time - start_time:.2f} giây")
    print(f"Tổng thời gian thực tế của các chunks: {total_processing_time:.2f} giây")

# Thực thi
csv_file_path = r"D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Talend\Sync Hadoop to DWH\citizens_data.csv"
parquet_file_path = r"D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Talend\Sync Hadoop to DWH\citizens_data.parquet"
chunk_size = 22_500_000  # Kích thước chunk
num_workers = 4         # Sử dụng 32 worker threads nếu hệ thống hỗ trợ

# Kiểm tra và tạo file Parquet nếu chưa có
create_parquet_from_csv(csv_file_path, parquet_file_path)

# Đọc và xử lý dữ liệu từ file Parquet
read_and_process_parquet(parquet_file_path, chunk_size, num_workers)


File Parquet D:\Data Engineer\dwh and etl\lesson\lesson 2\documents\Talend\Sync Hadoop to DWH\citizens_data.parquet đã tồn tại, sẽ đọc từ đó.
[Thread-0] Thời gian xử lý: 6.92s, CPU: 99.7% | RAM: 19.53274264685553% | Số từ 'french': 13517
[Thread-10] Thời gian xử lý: 9.09s, CPU: 99.9% | RAM: 24.62432136293459% | Số từ 'french': 13487
[Thread-20] Thời gian xử lý: 8.19s, CPU: 100.0% | RAM: 34.54536106426395% | Số từ 'french': 13577
[Thread-30] Thời gian xử lý: 9.35s, CPU: 100.3% | RAM: 17.56504075096439% | Số từ 'french': 13471
[Thread-40] Thời gian xử lý: 9.16s, CPU: 163.5% | RAM: 23.79931017054629% | Số từ 'french': 13487
[Thread-50] Thời gian xử lý: 10.34s, CPU: 100.3% | RAM: 31.718791910825644% | Số từ 'french': 13684
[Thread-60] Thời gian xử lý: 6.11s, CPU: 99.7% | RAM: 34.27832013771367% | Số từ 'french': 13456
[Thread-70] Thời gian xử lý: 6.53s, CPU: 100.3% | RAM: 32.168476122985794% | Số từ 'french': 13557
[Thread-80] Thời gian xử lý: 6.56s, CPU: 100.0% | RAM: 29.171535294947926% 