<a href="https://colab.research.google.com/github/opeeeeeeeeeeeeeeemi/Data-Engineering-Projects/blob/main/Apache_Airflow_DAG_Task.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
import re

# Shared defaults handed to the DAG through ``default_args``: task owner,
# first schedulable date, failure/retry e-mail alerting, and a retry policy
# of two attempts spaced two minutes apart.
default_args = dict(
    owner='Ope',
    start_date=datetime(2024, 6, 20),
    email=['ope@email.com'],
    email_on_failure=True,
    email_on_retry=True,
    retries=2,
    retry_delay=timedelta(minutes=2),
)

# Define the single DAG for this file: it runs daily with catchup disabled,
# using the shared default_args. Only one DAG(...) is bound to ``dag`` --
# a second definition assigned to the same name would be discarded before
# any task could attach to it, leaving a dead, misleading DAG object.
dag = DAG(
    'process_web_log',
    default_args=default_args,
    description='DAG to process web logs daily',
    schedule_interval='@daily',
    catchup=False,
)

# Define the function to extract IP addresses
def extract_ip_addresses(
    log_file_path='/home/project/airflow/dags/capstone/accesslog.txt',
    output_file_path='/home/project/airflow/dags/capstone/extracted_data.txt',
):
    """Extract the first IPv4-looking address from each line of a log file.

    Lines that contain no match are skipped. The extracted addresses are
    written to *output_file_path*, one per line, replacing any existing
    content of that file.

    Args:
        log_file_path: Web-server access log to read.
        output_file_path: Destination file for the extracted addresses.
    """
    # Word boundaries stop the pattern from matching a fragment of a longer
    # digit run (e.g. "234.1.1.1" inside "1234.1.1.1"). The group is
    # non-capturing because only the full match is used.
    ip_pattern = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')

    # Iterate the log lazily rather than loading every line into memory;
    # only the matched addresses are kept. errors='replace' keeps stray
    # non-UTF-8 bytes (e.g. in request paths) from aborting the run --
    # the addresses themselves are plain ASCII.
    with open(log_file_path, 'r', encoding='utf-8', errors='replace') as log_file:
        extracted_ips = [
            match.group()
            for match in map(ip_pattern.search, log_file)
            if match
        ]

    # Written only after the whole log has been read, so a read failure
    # leaves any previous output file untouched.
    with open(output_file_path, 'w', encoding='utf-8') as output_file:
        output_file.writelines(f"{ip}\n" for ip in extracted_ips)

# Function that transforms the extracted data by filtering out one IP address
def transform_data(
    input_file_path='/home/project/airflow/dags/capstone/extracted_data.txt',
    output_file_path='/home/project/airflow/dags/capstone/transformed_data.txt',
    ip_to_filter="198.46.149.143",
):
    """Copy an IP-per-line file, dropping every occurrence of one address.

    Each input line is stripped of surrounding whitespace. Blank lines and
    lines equal to *ip_to_filter* are discarded; the remaining addresses are
    written to *output_file_path*, one per line, replacing any existing
    content of that file.

    Args:
        input_file_path: File of IP addresses, one per line.
        output_file_path: Destination for the filtered addresses.
        ip_to_filter: Address to remove from the output.
    """
    with open(input_file_path, 'r', encoding='utf-8') as input_file:
        # Strip each line exactly once, then keep only non-empty entries
        # that differ from the address being filtered out (blank lines
        # would otherwise be written back as empty output lines).
        stripped = (line.strip() for line in input_file)
        transformed_ips = [ip for ip in stripped if ip and ip != ip_to_filter]

    with open(output_file_path, 'w', encoding='utf-8') as output_file:
        output_file.writelines(f"{ip}\n" for ip in transformed_ips)

# Wrap the two Python ETL steps as tasks on the DAG:
#   extract_data   -> runs extract_ip_addresses()
#   transform_data -> runs transform_data()
extract_data_task = PythonOperator(
    dag=dag,
    task_id='extract_data',
    python_callable=extract_ip_addresses,
)

transform_data_task = PythonOperator(
    dag=dag,
    task_id='transform_data',
    python_callable=transform_data,
)

# Define the load_data task using BashOperator: archive the transformed data
# into weblog.tar. tar's -C option must name a *directory* and be followed by
# the member(s) to add; switching into the capstone directory makes the
# archive store the bare name ``transformed_data.txt`` instead of an
# absolute path.
load_data_task = BashOperator(
    task_id='load_data',
    bash_command=(
        'tar -cvf /home/project/airflow/dags/capstone/weblog.tar '
        '-C /home/project/airflow/dags/capstone transformed_data.txt'
    ),
    dag=dag,
)

# Placeholder ("dummy") tasks that give the pipeline explicit entry and
# exit nodes.
# NOTE(review): newer Airflow 2.x releases deprecate DummyOperator in favour
# of EmptyOperator (airflow.operators.empty) -- consider migrating.
start_task = DummyOperator(dag=dag, task_id='start_task')

end_task = DummyOperator(dag=dag, task_id='end_task')

# Wire the tasks into one linear chain:
#   start -> extract -> transform -> load -> end
start_task >> extract_data_task
extract_data_task >> transform_data_task
transform_data_task >> load_data_task
load_data_task >> end_task

