In [0]:
def job_1900_silver():
    bronze = spark.read.option("multiline", "true").format("json").load("/Volumes/dev/job_prospects/job_1900_bronze")
    return bronze

df = job_1900_silver()

In [0]:
display(df)
df.dtypes
display(df.select("timestamp").distinct())

In [0]:
from pyspark.sql import functions as F

[
    ("application_deadline", "string"),
    ("area", "string"),
    ("company", "string"),
    ("company_address", "string"),
    ("company_rating", "string"),
    ("company_ref", "string"),
    ("company_size", "string"),
    ("employement_type", "string"),
    ("experience", "string"),
    ("gender", "string"),
    ("job_department", "string"),
    ("job_description", "array<string>"),
    ("job_field", "string"),
    ("job_title", "string"),
    ("number_of_jobs", "string"),
    ("number_of_reviews", "string"),
    ("posted_at", "string"),
    ("quantity", "string"),
    ("reviews", "array<string>"),
    ("salary", "string"),
    ("timestamp", "string"),
    ("title", "string"),
    ("url", "string"),
    ("views", "string"),
    ("work_address", "string"),
]

bronze = df


def parse_number(col):
    return (
        F.when(F.col(col).isNull() | (F.trim(F.col(col)) == ""), None)
        .when(
            F.col(col).rlike(r"^\s*\d+(\.\d+)?\s*[kK]"),
            (F.regexp_extract(col, r"(\d+(\.\d+)?)", 1).cast("double") * 1000).cast(
                "int"
            ),
        )
        .when(F.col(col).rlike(r"\d+"), F.regexp_extract(col, r"(\d+)", 1).cast("int"))
        .otherwise(None)
    )


def parse_min_salary(col):
    return (
        F.when(F.col(col).isNull() | (F.trim(F.col(col)) == ""), None)
        .when(F.lower(F.trim(F.col(col))) == "thoả thuận", None)
        .when(F.col(col).rlike(r"(?i)^t[ơo]i\s*|^tới\s*"), None)
        .when(
            F.col(col).rlike(r"(?i)trên\s*([0-9]+(?:[.,][0-9]+)?)\s*([^\d\s-]+)"),
            F.concat(
                F.regexp_extract(col, r"(?i)trên\s*([0-9]+(?:[.,][0-9]+)?)", 1),
                F.lit(" "),
                F.regexp_extract(
                    col, r"(?i)trên\s*[0-9]+(?:[.,][0-9]+)?\s*([^\d\s-]+)", 1
                ),
            ),
        )
        .when(
            F.col(col).rlike(
                r"^\s*([0-9]+(?:[.,][0-9]+)?)\s*-\s*([0-9]+(?:[.,][0-9]+)?)\s*([^\d\s-]+)"
            ),
            F.concat(
                F.regexp_extract(col, r"^\s*([0-9]+(?:[.,][0-9]+)?)", 1),
                F.lit(" "),
                F.regexp_extract(
                    col,
                    r"^\s*[0-9]+(?:[.,][0-9]+)?\s*-\s*[0-9]+(?:[.,][0-9]+)?\s*([^\d\s-]+)",
                    1,
                ),
            ),
        )
        .otherwise(
            F.trim(
                F.concat(
                    F.regexp_extract(col, r"^\s*([0-9]+(?:[.,][0-9]+)?)", 1),
                    F.lit(" "),
                    F.regexp_extract(
                        col, r"^\s*[0-9]+(?:[.,][0-9]+)?\s*([^\d\s-]+)", 1
                    ),
                )
            )
        )
    )


def parse_max_salary(col):
    return (
        F.when(F.col(col).isNull() | (F.trim(F.col(col)) == ""), None)
        .when(F.lower(F.trim(F.col(col))) == "thoả thuận", None)
        .when(F.col(col).rlike(r"(?i)^trên"), None)
        .when(
            F.col(col).rlike(r"(?i)^t[ơo]i\s*([0-9]+(?:[.,][0-9]+)?)\s*(triệu|usd)"),
            F.concat(
                F.regexp_extract(col, r"(?i)^t[ơo]i\s*([0-9]+(?:[.,][0-9]+)?)", 1),
                F.lit(" "),
                F.regexp_extract(
                    col, r"(?i)^t[ơo]i\s*[0-9]+(?:[.,][0-9]+)?\s*(triệu|usd)", 1
                ),
            ),
        )
        .when(
            F.col(col).rlike(
                r"^\s*([0-9]+(?:[.,][0-9]+)?)\s*-\s*([0-9]+(?:[.,][0-9]+)?)\s*([^\d\s-]+)"
            ),
            F.concat(
                F.regexp_extract(
                    col, r"^\s*([0-9]+(?:[.,][0-9]+)?)\s*-\s*([0-9]+(?:[.,][0-9]+)?)", 2
                ),
                F.lit(" "),
                F.regexp_extract(
                    col,
                    r"^\s*[0-9]+(?:[.,][0-9]+)?\s*-\s*([0-9]+(?:[.,][0-9]+)?)\s*([^\d\s-]+)",
                    2,
                ),
            ),
        )
        .otherwise(
            F.trim(
                F.concat(
                    F.regexp_extract(col, r"([0-9]+(?:[.,][0-9]+)?)", 1),
                    F.lit(" "),
                    F.regexp_extract(col, r"([0-9]+(?:[.,][0-9]+)?)\s*([^\d\s-]+)", 2),
                )
            )
        )
    )


def convert_to_usd(col):
    return (
        F.when(
            F.col(col).rlike(r"(?i)([0-9]+(?:[.,][0-9]+)?)\s*triệu"),
            (
                (
                    F.regexp_replace(
                        F.regexp_extract(F.col(col), r"([0-9]+(?:[.,][0-9]+)?)", 1),
                        ",",
                        ".",
                    ).cast("double")
                    * 1_000_000
                )
                * 0.000038
            ).cast("int"),
        )
        .when(
            F.col(col).rlike(r"(?i)([0-9]+(?:[.,][0-9]+)?)\s*usd"),
            F.regexp_replace(
                F.regexp_extract(F.col(col), r"([0-9]+(?:[.,][0-9]+)?)", 1), ",", "."
            )
            .cast("double")
            .cast("int"),
        )
        .otherwise(None)
    )

def parse_float(col):
    return (
        F.when(F.col(col).isNull() | (F.trim(F.col(col)) == ""), None)
        .otherwise(F.regexp_replace(F.col(col), ",", ".").cast("decimal(10,1)"))
    )

def parse_min_experience(col):
    return (
        F.when(F.col(col).isNull() | (F.trim(F.col(col)) == ""), None)
        .when(F.lower(F.trim(F.col(col))) == "không yêu cầu", F.lit(0))
        .when(F.col(col).rlike(r"(?i)^t[ơo]i\s*[0-9]+(?:[.,][0-9]+)?\s*năm"), F.lit(0))
        .when(F.col(col).rlike(r"(?i)^t[ơo]i\s*năm"), F.lit(0))
        .when(F.col(col).rlike(r"(?i)^tới\s*[0-9]+(?:[.,][0-9]+)?\s*năm"), F.lit(0))
        .when(F.col(col).rlike(r"(?i)^trên\s*([0-9]+(?:[.,][0-9]+)?)\s*năm"),
              F.regexp_replace(F.regexp_extract(col, r"(?i)^trên\s*([0-9]+(?:[.,][0-9]+)?)", 1), ",", ".").cast("int"))
        .when(F.col(col).rlike(r"([0-9]+(?:[.,][0-9]+)?)\s*-\s*([0-9]+(?:[.,][0-9]+)?)\s*năm"),
              F.regexp_replace(F.regexp_extract(col, r"([0-9]+(?:[.,][0-9]+)?)\s*-\s*([0-9]+(?:[.,][0-9]+)?)", 1), ",", ".").cast("int"))
        .when(F.col(col).rlike(r"([0-9]+(?:[.,][0-9]+)?)\s*năm"),
              F.regexp_replace(F.regexp_extract(col, r"([0-9]+(?:[.,][0-9]+)?)\s*năm", 1), ",", ".").cast("int"))
        .otherwise(None)
    ).cast("int")

def parse_max_experience(col):
    return (
        F.when(F.col(col).isNull() | (F.trim(F.col(col)) == ""), None)
        .when(F.lower(F.trim(F.col(col))) == "không yêu cầu", None)
        .when(F.col(col).rlike(r"(?i)^t[ơo]i\s*[0-9]+(?:[.,][0-9]+)?\s*năm"), 
              F.regexp_replace(F.regexp_extract(col, r"(?i)^t[ơo]i\s*([0-9]+(?:[.,][0-9]+)?)", 1), ",", ".").cast("int"))
        .when(F.col(col).rlike(r"(?i)^t[ơo]i\s*năm"), None)
        .when(F.col(col).rlike(r"(?i)^tới\s*[0-9]+(?:[.,][0-9]+)?\s*năm"), 
              F.regexp_replace(F.regexp_extract(col, r"(?i)^tới\s*([0-9]+(?:[.,][0-9]+)?)", 1), ",", ".").cast("int"))
        .when(F.col(col).rlike(r"(?i)^trên\s*([0-9]+(?:[.,][0-9]+)?)\s*năm"), None)
        .when(F.col(col).rlike(r"([0-9]+(?:[.,][0-9]+)?)\s*-\s*([0-9]+(?:[.,][0-9]+)?)\s*năm"),
              F.regexp_replace(F.regexp_extract(col, r"([0-9]+(?:[.,][0-9]+)?)\s*-\s*([0-9]+(?:[.,][0-9]+)?)", 2), ",", ".").cast("int"))
        .when(F.col(col).rlike(r"([0-9]+(?:[.,][0-9]+)?)\s*năm"),
              F.regexp_replace(F.regexp_extract(col, r"([0-9]+(?:[.,][0-9]+)?)\s*năm", 1), ",", ".").cast("int"))
        .otherwise(None)
    ).cast("int")

def parse_area(col):
    return F.array_distinct(
        F.transform(
            F.split(
                F.regexp_replace(
                    F.regexp_replace(F.col(col), r"\s*-\s*Việc làm tại\s*", "|"),
                    r"\s*-\s*", "|"
                ),
                r"\|"
            ),
            lambda x: F.trim(x)
        )
    )

bronze = bronze.withColumns(
    {
        "timestamp": F.to_date("timestamp"),
        "number_of_reviews": parse_number("number_of_reviews"),
        "number_of_jobs": parse_number("number_of_jobs"),
        "views": parse_number("views"),
        "salary_low": parse_min_salary("salary"),
        "salary_high": parse_max_salary("salary"),
        "posted_at": F.to_date(F.col("posted_at"), "dd/MM/yyyy"),
        "quantity": parse_number("quantity"),
        "application_deadline": F.try_to_date(F.col("application_deadline"), "dd/MM/yyyy"),
        "minimum_experience": parse_min_experience("experience"),
        "maximum_experience": parse_max_experience("experience"),
        # gender
        # "job_description": F.when(
        #     F.col("job_description").isNotNull(),
        #     F.expr("concat_ws('\n', job_description)")
        # ).otherwise(None),
        "company_size": parse_number("company_size"),
        "company_rating": parse_float("company_rating"),
        "area": parse_area("area")
    }
)
bronze = bronze.withColumns(
    {
        "salary_low": convert_to_usd("salary_low"),
        "salary_high": convert_to_usd("salary_high"),
    }
)

display(bronze)

In [0]:
display(bronze.dropDuplicates(["area"]).sort("area").select("area"))
display(df.dropDuplicates(["area"]).sort("area").select("area"))
# display(df.select("job_department","job_field", "job_description", "url", "salary", "experience").filter(F.col("experience") == "25 - 33 năm"))

In [0]:
display(bronze.dropDuplicates(["salary"]).select("salary_high").sort("salary"))
display(df.dropDuplicates(["salary"]).select("salary").sort("salary"))

In [0]:
# Test cases for parse_number
test_data = [
    ("100", 100),
    ("2k", 2000),
    ("3.5K", 3500),
    (" 7 K ", 7000),
    ("12", 12),
    ("0", 0),
    ("", None),
    (None, None),
    ("abc", None),
    ("1.2k", 1200),
    ("999", 999),
    ("5.7k", 5700),
]

test_df = spark.createDataFrame(test_data, ["input", "expected"])
result_df = test_df.withColumn("parsed", parse_number("input"))
display(result_df)

mismatches = result_df.filter(~(F.col("parsed").eqNullSafe(F.col("expected"))))
assert mismatches.count() == 0, "parse_number failed for some test cases"