In [31]:
import pandas as pd
from pandas import json_normalize
import requests
import json
import snowflake.connector

from dotenv import load_dotenv
import os
import http.client
import urllib.parse


from googletrans import Translator
import string
import re

from sqlalchemy import create_engine
from snowflake.connector import connect
from snowflake.connector.pandas_tools import write_pandas
import logging
import asyncio

In [32]:
# Load settings via python-dotenv so credentials are read from the environment
# instead of being hard-coded in the notebook.
load_dotenv()
# RapidAPI key and host used for the LinkedIn job search requests.
rapidapi_key = os.getenv('RAPIDAPI_KEY')
rapidapi_host = "linkedin-job-search-api.p.rapidapi.com"
# Snowflake credentials. os.getenv returns None when a variable is not set,
# so a missing value only surfaces later, when a connection is attempted.
snowflake_user = os.getenv('SNOWFLAKE_USER')
account = os.getenv('SNOWFLAKE_ACCOUNT')
snowflake_password = os.getenv('SNOWFLAKE_PASSWORD')

In [33]:
def extract_linkedin_job_data(titles=("Data Engineer", "Data Scientist", "Data Analyst"),
                              location="Australia", limit=100, offset=0,
                              endpoint="active-jb-24h", timeout=30):
    """
    Fetch job postings from the RapidAPI LinkedIn job search API, one request per title.

    Parameters
    ----------
    titles : iterable of str
        Title filters to query; each becomes the ``job_category`` of the rows it returns.
    location : str
        Value sent as ``location_filter``.
    limit, offset : int
        Paging values sent to the API. Only this single page is requested per title.
    endpoint : str
        API path segment. ``"active-jb-24h"`` queries the last 24 hours;
        ``"active-jb-7d"`` is the 7-day variant.
    timeout : float
        Seconds to wait for each HTTP request before ``requests`` raises a timeout error.

    Returns
    -------
    pandas.DataFrame
        The normalized JSON rows of every successful request, concatenated with a
        fresh index, plus a ``job_category`` column. An empty DataFrame is returned
        when no request produced any rows.
    """
    headers = {
        'x-rapidapi-key': rapidapi_key,
        'x-rapidapi-host': rapidapi_host
    }

    # The location is the same for every title, so URL-encode it once.
    location_encoded = urllib.parse.quote(location)

    # Collect one frame per title and concatenate once at the end; growing a
    # DataFrame inside the loop re-copies all previous rows on every iteration.
    frames = []

    for title_filter in titles:
        title_encoded = urllib.parse.quote(title_filter)

        base_url = f"/{endpoint}?limit={limit}&offset={offset}&title_filter={title_encoded}&location_filter={location_encoded}"
        url = f"https://{rapidapi_host}{base_url}"

        # Without a timeout a stalled connection would block the notebook indefinitely.
        response = requests.get(url, headers=headers, timeout=timeout)

        if response.status_code != 200:
            # Best effort: report the failure and carry on with the remaining titles.
            print(f"Error: {response.status_code}")
            continue

        # Convert the JSON data to a DataFrame
        df_daily = json_normalize(response.json())
        df_daily['job_category'] = title_filter
        print(title_filter, df_daily.shape)

        # Skip empty results so the final concat only sees frames with rows.
        if not df_daily.empty:
            frames.append(df_daily)

    if not frames:
        return pd.DataFrame()

    return pd.concat(frames, ignore_index=True)

# Run the extraction; the per-title shapes are printed by the function itself.
df_daily_all = extract_linkedin_job_data()


Data Engineer (8, 45)
Data Scientist (7, 45)
Data Analyst (2, 45)


In [34]:
def get_clean_data_jobs(df_daily_all):
    """
    Keep only postings whose title mentions one of the three target roles.

    The match is case-insensitive and anchored on word boundaries, so
    "Senior Data Engineer" is kept while "Metadata Engineering Lead" is not.

    Parameters
    ----------
    df_daily_all : pandas.DataFrame
        Postings with a lower-case ``title`` column.

    Returns
    -------
    pandas.DataFrame
        An independent copy of the matching rows (original index preserved).
    """
    pattern = re.compile(r'\bData Engineer\b|\bData Scientist\b|\bData Analyst\b', re.IGNORECASE)
    # na=False treats a missing title as "no match"; without it the mask contains
    # NA and boolean indexing raises.
    is_data_role = df_daily_all['title'].str.contains(pattern, na=False)
    # Return a copy so later column assignments do not target a slice of the input.
    return df_daily_all[is_data_role].copy()

# Drop postings whose title does not mention one of the target roles, then check the size.
df_daily_all = get_clean_data_jobs(df_daily_all)
df_daily_all.shape

(15, 45)

In [37]:
def update_columns(df_daily_all):
    """
    Upper-case the column labels and keep a fixed, ordered subset of columns.

    Parameters
    ----------
    df_daily_all : pandas.DataFrame
        Frame with string column labels. It is not modified.

    Returns
    -------
    pandas.DataFrame
        A new frame containing exactly the columns listed below, in that order.

    Raises
    ------
    KeyError
        If any listed column is missing after upper-casing.
    """
    selected_columns = [
        'ID', 'DATE_POSTED', 'DATE_CREATED', 'TITLE', 'JOB_CATEGORY',
        'ORGANIZATION', 'ORGANIZATION_URL', 'DATE_VALIDTHROUGH', 'LOCATIONS_RAW',
        'LOCATION_TYPE', 'LOCATION_REQUIREMENTS_RAW', 'EMPLOYMENT_TYPE', 'URL',
        'SOURCE_TYPE', 'SOURCE', 'SOURCE_DOMAIN', 'ORGANIZATION_LOGO',
        'CITIES_DERIVED', 'REGIONS_DERIVED', 'COUNTRIES_DERIVED',
        'LOCATIONS_DERIVED', 'TIMEZONES_DERIVED', 'LATS_DERIVED',
        'LNGS_DERIVED', 'REMOTE_DERIVED', 'RECRUITER_NAME', 'RECRUITER_TITLE',
        'RECRUITER_URL', 'LINKEDIN_ORG_EMPLOYEES', 'LINKEDIN_ORG_URL',
        'LINKEDIN_ORG_SIZE', 'LINKEDIN_ORG_SLOGAN', 'LINKEDIN_ORG_INDUSTRY',
        'LINKEDIN_ORG_FOLLOWERS', 'LINKEDIN_ORG_HEADQUARTERS',
        'LINKEDIN_ORG_TYPE', 'LINKEDIN_ORG_FOUNDEDDATE',
        'LINKEDIN_ORG_SPECIALTIES', 'LINKEDIN_ORG_LOCATIONS',
        'LINKEDIN_ORG_DESCRIPTION', 'LINKEDIN_ORG_RECRUITMENT_AGENCY_DERIVED',
        'SENIORITY', 'DIRECTAPPLY', 'LINKEDIN_ORG_SLUG',
    ]
    # rename() builds a new frame, so the caller's column labels stay untouched
    # (assigning to .columns would rewrite them in place as a side effect).
    upper_cased = df_daily_all.rename(columns=str.upper)
    # copy() makes the result independent, so later cells can add columns to it safely.
    return upper_cased[selected_columns].copy()

# Upper-case the labels, keep the selected API columns, and preview the first two rows.
df_daily_all = update_columns(df_daily_all)
df_daily_all.head(2)

Unnamed: 0,ID,DATE_POSTED,DATE_CREATED,TITLE,JOB_CATEGORY,ORGANIZATION,ORGANIZATION_URL,DATE_VALIDTHROUGH,LOCATIONS_RAW,LOCATION_TYPE,...,LINKEDIN_ORG_HEADQUARTERS,LINKEDIN_ORG_TYPE,LINKEDIN_ORG_FOUNDEDDATE,LINKEDIN_ORG_SPECIALTIES,LINKEDIN_ORG_LOCATIONS,LINKEDIN_ORG_DESCRIPTION,LINKEDIN_ORG_RECRUITMENT_AGENCY_DERIVED,SENIORITY,DIRECTAPPLY,LINKEDIN_ORG_SLUG
0,1672160270,2025-05-18T04:19:02,2025-05-18T04:32:38.752594,Senior Data Engineer,Data Engineer,Blinq,https://www.linkedin.com/company/blinq-me,2025-06-17T04:19:02,"[{'@type': 'Place', 'address': {'@type': 'Post...",,...,"Melbourne, Victoria",Privately Held,,[''],"['4 Bank Pl, Level 3, Melbourne, Victoria 3000...","At Blinq, our vision is to power the start of ...",False,Not Applicable,True,blinq-me
1,1672118464,2025-05-18T03:41:38,2025-05-18T04:02:02.582796,Data Engineer,Data Engineer,Azooa,https://www.linkedin.com/company/azooa,2025-06-17T03:41:37,"[{'@type': 'Place', 'address': {'@type': 'Post...",TELECOMMUTE,...,"Canberra ACT , ACT",Privately Held,2021.0,"['IT recruitment ', 'IT carer coaching', 'ACT ...","['121 Markus Clarke St, Level 8, Canberra ACT ...",Azooa is a trusted Australian technology consu...,True,Mid-Senior level,True,azooa


### Process the df_daily_all data 


In [38]:
def extract_job_date(date_created):
    """
    Convert creation timestamps to calendar dates.

    Parameters
    ----------
    date_created : pandas.Series
        Timestamps (e.g. ISO-8601 strings) parseable by ``pd.to_datetime``.

    Returns
    -------
    pandas.Series
        ``datetime.date`` objects on the same index, named ``job_date``.
    """
    # Work on the argument that was passed in rather than on a global frame, so
    # the function has no hidden dependencies or side effects.
    return pd.to_datetime(date_created).dt.date.rename('job_date')

# Derive job_date from the DATE_CREATED column and preview the first five values
df_daily_all['job_date'] = extract_job_date(df_daily_all['DATE_CREATED'])
df_daily_all['job_date'][:5]


0    2025-05-18
1    2025-05-18
2    2025-05-17
3    2025-05-17
4    2025-05-18
Name: job_date, dtype: object

In [39]:
#Extract the job city and state from the LOCATIONS_RAW field

def extract_city(list):
    """
    Extract the first city ('addressLocality') from a LOCATIONS_RAW value.

    The value is converted with ``str()`` and searched with a regex, so it may be
    the raw list of address dicts or its string representation.

    Parameters
    ----------
    list : object
        A LOCATIONS_RAW cell. The name shadows the built-in ``list``; it is kept
        so existing callers are unaffected.

    Returns
    -------
    str or None
        The city of the first address that has a quoted locality, with known
        misspellings of Sydney normalised to "Sydney"; None when nothing matches.
    """
    text = str(list)
    # [^']* stops at the closing quote of the value. A greedy .* would run on to
    # the last 'addressRegion' when a posting lists several locations.
    city_pattern = r"'addressLocality':\s*'([^']*)',\s'addressRegion':"
    match = re.search(city_pattern, text)
    if match is None:
        return None

    city = match.group(1)
    lowered = city.lower()
    # Fix Sydney spellings seen in non-English postings.
    if any(alias in lowered for alias in ('sidney', 'sídney', '悉尼')):
        return "Sydney"
    return city



def extract_state(list):
    """
    Extract the first state/region ('addressRegion') from a LOCATIONS_RAW value.

    The value is converted with ``str()`` and searched with a regex, so it may be
    the raw list of address dicts or its string representation.

    Parameters
    ----------
    list : object
        A LOCATIONS_RAW cell. The name shadows the built-in ``list``; it is kept
        so existing callers are unaffected.

    Returns
    -------
    str or None
        The region of the first address that has a quoted region, stripped of
        surrounding whitespace; None when nothing matches (e.g. the region is None).
    """
    text = str(list)
    # Match the quoted value only. A greedy .* would run on to the last
    # 'streetAddress' when a posting lists several locations.
    state_pattern = r"'addressRegion':\s*'([^']*)',\s'streetAddress'"
    match = re.search(state_pattern, text)
    if match is None:
        return None
    return match.group(1).strip()


# Extract city and state from LOCATIONS_RAW, one cell at a time via Series.apply
df_daily_all['city'] = df_daily_all['LOCATIONS_RAW'].apply(extract_city)


df_daily_all['state'] = df_daily_all['LOCATIONS_RAW'].apply(extract_state)



# Preview the frame with the new city/state columns
df_daily_all.head()

Unnamed: 0,ID,DATE_POSTED,DATE_CREATED,TITLE,JOB_CATEGORY,ORGANIZATION,ORGANIZATION_URL,DATE_VALIDTHROUGH,LOCATIONS_RAW,LOCATION_TYPE,...,LINKEDIN_ORG_SPECIALTIES,LINKEDIN_ORG_LOCATIONS,LINKEDIN_ORG_DESCRIPTION,LINKEDIN_ORG_RECRUITMENT_AGENCY_DERIVED,SENIORITY,DIRECTAPPLY,LINKEDIN_ORG_SLUG,job_date,city,state
0,1672160270,2025-05-18T04:19:02,2025-05-18T04:32:38.752594,Senior Data Engineer,Data Engineer,Blinq,https://www.linkedin.com/company/blinq-me,2025-06-17T04:19:02,"[{'@type': 'Place', 'address': {'@type': 'Post...",,...,[''],"['4 Bank Pl, Level 3, Melbourne, Victoria 3000...","At Blinq, our vision is to power the start of ...",False,Not Applicable,True,blinq-me,2025-05-18,Melbourne,VIC
1,1672118464,2025-05-18T03:41:38,2025-05-18T04:02:02.582796,Data Engineer,Data Engineer,Azooa,https://www.linkedin.com/company/azooa,2025-06-17T03:41:37,"[{'@type': 'Place', 'address': {'@type': 'Post...",TELECOMMUTE,...,"['IT recruitment ', 'IT carer coaching', 'ACT ...","['121 Markus Clarke St, Level 8, Canberra ACT ...",Azooa is a trusted Australian technology consu...,True,Mid-Senior level,True,azooa,2025-05-18,Canberra,
2,1669943999,2025-05-17T12:55:00,2025-05-17T14:21:40.785658,Data Engineer (Alteryx & operational support),Data Engineer,Ingrity,https://www.linkedin.com/company/ingrity-pty-ltd,2025-06-16T12:54:59,"[{'@type': 'Place', 'address': {'@type': 'Post...",,...,"['Analytics Advisory', 'Data Strategy', 'Data ...","['Level 6, 478 George Street, Sydney, NSW 2000...",INGRITY is a progressive data and analytics co...,False,Sin experiencia,False,ingrity-pty-ltd,2025-05-17,Sydney,NSW
3,1669943997,2025-05-17T12:54:53,2025-05-17T14:21:40.783322,Data Engineer (Python),Data Engineer,Ingrity,https://www.linkedin.com/company/ingrity-pty-ltd,2025-06-16T12:54:52,"[{'@type': 'Place', 'address': {'@type': 'Post...",,...,"['Analytics Advisory', 'Data Strategy', 'Data ...","['Level 6, 478 George Street, Sydney, NSW 2000...",INGRITY is a progressive data and analytics co...,False,Mid-Senior level,False,ingrity-pty-ltd,2025-05-17,Sydney,NSW
4,1671919930,2025-05-16T23:36:04,2025-05-18T02:39:34.945979,Google Cloud Data Engineer,Data Engineer,Fractal,https://www.linkedin.com/company/fractal-analy...,2025-06-15T23:36:04,"[{'@type': 'Place', 'address': {'@type': 'Post...",,...,"['Marketing Analytics', 'Advanced Analytics', ...","['Suite 76J, One World Trade Center, New York,...",Fractal is one of the most prominent providers...,False,Mid-Senior level,True,fractal-analytics,2025-05-18,Melbourne,VIC


In [40]:
#Extract the employment type from the EMPLOYMENT_TYPE field

def extract_employment_type(df_daily_all):
    """
    Build a plain-text ``employment_type`` column from ``EMPLOYMENT_TYPE``.

    Each value is converted to a string, square brackets and single quotes are
    removed, and surrounding whitespace is stripped.

    Side effects: the frame passed in is modified in place. The new
    ``employment_type`` column is added and the original ``EMPLOYMENT_TYPE``
    column is dropped.

    Returns the new ``employment_type`` column.
    """
    as_text = df_daily_all['EMPLOYMENT_TYPE'].astype(str)
    without_list_markup = as_text.str.replace(r"[\[\]']", '', regex=True)
    df_daily_all['employment_type'] = without_list_markup.str.strip()

    # Remove the raw column from the caller's frame.
    df_daily_all.drop('EMPLOYMENT_TYPE', axis=1, inplace=True)

    return df_daily_all['employment_type']


# The function already writes the column (and drops EMPLOYMENT_TYPE) in place;
# the assignment stores the returned column under the same name again.
df_daily_all['employment_type'] = extract_employment_type(df_daily_all)
df_daily_all['employment_type'].tail()

10     FULL_TIME
11     FULL_TIME
12     FULL_TIME
13    CONTRACTOR
14     FULL_TIME
Name: employment_type, dtype: object

In [41]:
#Extract the organisation's employee-size band from LINKEDIN_ORG_SIZE
def extract_employee_size(LINKEDIN_ORG_SIZE):
    """
    Strip the word "employees" from organisation-size labels.

    Parameters
    ----------
    LINKEDIN_ORG_SIZE : pandas.Series
        Size labels such as "11-50 employees".

    Returns
    -------
    pandas.Series
        The labels as strings without "employees" and without surrounding
        whitespace (e.g. "11-50"), on the same index, named ``org_size``.
    """
    # Use the Series that was passed in rather than a global frame, so the
    # function has no hidden dependencies or side effects.
    return (
        LINKEDIN_ORG_SIZE
        .astype(str)
        # "employees" is a literal, so no regex engine is needed.
        .str.replace("employees", '', regex=False)
        .str.strip()
        .rename('org_size')
    )


# Store the cleaned size band as org_size and check the last few rows.
df_daily_all['org_size'] = extract_employee_size(df_daily_all['LINKEDIN_ORG_SIZE'])
df_daily_all['org_size'].tail()

10    11-50
11    11-50
12    11-50
13    11-50
14    11-50
Name: org_size, dtype: object

In [42]:
# Upper-case every label so the derived lower-case columns (job_date, city, state,
# employment_type, org_size) follow the same convention as the API columns.
df_daily_all.columns = df_daily_all.columns.str.upper()
df_daily_all.head(2)

Unnamed: 0,ID,DATE_POSTED,DATE_CREATED,TITLE,JOB_CATEGORY,ORGANIZATION,ORGANIZATION_URL,DATE_VALIDTHROUGH,LOCATIONS_RAW,LOCATION_TYPE,...,LINKEDIN_ORG_DESCRIPTION,LINKEDIN_ORG_RECRUITMENT_AGENCY_DERIVED,SENIORITY,DIRECTAPPLY,LINKEDIN_ORG_SLUG,JOB_DATE,CITY,STATE,EMPLOYMENT_TYPE,ORG_SIZE
0,1672160270,2025-05-18T04:19:02,2025-05-18T04:32:38.752594,Senior Data Engineer,Data Engineer,Blinq,https://www.linkedin.com/company/blinq-me,2025-06-17T04:19:02,"[{'@type': 'Place', 'address': {'@type': 'Post...",,...,"At Blinq, our vision is to power the start of ...",False,Not Applicable,True,blinq-me,2025-05-18,Melbourne,VIC,FULL_TIME,51-200
1,1672118464,2025-05-18T03:41:38,2025-05-18T04:02:02.582796,Data Engineer,Data Engineer,Azooa,https://www.linkedin.com/company/azooa,2025-06-17T03:41:37,"[{'@type': 'Place', 'address': {'@type': 'Post...",TELECOMMUTE,...,Azooa is a trusted Australian technology consu...,True,Mid-Senior level,True,azooa,2025-05-18,Canberra,,CONTRACTOR,11-50


In [43]:
# Only keep the relevant columns, in the listed order. Cell In [46] later compares
# this column list with the columns returned by the Snowflake query.

df_daily_all = df_daily_all[['ID', 'TITLE', 'JOB_CATEGORY',
       'JOB_DATE', 'CITY', 'STATE', 'EMPLOYMENT_TYPE' ,
       'ORGANIZATION', 'ORGANIZATION_URL', 'URL',
       'SOURCE_TYPE', 'SOURCE', 'SOURCE_DOMAIN',
       'ORGANIZATION_LOGO', 'REMOTE_DERIVED', 'RECRUITER_NAME', 'RECRUITER_TITLE',
       'RECRUITER_URL', 'LINKEDIN_ORG_URL',
       'ORG_SIZE', 'LINKEDIN_ORG_INDUSTRY',
       'LINKEDIN_ORG_HEADQUARTERS',
       'LINKEDIN_ORG_TYPE', 'LINKEDIN_ORG_FOUNDEDDATE',
       'LINKEDIN_ORG_SPECIALTIES', 'LINKEDIN_ORG_LOCATIONS',
       'LINKEDIN_ORG_DESCRIPTION','LINKEDIN_ORG_RECRUITMENT_AGENCY_DERIVED',
       'SENIORITY', 'DIRECTAPPLY',
       'LINKEDIN_ORG_SLUG']]


# Preview the trimmed frame
df_daily_all.head()

Unnamed: 0,ID,TITLE,JOB_CATEGORY,JOB_DATE,CITY,STATE,EMPLOYMENT_TYPE,ORGANIZATION,ORGANIZATION_URL,URL,...,LINKEDIN_ORG_HEADQUARTERS,LINKEDIN_ORG_TYPE,LINKEDIN_ORG_FOUNDEDDATE,LINKEDIN_ORG_SPECIALTIES,LINKEDIN_ORG_LOCATIONS,LINKEDIN_ORG_DESCRIPTION,LINKEDIN_ORG_RECRUITMENT_AGENCY_DERIVED,SENIORITY,DIRECTAPPLY,LINKEDIN_ORG_SLUG
0,1672160270,Senior Data Engineer,Data Engineer,2025-05-18,Melbourne,VIC,FULL_TIME,Blinq,https://www.linkedin.com/company/blinq-me,https://au.linkedin.com/jobs/view/senior-data-...,...,"Melbourne, Victoria",Privately Held,,[''],"['4 Bank Pl, Level 3, Melbourne, Victoria 3000...","At Blinq, our vision is to power the start of ...",False,Not Applicable,True,blinq-me
1,1672118464,Data Engineer,Data Engineer,2025-05-18,Canberra,,CONTRACTOR,Azooa,https://www.linkedin.com/company/azooa,https://au.linkedin.com/jobs/view/data-enginee...,...,"Canberra ACT , ACT",Privately Held,2021.0,"['IT recruitment ', 'IT carer coaching', 'ACT ...","['121 Markus Clarke St, Level 8, Canberra ACT ...",Azooa is a trusted Australian technology consu...,True,Mid-Senior level,True,azooa
2,1669943999,Data Engineer (Alteryx & operational support),Data Engineer,2025-05-17,Sydney,NSW,CONTRACTOR,Ingrity,https://www.linkedin.com/company/ingrity-pty-ltd,https://au.linkedin.com/jobs/view/data-enginee...,...,"Sydney, NSW",Privately Held,2018.0,"['Analytics Advisory', 'Data Strategy', 'Data ...","['Level 6, 478 George Street, Sydney, NSW 2000...",INGRITY is a progressive data and analytics co...,False,Sin experiencia,False,ingrity-pty-ltd
3,1669943997,Data Engineer (Python),Data Engineer,2025-05-17,Sydney,NSW,CONTRACTOR,Ingrity,https://www.linkedin.com/company/ingrity-pty-ltd,https://au.linkedin.com/jobs/view/data-enginee...,...,"Sydney, NSW",Privately Held,2018.0,"['Analytics Advisory', 'Data Strategy', 'Data ...","['Level 6, 478 George Street, Sydney, NSW 2000...",INGRITY is a progressive data and analytics co...,False,Mid-Senior level,False,ingrity-pty-ltd
4,1671919930,Google Cloud Data Engineer,Data Engineer,2025-05-18,Melbourne,VIC,FULL_TIME,Fractal,https://www.linkedin.com/company/fractal-analy...,https://au.linkedin.com/jobs/view/google-cloud...,...,New York,Privately Held,2000.0,"['Marketing Analytics', 'Advanced Analytics', ...","['Suite 76J, One World Trade Center, New York,...",Fractal is one of the most prominent providers...,False,Mid-Senior level,True,fractal-analytics


## Connect to the Snowflake database for initial raw-data processing

In [44]:
#Establish a connection to Snowflake

def connect_to_snowflake():
    """
    Open a Snowflake connection using the credentials read from the environment.

    Returns the connection object on success. On any exception the error is
    printed and None is returned, so callers must check the result before use.
    """
    try:
        connection_settings = dict(
            user=snowflake_user,
            password=snowflake_password,
            account=account,
            warehouse="SNOWFLAKE_LEARNING_WH",
            database="linkedin_db",
            schema="linkedin_raw",
        )
        connection = snowflake.connector.connect(**connection_settings)
    except Exception as e:
        print(f"Error connecting to Snowflake: {e}")
        return None
    else:
        print("Connection to Snowflake established successfully.")
        return connection

# conn is None when the connection attempt failed (the function prints the error).
conn = connect_to_snowflake()

Connection to Snowflake established successfully.


In [45]:
#query the raw data and narrow down to DE, DS and DA roles
#This data is filtered by relevant roles, and translated into English (but no city, state and seniority fix)
def query_existing_job_data(conn):
    """
    Read the already-loaded postings for the three data roles.

    Runs a SELECT on LINKEDIN_JOB_API_CLEANED_DATA, keeping rows whose lower-cased
    TITLE contains "data engineer", "data scientist" or "data analyst". The shape
    of the result is printed and the result is returned as a DataFrame.

    ``conn`` is passed straight to ``pd.read_sql``.
    """
    role_filter_sql = """
        SELECT * FROM LINKEDIN_JOB_API_CLEANED_DATA
        WHERE (
            lower(TITLE) LIKE '%data engineer%'
            OR lower(TITLE) LIKE '%data scientist%'
            OR lower(TITLE) LIKE '%data analyst%'
            )
    """

    existing_jobs = pd.read_sql(role_filter_sql, conn)
    print(existing_jobs.shape)
    return existing_jobs

# df holds the rows already stored in Snowflake; it is used below to find new job IDs.
df = query_existing_job_data(conn)
df.head()

  df = pd.read_sql(query, conn)


(639, 31)


Unnamed: 0,ID,TITLE,JOB_CATEGORY,JOB_DATE,CITY,STATE,EMPLOYMENT_TYPE,ORGANIZATION,ORGANIZATION_URL,URL,...,LINKEDIN_ORG_HEADQUARTERS,LINKEDIN_ORG_TYPE,LINKEDIN_ORG_FOUNDEDDATE,LINKEDIN_ORG_SPECIALTIES,LINKEDIN_ORG_LOCATIONS,LINKEDIN_ORG_DESCRIPTION,LINKEDIN_ORG_RECRUITMENT_AGENCY_DERIVED,SENIORITY,DIRECTAPPLY,LINKEDIN_ORG_SLUG
0,1599037955,Senior Data Engineer,Data Engineer,2025-04-28,Greater Sydney Area,,FULL_TIME,One51 | Data & Analytics Consultancy,https://www.linkedin.com/company/one51consulting,https://au.linkedin.com/jobs/view/senior-data-...,...,"Sydney, NSW",Privately Held,2020.0,"['Business Intelligence', 'Data Warehousing', ...","['333 George Street, Level 13, Sydney, NSW 200...",Drawing on a wealth of expertise and a deep un...,False,Mid-Senior level,False,one51consulting
1,1598878880,Senior Data Engineer,Data Engineer,2025-04-28,Brisbane,QLD,CONTRACTOR,Data#3,https://www.linkedin.com/company/data3,https://au.linkedin.com/jobs/view/senior-data-...,...,"Toowong, Queensland",Public Company,1977.0,"['Cloud Solutions', 'Mobility Solutions', 'Sec...","['555 Coronation Dr, Toowong, Queensland 4066,...","Data#3 Limited (DTL), is focused on helping cu...",False,Medium-high level,False,data3
2,1598880069,Data Engineer,Data Engineer,2025-04-28,Sydney,NSW,CONTRACTOR,Whizdom,https://www.linkedin.com/company/whizdom-recru...,https://au.linkedin.com/jobs/view/data-enginee...,...,"Canberra, Australian Capital Territory",Privately Held,2006.0,"['IT Recruitment', 'Recruitment for Government...","['28-34 Thynne St, Unit 7, Canberra, Australia...","Established in 2006, Whizdom is an Australian ...",True,Middle level,True,whizdom-recruitment
3,1598878894,Senior Data Engineer and Business Intelligence...,Data Engineer,2025-04-28,Moreton Bay,QLD,FULL_TIME,University of the Sunshine Coast,https://www.linkedin.com/school/university-of-...,https://au.linkedin.com/jobs/view/senior-data-...,...,,,,,,,,Not Applicable,False,
4,1598880083,Senior Data Engineer and Business Intelligence...,Data Engineer,2025-04-28,Sunshine Coast,QLD,FULL_TIME,University of the Sunshine Coast,https://www.linkedin.com/school/university-of-...,https://au.linkedin.com/jobs/view/senior-data-...,...,,,,,,,,Not Applicable,False,


In [17]:
# df.to_csv('api_cleaned_job_id_snowflake.csv', index=False)

In [46]:
# Schema check: the daily frame must have the same columns, in the same order,
# as the rows read back from Snowflake.
list_daily = df_daily_all.columns.tolist()
list_df = df.columns.tolist()
list_daily ==  list_df

True

In [47]:
#Check the Job ID from df and only keep those new jobs based on the Job IDs
def keep_new_jobs(daily_jobs=None, existing_jobs=None):
    """
    Return the daily postings whose ID is not already stored.

    Parameters
    ----------
    daily_jobs : pandas.DataFrame, optional
        Freshly extracted postings with an ``ID`` column. Defaults to the
        notebook-level ``df_daily_all``.
    existing_jobs : pandas.DataFrame, optional
        Previously loaded postings with an ``ID`` column. Defaults to the
        notebook-level ``df``.

    Returns
    -------
    pandas.DataFrame
        Rows of ``daily_jobs`` whose ID is absent from ``existing_jobs``, with a
        fresh 0..n-1 index. The number of rows is also printed.
    """
    # Fall back to the notebook globals so the original zero-argument call keeps working.
    if daily_jobs is None:
        daily_jobs = df_daily_all
    if existing_jobs is None:
        existing_jobs = df

    existing_job_ids = existing_jobs['ID'].unique()
    # NOTE(review): isin compares values as-is; confirm both ID columns share a dtype.
    is_new = ~daily_jobs['ID'].isin(existing_job_ids)

    df_new_jobs = daily_jobs[is_new].reset_index(drop=True)
    print(f'{df_new_jobs.shape[0]} new jobs ready to load to Snowflake')
    return df_new_jobs

# Called without arguments, so it works on the notebook-level df_daily_all and df.
df_new_jobs = keep_new_jobs()
df_new_jobs.tail()

0 new jobs ready to load to Snowflake


Unnamed: 0,ID,TITLE,JOB_CATEGORY,JOB_DATE,CITY,STATE,EMPLOYMENT_TYPE,ORGANIZATION,ORGANIZATION_URL,URL,...,LINKEDIN_ORG_HEADQUARTERS,LINKEDIN_ORG_TYPE,LINKEDIN_ORG_FOUNDEDDATE,LINKEDIN_ORG_SPECIALTIES,LINKEDIN_ORG_LOCATIONS,LINKEDIN_ORG_DESCRIPTION,LINKEDIN_ORG_RECRUITMENT_AGENCY_DERIVED,SENIORITY,DIRECTAPPLY,LINKEDIN_ORG_SLUG


In [48]:
async def translate_text(translator, text, target_language='en'):
    """
    Translate a single value with the given translator.

    Returns 'NA' for falsy or missing input, the translated text on success, and
    the untouched input (after logging a warning) when anything raises.
    """
    try:
        is_blank = (not text) or pd.isna(text)
        if is_blank:
            return 'NA'
        translation = await translator.translate(str(text), dest=target_language)
        return translation.text
    except Exception as e:
        # Best effort: keep the original value rather than failing the whole column.
        logging.warning(f"Error translating text: {e} (Text: {text})")
        return text

async def translate_column(df, col, target_language='en'):
    """
    Translate one column of ``df`` in place.

    Each distinct non-null value is translated once, one after another, and the
    column is rebuilt from that lookup. Missing values end up as 'NA'.
    """
    translator = Translator()
    translation_map = {}
    for original_value in df[col].dropna().unique():
        translation_map[original_value] = await translate_text(
            translator, original_value, target_language
        )
    translated = df[col].map(translation_map)
    df[col] = translated.fillna('NA')

async def translate_columns(df, columns, target_language='en'):
    """
    Translate each of the given columns of ``df`` in turn.

    The frame is modified in place by ``translate_column`` and returned for convenience.
    """
    for column_name in columns:
        await translate_column(df, column_name, target_language)

    logging.info(f"Translated columns: {columns}")
    return df


def sync_translate_columns(df, columns, target_language='en'):
    """
    Run the asynchronous column translation from synchronous code.

    Works both in a plain script (no event loop running) and in environments
    where a loop is already running, such as a notebook kernel.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose columns are translated in place.
    columns : iterable of str
        Names of the columns to translate.
    target_language : str
        Language code passed through to the translator (default 'en').

    Returns
    -------
    pandas.DataFrame
        The same frame, as returned by ``translate_columns``.
    """
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running: asyncio.run creates a fresh loop and always
        # closes it. Only the loop lookup sits inside the try, so a RuntimeError
        # raised by the translation itself is not mistaken for a loop problem
        # and the work is never executed twice.
        return asyncio.run(translate_columns(df, columns, target_language))

    # A loop is already running: nest_asyncio patches it so that
    # run_until_complete can be called re-entrantly.
    import nest_asyncio
    nest_asyncio.apply()
    return running_loop.run_until_complete(translate_columns(df, columns, target_language))

# Then call it: translate the free-text columns of the new jobs (default target language 'en')
translate_cols = ['CITY', 'STATE', 'ORGANIZATION', 'SENIORITY']
df_new_jobs = sync_translate_columns(df_new_jobs, translate_cols)

In [49]:
# Inspect the distinct CITY values after translation.
df_new_jobs['CITY'].unique()

array([], dtype=float64)

In [50]:
# Inspect the distinct STATE values after translation.
df_new_jobs['STATE'].unique()

array([], dtype=float64)

In [51]:
# Preview the rows that are about to be loaded.
df_new_jobs.head()

Unnamed: 0,ID,TITLE,JOB_CATEGORY,JOB_DATE,CITY,STATE,EMPLOYMENT_TYPE,ORGANIZATION,ORGANIZATION_URL,URL,...,LINKEDIN_ORG_HEADQUARTERS,LINKEDIN_ORG_TYPE,LINKEDIN_ORG_FOUNDEDDATE,LINKEDIN_ORG_SPECIALTIES,LINKEDIN_ORG_LOCATIONS,LINKEDIN_ORG_DESCRIPTION,LINKEDIN_ORG_RECRUITMENT_AGENCY_DERIVED,SENIORITY,DIRECTAPPLY,LINKEDIN_ORG_SLUG


In [52]:
# Row/column count of the batch to load.
df_new_jobs.shape

(0, 31)

In [25]:
#SQLalchemy v2.0.40
# def load_to_snowflake(df_new_jobs):
#     # Create a Snowflake connection engine
#     engine = create_engine(
#         'snowflake://{user}:{password}@{account}/{database}/{schema}?warehouse={warehouse}'.format(
#             user=snowflake_user,
#             password=snowflake_password,
#             account=account,
#             warehouse="SNOWFLAKE_LEARNING_WH",
#             database="linkedin_db",
#             schema="linkedin_raw"
#         )
#     )

#     table_name = "linkedin_job_api_cleaned_data"

#     df_new_jobs.to_sql(
#         name=table_name,
#         con=engine,
#         if_exists='append', #append data
#         index=False
#     )

#     print(f"Data loaded to Snowflake table {table_name} successfully.")


# load_to_snowflake(df_new_jobs)

In [53]:
#Downgraded SQLalchemy to 1.4.45
def load_to_snowflake(df, table_name="LINKEDIN_JOB_API_CLEANED_DATA"):
    """
    Append a DataFrame to an existing Snowflake table.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to load. The frame is not modified; an upper-cased copy is uploaded.
    table_name : str
        Target table, which must already exist (``auto_create_table=False``).

    Returns
    -------
    None
        Progress and failures are reported through ``logging``; errors from the
        table check and from the upload are logged rather than raised.
    """
    # Nothing to load: skip opening a connection altogether.
    if df.empty:
        logging.info("No rows to load into %s; skipping.", table_name)
        return

    # Ensure column names are uppercase to match Snowflake. rename() returns a
    # new frame, so the caller's DataFrame keeps its own labels.
    upload_frame = df.rename(columns=str.upper)

    # Snowflake connection parameters
    conn_params = {
        'user': snowflake_user,
        'password': snowflake_password,
        'account': account,
        'warehouse': 'SNOWFLAKE_LEARNING_WH',
        'database': 'linkedin_db',
        'schema': 'linkedin_raw'
    }

    conn = connect(**conn_params)
    try:
        # Check that the table exists before attempting the upload.
        try:
            with conn.cursor() as cursor:
                cursor.execute(f"DESC TABLE {table_name}")
            logging.info("Table %s exists.", table_name)
        except Exception as e:
            logging.error("Table %s does not exist or cannot be accessed: %s", table_name, e)
            return

        # Append the DataFrame to the existing table.
        try:
            success, _num_chunks, num_rows, _output = write_pandas(
                conn, upload_frame, table_name, auto_create_table=False
            )
        except Exception as e:
            logging.error("An error occurred while loading data to Snowflake: %s", e)
            return

        # write_pandas reports whether all chunks were ingested; only claim
        # success when it says so, and log the row count it actually ingested.
        if success:
            logging.info("Successfully loaded %d rows to Snowflake", num_rows)
        else:
            logging.error("Snowflake reported an incomplete load into %s", table_name)
    finally:
        # Single exit point: the connection is closed on every path.
        conn.close()

# Load the new jobs into the default table (LINKEDIN_JOB_API_CLEANED_DATA)
load_to_snowflake(df_new_jobs)

In [30]:
# NOTE(review): this cell's execution count (In [30]) is lower than the cells above,
# so the output shown below presumably comes from an earlier run — re-run to refresh.
df_new_jobs['CITY'].unique()

array(['Melbourne', 'Canberra', 'Sydney', 'Perth', 'Greater Sydney Area'],
      dtype=object)