In [1]:
import os
import boto3
from botocore.exceptions import ClientError
from dotenv import load_dotenv
import time
import re

load_dotenv()

True

In [2]:
# Environment Configuration
ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
SECRET_KEY = os.getenv("AWS_SECRET_KEY")
AWS_REGION = os.getenv("AWS_REGION")
GLUE_ROLE_ARN = os.getenv("GLUE_ROLE_ARN")
LAMBDA_ROLE_ARN = os.getenv("LAMBDA_ROLE_ARN")
S3_BUCKET_COMPLETE_PIPELINE = os.getenv("S3_BUCKET_COMPLETE_PIPELINE")
S3_UPLOAD_FOLDER = "raw/"
S3_PROCESSED_FOLDER = "processed/"
S3_GLUE_OUTPUT_FOLDER = "glue-output/"

# COMPLETE DATA LAKE WORKFLOW

```
+-----------------------------------------------------------------------------------+
|                        COMPLETE DATA LAKE PIPELINE                                |
+-----------------------------------------------------------------------------------+
|                                                                                   |
|  1. INGEST                                                                        |
|     +-------------------+                                                         |
|     | Upload to S3      |  <-- Manual, Lambda, Kinesis Firehose                   |
|     | (raw/ folder)     |                                                         |
|     +-------------------+                                                         |
|              |                                                                    |
|              v                                                                    |
|  2. CATALOG                                                                       |
|     +-------------------+                                                         |
|     | Glue Crawler      |  --> Discovers schema, creates tables                   |
|     | (Data Catalog)    |                                                         |
|     +-------------------+                                                         |
|              |                                                                    |
|              v                                                                    |
|  3. TRANSFORM                                                                     |
|     +-------------------+                                                         |
|     | Glue ETL Job      |  --> Clean, transform, convert to Parquet               |
|     | (processed/)      |                                                         |
|     +-------------------+                                                         |
|              |                                                                    |
|              v                                                                    |
|  4. CATALOG PROCESSED                                                             |
|     +-------------------+                                                         |
|     | Another Crawler   |  --> Update catalog with processed tables               |
|     +-------------------+                                                         |
|              |                                                                    |
|              v                                                                    |
|  5. ANALYZE                                                                       |
|     +-------------------+                                                         |
|     | Athena SQL        |  --> Fast, serverless analytics                         |
|     | Queries           |                                                         |
|     +-------------------+                                                         |
|              |                                                                    |
|              v                                                                    |
|  6. VISUALIZE (Optional)                                                          |
|     +-------------------+                                                         |
|     | QuickSight        |  --> Dashboards, reports                                |
|     +-------------------+                                                         |
|                                                                                   |
|  All serverless, managed, pay-per-use!                                            |
+-----------------------------------------------------------------------------------+
```

# PHASE 1: DATA INGESTION

```
┌─────────────────────────────────────────────────────────────────────────────┐
│  STEP 1: INGEST DATA TO S3                                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   Local Files ──────┐                                                       │
│                     │                                                       │
│   APIs/Streams ─────┼────►  S3 Bucket  ────►  raw/ folder                   │
│                     │      (Landing Zone)                                   │
│   Lambda Events ────┘                                                       │
│                                                                             │
│   Key Operations:                                                           │
│   • Create S3 bucket (landing zone)                                         │
│   • Upload raw data files                                                   │
│   • Verify uploads & read content                                           │
│   • Generate presigned URLs for sharing                                     │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

## 1.1 Initialize S3 Client

Configure the S3 client with **Signature Version 4** (required for presigned URLs) and regional endpoint.

In [3]:
# create an s3 client
from botocore.config import Config

# Configure S3 client with Signature Version 4 and regional endpoint
# Regional endpoint is required for presigned URLs to work correctly

s3_client = boto3.client('s3', 
                         endpoint_url=f'https://s3.{AWS_REGION}.amazonaws.com',
                         config=Config(signature_version='s3v4'),
                         region_name=AWS_REGION,
                         aws_secret_access_key=SECRET_KEY,
                         aws_access_key_id=ACCESS_KEY,
                         )

## 1.2 Create S3 Bucket (Landing Zone)

The bucket serves as the **landing zone** for all raw data. All incoming files go to the `raw/` prefix.

In [4]:
from botocore.exceptions import ClientError

def create_bucket(bucket_name, region=None):
    """
    Create an S3 bucket in a specified region
    
    Args:
        bucket_name (str): Name for the bucket (must be globally unique)
        region (str): AWS region (if None, uses default from client)
    
    Returns:
        bool: True if bucket created, False otherwise
    """
    try:
        if region is None or region == 'us-east-1':
            # us-east-1 doesn't require LocationConstraint
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            # Other regions require LocationConstraint
            s3_client.create_bucket(
                Bucket=bucket_name,
                CreateBucketConfiguration={'LocationConstraint': region}
            )
        print(f"SUCCESS: Bucket '{bucket_name}' created successfully")
        return True
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'BucketAlreadyExists':
            print(f"ERROR: Bucket '{bucket_name}' already exists (owned by someone else)")
        elif error_code == 'BucketAlreadyOwnedByYou':
            print(f"INFO: Bucket '{bucket_name}' already exists and is owned by you")
        else:
            print(f"ERROR: Failed to create bucket - {e}")
        return False

# Example usage (uncomment to test):
create_bucket(S3_BUCKET_COMPLETE_PIPELINE, region=AWS_REGION)

INFO: Bucket 's3-complete-pipeline' already exists and is owned by you


False

## 1.3 Upload Raw Data

Upload files to the `raw/` folder. This is where Glue Crawlers will discover and catalog the data schema.

In [5]:
# upload a file to s3
def upload_file(file_name, bucket, object_name=None):
    """
    Upload a file to an S3 bucket
    
    Args:
        file_name (str): Path to file to upload
        bucket (str): Bucket name
        object_name (str): S3 object name (if None, uses file_name)
    
    Returns:
        bool: True if upload successful, False otherwise
    """
    # If S3 object_name not specified, use file_name
    if object_name is None:
        object_name = os.path.basename(file_name)
    
    try:
        s3_client.upload_file(file_name, bucket, object_name)
        print(f"SUCCESS: '{file_name}' uploaded to '{bucket}/{object_name}'")
        return True
    except FileNotFoundError:
        print(f"ERROR: File '{file_name}' not found")
        return False
    except ClientError as e:
        print(f"ERROR: Failed to upload file - {e}")
        return False

# Example usage (uncomment to test):
upload_file('data/hosts.csv', S3_BUCKET_COMPLETE_PIPELINE, f'{S3_UPLOAD_FOLDER}hosts.csv')

SUCCESS: 'data/hosts.csv' uploaded to 's3-complete-pipeline/raw/hosts.csv'


True

## 1.4 Verify Upload (Read Content)

Read the uploaded file directly from S3 to confirm the data landed correctly.

In [6]:
def read_object(bucket, object_name):
    """
    Read S3 object content directly into memory
    
    Args:
        bucket (str): Bucket name
        object_name (str): S3 object name to read
    
    Returns:
        str: File content as string, or None if error
    """
    try:
        response = s3_client.get_object(Bucket=bucket, Key=object_name)
        content = response['Body'].read().decode('utf-8')
        print(f"SUCCESS: Read {len(content)} characters from '{bucket}/{object_name}'")
        return content
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'NoSuchKey':
            print(f"ERROR: Object '{object_name}' not found in bucket '{bucket}'")
        else:
            print(f"ERROR: Failed to read object - {e}")
        return None
    except Exception as e:
        print(f"ERROR: Unexpected error - {e}")
        return None

# Example usage (uncomment to test):
content = read_object(S3_BUCKET_COMPLETE_PIPELINE, f'{S3_UPLOAD_FOLDER}hosts.csv')
if content:
    print("Content:")
    print(content[:500])  # Print first 500 characters

SUCCESS: Read 13083 characters from 's3-complete-pipeline/raw/hosts.csv'
Content:
host_id,host_name,host_since,is_superhost,response_rate,created_at
1,Timothy Parker,2018-03-20,False,99,2025-12-26 14:15:54.011160
2,Hannah Evans,2024-01-01,False,95,2025-12-26 14:15:54.011160
3,Crystal Green,2016-08-06,False,74,2025-12-26 14:15:54.011160
4,Kevin Johnson,2020-02-25,False,100,2025-12-26 14:15:54.011160
5,Monica Johnson,2024-11-11,False,77,2025-12-26 14:15:54.011160
6,Nancy Turner,2016-11-03,False,96,2025-12-26 14:15:54.011160
7,Gerald Hunt,2022-01-21,True,99,2025-12-26 14:


## 1.5 List Objects in Bucket

View all objects in the `raw/` folder with detailed metadata (size, last modified, storage class).

In [7]:
from datetime import datetime

def list_objects_detailed(bucket, prefix=''):
    """
    List all objects in a bucket with detailed metadata
    
    Args:
        bucket (str): Bucket name
        prefix (str): Filter objects by prefix (folder path)
    
    Returns:
        list: List of object metadata dictionaries, or empty list if error
    """
    try:
        response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
        
        if 'Contents' not in response:
            print(f"No objects found in bucket '{bucket}' with prefix '{prefix}'")
            return []
        
        objects = []
        print(f"Objects in '{bucket}/{prefix}':")
        
        for obj in response['Contents']:
            # Convert size to human-readable format
            size_bytes = obj['Size']
            if size_bytes < 1024:
                size_str = f"{size_bytes} B"
            elif size_bytes < 1024**2:
                size_str = f"{size_bytes/1024:.2f} KB"
            else:
                size_str = f"{size_bytes/(1024**2):.2f} MB"
            
            # Format last modified date
            last_modified = obj['LastModified'].strftime('%Y-%m-%d %H:%M:%S')
            
            print(f"Key: {obj['Key']}")
            print(f"Size: {size_str} ({size_bytes:,} bytes)")
            print(f"Last Modified: {last_modified}")
            print(f"Storage Class: {obj.get('StorageClass', 'STANDARD')}")
            print(f"ETag: {obj['ETag']}")
            
            objects.append(obj)
        
        print(f"Total: {len(objects)} object(s)")
        return objects
        
    except ClientError as e:
        print(f"ERROR: Failed to list objects - {e}")
        return []

# Example usage (uncomment to test):
list_objects_detailed(S3_BUCKET_COMPLETE_PIPELINE, prefix=f'{S3_UPLOAD_FOLDER}')

Objects in 's3-complete-pipeline/raw/':
Key: raw/bookings.csv
Size: 501.35 KB (513,378 bytes)
Last Modified: 2026-01-31 03:36:02
Storage Class: STANDARD
ETag: "203775ebda6b0e99de614895de78159f"
Key: raw/hosts.csv
Size: 12.78 KB (13,083 bytes)
Last Modified: 2026-01-31 07:15:50
Storage Class: STANDARD
ETag: "7588197a4f4c485949e7bfc641356122"
Key: raw/lambda-bookings.csv
Size: 1.90 KB (1,942 bytes)
Last Modified: 2026-01-31 07:10:26
Storage Class: STANDARD
ETag: "3a54a253c74c489770db51e83dda2644"
Total: 3 object(s)


[{'Key': 'raw/bookings.csv',
  'LastModified': datetime.datetime(2026, 1, 31, 3, 36, 2, tzinfo=tzutc()),
  'ETag': '"203775ebda6b0e99de614895de78159f"',
  'ChecksumAlgorithm': ['CRC32'],
  'ChecksumType': 'FULL_OBJECT',
  'Size': 513378,
  'StorageClass': 'STANDARD'},
 {'Key': 'raw/hosts.csv',
  'LastModified': datetime.datetime(2026, 1, 31, 7, 15, 50, tzinfo=tzutc()),
  'ETag': '"7588197a4f4c485949e7bfc641356122"',
  'ChecksumAlgorithm': ['CRC32'],
  'ChecksumType': 'FULL_OBJECT',
  'Size': 13083,
  'StorageClass': 'STANDARD'},
 {'Key': 'raw/lambda-bookings.csv',
  'LastModified': datetime.datetime(2026, 1, 31, 7, 10, 26, tzinfo=tzutc()),
  'ETag': '"3a54a253c74c489770db51e83dda2644"',
  'ChecksumAlgorithm': ['CRC32'],
  'ChecksumType': 'FULL_OBJECT',
  'Size': 1942,
  'StorageClass': 'STANDARD'}]

## 1.6 Get Object Metadata

Retrieve detailed metadata for a specific object (content type, encryption, custom tags).

In [8]:
def get_object_metadata(bucket, object_name):
    """
    Retrieve metadata for an S3 object
    
    Args:
        bucket (str): Bucket name
        object_name (str): S3 object name (key)
    
    Returns:
        dict: Metadata dictionary, or None if error
    """
    try:
        response = s3_client.head_object(Bucket=bucket, Key=object_name)
        
        print(f"Metadata for '{bucket}/{object_name}':")
        
        # System metadata
        print("SYSTEM METADATA:")
        print(f"Content-Type: {response.get('ContentType', 'N/A')}")
        print(f"Content-Length: {response.get('ContentLength', 0):,} bytes")
        print(f"Last-Modified: {response.get('LastModified', 'N/A')}")
        print(f"ETag: {response.get('ETag', 'N/A')}")
        print(f"Storage-Class: {response.get('StorageClass', 'STANDARD')}")
        
        # User metadata (custom)
        user_metadata = response.get('Metadata', {})
        if user_metadata:
            print("USER METADATA (Custom):")
            for key, value in user_metadata.items():
                print(f"  {key}: {value}")
        else:
            print("USER METADATA: None")
        
        return response
        
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == '404':
            print(f"ERROR: Object '{object_name}' not found in bucket '{bucket}'")
        else:
            print(f"ERROR: Failed to get metadata - {e}")
        return None

# Example usage (uncomment to test):
get_object_metadata(S3_BUCKET_COMPLETE_PIPELINE, f'{S3_UPLOAD_FOLDER}hosts.csv')

Metadata for 's3-complete-pipeline/raw/hosts.csv':
SYSTEM METADATA:
Content-Type: binary/octet-stream
Content-Length: 13,083 bytes
Last-Modified: 2026-01-31 07:15:50+00:00
ETag: "7588197a4f4c485949e7bfc641356122"
Storage-Class: STANDARD
USER METADATA: None


{'ResponseMetadata': {'RequestId': '4S4YF0RHWW8FD1PR',
  'HostId': 'zEiS5uFU81eio2AjgOwq5BlTqKPYTE1GgDL4/UcVGburzuiNCad5w9k6JOxrPb6xFDvi/wJ0a3f8BGD55aLHv8BuJr2ZaUya',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'zEiS5uFU81eio2AjgOwq5BlTqKPYTE1GgDL4/UcVGburzuiNCad5w9k6JOxrPb6xFDvi/wJ0a3f8BGD55aLHv8BuJr2ZaUya',
   'x-amz-request-id': '4S4YF0RHWW8FD1PR',
   'date': 'Sat, 31 Jan 2026 07:15:55 GMT',
   'last-modified': 'Sat, 31 Jan 2026 07:15:50 GMT',
   'etag': '"7588197a4f4c485949e7bfc641356122"',
   'x-amz-server-side-encryption': 'AES256',
   'accept-ranges': 'bytes',
   'content-type': 'binary/octet-stream',
   'content-length': '13083',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'AcceptRanges': 'bytes',
 'LastModified': datetime.datetime(2026, 1, 31, 7, 15, 50, tzinfo=tzutc()),
 'ContentLength': 13083,
 'ETag': '"7588197a4f4c485949e7bfc641356122"',
 'ContentType': 'binary/octet-stream',
 'ServerSideEncryption': 'AES256',
 'Metadata': {}}

## 1.7 Generate Presigned URL (Share Data)

Create time-limited URLs for secure sharing without exposing AWS credentials.

| Expiration | Use Case |
|------------|----------|
| 600s (10 min) | Quick one-time downloads |
| 3600s (1 hour) | Team collaboration |
| 86400s (24 hours) | External sharing |
| 604800s (7 days) | Maximum allowed |

In [9]:
def generate_presigned_download_url(bucket, object_name, expiration=3600):
    """
    Generate a presigned URL for downloading an S3 object
    
    Args:
        bucket (str): Bucket name
        object_name (str): S3 object name (key)
        expiration (int): URL expiration time in seconds (default 3600 = 1 hour)
    
    Returns:
        str: Presigned URL, or None if error
    
    Common expiration times:
        - 3600 = 1 hour (default)
        - 7200 = 2 hours
        - 86400 = 24 hours
        - 604800 = 7 days (maximum)
    
    Note: Uses AWS Signature Version 4 (required by S3)
    """
    try:
        url = s3_client.generate_presigned_url(
            'get_object',
            Params={
                'Bucket': bucket,
                'Key': object_name
            },
            ExpiresIn=expiration
        )
        
        print(f"SUCCESS: Presigned URL generated for '{bucket}/{object_name}'")
        print(f"Expires in: {expiration} seconds ({expiration/3600:.1f} hours)")
        print(f"\nURL (valid for {expiration/3600:.1f} hours):")
        print(url)
        print("\nAnyone with this URL can download the file until it expires.")
        
        return url
        
    except ClientError as e:
        print(f"ERROR: Failed to generate presigned URL - {e}")
        return None


# Example usage (uncomment to test):
url = generate_presigned_download_url(S3_BUCKET_COMPLETE_PIPELINE, f'{S3_UPLOAD_FOLDER}hosts.csv', expiration=600)

SUCCESS: Presigned URL generated for 's3-complete-pipeline/raw/hosts.csv'
Expires in: 600 seconds (0.2 hours)

URL (valid for 0.2 hours):
https://s3.us-east-2.amazonaws.com/s3-complete-pipeline/raw/hosts.csv?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAWPVGU3OO6BQJAVE3%2F20260131%2Fus-east-2%2Fs3%2Faws4_request&X-Amz-Date=20260131T071555Z&X-Amz-Expires=600&X-Amz-SignedHeaders=host&X-Amz-Signature=0a5f86b0a6bc7daa56a1e6fdd6d75b4e6f55793654239777eca7dd78c3ba8a1d

Anyone with this URL can download the file until it expires.


# PHASE 2: DATA CATALOG

```
┌─────────────────────────────────────────────────────────────────────────────┐
│  STEP 2: CATALOG DATA WITH GLUE CRAWLER                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   S3 (raw/)  ────►  Glue Crawler  ────►  Data Catalog                       │
│                     (auto-schema)        (database + tables)                │
│                                                                             │
│   Key Operations:                                                           │
│   • Create Glue database (metadata container)                               │
│   • Create & run crawler (discovers schema from S3)                         │
│   • List tables (verify catalog entries)                                    │
│   • Query-ready tables for Athena/ETL                                       │
│                                                                             │
│   Crawler detects:                                                          │
│   • File format (CSV, JSON, Parquet, etc.)                                  │
│   • Column names and data types                                             │
│   • Partition structure                                                     │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

## 2.1 Initialize Glue Client

In [10]:
# Initialize Glue Client
glue_client = boto3.client(
    'glue',
    region_name=AWS_REGION,
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY
)

# Helper function for security
def redact_account_id(arn):
    """Redact AWS account ID from ARN"""
    return re.sub(r':\d{12}:', ':************:', str(arn))

print("Glue client initialized")

Glue client initialized


## 2.2 Create Glue Database

The database is a **logical container** for tables in the Data Catalog. Tables discovered by crawlers are stored here.

In [11]:
def create_glue_database(database_name, description=''):
    """
    Create a database in the AWS Glue Data Catalog
    
    Args:
        database_name (str): Name (lowercase, no spaces)
        description (str): Optional description
    
    Returns:
        bool: True if created successfully
    """
    try:
        print(f"Creating Glue database '{database_name}'...")
        
        glue_client.create_database(
            DatabaseInput={
                'Name': database_name,
                'Description': description
            }
        )
        
        print(f"SUCCESS: Database '{database_name}' created")
        return True
        
    except ClientError as e:
        if e.response['Error']['Code'] == 'AlreadyExistsException':
            print(f"Database '{database_name}' already exists")
            return True
        print(f"ERROR: {e}")
        return False

# Test
create_glue_database('aws_full_pipeline_db', 'AWS Glue database for complete pipeline')

Creating Glue database 'aws_full_pipeline_db'...
Database 'aws_full_pipeline_db' already exists


True

## 2.3 List Databases

View all databases in the Glue Data Catalog.

In [12]:
def list_glue_databases():
    """
    List all databases in the Glue Data Catalog
    
    Returns:
        list: Database names
    """
    try:
        print("Listing Glue databases...")
        print("-" * 60)
        
        response = glue_client.get_databases()
        databases = response.get('DatabaseList', [])
        
        if not databases:
            print("No databases found")
            return []
        
        print(f"Found {len(databases)} database(s):\n")
        
        for db in databases:
            print(f"Database: {db['Name']}")
            print(f"  Description: {db.get('Description', 'N/A')}")
            print()
        
        return [db['Name'] for db in databases]
        
    except ClientError as e:
        print(f"ERROR: {e}")
        return []

# Test
list_glue_databases()

Listing Glue databases...
------------------------------------------------------------
Found 4 database(s):

Database: aws_full_pipeline_db
  Description: AWS Glue database for complete pipeline

Database: data_engineering_db
  Description: Learning database

Database: default
  Description: Default Hive database

Database: glue_db
  Description: A glue databse



['aws_full_pipeline_db', 'data_engineering_db', 'default', 'glue_db']

## 2.4 Create Crawler

Crawlers automatically discover schema from S3 data and populate the Data Catalog with table definitions.

In [13]:
def create_glue_crawler(crawler_name, database_name, s3_path, description=''):
    """
    Create a Glue crawler
    
    Args:
        crawler_name (str): Crawler name
        database_name (str): Target database
        s3_path (str): S3 path to crawl (e.g., 's3://bucket/path/')
    """
    try:
        print(f"Creating crawler '{crawler_name}'...")
        print(f"Target: {database_name}")
        print(f"Path: {s3_path}")
        
        glue_client.create_crawler(
            Name=crawler_name,
            Role=GLUE_ROLE_ARN,
            DatabaseName=database_name,
            Description=description,
            Targets={'S3Targets': [{'Path': s3_path}]},
            SchemaChangePolicy={
                'UpdateBehavior': 'UPDATE_IN_DATABASE',
                'DeleteBehavior': 'LOG'
            }
        )
        
        print(f"SUCCESS: Crawler '{crawler_name}' created")
        return True
        
    except ClientError as e:
        if e.response['Error']['Code'] == 'AlreadyExistsException':
            print(f"Crawler '{crawler_name}' already exists")
            return True
        print(f"ERROR: {e}")
        return False

# Example
create_glue_crawler('full_pipeline_crawler', 'aws_full_pipeline_db', f's3://{S3_BUCKET_COMPLETE_PIPELINE}/raw/')

Creating crawler 'full_pipeline_crawler'...
Target: aws_full_pipeline_db
Path: s3://s3-complete-pipeline/raw/
Crawler 'full_pipeline_crawler' already exists


True

## 2.5 Run Crawler

Execute the crawler to scan S3 and update the Data Catalog. Use `wait=True` to block until completion.

In [14]:
def run_crawler(crawler_name, wait=False):
    """
    Start a Glue crawler
    
    Args:
        crawler_name (str): Crawler name
        wait (bool): Wait for completion
    """
    try:
        print(f"Starting crawler '{crawler_name}'...")
        glue_client.start_crawler(Name=crawler_name)
        print(f"Crawler started")
        
        if wait:
            print("Waiting for completion...")
            while True:
                response = glue_client.get_crawler(Name=crawler_name)
                state = response['Crawler']['State']
                
                if state == 'READY':
                    # Wait for stats to update, then re-fetch
                    time.sleep(2)
                    response = glue_client.get_crawler(Name=crawler_name)
                    last = response['Crawler'].get('LastCrawl', {})
                    print(f"Completed: {last.get('Status', 'Unknown')}")
                    break
                print(f"  State: {state}")
                time.sleep(10)
        
        return True
        
    except ClientError as e:
        if e.response['Error']['Code'] == 'CrawlerRunningException':
            print("Crawler already running")
            return True
        print(f"ERROR: {e}")
        return False

# Example
run_crawler('full_pipeline_crawler', wait=True)

Starting crawler 'full_pipeline_crawler'...
Crawler started
Waiting for completion...
  State: RUNNING
  State: RUNNING
  State: RUNNING
  State: RUNNING
  State: RUNNING
Completed: SUCCEEDED


True

## 2.6 List Tables (Verify Catalog)

**Critical step** - Verify the crawler created tables in the Data Catalog. These tables are now queryable by Athena.

In [16]:
def list_tables(database_name):
    """
    List all tables in a Glue database
    
    Args:
        database_name (str): Database name
    
    Returns:
        list: Table information
    """
    try:
        print(f"Tables in database '{database_name}':")
        print("-" * 60)
        
        response = glue_client.get_tables(DatabaseName=database_name)
        tables = response.get('TableList', [])
        
        if not tables:
            print("No tables found")
            return []
        
        for table in tables:
            print(f"\nTable: {table['Name']}")
            print(f"  Location: {table.get('StorageDescriptor', {}).get('Location', 'N/A')}")
            print(f"  Format: {table.get('Parameters', {}).get('classification', 'N/A')}")
            
            # Show columns
            columns = table.get('StorageDescriptor', {}).get('Columns', [])
            if columns:
                print(f"  Columns ({len(columns)}):")
                for col in columns[:5]:  # Show first 5
                    print(f"    - {col['Name']}: {col['Type']}")
                if len(columns) > 5:
                    print(f"    ... and {len(columns) - 5} more")
        
        print(f"\nTotal: {len(tables)} table(s)")
        return tables
        
    except ClientError as e:
        print(f"ERROR: {e}")
        return []

# Verify crawler created tables
list_tables('aws_full_pipeline_db')

Tables in database 'aws_full_pipeline_db':
------------------------------------------------------------

Table: bookings_csv
  Location: s3://s3-complete-pipeline/raw/bookings.csv
  Format: csv
  Columns (9):
    - booking_id: string
    - listing_id: bigint
    - booking_date: string
    - nights_booked: bigint
    - booking_amount: bigint
    ... and 4 more

Table: hosts_csv
  Location: s3://s3-complete-pipeline/raw/hosts.csv
  Format: csv
  Columns (6):
    - host_id: bigint
    - host_name: string
    - host_since: string
    - is_superhost: boolean
    - response_rate: bigint
    ... and 1 more

Table: lambda_bookings_csv
  Location: s3://s3-complete-pipeline/raw/lambda-bookings.csv
  Format: csv
  Columns (9):
    - booking_id: string
    - listing_id: bigint
    - booking_date: string
    - nights_booked: bigint
    - booking_amount: bigint
    ... and 4 more

Table: processed
  Location: s3://s3-complete-pipeline/processed/
  Format: parquet
  Columns (13):
    - booking_id: st

[{'Name': 'bookings_csv',
  'DatabaseName': 'aws_full_pipeline_db',
  'Owner': 'owner',
  'CreateTime': datetime.datetime(2026, 1, 30, 23, 27, 3, tzinfo=tzlocal()),
  'UpdateTime': datetime.datetime(2026, 1, 30, 23, 27, 3, tzinfo=tzlocal()),
  'LastAccessTime': datetime.datetime(2026, 1, 30, 23, 27, 3, tzinfo=tzlocal()),
  'Retention': 0,
  'StorageDescriptor': {'Columns': [{'Name': 'booking_id', 'Type': 'string'},
    {'Name': 'listing_id', 'Type': 'bigint'},
    {'Name': 'booking_date', 'Type': 'string'},
    {'Name': 'nights_booked', 'Type': 'bigint'},
    {'Name': 'booking_amount', 'Type': 'bigint'},
    {'Name': 'cleaning_fee', 'Type': 'bigint'},
    {'Name': 'service_fee', 'Type': 'bigint'},
    {'Name': 'booking_status', 'Type': 'string'},
    {'Name': 'created_at', 'Type': 'string'}],
   'Location': 's3://s3-complete-pipeline/raw/bookings.csv',
   'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
   'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputF

## 2.7 List Crawlers

View all crawlers in your account with their current state.

In [17]:
def list_crawlers():
    """
    List all Glue crawlers
    
    Returns:
        list: Crawler names and states
    """
    try:
        print("Glue Crawlers:")
        print("-" * 60)
        
        response = glue_client.get_crawlers()
        crawlers = response.get('Crawlers', [])
        
        if not crawlers:
            print("No crawlers found")
            return []
        
        for crawler in crawlers:
            print(f"\nCrawler: {crawler['Name']}")
            print(f"  State: {crawler['State']}")
            print(f"  Database: {crawler.get('DatabaseName', 'N/A')}")
            
            # Last crawl info
            last = crawler.get('LastCrawl', {})
            if last:
                print(f"  Last Run: {last.get('Status', 'N/A')}")
                print(f"  Tables Created: {last.get('TablesCreated', 0)}")
                print(f"  Tables Updated: {last.get('TablesUpdated', 0)}")
        
        print(f"\nTotal: {len(crawlers)} crawler(s)")
        return crawlers
        
    except ClientError as e:
        print(f"ERROR: {e}")
        return []

# List all crawlers
list_crawlers()

Glue Crawlers:
------------------------------------------------------------

Crawler: db_s3_crawler
  State: READY
  Database: glue_db
  Last Run: SUCCEEDED
  Tables Created: 0
  Tables Updated: 0

Crawler: full_pipeline_crawler
  State: READY
  Database: aws_full_pipeline_db
  Last Run: SUCCEEDED
  Tables Created: 0
  Tables Updated: 0

Crawler: my-crawler
  State: READY
  Database: data_engineering_db
  Last Run: SUCCEEDED
  Tables Created: 0
  Tables Updated: 0

Crawler: my_processed_data_crawler
  State: READY
  Database: aws_full_pipeline_db
  Last Run: SUCCEEDED
  Tables Created: 0
  Tables Updated: 0

Total: 4 crawler(s)


[{'Name': 'db_s3_crawler',
  'Role': 'glue-access-s3',
  'Targets': {'S3Targets': [{'Path': 's3://real-learn-s3/processed/',
     'Exclusions': []}],
   'JdbcTargets': [],
   'MongoDBTargets': [],
   'DynamoDBTargets': [],
   'CatalogTargets': [],
   'DeltaTargets': [],
   'IcebergTargets': [],
   'HudiTargets': []},
  'DatabaseName': 'glue_db',
  'Classifiers': [],
  'RecrawlPolicy': {'RecrawlBehavior': 'CRAWL_EVERYTHING'},
  'SchemaChangePolicy': {'UpdateBehavior': 'UPDATE_IN_DATABASE',
   'DeleteBehavior': 'DEPRECATE_IN_DATABASE'},
  'LineageConfiguration': {'CrawlerLineageSettings': 'DISABLE'},
  'State': 'READY',
  'CrawlElapsedTime': 0,
  'CreationTime': datetime.datetime(2026, 1, 30, 0, 15, 47, tzinfo=tzlocal()),
  'LastUpdated': datetime.datetime(2026, 1, 30, 0, 15, 47, tzinfo=tzlocal()),
  'LastCrawl': {'Status': 'SUCCEEDED',
   'ErrorMessage': 'Service Principal: glue.amazonaws.com is not authorized to perform: logs:PutLogEvents on resource: arn:aws:logs:us-east-2:44595235113

## 2.8 Cleanup Functions

Delete crawlers and databases when no longer needed. **Use with caution!**

In [18]:
def delete_crawler(crawler_name):
    """
    Delete a Glue crawler
    
    Args:
        crawler_name (str): Crawler name
    """
    try:
        print(f"Deleting crawler '{crawler_name}'...")
        glue_client.delete_crawler(Name=crawler_name)
        print(f"SUCCESS: Crawler '{crawler_name}' deleted")
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityNotFoundException':
            print(f"Crawler '{crawler_name}' not found")
        else:
            print(f"ERROR: {e}")
        return False


def delete_database(database_name):
    """
    Delete a Glue database and all its tables
    
    Args:
        database_name (str): Database name
    """
    try:
        print(f"Deleting database '{database_name}'...")
        glue_client.delete_database(Name=database_name)
        print(f"SUCCESS: Database '{database_name}' deleted")
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityNotFoundException':
            print(f"Database '{database_name}' not found")
        else:
            print(f"ERROR: {e}")
        return False


# Example (uncomment to use):
# delete_crawler('full_pipeline_crawler')
# delete_database('aws_full_pipeline_db')

# PHASE 3: DATA TRANSFORMATION (ETL)

```
┌─────────────────────────────────────────────────────────────────────────────┐
│  STEP 3: TRANSFORM DATA WITH GLUE ETL JOB                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   S3 (raw/)  ────►  Glue ETL Job  ────►  S3 (processed/)                    │
│   (CSV)             (Spark/Python)       (Parquet)                          │
│                                                                             │
│   Key Operations:                                                           │
│   • Upload ETL script to S3                                                 │
│   • Create Glue job pointing to script                                      │
│   • Run job with parameters                                                 │
│   • Monitor job execution                                                   │
│                                                                             │
│   Transformations in aws_glue_etl.py:                                       │
│   • Null value checking & rejection                                         │
│   • Duplicate detection & handling                                          │
│   • Calculated columns:                                                     │
│     - total_booking_amount = nights_booked * booking_amount                 │
│     - additional_cost = cleaning_fee + service_fee                          │
│     - total_cost = total_booking_amount + additional_cost                   │
│   • Output as Parquet (compressed, columnar)                                │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

## 3.1 Upload Raw Data (bookings.csv)

In [19]:
upload_file('data/bookings.csv', S3_BUCKET_COMPLETE_PIPELINE, f'{S3_UPLOAD_FOLDER}bookings.csv')

SUCCESS: 'data/bookings.csv' uploaded to 's3-complete-pipeline/raw/bookings.csv'


True

## 3.2 Upload ETL Script to S3

Glue jobs require the Python script to be stored in S3.

In [20]:
# upload the ETL script to S3
upload_file('scripts/aws_glue_etl.py', S3_BUCKET_COMPLETE_PIPELINE, 'scripts/aws_glue_etl.py')

SUCCESS: 'scripts/aws_glue_etl.py' uploaded to 's3-complete-pipeline/scripts/aws_glue_etl.py'


True

## 3.3 Create Glue Job

Create an ETL job that points to the script in S3. Job types:

| Type | Use Case | Workers |
|------|----------|---------|
| `glueetl` | Spark-based, large datasets | 2+ DPUs |
| `pythonshell` | Simple Python, small data | 0.0625-1 DPU |

In [21]:
def create_glue_job(job_name, script_location, description='', job_type='glueetl', 
                    worker_type='G.1X', num_workers=2, timeout=60, max_retries=0):
    """
    Create a Glue ETL job
    
    Args:
        job_name (str): Job name
        script_location (str): S3 path to ETL script (s3://bucket/path/script.py)
        description (str): Job description
        job_type (str): 'glueetl' (Spark) or 'pythonshell'
        worker_type (str): 'G.1X', 'G.2X', 'G.025X' (for pythonshell)
        num_workers (int): Number of workers (min 2 for glueetl)
        timeout (int): Job timeout in minutes
        max_retries (int): Number of retries on failure
    
    Returns:
        bool: True if created successfully
    """
    try:
        print(f"Creating Glue job '{job_name}'...")
        print(f"  Script: {script_location}")
        print(f"  Type: {job_type}")
        
        job_config = {
            'Name': job_name,
            'Description': description,
            'Role': GLUE_ROLE_ARN,
            'Command': {
                'Name': job_type,
                'ScriptLocation': script_location,
                'PythonVersion': '3'
            },
            'DefaultArguments': {
                '--job-language': 'python',
                '--enable-metrics': 'true',
                '--enable-continuous-cloudwatch-log': 'true'
            },
            'Timeout': timeout,
            'MaxRetries': max_retries,
            'GlueVersion': '4.0'
        }
        
        # Add worker config for glueetl jobs
        if job_type == 'glueetl':
            job_config['WorkerType'] = worker_type
            job_config['NumberOfWorkers'] = num_workers
        
        glue_client.create_job(**job_config)
        
        print(f"SUCCESS: Job '{job_name}' created")
        return True
        
    except ClientError as e:
        if e.response['Error']['Code'] == 'AlreadyExistsException':
            print(f"Job '{job_name}' already exists")
            return True
        print(f"ERROR: {e}")
        return False


# Create the bookings ETL job
script_path = f's3://{S3_BUCKET_COMPLETE_PIPELINE}/scripts/aws_glue_etl.py'
create_glue_job(
    job_name='bookings-etl-job',
    script_location=script_path,
    description='Transform bookings data: null check, dedup, calculate totals'
)

Creating Glue job 'bookings-etl-job'...
  Script: s3://s3-complete-pipeline/scripts/aws_glue_etl.py
  Type: glueetl
SUCCESS: Job 'bookings-etl-job' created


True

## 3.4 Run Glue Job

Execute the job with parameters. The job reads from `raw/bookings.csv` and writes to `processed/`.

In [22]:
def run_glue_job(job_name, arguments=None, wait=False):
    """
    Start a Glue job run
    
    Args:
        job_name (str): Job name
        arguments (dict): Job arguments (e.g., {'--S3_BUCKET': 'my-bucket'})
        wait (bool): Wait for completion
    
    Returns:
        str: Job run ID, or None if error
    """
    try:
        print(f"Starting job '{job_name}'...")
        
        run_config = {'JobName': job_name}
        if arguments:
            run_config['Arguments'] = arguments
            print(f"  Arguments: {arguments}")
        
        response = glue_client.start_job_run(**run_config)
        run_id = response['JobRunId']
        
        print(f"Job started. Run ID: {run_id}")
        
        if wait:
            print("Waiting for completion...")
            while True:
                status_response = glue_client.get_job_run(JobName=job_name, RunId=run_id)
                state = status_response['JobRun']['JobRunState']
                
                if state in ['SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT']:
                    print(f"Job {state}")
                    if state == 'FAILED':
                        error = status_response['JobRun'].get('ErrorMessage', 'Unknown')
                        print(f"Error: {error}")
                    elif state == 'SUCCEEDED':
                        duration = status_response['JobRun'].get('ExecutionTime', 0)
                        print(f"Duration: {duration} seconds")
                    break
                    
                print(f"  State: {state}")
                time.sleep(30)
        
        return run_id
        
    except ClientError as e:
        print(f"ERROR: {e}")
        return None


# Run the bookings ETL job
run_glue_job(
    job_name='bookings-etl-job',
    arguments={
        '--S3_BUCKET': S3_BUCKET_COMPLETE_PIPELINE,
        '--SOURCE_PREFIX': 'raw',
        '--TARGET_PREFIX': 'processed',
        '--DUPLICATE_HANDLING': 'keep_first'
    },
    wait=True
)

Starting job 'bookings-etl-job'...
  Arguments: {'--S3_BUCKET': 's3-complete-pipeline', '--SOURCE_PREFIX': 'raw', '--TARGET_PREFIX': 'processed', '--DUPLICATE_HANDLING': 'keep_first'}
Job started. Run ID: jr_aff837deeced3a49b6ac7a2e761f39cd321686edb57574fbf12b03f8397a2e7a
Waiting for completion...
  State: RUNNING
  State: RUNNING
  State: RUNNING
  State: RUNNING
Job SUCCEEDED
Duration: 105 seconds


'jr_aff837deeced3a49b6ac7a2e761f39cd321686edb57574fbf12b03f8397a2e7a'

## 3.5 Get Job Run Status

Check the status of a specific job run.

In [23]:
def get_job_run_status(job_name, run_id=None):
    """
    Get status of a job run (latest if run_id not specified)
    
    Args:
        job_name (str): Job name
        run_id (str): Specific run ID (optional)
    
    Returns:
        dict: Job run details
    """
    try:
        if run_id:
            response = glue_client.get_job_run(JobName=job_name, RunId=run_id)
            runs = [response['JobRun']]
        else:
            response = glue_client.get_job_runs(JobName=job_name, MaxResults=1)
            runs = response.get('JobRuns', [])
        
        if not runs:
            print(f"No runs found for job '{job_name}'")
            return None
        
        run = runs[0]
        print(f"Job Run Status for '{job_name}':")
        print("-" * 50)
        print(f"  Run ID: {run['Id']}")
        print(f"  State: {run['JobRunState']}")
        print(f"  Started: {run.get('StartedOn', 'N/A')}")
        print(f"  Completed: {run.get('CompletedOn', 'N/A')}")
        print(f"  Duration: {run.get('ExecutionTime', 0)} seconds")
        
        if run['JobRunState'] == 'FAILED':
            print(f"  Error: {run.get('ErrorMessage', 'Unknown')}")
        
        return run
        
    except ClientError as e:
        print(f"ERROR: {e}")
        return None


# Check latest run status
get_job_run_status('bookings-etl-job')

Job Run Status for 'bookings-etl-job':
--------------------------------------------------
  Run ID: jr_aff837deeced3a49b6ac7a2e761f39cd321686edb57574fbf12b03f8397a2e7a
  State: SUCCEEDED
  Started: 2026-01-31 02:17:55.659000-05:00
  Completed: 2026-01-31 02:19:48.985000-05:00
  Duration: 105 seconds


{'Id': 'jr_aff837deeced3a49b6ac7a2e761f39cd321686edb57574fbf12b03f8397a2e7a',
 'Attempt': 0,
 'JobName': 'bookings-etl-job',
 'JobMode': 'SCRIPT',
 'JobRunQueuingEnabled': False,
 'StartedOn': datetime.datetime(2026, 1, 31, 2, 17, 55, 659000, tzinfo=tzlocal()),
 'LastModifiedOn': datetime.datetime(2026, 1, 31, 2, 19, 48, 985000, tzinfo=tzlocal()),
 'CompletedOn': datetime.datetime(2026, 1, 31, 2, 19, 48, 985000, tzinfo=tzlocal()),
 'JobRunState': 'SUCCEEDED',
 'Arguments': {'--TARGET_PREFIX': 'processed',
  '--DUPLICATE_HANDLING': 'keep_first',
  '--S3_BUCKET': 's3-complete-pipeline',
  '--SOURCE_PREFIX': 'raw'},
 'PredecessorRuns': [],
 'AllocatedCapacity': 2,
 'ExecutionTime': 105,
 'Timeout': 60,
 'MaxCapacity': 2.0,
 'WorkerType': 'G.1X',
 'NumberOfWorkers': 2,
 'LogGroupName': '/aws-glue/jobs',
 'GlueVersion': '4.0'}

## 3.6 List Jobs

View all Glue jobs in the account.

In [24]:
def list_jobs():
    """
    List all Glue jobs
    
    Returns:
        list: Job details
    """
    try:
        print("Glue Jobs:")
        print("-" * 60)
        
        response = glue_client.get_jobs()
        jobs = response.get('Jobs', [])
        
        if not jobs:
            print("No jobs found")
            return []
        
        for job in jobs:
            print(f"\nJob: {job['Name']}")
            print(f"  Description: {job.get('Description', 'N/A')}")
            print(f"  Type: {job['Command']['Name']}")
            print(f"  Glue Version: {job.get('GlueVersion', 'N/A')}")
            print(f"  Workers: {job.get('NumberOfWorkers', 'N/A')}")
            print(f"  Timeout: {job.get('Timeout', 'N/A')} min")
        
        print(f"\nTotal: {len(jobs)} job(s)")
        return jobs
        
    except ClientError as e:
        print(f"ERROR: {e}")
        return []


# List all jobs
list_jobs()

Glue Jobs:
------------------------------------------------------------

Job: bookings-etl-job
  Description: Transform bookings data: null check, dedup, calculate totals
  Type: glueetl
  Glue Version: 4.0
  Workers: 2
  Timeout: 60 min

Total: 1 job(s)


[{'Name': 'bookings-etl-job',
  'JobMode': 'SCRIPT',
  'JobRunQueuingEnabled': False,
  'Description': 'Transform bookings data: null check, dedup, calculate totals',
  'Role': 'arn:aws:iam::445952351133:role/glue-access-s3',
  'CreatedOn': datetime.datetime(2026, 1, 31, 2, 17, 28, 339000, tzinfo=tzlocal()),
  'LastModifiedOn': datetime.datetime(2026, 1, 31, 2, 17, 28, 339000, tzinfo=tzlocal()),
  'ExecutionProperty': {'MaxConcurrentRuns': 1},
  'Command': {'Name': 'glueetl',
   'ScriptLocation': 's3://s3-complete-pipeline/scripts/aws_glue_etl.py',
   'PythonVersion': '3'},
  'DefaultArguments': {'--enable-metrics': 'true',
   '--job-language': 'python',
   '--enable-continuous-cloudwatch-log': 'true'},
  'MaxRetries': 0,
  'AllocatedCapacity': 2,
  'Timeout': 60,
  'MaxCapacity': 2.0,
  'WorkerType': 'G.1X',
  'NumberOfWorkers': 2,
  'GlueVersion': '4.0'}]

## 3.7 Verify Output

Check the processed folder to confirm Parquet files were created.

In [25]:
# Verify processed data was created
list_objects_detailed(S3_BUCKET_COMPLETE_PIPELINE, prefix='processed/')

Objects in 's3-complete-pipeline/processed/':
Key: processed/bookings/part-00000-cba2c189-2dd3-4c74-84ed-147fc7009b83-c000.snappy.parquet
Size: 262.24 KB (268,529 bytes)
Last Modified: 2026-01-31 07:19:37
Storage Class: STANDARD
ETag: "223992fff8c7a102c9d42e521ebb0997-1"
Total: 1 object(s)


[{'Key': 'processed/bookings/part-00000-cba2c189-2dd3-4c74-84ed-147fc7009b83-c000.snappy.parquet',
  'LastModified': datetime.datetime(2026, 1, 31, 7, 19, 37, tzinfo=tzutc()),
  'ETag': '"223992fff8c7a102c9d42e521ebb0997-1"',
  'ChecksumAlgorithm': ['CRC64NVME'],
  'ChecksumType': 'FULL_OBJECT',
  'Size': 268529,
  'StorageClass': 'STANDARD'}]

## 3.8 Cleanup Functions

Delete jobs when no longer needed. **Use with caution!**

In [26]:
def delete_job(job_name):
    """
    Delete a Glue job
    
    Args:
        job_name (str): Job name
    """
    try:
        print(f"Deleting job '{job_name}'...")
        glue_client.delete_job(JobName=job_name)
        print(f"SUCCESS: Job '{job_name}' deleted")
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == 'EntityNotFoundException':
            print(f"Job '{job_name}' not found")
        else:
            print(f"ERROR: {e}")
        return False


# Example (uncomment to use):
delete_job('bookings-etl-job')
delete_job('my-etl-job')
delete_job('aws-glue-pipeline')

Deleting job 'bookings-etl-job'...
SUCCESS: Job 'bookings-etl-job' deleted
Deleting job 'my-etl-job'...
SUCCESS: Job 'my-etl-job' deleted
Deleting job 'aws-glue-pipeline'...
SUCCESS: Job 'aws-glue-pipeline' deleted


True

# PHASE 4: CATALOG PROCESSED DATA

```
┌─────────────────────────────────────────────────────────────────────────────┐
│  STEP 4: RE-CATALOG TRANSFORMED DATA                                       │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   S3 (processed/)  ────►  Glue Crawler  ────►  Data Catalog                 │
│   (Parquet)               (auto-schema)        (new table)                  │
│                                                                             │
│   Why re-crawl?                                                             │
│   • Parquet has different schema than CSV                                   │
│   • New calculated columns need to be cataloged                             │
│   • Enables Athena queries on processed data                                │
│                                                                             │
│   Result:                                                                   │
│   • New table 'bookings' in Data Catalog                                    │
│   • Columns include: total_booking_amount, additional_cost, total_cost      │
│   • Format: Parquet (faster queries, columnar storage)                      │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

## 4.1 Create Crawler for Processed Data

Point a new crawler at the `processed/` folder to discover the Parquet schema.

In [27]:
create_glue_crawler('my_processed_data_crawler', 'aws_full_pipeline_db', f's3://{S3_BUCKET_COMPLETE_PIPELINE}/processed/')

Creating crawler 'my_processed_data_crawler'...
Target: aws_full_pipeline_db
Path: s3://s3-complete-pipeline/processed/
Crawler 'my_processed_data_crawler' already exists


True

## 4.2 Run Crawler

Execute the crawler to scan processed Parquet files and update the Data Catalog.

In [28]:
run_crawler('my_processed_data_crawler', wait=True)

Starting crawler 'my_processed_data_crawler'...
Crawler started
Waiting for completion...
  State: RUNNING
  State: RUNNING
  State: RUNNING
  State: RUNNING
  State: RUNNING
Completed: SUCCEEDED


True

## 4.3 Verify New Table

Confirm the crawler created a new table with the calculated columns.

In [29]:
# Verify crawler created tables
list_tables('aws_full_pipeline_db')

Tables in database 'aws_full_pipeline_db':
------------------------------------------------------------

Table: bookings_csv
  Location: s3://s3-complete-pipeline/raw/bookings.csv
  Format: csv
  Columns (9):
    - booking_id: string
    - listing_id: bigint
    - booking_date: string
    - nights_booked: bigint
    - booking_amount: bigint
    ... and 4 more

Table: hosts_csv
  Location: s3://s3-complete-pipeline/raw/hosts.csv
  Format: csv
  Columns (6):
    - host_id: bigint
    - host_name: string
    - host_since: string
    - is_superhost: boolean
    - response_rate: bigint
    ... and 1 more

Table: lambda_bookings_csv
  Location: s3://s3-complete-pipeline/raw/lambda-bookings.csv
  Format: csv
  Columns (9):
    - booking_id: string
    - listing_id: bigint
    - booking_date: string
    - nights_booked: bigint
    - booking_amount: bigint
    ... and 4 more

Table: processed
  Location: s3://s3-complete-pipeline/processed/
  Format: parquet
  Columns (13):
    - booking_id: st

[{'Name': 'bookings_csv',
  'DatabaseName': 'aws_full_pipeline_db',
  'Owner': 'owner',
  'CreateTime': datetime.datetime(2026, 1, 30, 23, 27, 3, tzinfo=tzlocal()),
  'UpdateTime': datetime.datetime(2026, 1, 30, 23, 27, 3, tzinfo=tzlocal()),
  'LastAccessTime': datetime.datetime(2026, 1, 30, 23, 27, 3, tzinfo=tzlocal()),
  'Retention': 0,
  'StorageDescriptor': {'Columns': [{'Name': 'booking_id', 'Type': 'string'},
    {'Name': 'listing_id', 'Type': 'bigint'},
    {'Name': 'booking_date', 'Type': 'string'},
    {'Name': 'nights_booked', 'Type': 'bigint'},
    {'Name': 'booking_amount', 'Type': 'bigint'},
    {'Name': 'cleaning_fee', 'Type': 'bigint'},
    {'Name': 'service_fee', 'Type': 'bigint'},
    {'Name': 'booking_status', 'Type': 'string'},
    {'Name': 'created_at', 'Type': 'string'}],
   'Location': 's3://s3-complete-pipeline/raw/bookings.csv',
   'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
   'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputF

## 4.4 List All Crawlers

View all crawlers to confirm both raw and processed crawlers exist.

In [30]:

# List all crawlers
list_crawlers()

Glue Crawlers:
------------------------------------------------------------

Crawler: db_s3_crawler
  State: READY
  Database: glue_db
  Last Run: SUCCEEDED
  Tables Created: 0
  Tables Updated: 0

Crawler: full_pipeline_crawler
  State: READY
  Database: aws_full_pipeline_db
  Last Run: SUCCEEDED
  Tables Created: 0
  Tables Updated: 0

Crawler: my-crawler
  State: READY
  Database: data_engineering_db
  Last Run: SUCCEEDED
  Tables Created: 0
  Tables Updated: 0

Crawler: my_processed_data_crawler
  State: READY
  Database: aws_full_pipeline_db
  Last Run: SUCCEEDED
  Tables Created: 0
  Tables Updated: 0

Total: 4 crawler(s)


[{'Name': 'db_s3_crawler',
  'Role': 'glue-access-s3',
  'Targets': {'S3Targets': [{'Path': 's3://real-learn-s3/processed/',
     'Exclusions': []}],
   'JdbcTargets': [],
   'MongoDBTargets': [],
   'DynamoDBTargets': [],
   'CatalogTargets': [],
   'DeltaTargets': [],
   'IcebergTargets': [],
   'HudiTargets': []},
  'DatabaseName': 'glue_db',
  'Classifiers': [],
  'RecrawlPolicy': {'RecrawlBehavior': 'CRAWL_EVERYTHING'},
  'SchemaChangePolicy': {'UpdateBehavior': 'UPDATE_IN_DATABASE',
   'DeleteBehavior': 'DEPRECATE_IN_DATABASE'},
  'LineageConfiguration': {'CrawlerLineageSettings': 'DISABLE'},
  'State': 'READY',
  'CrawlElapsedTime': 0,
  'CreationTime': datetime.datetime(2026, 1, 30, 0, 15, 47, tzinfo=tzlocal()),
  'LastUpdated': datetime.datetime(2026, 1, 30, 0, 15, 47, tzinfo=tzlocal()),
  'LastCrawl': {'Status': 'SUCCEEDED',
   'ErrorMessage': 'Service Principal: glue.amazonaws.com is not authorized to perform: logs:PutLogEvents on resource: arn:aws:logs:us-east-2:44595235113

# PHASE 5: ANALYZE WITH ATHENA

```
┌─────────────────────────────────────────────────────────────────────────────┐
│  STEP 5: QUERY DATA WITH ATHENA                                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   Data Catalog  ────►  Athena SQL  ────►  Results                           │
│   (tables)             (serverless)       (S3 + DataFrame)                  │
│                                                                             │
│   Key Features:                                                             │
│   • Serverless - no infrastructure to manage                                │
│   • Pay per query - $5 per TB scanned                                       │
│   • Standard SQL syntax                                                     │
│   • Direct query on S3 data (Parquet = faster + cheaper)                    │
│                                                                             │
│   Query Examples:                                                           │
│   • SELECT, WHERE, GROUP BY, HAVING                                         │
│   • Aggregations: COUNT, SUM, AVG                                           │
│   • Date functions: DATE(), YEAR(), MONTH()                                 │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

## 5.1 Configure Athena Output Location

Athena stores query results in S3. This location is required for all queries.

In [31]:
ATHENA_OUTPUT = f's3://{S3_BUCKET_COMPLETE_PIPELINE}/athena-results/'
print(f"Athena query results will be stored in: {ATHENA_OUTPUT}")

Athena query results will be stored in: s3://s3-complete-pipeline/athena-results/


## 5.2 Initialize Athena Client

In [32]:
# initialize athena client
# Initialize Athena Client
athena_client = boto3.client(
    'athena',
    region_name=AWS_REGION,
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY
)

# Helper function for security
def redact_account_id(text):
    """Redact AWS account ID from text"""
    return re.sub(r':\d{12}:', ':************:', str(text))

print("Athena client initialized")

Athena client initialized


## 5.3 Run Athena Query

Core function to execute SQL queries and return results. Handles polling for completion.

In [33]:
def run_athena_query(query, database, output_location=None, max_results=100):
    """
    Execute an Athena query and return results
    
    Args:
        query (str): SQL query to execute
        database (str): Glue database name
        output_location (str): S3 path for results (default: ATHENA_OUTPUT)
        max_results (int): Maximum rows to return
    
    Returns:
        list: List of dictionaries with query results
    """
    if output_location is None:
        output_location = ATHENA_OUTPUT
    
    try:
        print(f"Executing query on database '{database}'...")
        print(f"Query: {query[:100]}{'...' if len(query) > 100 else ''}")
        
        # Start query execution
        response = athena_client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={'Database': database},
            ResultConfiguration={'OutputLocation': output_location}
        )
        
        query_execution_id = response['QueryExecutionId']
        print(f"Query ID: {query_execution_id}")
        
        # Wait for query to complete
        while True:
            result = athena_client.get_query_execution(
                QueryExecutionId=query_execution_id
            )
            status = result['QueryExecution']['Status']['State']
            
            if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
                break
            
            print(f"  Status: {status}")
            time.sleep(1)
        
        if status == 'SUCCEEDED':
            # Get execution stats
            stats = result['QueryExecution']['Statistics']
            data_scanned = stats.get('DataScannedInBytes', 0)
            exec_time = stats.get('TotalExecutionTimeInMillis', 0)
            
            print(f"\nQuery SUCCEEDED")
            print(f"Data scanned: {data_scanned / 1024 / 1024:.2f} MB")
            print(f"Execution time: {exec_time / 1000:.2f} seconds")
            print(f"Estimated cost: ${data_scanned / 1024 / 1024 / 1024 / 1024 * 5:.6f}")
            
            # Get query results
            results = athena_client.get_query_results(
                QueryExecutionId=query_execution_id,
                MaxResults=max_results
            )
            
            # Parse results
            rows = results['ResultSet']['Rows']
            if not rows:
                return []
            
            # First row is headers
            headers = [col.get('VarCharValue', '') for col in rows[0]['Data']]
            data = []
            
            for row in rows[1:]:
                values = [col.get('VarCharValue', '') for col in row['Data']]
                data.append(dict(zip(headers, values)))
            
            print(f"Rows returned: {len(data)}")
            return data
            
        else:
            error = result['QueryExecution']['Status'].get('StateChangeReason', 'Unknown error')
            print(f"Query {status}: {error}")
            return None
            
    except ClientError as e:
        print(f"ERROR: {e}")
        return None

## 5.4 Helper Functions

Convenience functions for common operations.

In [34]:
def show_tables(database):
    """
    List all tables in a database
    """
    query = "SHOW TABLES"
    results = run_athena_query(query, database)
    
    if results:
        print("\nTables:")
        for row in results:
            print(f"  - {list(row.values())[0]}")
    
    return results

# Example
show_tables('aws_full_pipeline_db')

Executing query on database 'aws_full_pipeline_db'...
Query: SHOW TABLES
Query ID: 58e0f58e-eee8-4f2a-88af-6d2d573b7fe6
  Status: QUEUED

Query SUCCEEDED
Data scanned: 0.00 MB
Execution time: 0.23 seconds
Estimated cost: $0.000000
Rows returned: 4

Tables:
  - hosts_csv
  - lambda_bookings_csv
  - processed
  - raw


[{'bookings_csv': 'hosts_csv'},
 {'bookings_csv': 'lambda_bookings_csv'},
 {'bookings_csv': 'processed'},
 {'bookings_csv': 'raw'}]

In [35]:
def describe_table(database, table_name):
    """
    Show table schema
    """
    query = f"DESCRIBE {table_name}"
    results = run_athena_query(query, database)
    
    if results:
        print(f"\nSchema for '{table_name}':")
        for row in results:
            col_name = row.get('col_name', '')
            data_type = row.get('data_type', '')
            if col_name and not col_name.startswith('#'):
                print(f"  {col_name}: {data_type}")
    
    return results

# Example
describe_table('aws_full_pipeline_db', 'processed')

Executing query on database 'aws_full_pipeline_db'...
Query: DESCRIBE processed
Query ID: de293f46-f4c7-4f3b-8227-a69318218183
  Status: QUEUED

Query SUCCEEDED
Data scanned: 0.00 MB
Execution time: 0.73 seconds
Estimated cost: $0.000000
Rows returned: 18

Schema for 'processed':


[{'booking_id          \tstring              \t                    ': 'listing_id          \tint                 \t                    '},
 {'booking_id          \tstring              \t                    ': 'booking_date        \ttimestamp           \t                    '},
 {'booking_id          \tstring              \t                    ': 'nights_booked       \tint                 \t                    '},
 {'booking_id          \tstring              \t                    ': 'booking_amount      \tint                 \t                    '},
 {'booking_id          \tstring              \t                    ': 'cleaning_fee        \tint                 \t                    '},
 {'booking_id          \tstring              \t                    ': 'service_fee         \tint                 \t                    '},
 {'booking_id          \tstring              \t                    ': 'booking_status      \tstring              \t                    '},
 {'booking_id          \tst

## 5.5 Convert Results to DataFrame

Convert Athena results to Pandas DataFrame for analysis and visualization.

In [36]:
# lets create a function to return the results in dataframe format
import pandas as pd
# the function takes in results from run_athena_query and returns a dataframe
def athena_results_to_dataframe(results):
    """
    Convert Athena query results to a Pandas DataFrame
    
    Args:
        results (list): List of dictionaries from run_athena_query
    
    Returns:
        pd.DataFrame: DataFrame with query results
    """
    if not results:
        print("No results to convert")
        return pd.DataFrame()
    
    df = pd.DataFrame(results)
    print(f"Converted {len(df)} rows to DataFrame")
    return df

## 5.6 Query Examples

### Basic SELECT with LIMIT

In [37]:
# Simple SELECT with LIMIT
query = """
SELECT * 
FROM processed 
LIMIT 10
"""

results = run_athena_query(query, 'aws_full_pipeline_db')
df = athena_results_to_dataframe(results)
df

Executing query on database 'aws_full_pipeline_db'...
Query: 
SELECT * 
FROM processed 
LIMIT 10

Query ID: f8efe835-cbcd-41b1-a7cf-1909e8c0f9c3
  Status: QUEUED
  Status: RUNNING

Query SUCCEEDED
Data scanned: 0.25 MB
Execution time: 1.58 seconds
Estimated cost: $0.000001
Rows returned: 10
Converted 10 rows to DataFrame


Unnamed: 0,booking_id,listing_id,booking_date,nights_booked,booking_amount,cleaning_fee,service_fee,booking_status,created_at,total_booking_amount,additional_cost,total_cost,processed_at,partition_0
0,0008e04a-d69f-448b-82f5-de35a4ffe0eb,229,2025-09-25 00:00:00.000,14,2912,76,27,cancelled,2025-12-26 14:15:54.011,40768.0,103.0,40871.0,2026-01-31 07:19:33.437,bookings
1,00127cd0-c342-4a32-b9ce-f9defb8932f4,461,2025-05-28 00:00:00.000,14,994,54,40,confirmed,2025-12-26 14:15:54.011,13916.0,94.0,14010.0,2026-01-31 07:19:33.437,bookings
2,00186338-2229-4a7f-922d-1bc2adcd785f,455,2025-04-20 00:00:00.000,4,964,22,49,confirmed,2025-12-26 14:15:54.011,3856.0,71.0,3927.0,2026-01-31 07:19:33.437,bookings
3,001bc7c8-0eea-412d-802c-b2c4d7136462,101,2025-10-06 00:00:00.000,5,1345,26,37,confirmed,2025-12-26 14:15:54.011,6725.0,63.0,6788.0,2026-01-31 07:19:33.437,bookings
4,001c3f21-9ecc-4a69-869f-261ee28e890f,238,2025-07-14 00:00:00.000,2,456,20,32,cancelled,2025-12-26 14:15:54.011,912.0,52.0,964.0,2026-01-31 07:19:33.437,bookings
5,0045bcd6-a019-419c-b8eb-00d66dba626a,402,2025-07-31 00:00:00.000,11,3047,67,26,confirmed,2025-12-26 14:15:54.011,33517.0,93.0,33610.0,2026-01-31 07:19:33.437,bookings
6,00468a09-1ece-477c-95c2-721e850cf52d,89,2025-03-09 00:00:00.000,14,742,23,46,confirmed,2025-12-26 14:15:54.011,10388.0,69.0,10457.0,2026-01-31 07:19:33.437,bookings
7,00516937-91bd-412a-acb7-cdc844e33ac3,157,2025-02-08 00:00:00.000,4,724,56,23,confirmed,2025-12-26 14:15:54.011,2896.0,79.0,2975.0,2026-01-31 07:19:33.437,bookings
8,00531b5f-1010-4947-b5cf-11c259a16968,79,2025-03-10 00:00:00.000,5,530,44,10,confirmed,2025-12-26 14:15:54.011,2650.0,54.0,2704.0,2026-01-31 07:19:33.437,bookings
9,006247af-0ac0-494f-b1a8-15b401d1056e,214,2025-07-15 00:00:00.000,12,1344,59,32,cancelled,2025-12-26 14:15:54.011,16128.0,91.0,16219.0,2026-01-31 07:19:33.437,bookings


### Filter with WHERE Clause

In [38]:
# SELECT with WHERE clause
query = """
SELECT booking_date, nights_booked, booking_amount, booking_status
FROM processed
WHERE booking_status = 'cancelled'
"""

results = run_athena_query(query, 'aws_full_pipeline_db')
df_cancelled = athena_results_to_dataframe(results)
df_cancelled

Executing query on database 'aws_full_pipeline_db'...
Query: 
SELECT booking_date, nights_booked, booking_amount, booking_status
FROM processed
WHERE booking_sta...
Query ID: 58118f6c-bd2a-4567-bcf1-76f28aef8f77
  Status: QUEUED

Query SUCCEEDED
Data scanned: 0.02 MB
Execution time: 0.70 seconds
Estimated cost: $0.000000
Rows returned: 99
Converted 99 rows to DataFrame


Unnamed: 0,booking_date,nights_booked,booking_amount,booking_status
0,2025-09-25 00:00:00.000,14,2912,cancelled
1,2025-07-14 00:00:00.000,2,456,cancelled
2,2025-07-15 00:00:00.000,12,1344,cancelled
3,2025-10-23 00:00:00.000,2,406,cancelled
4,2025-08-06 00:00:00.000,12,3192,cancelled
...,...,...,...,...
94,2025-09-09 00:00:00.000,12,3216,cancelled
95,2025-02-21 00:00:00.000,10,1430,cancelled
96,2025-08-20 00:00:00.000,6,1302,cancelled
97,2025-10-25 00:00:00.000,5,445,cancelled


### Aggregation with GROUP BY

In [39]:
# COUNT and GROUP BY
query = """
SELECT 
    DATE(booking_date) as date,
    COUNT(*) as number_of_bookings
FROM processed
GROUP BY DATE(booking_date)
ORDER BY date
"""

results = run_athena_query(query, 'aws_full_pipeline_db')
df_booking_counts = athena_results_to_dataframe(results)
df_booking_counts

Executing query on database 'aws_full_pipeline_db'...
Query: 
SELECT 
    DATE(booking_date) as date,
    COUNT(*) as number_of_bookings
FROM processed
GROUP BY ...
Query ID: cf88a41e-57fa-4155-a6e1-0f0f23878090
  Status: QUEUED

Query SUCCEEDED
Data scanned: 0.01 MB
Execution time: 0.88 seconds
Estimated cost: $0.000000
Rows returned: 99
Converted 99 rows to DataFrame


Unnamed: 0,date,number_of_bookings
0,2024-12-25,18
1,2024-12-26,13
2,2024-12-27,12
3,2024-12-28,12
4,2024-12-29,12
...,...,...
94,2025-03-29,15
95,2025-03-30,18
96,2025-03-31,11
97,2025-04-01,9


### GROUP BY with HAVING (Revenue Analysis)

Find dates where cancelled bookings exceeded $200,000 - potential revenue loss to investigate.

In [40]:
# GROUP BY with HAVING, dates where total revenue from cancelled bookings > $200,000
# This is a loss of revenue we may want to investigate
query = """
SELECT 
    booking_date,
    COUNT(*) as number_of_bookings,
    SUM(total_booking_amount) as total_revenue,
    ROUND(AVG(total_booking_amount), 2) as avg_sale
FROM processed
WHERE booking_status = 'cancelled'
GROUP BY booking_date
HAVING SUM(total_booking_amount) > 200000
ORDER BY total_revenue DESC
"""

results = run_athena_query(query, 'aws_full_pipeline_db')
df_revenue_by_status = athena_results_to_dataframe(results)
df_revenue_by_status

Executing query on database 'aws_full_pipeline_db'...
Query: 
SELECT 
    booking_date,
    COUNT(*) as number_of_bookings,
    SUM(total_booking_amount) as tota...
Query ID: f7935e86-4d8b-4f05-a5b0-472346ad1d2f
  Status: QUEUED

Query SUCCEEDED
Data scanned: 0.03 MB
Execution time: 0.71 seconds
Estimated cost: $0.000000
Rows returned: 9
Converted 9 rows to DataFrame


Unnamed: 0,booking_date,number_of_bookings,total_revenue,avg_sale
0,2025-03-01 00:00:00.000,14,308194.0,22013.86
1,2025-01-27 00:00:00.000,13,302383.0,23260.23
2,2025-06-28 00:00:00.000,12,252236.0,21019.67
3,2025-11-21 00:00:00.000,11,230151.0,20922.82
4,2025-11-14 00:00:00.000,10,221291.0,22129.1
5,2025-10-25 00:00:00.000,13,213973.0,16459.46
6,2025-01-10 00:00:00.000,9,212630.0,23625.56
7,2025-04-20 00:00:00.000,9,208072.0,23119.11
8,2025-04-30 00:00:00.000,13,204486.0,15729.69


## 5.7 Cleanup Athena Results

Delete old query results from S3 to manage storage costs.

In [41]:
from datetime import datetime, timedelta

def cleanup_athena_results(bucket, prefix='athena-results/', days_old=7):
    """
    Delete Athena query results older than specified days
    
    Args:
        bucket (str): S3 bucket name
        prefix (str): Prefix for Athena results folder
        days_old (int): Delete files older than this many days
    
    Returns:
        int: Number of files deleted
    """
    try:
        cutoff_date = datetime.now(tz=None) - timedelta(days=days_old)
        
        print(f"Cleaning up Athena results older than {days_old} days...")
        print(f"Cutoff date: {cutoff_date.strftime('%Y-%m-%d')}")
        
        response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
        
        if 'Contents' not in response:
            print("No files found")
            return 0
        
        files_to_delete = []
        for obj in response['Contents']:
            # Remove timezone info for comparison
            last_modified = obj['LastModified'].replace(tzinfo=None)
            if last_modified < cutoff_date:
                files_to_delete.append({'Key': obj['Key']})
        
        if not files_to_delete:
            print("No old files to delete")
            return 0
        
        # Delete in batches of 1000 (S3 limit)
        s3_client.delete_objects(
            Bucket=bucket,
            Delete={'Objects': files_to_delete[:1000]}
        )
        
        print(f"SUCCESS: Deleted {len(files_to_delete)} files")
        return len(files_to_delete)
        
    except ClientError as e:
        print(f"ERROR: {e}")
        return 0


# Example (uncomment to use):
cleanup_athena_results(S3_BUCKET_COMPLETE_PIPELINE, days_old=7)

Cleaning up Athena results older than 7 days...
Cutoff date: 2026-01-24
No old files to delete


0

# PHASE 7: AUTOMATE WITH LAMBDA

```
┌─────────────────────────────────────────────────────────────────────────────┐
│  STEP 7: AUTOMATED PIPELINE WITH LAMBDA CHAIN                              │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   S3 Upload ──► Lambda 1 ──► EventBridge ──► Lambda 2 ──► EventBridge ──►  │
│   (trigger)     (crawler)    (crawler done)  (ETL job)   (job done)        │
│                                                                             │
│                              ┌──────────────────────────────────────────┐   │
│                              │ Lambda 3 ──► Processed Crawler ──► Done! │   │
│                              └──────────────────────────────────────────┘   │
│                                                                             │
│   Lambda Functions:                                                         │
│   1. start_raw_crawler      - Triggered by S3 upload                        │
│   2. start_etl_job          - Triggered by crawler completion               │
│   3. start_processed_crawler - Triggered by ETL job completion              │
│                                                                             │
│   EventBridge Rules:                                                        │
│   • Glue Crawler State Change → Lambda 2                                    │
│   • Glue Job State Change → Lambda 3                                        │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```


In [52]:
# make sure the job exists before running this
script_path = f's3://{S3_BUCKET_COMPLETE_PIPELINE}/scripts/aws_glue_etl.py'
create_glue_job(
    job_name='bookings-etl-job',
    script_location=script_path,
    description='Transform bookings data: null check, dedup, calculate totals'
)

Creating Glue job 'bookings-etl-job'...
  Script: s3://s3-complete-pipeline/scripts/aws_glue_etl.py
  Type: glueetl
SUCCESS: Job 'bookings-etl-job' created


True


## 7.1 Initialize Lambda Client

In [66]:
# Initialize Lambda and EventBridge clients
lambda_client = boto3.client(
    'lambda',
    region_name=AWS_REGION,
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY
)

events_client = boto3.client(
    'events',
    region_name=AWS_REGION,
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY
)

print("Lambda and EventBridge clients initialized")

Lambda and EventBridge clients initialized


## 7.2 Create Lambda Function Helper

Reusable function to create Lambda functions with inline code.

In [67]:
import zipfile
import io

def create_lambda_function(function_name, code_string, description='', timeout=300, memory=128):
    """
    Create a Lambda function with inline Python code
    
    Args:
        function_name (str): Function name
        code_string (str): Python code as string
        description (str): Function description
        timeout (int): Timeout in seconds (max 900)
        memory (int): Memory in MB
    
    Returns:
        bool: True if created successfully
    """
    try:
        print(f"Creating Lambda function '{function_name}'...")
        
        # Create zip file in memory
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
            zf.writestr('lambda_function.py', code_string)
        zip_buffer.seek(0)
        
        lambda_client.create_function(
            FunctionName=function_name,
            Runtime='python3.12',
            Role=LAMBDA_ROLE_ARN,
            Handler='lambda_function.lambda_handler',
            Code={'ZipFile': zip_buffer.read()},
            Description=description,
            Timeout=timeout,
            MemorySize=memory
        )
        
        print(f"SUCCESS: Lambda function '{function_name}' created")
        return True
        
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceConflictException':
            print(f"Function '{function_name}' already exists")
            return True
        print(f"ERROR: {e}")
        return False

## 7.3 Lambda 1: Start Raw Crawler

Triggered when a file is uploaded to `raw/` folder. Starts the Glue crawler to catalog new data.

In [68]:
# Lambda 1: Start Raw Crawler
lambda1_code = '''
import boto3
import json

def lambda_handler(event, context):
    """
    Triggered by S3 upload to raw/ folder
    Starts the Glue crawler for raw data
    """
    glue = boto3.client("glue")
    crawler_name = "full_pipeline_crawler"
    
    print(f"S3 event received: {json.dumps(event)}")
    
    try:
        glue.start_crawler(Name=crawler_name)
        print(f"Started crawler: {crawler_name}")
        return {"statusCode": 200, "body": f"Started {crawler_name}"}
    except glue.exceptions.CrawlerRunningException:
        print(f"Crawler {crawler_name} is already running")
        return {"statusCode": 200, "body": "Crawler already running"}
    except Exception as e:
        print(f"Error: {str(e)}")
        return {"statusCode": 500, "body": str(e)}
'''

create_lambda_function(
    function_name='pipeline-start-raw-crawler',
    code_string=lambda1_code,
    description='Triggered by S3 upload - starts raw data crawler'
)

Creating Lambda function 'pipeline-start-raw-crawler'...
Function 'pipeline-start-raw-crawler' already exists


True

## 7.4 Lambda 2: Start ETL Job

Triggered when raw crawler completes. Starts the Glue ETL job to transform data.

In [69]:
# Lambda 2: Start ETL Job
lambda2_code = f'''
import boto3
import json

def lambda_handler(event, context):
    """
    Triggered by EventBridge when crawler completes
    Starts the Glue ETL job
    """
    glue = boto3.client("glue")
    job_name = "bookings-etl-job"
    
    print(f"Crawler event received: {{json.dumps(event)}}")
    
    # Check if it was the raw crawler that completed
    crawler_name = event.get("detail", {{}}).get("crawlerName", "")
    state = event.get("detail", {{}}).get("state", "")
    
    if crawler_name != "full_pipeline_crawler" or state != "Succeeded":
        print(f"Ignoring event: crawler={{crawler_name}}, state={{state}}")
        return {{"statusCode": 200, "body": "Ignored"}}
    
    try:
        response = glue.start_job_run(
            JobName=job_name,
            Arguments={{
                "--S3_BUCKET": "{S3_BUCKET_COMPLETE_PIPELINE}",
                "--SOURCE_PREFIX": "raw",
                "--TARGET_PREFIX": "processed"
            }}
        )
        run_id = response["JobRunId"]
        print(f"Started ETL job: {{job_name}}, RunId: {{run_id}}")
        return {{"statusCode": 200, "body": f"Started {{job_name}}"}}
    except Exception as e:
        print(f"Error: {{str(e)}}")
        return {{"statusCode": 500, "body": str(e)}}
'''

create_lambda_function(
    function_name='pipeline-start-etl-job',
    code_string=lambda2_code,
    description='Triggered by crawler completion - starts ETL job'
)

Creating Lambda function 'pipeline-start-etl-job'...
Function 'pipeline-start-etl-job' already exists


True

## 7.5 Lambda 3: Start Processed Crawler

Triggered when ETL job completes. Starts the crawler for processed data.

In [70]:
# Lambda 3: Start Processed Crawler
lambda3_code = '''
import boto3
import json

def lambda_handler(event, context):
    """
    Triggered by EventBridge when ETL job completes
    Starts the crawler for processed data
    """
    glue = boto3.client("glue")
    crawler_name = "my_processed_data_crawler"
    
    print(f"Job event received: {json.dumps(event)}")
    
    # Check if it was our ETL job that completed successfully
    job_name = event.get("detail", {}).get("jobName", "")
    state = event.get("detail", {}).get("state", "")
    
    if job_name != "bookings-etl-job" or state != "SUCCEEDED":
        print(f"Ignoring event: job={job_name}, state={state}")
        return {"statusCode": 200, "body": "Ignored"}
    
    try:
        glue.start_crawler(Name=crawler_name)
        print(f"Started crawler: {crawler_name}")
        print("Pipeline complete! Data is ready for Athena queries.")
        return {"statusCode": 200, "body": f"Started {crawler_name}"}
    except glue.exceptions.CrawlerRunningException:
        print(f"Crawler {crawler_name} is already running")
        return {"statusCode": 200, "body": "Crawler already running"}
    except Exception as e:
        print(f"Error: {str(e)}")
        return {"statusCode": 500, "body": str(e)}
'''

create_lambda_function(
    function_name='pipeline-start-processed-crawler',
    code_string=lambda3_code,
    description='Triggered by ETL completion - starts processed data crawler'
)

Creating Lambda function 'pipeline-start-processed-crawler'...
Function 'pipeline-start-processed-crawler' already exists


True

## 7.6 Add S3 Trigger for Lambda 1

Configure S3 bucket to trigger Lambda 1 when files are uploaded to `raw/` folder.

In [71]:
def add_s3_trigger(function_name, bucket_name, prefix='raw/'):
    """
    Add S3 trigger to Lambda function
    
    Args:
        function_name (str): Lambda function name
        bucket_name (str): S3 bucket name
        prefix (str): S3 prefix to filter events
    """
    try:
        # Get Lambda function ARN
        response = lambda_client.get_function(FunctionName=function_name)
        function_arn = response['Configuration']['FunctionArn']
        
        print(f"Adding S3 trigger to '{function_name}'...")
        print(f"  Bucket: {bucket_name}")
        print(f"  Prefix: {prefix}")
        
        # Add permission for S3 to invoke Lambda
        try:
            lambda_client.add_permission(
                FunctionName=function_name,
                StatementId=f's3-trigger-{bucket_name}',
                Action='lambda:InvokeFunction',
                Principal='s3.amazonaws.com',
                SourceArn=f'arn:aws:s3:::{bucket_name}'
            )
        except ClientError as e:
            if 'ResourceConflictException' not in str(e):
                raise
        
        # Configure S3 bucket notification
        s3_client.put_bucket_notification_configuration(
            Bucket=bucket_name,
            NotificationConfiguration={
                'LambdaFunctionConfigurations': [
                    {
                        'LambdaFunctionArn': function_arn,
                        'Events': ['s3:ObjectCreated:*'],
                        'Filter': {
                            'Key': {
                                'FilterRules': [
                                    {'Name': 'prefix', 'Value': prefix}
                                ]
                            }
                        }
                    }
                ]
            }
        )
        
        print(f"SUCCESS: S3 trigger added")
        return True
        
    except ClientError as e:
        print(f"ERROR: {e}")
        return False


# Add S3 trigger to Lambda 1
add_s3_trigger('pipeline-start-raw-crawler', S3_BUCKET_COMPLETE_PIPELINE, 'raw/')

Adding S3 trigger to 'pipeline-start-raw-crawler'...
  Bucket: s3-complete-pipeline
  Prefix: raw/
SUCCESS: S3 trigger added


True

## 7.7 Add EventBridge Rules

Create EventBridge rules to trigger Lambda 2 and 3 when Glue events occur.

In [72]:
import json

def create_eventbridge_rule(rule_name, event_pattern, target_lambda, description=''):
    """
    Create EventBridge rule to trigger Lambda function
    
    Args:
        rule_name (str): Rule name
        event_pattern (dict): Event pattern to match
        target_lambda (str): Lambda function name to invoke
        description (str): Rule description
    """
    try:
        print(f"Creating EventBridge rule '{rule_name}'...")
        
        # Get Lambda ARN
        response = lambda_client.get_function(FunctionName=target_lambda)
        lambda_arn = response['Configuration']['FunctionArn']
        
        # Create or update rule
        events_client.put_rule(
            Name=rule_name,
            EventPattern=json.dumps(event_pattern),
            State='ENABLED',
            Description=description
        )
        
        # Add Lambda as target
        events_client.put_targets(
            Rule=rule_name,
            Targets=[
                {
                    'Id': f'{rule_name}-target',
                    'Arn': lambda_arn
                }
            ]
        )
        
        # Add permission for EventBridge to invoke Lambda
        try:
            lambda_client.add_permission(
                FunctionName=target_lambda,
                StatementId=f'eventbridge-{rule_name}',
                Action='lambda:InvokeFunction',
                Principal='events.amazonaws.com',
                SourceArn=f'arn:aws:events:{AWS_REGION}:{lambda_arn.split(":")[4]}:rule/{rule_name}'
            )
        except ClientError as e:
            if 'ResourceConflictException' not in str(e):
                raise
        
        print(f"SUCCESS: Rule '{rule_name}' created")
        return True
        
    except ClientError as e:
        print(f"ERROR: {e}")
        return False


# Rule 1: Crawler completion → Lambda 2 (Start ETL Job)
crawler_complete_pattern = {
    "source": ["aws.glue"],
    "detail-type": ["Glue Crawler State Change"],
    "detail": {
        "state": ["Succeeded"],
        "crawlerName": ["full_pipeline_crawler"]
    }
}

create_eventbridge_rule(
    rule_name='pipeline-crawler-complete',
    event_pattern=crawler_complete_pattern,
    target_lambda='pipeline-start-etl-job',
    description='Trigger ETL job when raw crawler completes'
)

Creating EventBridge rule 'pipeline-crawler-complete'...
SUCCESS: Rule 'pipeline-crawler-complete' created


True

In [60]:
# Rule 2: ETL Job completion → Lambda 3 (Start Processed Crawler)
job_complete_pattern = {
    "source": ["aws.glue"],
    "detail-type": ["Glue Job State Change"],
    "detail": {
        "state": ["SUCCEEDED"],
        "jobName": ["bookings-etl-job"]
    }
}

create_eventbridge_rule(
    rule_name='pipeline-etl-complete',
    event_pattern=job_complete_pattern,
    target_lambda='pipeline-start-processed-crawler',
    description='Trigger processed crawler when ETL job completes'
)

Creating EventBridge rule 'pipeline-etl-complete'...
SUCCESS: Rule 'pipeline-etl-complete' created


True

## 7.8 Test the Automated Pipeline

Upload a file to `raw/` and watch the pipeline run automatically!

In [79]:
# Test: Upload a file to trigger the pipeline
# This will automatically:
# 1. Trigger Lambda 1 → Start raw crawler
# 2. Crawler completes → Lambda 2 → Start ETL job
# 3. ETL completes → Lambda 3 → Start processed crawler
# 4. Done! Data ready for Athena

# Example: Re-upload bookings.csv to trigger the pipeline
upload_file('data/lambda-bookings.csv', S3_BUCKET_COMPLETE_PIPELINE, 'raw/lambda-bookings.csv')

SUCCESS: 'data/lambda-bookings.csv' uploaded to 's3-complete-pipeline/raw/lambda-bookings.csv'


True

In [82]:
# Wait ~2 min, then check crawler status
list_crawlers()

Glue Crawlers:
------------------------------------------------------------

Crawler: db_s3_crawler
  State: READY
  Database: glue_db
  Last Run: SUCCEEDED
  Tables Created: 0
  Tables Updated: 0

Crawler: full_pipeline_crawler
  State: READY
  Database: aws_full_pipeline_db
  Last Run: SUCCEEDED
  Tables Created: 0
  Tables Updated: 0

Crawler: my-crawler
  State: READY
  Database: data_engineering_db
  Last Run: SUCCEEDED
  Tables Created: 0
  Tables Updated: 0

Crawler: my_processed_data_crawler
  State: RUNNING
  Database: aws_full_pipeline_db
  Last Run: SUCCEEDED
  Tables Created: 0
  Tables Updated: 0

Total: 4 crawler(s)


[{'Name': 'db_s3_crawler',
  'Role': 'glue-access-s3',
  'Targets': {'S3Targets': [{'Path': 's3://real-learn-s3/processed/',
     'Exclusions': []}],
   'JdbcTargets': [],
   'MongoDBTargets': [],
   'DynamoDBTargets': [],
   'CatalogTargets': [],
   'DeltaTargets': [],
   'IcebergTargets': [],
   'HudiTargets': []},
  'DatabaseName': 'glue_db',
  'Classifiers': [],
  'RecrawlPolicy': {'RecrawlBehavior': 'CRAWL_EVERYTHING'},
  'SchemaChangePolicy': {'UpdateBehavior': 'UPDATE_IN_DATABASE',
   'DeleteBehavior': 'DEPRECATE_IN_DATABASE'},
  'LineageConfiguration': {'CrawlerLineageSettings': 'DISABLE'},
  'State': 'READY',
  'CrawlElapsedTime': 0,
  'CreationTime': datetime.datetime(2026, 1, 30, 0, 15, 47, tzinfo=tzlocal()),
  'LastUpdated': datetime.datetime(2026, 1, 30, 0, 15, 47, tzinfo=tzlocal()),
  'LastCrawl': {'Status': 'SUCCEEDED',
   'ErrorMessage': 'Service Principal: glue.amazonaws.com is not authorized to perform: logs:PutLogEvents on resource: arn:aws:logs:us-east-2:44595235113

In [83]:
# Wait ~3 min after crawler completes, check ETL job
get_job_run_status('bookings-etl-job')

Job Run Status for 'bookings-etl-job':
--------------------------------------------------
  Run ID: jr_487dc52fdbd95fa8338d3c84a9858649ee584f64ddc057cfd568a4530edb605d
  State: SUCCEEDED
  Started: 2026-01-31 02:36:20.379000-05:00
  Completed: 2026-01-31 02:37:37.733000-05:00
  Duration: 71 seconds


{'Id': 'jr_487dc52fdbd95fa8338d3c84a9858649ee584f64ddc057cfd568a4530edb605d',
 'Attempt': 0,
 'JobName': 'bookings-etl-job',
 'JobMode': 'SCRIPT',
 'JobRunQueuingEnabled': False,
 'StartedOn': datetime.datetime(2026, 1, 31, 2, 36, 20, 379000, tzinfo=tzlocal()),
 'LastModifiedOn': datetime.datetime(2026, 1, 31, 2, 37, 37, 733000, tzinfo=tzlocal()),
 'CompletedOn': datetime.datetime(2026, 1, 31, 2, 37, 37, 733000, tzinfo=tzlocal()),
 'JobRunState': 'SUCCEEDED',
 'Arguments': {'--S3_BUCKET': 's3-complete-pipeline',
  '--SOURCE_PREFIX': 'raw',
  '--TARGET_PREFIX': 'processed'},
 'PredecessorRuns': [],
 'AllocatedCapacity': 2,
 'ExecutionTime': 71,
 'Timeout': 60,
 'MaxCapacity': 2.0,
 'WorkerType': 'G.1X',
 'NumberOfWorkers': 2,
 'LogGroupName': '/aws-glue/jobs',
 'GlueVersion': '4.0'}

In [84]:
# Simple SELECT with LIMIT
query = """
SELECT * 
FROM processed 
LIMIT 100
"""

results = run_athena_query(query, 'aws_full_pipeline_db')
df = athena_results_to_dataframe(results)
df

Executing query on database 'aws_full_pipeline_db'...
Query: 
SELECT * 
FROM processed 
LIMIT 100

Query ID: f7d119bd-5439-40a9-8916-806e83177081
  Status: QUEUED

Query SUCCEEDED
Data scanned: 0.25 MB
Execution time: 1.08 seconds
Estimated cost: $0.000001
Rows returned: 99
Converted 99 rows to DataFrame


Unnamed: 0,booking_id,listing_id,booking_date,nights_booked,booking_amount,cleaning_fee,service_fee,booking_status,created_at,total_booking_amount,additional_cost,total_cost,processed_at,partition_0
0,0008e04a-d69f-448b-82f5-de35a4ffe0eb,229,2025-09-25 00:00:00.000,14,2912,76,27,cancelled,2025-12-26 14:15:54.011,40768.00,103.00,40871.00,2026-01-31 07:37:22.324,bookings
1,001bc7c8-0eea-412d-802c-b2c4d7136462,101,2025-10-06 00:00:00.000,5,1345,26,37,confirmed,2025-12-26 14:15:54.011,6725.00,63.00,6788.00,2026-01-31 07:37:22.324,bookings
2,001c3f21-9ecc-4a69-869f-261ee28e890f,238,2025-07-14 00:00:00.000,2,456,20,32,cancelled,2025-12-26 14:15:54.011,912.00,52.00,964.00,2026-01-31 07:37:22.324,bookings
3,0045bcd6-a019-419c-b8eb-00d66dba626a,402,2025-07-31 00:00:00.000,11,3047,67,26,confirmed,2025-12-26 14:15:54.011,33517.00,93.00,33610.00,2026-01-31 07:37:22.324,bookings
4,00468a09-1ece-477c-95c2-721e850cf52d,89,2025-03-09 00:00:00.000,14,742,23,46,confirmed,2025-12-26 14:15:54.011,10388.00,69.00,10457.00,2026-01-31 07:37:22.324,bookings
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
94,02b8b62b-9867-432d-82c7-b3f3a9a0620b,478,2025-09-23 00:00:00.000,11,550,58,19,confirmed,2025-12-26 14:15:54.011,6050.00,77.00,6127.00,2026-01-31 07:37:22.324,bookings
95,02b8dc11-4922-4e8b-9d78-3b765501a639,484,2025-10-28 00:00:00.000,4,1200,27,26,confirmed,2025-12-26 14:15:54.011,4800.00,53.00,4853.00,2026-01-31 07:37:22.324,bookings
96,02c7158f-dd42-425f-83c0-2919ecf7702f,413,2025-10-19 00:00:00.000,12,3300,45,46,confirmed,2025-12-26 14:15:54.011,39600.00,91.00,39691.00,2026-01-31 07:37:22.324,bookings
97,02d9c5ba-7b07-488a-9774-72408d185507,235,2025-10-18 00:00:00.000,8,1256,52,36,confirmed,2025-12-26 14:15:54.011,10048.00,88.00,10136.00,2026-01-31 07:37:22.324,bookings


## 7.9 Cleanup Functions

Delete Lambda functions and EventBridge rules when no longer needed.

In [None]:
def delete_lambda_function(function_name):
    """Delete a Lambda function"""
    try:
        print(f"Deleting Lambda function '{function_name}'...")
        lambda_client.delete_function(FunctionName=function_name)
        print(f"SUCCESS: Function '{function_name}' deleted")
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFoundException':
            print(f"Function '{function_name}' not found")
        else:
            print(f"ERROR: {e}")
        return False


def delete_eventbridge_rule(rule_name):
    """Delete an EventBridge rule and its targets"""
    try:
        print(f"Deleting EventBridge rule '{rule_name}'...")
        
        # Remove targets first
        events_client.remove_targets(Rule=rule_name, Ids=[f'{rule_name}-target'])
        
        # Delete rule
        events_client.delete_rule(Name=rule_name)
        
        print(f"SUCCESS: Rule '{rule_name}' deleted")
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFoundException':
            print(f"Rule '{rule_name}' not found")
        else:
            print(f"ERROR: {e}")
        return False


# Example (uncomment to cleanup):
# delete_lambda_function('pipeline-start-raw-crawler')
# delete_lambda_function('pipeline-start-etl-job')
# delete_lambda_function('pipeline-start-processed-crawler')
# delete_eventbridge_rule('pipeline-crawler-complete')
# delete_eventbridge_rule('pipeline-etl-complete')