In [10]:
for row in results:
    print("{} : {} views".format(row.url, row.view_count))

In [9]:
from google.cloud import bigquery 
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    r'C:\Users\SzymonKarpecki\Documents\job-offers-analysis.json')
bigquery_project = "job-offers-analysis"

client = bigquery.Client(project=bigquery_project, credentials=credentials)

query_job = client.query(
    """
    SELECT
        offer_id
    FROM
        just_join_it.offers
    """
)

results = query_job.result()

In [146]:
import requests
import json
import pandas as pd
import sqlalchemy as sal
from datetime import datetime
from google.cloud import bigquery 
from google.oauth2 import service_account
from dateutil import parser

credentials = service_account.Credentials.from_service_account_file(
    r'C:\Users\SzymonKarpecki\Documents\job-offers-analysis.json')
bigquery_project = "job-offers-analysis"

client = bigquery.Client(project=bigquery_project, credentials=credentials)

bigquery_dataset = "just_join_it"
offers_table_name = "offers"
offers_skills_table_name = "offers_skills"
offers_table_bigquery_id = bigquery_dataset + "." + offers_table_name
offers_skill_table_bigquery_id = bigquery_dataset + "." + offers_skills_table_name
columns_to_drop = ["street", "address_text", "latitude", "longitude", "open_to_hire_ukrainians", "company_logo_url", "way_of_apply"]
starting_offer_id = 0


# download data and create basic dataframe
api_url = "https://justjoin.it/api/offers"
r = requests.get(api_url)
data = json.loads(r.text)
df_api_offers = pd.DataFrame.from_dict(data)
df_api_offers = df_api_offers.drop(columns_to_drop, axis=1)
df_api_offers = df_api_offers.rename(columns={"marker_icon": "category"})

#class to represent employment type
class EmploymentType:
    def __init__(self, emp_type: str, emp_code: str):
        self.emp_type = emp_type
        self.emp_code = emp_code

    def get_salary_dict(self, emp_type_dict: dict) -> dict:
        min_salary = None
        max_salary = None
        currency = None

        try:
            if emp_type_dict is not None and emp_type_dict.get("type") == self.emp_code:
                salary_str = emp_type_dict.get("salary")
                if salary_str is not None:
                    salary_dict = dict(salary_str)
                    min_salary = salary_dict.get("from")
                    max_salary = salary_dict.get("to")
                    currency = salary_dict.get("currency")
        except ValueError:
            pass

        return ({
            f"{self.emp_type}_min_salary": min_salary,
            f"{self.emp_type}_max_salary": max_salary,
            f"{self.emp_type}_currency": currency})

#employment types avaiable in api
b2b_emp_type = EmploymentType("b2b", "b2b")
coe_emp_type = EmploymentType("contract_of_employment", "permanent")
com_emp_type = EmploymentType("contract_of_mandate", "mandate_contract")

#method to create multiple columns with salaries ranges
def unnest_salaries(emp_types_list: dict):
    b2b_dict = b2b_emp_type.get_salary_dict(dict())
    coe_dict = coe_emp_type.get_salary_dict(dict())
    com_dict = com_emp_type.get_salary_dict(dict())

    #iterating over list of JSONs
    for emp_type_dict_entry in emp_types_list:
        if emp_type_dict_entry is None:
            continue
        try:
            emp_type_dict = dict(emp_type_dict_entry)
            emp_type = emp_type_dict.get("type")
        except ValueError:
            continue

        if emp_type == b2b_emp_type.emp_code:
            b2b_dict = b2b_emp_type.get_salary_dict(emp_type_dict)
        elif emp_type == coe_emp_type.emp_code:
            coe_dict = coe_emp_type.get_salary_dict(emp_type_dict)
        elif emp_type == com_emp_type.emp_code:
            com_dict = com_emp_type.get_salary_dict(emp_type_dict)

    final_dict = dict(**b2b_dict, **coe_dict, **com_dict)
    return final_dict

#expand json into multiple columns
df_salary = df_api_offers.apply(lambda row: unnest_salaries(row.employment_types), axis=1, result_type='expand')
df_api_offers = pd.concat([df_api_offers, df_salary], axis=1)

#final changes

#parse published_at to datetime as it will later cause errors because of Zulu sign in the timestamp
df_api_offers["published_at"] = df_api_offers["published_at"].apply(lambda x: parser.isoparse(x) )
df_api_offers["offer_url"] = df_api_offers["id"].apply(lambda x: f"https://justjoin.it/offers/{x}")
df_api_offers.insert(0, "offer_id", df_api_offers.index + 1 + starting_offer_id)
df_api_offers["load_date_utc"] = datetime.utcnow()

df_offers_skills = df_api_offers[["offer_id", "skills"]]
df_offers_skills = df_offers_skills.explode("skills", ignore_index=True)

df_offers_skills_normalized = pd.json_normalize(df_offers_skills["skills"])
df_offers_skills = df_offers_skills[["offer_id"]].join(df_offers_skills_normalized)
df_offers_skills.insert(0, "offer_skill_id", df_offers_skills.index + 1)


In [147]:
job_offers = client.load_table_from_dataframe(df_api_offers, offers_table_bigquery_id)
results_offers = job_offers.result()

job_offers_skills = client.load_table_from_dataframe(df_offers_skills, offers_skill_table_bigquery_id)
results_offers_skills = job_offers_skills.result()

In [91]:
df_api_offers.dtypes

job_config = bigquery.LoadJobConfig(
 
    schema = [
        bigquery.SchemaField("offer_id", "INTEGER"),
        bigquery.SchemaField("title", "STRING"),
        bigquery.SchemaField("city", "STRING"),
        bigquery.SchemaField("country_code", "STRING"),
        bigquery.SchemaField("category", "STRING"),
        bigquery.SchemaField("workplace_type", "STRING"),
        bigquery.SchemaField("company_name", "STRING"),
        bigquery.SchemaField("company_url", "STRING"),
        bigquery.SchemaField("company_size", "STRING"),
        bigquery.SchemaField("experience_level", "STRING"),
        bigquery.SchemaField("published_at", "TIMESTAMP"),
        bigquery.SchemaField("remote_interview", "BOOL"),
        bigquery.SchemaField("id", "STRING"),
        bigquery.SchemaField("display_offer", "BOOL"),
        bigquery.SchemaField("employment_types", "RECORD", mode='REPEATED',
            fields=(
                bigquery.SchemaField("salary", "RECORD", fields=(
                    bigquery.SchemaField("currency", "STRING"),
                    bigquery.SchemaField("to", "FLOAT"),
                    bigquery.SchemaField("from", "FLOAT")
                )),
                bigquery.SchemaField("type", "STRING")
            )),
        bigquery.SchemaField("skills", "RECORD", mode="REPEATED", fields=(
            bigquery.SchemaField("level", "INTEGER"),
            bigquery.SchemaField("name", "STRING")
        )),
        bigquery.SchemaField("remote", "BOOL"),
        bigquery.SchemaField("multilocation", "RECORD", mode="REPEATED", fields=(
            bigquery.SchemaField("slug", "STRING"),
            bigquery.SchemaField("street", "STRING"),
            bigquery.SchemaField("city", "STRING")
        )),
        bigquery.SchemaField("b2b_min_salary", "FLOAT"),
        bigquery.SchemaField("b2b_max_salary", "FLOAT"),
        bigquery.SchemaField("b2b_currency", "STRING"),
        bigquery.SchemaField("contract_of_employment_min_salary", "FLOAT"),
        bigquery.SchemaField("contract_of_employment_max_salary", "FLOAT"),
        bigquery.SchemaField("contract_of_employment_currency", "STRING"),
        bigquery.SchemaField("contract_of_mandate_min_salary", "FLOAT"),
        bigquery.SchemaField("contract_of_mandate_max_salary", "FLOAT"),
        bigquery.SchemaField("contract_of_mandate_currency", "STRING"),
        bigquery.SchemaField("offer_url", "STRING"),
        bigquery.SchemaField("load_date_utc", "TIMESTAMP")
    ],
    source_format=bigquery.SourceFormat.PARQUET
)

In [104]:
# df_api_offers["offer_id"] = df_api_offers["offer_id"].astype("int64")
# df_api_offers["title"] = df_api_offers["title"].astype("string")
# df_api_offers["city"] = df_api_offers["city"].astype("string")
# df_api_offers["country_code"] = df_api_offers["country_code"].astype("string")
# df_api_offers["category"] = df_api_offers["category"].astype("string")
# df_api_offers["workplace_type"] = df_api_offers["workplace_type"].astype("string")
# df_api_offers["company_name"] = df_api_offers["company_name"].astype("string")
# df_api_offers["company_url"] = df_api_offers["company_url"].astype("string")
# df_api_offers["company_size"] = df_api_offers["company_size"].astype("string")
# df_api_offers["experience_level"] = df_api_offers["experience_level"].astype("string")
# df_api_offers["published_at"] = df_api_offers["published_at"].astype("string")
# df_api_offers["remote_interview"] = df_api_offers["remote_interview"].astype("bool")
# df_api_offers["id"] = df_api_offers["id"].astype("string")
# df_api_offers["display_offer"] = df_api_offers["display_offer"].astype("bool")
# df_api_offers["employment_types"] = df_api_offers["employment_types"].astype("string")
# df_api_offers["skills"] = df_api_offers["skills"].astype("string")
# df_api_offers["remote"] = df_api_offers["remote"].astype("bool")
# df_api_offers["multilocation"] = df_api_offers["multilocation"].astype("string")
# df_api_offers["b2b_min_salary"] = df_api_offers["b2b_min_salary"].astype("float")
# df_api_offers["b2b_max_salary"] = df_api_offers["b2b_max_salary"].astype("float")
# df_api_offers["b2b_currency"] = df_api_offers["b2b_currency"].astype("string")
# df_api_offers["contract_of_employment_min_salary"] = df_api_offers["contract_of_employment_min_salary"].astype("float")
# df_api_offers["contract_of_employment_max_salary"] = df_api_offers["contract_of_employment_max_salary"].astype("float")
# df_api_offers["contract_of_employment_currency"] = df_api_offers["contract_of_employment_currency"].astype("string")
# df_api_offers["contract_of_mandate_min_salary"] = df_api_offers["contract_of_mandate_min_salary"].astype("float")
# df_api_offers["contract_of_mandate_max_salary"] = df_api_offers["contract_of_mandate_max_salary"].astype("float")
# df_api_offers["contract_of_mandate_currency"] = df_api_offers["contract_of_mandate_currency"].astype("string")
# df_api_offers["offer_url"] = df_api_offers["offer_url"].astype("string")
# df_api_offers["load_date_utc"] = df_api_offers["load_date_utc"].astype("string")

published_at come in format with "Z" as the last character meaning it is a UTC date. What I had to do for now is to simply ignore that Z and create a proper datetime. For future I need to create a proper datetime from this with an information regarding timezone

In [129]:
# df_api_offers = df_api_offers.drop("offer_id", axis=1)
# df_api_offers = df_api_offers.drop("title", axis=1)
# df_api_offers = df_api_offers.drop("city", axis=1)
# df_api_offers = df_api_offers.drop("country_code", axis=1)
# df_api_offers = df_api_offers.drop("category", axis=1)
# df_api_offers = df_api_offers.drop("workplace_type", axis=1)
# df_api_offers = df_api_offers.drop("company_name", axis=1)
# df_api_offers = df_api_offers.drop("company_url", axis=1)
# df_api_offers = df_api_offers.drop("company_size", axis=1)
# df_api_offers = df_api_offers.drop("experience_level", axis=1)
df_api_offers = df_api_offers.drop("published_at", axis=1) ### THIS SHIT MADE ME LOSE 3 FUCKING HOURS
# df_api_offers = df_api_offers.drop("remote_interview", axis=1)
# df_api_offers = df_api_offers.drop("id", axis=1)
# df_api_offers = df_api_offers.drop("display_offer", axis=1)
# df_api_offers = df_api_offers.drop("employment_types", axis=1)
# df_api_offers = df_api_offers.drop("skills", axis=1)
# df_api_offers = df_api_offers.drop("remote", axis=1)
# df_api_offers = df_api_offers.drop("multilocation", axis=1)
# df_api_offers = df_api_offers.drop("b2b_min_salary", axis=1)
# df_api_offers = df_api_offers.drop("b2b_max_salary", axis=1)
# df_api_offers = df_api_offers.drop("b2b_currency", axis=1)
# df_api_offers = df_api_offers.drop("contract_of_employment_min_salary", axis=1)
# df_api_offers = df_api_offers.drop("contract_of_employment_max_salary", axis=1)
# df_api_offers = df_api_offers.drop("contract_of_employment_currency", axis=1)
# df_api_offers = df_api_offers.drop("contract_of_mandate_min_salary", axis=1)
# df_api_offers = df_api_offers.drop("contract_of_mandate_max_salary", axis=1)
# df_api_offers = df_api_offers.drop("contract_of_mandate_currency", axis=1)
# df_api_offers = df_api_offers.drop("offer_url", axis=1)
# df_api_offers = df_api_offers.drop("load_date_utc", axis=1)

KeyError: "['published_at'] not found in axis"