Local Development prerequisites

- Run `source ./setup_aws_glue_scripts/setup_aws_glue.sh` in GitBash

AWS Glue Local Debugging:
> Run `./glue_local_debug.sh` or follow the following steps.

- From the Command Palette (`Ctrl+Shift+P`), select `Tasks: Run Task command`, then select `Convert Notebook to Script`
- Run `pipenv shell` in terminal
- Run `jupyter notebook --no-browser` in terminal
- Copy Jupyter Server url from terminal
- From the Command Palette (`Ctrl+Shift+P`), select `Notebook: Select Notebook Kernel`, then select `Select Another Kernel...`, then select `Existing Jupyter Server...`, then paste Jupyter Server url, then enter, then enter again.

AWS Glue Interactive Session Debugging:
- From the Command Palette (`Ctrl+Shift+P`), select `Tasks: Run Task command`, then select `Convert Notebook to Script`
- Run `pipenv shell` in terminal
- Run `jupyter notebook --no-browser` in terminal
- Copy Jupyter Server url from terminal
- From the Command Palette (`Ctrl+Shift+P`), select `Notebook: Select Notebook Kernel`, then select `Select Another Kernel...`, then select `Existing Jupyter Server...`, then paste Jupyter Server url, then enter, then enter again.
- Uncomment the following cell, and run it.

# To configure AWS Glue Interactive Session, uncomment the following cell

Change configuration in terms of your requirements.

In [None]:
# %profile [your-profile-name]
# %region eu-west-1
# %iam_role [your-iam-role]
# %worker_type G.1X
# %number_of_workers 2
# %additional_python_modules watchtower
# %extra_py_files s3://artifacts-dev/33333/Layers/pip_common_packages_layer_glue.zip,s3://artifacts-dev/33333/Layers/combined_lambda_layer_glue.zip

# To pass system arguments, uncomment the following cell

Change arguments in terms of your requirements.

In [1]:
import sys
from datetime import datetime


stage = "dev"
current_time = datetime.now()

sys.argv = [
    "user_data_processing_glue_job.py",  # sys.argv[0], script name
    "true",
    "--is_local",
    "true",
    "--job-bookmark-option",
    "job-bookmark-disable",
    "--JOB_ID",
    "j_a197e5a4ae431ff7631a35bdcddbb8b6bebf747667c548020bfbd40447569b25",
    "true",
    "--stage",
    "dev",
    "--JOB_RUN_ID",
    "jr_6a23882f8589db824567693a787ec1014a66dcc868186c2cb8ae26a502f47040",
    "--JOB_NAME",
    "TEST_JOB",
    "--execution_arn",
    f"arn:aws:states:eu-west-1:625904187796:execution:TEST_JOB:try-{current_time.strftime('%Y%m%d%H%M%S%f')}",
    "--job",
    "job_name",
    "--debugging",
    "True",
    "--stage",
    stage,
    "--log_group_name",
    f"/aws-glue/jobs/user_data_processing_glue_job_{stage}",
    "--total_records",
    "1000",
    "--s3_file_path",
    "test",
]

# Initialize AWS Glue Context

In [2]:
import sys
import json
import logging

from glue_utils import argv_to_dict, log_operation
from log_utils import LogUtils

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job


# Get parameters passed to the script
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "job",
        "execution_arn",
        "debugging",
        "log_group_name",
        "total_records",
        "stage",
    ],
)

log_utils = LogUtils(
    log_group_name=args["log_group_name"],
    log_stream_name=args["JOB_RUN_ID"],
    job_name=args["job"],
    execution_arn=args["execution_arn"],
)

log_utils.configure_logging()

logger = logging.getLogger()

# Initialize Glue context
sparkContext = SparkContext.getOrCreate()
glueContext = GlueContext(sparkContext=sparkContext)
spark = glueContext.spark_session
# logger = glueContext.get_logger()
job = Job(glueContext)
job.init(args["JOB_NAME"], args)


print("sys.argv to dictionary")
print("**********************")
print(json.dumps(obj=sys.argv, indent=4))

args = argv_to_dict(argv=sys.argv)

args_json = json.dumps(args, indent=4)

print("args to dictionary")
print("**********************")
print(args_json)

Jun 03, 2024 10:26:12 AM org.apache.spark.launcher.Log4jHotPatchOption staticJavaAgentOption

log4j:WARN No appenders could be found for logger (org.apache.spark.util.Utils).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.


Exception while setting endpoint config: java.lang.NullPointerException
sys.argv to dictionary
**********************
[
    "user_data_processing_glue_job.py",
    "true",
    "--is_local",
    "true",
    "--job-bookmark-option",
    "job-bookmark-disable",
    "--JOB_ID",
    "j_a197e5a4ae431ff7631a35bdcddbb8b6bebf747667c548020bfbd40447569b25",
    "true",
    "--stage",
    "dev",
    "--JOB_RUN_ID",
    "jr_6a23882f8589db824567693a787ec1014a66dcc868186c2cb8ae26a502f47040",
    "--JOB_NAME",
    "TEST_JOB",
    "--execution_arn",
    "arn:aws:states:eu-west-1:625904187796:execution:TEST_JOB:try-20240603102610092683",
    "--job",
    "job_name",
    "--debugging",
    "True",
    "--stage",
    "dev",
    "--log_group_name",
    "/aws-glue/jobs/user_data_processing_glue_job_dev",
    "--total_records",
    "1000",
    "--s3_file_path",
    "test"
]
args to dictionary
**********************
{
    "is_local": true,
    "job_bookmark_option": "job-bookmark-disable",
    "JOB_ID": "j_a1

# To Configure AWS Cloudwatch Logging

In [None]:
log_utils.create_cloud_watch_log_group()

# Business Logic

In [3]:
def read_user_data_csv():
    global user_data_df
    csv_file_path = "./glue_job/user_data.csv"
    user_data_df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
    logger.info(f"CSV file successfully read from path: {csv_file_path}")
    user_data_df.show()


log_operation("Reading User Data CSV", read_user_data_csv)

INFO:root:Starting operation: Reading User Data CSV
INFO:root:CSV file successfully read from path: ./glue_job/user_data.csv        
INFO:root:Completed operation: Reading User Data CSV in 7.96 seconds


+-------+---+-------------+
|   name|age|         city|
+-------+---+-------------+
|  Alice| 30|     New York|
|    Bob| 25|  Los Angeles|
|Charlie| 35|      Chicago|
|  Diana| 28|San Francisco|
|    Eve| 32|       Boston|
|  Frank| 27|      Seattle|
|  Grace| 29|       Austin|
|   Hank| 31|       Denver|
|  Irene| 33|      Phoenix|
|   Jack| 26|    San Diego|
|  Karen| 34|       Dallas|
|    Leo| 28|     San Jose|
|   Mona| 30| Indianapolis|
|   Nate| 25|     Columbus|
| Olivia| 33|    Charlotte|
|   Paul| 29|     Portland|
| Quincy| 35|    Las Vegas|
| Rachel| 28|Oklahoma City|
|  Steve| 32|   Louisville|
|   Tina| 27|    Baltimore|
+-------+---+-------------+
only showing top 20 rows



In [4]:
def filter_users_older_than_30():
    global filtered_user_data_df
    filtered_user_data_df = user_data_df.filter(user_data_df["age"] > 30)
    logger.info("Users older than 30 have been selected.")
    filtered_user_data_df.show()


log_operation("Filtering Users Older Than 30", filter_users_older_than_30)

INFO:root:Starting operation: Filtering Users Older Than 30
INFO:root:Users older than 30 have been selected.
INFO:root:Completed operation: Filtering Users Older Than 30 in 0.51 seconds


+-------+---+-----------+
|   name|age|       city|
+-------+---+-----------+
|Charlie| 35|    Chicago|
|    Eve| 32|     Boston|
|   Hank| 31|     Denver|
|  Irene| 33|    Phoenix|
|  Karen| 34|     Dallas|
| Olivia| 33|  Charlotte|
| Quincy| 35|  Las Vegas|
|  Steve| 32| Louisville|
| Victor| 31|Albuquerque|
|  Wendy| 33|Kansas City|
| Yvonne| 34|    Atlanta|
|  Cindy| 35|      Miami|
|   Ella| 32|  Cleveland|
|  Harry| 31|Minneapolis|
|    Ivy| 33|      Tulsa|
|  Kelly| 34|New Orleans|
|  Oscar| 35|    Anaheim|
|Quentin| 32| Pittsburgh|
+-------+---+-----------+



In [5]:
def select_name_and_city_columns():
    global selected_user_data_df
    selected_user_data_df = filtered_user_data_df.select("name", "city")
    logger.info("'name' and 'city' columns have been selected.")
    selected_user_data_df.show()


log_operation("Selecting Name and City Columns", select_name_and_city_columns)

INFO:root:Starting operation: Selecting Name and City Columns
INFO:root:'name' and 'city' columns have been selected.
INFO:root:Completed operation: Selecting Name and City Columns in 0.33 seconds


+-------+-----------+
|   name|       city|
+-------+-----------+
|Charlie|    Chicago|
|    Eve|     Boston|
|   Hank|     Denver|
|  Irene|    Phoenix|
|  Karen|     Dallas|
| Olivia|  Charlotte|
| Quincy|  Las Vegas|
|  Steve| Louisville|
| Victor|Albuquerque|
|  Wendy|Kansas City|
| Yvonne|    Atlanta|
|  Cindy|      Miami|
|   Ella|  Cleveland|
|  Harry|Minneapolis|
|    Ivy|      Tulsa|
|  Kelly|New Orleans|
|  Oscar|    Anaheim|
|Quentin| Pittsburgh|
+-------+-----------+



In [6]:
# Stop the SparkSession
job.commit()
spark.stop()