# Import Packages

In [1]:
import pandas as pd
pd.set_option('display.max_columns', None)
import requests
import json
from dotenv import load_dotenv
import os
from google.cloud import bigquery, bigquery_storage
from joblib import Parallel, delayed
from ratelimiter import RateLimiter
import warnings
warnings.filterwarnings("ignore")

# Load environment variables
load_dotenv()

True

# Define Your Inputs

In [8]:
# Extraction window; both bounds are strings in the format "YYYY-MM-DD"
START_DATE = "2024-09-01"
END_DATE = "2024-09-30"

# Extraction mode switch:
#   True  -> only data from the last crawler run is pulled
#   False -> data between START_DATE and END_DATE is pulled
IS_PULL_DATA_FROM_LAST_CRAWLER_RUN = False

# Upper bound on the number of requests sent per minute
MAX_REQUESTS_PER_MINUTE = 170

# Define Global Non-User Defined Inputs

In [3]:
# Maximum number of tries for the backoff decorator
# NOTE(review): not referenced by any of the cells visible in this notebook — confirm it is still needed
EXP_BACKOFF_MAX_TRIES = 10

# Columns that are converted to a numeric dtype during cleaning
NUMERIC_COLS = ["plz", "salary_low", "salary_high"]

# Columns kept from the downloaded data (one per line for easier diffs)
RELEVANT_COLS = [
    "job_title_name",
    "posted_on",
    "company_name",
    "remote",
    "city",
    "plz",
    "salary_low",
    "salary_high",
    "salary_type",
    "job_type",
    "phone_number",
    "industry",
    "crawled_timestamp",
    "job_source",
]

# Credentials and target database are read from the environment
# (os.getenv returns None when a variable is not set)
NOTION_API_TOKEN = os.getenv("NOTION_API_TOKEN")
DATABASE_ID = os.getenv("DATABASE_ID")

# Request headers sent with every Notion API call, including the Authorization token
headers = {
    "Authorization": "Bearer {}".format(NOTION_API_TOKEN),
    "Content-Type": "application/json",
    "Notion-Version": "2022-06-28",
}

# Instantiate the BQ clients

In [4]:
# BigQuery client bound to the project that is also referenced in the query cell's table path
bq_client = bigquery.Client(project="web-scraping-371310")
# BigQuery Storage read client; passed to to_dataframe() when the query result is downloaded
bq_storage_client = bigquery_storage.BigQueryReadClient()

# Define the Query

In [5]:
# Define the time period filter
if IS_PULL_DATA_FROM_LAST_CRAWLER_RUN:
    # NOTE(review): this keeps only rows whose crawled_timestamp equals the single latest
    # timestamp in the table. Confirm that all rows of one crawler run share the same timestamp;
    # otherwise this returns only a fraction of the last run.
    TIME_PERIOD_FILTER = "WHERE crawled_timestamp = (SELECT MAX(crawled_timestamp) FROM `web-scraping-371310.crawled_datasets.laura_indeed_data`)"
else:
    # Half-open range [START_DATE 00:00, END_DATE + 1 day 00:00).
    # TIMESTAMP('<END_DATE>') alone is midnight at the START of the end date, so a plain
    # BETWEEN would silently drop everything crawled during the end date itself.
    TIME_PERIOD_FILTER = (
        f"WHERE crawled_timestamp >= TIMESTAMP('{START_DATE}') "
        f"AND crawled_timestamp < TIMESTAMP(DATE_ADD(DATE '{END_DATE}', INTERVAL 1 DAY))"
    )

# Create the query
# QUALIFY keeps, for every (job_title_name, company_name, city) combination,
# only the row with the earliest crawled_timestamp inside the filtered period
raw_data_query = f"""
SELECT *
FROM `web-scraping-371310.crawled_datasets.laura_indeed_data`
{TIME_PERIOD_FILTER}
QUALIFY ROW_NUMBER() OVER (PARTITION BY job_title_name, company_name, city order by crawled_timestamp asc) = 1
"""

# Execute the query; to_dataframe() already returns a pandas DataFrame, so no extra wrapping is needed
df_indeed_data = bq_client.query(raw_data_query).result().to_dataframe(bqstorage_client=bq_storage_client)

# Display the data frame
df_indeed_data

Unnamed: 0,job_title_name,job_type,shift_and_schedule,company_name,company_indeed_url,city,remote,salary,crawled_page_rank,job_page_url,posted_on,listing_page_url,job_description,crawled_timestamp,crawler_name,domain,salary_type,salary_low,salary_high,plz,search_query,phone_number,industry,industry_match_type,industry_match_idx
0,Praktikum / Werkstudent Circular Economy (w/m/d),,,Pwc Germany,https://de.indeed.com/cmp/Pwc?campaignid=mobvj...,Köln,,,1,https://de.indeed.com/rc/clk?jk=2cfc770831201c...,Vor 3 Tagen geschaltet,https://de.indeed.com/jobs?q=Circular%20Econom...,Für unseren Geschäftsbereich\nSustainability\n...,2024-09-04 10:09:37+00:00,crawler_1,de,,,,,,,,no_match,11772
1,Trainee Innovation​​​​​​​ Circular Economy Coa...,,,Siegwerk Druckfarben AG & Co. KGaA,https://de.indeed.com/cmp/Siegwerk?campaignid=...,Siegburg,,,1,https://de.indeed.com/rc/clk?jk=0687345dde8ce9...,Vor 7 Tagen geschaltet,https://de.indeed.com/jobs?q=Circular%20Econom...,Siegwerk is one of the world’s leading supplie...,2024-09-04 10:10:28+00:00,crawler_1,de,,,,53721,,,,no_match,11772
2,Nachhaltigkeitsexperte Circular Economy / Krei...,,,TÜV SÜD Product Service GmbH,https://de.indeed.com/cmp/T%C3%BCv-S%C3%BCd?ca...,München,,,1,https://de.indeed.com/rc/clk?jk=c6e5bb5fff9d7c...,Vor > 30 Tagen geschaltet,https://de.indeed.com/jobs?q=Circular%20Econom...,Seit 1866 gilt unsere Leidenschaft der Technik...,2024-09-04 10:09:51+00:00,crawler_1,de,,,,,,,,no_match,11772
3,Mechatroniker (m/w/d) Röntgenröhren,,,Philips,https://de.indeed.com/cmp/Philips?campaignid=m...,Hamburg,,,1,https://de.indeed.com/rc/clk?jk=cfc8f5ffef28b8...,Vor 12 Tagen geschaltet,https://de.indeed.com/jobs?q=Circular%20Econom...,Job Title\nMechatroniker (m/w/d) Röntgenröhren...,2024-09-11 09:07:04+00:00,crawler_1,de,,,,,https://www.google.com/search?hl=en&lr=lang_en...,,Hospitals and Health Care,exact_match,7908
4,HSE Manager Environment and Circular Economy (...,,,HARTMANN,https://de.indeed.com/cmp/Hartmann-1?campaigni...,Heidenheim an der Brenz,,,1,https://de.indeed.com/rc/clk?jk=1813a43f9699b9...,Vor 5 Tagen geschaltet,https://de.indeed.com/jobs?q=Circular%20Econom...,"Bei HARTMANN geht es uns darum, zu\nhelfen\n, ...",2024-09-18 09:06:20+00:00,crawler_1,de,,,,89522,https://www.google.com/search?hl=en&lr=lang_en...,,,no_match,11772
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1486,Technischer Mitarbeiter/Anlagenfahrer (m/w/d) ...,,,BAUFELD-OEL GmbH,https://de.indeed.com/cmp/Baufeld--oel-Gmbh?ca...,Duisburg,,,63,https://de.indeed.com/rc/clk?jk=c3748c08017ea2...,,https://de.indeed.com/jobs?q=Kreislaufwirtscha...,Die PURAGLOBE-BAUFELD-Unternehmensgruppe arbei...,2024-09-18 09:03:31+00:00,crawler_3,de,,,,,,,,no_match,11772
1487,Leitung Labor,,,Deutsche Steinzeug Cremer & Breuer AG,https://de.indeed.com/cmp/Deutsche-Steinzeug-C...,Schwarzenfeld,,,63,https://de.indeed.com/rc/clk?jk=a0e744e9501db2...,Vor > 30 Tagen geschaltet,https://de.indeed.com/jobs?q=Kreislaufwirtscha...,Die Deutsche Steinzeug Cremer & Breuer AG ist ...,2024-09-04 09:11:18+00:00,crawler_3,de,,,,92521,,,,no_match,11772
1488,Softwareentwickler (m/w/d) im Bereich Green PL...,,,Dataciders ixto GmbH,,,,,63,https://de.indeed.com/rc/clk?jk=20c803154cbeca...,Vor 19 Tagen geschaltet,https://de.indeed.com/jobs?q=Kreislaufwirtscha...,Kurzbeschreibung\nDataciders InMediasP steht f...,2024-09-04 09:10:45+00:00,crawler_3,de,,,,,,,,no_match,11772
1489,Produktmanager für Haus- und Großwasserzähler ...,,,Lorenz GmbH & Co. KG,https://de.indeed.com/cmp/Lorenz-Gmbh-&-Co.kg?...,Schelklingen,,,63,https://de.indeed.com/rc/clk?jk=aad5ab80f93fda...,Vor 26 Tagen geschaltet,https://de.indeed.com/jobs?q=Kreislaufwirtscha...,Zur Verstärkung unseres Teams suchen wir zum n...,2024-09-04 09:12:12+00:00,crawler_3,de,,,,89601,,,,no_match,11772


# Clean the data

In [6]:
# Add the job_source column and keep only the relevant columns.
# assign() returns a new frame and .copy() makes the selection an independent object,
# so the column assignments below never write into a slice of the raw query result.
df_indeed_data = df_indeed_data.assign(job_source="Indeed Crawler")[RELEVANT_COLS].copy()

# Change the data types of numeric fields
for num_col in NUMERIC_COLS:
    df_indeed_data[num_col] = pd.to_numeric(df_indeed_data[num_col])

# Override missing values in string columns with empty strings.
# Plain assignment is used instead of a chained `fillna(..., inplace=True)`, which operates on a
# temporary column object and is not guaranteed to update the data frame.
for col in list(df_indeed_data.select_dtypes(include="object").columns):
    df_indeed_data[col] = df_indeed_data[col].fillna("")

# Override missing values in float columns with 0
for col in list(df_indeed_data.select_dtypes(include="float64").columns):
    df_indeed_data[col] = df_indeed_data[col].fillna(0)

# Display the data frame
df_indeed_data

Unnamed: 0,job_title_name,posted_on,company_name,remote,city,plz,salary_low,salary_high,salary_type,job_type,phone_number,industry,crawled_timestamp,job_source
0,Praktikum / Werkstudent Circular Economy (w/m/d),Vor 3 Tagen geschaltet,Pwc Germany,,Köln,0.0,0.0,0.0,,,,,2024-09-04 10:09:37+00:00,Indeed Crawler
1,Trainee Innovation​​​​​​​ Circular Economy Coa...,Vor 7 Tagen geschaltet,Siegwerk Druckfarben AG & Co. KGaA,,Siegburg,53721.0,0.0,0.0,,,,,2024-09-04 10:10:28+00:00,Indeed Crawler
2,Nachhaltigkeitsexperte Circular Economy / Krei...,Vor > 30 Tagen geschaltet,TÜV SÜD Product Service GmbH,,München,0.0,0.0,0.0,,,,,2024-09-04 10:09:51+00:00,Indeed Crawler
3,Mechatroniker (m/w/d) Röntgenröhren,Vor 12 Tagen geschaltet,Philips,,Hamburg,0.0,0.0,0.0,,,,Hospitals and Health Care,2024-09-11 09:07:04+00:00,Indeed Crawler
4,HSE Manager Environment and Circular Economy (...,Vor 5 Tagen geschaltet,HARTMANN,,Heidenheim an der Brenz,89522.0,0.0,0.0,,,,,2024-09-18 09:06:20+00:00,Indeed Crawler
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1486,Technischer Mitarbeiter/Anlagenfahrer (m/w/d) ...,,BAUFELD-OEL GmbH,,Duisburg,0.0,0.0,0.0,,,,,2024-09-18 09:03:31+00:00,Indeed Crawler
1487,Leitung Labor,Vor > 30 Tagen geschaltet,Deutsche Steinzeug Cremer & Breuer AG,,Schwarzenfeld,92521.0,0.0,0.0,,,,,2024-09-04 09:11:18+00:00,Indeed Crawler
1488,Softwareentwickler (m/w/d) im Bereich Green PL...,Vor 19 Tagen geschaltet,Dataciders ixto GmbH,,,0.0,0.0,0.0,,,,,2024-09-04 09:10:45+00:00,Indeed Crawler
1489,Produktmanager für Haus- und Großwasserzähler ...,Vor 26 Tagen geschaltet,Lorenz GmbH & Co. KG,,Schelklingen,89601.0,0.0,0.0,,,,,2024-09-04 09:12:12+00:00,Indeed Crawler


# Create helper functions that build the Notion property payloads for text and numeric fields

In [7]:
def generate_text_dict(row, col):
    """
    Wrap the value stored under `col` in `row` in a rich-text property dictionary.

    The returned structure is {"rich_text": [{"text": {"content": <value>}}]}.
    """
    text_fragment = {"text": {"content": row[col]}}
    return {"rich_text": [text_fragment]}

def generate_numeric_dict(row, col):
    """
    Wrap the value stored under `col` in `row` in a number property dictionary.

    The returned structure is {"number": <value>}.
    """
    numeric_value = row[col]
    return dict(number=numeric_value)

# Define a function to upload data to the Notion database

In [9]:
# Seconds to wait for a single Notion API request before treating it as failed
NOTION_REQUEST_TIMEOUT_SECONDS = 30


@RateLimiter(max_calls=MAX_REQUESTS_PER_MINUTE, period=60)
def upload_row_to_notion_db(row, counter):
    """
    Upload one row to the Notion database as a new page (record).

    The decorator limits calls to MAX_REQUESTS_PER_MINUTE per 60 seconds.

    Parameters
    ----------
    row : mapping-like
        A record exposing the keys job_title_name, posted_on, company_name, remote, city, plz,
        salary_low, salary_high, salary_type, job_type, phone_number, industry,
        crawled_timestamp (an object with an isoformat() method) and job_source.
    counter : int
        Zero-based position of the record; incremented by one for the progress messages.

    Returns
    -------
    None
        The outcome (success, HTTP failure or connection failure) is reported via print.
    """
    # Increment the counter so the first record is 1
    counter += 1

    # Data for the new page (record) in the database
    data = {
        "parent": {
            "database_id": DATABASE_ID
        },
        "properties": {
            # job_title_name field (the title property of the page)
            "job_title_name": {
                "title": [
                    {
                        "text": {
                            "content": row["job_title_name"]
                        }
                    }
                ]
            },

            # posted_on field
            "posted_on": generate_text_dict(row=row, col="posted_on"),

            # company_name field
            "company_name": generate_text_dict(row=row, col="company_name"),

            # remote field
            # (must use its own key: a second "company_name" key would overwrite the company name)
            "remote": generate_text_dict(row=row, col="remote"),

            # city field
            "city": generate_text_dict(row=row, col="city"),

            # plz field
            "plz": generate_numeric_dict(row=row, col="plz"),

            # salary_low field
            "salary_low": generate_numeric_dict(row=row, col="salary_low"),

            # salary_high field
            "salary_high": generate_numeric_dict(row=row, col="salary_high"),

            # salary_type field
            "salary_type": generate_text_dict(row=row, col="salary_type"),

            # job_type field
            "job_type": generate_text_dict(row=row, col="job_type"),

            # phone_number field
            "phone_number": generate_text_dict(row=row, col="phone_number"),

            # industry field
            "industry": generate_text_dict(row=row, col="industry"),

            # crawled_timestamp field
            # isoformat() yields an ISO 8601 string and keeps the UTC offset of timezone-aware
            # timestamps, which a "%Y-%m-%d %H:%M:%S" format would drop
            "crawled_timestamp": {
                "date": {
                    "start": row["crawled_timestamp"].isoformat()
                }
            },

            # job_source field
            "job_source": {
                "select": {
                    "name": row["job_source"]
                }
            }
        }
    }

    # Make a request to the Notion API.
    # The timeout prevents one stalled connection from blocking a worker forever, and connection
    # level errors are reported like any other failed record instead of aborting the whole upload.
    try:
        response = requests.post(
            'https://api.notion.com/v1/pages',
            headers=headers,
            data=json.dumps(data),
            timeout=NOTION_REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException as exc:
        print(f"Failed to add record number {counter} with job title -> {row['job_title_name']}: ", exc)
        return

    # Check if the request was successful
    if response.status_code == 200:
        print(f"The job title -> {row['job_title_name']} was added successfully. This is record {counter} out of {len(df_indeed_data)}!")
    else:
        print(f"Failed to add record number {counter} with job title -> {row['job_title_name']}: ", response.status_code, response.text)

# Upload the records to the Notion database

In [None]:
# One deferred upload call per row; the data frame index is passed as the record counter.
# NOTE(review): iloc[0:100] restricts the upload to the first 100 rows — confirm this limit is intended.
upload_tasks = (
    delayed(upload_row_to_notion_db)(row, counter)
    for counter, row in df_indeed_data.iloc[0:100].iterrows()
)

# Run the uploads on a thread pool that uses all available workers
parallel_job = Parallel(n_jobs=-1, backend="threading")(upload_tasks)