In [None]:
import os
import time
import threading
import json
import requests
import pandas as pd
from pandas import json_normalize
from dotenv import load_dotenv
import http.client
import urllib.parse
import snowflake.connector
from sqlalchemy import create_engine
from joblib import Parallel, delayed
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


In [None]:
# Load variables from a local .env file (if one exists) into os.environ.
load_dotenv()
snowflake_password = os.environ.get('SNOWFLAKE_PASSWORD')

# Shared throttle state for the API-calling helpers: one lock serialising access,
# plus the wall-clock time of the most recent request start (0 = none yet).
last_request_time = 0
rate_lock = threading.Lock()

## Connect to the Snowflake database for initial processing of the raw data

In [24]:
# Establish a connection to Snowflake.

def connect_to_snowflake(
    user="NIKKILW2025",
    account="gbszkwp-by30611",
    warehouse="SNOWFLAKE_LEARNING_WH",
    database="linkedin_db",
    schema="linkedin_raw",
    password=None,
):
    """Open a Snowflake connection; on failure print the reason and return None.

    Parameters
    ----------
    user, account, warehouse, database, schema : str
        Connection target. The defaults are the notebook's original target, so
        ``connect_to_snowflake()`` behaves as before; override them to point elsewhere.
    password : str or None
        Password to use. ``None`` falls back to the module-level
        ``snowflake_password`` (read from SNOWFLAKE_PASSWORD at the top of the notebook).

    Returns
    -------
    snowflake.connector connection, or None if the password is missing or the
    connector raised.
    """
    if password is None:
        password = snowflake_password
    # Fail fast with a specific message instead of an opaque authentication error.
    if not password:
        print("Error connecting to Snowflake: SNOWFLAKE_PASSWORD is not set.")
        return None

    try:
        conn = snowflake.connector.connect(
            user=user,
            password=password,
            account=account,
            warehouse=warehouse,
            database=database,
            schema=schema,
        )
    except Exception as e:
        # Best effort: failures are reported and signalled by returning None.
        print(f"Error connecting to Snowflake: {e}")
        return None

    print("Connection to Snowflake established successfully.")
    return conn

conn = connect_to_snowflake()

Connection to Snowflake established successfully.


In [25]:
# Query the distinct seniority labels from the cleaned LinkedIn job data.
def query_raw_api_data(conn):
    """Return the distinct, lower-cased SENIORITY values of LINKEDIN_JOB_API_CLEANED_DATA.

    Parameters
    ----------
    conn : DBAPI connection or SQLAlchemy connectable
        Open connection passed to ``pd.read_sql``. ``connect_to_snowflake()``
        returns None on failure, so None is rejected up front.

    Returns
    -------
    pandas.DataFrame
        A single ``SENIORITY`` column with one row per distinct lower-cased value
        (NULLs kept as one missing row) and a fresh 0..n-1 index.

    Raises
    ------
    ValueError
        If ``conn`` is None.
    """
    if conn is None:
        raise ValueError("No database connection available; check connect_to_snowflake().")

    query = """
        SELECT DISTINCT SENIORITY FROM LINKEDIN_JOB_API_CLEANED_DATA
    """

    # NOTE: pandas may warn that it only officially supports SQLAlchemy connectables
    # (and sqlite3); a raw DBAPI connection still executes the query.
    df_raw = pd.read_sql(query, conn)

    # String comparison in DISTINCT is case-sensitive by default, so 'Senior Level'
    # and 'senior level' both survive it; de-duplicate again after lower-casing so
    # the resulting lookup table has one row per label.
    df_lvl = (
        df_raw
        .assign(SENIORITY=df_raw["SENIORITY"].str.lower())
        .drop_duplicates()
        .reset_index(drop=True)
    )
    print(df_lvl.shape)
    return df_lvl

# Fetch the seniority labels from LINKEDIN_JOB_API_CLEANED_DATA and preview the first rows.
df_lvl = query_raw_api_data(conn)
df_lvl.head()

  df_lvl = pd.read_sql(query, conn)


(36, 1)


Unnamed: 0,SENIORITY
0,medium-high level
1,it does not correspond
2,executive
3,not valid
4,senior level


In [26]:
# Cache the seniority labels locally so re-runs do not depend on Snowflake:
# read the cache when it exists, otherwise create it from the frame queried above.
SENIORITY_CACHE_PATH = 'df_lvl.csv'
if os.path.exists(SENIORITY_CACHE_PATH):
    # Only empty cells become NaN: labels such as 'none', 'null' or 'n/a' may be
    # genuine seniority values and must survive the round trip as text.
    df_lvl = pd.read_csv(SENIORITY_CACHE_PATH, keep_default_na=False, na_values=[""])
else:
    df_lvl.to_csv(SENIORITY_CACHE_PATH, index=False)
df_lvl.shape

(36, 1)

In [None]:
# Classify raw seniority strings with the OpenAI Chat Completions API.
# The request model defaults to "gpt-4-1106-preview"; pass `model=` to use another one.

# Instructions sent as the system message of every classification request.
_SENIORITY_SYSTEM_PROMPT = (
    "You are an advanced job title classifier. "
    "Categorize the job experience into EXACTLY ONE of:\n"
    "'No Experience', 'Entry Level', 'Intermediate', 'Senior', 'Manager/Executive', 'Not Applicable'\n"
    "Special Rules:\n"
    "- Contains 'Minimum experience' (case insensitive) → Entry Level\n"
    "- Contains 'association' or 'associate' (case insensitive) → Entry Level\n"
    "Examples:\n"
    "- 'medium-high level' → Intermediate\n"
    "- 'executive' → Manager/Executive\n"
    "- 'mid-senior level' → Intermediate\n"
    "- 'entry level' → Entry Level\n"
    "- 'minimum experience' → Entry Level\n"
    "- 'association Level' → Entry Level\n"
    "- 'associate' → Entry Level\n"
    "- 'first job' → Entry Level\n"
    "Definitions:\n"
    "- Entry Level: 0-2 years experience\n"
    "- Intermediate: 3-5 years experience\n"
    "- Senior: 6+ years experience\n"
    "Output ONLY the category name, no explanations."
)

# Normalised model reply -> canonical category. Keys are lower-case with spaces,
# hyphens and underscores removed (see _normalize_model_reply); anything not
# listed falls back to 'Not Applicable'.
_SENIORITY_CATEGORY_MAP = {
    'noexperience': 'No Experience',
    'entrylevel': 'Entry Level',
    'intermediate': 'Intermediate',
    'seniorlevel': 'Senior',
    'senior': 'Senior',
    'manager': 'Manager/Executive',
    'executive': 'Manager/Executive',
    'manager/executive': 'Manager/Executive',
    'notapplicable': 'Not Applicable',
}


def _throttle_requests(min_interval=0.5):
    """Sleep so request starts across all threads are at least `min_interval` seconds apart."""
    global last_request_time
    with rate_lock:
        elapsed = time.time() - last_request_time
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)
        last_request_time = time.time()


def _build_retry_session():
    """Return a requests.Session whose HTTPS adapter retries transient failures."""
    retries = Retry(
        total=5,
        backoff_factor=2,
        # 429 is the rate-limit status; urllib3 honours its Retry-After header by default.
        status_forcelist=[429, 500, 502, 503, 504],
        # POST is not in urllib3's default allowed_methods, so without this the
        # status-based retries would never fire for the chat-completions call.
        allowed_methods=frozenset(["POST"]),
    )
    session = requests.Session()
    session.mount('https://', HTTPAdapter(max_retries=retries))
    return session


def _normalize_model_reply(content):
    """Reduce a free-text model reply to a lookup key for _SENIORITY_CATEGORY_MAP."""
    # Models sometimes echo the quotes used in the prompt or add a trailing period.
    text = str(content).strip().strip("'\"`.").lower()
    for separator in (" ", "-", "_"):
        text = text.replace(separator, "")
    return text


def job_seniority_category_single_gpt(lvl, model="gpt-4-1106-preview"):
    """Classify one raw seniority string into a canonical experience category.

    Parameters
    ----------
    lvl : str, None or NaN
        Raw seniority text; underscores are treated as spaces before sending.
    model : str
        Chat-completions model name sent in the request body.

    Returns
    -------
    str
        One of 'No Experience', 'Entry Level', 'Intermediate', 'Senior',
        'Manager/Executive' or 'Not Applicable'. Empty input, a missing
        CHATGPT_API_KEY, HTTP/network errors, malformed responses and
        unrecognised replies all yield 'Not Applicable'.
    """
    # Empty values never reach the API, so they must not consume a rate-limit slot.
    if pd.isna(lvl) or not lvl:
        return "Not Applicable"

    api_key = os.getenv('CHATGPT_API_KEY')
    if not api_key:
        print(f"CHATGPT_API_KEY is not set; skipping {lvl!r}")
        return "Not Applicable"

    processed_input = str(lvl).lower().replace("_", " ")

    # Concurrency rate limit: at most one request start per 0.5 s (~2 per second).
    _throttle_requests()

    try:
        print(f"\nProcessing: {lvl}")
        # The context manager closes the session's connection pool after each call.
        with _build_retry_session() as session:
            response = session.post(
                "https://api.openai.com/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                    "User-Agent": "JobClassification/1.0 (Python)"
                },
                json={
                    "model": model,
                    "messages": [
                        {"role": "system", "content": _SENIORITY_SYSTEM_PROMPT},
                        {"role": "user", "content": f"Job experience: {processed_input}"},
                    ],
                    "temperature": 0.3
                },
                timeout=45
            )
    except requests.exceptions.RequestException as e:
        print(f"API Error ({type(e).__name__}): {str(e)}")
        return "Not Applicable"

    print(f"Status Code: {response.status_code}")
    if response.status_code != 200:
        print(f"Error Response: {response.text[:200]}")
        return "Not Applicable"

    try:
        content = response.json()["choices"][0]["message"]["content"]
    except (ValueError, KeyError, IndexError, TypeError) as e:
        # ValueError covers JSON decode errors; the others cover an unexpected
        # payload shape, which would otherwise abort the whole parallel run.
        print(f"Invalid JSON response ({type(e).__name__}): {response.text[:200]}")
        return "Not Applicable"

    final_result = _normalize_model_reply(content)
    # Include the input: with several worker threads the log lines interleave.
    print(f"API Response for {lvl!r}: {final_result}")
    return _SENIORITY_CATEGORY_MAP.get(final_result, "Not Applicable")


# Classify every seniority value concurrently. Threads are enough because each call
# is network-bound; the shared throttle inside the classifier still caps the request
# rate, so lower n_jobs if the API starts reporting rate-limit errors.
start = time.time()
labelling_jobs = (
    delayed(job_seniority_category_single_gpt)(raw_level)
    for raw_level in df_lvl['SENIORITY']
)
df_lvl['SENIORITY_STANDARDIZED'] = Parallel(n_jobs=4, backend="threading")(labelling_jobs)

preview = df_lvl.tail()
print("\nFinal Results:")
print(preview)
print(df_lvl.shape)
print(f"\nExecution time: {time.time() - start:.2f} seconds")


Processing: medium-high level



Processing: it does not correspond
Status Code: 200
API Response: intermediate

Processing: executive

Processing: not valid
Status Code: 200
API Response: manager/executive
Status Code: 200
API Response: notapplicable

Processing: senior level
Status Code: 200
API Response: notapplicable

Processing: minimum experience
Status Code: 200
API Response: senior

Processing: management
Status Code: 200
API Response: entrylevel

Processing: without experience
Status Code: 200
API Response: manager/executive

Processing: middle head manager
Status Code: 200
API Response: noexperience

Processing: assistant
Status Code: 200
API Response: manager/executive

Processing: entry level
Status Code: 200
API Response: entrylevel

Processing: senior employee
Status Code: 200
API Response: entrylevel

Processing: mid-senior level
Status Code: 200
API Response: senior

Processing: middle level
Status Code: 200
API Response: intermediate

Processing: associate
Status Code: 200
API Response: intermediate


In [None]:
# Call the DeepSeek API (alternative classifier; the whole cell is commented out)

# def ds_job_seniority_category_single(lvl):
#     global last_request_time

#     # Rate limiting
#     with rate_lock:
#         elapsed = time.time() - last_request_time
#         if elapsed < 0.5:
#             time.sleep(0.5 - elapsed)
#         last_request_time = time.time()

#     # Handle empty values
#     if pd.isna(lvl) or not lvl:
#         return "Not Applicable"

#     # Normalize the input
#     processed_input = str(lvl).lower().replace("_", " ")  # treat underscores as spaces

#     # API configuration
#     session = requests.Session()
#     retries = Retry(
#         total=5,
#         backoff_factor=2,
#         status_forcelist=[500, 502, 503, 504]
#     )
#     session.mount('https://', HTTPAdapter(max_retries=retries))

#     try:
#         # Send the request (with debug logging)
#         print(f"\nProcessing: {lvl}")
#         response = session.post(
#             "https://api.deepseek.com/v1/chat/completions",
#             headers={
#                 "Authorization": f"Bearer {os.getenv('DEEPSEEK_API_KEY')}",
#                 "Content-Type": "application/json",
#                 "User-Agent": "JobClassification/1.0 (Python)"
#             },
#             json={
#                 "model": "deepseek-chat",
#                 "messages": [
#                     {
#                         "role": "system",
#                         "content": (
#                             "You are an advanced job title classifier. Categorize the job experience into EXACTLY ONE of:\n"
#                             "'No Experience', 'Entry Level', 'Intermediate', 'Senior', 'Manager/Executive', 'Not Applicable'\n"
#                             "Special Rules:\n"
#                             "- 包含'Minimum experience' (不区分大小写） → Entry Level\n"
#                             "- 包含'association'或'associate' (不区分大小写） → Entry Level\n"
#                             "Examples:\n"
#                             "- 'medium-high level' → Intermediate\n"
#                             "- 'executive' → Manager/Executive\n"
#                             "- 'mid-senior level' → Intermediate\n"
#                             "- 'entry level' (不区分大小写） → Entry Level\n"
#                             "- 'minimum experience' (不区分大小写） → Entry Level\n"
#                             "- 'association Level' (不区分大小写） → Entry Level\n"
#                             "- 'associate' (不区分大小写） → Entry Level\n"
#                             "Definitions:\n"
#                             "- entry Level (不区分大小写）: 0-2 years experience\n"
#                             "- intermediate (不区分大小写）: 3-5 years experience\n"
#                             "- senior (不区分大小写）: 6+ years experience\n"
#                             "Output ONLY the category name, no explanations."
#                         )
#                     },
#                     {
#                         "role": "user",
#                         "content": f"Job experience: {processed_input}"
#                     }
#                 ],
#                 "temperature": 0.3
#             },
#             timeout=45
#         )

#         # Debug logging
#         print(f"Status Code: {response.status_code}")
#         if response.status_code != 200:
#             print(f"Error Response: {response.text[:200]}")

#         # Handle the response
#         if response.status_code == 200:
#             try:
#                 result = response.json()
#                 final_result = result["choices"][0]["message"]["content"].strip().lower().replace(" ", "")
#                 print(f"API Response: {final_result}")  # show the raw classification result

#                 return {
#                     'noexperience': 'No Experience',
#                     'entry level': 'Entry Level',
#                     'intermediate': 'Intermediate',
#                     'seniorlevel': 'Senior',
#                     'senior': 'Senior',
#                     'manager': 'Manager/Executive',
#                     'executive': 'Manager/Executive',
#                     'manager/executive': 'Manager/Executive'
#                 }.get(final_result, "Not Applicable")

#             except json.JSONDecodeError:
#                 print(f"Invalid JSON response: {response.text[:200]}")
#                 return "Not Applicable"

#         return "Not Applicable"

#     except requests.exceptions.RequestException as e:
#         print(f"API Error ({type(e).__name__}): {str(e)}")
#         return "Not Applicable"

# # Run the test
# start = time.time()
# df_lvl['SENIORITY_STANDARDIZED'] = Parallel(n_jobs=4, backend="threading")(
#     delayed(ds_job_seniority_category_single)(lvl) for lvl in df_lvl['SENIORITY']
# )
# print("\nFinal Results:")
# print(df_lvl.tail())
# print(df_lvl.shape)
# print(f"\nExecution time: {time.time() - start:.2f} seconds")

In [36]:
# Preview the first rows with the SENIORITY_STANDARDIZED labels attached.
df_lvl.head()

Unnamed: 0,SENIORITY,SENIORITY_STANDARDIZED
0,medium-high level,Intermediate
1,it does not correspond,Not Applicable
2,executive,Manager/Executive
3,not valid,Not Applicable
4,senior level,Senior


In [35]:
# Write the standardized seniority lookup to a new Snowflake table.

def load_to_snowflake(df_lvl, table_name="job_seniority", if_exists="replace"):
    """Write ``df_lvl`` to a Snowflake table in linkedin_db.linkedin_raw via SQLAlchemy.

    Parameters
    ----------
    df_lvl : pandas.DataFrame
        Frame to persist; written without its index.
    table_name : str
        Target table name (default "job_seniority").
    if_exists : str
        Passed to ``DataFrame.to_sql``; the default 'replace' drops the table
        before inserting the new rows.

    Raises
    ------
    ValueError
        If SNOWFLAKE_PASSWORD was not loaded.
    """
    if not snowflake_password:
        raise ValueError("SNOWFLAKE_PASSWORD is not set; cannot connect to Snowflake.")

    # Percent-encode the password so characters such as '@', ':', '/' or '#'
    # cannot corrupt the URL that SQLAlchemy parses.
    password = urllib.parse.quote(snowflake_password, safe="")
    engine = create_engine(
        'snowflake://{user}:{password}@{account}/{database}/{schema}?warehouse={warehouse}'.format(
            user="NIKKILW2025",
            password=password,
            account="gbszkwp-by30611",
            warehouse="SNOWFLAKE_LEARNING_WH",
            database="linkedin_db",
            schema="linkedin_raw",
        )
    )

    try:
        df_lvl.to_sql(
            name=table_name,
            con=engine,
            if_exists=if_exists,
            index=False
        )
    finally:
        # Release pooled connections instead of leaving them open until garbage collection.
        engine.dispose()

    print(f"Data loaded to Snowflake table {table_name} successfully.")


load_to_snowflake(df_lvl)

Data loaded to Snowflake table job_seniority successfully.
