In [5]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.utils.dates import days_ago


In [23]:
# Smoke test: confirm boto3 can resolve AWS credentials before the DAG below
# relies on them. Listing the account's S3 buckets is a cheap call that
# errors out if no usable credentials are found.
import boto3

s3 = boto3.client('s3')
response = s3.list_buckets()

bucket_names = [entry['Name'] for entry in response['Buckets']]
for name in bucket_names:
    print(name)

aws-athena-query-results-us-east-1-625444834448
aws-glue-assets-625444834448-us-east-1
nfip-policies-athena
nfip-policies-raw
nfip-policies-transformed-parquet


In [11]:
# Glue client reused by the exploration cells below; list the crawler names
# so they can be matched against the names the DAG passes to start_glue_crawler.
glue = boto3.client('glue')
response = glue.get_crawlers()
crawler_names = [crawler['Name'] for crawler in response['Crawlers']]
print(crawler_names)

['nfip-policies-parquet-catalog', 'nfip-policies-raw-catalog']


In [13]:
# List the Glue ETL jobs visible to this client; 'csv-to-parquet' is the job
# the DAG's GlueJobOperator runs.
glue.get_jobs()

{'Jobs': [{'Name': 'csv-to-parquet',
   'Description': '',
   'Role': 'arn:aws:iam::625444834448:role/service-role/AWSGlueServiceRole-nfip-policies',
   'CreatedOn': datetime.datetime(2025, 6, 19, 15, 42, 56, 221000, tzinfo=tzlocal()),
   'LastModifiedOn': datetime.datetime(2025, 6, 24, 11, 38, 18, 153000, tzinfo=tzlocal()),
   'ExecutionProperty': {'MaxConcurrentRuns': 1},
   'Command': {'Name': 'glueetl',
    'ScriptLocation': 's3://aws-glue-assets-625444834448-us-east-1/scripts/csv-to-parquet.py',
    'PythonVersion': '3'},
   'DefaultArguments': {'--enable-metrics': 'true',
    '--enable-spark-ui': 'true',
    '--extra-py-files': 's3://aws-glue-studio-transforms-510798373988-prod-us-east-1/gs_common.py,s3://aws-glue-studio-transforms-510798373988-prod-us-east-1/gs_null_rows.py',
    '--spark-event-logs-path': 's3://aws-glue-assets-625444834448-us-east-1/sparkHistoryLogs/',
    '--enable-job-insights': 'true',
    '--enable-observability-metrics': 'true',
    '--enable-glue-datacata

In [15]:
# List the Glue Data Catalog databases; 'nfip-policies-db' is the database
# the DAG's Athena query runs against.
glue.get_databases()

{'DatabaseList': [{'Name': 'default',
   'Description': 'Default Hive database',
   'LocationUri': 's3://nfip-policies-athena/',
   'CreateTime': datetime.datetime(2025, 6, 20, 18, 27, 10, tzinfo=tzlocal()),
   'CreateTableDefaultPermissions': [{'Principal': {'DataLakePrincipalIdentifier': 'IAM_ALLOWED_PRINCIPALS'},
     'Permissions': ['ALL']}],
   'CatalogId': '625444834448'},
  {'Name': 'nfip-policies-db',
   'CreateTime': datetime.datetime(2025, 6, 18, 15, 59, 33, tzinfo=tzlocal()),
   'CreateTableDefaultPermissions': [{'Principal': {'DataLakePrincipalIdentifier': 'IAM_ALLOWED_PRINCIPALS'},
     'Permissions': ['ALL']}],
   'CatalogId': '625444834448'}],
 'ResponseMetadata': {'RequestId': '5357ebde-917b-486a-b541-73f97c3a7c54',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sat, 28 Jun 2025 15:44:13 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '737',
   'connection': 'keep-alive',
   'x-amzn-requestid': '5357ebde-917b-486a-b541-73f97c3a7c54',
   

In [20]:
# Inspect the tables (with their column schemas) registered in the
# 'nfip-policies-db' catalog database.
glue.get_tables(DatabaseName='nfip-policies-db')

{'TableList': [{'Name': 'policies_by_state',
   'DatabaseName': 'nfip-policies-db',
   'Owner': 'owner',
   'CreateTime': datetime.datetime(2025, 6, 19, 8, 13, 13, tzinfo=tzlocal()),
   'UpdateTime': datetime.datetime(2025, 6, 19, 8, 13, 14, tzinfo=tzlocal()),
   'LastAccessTime': datetime.datetime(2025, 6, 19, 8, 13, 14, tzinfo=tzlocal()),
   'Retention': 0,
   'StorageDescriptor': {'Columns': [{'Name': 'id', 'Type': 'string'},
     {'Name': 'reportedzipcode', 'Type': 'bigint'},
     {'Name': 'propertystate', 'Type': 'string'},
     {'Name': 'reportedcity', 'Type': 'string'},
     {'Name': 'countycode', 'Type': 'bigint'},
     {'Name': 'latitude', 'Type': 'double'},
     {'Name': 'longitude', 'Type': 'double'},
     {'Name': 'policyeffectivedate', 'Type': 'string'},
     {'Name': 'policyterminationdate', 'Type': 'string'},
     {'Name': 'propertypurchasedate', 'Type': 'string'},
     {'Name': 'totalbuildinginsurancecoverage', 'Type': 'bigint'},
     {'Name': 'totalcontentsinsurancecov

In [None]:
# NOTE(review): the Airflow scheduler only discovers DAGs defined in .py files
# in its dags folder; copy this cell into such a module to deploy it.
from Glue_Crawler import start_glue_crawler

default_args = {
    'owner': 'pranjali',
    'start_date': days_ago(1),
    'retries': 1,
}


def run_crawler_raw():
    """Start the 'nfip-policies-raw-catalog' Glue crawler.

    NOTE(review): whether this blocks until the crawl finishes depends on
    start_glue_crawler (defined in Glue_Crawler, not shown here). If it only
    starts the crawler, the downstream Glue job may see a stale catalog;
    confirm, or poll the crawler state until it is READY.
    """
    start_glue_crawler('nfip-policies-raw-catalog')


def run_crawler_parquet():
    """Start the 'nfip-policies-parquet-catalog' Glue crawler.

    NOTE(review): same completion caveat as run_crawler_raw; the Athena task
    that follows needs the crawled table to exist.
    """
    start_glue_crawler('nfip-policies-parquet-catalog')


with DAG(
    dag_id='aws_etl_pipeline',
    default_args=default_args,
    # None = trigger manually; use '@daily' for a scheduled daily run.
    schedule_interval=None,
    catchup=False,
) as dag:

    start_glue_crawler_raw = PythonOperator(
        task_id='start_crawler_raw',
        python_callable=run_crawler_raw,
    )

    run_glue_job = GlueJobOperator(
        task_id='run_glue_job',
        job_name='csv-to-parquet',
        script_location='s3://aws-glue-assets-625444834448-us-east-1/scripts/csv-to-parquet.py',
        region_name='us-east-1',
    )

    start_glue_crawler_transformed = PythonOperator(
        task_id='start_crawler_parquet',
        python_callable=run_crawler_parquet,
    )

    athena_query = AthenaOperator(
        task_id='run_athena_query',
        # Athena SQL treats double quotes as identifier delimiters, so the
        # state code must be a single-quoted string literal ("NY" would be
        # resolved as a column named NY and the query would fail).
        # NOTE(review): the hash-suffixed table name looks crawler-generated;
        # the catalog listing above shows 'policies_by_state'. Confirm which
        # table the parquet crawler actually writes.
        query=(
            "SELECT AVG(totalinsurancepremiumofthepolicy) "
            "FROM policies_by_state_d0455ea1435e6198d85a70233f5460e7 "
            "WHERE propertystate = 'NY'"
        ),
        database='nfip-policies-db',
        output_location='s3://aws-athena-query-results-us-east-1-625444834448',
    )

    # raw crawl -> CSV-to-Parquet job -> parquet crawl -> Athena query
    start_glue_crawler_raw >> run_glue_job >> start_glue_crawler_transformed >> athena_query