In [None]:
## Load Packages
import pandas as pd
import re
import numpy as np
from geopy.geocoders import GoogleV3
from geopy.extra.rate_limiter import RateLimiter
from google.cloud import bigquery
from oauth2client.service_account import ServiceAccountCredentials
import os

In [None]:
#Setup BigQuery Credentials
gbq_credentials = ''

#Apply credentials to BigQuery Client
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = gbq_credentials
client = bigquery.Client()
dataset_id = "ppl-stage-bigquery.pa_people_derived_tables"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "europe-west2"
try:
    dataset = client.create_dataset(dataset)  # API request
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))
except Exception as e:
    print(e)

409 POST https://bigquery.googleapis.com/bigquery/v2/projects/ppl-stage-bigquery/datasets?prettyPrint=false: Already Exists: Dataset ppl-stage-bigquery:pa_people_derived_tables


In [None]:
#SQL Query and project
bigquery_project = 'ppl-stage-bigquery'
query = """WITH offer_fields AS
  (
    SELECT
       lof.offer_id                                                                                             AS offer_id
      ,MAX(CASE WHEN lof.text = "Job Hopper" THEN lof.value END)                                                AS job_hopper
      ,MAX(CASE WHEN lof.text = 'Blockchain Experience at Hire' THEN lof.value END)                             AS blockchain_experience
    FROM lever.offer_field lof
    GROUP BY offer_id
  )
SELECT
  la.id                                                                                                         AS application_id
, la.opportunity_id                                                                                             AS opportunity_id
, lo.contact                                                                                                    AS contact_id
, lo.name                                                                                                       AS candidate
, lo.origin                                                                                                     AS source
, lar.text                                                                                                      AS archive_reason
, CASE
   WHEN lo.archived_at IS NULL THEN 'Active'
   WHEN lar.text = 'Hired' THEN 'Hired'
   WHEN lar.text IS NOT NULL AND lar.text != 'Hired' THEN 'Rejected'
   END                                                                                                          AS application_status
, lo.location                                                                                                   AS candidate_location
, CAST(lo.archived_at AS DATE)                                                                                  AS archived_at
, CAST(lo.last_advanced_at AS DATE)                                                                             AS last_advanced_at
, CAST(lo.last_interaction_at AS DATE)                                                                          AS last_interaction_at
, CAST(hist.updated_at AS DATE)                                                                                 AS stage_moved_at_date
, CASE
  WHEN CAST(lo.archived_at AS DATE) IS NOT NULL AND RANK() OVER (PARTITION BY la.id ORDER BY hist.updated_at DESC) = 1
  THEN CAST(lo.archived_at AS DATE)
  WHEN CAST(lo.archived_at AS DATE) IS NULL AND RANK() OVER (PARTITION BY la.id ORDER BY hist.updated_at DESC) = 1
  THEN CAST(hist.updated_at AS DATE)
  WHEN RANK() OVER (PARTITION BY la.id ORDER BY hist.updated_at DESC) != 1
  THEN CAST(hist.updated_at AS DATE)
  END                                                                                                           AS functional_stage_date
, DATE_TRUNC
    ((CASE
  WHEN CAST(lo.archived_at AS DATE) IS NOT NULL AND RANK() OVER (PARTITION BY la.id ORDER BY hist.updated_at DESC) = 1
  THEN CAST(lo.archived_at AS DATE)
  WHEN CAST(lo.archived_at AS DATE) IS NULL AND RANK() OVER (PARTITION BY la.id ORDER BY hist.updated_at DESC) = 1
  THEN CAST(hist.updated_at AS DATE)
  WHEN RANK() OVER (PARTITION BY la.id ORDER BY hist.updated_at DESC) != 1
  THEN CAST(hist.updated_at AS DATE)
  END
    )
  , WEEK)                                                                                                       AS functional_stage_week
, lst.text                                                                                                      AS stage_name
, CASE
    WHEN lst.text IN ('Offer Extended', 'Offer Approval & Reference Check', 'Verbal Offer & Background Check') THEN 'Offer'
    WHEN lst.text = 'Exec/ Advisor/ Enhanced Interview' THEN 'Exec/ Advisor/ Enhanced Interview'
    WHEN lst.text = 'Project' THEN 'Project'
    WHEN lst.text = 'Team Interview' THEN 'Team Interview'
    WHEN lst.text = 'HM Screen' THEN 'HM Screen'
    WHEN lst.text = 'Recruiter Screen' THEN 'Recruiter Screen'
    WHEN lst.text IN ('Applied', 'Application Review') THEN 'Application Review'
    WHEN lst.text = 'Responded' THEN 'Responded'
    WHEN lst.text IN ('Email reachout', 'Inmail') THEN 'Reached Out'
    WHEN lst.text = 'New lead' THEN 'New Lead'
    ELSE ''
    END                                                                                                         AS stage_group
 ,CASE
    WHEN RANK() OVER (PARTITION BY la.id ORDER BY hist.updated_at DESC) != 1
    THEN DATE_DIFF(CAST(LAG(hist.updated_at) OVER (PARTITION BY la.id ORDER BY CAST(hist.updated_at AS DATE) DESC) AS DATE), CAST(hist.updated_at AS DATE), DAY)
    WHEN RANK() OVER (PARTITION BY la.id ORDER BY hist.updated_at DESC) = 1 AND CAST(lo.archived_at AS DATE) IS NOT NULL
    THEN DATE_DIFF(CAST(lo.archived_at AS DATE), CAST(hist.updated_at AS DATE), DAY)
    WHEN RANK() OVER (PARTITION BY la.id ORDER BY hist.updated_at DESC) = 1 AND CAST(lo.archived_at AS DATE) IS NULL
    THEN DATE_DIFF(CURRENT_DATE(), CAST(hist.updated_at AS DATE), DAY)
    END                                                                                                         AS days_in_stage
  ,RANK() OVER (PARTITION BY la.id ORDER BY hist.updated_at DESC)                                               AS stage_rank
,CASE
    WHEN RANK() OVER (PARTITION BY la.id ORDER BY hist.updated_at DESC) > 1 THEN 'Completed'
    WHEN RANK() OVER (PARTITION BY la.id ORDER BY hist.updated_at DESC) = 1 AND lo.archived_at IS NOT NULL THEN 'Completed'
    WHEN RANK() OVER (PARTITION BY la.id ORDER BY hist.updated_at DESC) = 1 AND lo.archived_at IS NULL THEN 'Active'
    WHEN RANK() OVER (PARTITION BY la.id ORDER BY hist.updated_at DESC) IS NULL AND lo.archived_at IS NULL THEN 'Active'
    END                                                                                                         AS stage_status
  ,lu.name                                                                                                      AS recruiting_poc
  ,CASE
    WHEN lu.deactivated_at IS NOT NULL THEN "not active"
    WHEN lu.deactivated_at IS NULL THEN "active"
  END                                                                                                           AS recruiter_status
  ,lr.custom_field_dept                                                                                         AS department_at_offer
  ,lr.custom_field_subdept                                                                                      AS subteam_at_offer
  ,lr.custom_field_level                                                                                        AS level_at_offer
  ,lr.employment_status                                                                                         AS employment_status
  ,off.blockchain_experience                                                                                    AS blockchain_experience
  ,off.job_hopper                                                                                               AS job_hopper
FROM lever.opportunity lo
    LEFT JOIN
      lever.application la
      ON lo.id = la.opportunity_id
    LEFT JOIN
      lever.archive_reason lar
      ON lo.archived_reason_id = lar.id
    LEFT JOIN
      lever.opportunity_stage_history hist
      ON lo.id = hist.opportunity_id
    LEFT JOIN
      lever.stage lst
      ON hist.stage_id = lst.id
    LEFT JOIN
      lever.user lu
      ON lo.owner_id = lu.id
    LEFT JOIN
      lever.requisition lr
      ON la.requisition_for_hire_id = lr.id
    LEFT JOIN
      lever.offer offer
      ON lo.id = offer.opportunity_id
    LEFT JOIN
      offer_fields off
      ON offer.id = off.offer_id
ORDER BY opportunity_id DESC, stage_rank ASC"""

In [None]:
#specify the project and result
query_job = client.query(query, project=bigquery_project)
results = query_job.result()

#print results back to GBQ
df = (
    results.to_dataframe()
    .result()
    .to_dataframe()
)

In [None]:
# Define the new schema
new_schema = [
    bigquery.SchemaField("application_id", "STRING"),
    bigquery.SchemaField("opportunity_id", "STRING"),
    bigquery.SchemaField("contact_id", "STRING"),
    bigquery.SchemaField("candidate", "STRING"),
    bigquery.SchemaField("source", "STRING"),
    bigquery.SchemaField("archive_reason", "STRING"),
    bigquery.SchemaField("application_status", "STRING"),
    bigquery.SchemaField("candidate_location", "STRING"),
    bigquery.SchemaField("archived_at", "DATE"),
    bigquery.SchemaField("last_advanced_at", "DATE"),
    bigquery.SchemaField("last_interaction_at", "DATE"),
    bigquery.SchemaField("stage_moved_at_date", "DATE"),
    bigquery.SchemaField("functional_stage_date", "DATE"),
    bigquery.SchemaField("functional_stage_week", "DATE"),
    bigquery.SchemaField("stage_name", "STRING"),
    bigquery.SchemaField("stage_group", "STRING"),
    bigquery.SchemaField("days_in_stage", "INTEGER"),
    bigquery.SchemaField("stage_rank", "INTEGER"),
    bigquery.SchemaField("stage_status", "STRING"),
    bigquery.SchemaField("recruiting_poc", "STRING"),
    bigquery.SchemaField("recruiter_status", "STRING"),
    bigquery.SchemaField("department_at_offer", "STRING"),
    bigquery.SchemaField("subteam_at_offer", "STRING"),
    bigquery.SchemaField("level_at_offer", "STRING"),
    bigquery.SchemaField("employment_status", "STRING"),
    bigquery.SchemaField("blockchain_experience", "STRING"),
    bigquery.SchemaField("job_hopper", "STRING")


    # Add or modify fields as necessary
]

#create a derived destination table
destination_table = f"{client.project}.{dataset_id}.pa_people_derived_tables"

# Set up the job configuration with the new schema
job_config = bigquery.LoadJobConfig(
    schema=new_schema,
    schema_update_options=[
        bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
    ]
)

# Load the DataFrame into BigQuery using the defined schema
client.delete_table(destination_table, not_found_ok=True)  # for debugging only - remove in production
job = client.load_table_from_dataframe(df, destination_table, job_config=job_config)
job.result()  # Wait for the job to complete

LoadJob<project=ppl-stage-bigquery, location=europe-west2, id=c95add7c-2c90-4c99-85df-a1137bf26819>

In [None]:
!git clone https://github.com/josephd100/people_analytics.git

fatal: destination path 'people_analytics' already exists and is not an empty directory.


In [None]:
%ls