This code aims to automate the delivery of job postings at the industry level to all members of the client team. The objective is to ensure that each member receives timely and relevant job postings within their respective industries, thereby improving the efficiency of job leads and enhancing the value provided to our clients.
 

In [None]:
import msal
import json
import logging
import requests
import os
import base64
from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import functions as F
from pyspark.dbutils import DBUtils  # For secret management in Databricks

# Initialize (or reuse) the Spark session first. DBUtils below takes the session
# as its argument, so `spark` must be bound before that call; previously this
# only worked when the runtime had already injected a global named `spark`
# (presumably the Databricks notebook default — TODO confirm).
# getOrCreate() returns the already-active session when one exists.
spark = SparkSession.builder.appName("n/a").enableHiveSupport().getOrCreate()

# Initialize Databricks Utilities (used here for secret lookup)
dbutils = DBUtils(spark)

# Microsoft 365 Application credentials fetched from Databricks secrets
CLIENT_ID = dbutils.secrets.get(scope="n/a", key="CLIENT_ID")
CLIENT_SECRET = dbutils.secrets.get(scope="n/a", key="CLIENT_SECRET")
TENANT_ID = dbutils.secrets.get(scope="n/a", key="TENANT_ID")
# Mailbox used both as the "from" address and in the Graph sendMail URL
MAIL_USERNAME = 'n/a'

# Token authority for the tenant and the Graph application-permission scope
AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"
SCOPE = ["https://graph.microsoft.com/.default"]

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


In [None]:
# Read a newline-separated list (e.g. target companies) from a text file
def read_list_from_file(file_path, encoding="utf-8"):
    """Return the non-blank lines of a text file, stripped of surrounding whitespace.

    Args:
        file_path: Path of the text file to read.
        encoding: Text encoding used to decode the file. Defaults to UTF-8 so
            the result does not depend on the platform's locale encoding.

    Returns:
        A list with one string per non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file content is not valid for ``encoding``.
    """
    with open(file_path, 'r', encoding=encoding) as file:
        # Iterate the file object directly instead of materialising every
        # line with readlines() first.
        stripped_lines = (line.strip() for line in file)
        return [line for line in stripped_lines if line]

# Load the list of target companies (one entry per non-blank line). The path is
# relative, so it is resolved against the current working directory.
# NOTE(review): only the company list is loaded here — no recipient list — and
# `target_companies` is not referenced anywhere else in the visible code;
# confirm whether it is still needed.
target_companies = read_list_from_file('list_target_companies.txt')


In [None]:
def get_access_token():
    """Acquire an application (client-credentials) access token through MSAL.

    Builds a confidential client from the module-level CLIENT_ID, AUTHORITY and
    CLIENT_SECRET, then requests a token for SCOPE.

    Returns:
        The value stored under ``"access_token"`` in the MSAL response.

    Raises:
        Exception: When the response carries no access token; the MSAL
            ``error`` and ``error_description`` values are attached as args.
    """
    client = msal.ConfidentialClientApplication(
        CLIENT_ID,
        authority=AUTHORITY,
        client_credential=CLIENT_SECRET,
    )
    token_response = client.acquire_token_for_client(scopes=SCOPE)

    # Guard clause: bail out early when MSAL did not hand back a token.
    if "access_token" not in token_response:
        raise Exception(
            "Failed to acquire token",
            token_response.get("error"),
            token_response.get("error_description"),
        )
    return token_response['access_token']

def make_clickable(url):
    """Wrap a URL in an HTML anchor whose link text is the URL itself.

    The URL is HTML-escaped before being placed in the ``href`` attribute and
    the link text, so characters such as ``&``, ``"`` or ``<`` cannot break the
    surrounding markup.

    Args:
        url: The link target. Missing values (``None``, NaN, any non-string)
            and blank strings are treated as "no link".

    Returns:
        An ``<a href="...">...</a>`` snippet, or an empty string when there is
        no usable URL.
    """
    import html  # local import keeps this function self-contained

    # Avoid emitting a bogus <a href="None">None</a> link for missing values.
    if not isinstance(url, str) or not url.strip():
        return ''

    safe_url = html.escape(url, quote=True)
    return f'<a href="{safe_url}">{safe_url}</a>'


In [None]:
def send_email_via_graph_api(subject, recipient, body, attachment=None, attachment_name=None, timeout=30):
    """Send an HTML email through the Microsoft Graph ``sendMail`` endpoint.

    The message is sent from the MAIL_USERNAME mailbox using a token obtained
    from ``get_access_token()``.

    Args:
        subject: Subject line of the message.
        recipient: Intended recipient address. NOTE: currently ignored, see the
            override at the top of the body.
        body: HTML content of the message.
        attachment: Optional raw bytes to attach; skipped when falsy.
        attachment_name: File name shown for the attachment.
        timeout: Seconds to wait for the Graph API before giving up, passed to
            ``requests.post``. Without it a stalled connection would block the
            whole job indefinitely.

    Raises:
        Exception: If the token cannot be acquired, or if Graph answers with
            any status other than 202.
        requests.exceptions.RequestException: On network failure or timeout.
    """
    # NOTE(review): the caller-supplied recipient is unconditionally replaced by
    # this hard-coded address, so every message goes to the same mailbox.
    # Looks like a test/redaction override — confirm before removing it.
    recipient='n/a'
    access_token = get_access_token()
    headers = {'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json'}

    email_data = {
        "message": {
            "subject": subject,
            "body": {"contentType": "HTML", "content": body},
            "from": {"emailAddress": {"address": MAIL_USERNAME}},
            "toRecipients": [{"emailAddress": {"address": recipient}}],
        }
    }

    if attachment:
        # Graph expects file content as base64 text inside the JSON payload.
        email_data["message"]["attachments"] = [
            {
                "@odata.type": "#microsoft.graph.fileAttachment",
                "name": attachment_name,
                "contentBytes": base64.b64encode(attachment).decode('utf-8'),
            }
        ]

    response = requests.post(
        f'https://graph.microsoft.com/v1.0/users/{MAIL_USERNAME}/sendMail',
        headers=headers,
        data=json.dumps(email_data),
        timeout=timeout,
    )
    # Any status other than 202 is treated as a failure.
    if response.status_code != 202:
        raise Exception(f"Error sending email: {response.status_code} - {response.text}")
    logger.info("Email sent to %s", recipient)



In [None]:
def _fetch_job_posts(postings, team_member, company_industry, start_date, end_date):
    """Return the distinct UK postings for one team member / industry pair as a pandas DataFrame."""
    # Column expressions instead of string-built SQL: values containing a quote
    # (e.g. "O'Brien") can no longer break or alter the query.
    matching = postings.filter(
        (F.col('team_member') == team_member)
        & (F.col('company_industry') == company_industry)
        & (F.col('country') == 'United Kingdom')
        & F.col('jobPostedDate').between(start_date, end_date)
    )
    return (
        matching.select(
            'companyName',
            'jobTitle',
            'jobFunction',
            'jobPostedDate',
            'skill',
            'city',
            'country',
            'seniority',
            'postURL',
        )
        .distinct()
        .toPandas()
    )


def _join_values(values):
    """Join an array-like of values into a comma-separated string; missing input gives ''."""
    if values is None or isinstance(values, float):  # None / NaN => no values
        return ''
    return ', '.join(str(value) for value in values if value is not None)


def _aggregate_job_posts(job_posts_df):
    """Collapse the one-row-per-skill query result into one row per job posting."""
    df = job_posts_df.copy()

    # Convert postURL to clickable format
    df['postURL'] = df['postURL'].apply(make_clickable)

    # A missing city must not turn the whole location into NaN: groupby drops
    # rows whose key is NaN, which silently removed such postings.
    df['Location'] = [
        country if pd.isna(city) else f"{city}, {country}"
        for city, country in zip(df['city'], df['country'])
    ]

    # Same reason: keep postings whose text key columns are missing.
    text_keys = ['companyName', 'jobTitle', 'seniority']
    df[text_keys] = df[text_keys].fillna('')

    # Convert jobFunction array to a string for grouping (null-safe)
    df['jobFunction'] = df['jobFunction'].apply(_join_values)

    # Aggregate skills per posting, ignoring missing values in the skill column
    return df.groupby(
        ['companyName', 'jobTitle', 'jobFunction', 'jobPostedDate', 'Location', 'seniority', 'postURL', 'country'],
        as_index=False,
    ).agg({'skill': lambda skills: ', '.join(sorted({s for s in skills if pd.notna(s)}))})


def _render_job_postings(job_posts_df):
    """Render aggregated postings as an HTML fragment with one bold heading per company."""
    import html

    def esc(value):
        # Posting text comes from external data; escape it before embedding in HTML.
        return html.escape(str(value))

    parts = []
    current_company = None
    for _, row in job_posts_df.iterrows():
        if current_company != row['companyName']:
            if current_company is not None:
                parts.append('<br>')
            parts.append(f"<b>{esc(row['companyName'])}</b> ({esc(row['country'])})<br><br>")
            current_company = row['companyName']
        # postURL already holds an HTML anchor, so it is inserted as-is.
        parts.append(
            f"{esc(row['jobTitle'])}; {esc(row['jobFunction'])}; {esc(row['Location'])}; "
            f"{esc(row['seniority'])}; {row['postURL']}; skills - {esc(row['skill'])}<br><br>"
        )
    return ''.join(parts)


def create_and_send_emails(start_date, end_date):
    """Email each team member the UK job postings for each of their industries.

    For every distinct (team_member, company_industry) pair in
    ``hive_metastore.goldlayer_coresignal.industries_primary_recent`` the
    postings dated between ``start_date`` and ``end_date`` are fetched,
    aggregated to one entry per posting, rendered as HTML and sent through
    ``send_email_via_graph_api``. Pairs without postings are logged and skipped.

    Args:
        start_date: First day of the window, formatted ``YYYY-MM-DD``.
        end_date: Last day of the window (inclusive), formatted ``YYYY-MM-DD``.

    Raises:
        ValueError: If either date is not in ``YYYY-MM-DD`` format.
        Exception: Anything raised while querying or sending propagates and
            stops the run; remaining pairs are not processed.
    """
    import html

    # Loop-invariant: format the window once for the email text.
    formatted_start_date = datetime.strptime(start_date, '%Y-%m-%d').strftime('%d %B %Y')
    formatted_end_date = datetime.strptime(end_date, '%Y-%m-%d').strftime('%d %B %Y')

    # Get distinct team members and industries from the data
    industries_query = """
        SELECT DISTINCT team_member, company_industry
        FROM hive_metastore.goldlayer_coresignal.industries_primary_recent
    """
    industries_df = spark.sql(industries_query).toPandas()
    postings = spark.table('hive_metastore.goldlayer_coresignal.industries_primary_recent')

    for _, industry_row in industries_df.iterrows():
        team_member = industry_row['team_member']
        company_industry = industry_row['company_industry']

        # Query job postings for each team member's industry
        job_posts_df = _fetch_job_posts(postings, team_member, company_industry, start_date, end_date)

        if job_posts_df.empty:
            logger.info(f"No job postings found for {team_member} in {company_industry} industry.")
            continue

        # Generate email body content
        job_postings_content = _render_job_postings(_aggregate_job_posts(job_posts_df))
        greeting_name = html.escape(str(team_member))
        industry_html = html.escape(str(company_industry))

        # Prepare email
        email_body = f"""
            <html>
            <body>
            <p>Good morning {greeting_name},<br><br>
            Here are the job postings for the <b>{industry_html}</b> industry between {formatted_start_date} and {formatted_end_date}:</p>
            <p>{job_postings_content}</p>
            </body>
            </html>
        """
        subject = f"{company_industry} Industry – Weekly Job Postings Report"

        # Send email. Keyword arguments are used because the previous positional
        # call passed the team member as the subject and the subject as the
        # recipient.
        # NOTE(review): assumes team_member is usable as a recipient address — TODO confirm.
        send_email_via_graph_api(subject=subject, recipient=team_member, body=email_body)

# Reporting window: Monday through Sunday of the previous calendar week.
# weekday() is 0 on Monday, so stepping back (weekday + 7) days lands on last
# week's Monday and (weekday + 1) days on last week's Sunday.
today = datetime.today()
last_monday = today - timedelta(days=7 + today.weekday())
last_sunday = today - timedelta(days=1 + today.weekday())
start_date = f"{last_monday:%Y-%m-%d}"
end_date = f"{last_sunday:%Y-%m-%d}"

# Build and send the reports for that window
create_and_send_emails(start_date=start_date, end_date=end_date)
