The purpose of this notebook is to scrape job postings and store them in Elasticsearch (accessed here through the `opensearchpy` client), so that the data can subsequently be analyzed in Kibana. We use a pre-written scraper from [Apify's marketplace](https://console.apify.com/): specifically, the [Google Jobs Scraper](https://console.apify.com/actors/SpK8RxKhIgV6BWOz9/console) actor. We start by scraping the data.

In [24]:
# IMPORT LIBRARIES
import hashlib
import os
import re
from urllib.parse import quote

from apify_client import ApifyClient
from opensearchpy import OpenSearch
from opensearchpy.client import IndicesClient

In [25]:
# ---------------------------------------------------------------------------
# CREDENTIALS
# Read from the environment so nothing sensitive is stored in the notebook.
# A missing variable raises KeyError as soon as this cell runs.
# ---------------------------------------------------------------------------
APIFY_TOKEN = os.environ["brave_apify_token"]
OPENSEARCH_USER = os.environ["opensearch_user"]
OPENSEARCH_PWD = os.environ["opensearch_pwd"]

# ---------------------------------------------------------------------------
# APIFY INPUT
# ---------------------------------------------------------------------------
# Identifier of the Apify actor that is run for every query.
ACTOR_ID = "SpK8RxKhIgV6BWOz9"

# Job titles to search for; each one is turned into its own search URL.
TITLES = [
    "Machine Learning Engineer",
    "Data Scientist",
    "MLOps Engineer",
    "Data Analyst",
    "Data Engineer",
]

# Tunable values forwarded to the actor through BASE_QUERIES below.
NUM_PAGES = 1
MAX_CONCURRENCY = 10

# Actor input shared by every query; the per-title search URL is added later
# under the "queries" key.
BASE_QUERIES = dict(
    maxPagesPerQuery=NUM_PAGES,
    csvFriendlyOutput=False,
    countryCode="ca",
    languageCode="",
    maxConcurrency=MAX_CONCURRENCY,
    saveHtml=False,
    saveHtmlToKeyValueStore=False,
    includeUnfilteredResults=False,
)

# Search URL template: the literal text JOB is a placeholder that gets
# replaced by each (URL-encoded) job title.
QUERY_URL = "https://www.google.ca/search?q=JOB&ibp=htl;jobs&uule=w+CAIQICIGQ2FuYWRh"

# ---------------------------------------------------------------------------
# OPENSEARCH INPUT
# ---------------------------------------------------------------------------
OPENSEARCH_HOST = "search-swift-hire-dev-jfmldmym4cfbiwdhwmtuqq6ihy.us-west-2.es.amazonaws.com"
INDEX_NAME = "jobs_harpreet_matharoo"

Now we prepare one query per job title from the variables defined above, and run the actor once for each query.

In [26]:
# BUILD ONE SEARCH URL PER JOB TITLE
# Collapse runs of whitespace, then percent-encode the title. Encoding with
# `quote` (rather than only swapping spaces for %20) keeps titles containing
# reserved characters such as "+", "&" or "#" from corrupting the URL.
processed_titles = [quote(" ".join(title.split()), safe="") for title in TITLES]

# Plain string replacement of the JOB placeholder: unlike `re.sub`, it never
# interprets backslashes or group references inside the title.
query_urls = [QUERY_URL.replace("JOB", title) for title in processed_titles]

# PREPARE QUERIES
# Each actor input is a fresh (shallow) copy of the shared settings plus the
# title-specific search URL, so BASE_QUERIES itself is never mutated.
queries = [{**BASE_QUERIES, "queries": query_url} for query_url in query_urls]

In [27]:
# Create the Apify API client, authenticated with the token from the config cell
client = ApifyClient(APIFY_TOKEN)

# Run the actor once per prepared input, waiting for each run to finish,
# and gather the records of every run into a single list.
items = []
for run_input in queries:
    run = client.actor(ACTOR_ID).call(run_input=run_input)

    # Pull all records (if there are any) from this run's default dataset
    dataset_id = run["defaultDatasetId"]
    items.extend(client.dataset(dataset_id).iterate_items())

Finally, we create the index in Elasticsearch (if it does not already exist) and add all the collected job postings, skipping any that are already stored.

In [28]:
# Connect to the search cluster over HTTPS (port 443) with basic authentication
opensearch_client = OpenSearch(
    hosts=[{'host': OPENSEARCH_HOST, 'port': 443}],
    http_auth=(OPENSEARCH_USER, OPENSEARCH_PWD),
    http_compress=True,  # enables gzip compression for request bodies
    use_ssl=True,
    # NOTE(review): hostname assertion and SSL warnings are switched off here;
    # confirm this is intentional for this cluster.
    ssl_assert_hostname=False,
    ssl_show_warn=False,
)

# Make sure the index that stores the jobs exists; create it only when missing
index_exists = opensearch_client.indices.exists(index=INDEX_NAME)
if not index_exists:
    opensearch_client.indices.create(index=INDEX_NAME)

In [40]:
# Store every scraped job posting in the index, skipping ones already present.
for item in items:
    # `.get(...) or []` tolerates dataset records that carry no job list
    # instead of aborting the whole load with a KeyError.
    for job_data in item.get('googleJobs') or []:

        # Create a Unique Identifier with SHA-256: the hex digest of the
        # posting's string representation, so an identical posting scraped
        # again maps to the same document id.
        encoded_jd = str(job_data).encode("utf-8")
        doc_id = hashlib.sha256(encoded_jd).hexdigest()

        # Only create the document when its id is not in the index yet
        if not opensearch_client.exists(index=INDEX_NAME, id=doc_id):
            opensearch_client.create(
                index=INDEX_NAME,
                id=doc_id,
                body=job_data
            )