# Airflow and EMR Code Metadata Extraction Notebook

## Introduction

This notebook is designed to analyze Airflow and EMR code to extract important metadata such as storage details on S3, data ownership, and other relevant information. It utilizes advanced Language Models (LLMs) to understand and extract information from the code.

## Objective

The goal is to provide an automated method to read and interpret Airflow and EMR code, identifying key metadata for better data management and governance.


## Setup

### Install Required Libraries

The notebook requires specific libraries to interface with the LLM models and to handle code analysis.


In [ ]:
!pip install transformers
!pip install boto3  # For AWS S3 interactions
!pip install apache-airflow  # For Airflow code interpretation

## Choosing the Model

We will use a model from the Hugging Face `transformers` library that is best suited for understanding and analyzing code. GPT-Neo or GPT-3 could be ideal choices for this task, owing to their understanding of natural language and code.


In [3]:
from transformers import pipeline
# Initialize the model
model = pipeline('text-generation', model='EleutherAI/gpt-neo-2.7B')  # Example model

## Code Analysis Function

### Defining the Analysis Function
The function below is designed to read the Airflow and EMR code, use the LLM to extract relevant metadata like S3 storage details, data owner, etc.


In [8]:
def extract_metadata_from_code(code):
    """Extract metadata from Airflow/EMR code using LLM."""
    prompt = 'Extract metadata ONLY NO CODE OUTPUT, such as S3 storage details and data owner from the following code:\n' + code
    metadata = model(prompt)[0]['generated_text']
    return metadata


### Input Code

Input the Airflow or EMR code in the cell below to extract metadata.


In [9]:
# Sample Airflow/EMR code
code = '''
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_job_flow import EmrJobFlowSensor
from datetime import datetime, timedelta
import boto3

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email': ['example@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
}

dag = DAG('complex_emr_dag',
          default_args=default_args,
          description='A complex DAG to process data using EMR',
          schedule_interval=timedelta(days=1),
          catchup=False)

def preprocess_data():
    # Code to preprocess data
    print("Data Preprocessing")
    # Simulated interaction with S3
    s3 = boto3.client('s3')
    bucket_name = 'mydata-bucket'
    s3.upload_file('localfile.txt', bucket_name, 'processed/processedfile.txt')

preprocess_task = PythonOperator(task_id='preprocess_data',
                                 python_callable=preprocess_data,
                                 dag=dag)

# EMR Job Flow configuration
job_flow_overrides = {
    'Name': 'MyEMRCluster',
    # Add more configuration options as required
}

create_emr_cluster = EmrCreateJobFlowOperator(
    task_id='create_emr_cluster',
    job_flow_overrides=job_flow_overrides,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag)

check_emr_cluster = EmrJobFlowSensor(
    task_id='check_emr_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag)

def analyze_data():
    # Code to analyze data
    print("Data Analysis")
    # Simulated error
    raise Exception("Simulated error during data analysis")

analyze_data_task = PythonOperator(task_id='analyze_data',
                                   python_callable=analyze_data,
                                   retries=2,
                                   dag=dag)

preprocess_task >> create_emr_cluster >> check_emr_cluster >> analyze_data_task
'''
print('Input Code:\n', code)


Input Code:
 
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_job_flow import EmrJobFlowSensor
from datetime import datetime, timedelta
import boto3

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email': ['example@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
}

dag = DAG('complex_emr_dag',
          default_args=default_args,
          description='A complex DAG to process data using EMR',
          schedule_interval=timedelta(days=1),
          catchup=False)

def preprocess_data():
    # Code to preprocess data
    print("Data Preprocessing")
    # Simulated interaction with S3
    s3 = boto3.client('s3')
    bucket_name = 'mydata-bucket'
    s3.uplo

### Extract and Display Metadata

Run the cell below to extract and display the metadata from the input code.


In [10]:
extracted_metadata = extract_metadata_from_code(code)
print('Extracted Metadata:\n', extracted_metadata)

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


Extracted Metadata:
 Extract metadata ONLY NO CODE OUTPUT, such as S3 storage details and data owner from the following code:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_job_flow import EmrJobFlowSensor
from datetime import datetime, timedelta
import boto3

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email': ['example@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
}

dag = DAG('complex_emr_dag',
          default_args=default_args,
          description='A complex DAG to process data using EMR',
          schedule_interval=timedelta(days=1),
          catchup=False)

def preprocess_data():
    # Code to preprocess data
    print("Data Preprocessin