In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, Row, IntegerType, ArrayType
spark = SparkSession.builder.appName('Iteration_4').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/17 11:57:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Let's read in the data. Note that it's in the format of JSON.
df01 = spark.read.csv('Datasets/job_postings.csv', header=True, inferSchema=True)
df02 = spark.read.csv('Datasets/job_skills.csv', header=True, inferSchema=True)
df01_count = df01.count()
df02_count = df02.count()
(df01_count,df02_count)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

(12218, 12217)

## Use pandas to try again

In [3]:
import pandas as pd

job_postings_df = pd.read_csv('Datasets/job_postings.csv', dtype=str)
job_skills_df = pd.read_csv('Datasets/job_skills.csv', dtype=str)

job_postings_count = len(job_postings_df)
job_skills_count = len(job_skills_df)
(job_postings_count,job_skills_count)

(12217, 12217)

In [None]:
common_columns = list(set(df01.columns).intersection(set(df02.columns)))

# Find common columns
df01_common = df01.select(common_columns)
df02_common = df02.select(common_columns)

# Gain rows
df01_count = df01_common.count()
df02_count = df02_common.count()
print(f'df01行数: {df01_count}, df02行数: {df02_count}')

# Find extral rows
diff_df = df01.join(df02, on='job_link', how='left_anti')

# Display extral rows
print("Display extral rows:")
diff_df.show()

# Delete extral rows
df01_filtered = df01.subtract(diff_df)

df01_filtered.describe()

df01行数: 12218, df02行数: 12217
Display extral rows:
+--------------------+--------------------+--------------------+-----------+-------+---------------+--------------------+----------+------------+----------+-----------+--------------+---------------+---------+--------+
|            job_link| last_processed_time|         last_status|got_summary|got_ner|is_being_worked|           job_title|   company|job_location|first_seen|search_city|search_country|search_position|job_level|job_type|
+--------------------+--------------------+--------------------+-----------+-------+---------------+--------------------+----------+------------+----------+-----------+--------------+---------------+---------+--------+
|                 ...|RED Engineering D...|London, England, ...| 2024-01-15| Slough| United Kingdom|Electrical Engine...|Mid senior|      Onsite|      null|       null|          null|           null|     null|    null|
+--------------------+--------------------+--------------------+----------

[Stage 22:>                                                         (0 + 2) / 2]

## Data Exploration

In [None]:
# Visualise DataFrames df1 

df01_filtered.show()
df01_filtered.columns

In [None]:
# Visualise DataFrames df2
df02.show()
df02.columns

In [None]:
# Convert Spark DataFrame to Pandas DataFrame
pd_df01 = df01_filtered.toPandas()
pd_df02 = df02.toPandas()
# Display summary statistics using Pandas
from IPython.display import display
display(pd_df01.describe())
display(pd_df02.describe())

In [None]:
# For type, we can use print schema. 
df01_filtered.printSchema()
df02.printSchema()

## Data Manipulation

In [None]:
df01_clean = df01.na.drop(subset="job_link")
pd_df01_clean = df01_clean.toPandas()
display(pd_df01_clean.describe(include='all'))

In [None]:
from pyspark.sql.functions import col, udf, split, explode, count
import re
# Use .distinct() and .exceptAll() to check if there are any unmatched 'job_link' entries between the two DataFrames
different_links = df01_filtered.select("job_link").distinct().exceptAll(
    df02.select("job_link").distinct()
)

# Decide whether to merge based on the presence of unmatched 'job_link' entries
if different_links.count() == 0:
    # If there are no unmatched 'job_link' entries, proceed with merging
    merged_data = df01_filtered.join(df02, "job_link", "inner")
    print("DataFrames merged successfully.")
else:
    # If there are unmatched 'job_link' entries, do not merge
    print("DataFrames' job_link columns are not the same, cannot merge.")

if different_links.count() > 0:
    print("Unmatched job_link entries:")
    different_links.show()


In [None]:
merged_data.show()
merged_data.printSchema()

# Display summary statistics using Pandas
pd_merged_data = merged_data.toPandas()
display(pd_merged_data.describe())

In [None]:
df_clean = merged_data.na.drop()
# Display summary statistics using Pandas
pd_df_clean = df_clean.toPandas()
display(pd_df_clean.describe())

In [None]:
# 删除指定的列
columns_to_drop = ['last_processed_time', 'last_status', 'job_link', 'got_summary', 'got_ner',
                   'is_being_worked', 'company', 'first_seen', 'search_city', 'search_position', 'job_type']
filtered_df = merged_data.drop(*columns_to_drop)

# 显示过滤后的DataFrame
filtered_df.show()

In [None]:
# 定义函数
def split_location(location):
    if location is None:
        return Row(city=None, state=None, country=None)

    parts = [part.strip() for part in str(location).split(',')]
    city = state = country = None

    known_countries = ["United States", "United Kingdom", "Canada", "Australia", "India",
                       "Germany", "France", "Italy", "Spain", "Mexico"]
    us_states = {
        'AL': 'Alabama', 'AK': 'Alaska', 'AZ': 'Arizona', 'AR': 'Arkansas', 'CA': 'California',
        'CO': 'Colorado', 'CT': 'Connecticut', 'DE': 'Delaware', 'FL': 'Florida', 'GA': 'Georgia',
        'HI': 'Hawaii', 'ID': 'Idaho', 'IL': 'Illinois', 'IN': 'Indiana', 'IA': 'Iowa',
        'KS': 'Kansas', 'KY': 'Kentucky', 'LA': 'Louisiana', 'ME': 'Maine', 'MD': 'Maryland',
        'MA': 'Massachusetts', 'MI': 'Michigan', 'MN': 'Minnesota', 'MS': 'Mississippi', 'MO': 'Missouri',
        'MT': 'Montana', 'NE': 'Nebraska', 'NV': 'Nevada', 'NH': 'New Hampshire', 'NJ': 'New Jersey',
        'NM': 'New Mexico', 'NY': 'New York', 'NC': 'North Carolina', 'ND': 'North Dakota', 'OH': 'Ohio',
        'OK': 'Oklahoma', 'OR': 'Oregon', 'PA': 'Pennsylvania', 'RI': 'Rhode Island', 'SC': 'South Carolina',
        'SD': 'South Dakota', 'TN': 'Tennessee', 'TX': 'Texas', 'UT': 'Utah', 'VT': 'Vermont',
        'VA': 'Virginia', 'WA': 'Washington', 'WV': 'West Virginia', 'WI': 'Wisconsin', 'WY': 'Wyoming', 'DC': 'Washington'
    }
    # Create a reverse map from full state name to abbreviation
    state_abbrev = {v: k for k, v in us_states.items()}

    # If there is only one part
    if len(parts) == 1:
        if parts[0] in known_countries:
            country = parts[0]
        else:
            state = parts[0]  # Translate to state if not in known countries
    elif len(parts) == 2:
        if parts[1] in known_countries:
            city = parts[0]
            country = parts[1]
            if city in state_abbrev and country == "United States":
                state = state_abbrev[city]
                city = None
            else:
                state = city
                city = None
        else:
            city, state = parts
            if state in us_states.keys():  # If the state name is the abbreviation of the US state
                country = "United States"  # Country name to United States
    elif len(parts) == 3:
        city, state, country = parts
        if state in state_abbrev:  # If state is full name
            state = state_abbrev[state]  # To abbreviation

    return Row(city=city, state=state, country=country)

# 注册UDF
split_location_udf = udf(lambda location: split_location(location), StructType([
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("country", StringType(), True)
]))

# 应用UDF
df_with_location = filtered_df.withColumn("location_split", split_location_udf(col("job_location")))
df_with_location = df_with_location.select("*", "location_split.city", "location_split.state", "location_split.country").drop("location_split")

# 显示过滤后的DataFrame
df_with_location.show()

In [None]:
# 定义技能规范化函数
def normalize_skill_name(skill):
    skill_normalized = skill.strip().lower()
    if skill_normalized in ("go", "golang"):
        return "Go/Golang"
    if skill_normalized == "ruby on rails":
        return "Ruby"
    if skill_normalized in ("ai", "llms", "artificial intelligence", "generative ai", "natural language processing",
                            "nlp", 'machine learning', 'deep learning', 'ml', 'reinforcement learning'):
        return 'ML/AI'
    if 'problem' in skill_normalized and 'solving' in skill_normalized:
        return 'ProblemSolving'
    if 'microsoft' in skill_normalized and ('office' in skill_normalized or 'excel' in skill_normalized or 'Excel' in skill_normalized):
        return 'MicrosoftOffice'
    if re.search(r'\b(cloud|azure|aws|gcp)\b', skill_normalized, re.IGNORECASE):
        return 'Cloud Computing'
    if 'communication' in skill_normalized:
        return 'Communication'
    if re.search(r'\b(software|programming|developer)\b', skill_normalized, re.IGNORECASE):
        return 'SoftwareDevelopment'
    if re.search(r'\b(data science|data scientist)\b', skill_normalized, re.IGNORECASE):
        return 'DataScience'
    if re.search(r'\b(data analysis|data analytics|data_analysis)\b', skill_normalized, re.IGNORECASE):
        return 'DataAnalysis'
    if 'java' in skill_normalized and 'script' not in skill_normalized:
        return 'Java'
    if 'javascript' in skill_normalized or 'typescript' in skill_normalized:
        return 'JavaScript/TypeScript'
    if ' ' in skill_normalized:
        return ''.join([word.capitalize() for word in skill_normalized.split()])
    else:
        return skill_normalized.capitalize()

# 注册UDF
normalize_skill_name_udf = udf(normalize_skill_name, StringType())

# 拆分技能列
df_with_skills = df_with_location.withColumn("skills", explode(split(col("job_skills"), ",")))

# 规范化技能名称
df_with_skills_normalized = df_with_skills.withColumn("normalized_skill", normalize_skill_name_udf(col("skills")))

# 计算技能频率并排序
skill_counts = df_with_skills_normalized.groupBy("normalized_skill").agg(count("*").alias("count")).orderBy(col("count").desc())

# 显示技能频率
skill_counts.show()

# 处理分类列
categories = {
    'Cloud': ['cloud', 'aws', 'azure', 'google cloud', 'cloud engineer', 'data center', 'cloud architect', 'datacenter'],
    'Testing': ['test', 'tester', 'testing', 'quality assurance', 'qa', 'test engineer'],
    'Development': ['developer', 'development', 'software engineer', 'programmer', 'solution architect', 'database developer',
                    'software', 'software development', "C++", "Java", "C#", "JavaScript/TypeScript", "Go/Golang", "Visual Basic",
                    "Assembly language", "PHP", "Delphi/Object Pascal", "Swift", "Rust", "Ruby",
                    "Kotlin", "COBOL"],
    'Data_science': ['data science', 'machine learning', 'ml', 'deep learning', 'ai', 'artificial intelligence', 'data scientist', 'analytics architect', 'data analytics', 'npl'],
    'Data_analysis': ['data analyst', 'data analysis', 'business intelligence', 'data reporting', 'financial data', 'data warehouse', 'data mining', 'data architect'],
    'Devops': ['devops engineer', 'site reliability', 'sre', 'automation engineer', 'infrastructure as code', 'ci/cd', 'release engineer']
}

# 为每个类别创建新列并初始化为0
for category in categories:
    df_with_skills_normalized = df_with_skills_normalized.withColumn(category, lit(0))

# 定义一个函数来检查技能是否属于某个类别
def is_in_category(skill, keywords):
    return any(keyword in skill for keyword in keywords)

# 注册UDF
is_in_category_udf = udf(lambda skill, keywords: is_in_category(skill, keywords), IntegerType())

# 检查 job_title 和其他相关列
for category, keywords in categories.items():
    df_with_skills_normalized = df_with_skills_normalized.withColumn(category, udf(
        lambda job_title: int(any(keyword in job_title for keyword in keywords)), IntegerType())(col("job_title")))

# 更新分类列的值
for category, keywords in categories.items():
    formatted_keywords = [''.join(word.capitalize() for word in keyword.split()) for keyword in keywords]
    column_patterns = [f"{i + 1}. {formatted_keyword}".lower() for i, formatted_keyword in enumerate(formatted_keywords)]
    matching_columns = [col for col in df_with_skills_normalized.columns if any(col.lower().endswith(pattern) for pattern in column_patterns)]
    
    for col_name in matching_columns:
        df_with_skills_normalized = df_with_skills_normalized.withColumn(category, udf(
            lambda val, current_val: max(val, current_val), IntegerType())(col(col_name), col(category)))

# 删除不需要的列
df_final = df_with_skills_normalized.drop('job_title', 'search_country', 'job_location', 'city', 'job_skills')

# 显示最终DataFrame
df_final.show()

pd_df_final = df_final.toPandas()
display(pd_df_final.describe())
