In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from googletrans import Translator

# Create (or reuse) the Spark session for this job
spark = (
    SparkSession.builder
    .appName("TranslateJobTitle")
    .getOrCreate()
)

# Read the CSV (first line is used as the header) and
# redistribute the rows over 20 partitions
df = (
    spark.read
    .option("header", True)
    .csv("combined_output_no_url.csv")
    .repartition(20)
)

# UDF: translate Vietnamese -> English
def translate_vi_to_en(text):
    """Translate ``text`` from Vietnamese to English on a best-effort basis.

    Args:
        text: The value to translate; may be ``None``.

    Returns:
        ``""`` when ``text`` is ``None``; ``text`` unchanged when it is a
        blank string or when translation fails for any reason; otherwise
        the translated string.

    Notes:
        Older googletrans releases return the result directly from
        ``Translator.translate``; newer releases make it a coroutine
        function. Both are supported: a coroutine is driven to completion
        with ``asyncio.run``. ``asyncio.run`` refuses to start inside an
        already-running event loop, in which case the error is swallowed
        and ``text`` is returned untranslated.
    """
    import asyncio  # local import: this cell does not import asyncio at the top

    if text is None:
        return ""
    # Nothing to translate: skip the network round trip for blank strings.
    if isinstance(text, str) and not text.strip():
        return text
    try:
        # Must be created inside the function to avoid pickling errors
        # when Spark serializes the UDF.
        translator = Translator()
        result = translator.translate(text, src='vi', dest='en')
        if asyncio.iscoroutine(result):
            # Async googletrans API: without this, `.text` below would raise
            # AttributeError and every row would silently stay untranslated.
            result = asyncio.run(result)
        return result.text
    except Exception:
        # Deliberate best effort: keep the original text rather than
        # failing the whole Spark task on one bad row or network error.
        return text

# Wrap the translation function as a Spark UDF that returns strings
translate_udf = udf(translate_vi_to_en, returnType=StringType())

# Replace the job_title column with the UDF's output
df_translated = df.withColumn("job_title", translate_udf(df.job_title))

# Write the result as CSV with a header line, overwriting any existing output
(
    df_translated.write
    .mode("overwrite")
    .option("header", True)
    .csv("translated_job_titles_output")
)


In [None]:
import asyncio
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from googletrans import Translator

def create_spark_session(app_name="TranslateJobTitle"):
    """Build a SparkSession via ``SparkSession.builder``.

    Args:
        app_name: Application name passed to the builder's ``appName``.

    Returns:
        Whatever ``getOrCreate()`` yields for the configured builder.
    """
    session_builder = SparkSession.builder.appName(app_name)
    return session_builder.getOrCreate()

def translate_vi_to_en(text):
    """Translate ``text`` from Vietnamese to English on a best-effort basis.

    Args:
        text: The value to translate; may be ``None``.

    Returns:
        ``""`` when ``text`` is ``None``; ``text`` unchanged when it is a
        blank string or when translation fails for any reason; otherwise
        the translated string.

    Notes:
        Older googletrans releases return the result directly from
        ``Translator.translate``; newer releases make it a coroutine
        function. Both are supported: a coroutine is driven to completion
        with ``asyncio.run``. ``asyncio.run`` refuses to start inside an
        already-running event loop, in which case the error is swallowed
        and ``text`` is returned untranslated.
    """
    if text is None:
        return ""
    # Nothing to translate: skip the network round trip for blank strings.
    if isinstance(text, str) and not text.strip():
        return text
    try:
        # Created per call, inside the function, so the UDF does not
        # capture a Translator instance from the enclosing scope.
        translator = Translator()
        result = translator.translate(text, src='vi', dest='en')
        if asyncio.iscoroutine(result):
            # Async googletrans API: without this, `.text` below would raise
            # AttributeError and every row would silently stay untranslated.
            result = asyncio.run(result)
        return result.text
    except Exception:
        # Deliberate best effort: keep the original text rather than
        # failing the whole Spark task on one bad row or network error.
        return text

def register_translate_udf():
    """Wrap ``translate_vi_to_en`` as a Spark UDF with a ``StringType`` result.

    Returns:
        The object produced by ``udf(translate_vi_to_en, StringType())``.
    """
    string_type = StringType()
    return udf(translate_vi_to_en, string_type)

def run_translation_spark_job(input_csv: str, output_path: str,
                              num_partitions: int = 20,
                              column: str = "job_title"):
    """Translate one column of a CSV file with Spark and write the result as CSV.

    Args:
        input_csv: Path of the CSV to read; its first line is used as the header.
        output_path: Destination path for the CSV output; existing output
            there is overwritten.
        num_partitions: Number of partitions to repartition the data into
            before applying the UDF. Defaults to 20.
        column: Name of the column that is replaced by its translation.
            Defaults to ``"job_title"``.

    Returns:
        None. The Spark session is stopped before returning, including when
        reading, transforming or writing raises.
    """
    spark = create_spark_session()
    try:
        # Read the input and split it into partitions
        df = (
            spark.read
            .option("header", True)
            .csv(input_csv)
            .repartition(num_partitions)
        )

        # Build the translation UDF
        translate_udf = register_translate_udf()

        # Replace the target column with the UDF's output
        df_translated = df.withColumn(column, translate_udf(df[column]))

        # Write the result out as CSV
        df_translated.write.option("header", True).csv(output_path, mode="overwrite")
    finally:
        # Always release the session, even if the job failed part-way.
        spark.stop()

In [None]:
# Run the translation job on part_1.csv and write to its own output directory.
# NOTE(review): "ooutput_split" may be a typo for "output_split" — confirm the directory name.
run_translation_spark_job(
    input_csv="ooutput_split/part_1.csv",
    output_path="translated_job_titles_output1",
)