In [1]:
import os
import pandas as pd
import boto3
import sys
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, current_date, year
from utils.env import load_env, get_env_or_args

In [2]:
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)


In [3]:
RELATIVE_PATH_PREFIX="./../../.."

In [4]:
def calculate_age(data_frame):
    # Calculating the current year using the current date
    current_year = year(current_date())

    # Calculating the age by subtracting the year_of_birth from the current year
    age_column = current_year - col("year_of_birth")

    # Adding the calculated age as a new column to the DataFrame
    data_frame_with_age = data_frame.withColumn("age", age_column)

    return data_frame_with_age

In [5]:
def transform_to_lower_case(data_frame):
    # Converting all column names to lower snake_case
    for col_name in data_frame.columns:
        snake_case_name = col_name.lower().replace(" ", "_")
        data_frame = data_frame.withColumnRenamed(col_name, snake_case_name)

    # Computing full_name by concatenating first_name and last_name
    data_frame = data_frame.withColumn("full_name", F.concat_ws(" ", data_frame["first_name"], data_frame["last_name"]))
    return data_frame

In [6]:
def read_data(environment, local_file_path, crawler_name):
    if environment == "local":
        # Read data from local file (e.g., using Pandas)
        data = spark.read.csv(local_file_path, header=True)
    else:
        # Create or retrieve a Spark context
        sc = SparkContext.getOrCreate()
        glueContext = GlueContext(sc)
        glue_client = boto3.client('glue')

        # Get the crawler metadata
        crawler_metadata = glue_client.get_crawler(Name=crawler_name)

        # keys = list(crawler_metadata['Crawler'].keys())

        database_name = crawler_metadata['Crawler']['DatabaseName']
        prefix = crawler_metadata['Crawler']['Targets']['S3Targets'][0]['Path'] if 'S3Targets' in crawler_metadata['Crawler']['Targets'] else None

        # Get table names from the Glue Data Catalog using the database and prefix
        response = glue_client.get_tables(DatabaseName=database_name)
        # error_message = f"The keys are: {response['TableList']}, {prefix}"
        # raise Exception(error_message)
        tables = [table['Name'] for table in response['TableList']]

        # Assuming the first table name matches the prefix, use it for reading data
        table_name = tables[0]
        print("crawler_metadata")



        # Read data from Glue Data Catalog
        dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
            database=database_name,
            table_name=table_name
        )

        # Convert to a Spark DataFrame (or Pandas DataFrame if needed)
        data = dynamic_frame.toDF()

    return data

In [7]:
def write_to_csv(data_frame, environment, target_directory, target_bucket):
    # Determine the output path based on the environment
    if environment == 'local':

        output_path = os.path.join(target_directory, 'output.csv')
        print(output_path)
        data_frame.write.csv(output_path, mode='overwrite', header=True)
    else:
        output_path = f"s3://{target_bucket}/{target_directory}"
        data_frame.write.csv(output_path, mode='overwrite', header=True)
    return data_frame

In [9]:
environment = get_env_or_args("ENVIRONMENT_NAME")
load_env(environment)
local_file_path = f"{RELATIVE_PATH_PREFIX}/data/raw/sample.csv"
crawler_name = get_env_or_args("SOURCE_CRAWLER")
data = read_data(get_env_or_args("ENVIRONMENT_NAME"), local_file_path, crawler_name)
print(data.show())

+----------+---------+--------------------+-------------+
|First Name|Last Name|               Email|Year of Birth|
+----------+---------+--------------------+-------------+
|     Rohan|    Gupta|rohan.gupta@wedne...|         1992|
+----------+---------+--------------------+-------------+

None


In [10]:
target_bucket = get_env_or_args("BUCKET_NAME")
target_directory = f"{RELATIVE_PATH_PREFIX if environment == 'local' else target_bucket}/landing/{get_env_or_args('JOB_NAME')}"
transformed_data = transform_to_lower_case(data)
final_data_with_age = calculate_age(transformed_data)
print(final_data_with_age.show())
write_to_csv(final_data_with_age, environment, target_directory, target_bucket)
job.commit()

+----------+---------+--------------------+-------------+-----------+----+
|first_name|last_name|               email|year_of_birth|  full_name| age|
+----------+---------+--------------------+-------------+-----------+----+
|     Rohan|    Gupta|rohan.gupta@wedne...|         1992|Rohan Gupta|31.0|
+----------+---------+--------------------+-------------+-----------+----+

None
./../../../landing/job2/output.csv
