In [1]:
!pip install pyspark
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
# Make the installed Spark distribution importable from this Python process.
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Create (or reuse) the Spark session used by the rest of the notebook.
spark = SparkSession.builder.appName("Processing and ML").getOrCreate()

# NOTE(review): presumably meant to display the session; as a mid-cell bare
# expression (not the last statement of the cell) it produces no output.
spark
from google.colab import drive # connect to Google Drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import pandas as pd
# Read the cleaned job postings with pandas and hand the frame straight to
# Spark, so `df` is only ever bound to the Spark DataFrame.
df = spark.createDataFrame(
    pd.read_parquet("/content/drive/MyDrive/GoogleColab/jobs_clean.parquet")
)

In [4]:
# Print the initial schema as a sanity check
print("Schema ban đầu:")
df.printSchema()

# Show the first few rows (untruncated) to eyeball the data
print("\nDữ liệu ban đầu:")
df.show(5, truncate=False)

# Report the row count of the loaded frame
print("Số dòng ban đầu:", df.count())

Schema ban đầu:
root
 |-- ten_cong_viec: string (nullable = true)
 |-- khu_vuc: string (nullable = true)
 |-- ngay_dang_clean: string (nullable = true)
 |-- ngay_dang_date: date (nullable = true)
 |-- link: string (nullable = true)
 |-- cap_bac_standardized: string (nullable = true)
 |-- cap_bac_code: long (nullable = true)
 |-- exp_year: double (nullable = true)
 |-- kinh_nghiem_group: long (nullable = true)
 |-- nganh_nghe_clean: string (nullable = true)
 |-- mo_ta_cong_viec_clean: string (nullable = true)
 |-- yeu_cau_cong_viec_clean: string (nullable = true)
 |-- yeu_cau_cong_viec_en: string (nullable = true)
 |-- mo_ta_cong_viec_en: string (nullable = true)


Dữ liệu ban đầu:
+-----------------------------------------------------------------------------------------------------------------------------------------+-----------+---------------+--------------+--------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Imported in this cell so it runs on a fresh kernel (joblib was otherwise
# only imported by a later cell, which made this line raise NameError).
import joblib

# NOTE(review): joblib.load unpickles the file, which can execute arbitrary
# code — only load artifacts from a trusted location.
mlb_nganh_nghe = joblib.load("/content/drive/MyDrive/GoogleColab/mlb_nganh_nghe.pkl")

In [None]:
import pandas as pd
# Single hand-written job posting used as the input record for the steps below.
data_new = pd.DataFrame(
    {
        "khu_vuc": ["Hồ Chí Minh"],
        "cap_bac_code": [3],
        "kinh_nghiem_group": [2],
        "nganh_nghe_clean": ["công nghệ thông tin"],
        "mo_ta_cong_viec_en": [
            "python, sql, good communication, hard-working, time manager, analysis data, process, english skill"
        ],
        # Left empty here; its content may instead be merged into the field above.
        "yeu_cau_cong_viec_en": [""],
    }
)


In [None]:
import joblib

# Load the fitted encoder whose `transform` / `classes_` are used on the
# standardized industry tags further down in this cell.
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary
# code — only load artifacts from a trusted location.
mlb_nganh_nghe = joblib.load("/content/drive/MyDrive/GoogleColab/mlb_nganh_nghe.pkl")

# Apply standardize
def standardize_nganh_nghe(tags, tag_to_group):
    """Map raw industry tags to their standardized group names.

    Args:
        tags: Iterable of raw tag strings.
        tag_to_group: Dict whose keys are lower-cased, whitespace-stripped
            tags and whose values are the group names.

    Returns:
        A list of unique group names in order of first appearance. A tag with
        no entry in ``tag_to_group`` is kept as-is apart from having its
        surrounding whitespace trimmed.
    """
    # A dict is used as an ordered set so the result is deterministic,
    # unlike list(set(...)).
    groups = {}
    for tag in tags:
        cleaned = tag.strip()
        # Normalise the lookup key exactly like the mapping keys
        # (lower-cased AND stripped); otherwise padded tags never match.
        groups[tag_to_group.get(cleaned.lower(), cleaned)] = None
    return list(groups)

# Invert the group -> tags mapping into a lookup keyed by the lower-cased,
# stripped tag. `nganh_mapping` must already be defined when this cell runs.
tag_to_group = {
    tag.lower().strip(): group
    for group, tags in nganh_mapping.items()
    for tag in tags
}

# Split the comma-separated industry string into trimmed tags; any non-string
# value becomes an empty list.
data_new['nganh_nghe_clean_list'] = data_new['nganh_nghe_clean'].apply(
    lambda raw: [part.strip() for part in raw.split(',')] if isinstance(raw, str) else []
)

# Replace every tag by its group name.
data_new['nganh_nghe_chuan_hoa'] = data_new['nganh_nghe_clean_list'].apply(
    lambda tag_list: standardize_nganh_nghe(tag_list, tag_to_group)
)

# Encode the standardized industries with the loaded encoder, one
# `nganh_<class>` column per encoder class.
nganh_OHE = mlb_nganh_nghe.transform(data_new['nganh_nghe_chuan_hoa'])
nganh_OHE_df = pd.DataFrame(
    nganh_OHE,
    columns=[f'nganh_{c}' for c in mlb_nganh_nghe.classes_],
)

# Append the indicator columns side by side. Note that `data_new` is rebound
# to the widened frame, so re-running this cell appends the columns again.
data_new = pd.concat(
    [data_new.reset_index(drop=True), nganh_OHE_df.reset_index(drop=True)],
    axis=1,
)


In [None]:
from pyspark.ml import PipelineModel
from pyspark.sql.functions import concat_ws, col

# Assumes a SparkSession named `spark` already exists.
# Convert the pandas record to Spark and add one text column that joins the
# two English text fields with a single space.
df_spark = spark.createDataFrame(data_new).withColumn(
    "ky_nang_combined",
    concat_ws(" ", col("mo_ta_cong_viec_en"), col("yeu_cau_cong_viec_en")),
)

# Load the previously trained pipeline and apply it to the new record.
pipeline_model = PipelineModel.load(
    "/content/drive/MyDrive/GoogleColab/pipeline_model_spark"
)
df_transformed = pipeline_model.transform(df_spark)

In [None]:
# Imported in this cell so it runs on a fresh kernel (StringIndexer was
# otherwise only imported by a later cell, which made this cell raise NameError).
from pyspark.ml.feature import StringIndexer

# Fit a string -> index mapping for the region column and apply it.
khu_vuc_idx = StringIndexer(inputCol="khu_vuc", outputCol="khu_vuc_idx")
khu_vuc_idx_model = khu_vuc_idx.fit(df_transformed)
df_indexed = khu_vuc_idx_model.transform(df_transformed)

# Save the fitted model, overwriting anything already stored at this path.
# NOTE(review): as written in this notebook, df_transformed is derived from
# the single-row `data_new` frame, so the saved indexer would only know that
# one region — confirm that fitting (rather than loading) here is intended.
khu_vuc_idx_model.write().overwrite().save("/content/drive/MyDrive/GoogleColab/khu_vuc_idx_model")

In [None]:
# Imported in this cell so it runs on a fresh kernel (OneHotEncoder was not
# imported by any earlier cell, which made this cell raise NameError).
from pyspark.ml.feature import OneHotEncoder

# Fit a one-hot encoder on the indexed region column and apply it.
khu_vuc_ohe = OneHotEncoder(inputCol="khu_vuc_idx", outputCol="khu_vuc_vec")
khu_vuc_ohe_model = khu_vuc_ohe.fit(df_indexed)
df_ohe = khu_vuc_ohe_model.transform(df_indexed)

# Save the fitted model, overwriting anything already stored at this path.
# NOTE(review): the encoder is fitted on whatever `df_indexed` currently
# holds — confirm that this is the training data and not a single new record.
khu_vuc_ohe_model.write().overwrite().save("/content/drive/MyDrive/GoogleColab/khu_vuc_ohe_model")

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Pack the two numeric code columns into a single vector column.
assembler_numeric = VectorAssembler(
    inputCols=["cap_bac_code", "kinh_nghiem_group"],
    outputCol="numeric_features",
)
df_numeric = assembler_numeric.transform(df_ohe)

# Fit a StandardScaler on that vector column and add the scaled version.
scaler = StandardScaler(
    inputCol="numeric_features",
    outputCol="numeric_scaled",
)
scaler_model = scaler.fit(df_numeric)
df_scaled = scaler_model.transform(df_numeric)

# Save the fitted scaler, overwriting anything already stored at this path.
scaler_model.write().overwrite().save(
    "/content/drive/MyDrive/GoogleColab/scaler_model"
)

In [None]:
from pyspark.ml.feature import StringIndexer

# Fit a string -> index mapping that turns `label_raw` into the `label` column.
# NOTE(review): `label_raw` is not among the columns of `df` in the schema
# printed earlier in this notebook — confirm it is added before this cell runs.
indexer = StringIndexer(inputCol="label_raw", outputCol="label")
indexer_model = indexer.fit(df)

# Save the fitted model, overwriting anything already stored at this path.
indexer_model.write().overwrite().save("/content/drive/MyDrive/GoogleColab/indexer_model_label")