# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.3 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session. 
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0 and 3.0. 
                                      Default: 2.0.
----

## Selecting Job Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %glue_ray           String        Sets the session type to Glue Ray.
----

## Glue Config Magic 
*(common across all job types)*

----

    %%configure         Dictionary    A JSON-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from PyPI or S3).
----

                                      
## Magic for Spark Jobs (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
                                      ETL and Streaming support G.1X, G.2X, G.4X and G.8X. 
                                      Default: G.1X.
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files from S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----
                                      
## Magic for Ray Job

----
    %min_workers        Int           The minimum number of workers that are allocated to a Ray job. 
                                      Default: 1.
    %object_memory_head Int           The percentage of free memory on the instance head node after a warm start. 
                                      Minimum: 0. Maximum: 100.
    %object_memory_worker Int         The percentage of free memory on the instance worker nodes after a warm start. 
                                      Minimum: 0. Maximum: 100.
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
----



#### Run this cell to set up and start your interactive session.


In [None]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

In [2]:
import datetime
import pyspark.sql.functions as F
import pytz
from awsglue.dynamicframe import DynamicFrame




In [3]:
# Read the dimension tables from the curated Data Catalog database.
dimCities = glueContext.create_dynamic_frame.from_catalog(
    database='glassdoorcurated',
    table_name='curated_dimcities'
).toDF()

dimStates = glueContext.create_dynamic_frame.from_catalog(
    database='glassdoorcurated',
    table_name='curated_dimstates'
).toDF()

dimSize = glueContext.create_dynamic_frame.from_catalog(
    database='glassdoorcurated',
    table_name='curated_dimsize'
).toDF()

dimSector = glueContext.create_dynamic_frame.from_catalog(
    database='glassdoorcurated',
    table_name='curated_dimsector'
).toDF()

dimIndustry = glueContext.create_dynamic_frame.from_catalog(
    database='glassdoorcurated',
    table_name='curated_dimindustry'
).toDF()

dimRevenue = glueContext.create_dynamic_frame.from_catalog(
    database='glassdoorcurated',
    table_name='curated_dimrevenue'
).toDF()

dimType = glueContext.create_dynamic_frame.from_catalog(
    database='glassdoorcurated',
    table_name='curated_dimtype'
).toDF()

dimEducation = glueContext.create_dynamic_frame.from_catalog(
    database='glassdoorcurated',
    table_name='curated_dimeducation'
).toDF()

dimExperience = glueContext.create_dynamic_frame.from_catalog(
    database='glassdoorcurated',
    table_name='curated_dimexperience'
).toDF()




In [4]:
def transform_data(df):
    '''
    Transforms cleaned job-listing data into analysis-ready columns (job location, company details,
    annual salary range, company age, education level, skill flags and seniority).
    
    ARGUMENTS:
        - df: Spark DataFrame
    RETURNS:
        - Spark DataFrame
    '''
    # Job Location
    df = df.withColumn('jobcity', 
                       F.when(
                            F.col('joblocation') == 'Remote', 'Remote'
                            )
                            .otherwise(
                                F.split(F.col('joblocation'), pattern=',')[0]
                            )
                        ) \
            .withColumn('jobcity',
                        F.trim(F.col('jobcity'))
                        ) \
            .withColumn('jobstate', 
                        F.when(
                            F.col('joblocation') == 'Remote', 'Remote'
                            ).
                            otherwise(
                                F.element_at(F.split(F.col('joblocation'), pattern=','), -1)
                                )
                        ) \
            .withColumn('jobstate',
                        F.trim(F.col('jobstate'))
                        )
    # Company Type and Company Name
    df = df.withColumn('companytype', 
                        F.when(
                            F.col('companytype')=='Other', F.col('companytype')
                            )
                            .otherwise(
                                F.element_at(F.split(F.col('companytype'), pattern=' - '), -1)
                                )
                        ) \
                        .withColumn('companytype',
                            F.trim(F.col('companytype'))
                        ) \
            .withColumn('companyname', 
                        F.split(
                            F.col('companyname'), pattern='\n'
                            )[0]
                        ) \
            .withColumn('companyname',
                        F.regexp_replace(
                            F.col('companyname'), ',', '')
                       ) \
            .withColumn('companyname',
                        F.trim(F.col('companyname'))
                        )
    # Salary
    df = df.withColumn('jobsalary', 
                        F.regexp_replace(
                            F.col('jobsalary'), 'Employer Provided Salary:', '')
                        ) \
            .withColumn('jobsalary', 
                        F.regexp_replace(
                            F.col('jobsalary'), r' \(Glassdoor est\.\)', '')
                        ) \
            .withColumn('jobsalary',
                        F.when(
                            F.col('jobsalary').contains('Hour'), 
                            F.split(F.col('jobsalary'), pattern=' Per Hour')[0]
                            ).otherwise(
                                F.col('jobsalary')    
                            )
                        ) \
            .withColumn('jobsalary',
                        F.regexp_replace(F.col('jobsalary'), r'\$', '')
                    ) \
            .withColumn('minsalary',
                        F.split(
                            F.col('jobsalary'), pattern=' - '
                            )[0]
                        ) \
            .withColumn('minsalary',
                        F.trim(F.col('minsalary'))
                        ) \
            .withColumn('maxsalary',
                        F.element_at(F.split(
                            F.col('jobsalary'), pattern=' - '
                            ), -1)
                        ) \
            .withColumn('maxsalary',
                        F.trim(F.col('maxsalary'))
                        ) \
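            .withColumn('minsalary',
                        # Values ending in 'K' are annual ('85K' -> '85000'); any other value is
                        # an hourly rate, annualized at 2,080 hours (40 hours x 52 weeks).
                        F.when(
                            F.col('minsalary').endswith('K'),
                            F.regexp_replace(F.col('minsalary'), 'K', '000')
                            ).otherwise(
                                F.col('minsalary') * 2080
                            )
                        ) \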
            .withColumn('maxsalary',
                        F.when(
                            F.col('maxsalary').endswith('K'), 
                            F.regexp_replace(F.col('maxsalary'), 'K', '000')
                            ).otherwise(
                                F.col('maxsalary') * 2080
                            )
                        ) \
            .withColumn('averagesalary',
                (F.col('minsalary') + F.col('maxsalary')) / 2
            )
    # Company Age
    df = df.withColumn('companyage', 
                       datetime.datetime.now().year - F.col('companyyearfounded')
                       ) \
            .withColumn('companyage',
                       F.trim(F.col('companyage'))
                       )
    
    # Education levels (checked from highest to lowest; the first match wins)
    phds = {'phd', 'doctorate', 'postdoc'}
    masters = {'msc', 'master'}
    undergraduate = {'bachelors', 'undergraduate', 'associates'}

    description = F.lower(F.col('jobdescription'))
    df = df.withColumn('educationlevel',
                       F.when(description.rlike('|'.join(phds)), 'PhD or higher')
                        .when(description.rlike('|'.join(masters)), 'Masters')
                        .when(description.rlike('|'.join(undergraduate)), 'Undergraduate')
                        .otherwise('Other / Unknown')
                       )
    # Create columns for skills demanded by job
    masterlist = {'python', 'sql', 'scala', 'aws', 'gcp', 'azure', 'stream', 'batch', 'java', 'spark','dbt', 'airflow'}

    for skill in masterlist:
        df = df.withColumn(skill,
                      F.when(
                          F.lower(F.col('jobdescription')).rlike(skill),
                          1
                        ).otherwise(
                              0
                        )
                    )
    # Job Seniority (checked from most to least senior; the first match wins).
    # Short tokens such as 'i', 'ii' and 'iv' are matched as whole words (\b) so they
    # do not fire inside longer words like 'engineer' or 'executive'.
    experienced = ['lead', 'principal', 'senior', r'\bsr\b', r'\biv\b', 'manage']
    mid = ['mid', r'\bii\b', r'\biii\b']
    entry = ['entry', 'associate', 'assc', r'\bi\b', 'junior', r'\bjr\b']

    title = F.lower(F.col('jobtitle'))
    df = df.withColumn('experiencelevel',
                       F.when(title.rlike('|'.join(experienced)), 'Senior')
                        .when(title.rlike('|'.join(mid)), 'Mid')
                        .when(title.rlike('|'.join(entry)), 'Entry')
                        .otherwise('Other / Unknown')
                       )
    # Select cols
    df_transformed = df.select(
        'date',
        'companyname',
        'companyrating',
        'companyrevenue',
        'companysector',
        'companyindustry',
        'companysize',
        'companytype',
        'companyyearfounded',
        'easyapply',
        'jobcity',
        'jobstate',
        'minsalary',
        'maxsalary',
        'averagesalary',
        'companyage',
        'educationlevel',
        'stream',
        'sql',
        'gcp',
        'scala',
        'dbt',
        'java',
        'azure',
        'aws',
        'batch',
        'spark',
        'python',
        'airflow',
        'experiencelevel'
    )
    return df_transformed

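The salary rules above can be checked without a Spark session by mirroring them in plain Python. The `normalize_salary` helper below is hypothetical and assumes the input shapes this cell handles (`$85K - $120K`, `$25.00 - $35.00 Per Hour`, an optional `Employer Provided Salary:` prefix and ` (Glassdoor est.)` suffix). Like the Spark code, it treats any value without a trailing `K` as an hourly rate and multiplies it by 2,080 (40 hours times 52 weeks).

```python
import re


def normalize_salary(raw):
    """Hypothetical pure-Python mirror of the salary rules in transform_data.

    Returns (min, max, average) as annual dollar amounts.
    """
    s = raw.replace('Employer Provided Salary:', '')
    s = re.sub(r' \(Glassdoor est\.\)', '', s)
    if 'Hour' in s:
        # Keep only the range in front of ' Per Hour'
        s = s.split(' Per Hour')[0]
    s = s.replace('$', '')
    parts = s.split(' - ')
    low, high = parts[0].strip(), parts[-1].strip()

    def to_annual(value):
        # 'K' values are already annual; anything else is treated as hourly.
        if value.endswith('K'):
            return float(value.replace('K', '000'))
        return float(value) * 2080

    lo, hi = to_annual(low), to_annual(high)
    return lo, hi, (lo + hi) / 2


print(normalize_salary('Employer Provided Salary:$85K - $120K'))      # (85000.0, 120000.0, 102500.0)
print(normalize_salary('$25.00 - $35.00 Per Hour (Glassdoor est.)'))  # (52000.0, 72800.0, 62400.0)
```

Because `K` is replaced with the string `000` rather than multiplied out, a value such as `110.5K` would become `110.5000`, i.e. 110.5; if decimal thousands appear in the data, stripping the `K` and multiplying by 1,000 would be safer.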

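Each skill flag comes from `rlike(skill)`, which is a plain substring match, so `java` also fires on "JavaScript", `aws` on "laws" and `spark` on "PySpark". The sketch below uses Python's `re` module and a made-up description to compare substring matching with whole-word matching (`\b` on both sides, which Spark's Java-based `rlike` also understands). The `skill_flags` helper and the shortened skill list are illustrative only; note that whole-word matching no longer counts "PySpark" as Spark.

```python
import re

# Shortened, illustrative subset of the notebook's skill list.
SKILLS = ['python', 'sql', 'java', 'aws', 'spark']


def skill_flags(description, whole_word):
    """Return {skill: 0 or 1}; the description is lower-cased first, as in transform_data."""
    text = description.lower()
    flags = {}
    for skill in SKILLS:
        pattern = r'\b' + re.escape(skill) + r'\b' if whole_word else skill
        flags[skill] = 1 if re.search(pattern, text) else 0
    return flags


desc = 'Build JavaScript dashboards; knowledge of data privacy laws and PySpark.'
print(skill_flags(desc, whole_word=False))  # java, aws and spark are flagged
print(skill_flags(desc, whole_word=True))   # nothing is flagged
```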

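The seniority rules are tried in order (Senior, then Mid, then Entry) and the first match wins, just like chained `when` calls. Tokens such as `i`, `ii` and `iv` are only meaningful as whole words: as bare substrings, `i` matches almost every title (the "i" in "Engineer") and `iv` matches "Executive". The plain-Python sketch below classifies a few made-up titles with the bare keyword lists and with a variant that puts word boundaries (`\b`) around the short tokens; the `seniority` helper is hypothetical.

```python
import re

# Seniority keyword lists matched as bare substrings.
SUBSTRING_RULES = [
    ('Senior', ['lead', 'principal', 'senior', 'sr', 'iv', 'manage']),
    ('Mid', ['mid', 'ii', 'iii']),
    ('Entry', ['entry', 'associate', 'assc', 'i', 'junior', 'jr']),
]

# Same lists, with the short tokens restricted to whole words.
WHOLE_WORD_RULES = [
    ('Senior', ['lead', 'principal', 'senior', r'\bsr\b', r'\biv\b', 'manage']),
    ('Mid', ['mid', r'\bii\b', r'\biii\b']),
    ('Entry', ['entry', 'associate', 'assc', r'\bi\b', 'junior', r'\bjr\b']),
]


def seniority(title, rules):
    """Return the first level whose alternation matches the lower-cased title."""
    text = title.lower()
    for level, patterns in rules:
        if re.search('|'.join(patterns), text):
            return level
    return 'Other / Unknown'


for t in ['Data Engineer', 'Executive Data Analyst', 'Data Engineer II', 'Sr. Data Engineer']:
    print(t, '->', seniority(t, SUBSTRING_RULES), '/', seniority(t, WHOLE_WORD_RULES))
# Data Engineer -> Entry / Other / Unknown
# Executive Data Analyst -> Senior / Other / Unknown
# Data Engineer II -> Mid / Mid
# Sr. Data Engineer -> Senior / Senior
```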

In [5]:
def map_star_schema(df):
    '''
    Maps the transformed DataFrame onto the star schema fact table by replacing categorical values with dimension keys.
    
    ARGUMENTS:
        - df: PySpark DataFrame
    RETURNS:
        - PySpark DataFrame
    '''
    # For the fact table to join correctly with the dimension tables, every categorical value
    # must exist in its dimension table. Collect the valid values of each dimension first.
    cities = {val[0] for val in dimCities.select('cityname').distinct().collect()}
    states = {val[0] for val in dimStates.select('stateabbrev').distinct().collect()}
    sizes = {val[0] for val in dimSize.select('size').distinct().collect()}
    sectors = {val[0] for val in dimSector.select('sector').distinct().collect()}
    industries = {val[0] for val in dimIndustry.select('industry').distinct().collect()} 
    revenues = {val[0] for val in dimRevenue.select('revenue').distinct().collect()}
    types = {val[0] for val in dimType.select('type').distinct().collect()}

    # Replace values not present in the dimension (including nulls) with Other / Unknown
    validvalueshashmap = {
        'jobcity': cities,
        'jobstate': states,
        'companytype': types,
        'companyrevenue': revenues,
        'companysector': sectors,
        'companysize': sizes,
        'companyindustry': industries
    }
    for col in validvalueshashmap:
        df = df.withColumn(col,
                        F.when(
                            F.col(col).isin(validvalueshashmap[col]),
                            F.col(col)
                            ).otherwise(
                                    'Other / Unknown'
                                )
                        )

    # Fill company name nulls
    df = df.fillna('Other / Unknown', subset=['companyname'])
    
    # Convert to the star schema: join each dimension to swap categorical values for their keys
    cols = [
            'date',
            'companyname', 
            'companyrating', 
            'revenuekey', 
            'sectorkey',
            'industrykey', 
            'sizekey', 
            'typekey', 
            'educationkey', 
            'experiencekey',
            'companyyearfounded', 
            'companyage', 
            'easyapply', 
            'citykey', 
            'statekey', 
            'minsalary', 
            'maxsalary', 
            'averagesalary',
            'stream', 
            'sql', 
            'gcp', 
            'scala', 
            'dbt', 
            'java', 
            'azure', 
            'aws', 
            'batch', 
            'spark', 
            'python', 
            'airflow',
        ]
    df = df \
        .join(F.broadcast(dimCities), df.jobcity==dimCities.cityname, 'left') \
        .join(F.broadcast(dimStates), df.jobstate==dimStates.stateabbrev, 'left') \
        .join(F.broadcast(dimSize), df.companysize==dimSize.size, 'left') \
        .join(F.broadcast(dimSector), df.companysector==dimSector.sector, 'left') \
        .join(F.broadcast(dimIndustry), df.companyindustry==dimIndustry.industry, 'left') \
        .join(F.broadcast(dimRevenue), df.companyrevenue==dimRevenue.revenue, 'left') \
        .join(F.broadcast(dimType), df.companytype==dimType.type, 'left') \
        .join(F.broadcast(dimEducation), df.educationlevel==dimEducation.education, 'left') \
        .join(F.broadcast(dimExperience), df.experiencelevel==dimExperience.experience, 'left') \
        .select(cols)

    return df



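The two steps in `map_star_schema` (coerce categories that are null or missing from a dimension to `Other / Unknown`, then left-join to pick up surrogate keys) can be sketched with plain dictionaries. The dimension contents and the `to_fact_row` helper below are hypothetical; like the notebook, the sketch assumes every dimension contains an `Other / Unknown` row so that coerced values still find a key.

```python
OTHER = 'Other / Unknown'

# Hypothetical dimensions: column in the transformed data -> (key column, value -> key).
DIMENSIONS = {
    'jobstate': ('statekey', {'CA': 1, 'NY': 2, 'Remote': 3, OTHER: 4}),
    'companysize': ('sizekey', {'1 to 50 Employees': 1, '10000+ Employees': 2, OTHER: 3}),
}


def to_fact_row(row):
    """Swap categorical values for dimension keys, as map_star_schema does with joins."""
    name = row.get('companyname')
    fact = {'companyname': OTHER if name is None else name}  # mirrors fillna
    for column, (key_name, lookup) in DIMENSIONS.items():
        value = row.get(column)
        if value not in lookup:  # null or never seen in the dimension
            value = OTHER
        fact[key_name] = lookup[value]
    return fact


print(to_fact_row({'companyname': 'Acme', 'jobstate': 'CA', 'companysize': None}))
# {'companyname': 'Acme', 'statekey': 1, 'sizekey': 3}
print(to_fact_row({'companyname': None, 'jobstate': 'Ontario', 'companysize': '10000+ Employees'}))
# {'companyname': 'Other / Unknown', 'statekey': 4, 'sizekey': 2}
```

Without the coercion step, the left joins would leave `statekey` or `sizekey` null for rows like these.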

In [6]:
def spark_to_dynamicframe(df):
    '''
    Converts Spark DataFrame to Glue DynamicFrame. Applies appropriate data type mappings to fields.
    
    ARGUMENTS:
        - df: PySpark DataFrame
    RETURNS:
        - Glue DynamicFrame
    '''
    # Convert spark dataframe to glue dynamicframe
    dyf_fact = DynamicFrame.fromDF(
        df,
        glueContext,
        'convert'
    )
    # Cast each field to its target type. ApplyMapping drops any field not listed here.
    mapping = [
        ('date', 'string', 'date', 'string'),
        ('companyname', 'string', 'companyname', 'string'),
        ('companyrating', 'double', 'companyrating', 'double'),
        ('revenuekey', 'long', 'revenuekey', 'int'),
        ('sectorkey', 'long', 'sectorkey', 'int'),
        ('industrykey', 'long', 'industrykey', 'int'),
        ('sizekey', 'long', 'sizekey', 'int'),
        ('typekey', 'long', 'typekey', 'int'),
        ('educationkey', 'long', 'educationkey', 'int'),
        ('experiencekey', 'long', 'experiencekey', 'int'),
        ('companyyearfounded', 'string', 'companyyearfounded', 'int'),
        ('companyage', 'string', 'companyage', 'int'),
        ('easyapply', 'string', 'easyapply', 'string'),
        ('citykey', 'long', 'citykey', 'int'),
        ('statekey', 'long', 'statekey', 'int'),
        ('minsalary', 'string', 'minsalary', 'double'),
        ('maxsalary', 'string', 'maxsalary', 'double'),
        ('averagesalary', 'double', 'averagesalary', 'double'),
        ('stream', 'int', 'stream', 'int'),
        ('sql', 'int', 'sql', 'int'),
        ('gcp', 'int', 'gcp', 'int'),
        ('scala', 'int', 'scala', 'int'),
        ('dbt', 'int', 'dbt', 'int'),
        ('java', 'int', 'java', 'int'),
        ('azure', 'int', 'azure', 'int'),
        ('aws', 'int', 'aws', 'int'),
        ('batch', 'int', 'batch', 'int'),
        ('spark', 'int', 'spark', 'int'),
        ('python', 'int', 'python', 'int'),
        ('airflow', 'int', 'airflow', 'int')
    ]
    dyf_fact = ApplyMapping.apply(
        frame=dyf_fact,
        mappings=mapping,
        transformation_ctx='dyfapplymapping'
    )
    
    return dyf_fact



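Since `ApplyMapping` keeps only the fields named in the mapping, a key added to `cols` in `map_star_schema` but not to `mapping` here would be missing from the curated output. A small, hypothetical check like the one below compares the two lists before anything is written; the sample column names are a shortened subset.

```python
def check_mapping(columns, mapping):
    """Compare selected columns with the source names in ApplyMapping tuples.

    Each mapping entry is (source_name, source_type, target_name, target_type).
    Returns (columns missing from the mapping, mapped names not among the columns).
    """
    mapped = [source for source, _, _, _ in mapping]
    missing = [c for c in columns if c not in mapped]
    extra = [m for m in mapped if m not in columns]
    return missing, extra


sample_cols = ['date', 'companyname', 'citykey', 'airflow']
sample_mapping = [
    ('date', 'string', 'date', 'string'),
    ('companyname', 'string', 'companyname', 'string'),
    ('citykey', 'long', 'citykey', 'int'),
]
print(check_mapping(sample_cols, sample_mapping))  # (['airflow'], [])
```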

In [7]:
def save_to_lake_catalog(dyf_fact, date):
    '''
    Writes the Glue DynamicFrame as CSV to the Lake Formation curated bucket in S3 and creates or updates the corresponding table in the Glue Data Catalog.
    
    ARGUMENTS:
        - dyf_fact: Glue DynamicFrame
        - date: String representation of desired date for naming purposes. Formatted as YYYYMMDD
    RETURNS:
        None
    '''
    # Add curated data to S3 curated bucket and Glue Data Catalog

    s3output = glueContext.getSink(
        path=f's3://kc-glassdoor-data-curated/curated/curated_{date}',
        connection_type='s3',
        updateBehavior='UPDATE_IN_DATABASE',
        partitionKeys=[],
        format='csv',
        enableUpdateCatalog=True,
        transformation_ctx='s3output',
    )

    s3output.setCatalogInfo(
        catalogDatabase='glassdoorcurated', catalogTableName=f'curated_{date}'
    )
    s3output.setFormat('csv')
    # coalesce(1) collapses the frame to one partition, so each run writes a single CSV file
    s3output.writeFrame(dyf_fact.coalesce(1))




In [8]:
# Name today's run by the current date in US Pacific time, formatted as YYYYMMDD
date = datetime.datetime.now(tz=pytz.utc)
date = date.astimezone(pytz.timezone('US/Pacific'))
today = date.strftime('%Y%m%d')

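The run date is taken in US Pacific time, so a job that runs shortly after midnight UTC still processes the previous Pacific day. The sketch below reproduces that conversion for fixed instants with the standard-library `zoneinfo` module (Python 3.9+; the notebook's `pytz` also works on older runtimes) and builds the same table name and S3 path the notebook derives from `today`. The `run_names` helper is hypothetical.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

PACIFIC = ZoneInfo('America/Los_Angeles')  # canonical name behind 'US/Pacific'


def run_names(now_utc):
    """Return (YYYYMMDD in Pacific time, cleaned table name, curated S3 path)."""
    day = now_utc.astimezone(PACIFIC).strftime('%Y%m%d')
    return (
        day,
        f'clean_{day}',
        f's3://kc-glassdoor-data-curated/curated/curated_{day}',
    )


# 03:00 UTC on 20 July is 20:00 PDT on 19 July, so the run is named for the 19th.
print(run_names(datetime(2023, 7, 20, 3, 0, tzinfo=timezone.utc)))
print(run_names(datetime(2023, 7, 20, 8, 0, tzinfo=timezone.utc)))
```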



In [9]:
# Read today's cleaned data from the Data Catalog and convert it to a Spark DataFrame

df = glueContext.create_dynamic_frame.from_catalog(
    database='glassdoorcleaned',
    table_name=f'clean_{today}'
).toDF()

# To reprocess a specific day, read that day's table instead:
# df = glueContext.create_dynamic_frame.from_catalog(
#     database='glassdoorcleaned',
#     table_name='clean_20230719'
# ).toDF()


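The commented-out read targets a fixed day (`clean_20230719`) instead of today's table. To reprocess several days, a hypothetical helper like the one below could generate the cleaned-table names for an inclusive date range; each day would then go through the same read, transform and save steps, with its own date string passed to `save_to_lake_catalog`.

```python
from datetime import date, timedelta


def clean_table_names(start, end):
    """Hypothetical helper: 'clean_YYYYMMDD' names for every day from start to end, inclusive."""
    names = []
    day = start
    while day <= end:
        names.append(f"clean_{day.strftime('%Y%m%d')}")
        day += timedelta(days=1)
    return names


print(clean_table_names(date(2023, 7, 17), date(2023, 7, 19)))
# ['clean_20230717', 'clean_20230718', 'clean_20230719']
```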


In [10]:
df = map_star_schema(transform_data(df))




In [11]:
dyf = spark_to_dynamicframe(df)




In [12]:
save_to_lake_catalog(dyf, today)


