# Job Opportunities ETL / Alert
* Create a job that
    1. Runs ETL on job-openings data gathered by web scraping.
    2. Sends an email alert with the results.
* ToDo
    * Add Docker
    * Create web server for displaying data
    * Set up AWS connection
    * Private key storage

In [1]:
# standard
import logging
import os
import sys
import io
import subprocess
# import pickle
import datetime
import re
import yaml
# aws
import boto3
# web
import requests
import bs4
import time
import random
import html
# data
import pandas as pd
import pyarrow as pa
# user def
import alert

In [2]:
def CONFIG(prod=False):
    """
    Build the reference dict used by the rest of the job.

    in: prod - True names the output file after the run date,
               False uses the fixed 'qa' file name
    out: nested dict with the working dir, run timestamp, scrape targets,
         local/S3 output locations, email settings and (empty) db creds
    """
    here = os.getcwd()
    stamp = datetime.datetime.today()

    # the output file name is the only value that differs between modes
    parquet_name = (stamp.strftime('%Y%m%d') + '-output.parquet') if prod else 'qa.parquet'

    # same recipient list in both modes for now (TODO: os.environ['EML_USR'] for qa)
    recipients = ['trevsjordan@gmail.com']

    scrape = {
        'search': 'https://www.indeed.com/jobs?q={TTL}&l={CT}%2C%20{ST}&start={NUM}',
        'ttls': ['data scientist'],
        'locs': [('seattle', 'wa')],
    }

    local_out = {'bckt': here, 'key': 'data/' + parquet_name}

    cloud = {
        'region': '',  # TODO: os.environ['AWS_REGION']
        's3': {
            'bckt': {'us-east-1': 'tsj7ww-useast1', 'us-west-2': 'tsj7ww-uswest2'},
            'key': 'data/jobs/indeed/' + parquet_name,
            'regions': ['us-east-1', 'us-west-2'],
        },
        'iam': {'arn': '', 'session': 'alert_indeed'},
    }

    mail = {
        'frm': 'tsj7ww.messenger@gmail.com',  # TODO: os.environ['EML_MSNGR']
        'to': recipients,
        'svr': 'smtp.gmail.com',  # TODO: os.environ['EML_SVR']
        'port': 587,  # TODO: os.environ['EML_PORT']
        # NOTE(review): credential is hard-coded in source; move to a secret store
        'pwd': 'S3ndTh3mM@ils',
    }

    # placeholders only - every db credential is currently blank
    db_fields = ('username', 'password', 'account', 'host',
                 'role', 'warehouse', 'database', 'schema')
    database = {'creds': {field: '' for field in db_fields}}

    return {
        'cwd': here,
        'now': stamp,
        'log': '',  # TODO: path under os.environ['LOGS']
        'indeed': scrape,
        'local': local_out,
        'aws': cloud,
        'email': mail,
        'db': database,
    }

In [3]:
def BACKFILL(fpath):
    """
    Swap the contents of a marker file for the current timestamp.

    in: path of an existing text file
    out: whatever the file held before it was overwritten

    The file is opened in 'r+' mode, so it must already exist.
    """
    stamp = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')

    with open(fpath, 'r+') as marker:
        previous = marker.read()
        # rewind and empty the file before writing the new timestamp
        marker.seek(0)
        marker.truncate()
        marker.write(stamp)

    return previous

In [4]:
def EXTRACT(soup):
    """
    in: soup of search query
    out: job post urls

    Collects the href of the job-title link inside every <div class="row">.
    Rows that have no job-title link, or whose link has no href, are
    skipped so one malformed row does not abort the whole extraction.
    """
    _urls = []

    for div in soup.find_all(name='div', attrs={'class': 'row'}):
        link = div.find(name='a', attrs={'data-tn-element': 'jobTitle'})
        if link is None:
            # find() returns None when the row has no title link
            continue
        href = link.get('href')
        if href is not None:
            _urls.append(href)

    return _urls

In [5]:
# Patterns are raw strings so regex escapes such as \s are not read as
# (invalid) string escapes, and are compiled once rather than on every call.
_CLEAN_DROP = re.compile(r'[^A-Za-z0-9&.\s]+')   # anything but letters, digits, '&', '.', whitespace
_CLEAN_GAPS = re.compile(r'\s{2,}')              # runs of two or more whitespace chars
_CLEAN_TIDY = re.compile(r'\s([."](?:\s|$))')    # whitespace before a '.' or '"' that ends a token


def CLEAN(item):
    """quick function for cleaning scraped text data

    in: raw text (str)
    out: upper-cased, stripped text in which
         - every character other than letters, digits, '&', '.' and
           whitespace has been removed
         - runs of two or more whitespace characters are one space
         - whitespace directly before a '.' that is followed by
           whitespace or end of string is removed
    """
    kept = _CLEAN_DROP.sub('', item)
    spaced = _CLEAN_GAPS.sub(' ', kept)
    tidied = _CLEAN_TIDY.sub(r'\1', spaced)

    return tidied.strip().upper()

In [6]:
def TRANSFORM(row):
    """
    in: soup obj
    out: cleaned text

    Pulls the fields of one job posting out of a search-result row and
    returns them as a dict with the keys title, company, rating, city,
    state, salary, summary and url_post. A field whose element or
    attribute is missing from the row is set to None. Every non-None
    field except url_post is passed through CLEAN.
    """
    _url = 'www.indeed.com'

    # what a lookup raises when an element (None), attribute or list index is absent
    _absent = (AttributeError, TypeError, KeyError, IndexError)

    def _try(getter):
        """Return getter(), or None when the piece it looks for is missing."""
        try:
            return getter()
        except _absent:
            return None

    def _title_link():
        """Anchor that carries both the job title text and the post href."""
        return row.find(name='a', attrs={'data-tn-element': 'jobTitle'})

    def _location():
        """Comma-separated parts of the row's data-rc-loc attribute."""
        return row.find('div', attrs={'class': 'recJobLoc'})['data-rc-loc'].split(',')

    def _summary():
        """Text of all <li> items in the summary div, joined by spaces."""
        # attrs is a dict mapping attribute name to wanted value
        items = row.find('div', attrs={'class': 'summary'}).find_all('li')
        return ' '.join(i.text for i in items)

    _post = {
        'title': _try(lambda: _title_link().text),
        'company': _try(lambda: row.find(name='a', attrs={'data-tn-element': 'companyName'}).text),
        'rating': _try(lambda: row.find(name='span', attrs={'class': 'ratingsContent'}).text),
        'city': _try(lambda: _location()[0]),
        'state': _try(lambda: _location()[1]),
        'salary': _try(lambda: row.find('nobr').text),
        'summary': _try(_summary),
        'url_post': _try(lambda: _url + html.unescape(_title_link()['href'])),
    }

    # urls must stay byte-exact, so they bypass CLEAN
    skip = ['url_post']

    return {
        k: (CLEAN(v) if v is not None and k not in skip else v)
        for k, v in _post.items()
    }

In [7]:
def LOAD(lst,s3,iam):
    """
    Write the rows to parquet in memory and upload the result to S3.

    in:
        lst - row records to be framed with pd.DataFrame
        s3  - dict with 'region', 'bckt' (bucket name) and 'key' (object key)
        iam - dict describing the role to assume; currently unused because
              the assume-role step is not implemented yet
    out: response returned by the S3 client's put_object call
    """
    _buffer = io.BytesIO()
    pd.DataFrame(lst).to_parquet(_buffer, index=False)

    # TODO: assume the role in iam['arn'] / iam['session'] via STS before uploading

    _s3 = boto3.client('s3', region_name=s3['region'])
    _cfg = {
        'Bucket': s3['bckt'],
        'Key': s3['key'],
        # hand over the bytes themselves: after the write the buffer's
        # position sits at the end, so reading from it would yield nothing
        'Body': _buffer.getvalue(),
        'ACL': 'bucket-owner-full-control',
        'ServerSideEncryption': 'AES256',
        'StorageClass': 'STANDARD_IA',
        # 'Metadata': {},
    }

    return _s3.put_object(**_cfg)

In [9]:
if __name__=='__main__':

    # setup cfg dict
    CFG = CONFIG(prod=False)

    try:
        # ETL ONLY - find last time scrape happened
        # dt = BACKFILL(CFG['local']['run'])

        qrys = []
        rows = [] # data rows
        pages = 1 # num of pages of each query combo to scrape

        # pull text from query urls: one request per (title, location, page)
        for ttl in CFG['indeed']['ttls']:
            for loc in CFG['indeed']['locs']:
                for pg in range(pages):
                    url = CFG['indeed']['search'].format(
                        TTL=ttl.replace(' ','%20'),CT=loc[0],ST=loc[1],NUM=10*pg
                    )
                    # a timeout keeps one stalled connection from hanging the job forever
                    resp = requests.get(url, timeout=30)
                    qrys.append({
                        'data_dt': CFG['now'].strftime('%Y-%m-%d'),'ttl':ttl,'loc':loc
                        ,'soup':bs4.BeautifulSoup(resp.text,'html.parser')
                    })
#                     time.sleep(random.randint(3,10))

        # pull data from text: one output row per <div class="row"> in each result page
        for qry in qrys:
            q_title = qry['ttl'].upper()
            q_location = ', '.join(qry['loc']).upper()
            for row in qry['soup'].find_all(name='div',attrs={'class':'row'}):
                rows.append({'q_title':q_title,'q_location':q_location,**TRANSFORM(row)})

        # ETL ONLY - load full data
        pd.DataFrame(rows).to_parquet(os.path.join(CFG['local']['bckt'],CFG['local']['key']),index=False)
#         s3s = [{'key':CFG['aws']['s3']['key'],'bckt':CFG['aws']['s3']['bckt'][i],'region':i} for i in CFG['aws']['s3']['regions']]
#         for s3 in s3s:
#             LOAD(rows,s3,CFG['aws']['iam'])

        # emails: group the rows by query location, keeping first-seen order
        info = {}
        for row in rows:
            info.setdefault(row['q_location'], []).append(row)
        alert.ALERT(info,**CFG['email'])

    except Exception:
        # failure email, then let the original error surface
        info = ''
        alert.FAIL(info,**CFG['email'])
        raise

SMTPSenderRefused: (530, b'5.7.0 Authentication Required. Learn more at\n5.7.0  https://support.google.com/mail/?p=WantAuthError q28sm8671373qtk.13 - gsmtp', 'tsj7ww.messenger@gmail.com')