# 03 Challenge - Second DAG

I used a combination of hooks, Python operators and XCom to make the given DAG work. In this document I compare the relevant sections of code to explain the changes I made, showing the original code first and my code second. My full code is at the end; it successfully produced a local Postgres table containing the summary information.

### Connections

The first thing I did was set up MySQL and Postgres connections (`financial_mariadb` and `local_postgres`) on the Airflow webserver so that Airflow could reach the two databases. I created a new Postgres database (`batch_process_03`) for this challenge.

### Libraries

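I imported a few extra tools to make the solution work: `PythonOperator`, hooks for both MySQL and Postgres, and `XCom`. I no longer needed the MySQL and Postgres operators, so these were not imported. The `datetime` and `timedelta` imports are still required, because `default_args` and `start_date` use them. The `XCom` import is not strictly necessary, since values are pushed and pulled through the task instance `ti`.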

```python
# Received code
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# My code
from datetime import datetime, timedelta  # still needed for default_args and start_date
from airflow import DAG
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
from airflow.models.xcom import XCom
```

The `default_args` stayed the same, except that I changed `retry_delay` to 2 minutes so that a failed task was retried sooner and I could get results faster.

The creation of the DAG object was reformatted, but its parameters were unchanged.

The main structural change was that each task's SQL query was wrapped in a Python function that does more than run the query: it opens a connection through a hook, executes the SQL and, where needed, passes data to the next task via XCom. The tasks themselves became `PythonOperator`s, each calling one of these functions, but the aim of each task was kept the same.

```python
# Received code

t1 = PostgresOperator(
    task_id='create_account_summary',
    sql=create_account_summary,
    postgres_conn_id='local_postgres',
    dag=dag,
)

t2 = MySqlOperator(
    task_id='extract_financial_data',
    sql=extract_financial_data,
    mysql_conn_id='financial_mariadb',
    dag=dag,
)

t3 = PostgresOperator(
    task_id='load_account_summary',
    sql=load_account_summary,
    postgres_conn_id='local_postgres',
    dag=dag,
)

t1 >> t2 >> t3

# My code: each task now calls a Python function instead of running SQL directly

t1 = PythonOperator(
    task_id='create_account_summary',
    python_callable=_create_account_summary,
    dag=dag,
)

t2 = PythonOperator(
    task_id='extract_financial_data',
    python_callable=_extract_financial_data,
    dag=dag,
)

t3 = PythonOperator(
    task_id='load_account_summary',
    python_callable=_load_account_summary,
    dag=dag,
)

t1 >> t2 >> t3
```
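The line `t1 >> t2 >> t3` declares that each task runs only after the previous one has finished. The sketch below mimics that bitshift chaining with a hypothetical `Task` class (it is not Airflow's operator API) to show why the expression reads left to right: `>>` records the downstream task and then returns it, so the next `>>` attaches to that task. The runner is a simple topological sort for illustration, not a description of how Airflow's scheduler works.

```python
from collections import deque


class Task:
    """Hypothetical stand-in for an Airflow operator, for illustration only."""

    def __init__(self, task_id, fn):
        self.task_id = task_id
        self.fn = fn
        self.upstream = []
        self.downstream = []

    def __rshift__(self, other):
        # Record the dependency, then return the right-hand task so that
        # a >> b >> c is evaluated as (a >> b) >> c.
        self.downstream.append(other)
        other.upstream.append(self)
        return other


def run_in_order(tasks):
    """Run tasks so that every task runs after all of its upstream tasks."""
    waiting_on = {t.task_id: len(t.upstream) for t in tasks}
    ready = deque(t for t in tasks if not t.upstream)
    order = []
    while ready:
        task = ready.popleft()
        task.fn()
        order.append(task.task_id)
        for child in task.downstream:
            waiting_on[child.task_id] -= 1
            if waiting_on[child.task_id] == 0:
                ready.append(child)
    return order


t1 = Task('create_account_summary', lambda: None)
t2 = Task('extract_financial_data', lambda: None)
t3 = Task('load_account_summary', lambda: None)
t1 >> t2 >> t3

# The list order does not matter; the dependencies decide the run order
print(run_in_order([t3, t1, t2]))
# ['create_account_summary', 'extract_financial_data', 'load_account_summary']
```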

### t1 - Create Account Summary

The same SQL query was used, with an added statement that drops the table if it already exists, so I didn't have to delete the table manually each time I ran the DAG.

The main difference was that, having switched from a `PostgresOperator` to a `PythonOperator`, I had to use a `PostgresHook` to access the Postgres connection I had set up.

```python
# Received code
create_account_summary = """
    CREATE TABLE IF NOT EXISTS account_summary (
        account_id INT PRIMARY KEY,
        total_transactions INT,
        total_amount NUMERIC
    );
"""

# My code
def _create_account_summary():
    # Drop any previous copy so the DAG can be rerun, then recreate the table
    request = (
        "DROP TABLE IF EXISTS account_summary; "
        "CREATE TABLE account_summary ("
        "account_id INT PRIMARY KEY, total_transactions INT, total_amount NUMERIC);"
    )
    # The hook reads the 'local_postgres' connection configured in the Airflow UI
    pg_hook = PostgresHook(postgres_conn_id='local_postgres', schema='batch_process_03')
    connection = pg_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)  # psycopg2 accepts both statements in one call
    connection.commit()
    cursor.close()
    connection.close()
```
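The effect of the added `DROP TABLE IF EXISTS` can be checked without Airflow. This sketch uses Python's built-in `sqlite3` in-memory database as a stand-in for Postgres, which is an assumption for illustration only. Running the create step twice succeeds because the table is dropped first, while a bare `CREATE TABLE` fails on the second run. It uses `executescript` because `sqlite3`'s `execute` runs a single statement, whereas psycopg2's `execute` accepts both statements in one string.

```python
import sqlite3

TABLE_DDL = (
    "CREATE TABLE account_summary ("
    "account_id INT PRIMARY KEY, total_transactions INT, total_amount NUMERIC);"
)
DROP_AND_CREATE = "DROP TABLE IF EXISTS account_summary; " + TABLE_DDL


def create_account_summary(conn, sql):
    # executescript runs every statement in the string, in order
    conn.executescript(sql)
    conn.commit()


conn = sqlite3.connect(":memory:")
create_account_summary(conn, DROP_AND_CREATE)
create_account_summary(conn, DROP_AND_CREATE)  # rerun: table dropped and recreated

try:
    create_account_summary(conn, TABLE_DDL)  # no DROP: the table already exists
    rerun_failed = False
except sqlite3.OperationalError:
    rerun_failed = True

print(rerun_failed)  # True
```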

### t2 - Extract Financial Data

As with t1, the same basic SQL query was used to extract and aggregate the relevant data. In this case I removed the attempt to write the data to a local CSV file, as my guest user did not have permission to do this (`SELECT ... INTO OUTFILE` writes the file on the database server and requires the `FILE` privilege). I used a `MySqlHook` to connect to the database rather than the `MySqlOperator`.

Once I had the data, I pushed it to XCom so that the next task could access it, and used the XCom view on the Airflow webserver to check that it was being pushed correctly. I considered converting the data into a pandas DataFrame, but advice online suggested this was not good practice (XComs are stored in Airflow's metadata database and are intended for small amounts of data), so I left the data as the tuples returned by the query.
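A rough way to see what happens to the tuples on their way through XCom is to mimic the store. The `FakeTaskInstance` class below is hypothetical (it is not Airflow's `TaskInstance`): it keeps values in a dict keyed by task id and key, and serialises them with `json`, on the assumption that the XCom backend stores values as JSON. Under that assumption the tuples come back as lists. MySQL returns `SUM()` of exact numeric columns as `DECIMAL`, which the standard `json` module cannot encode, so the sketch converts it to a string; whether Airflow's own encoder handles `Decimal` depends on the Airflow version.

```python
import json
from decimal import Decimal


class FakeTaskInstance:
    """Hypothetical XCom stand-in for illustration; not Airflow's TaskInstance."""

    _store = {}  # shared by all instances, playing the role of the metadata database

    def __init__(self, task_id):
        self.task_id = task_id

    def xcom_push(self, key, value):
        # Serialise on write; default=str turns Decimal into a string
        self._store[(self.task_id, key)] = json.dumps(value, default=str)

    def xcom_pull(self, key, task_ids):
        raw = self._store.get((task_ids, key))
        return None if raw is None else json.loads(raw)


# Made-up rows shaped like the output of the extraction query
rows = [(1, 3, Decimal("150.00")), (2, 1, Decimal("20.50"))]

FakeTaskInstance("extract_financial_data").xcom_push(key="data", value=rows)
pulled = FakeTaskInstance("load_account_summary").xcom_pull(
    key="data", task_ids="extract_financial_data"
)
print(pulled)  # [[1, 3, '150.00'], [2, 1, '20.50']]
```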

```python
# Received code
extract_financial_data = """
    SELECT account_id, COUNT(*) AS total_transactions, SUM(amount) AS total_amount
    FROM trans
    GROUP BY account_id
    INTO OUTFILE '/tmp/account_summary.csv'
    FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"'
    LINES TERMINATED BY '\n';
"""

# My code
def _extract_financial_data(ti):
    request = (
        "SELECT account_id, COUNT(*) AS total_transactions, SUM(amount) AS total_amount "
        "FROM trans GROUP BY account_id;"
    )
    # The hook reads the 'financial_mariadb' connection configured in the Airflow UI
    ms_hook = MySqlHook(mysql_conn_id='financial_mariadb', schema='financial')
    connection = ms_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    data = cursor.fetchall()  # one tuple per account
    cursor.close()
    connection.close()
    # Make the rows available to load_account_summary via XCom
    ti.xcom_push(key='data', value=data)
```
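To see the shape of what `cursor.fetchall()` returns, the same aggregate query can be run against a small in-memory SQLite table. The `trans` rows below are made-up sample data with only the two columns the query uses, not data from the financial database, and the `ORDER BY` is added only to make the output order deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trans (account_id INT, amount REAL)")
# Hypothetical sample transactions: account 1 has three, account 2 has one
conn.executemany(
    "INSERT INTO trans (account_id, amount) VALUES (?, ?)",
    [(1, 100.0), (1, 50.0), (2, 20.5), (1, 0.0)],
)

cursor = conn.cursor()
cursor.execute(
    "SELECT account_id, COUNT(*) AS total_transactions, SUM(amount) AS total_amount "
    "FROM trans GROUP BY account_id ORDER BY account_id;"
)
data = cursor.fetchall()  # one tuple per account
print(data)  # [(1, 3, 150.0), (2, 1, 20.5)]
```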

### t3 - Load Account Summary

This task pulls the data from XCom and uses another `PostgresHook` to access the Postgres connection. The rows are then inserted one at a time into the table created in t1.

```python
# Received code
load_account_summary = """
    COPY account_summary(account_id, total_transactions, total_amount)
    FROM '/tmp/account_summary.csv' WITH CSV;
"""

# My code
def _load_account_summary(ti):
    # Pull the rows that extract_financial_data pushed to XCom
    data = ti.xcom_pull(key='data', task_ids='extract_financial_data')
    pg_hook = PostgresHook(postgres_conn_id='local_postgres', schema='batch_process_03')
    connection = pg_hook.get_conn()
    cursor = connection.cursor()
    request = (
        "INSERT INTO account_summary (account_id, total_transactions, total_amount) "
        "VALUES (%s, %s, %s)"
    )
    for row in data:
        # Parameterised query: psycopg2 binds the values instead of formatting them into the SQL
        cursor.execute(request, row)
    connection.commit()
    cursor.close()
    connection.close()
```
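Building the `INSERT` statement with an f-string works for purely numeric values, but a parameterised query lets the driver handle quoting and types. That avoids SQL injection and turns `None` into `NULL` instead of the invalid SQL token `None`. The sketch below uses `sqlite3` as a stand-in, so its placeholder is `?`; psycopg2, which `PostgresHook` uses, uses `%s` instead. `executemany` runs the statement once per row, which matches the row-by-row approach above. The rows are made up.

```python
import sqlite3


def load_account_summary(conn, data):
    """Insert (account_id, total_transactions, total_amount) rows using placeholders."""
    insert_sql = (
        "INSERT INTO account_summary (account_id, total_transactions, total_amount) "
        "VALUES (?, ?, ?)"
    )
    # The driver binds each row's values to the placeholders; no string formatting
    conn.executemany(insert_sql, data)
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE account_summary "
    "(account_id INT PRIMARY KEY, total_transactions INT, total_amount NUMERIC)"
)
# Rows shaped like the XCom payload (lists after a JSON round trip)
rows = [[1, 3, 150.5], [2, 1, None]]
load_account_summary(conn, rows)

result = conn.execute("SELECT * FROM account_summary ORDER BY account_id").fetchall()
print(result)  # [(1, 3, 150.5), (2, 1, None)]
```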

### My full code

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
from airflow.models.xcom import XCom

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}


def _create_account_summary():
    # Drop any previous copy so the DAG can be rerun, then recreate the table
    request = (
        "DROP TABLE IF EXISTS account_summary; "
        "CREATE TABLE account_summary ("
        "account_id INT PRIMARY KEY, total_transactions INT, total_amount NUMERIC);"
    )
    pg_hook = PostgresHook(postgres_conn_id='local_postgres', schema='batch_process_03')
    connection = pg_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    connection.commit()
    cursor.close()
    connection.close()


def _extract_financial_data(ti):
    # Aggregate transactions per account in MySQL/MariaDB
    request = (
        "SELECT account_id, COUNT(*) AS total_transactions, SUM(amount) AS total_amount "
        "FROM trans GROUP BY account_id;"
    )
    ms_hook = MySqlHook(mysql_conn_id='financial_mariadb', schema='financial')
    connection = ms_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    data = cursor.fetchall()
    cursor.close()
    connection.close()
    # Hand the rows to the next task via XCom
    ti.xcom_push(key='data', value=data)


def _load_account_summary(ti):
    data = ti.xcom_pull(key='data', task_ids='extract_financial_data')
    pg_hook = PostgresHook(postgres_conn_id='local_postgres', schema='batch_process_03')
    connection = pg_hook.get_conn()
    cursor = connection.cursor()
    request = (
        "INSERT INTO account_summary (account_id, total_transactions, total_amount) "
        "VALUES (%s, %s, %s)"
    )
    for row in data:
        # Parameterised insert: psycopg2 binds the values safely
        cursor.execute(request, row)
    connection.commit()
    cursor.close()
    connection.close()


with DAG(
    'financial_dag',
    default_args=default_args,
    description='A financial data extraction and loading DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 4, 28),
    catchup=False,
) as dag:
    # Operators created inside the "with" block are attached to the DAG automatically

    t1 = PythonOperator(
        task_id='create_account_summary',
        python_callable=_create_account_summary,
    )

    t2 = PythonOperator(
        task_id='extract_financial_data',
        python_callable=_extract_financial_data,
    )

    t3 = PythonOperator(
        task_id='load_account_summary',
        python_callable=_load_account_summary,
    )

    t1 >> t2 >> t3
```
