In [1]:
import sys

sys.path.append("..")  # Adds higher directory to python modules path.

import json
import re
import sqlite3
from pathlib import Path

import polars as pl

from utility.path import JOB51_SQLITE_FILE_PATH, JOBOSS_SQLITE_FILE_PATH

In [2]:
with sqlite3.connect(JOBOSS_SQLITE_FILE_PATH) as conn:
    joboss = pl.read_database(query="SELECT * FROM joboss", connection=conn).lazy()

with sqlite3.connect(JOB51_SQLITE_FILE_PATH) as conn:
    job51 = pl.read_database(query="SELECT * FROM job51", connection=conn).lazy()

with Path.open(Path("maps.json"), "r", encoding="utf-8") as f:
    maps: dict[str, dict[str, str] | list[str] | dict[str, list[str]]] = json.load(f)

In [3]:
job_name_filter_list: list[str] = maps["job_name_filter_list"]  # type: ignore[reportArgumentType]
filter_pattern = "|".join(job_name_filter_list)


def clean_data(df: pl.LazyFrame) -> pl.LazyFrame:
    """Clean data by removing non-utf8 characters and empty strings."""
    return df.with_columns(
        # check if the string contains non-utf8 characters
        pl.when(pl.col(pl.Utf8).str.contains(r"\\u[0-9a-fA-F]{4}"))
        .then(None)
        # check if the string is empty
        .when(pl.col(pl.Utf8).str.len_bytes() == 0)
        .then(None)
        .otherwise(pl.col(pl.Utf8))
        .name.keep()
    )


def parse_company_tag(company_tag: str) -> dict[str, str]:
    """Parse company tag to area, type, and size."""
    tags = company_tag.split(",")
    if len(tags) == 3:  # noqa: PLR2004
        return {
            "company_area": tags[0],
            "company_type": tags[1],
            "company_size": tags[2],
        }
    if "人" in company_tag and len(tags) == 2:  # noqa: PLR2004
        return {"company_area": tags[0], "company_type": "", "company_size": tags[1]}

    msg = f"Invalid company tag: {company_tag}"
    raise ValueError(msg)

In [4]:
joboss = (
    joboss.select(pl.col(pl.Utf8))
    .filter(pl.col("job_name").str.contains(filter_pattern))
    .filter(~pl.col("job_name").str.contains("短视频|抖音|推广"))
    .filter(
        ~(
            pl.col("job_name").str.contains("运营")
            & ~pl.col("skill_tags").str.contains("数据")
        )
    )
)

joboss = (
    joboss.with_columns(
        [
            pl.col("edu_exp")
            .str.split_exact(",", 1)
            .struct.rename_fields(["work_year", "degree"])
            .alias("edu_exp"),
        ]
    )
    .unnest("edu_exp")
    .with_columns(
        [  # remove meaningless because of internship
            pl.col("work_year").str.replace_all(r"\d+天/周", ""),
            pl.col("degree").str.replace_all(r"\d+个月", ""),
        ]
    )
)

joboss = joboss.with_columns(
    pl.col("company_tag").map_elements(
        parse_company_tag,
        return_dtype=pl.Struct(
            {
                "company_area": pl.String,
                "company_type": pl.String,
                "company_size": pl.String,
            }
        ),
    )
).unnest("company_tag")


# Concatenate and rename columns
joboss = joboss.with_columns(
    pl.concat_str(
        [pl.col("skill_tags"), pl.col("job_other_tags")],
        separator=",",
        ignore_nulls=True,
    ).alias("tags")
).drop("skill_tags", "job_other_tags")

joboss = clean_data(joboss)

In [5]:
job51 = (
    job51.select(pl.col(pl.Utf8))
    .filter(pl.col("jobName").str.contains(filter_pattern))
    .filter(~pl.col("jobName").str.contains("短视频|抖音|推广"))
    .filter(
        ~(pl.col("jobName").str.contains("运营") & ~pl.col("tags").str.contains("数据"))
    )
)

job51 = (
    job51.drop("logo", "issueDate")
    .rename(
        {
            "jobName": "job_name",
            "workYear": "work_year",
            "companyName": "company_name",
            "companySize": "company_size",
            "companyType": "company_type",
        }
    )
    .with_columns(
        [
            pl.col("degree").str.replace_all("中技/中专", "中专/中技"),
        ]
    )
)
job51 = clean_data(job51)

In [6]:
job_lazy = clean_data(pl.concat([job51, joboss], how="diagonal"))

In [7]:
province_city_map: dict[str, list[str]] = maps["province_city_map"]  # type: ignore[reportArgumentType]
province_part_map: dict[str, list[str]] = maps["province_part_map"]  # type: ignore[reportArgumentType]


def map_city_with_province(
    city: str, province_city_map: dict[str, list[str]] = province_city_map
) -> str:
    """Map city with province."""
    extra_city_province_map = {
        "澄迈": "海南省",
        "屯昌": "海南省",
        "琼海": "海南省",
        "广西": "广西壮族自治区",
        "新疆": "新疆维吾尔自治区",
        "西藏": "西藏自治区",
        "宁夏": "宁夏回族自治区",
        "内蒙古": "内蒙古自治区",
        "香港": "香港特别行政区",
        "澳门": "澳门特别行政区",
        "台湾": "台湾省",
        "北京": "北京市",
        "上海": "上海市",
        "天津": "天津市",
        "重庆": "重庆市",
    }

    if city in extra_city_province_map:
        return extra_city_province_map[city]

    if city.endswith("省"):
        return city

    if not city.endswith(("自治州", "自治区", "自治县", "地区", "盟", "市")):
        city = city + "市"

    for province, cities in province_city_map.items():
        if city in cities:
            return province

    msg = f"City {city} not found in the province_city_dict."
    raise ValueError(msg)


def map_province_with_part(
    provienct: str, province_part_map: dict[str, list[str]] = province_part_map
) -> str:
    """Map province with part."""
    for part, provinces in province_part_map.items():
        if provienct in provinces:
            return part

    msg = f"Province {provienct} not found in the province_part_dict."
    raise ValueError(msg)


special_province_regx = "|".join(
    {
        "北京",
        "深圳",
        "上海",
        "天津",
        "重庆",
        "广西",
        "新疆",
        "西藏",
        "宁夏",
        "内蒙古",
    }  # special province in 51job, not before "省"
)


job_lazy: pl.LazyFrame = (
    # map city with province and map province with part
    job_lazy.with_columns(
        # `西安·雁塔区·电子城` -> `西安`
        pl.when(pl.col("area").str.contains("·"))
        .then(pl.col("area").str.extract(r"(.*?)·"))
        # `湖南省长沙岳麓区` -> `湖南省`
        .when(pl.col("area").str.contains("省"))
        .then(pl.col("area").str.extract(r"(.*?省)"))
        # `北京北京昌平区` -> `北京`
        .when(pl.col("area").str.contains(special_province_regx))
        .then(pl.col("area").str.extract(rf"({special_province_regx})"))
        .otherwise(pl.col("area"))
        .map_elements(map_city_with_province, return_dtype=pl.String)
        .alias("province")
    ).with_columns(  # map province with part
        pl.col("province")
        .map_elements(map_province_with_part, return_dtype=pl.String)
        .alias("part")
    )
)

In [8]:
# set 8 hours a day, 20 days a month, 12 months a year
def _calculate_annual_salary_interval(salary_string: str) -> tuple[int, ...] | int:
    """Calculate annual salary based on the given salary string.

    Support the following formats:
    - 12-24K·18薪
    - 5-7千·14薪
    - 18-36K
    - 3.5-4千
    - 8千-1.2万·15薪
    - 8千-1.2万
    - 1.3-2.4万
    - 1.5千/天
    - 3千及以下
    - 500元/天
    - 10-30元/时
    - 10000-15000元/月
    - 200-300元/周
    """
    if not isinstance(salary_string, str):
        msg = f"Invalid salary type: {salary_string}"
        raise TypeError(msg)

    pattern_dict = {
        # not the number of multiplier should match
        # if not salary range, it should 1,
        # if salary range, it should 2.
        r"(\d+-\d+K)·(\d+)薪": (1000, 1000),
        r"(\d+-\d+千)·(\d+)薪": (1000, 1000),
        r"(\d+(?:\.\d+)?-\d+(?:\.\d+)?K)": (1000, 1000),
        r"(\d+(?:\.\d+)?-\d+(?:\.\d+)?千)": (1000, 1000),
        r"(\d+千-\d+(?:\.\d+)?万)·(\d+)薪": (1000, 10000),
        r"(\d+(?:\.\d+)?千-\d+(?:\.\d+)?万)": (1000, 10000),
        r"(\d+(?:\.\d+)?-\d+(?:\.\d+)?万)": (10000, 10000),
        r"(\d+(?:\.\d+)?)千/天": (1000 * 20,),
        r"(\d+千)及以下": (1000,),
        r"(\d+)元/天": (20,),
        r"(\d+-\d+)元/时": (8 * 20, 8 * 20),
        r"(\d+-\d+)元/月": (1, 1),
        r"(\d+-\d+)元/周": (4, 4),
    }

    match, multiplier = next(
        (
            (re.search(pattern, salary_string), multiplier)
            for pattern, multiplier in pattern_dict.items()
            if re.search(pattern, salary_string)
        ),
        (None, None),
    )

    if match is None or multiplier is None:
        msg = f"Invalid salary string: {salary_string}"
        raise ValueError(msg)

    salary_range = match.group(1)
    if salary_range is None:
        msg = f"Invalid salary range: {salary_string}"
        raise ValueError(msg)

    range_multiplier = int(match.group(2)) if len(match.groups()) == 2 else 12  # noqa: PLR2004

    salary_range = salary_range.translate(str.maketrans("", "", "千万K"))
    salary_base: list[int] = [
        float(salary) * range_multiplier * m
        for salary, m in zip(salary_range.split("-"), multiplier, strict=False)
    ]

    return tuple(map(int, salary_base)) if len(salary_base) > 1 else int(salary_base[0])


def calculate_monthly_salary_midpoint(salary_string: str) -> int:
    """Calculate the midpoint of the annual salary based on the given salary string."""
    salary_interval = _calculate_annual_salary_interval(salary_string)
    if isinstance(salary_interval, int):
        return salary_interval // 12
    return sum(salary_interval) // 2 // 12


def map_monthly_salary_code(monthly_salary_point: int) -> str:
    """Map monthly salary point to salary range code."""
    salary_ranges = [
        (3000, "3000元以下"),
        (5000, "3000元至5000元"),
        (10000, "5000元至10000元"),
        (20000, "10000元至20000元"),
        (50000, "20000元至50000元"),
        (100000, "50000元至100000元"),
    ]

    for upper_bound, label in salary_ranges:
        if monthly_salary_point <= upper_bound:
            return label

    return "100000元以上"

In [9]:
def _calculate_mean_work_year(work_year: str) -> int:
    """Calculate the mean work year based on the given work year string.

    Support the following formats:
    - 经验不限
    - 在校生/应届生
    - 3年
    - 3-5年
    - 3年(及)以上
    - 3年以内
    """
    if work_year == "经验不限":
        return -1
    if bool(re.search(r"在校|应届|无需经验", work_year)):
        return 0
    pattern_list = [r"(\d+(?:-\d+)?)年", r"(\d+)年(?:及)?以上", r"(\d+)年以内"]
    match = next(
        (
            re.search(pattern, work_year)
            for pattern in pattern_list
            if re.search(pattern, work_year)
        ),
        None,
    )

    if not match:
        msg = f"Invalid work year: {work_year}"
        raise ValueError(msg)

    work_year = match.group(1)
    if "-" in work_year:
        start, end = map(int, work_year.split("-"))
        return (start + end) // 2

    return int(work_year)


def map_work_year_midpoint_interval(work_year_string: str) -> str:
    """Map the work year string to the corresponding interval."""
    work_year_midpoint = _calculate_mean_work_year(work_year_string)

    intervals = [
        (-1, "经验不限"),
        (0, "无经验"),
        (1, "1年以内"),
        (3, "1-3年"),
        (5, "3-5年"),
        (10, "5-10年"),
    ]

    for upper_bound, label in intervals:
        if work_year_midpoint <= upper_bound:
            return label

    return "10年以上"


company_area_cleaned_map: dict[str, str] = maps["company_area_cleaned_map"]  # type: ignore[reportArgumentType]


def map_company_area_with_economic_area(
    company_area: str, maps: dict[str, str] = company_area_cleaned_map
) -> str:
    """Map the company area to the corresponding economic area."""
    for economic_area, areas in maps.items():
        if company_area in areas:
            return economic_area

    msg = f"Invalid company area: {company_area}"
    raise ValueError(msg)

In [10]:
city_level_map: dict[str, str] = maps["city_level_map"]  # type: ignore[reportArgumentType]
city_level_exclusion_list: list[str] = maps["city_level_exclusion_list"]  # type: ignore[reportArgumentType]


def map_area_city_with_level(
    area_string: str, city_level_map: dict[str, str] = city_level_map
) -> str | None:
    """Map the area city with the corresponding level."""
    if area_string in city_level_exclusion_list:
        return None
    if "义乌" in area_string:
        area_string = "金华"
    for city, level in city_level_map.items():
        if city in area_string:
            return level

    msg = f"Invalid area string: {area_string}"
    raise ValueError(msg)

In [11]:
city_level_map["眉山"]

'四线城市'

In [12]:
type_tag_map: dict[str, list[str]] = maps["type_tag_map"]  # type: ignore[reportArgumentType]


def map_tags_skill_with_type(
    tag_string: str, type_tag_map: dict[str, list[str]] = type_tag_map
) -> str | None:
    """Map tags with type."""
    tag_list = tag_string.split(",")
    # count which type tag appears most
    type_count = {
        type_tag: sum(tag in tags for tag in tag_list)
        for type_tag, tags in type_tag_map.items()
    }
    # Check if at least one count is not zero
    if max(type_count.values(), default=0) > 0:
        return max(type_count, key=type_count.get)  # type: ignore[reportCallIssue]

    return None

In [13]:
job = (
    job_lazy.with_columns(
        monthly_salary=pl.col("salary").map_elements(
            calculate_monthly_salary_midpoint, return_dtype=pl.UInt16
        ),
        company_size_cleaned=pl.col("company_size").replace(
            maps["company_size_cleaned_map"], return_dtype=pl.String
        ),
    )
    .with_columns(
        monthly_salary_interval=pl.col("monthly_salary").map_elements(
            map_monthly_salary_code, return_dtype=pl.String
        ),
        work_year_interval=pl.col("work_year").map_elements(
            map_work_year_midpoint_interval, return_dtype=pl.String
        ),
        economic_area=pl.col("company_area").map_elements(
            map_company_area_with_economic_area, return_dtype=pl.String
        ),
        city=pl.col("area").replace(maps["area_city_map"], return_dtype=pl.String),
        skill_type=pl.col("tags").map_elements(
            map_tags_skill_with_type, return_dtype=pl.String
        ),
    )
    .with_columns(
        city_level=pl.col("city").map_elements(
            map_area_city_with_level, return_dtype=pl.String
        ),
    )
    .with_columns(
        monthly_salary_code=pl.col("monthly_salary_interval").replace(
            maps["monthly_salary_code_map"], return_dtype=pl.String
        ),
        degree_code=pl.col("degree").replace(
            maps["degree_code_map"],
            return_dtype=pl.String,
        ),
        work_year_code=pl.col("work_year_interval").replace(
            maps["work_year_code_map"], return_dtype=pl.String
        ),
        company_size_code=pl.col("company_size_cleaned").replace(
            maps["company_size_code_map"], return_dtype=pl.String
        ),
        economic_area_code=pl.col("economic_area").replace(
            maps["economic_area_code_map"], return_dtype=pl.String
        ),
        city_level_code=pl.col("city_level").replace(
            maps["city_level_code_map"], return_dtype=pl.String
        ),
        skill_type_code=pl.col("skill_type").replace(
            maps["skill_type_code_map"], return_dtype=pl.String
        ),
    )
    .collect()
)

display(job.shape)
job.head(3)

(21299, 27)

job_name,tags,area,salary,work_year,degree,company_name,company_type,company_size,company_area,province,part,monthly_salary,company_size_cleaned,monthly_salary_interval,work_year_interval,economic_area,city,skill_type,city_level,monthly_salary_code,degree_code,work_year_code,company_size_code,economic_area_code,city_level_code,skill_type_code
str,str,str,str,str,str,str,str,str,str,str,str,u16,str,str,str,str,str,str,str,str,str,str,str,str,str,str
"""大数据工程师""","""7年及以上,本科,java,…","""广东省东莞横沥镇""","""1.2-2万""","""7年及以上""","""本科""","""广东中泰工业科技股份有限公司…","""民营""","""1000-5000人""",,"""广东省""","""东部""",16000,"""1000-9999人""","""10000元至20000元""","""5-10年""",,"""东莞市""","""大数据工程师""","""新一线城市""","""S4""","""D5""","""W4""","""C5""",,"""L2""","""T1"""
"""数据分析顾问/师(J1365…","""5-10年,本科,数据挖掘,…","""广东省东莞南城区""","""1.5-2.5万·13薪""","""5-10年""","""本科""","""美宜佳控股有限公司""","""民营""","""1000-5000人""",,"""广东省""","""东部""",20000,"""1000-9999人""","""10000元至20000元""","""5-10年""",,"""东莞市""","""大数据工程师""","""新一线城市""","""S4""","""D5""","""W4""","""C5""",,"""L2""","""T1"""
"""电商数据分析师""","""2年,大专,数据分析,exc…","""广东省东莞万江区""","""8千-1.2万""","""2年""","""大专""","""东莞富尚科技有限公司""","""民营""","""150-500人""",,"""广东省""","""东部""",10000,"""100-499人""","""5000元至10000元""","""1-3年""",,"""东莞市""","""数据分析师""","""新一线城市""","""S3""","""D4""","""W2""","""C3""",,"""L2""","""T2"""


In [14]:
# job.select(pl.col("tags").str.split(",").flatten().alias("tags")).group_by("tags").agg(
#     pl.len()
# ).filter(pl.col("len") >= 10).sort("len", descending=True).write_csv("tags.csv")

In [15]:
job.write_csv("job.csv")
job.write_excel("job.xlsx")

<xlsxwriter.workbook.Workbook at 0x7f8ff60d3590>