## Main ETL Algorithm
The objective of this notebook is to automatically scrape job listings from Google, transform the data, and upload it to a PostgreSQL server.

The algorithm searches for and scrapes job listings for 5 data-oriented job roles in 14 major cities in the United States.

Some information, such as API keys, host names, and database names, has been omitted for security reasons.

In [1]:
!pip install google-search-results # installs the required google-search-results package into environment
import numpy as np
import pandas as pd
import psycopg2
import getpass
from datetime import date
from sqlalchemy.engine.url import URL
from sqlalchemy import create_engine
from serpapi import GoogleSearch

Collecting google-search-results
  Downloading https://files.pythonhosted.org/packages/77/30/b3a6f6a2e00f8153549c2fa345c58ae1ce8e5f3153c2fe0484d444c3abcb/google_search_results-2.4.2.tar.gz
Building wheels for collected packages: google-search-results
  Building wheel for google-search-results (setup.py) ... done
  Created wheel for google-search-results: filename=google_search_results-2.4.2-cp37-none-any.whl size=32004 sha256=8cba5b712005ca5ae6a2778e6639aa4e1cd561d018101a0b284b5f7078a5b4a0
  Stored in directory: /home/nblkd6/.cache/pip/wheels/8e/86/9b/b4debab19a41bbfac16c7149b17ce81e63234914bd638a1a0e
Successfully built google-search-results
Installing collected packages: google-search-results
Successfully installed google-search-results-2.4.2


In [2]:
def get_job_data(job, search_term):
    '''
    This function collects all the desired data from a single job posting.
    A job posting is a dictionary that contains several other lists and dictionaries.
    This algorithm navigates through these data structures and collects the 
    desired data. If the data is not available, then the field is filled with np.nan.
    This data is then packaged into a dictionary and returned to the caller.
    
    Args:
        job : Dict
            A single job posting returned by an API search
        search_term : Str
            The search term used to search for job postings on Google
    
    Returns:
        Dict
            A dictionary containing the desired features and data from the job posting.

    '''
    title = job['title']
    company_name = job['company_name']
    description = job['description']
    job_id = job['job_id']
    
    # A job posting may lack a location, so fall back to NaN
    location = job.get('location', np.nan)

    '''
    The outer if statement is necessary because some jobs have a single highlight with no title key.
    I suspect this happens when no highlights are provided by the employer.
    For these jobs, there's no text to assign to the qualifications, responsibilities, or benefits variables.
    When this happens, I'll assign the text to a variable called 'items'
    It's likely that this text is the same as the description text but I'll store it just in case.
    
    Note: I'm making some assumptions here. 
    It's possible a job has no highlights in the job_highlights list.
    It's possible a job with multiple highlights will have a highlight with no title key.
    Here are the only permutations I'm considering based on my analysis of the data:
        - [{'items': ['']}]
        - [{'title': '', 'items': ['']}]
        - [{'title': '', 'items': ['']}, {'title': '', 'items': ['']}, ... ]
    '''
    
    # If the first highlight has no 'title' key
    if 'title' not in job['job_highlights'][0]:
        items = ''
        for element in job['job_highlights'][0]['items']:
            items += element + '\n'
        qualifications = np.nan
        responsibilities = np.nan
        benefits = np.nan
    
    else:
        items = np.nan
        qualifications = np.nan
        responsibilities = np.nan
        benefits = np.nan
        
        for highlight in job['job_highlights']:
            if 'title' in highlight and 'items' in highlight:  # skip highlights missing either key
                if highlight['title'] == 'Qualifications':
                    qualifications = ''
                    for element in highlight['items']:
                        qualifications += element + '\n'
                if highlight['title'] == 'Responsibilities':
                    responsibilities = ''
                    for element in highlight['items']:
                        responsibilities += element + '\n'
                if highlight['title'] == 'Benefits':
                    benefits = ''
                    for element in highlight['items']:
                        benefits += element + '\n'
    
    date_scraped = pd.to_datetime(str(date.today())) # aka the current date
    date_posted = np.nan
    posted_at = np.nan
    schedule_type = np.nan
    work_from_home = np.nan
    salary = np.nan
    
    for extension in job['detected_extensions']:
        if extension == 'posted_at':
            posted_at = job['detected_extensions'][extension]
            # Assumes text such as '6 days ago'; the leading number is always treated as days
            date_posted = date_scraped - pd.Timedelta(days=int(posted_at.split()[0]))
        if extension == 'schedule_type':
            schedule_type = job['detected_extensions'][extension]
        if extension == 'work_from_home':
            work_from_home = bool(job['detected_extensions'][extension])
        if extension == 'salary':
            salary = job['detected_extensions'][extension]
    
    # omit the leading 'via ' (four characters) from e.g. 'via LinkedIn' before assignment
    via = job['via'][4:]
    
    return {
        'search_term': search_term,
        'title': title,
        'company_name': company_name,
        'location': location,
        'description': description,
        'job_id': job_id,
        'qualifications': qualifications, 
        'responsibilities': responsibilities, 
        'benefits': benefits, 
        'items': items,
        'via': via,
        'posted_at': posted_at,
        'schedule_type': schedule_type,
        'work_from_home': work_from_home,
        'salary': salary,
        'date_posted': date_posted,
        'date_scraped': date_scraped
    }
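
The two trickiest steps in `get_job_data` are flattening `job_highlights` and turning a relative age such as '6 days ago' into a date. The sketch below isolates both as small helpers so they can be tried without calling the API. The names `join_highlights` and `estimate_date_posted` are hypothetical. The handling of units other than days (hours and minutes count as today, anything unrecognized yields `None`) is an assumption about the strings the API returns, not documented behavior.

```python
from datetime import date, timedelta


def join_highlights(job_highlights):
    """Map each titled highlight to its items joined by newlines.

    Untitled highlights (the [{'items': [...]}] shape) are stored under 'items'.
    """
    sections = {}
    for highlight in job_highlights:
        key = highlight.get('title', 'items')
        sections[key] = ''.join(item + '\n' for item in highlight.get('items', []))
    return sections


def estimate_date_posted(posted_at, today):
    """Estimate the posting date from text such as '6 days ago'.

    Assumption: hour- and minute-based ages mean the job was posted today;
    any other shape (e.g. a non-numeric count) returns None.
    """
    parts = posted_at.split()
    if len(parts) < 2 or not parts[0].isdigit():
        return None
    amount, unit = int(parts[0]), parts[1].lower()
    if unit.startswith('day'):
        return today - timedelta(days=amount)
    if unit.startswith(('hour', 'minute')):
        return today
    return None


# Hypothetical sample data in the shapes listed in the function's comments
highlights = [
    {'title': 'Qualifications', 'items': ['Python', 'SQL']},
    {'title': 'Benefits', 'items': ['Remote work']},
]
sections = join_highlights(highlights)
untitled = join_highlights([{'items': ['Full description text']}])
posted = estimate_date_posted('6 days ago', date(2023, 6, 11))
```

Returning `None` for unrecognized text keeps a single odd value from raising an exception and aborting a long scraping run.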

In [3]:
def pull_jobs_from_api(search_term, location, start):
    '''
    This function connects to SerpAPI and sends it a search request. SerpAPI searches the job postings
    on Google with the given search parameters, scrapes the data, and then responds with the results.
    Each search can return a maximum of ten job postings.
    
    Args:
        search_term : Str
            The search term used to search for job postings on Google
        location: Str
            The US city containing the job postings we want
        start: Int
            Pagination offset. Tells the API which page to pull job postings from
            (0 for the first page, 10 for the second, and so on).
    Returns:
        Dict
            The parsed API response.
            The list containing the job postings is in the 'jobs_results' key.
            IMPORTANT: if a search results in no job postings, there will be no
            'jobs_results' key and instead there will be an 'error' key
    '''
    
    params = {
        "api_key": "my_api_key",
        "engine": "google_jobs",
        "google_domain": "google.com",
        "q": search_term,
        "hl": "en",
        "gl": "us",
        "location": location,
        "start": start
    }

    search = GoogleSearch(params)
    results = search.get_dict()
   
    return results
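
Because a search with no results carries an 'error' key instead of 'jobs_results', callers have to branch on the shape of the response. A minimal sketch of that check follows, along with one way to keep the API key out of the notebook by reading it from an environment variable. The helper names and the `SERPAPI_API_KEY` variable name are assumptions made for illustration, and the sample responses are hand-written, not real API output.

```python
import os


def load_api_key(env_var='SERPAPI_API_KEY'):
    """Read the API key from the environment (the variable name is hypothetical)."""
    key = os.environ.get(env_var)
    if key is None:
        raise RuntimeError('Set the {} environment variable first.'.format(env_var))
    return key


def extract_jobs(results):
    """Return the job postings in a response, or an empty list on 'error'."""
    if 'error' in results:
        return []
    return results.get('jobs_results', [])


# Hand-written stand-ins for the two response shapes
page_with_jobs = {'jobs_results': [{'title': 'Data Analyst', 'job_id': 'abc'}]}
empty_page = {'error': 'no results'}

jobs = extract_jobs(page_with_jobs)
no_jobs = extract_jobs(empty_page)
```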

In [4]:
def get_all_jobs(search_term, location):
    '''
    This function collects all the job postings for an entire city/location.
    All job postings are stored into a dataframe.
    
    Args:
        search_term : Str
            The search term used to search for job postings on Google
        location: Str
            The US city containing the job postings we want
            
    Returns:
        pandas.DataFrame
            Contains all job postings pulled for an entire city.
            Each row represents a single job posting.
    '''
    
    print('Scraping all {} job postings in {}'.format(search_term, location))
    
    rows = []
    error = False
    start = 0
    
    while not error:
        results = pull_jobs_from_api(search_term, location, start)
        if 'error' in results:
            error = True
        else:
            for job in results['jobs_results']:
                rows.append(get_job_data(job, search_term))
        print('Page {} complete.'.format(start // 10 + 1))
        start += 10
    
    # Build the DataFrame once; DataFrame.append was removed in pandas 2.0
    df = pd.DataFrame(rows)
    
    print('Done.')
    return df
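
The loop in `get_all_jobs` advances `start` in steps of ten until the API reports an error. The same pattern can be sketched as a generator that is independent of SerpAPI, which makes it easy to try out with a stand-in fetch function. `iter_pages` and `fake_fetch` are hypothetical names, and the fake postings are invented for the demonstration.

```python
def iter_pages(fetch_page, page_size=10):
    """Yield each page of results until the fetcher reports an error."""
    start = 0
    while True:
        results = fetch_page(start)
        if 'error' in results:
            return
        yield results['jobs_results']
        start += page_size


def fake_fetch(start):
    """Stand-in for an API call: serves 25 fake postings, ten per page."""
    postings = [{'job_id': n} for n in range(25)]
    page = postings[start:start + 10]
    return {'jobs_results': page} if page else {'error': 'no more results'}


# Flatten all pages into one list of rows, ready to pass to pd.DataFrame
rows = [job for page in iter_pages(fake_fetch) for job in page]
page_sizes = [len(page) for page in iter_pages(fake_fetch)]
```

Separating the paging logic from the request itself means the stopping condition can be checked without spending API credits.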

In [5]:
# this function will probably be deprecated
def get_n_jobs(search_term, location, end):
    '''
    This function collects n job postings for a city/location.
    The job postings are stored into a dataframe.
    
    Args:
        search_term : Str
            The search term used to search for job postings on Google
        location: Str
            The US city containing the job postings we want
        end: Int
            The result offset at which to stop searching (inclusive).
            end=0 -> first page of results only
            end=10 -> first two pages of results
            end=20 -> first three pages of results
            
    Returns:
        pandas.DataFrame
            Contains n job postings pulled for a city.
            Each row represents a single job posting.
    '''
    
    print('Scraping {} job postings in {}'.format(search_term, location))
    
    rows = []
    error = False
    start = 0
    
    while not error and start <= end:
        # pull_jobs_from_api is the request helper defined above
        results = pull_jobs_from_api(search_term, location, start)
        if 'error' in results:
            error = True
        else:
            for job in results['jobs_results']:
                rows.append(get_job_data(job, search_term))
        start += 10
    
    # Build the DataFrame once; DataFrame.append was removed in pandas 2.0
    df = pd.DataFrame(rows)
    
    return df

In [6]:
def push_df_to_db(df, schema, table, username, password):
    '''
    This function takes the dataframe containing job posting data for
    a city and appends it to the given schema and table.
    In my case, I am uploading to a PostgreSQL server at my university.
    
    Args:
        df : pandas.DataFrame
            DataFrame containing job posting data for a city
        schema : Str
        table : Str
        username : Str
        password : Str
    
    Returns:
        None
    '''

    host = 'host_name'
    database = 'database_name'
    
    # first, check for duplicates
    df = remove_duplicates(df, username, password, schema, table)
    
    postgres_db = {
        'drivername': 'postgresql',  # the 'postgres' alias was removed in SQLAlchemy 1.4
        'username': username,
        'password': password,
        'host': host,
        'database': database
    }
    
    # URL.create is the supported constructor in SQLAlchemy 1.4 and later
    engine = create_engine(URL.create(**postgres_db), echo=False)
    
    df.to_sql(
        table, 
        engine, 
        schema = schema, 
        if_exists="append", 
        index=False
    )
    print('{} jobs pushed to {}.{}'.format(df.shape[0], schema, table))

In [7]:
def query_database(sql_query, user):
    '''
    This function takes a SQL query and runs it against the PostgreSQL database.
    It returns a dataframe containing the result of that query.
    
    Args:
        sql_query: Str
            Contains the SQL query you want to query the database with.
        user: Str
    
    Returns:
        pandas.DataFrame
            Contains result of your query.
    '''
    
    database = "database_name"
    password = getpass.getpass("Enter password: ")

    connection = psycopg2.connect(
        database = database,
        user = user,
        host = 'host_name',
        password = password)
    
    df = pd.read_sql_query(sql_query, connection)
    connection.close()
    return df

In [8]:
def get_all_jobs_for_all_search_terms(search_terms, location):
    '''
    This function collects all job postings in one location for every given search term.
    
    Args:
        search_terms : List
            Contains search terms used to search for job postings on Google
        location: Str
            The US city containing the job postings we want
    Returns:
        pandas.DataFrame
            Contains all job postings for all listed search terms pulled for an entire city.
            Each row represents a single job posting.
    '''
    
    df = pd.DataFrame()
    for search_term in search_terms:
        df = pd.concat([df, get_all_jobs(search_term, location)], ignore_index=True)
    
    return df

In [9]:
def remove_duplicates(df, user, password, schema, table):
    '''
    This function removes any job postings with a job_id that is already in the database.
    This is to prevent any rows from violating the primary key constraints which would 
    reject the entire insert.
    
    Args:
        df: pandas.DataFrame
            DataFrame containing the job posting data you want to upload to the database.
        user: Str
        password: Str
        schema: Str
        table: Str
    Returns:
        pandas.DataFrame
            Contains all unique jobs to be uploaded to the database.
    '''
    
    primary_keys = get_db_primary_keys(user, password, schema, table)['job_id'].tolist()
    
    # Flag every row whose job_id already exists in the database, then drop them all at once
    is_duplicate = df['job_id'].isin(primary_keys)
    df = df[~is_duplicate]
    print('{} duplicate jobs found and removed.'.format(is_duplicate.sum()))
    return df
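
The duplicate check can be tried out in isolation on a small hand-made frame. The example below also drops repeats inside the batch itself with `drop_duplicates`, since the same posting can presumably surface under more than one search term; that extra step is an assumption about the data, not something `remove_duplicates` does. `filter_new_jobs` is a hypothetical name and the sample rows are invented.

```python
import pandas as pd


def filter_new_jobs(df, existing_ids):
    """Keep one row per job_id that is not already stored in the database."""
    unique_batch = df.drop_duplicates(subset='job_id')
    is_known = unique_batch['job_id'].isin(existing_ids)
    return unique_batch[~is_known].reset_index(drop=True)


batch = pd.DataFrame({
    'job_id': ['a', 'b', 'b', 'c'],
    'title': ['Data Scientist', 'Data Analyst', 'Data Analyst', 'Data Engineer'],
})

# 'a' is already stored, and 'b' appears twice in the batch
new_jobs = filter_new_jobs(batch, existing_ids={'a'})
```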

In [10]:
def get_db_primary_keys(user, password, schema, table):
    '''
    This function queries the database for all primary keys in a given schema and table.
    
    Args:
        user: Str
        password: Str
        schema: Str
        table: Str
    
    Returns:
        pandas.DataFrame
            Contains all job_ids/primary keys for the given schema and table.
    '''
    sql_query = 'SELECT job_id FROM {}.{}'.format(schema, table)
    database = "database_name"

    connection = psycopg2.connect(
        database = database,
        user = user,
        host = 'host_name',
        password = password)
    
    df = pd.read_sql_query(sql_query, connection)
    connection.close()
    return df
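
Fetching every `job_id` becomes more expensive as the table grows. One alternative, shown here only as a sketch and not what this notebook does, is to let the database skip conflicting rows at insert time. PostgreSQL supports `INSERT ... ON CONFLICT (job_id) DO NOTHING`; the example uses an in-memory SQLite database from the standard library as a stand-in so it runs without a server (SQLite 3.24 or newer accepts the same clause). The `jobs` table and its columns are hypothetical.

```python
import sqlite3

connection = sqlite3.connect(':memory:')
connection.execute('CREATE TABLE jobs (job_id TEXT PRIMARY KEY, title TEXT)')
connection.execute("INSERT INTO jobs VALUES ('a', 'Data Scientist')")

incoming = [('a', 'Data Scientist'), ('b', 'Data Analyst')]

# Rows whose job_id already exists are skipped instead of aborting the insert
connection.executemany(
    'INSERT INTO jobs (job_id, title) VALUES (?, ?) '
    'ON CONFLICT (job_id) DO NOTHING',
    incoming,
)
connection.commit()

stored_ids = [row[0] for row in connection.execute('SELECT job_id FROM jobs ORDER BY job_id')]
connection.close()
```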

In [11]:
def batch_etl(search_terms, locations, schema, table, username):
    '''
    This function collects all the available job listings for all defined job roles and uploads 
    them to the PostgreSQL server. This is then repeated for all defined locations.
    The database password is requested once, interactively, before scraping starts.
    
    Args:
        search_terms: List
            Contains a list of job roles to search for
        locations: List
            Contains a list of locations 
        schema: Str
        table: Str
        username: Str
    
    Returns:
        None.
    '''
    
    password = getpass.getpass('Enter password: ')
    
    for location in locations:
        data = get_all_jobs_for_all_search_terms(search_terms, location)
        print('{} jobs collected from {}'.format(data.shape[0], location))
        push_df_to_db(data, schema, table, username, password)

In [12]:
locations = [
    'New York, New York, United States', # 23.6 M
    'Los Angeles, CA, California, United States', # 22.9 M
    'Chicago, Illinois, United States', # 21.4 M
    'San Francisco Bay Area, United States', # 18.9 M
    'Houston, TX, Texas, United States', # 13.1 M
    'Miami, Florida, United States', # 8.4 M
    'Boston, Massachusetts, United States', # 8.1 M
    'Phoenix, AZ, Arizona, United States', # 6.2 M
    'Philadelphia, Pennsylvania, United States', # 6.1 M
    'Austin, TX, Texas, United States', # 5.5 M
    'Kansas City, Missouri, United States', # 2.1 M
    'Seattle, Washington, United States', # 5.4 M
    'Washington, District of Columbia, United States', # 11.4 M
    'Denver, CO, Colorado, United States' # 6.1 M
]

search_terms = [
    'Data Scientist',
    'Data Analyst', 
    'Data Engineer',
    'Machine Learning Engineer',
    'Business Intelligence Analyst'
]

schema = 'schema_name'
table = 'table_name'
username = 'username'

In [13]:
'''
Main
'''
batch_etl(search_terms, locations, schema, table, username)

Enter password: ········
Scraping all Data Scientist job postings in Austin, TX, Texas, United States
Page 1 complete.
Page 2 complete.
Page 3 complete.
Page 4 complete.
Page 5 complete.
Page 6 complete.
Page 7 complete.
Page 8 complete.
Page 9 complete.
Page 10 complete.
Page 11 complete.
Page 12 complete.
Page 13 complete.
Page 14 complete.
Page 15 complete.
Page 16 complete.
Page 17 complete.
Page 18 complete.
Page 19 complete.
Done.
Scraping all Data Analyst job postings in Austin, TX, Texas, United States
Page 1 complete.
Page 2 complete.
Page 3 complete.
Page 4 complete.
Page 5 complete.
Page 6 complete.
Page 7 complete.
Page 8 complete.
Page 9 complete.
Page 10 complete.
Page 11 complete.
Page 12 complete.
Page 13 complete.
Page 14 complete.
Page 15 complete.
Page 16 complete.
Page 17 complete.
Page 18 complete.
Page 19 complete.
Page 20 complete.
Page 21 complete.
Page 22 complete.
Page 23 complete.
Page 24 complete.
Page 25 complete.
Page 26 complete.
Page 27 complete.
Page 28

Page 5 complete.
Page 6 complete.
Page 7 complete.
Page 8 complete.
Page 9 complete.
Page 10 complete.
Page 11 complete.
Page 12 complete.
Page 13 complete.
Page 14 complete.
Page 15 complete.
Page 16 complete.
Page 17 complete.
Page 18 complete.
Page 19 complete.
Page 20 complete.
Page 21 complete.
Page 22 complete.
Page 23 complete.
Page 24 complete.
Page 25 complete.
Page 26 complete.
Page 27 complete.
Page 28 complete.
Page 29 complete.
Page 30 complete.
Done.
Scraping all Business Intelligence Analyst job postings in Seattle, Washington, United States
Page 1 complete.
Page 2 complete.
Page 3 complete.
Page 4 complete.
Page 5 complete.
Page 6 complete.
Page 7 complete.
Page 8 complete.
Page 9 complete.
Page 10 complete.
Page 11 complete.
Page 12 complete.
Page 13 complete.
Page 14 complete.
Page 15 complete.
Page 16 complete.
Page 17 complete.
Page 18 complete.
Page 19 complete.
Page 20 complete.
Done.
1297 jobs collected from Seattle, Washington, United States
0 duplicate jobs fou

In [14]:
sql_query = 'SELECT * FROM {}.{}'.format(schema, table)
test = query_database(sql_query, username)

Enter password: ········


In [15]:
test.shape

(80662, 17)

In [81]:
test.head(5)

Unnamed: 0,job_id,benefits,company_name,date_posted,description,items,location,posted_at,qualifications,responsibilities,salary,schedule_type,search_term,title,via,work_from_home,date_scraped
0,eyJqb2JfdGl0bGUiOiJEYXRhIFNjaWVudGlzdCwgUHJvZH...,"In addition to salary, you will also be eligib...",Etsy,2023-06-05,Company Description\n\nEtsy is the global mark...,,"Brooklyn, NY",6 days ago,2+ years of experience as a data scientist or ...,Data scientists at Etsy use rigorous methods t...,,Full-time,Data Scientist,"Data Scientist, Product Analytics",SmartRecruiters Job Search,,2023-06-11
1,eyJqb2JfdGl0bGUiOiJEYXRhIFNjaWVudGlzdCAtIEFkcy...,,Walmart,NaT,Position Summary...\n\nWhat you'll do...\n\nww...,,"Hoboken, NJ",,You’re proficient in modeling algorithms and p...,They lead communications and interactions with...,,Full-time,Data Scientist,"Data Scientist - Ads Measurement - Hoboken, NJ",Walmart Careers,,2023-06-11
2,eyJqb2JfdGl0bGUiOiJTZW5pb3IgRGF0YSBTY2llbnRpc3...,,Glocomms,2023-06-09,A Fortune 100 financial services provider is s...,,"New York, NY",2 days ago,"3+ years of experience using Python, SQL, and ...","Data Scientist, you will be responsible for de...",,Full-time,Data Scientist,Senior Data Scientist,Glocomms,,2023-06-11
3,eyJqb2JfdGl0bGUiOiJEYXRhIFNjaWVudGlzdCAoUmVtb3...,We are offering a market competitive salary of...,Transfix,NaT,"About Transfix:\n\nTransfix, named to Forbes’ ...",,Anywhere,,You have 1+ years of working experience manipu...,"In this role, you’ll learn about managing your...","89,250–150,500 a year",Full-time,Data Scientist,Data Scientist (Remote),Built In NYC,1.0,2023-06-11
4,eyJqb2JfdGl0bGUiOiJEYXRhIFNjaWVudGlzdCwgTWFya2...,Competitive compensation\nFully remote work en...,Obviously,NaT,What We’re Looking For\n\nYou don’t take any d...,,Anywhere,,You understand the data needs of marketers and...,You’re ready to go in front of a group of anal...,,Full-time,Data Scientist,"Data Scientist, Marketing Analytics",Startup Jobs,1.0,2023-06-11
