Step 1: Create AWS Session and clients

In [None]:
import boto3
from botocore.config import Config

aws_session = boto3.Session(
    aws_access_key_id='',
    aws_secret_access_key='',
    region_name='us-east-1'
)

cfg = Config(retries={"max_attempts": 10, "mode": "standard"}, read_timeout=60, connect_timeout=10)

s3_client = aws_session.client('s3')
glue_client = aws_session.client('glue')

athena  = aws_session.client("athena", config=cfg)


Step 2: Extract the data from Postgres

In [7]:
import psycopg2
import pandas as pd
from io import StringIO
import logging

def extract_postgres_to_s3(
        table_name,
        schema="public",
        where_clause=None,
        partition_date=None,
        pg_config=None
    ):

    # Default configs
    pg_config = pg_config or {
        'host': 'localhost',
        'port': '5432',
        'user': 'postgres',
        'password': '4518',
        'database': 'kaggle_practice'
    }

    bucket='golu-aws-project-bucket'


    logging.basicConfig(level=logging.INFO)

    try:
        conn = psycopg2.connect(**pg_config)
        query = f"SELECT * FROM {schema}.{table_name}"
        if where_clause:
            query += f" WHERE {where_clause}"

        logging.info(f"Running query: {query}")

        df = pd.read_sql(query, conn)
        if df.empty:
            logging.warning("No data found.")
            return None


        # S3 key path
        key = f"raw/{table_name}"
        if partition_date:
            key += f"/dt={partition_date}"
        key += f"/{table_name}.csv"

        csv_buffer = StringIO()
        df.to_csv(csv_buffer, index=False)

        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=csv_buffer.getvalue()
        )

        s3_uri = f"s3://{bucket}/{key}"
        logging.info(f"‚úÖ Uploaded to: {s3_uri}")
        return s3_uri

    except Exception as e:
        logging.error(f"‚ùå Failed: {e}")
        raise

extract_postgres_to_s3("customers", "olist_brazil_e_commerce")



INFO:root:Running query: SELECT * FROM olist_brazil_e_commerce.customers
  df = pd.read_sql(query, conn)
INFO:root:‚úÖ Uploaded to: s3://golu-aws-project-bucket/raw/customers/customers.csv


's3://golu-aws-project-bucket/raw/customers/customers.csv'

Step 3: Crawler

In [None]:
# Create Glue Crawler

crawler_name = "crawler_03"
role = "arn:aws:iam::180294202865:role/glue_role_to_give_full_access_to_s3"
database_name = "mydb_01"
s3_target_path = f"s3://golu-aws-project-bucket/raw/customers/"  # adjust path as needed
table_prefix = "raw_"


try:
    response = glue_client.create_crawler(
        Name=crawler_name,
        Role=role,
        DatabaseName=database_name,
        Description="Crawler created via boto3 from Jupyter notebook",
        Targets={
            "S3Targets": [
                {"Path": s3_target_path}
            ]
        },
        TablePrefix=table_prefix,
        Classifiers=[],
        RecrawlPolicy={"RecrawlBehavior": "CRAWL_EVERYTHING"},
        SchemaChangePolicy={
            "UpdateBehavior": "UPDATE_IN_DATABASE",
            "DeleteBehavior": "DEPRECATE_IN_DATABASE"
        },
        Configuration='{"Version":1.0,"CreatePartitionIndex":true}'
    )
    print(f"Crawler '{crawler_name}' created. Response HTTPStatusCode: {response.get('ResponseMetadata', {}).get('HTTPStatusCode')}")
except Exception as e:
    print("Failed to create crawler:", e)

In [10]:
# List Crawlers ::
response = glue_client.get_crawlers()
crawlers = response['Crawlers']
for crawler in crawlers:
    print(f"Crawler Name: {crawler['Name']}, Crawler State: {crawler['State']}")

Crawler Name: CRAWLER_04, Crawler State: READY
Crawler Name: crawler_01, Crawler State: READY
Crawler Name: crawler_02, Crawler State: READY
Crawler Name: crawler_03, Crawler State: STOPPING


In [None]:
# Start the crawler
response = glue_client.start_crawler(Name='crawler_03')
print(response)

{'ResponseMetadata': {'RequestId': '909f9d4e-68c9-4714-9919-064fc941a933', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Tue, 04 Nov 2025 18:11:12 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': '909f9d4e-68c9-4714-9919-064fc941a933', 'cache-control': 'no-cache'}, 'RetryAttempts': 0}}


In [17]:
# Get status of the crawler
state = glue_client.get_crawler(Name=crawler_name)["Crawler"]["State"]
print(f"Current Crawler State: {state}")


Current Crawler State: RUNNING


Step 4: Glue Job

In [18]:
# Upload Glue Job Script to S3 
s3_client.upload_file(r"/Users/pavanhalde/Downloads/glue_job_05.py", 'golu-aws-project-bucket', 'scripts/glue_job_05.py')

# Note: In industry we use CI/CD pipelines to automate such tasks

In [20]:
# create Glue Job 
response = glue_client.create_job(
    Name="glue_job_05",
    Role="arn:aws:iam::180294202865:role/glue_role_to_give_full_access_to_s3",
    Command={
        'Name': 'glueetl',
        'ScriptLocation': "s3://golu-aws-project-bucket/scripts/glue_job_05.py"
    },
    GlueVersion='4.0',
    WorkerType='G.1X',
    NumberOfWorkers=2,
    ExecutionProperty={
        'MaxConcurrentRuns': 1
    }
)

In [21]:
# Listing jobs in account:
response = glue_client.get_jobs()
jobs=response['Jobs']
for job in jobs:
    print(f"Job Name: {job['Name']}")

Job Name: glue_job_01
Job Name: glue_job_02
Job Name: glue_job_03
Job Name: glue_job_04
Job Name: glue_job_05


In [22]:
# Start Job Run ::
response = glue_client.start_job_run(JobName='glue_job_05')
job_run_id = response['JobRunId']
print(f"Job run started with ID: {job_run_id}")


Job run started with ID: jr_a95c6dc727d516ba5bbddf15684e532e6d7583d7f68ba3265461774d9724ea72


In [24]:
# Get Job Run::
response = glue_client.get_job_run(JobName='glue_job_05', RunId=job_run_id)
job_run = response['JobRun']
print(f"Job Run Status: {job_run['JobRunState']}")


Job Run Status: SUCCEEDED


Step 5: Crawler for refined data

In [25]:
# Create Glue Crawler

crawler_name = "crawler_refined_03"
role = "arn:aws:iam::180294202865:role/glue_role_to_give_full_access_to_s3"
database_name = "mydb_01"
s3_target_path = f"s3://golu-aws-project-bucket/output/brazil_e_commerce/customer_refined/" 
table_prefix = "refined_"


try:
    response = glue_client.create_crawler(
        Name=crawler_name,
        Role=role,
        DatabaseName=database_name,
        Description="Crawler created via boto3 from Jupyter notebook",
        Targets={
            "S3Targets": [
                {"Path": s3_target_path}
            ]
        },
        TablePrefix=table_prefix,
        Classifiers=[],
        RecrawlPolicy={"RecrawlBehavior": "CRAWL_EVERYTHING"},
        SchemaChangePolicy={
            "UpdateBehavior": "UPDATE_IN_DATABASE",
            "DeleteBehavior": "DEPRECATE_IN_DATABASE"
        },
        Configuration='{"Version":1.0,"CreatePartitionIndex":true}'
    )
    print(f"Crawler '{crawler_name}' created. Response HTTPStatusCode: {response.get('ResponseMetadata', {}).get('HTTPStatusCode')}")
except Exception as e:
    print("Failed to create crawler:", e)

Crawler 'crawler_refined_03' created. Response HTTPStatusCode: 200


In [26]:
# List Crawlers ::
response = glue_client.get_crawlers()
crawlers = response['Crawlers']
for crawler in crawlers:
    print(f"Crawler Name: {crawler['Name']}, Crawler State: {crawler['State']}")

Crawler Name: CRAWLER_04, Crawler State: READY
Crawler Name: crawler_01, Crawler State: READY
Crawler Name: crawler_02, Crawler State: READY
Crawler Name: crawler_03, Crawler State: READY
Crawler Name: crawler_refined_03, Crawler State: READY


In [27]:
# Start the crawler
response = glue_client.start_crawler(Name='crawler_refined_03')
print(response)

{'ResponseMetadata': {'RequestId': '1a2e7264-617f-43f5-8238-e055d745a0b3', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Tue, 04 Nov 2025 18:33:20 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': '1a2e7264-617f-43f5-8238-e055d745a0b3', 'cache-control': 'no-cache'}, 'RetryAttempts': 0}}


In [28]:
# Get status of the crawler
state = glue_client.get_crawler(Name=crawler_name)["Crawler"]["State"]
print(f"Current Crawler State: {state}")


Current Crawler State: RUNNING


Step 6: Validation through Athena 

In [32]:
import boto3
import time


# Configuration
WORKGROUP       = "primary"
OUTPUT_LOCATION = "s3://golu-aws-project-bucket/athena-results/"  # For Athena query results only
DATABASE        = "mydb_01"  # Your Glue database name
RAW_TABLE       = "raw_customers"  # Original raw table
REFINED_TABLE   = "refined_customer_refined"  # Table created by crawler (with prefix)

print("=" * 60)
print("üîç VALIDATING GLUE JOB TRANSFORMATIONS")
print("=" * 60)

def run_athena_query(sql, description):
    """Helper function to run Athena query and display results"""
    print(f"\n{'='*60}")
    print(f"üìä {description}")
    print(f"{'='*60}")
    print(f"Query: {sql[:100]}...")
    
    # Start query execution
    resp = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": DATABASE},
        WorkGroup=WORKGROUP,
        ResultConfiguration={"OutputLocation": OUTPUT_LOCATION}
    )
    qid = resp["QueryExecutionId"]
    print(f"Query ID: {qid}")
    
    # Wait for completion
    print("‚è≥ Waiting for query to complete...", end="")
    while True:
        status = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]
        if status in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        print(".", end="", flush=True)
        time.sleep(2)
    
    print(f"\nStatus: {status}")
    
    if status != "SUCCEEDED":
        detail = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]
        print(f"‚ùå Query failed: {detail}")
        return None
    
    # Get results
    results = athena.get_query_results(QueryExecutionId=qid)
    headers = [c.get("VarCharValue", "") for c in results["ResultSet"]["Rows"][0]["Data"]]
    rows = [[c.get("VarCharValue", None) for c in r["Data"]] 
            for r in results["ResultSet"]["Rows"][1:]]
    
    # Display results
    print("\n" + " | ".join(headers))
    print("-" * 60)
    for row in rows:
        print(" | ".join(str(cell) if cell else "NULL" for cell in row))
    print(f"\n‚úÖ Rows returned: {len(rows)}")
    
    return rows


üîç VALIDATING GLUE JOB TRANSFORMATIONS


In [33]:

# ============================================================
# VALIDATION 1: Check if customer_city is UPPERCASE
# ============================================================
sql_uppercase_check = f"""
SELECT 
    CASE 
        WHEN customer_city = UPPER(customer_city) THEN 'PASS - All Uppercase'
        ELSE 'FAIL - Not Uppercase'
    END AS uppercase_validation,
    COUNT(*) AS count
FROM {DATABASE}.{REFINED_TABLE}
GROUP BY 1;
"""
run_athena_query(sql_uppercase_check, "Validation 1: Check Uppercase Transformation")



üìä Validation 1: Check Uppercase Transformation
Query: 
SELECT 
    CASE 
        WHEN customer_city = UPPER(customer_city) THEN 'PASS - All Uppercase'
   ...
Query ID: 750d312c-7413-40df-ac17-697b4edd20d8
‚è≥ Waiting for query to complete....
Status: SUCCEEDED

uppercase_validation | count
------------------------------------------------------------
PASS - All Uppercase | 41746

‚úÖ Rows returned: 1


[['PASS - All Uppercase', '41746']]

In [34]:

# ============================================================
# VALIDATION 2: Verify only SP state exists
# ============================================================
sql_state_check = f"""
SELECT 
    customer_state,
    COUNT(*) AS count
FROM {DATABASE}.{REFINED_TABLE}
GROUP BY customer_state
ORDER BY count DESC;
"""
run_athena_query(sql_state_check, "Validation 2: Verify State Filter (Should be SP only)")



üìä Validation 2: Verify State Filter (Should be SP only)
Query: 
SELECT 
    customer_state,
    COUNT(*) AS count
FROM mydb_01.refined_customer_refined
GROUP BY cu...
Query ID: 465d56d4-02be-477c-8a9e-b5e7bc97e132
‚è≥ Waiting for query to complete....
Status: SUCCEEDED

customer_state | count
------------------------------------------------------------
SP | 41746

‚úÖ Rows returned: 1


[['SP', '41746']]

In [35]:

# ============================================================
# VALIDATION 3: Check if customer_unique_id column was dropped
# ============================================================
sql_column_check = f"""
SELECT * 
FROM {DATABASE}.{REFINED_TABLE} 
LIMIT 1;
"""
print(f"\n{'='*60}")
print("üìä Validation 3: Check Dropped Columns")
print(f"{'='*60}")
print("Query: Fetching table schema...")

resp = athena.start_query_execution(
    QueryString=sql_column_check,
    QueryExecutionContext={"Database": DATABASE},
    WorkGroup=WORKGROUP,
    ResultConfiguration={"OutputLocation": OUTPUT_LOCATION}
)
qid = resp["QueryExecutionId"]

# Wait for completion
while True:
    status = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]
    if status in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

if status == "SUCCEEDED":
    results = athena.get_query_results(QueryExecutionId=qid)
    headers = [c.get("VarCharValue", "") for c in results["ResultSet"]["Rows"][0]["Data"]]
    print("\nColumns in refined table:")
    for i, col in enumerate(headers, 1):
        print(f"  {i}. {col}")
    
    if "customer_unique_id" in headers:
        print("\n‚ùå FAIL: customer_unique_id column still exists!")
    else:
        print("\n‚úÖ PASS: customer_unique_id column was successfully dropped!")



üìä Validation 3: Check Dropped Columns
Query: Fetching table schema...

Columns in refined table:
  1. customer_id
  2. customer_zip_code_prefix
  3. customer_city
  4. customer_state

‚úÖ PASS: customer_unique_id column was successfully dropped!


In [36]:

# ============================================================
# VALIDATION 4: Compare Raw vs Refined Row Counts
# ============================================================
sql_compare_counts = f"""
SELECT 
    'Raw Table' AS source,
    COUNT(*) AS row_count
FROM {DATABASE}.{RAW_TABLE}
UNION ALL
SELECT 
    'Refined Table (SP only)' AS source,
    COUNT(*) AS row_count
FROM {DATABASE}.{REFINED_TABLE};
"""
run_athena_query(sql_compare_counts, "Validation 4: Compare Raw vs Refined Row Counts")



üìä Validation 4: Compare Raw vs Refined Row Counts
Query: 
SELECT 
    'Raw Table' AS source,
    COUNT(*) AS row_count
FROM mydb_01.raw_customers
UNION ALL
S...
Query ID: 520f4847-135e-4832-b2bd-9d68ca3f868a
‚è≥ Waiting for query to complete....
Status: SUCCEEDED

source | row_count
------------------------------------------------------------
Refined Table (SP only) | 41746
Raw Table | 99441

‚úÖ Rows returned: 2


[['Refined Table (SP only)', '41746'], ['Raw Table', '99441']]

In [37]:

# ============================================================
# VALIDATION 5: Sample refined data
# ============================================================
sql_sample = f"""
SELECT 
    customer_id,
    customer_zip_code_prefix,
    customer_city,
    customer_state
FROM {DATABASE}.{REFINED_TABLE}
LIMIT 10;
"""
run_athena_query(sql_sample, "Validation 5: Sample Refined Data (First 10 rows)")



üìä Validation 5: Sample Refined Data (First 10 rows)
Query: 
SELECT 
    customer_id,
    customer_zip_code_prefix,
    customer_city,
    customer_state
FROM m...
Query ID: 532e971a-80df-4e7e-8e01-56cc20ed80d4
‚è≥ Waiting for query to complete....
Status: SUCCEEDED

customer_id | customer_zip_code_prefix | customer_city | customer_state
------------------------------------------------------------
06b8999e2fba1a1fbc88172c00ba8bc7 | 14409.0 | FRANCA | SP
18955e83d337fd6b2def6b18a428ac77 | 9790.0 | SAO BERNARDO DO CAMPO | SP
4e7b3e00288586ebd08712fdd0374a03 | 1151.0 | SAO PAULO | SP
b2b6027bc5c5109e529d4dc6358b12c3 | 8775.0 | MOGI DAS CRUZES | SP
4f2d8ab171c80ec8364f7c12e35b23ad | 13056.0 | CAMPINAS | SP
fd826e7cf63160e536e0908c76c3f441 | 4534.0 | SAO PAULO | SP
b2d1536598b73a9abd18e0d75d92f0a3 | 18682.0 | LENCOIS PAULISTA | SP
eabebad39a88bb6f5b52376faec28612 | 5704.0 | SAO PAULO | SP
206f3129c0e4d7d0b9550426023f0a08 | 13412.0 | PIRACICABA | SP
c5c61596a3b6bd0cee5766992c48a9a1 | 7124

[['06b8999e2fba1a1fbc88172c00ba8bc7', '14409.0', 'FRANCA', 'SP'],
 ['18955e83d337fd6b2def6b18a428ac77', '9790.0', 'SAO BERNARDO DO CAMPO', 'SP'],
 ['4e7b3e00288586ebd08712fdd0374a03', '1151.0', 'SAO PAULO', 'SP'],
 ['b2b6027bc5c5109e529d4dc6358b12c3', '8775.0', 'MOGI DAS CRUZES', 'SP'],
 ['4f2d8ab171c80ec8364f7c12e35b23ad', '13056.0', 'CAMPINAS', 'SP'],
 ['fd826e7cf63160e536e0908c76c3f441', '4534.0', 'SAO PAULO', 'SP'],
 ['b2d1536598b73a9abd18e0d75d92f0a3', '18682.0', 'LENCOIS PAULISTA', 'SP'],
 ['eabebad39a88bb6f5b52376faec28612', '5704.0', 'SAO PAULO', 'SP'],
 ['206f3129c0e4d7d0b9550426023f0a08', '13412.0', 'PIRACICABA', 'SP'],
 ['c5c61596a3b6bd0cee5766992c48a9a1', '7124.0', 'GUARULHOS', 'SP']]

In [None]:

# ============================================================
# VALIDATION 6: Top cities in refined data
# ============================================================
sql_top_cities = f"""
SELECT 
    customer_city,
    customer_state,
    COUNT(*) AS customer_count
FROM {DATABASE}.{REFINED_TABLE}
GROUP BY customer_city, customer_state
ORDER BY customer_count DESC
LIMIT 10;
"""
run_athena_query(sql_top_cities, "Validation 6: Top 10 Cities in Refined Data")

print("\n" + "="*60)
print("‚úÖ ALL VALIDATIONS COMPLETE!")
print("="*60)
print("\nSummary of Validations:")
print("1. ‚úì Uppercase transformation on customer_city")
print("2. ‚úì State filter (SP only)")
print("3. ‚úì Column drop (customer_unique_id)")
print("4. ‚úì Row count comparison")
print("5. ‚úì Sample data inspection")
print("6. ‚úì Top cities analysis")