Version 2: 

Same as above, but this time you export the result to a CSV file, then take the CSV and upload it to the database again.

Steps: 
Get the data from the database and export it to a CSV. There are multiple ways to do this:
* run a Postgres query with the PostgresOperator that exports a CSV on your system (easy). The command you are looking for is COPY.
* use the PythonOperator to get the data from the database and store it as a CSV (medium). You might want to use the Python libraries psycopg2 and pandas for this.
* use the Airflow Hooks to build your own Operator (hard). This is the preferred method. Check the Airflow documentation on Operators to see how one is built (this is overkill for the task, but worth checking out).
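For the easy route, the PostgresOperator only needs a `COPY (query) TO 'file' ...` statement. The helper below is a minimal sketch that wraps an existing SELECT such as `fetch_query` into that statement; the name `copy_to_csv_sql` is hypothetical. It assumes a server-side COPY, which means the file is written by the Postgres server process (so it lands on your machine only when Postgres runs locally) and the role needs superuser rights or membership in `pg_write_server_files`.

```python
def copy_to_csv_sql(query: str, path: str) -> str:
    """Wrap a SELECT in a server-side COPY ... TO statement that writes a headered CSV.

    Hypothetical helper for illustration; it does not validate the query.
    """
    # A trailing semicolon is a syntax error inside COPY ( ... ), so drop it
    inner = query.strip().rstrip(";")
    # Double any single quotes so the path is a valid SQL string literal
    literal = path.replace("'", "''")
    return f"COPY ({inner}) TO '{literal}' WITH (FORMAT csv, HEADER)"
```

With the notebook's names, the task could then look like `PostgresOperator(task_id='export_csv', sql=copy_to_csv_sql(fetch_query, filepath), postgres_conn_id='postgres_default', dag=dag)`; the task ID and connection ID are placeholders for whatever your setup uses.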

Read the CSV file and store it in the database again; there are also multiple ways to do this:
* run a query with the PostgresOperator that imports the CSV into a table (easy)
* use the PythonOperator to save the data into the database (medium)
* use Python in conjunction with the Airflow PostgresHook to build your own Operator (hard)
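For the medium route, a PythonOperator callable can read the CSV itself and insert the rows, instead of asking the server to read the file with COPY. A minimal sketch, assuming any DB-API 2.0 connection (for example one from `psycopg2.connect`) and a CSV whose first line is a header; `load_csv_rows` is a hypothetical helper, the stdlib `csv` module stands in for pandas, and `table`/`columns` are pasted into the SQL text, so they must be trusted identifiers. The `placeholder` parameter exists because psycopg2 uses `%s` while sqlite3 uses `?`.

```python
import csv
from typing import Sequence


def load_csv_rows(conn, csv_path: str, table: str, columns: Sequence[str],
                  placeholder: str = "%s") -> int:
    """Insert every data row of a headered CSV into `table`; returns the row count.

    Hypothetical helper; `table` and `columns` must be trusted identifiers.
    """
    with open(csv_path, newline="") as f:
        reader = csv.reader(f)
        next(reader)  # skip the header row
        # Empty fields become None so they are stored as NULL, not ''
        rows = [tuple(v if v != "" else None for v in r) for r in reader]

    col_list = ", ".join(columns)
    marks = ", ".join([placeholder] * len(columns))
    sql = f"INSERT INTO {table} ({col_list}) VALUES ({marks})"

    cur = conn.cursor()
    try:
        cur.executemany(sql, rows)
    finally:
        cur.close()
    conn.commit()  # DB-API connections do not autocommit by default
    return len(rows)
```

Inside a PythonOperator callable this could be called as `load_csv_rows(psycopg2.connect(**connection_parameters), filepath, 'completed_tasks', ['name', 'tasks', 'complete', 'incomplete'])`. The values arrive as strings; Postgres casts them to the `bigint` columns on insert.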

This task focuses heavily on getting your hands dirty with Airflow and Python :-)


Here are some helpful resources:
Airflow documentation: https://airflow.apache.org/ -> there is a tutorial you should definitely do first
Airflow GitHub repository: https://github.com/apache/incubator-airflow (good for understanding Hooks and Operators in granular detail)
Airflow ETL principles: https://gtoonstra.github.io/etl-with-airflow/principles.html (very advanced; you do not have to read this, it is just an additional resource)

psycopg2 documentation: http://initd.org/psycopg/docs/ (to connect Python to Postgres)
pandas documentation: https://pandas.pydata.org/pandas-docs/stable/ (for data manipulation)


In [4]:
import airflow

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator

from airflow.models import Variable

from datetime import datetime, timedelta

In [5]:
import pandas as pd
import psycopg2

In [13]:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'email': ['tjdcevans@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

connection_parameters = {
    'host' : 'localhost',
    'port' : '5432',
    'dbname' : 'asana',
    'user' : 'theoevans'
}

filepath = Variable.get('asana_data') + 'completed_tasks.csv'

fetch_query = """
    SELECT u.name as name,
        COUNT(*) as tasks,
        COUNT(NULLIF(completed, 'False') ) as complete,
        COUNT(NULLIF(completed, 'True') ) as incomplete
    FROM tasks t
    LEFT JOIN users u ON u.id = t.assignee_id
    GROUP BY u.name;"""

insert_query = """
    CREATE TABLE IF NOT EXISTS completed_tasks
    (
        name varchar(255),
        tasks bigint,
        complete bigint,
        incomplete bigint
    );
    
    COPY completed_tasks
    FROM '%s' DELIMITER ',' CSV HEADER;
    """ % filepath

dag = DAG('asana_postgres_v2', default_args=default_args, schedule_interval=None)
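`COUNT(expr)` skips NULLs, so `fetch_query` uses `NULLIF` to turn tasks of the "other" state into NULL before counting: `NULLIF(completed, 'False')` leaves only completed tasks, `NULLIF(completed, 'True')` only incomplete ones. The sketch below replays the same aggregation against a hypothetical in-memory SQLite miniature of the `tasks` and `users` tables. It assumes `completed` holds the text `'True'`/`'False'`; in the Postgres database it is presumably a boolean, where `NULLIF(completed, 'False')` behaves the same way. The `ORDER BY` is added only to make the output deterministic.

```python
import sqlite3

# Hypothetical miniature of the asana schema (completed stored as text here)
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE tasks (id INTEGER PRIMARY KEY, assignee_id INTEGER, completed TEXT);
    INSERT INTO users VALUES (1, 'alice'), (2, 'bob');
    INSERT INTO tasks (assignee_id, completed) VALUES
        (1, 'True'), (1, 'True'), (1, 'False'), (2, 'False');
""")

fetch_query = """
    SELECT u.name AS name,
        COUNT(*) AS tasks,
        COUNT(NULLIF(completed, 'False')) AS complete,
        COUNT(NULLIF(completed, 'True')) AS incomplete
    FROM tasks t
    LEFT JOIN users u ON u.id = t.assignee_id
    GROUP BY u.name
    ORDER BY u.name;"""

rows = conn.execute(fetch_query).fetchall()
print(rows)  # [('alice', 3, 2, 1), ('bob', 1, 0, 1)]
```

Because of the `LEFT JOIN`, tasks without an assignee would form one extra group whose `name` is NULL.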

In [14]:
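def export_query():
    conn = psycopg2.connect(**connection_parameters)
    try:
        with conn.cursor() as cursor:
            cursor.execute(fetch_query)
            # Keep the column names so the CSV header matches the target table
            columns = [col[0] for col in cursor.description]
            fetch = cursor.fetchall()
    finally:
        conn.close()
    df = pd.DataFrame(fetch, columns=columns)
    # index=False: no extra index column, so COPY ... CSV HEADER sees exactly 4 fields
    df.to_csv(filepath, index=False)

def import_query():
    conn = psycopg2.connect(**connection_parameters)
    try:
        # `with conn` commits on success (rolls back on error); without a commit,
        # close() discards the CREATE TABLE and COPY.
        # Server-side COPY: the Postgres server itself must be able to read filepath.
        with conn:
            with conn.cursor() as cursor:
                cursor.execute(insert_query)
    finally:
        conn.close()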

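For `COPY ... CSV HEADER` to load the exported file, the CSV must contain one header line followed by exactly one field per table column, with no extra pandas index column. The same export idea without pandas, as a sketch that uses only the standard library `csv` module and any DB-API connection; `export_query_to_csv` is a hypothetical name, not part of psycopg2 or Airflow.

```python
import csv


def export_query_to_csv(conn, query: str, path: str) -> int:
    """Run `query` on a DB-API connection and write the result to `path` as a headered CSV."""
    cur = conn.cursor()
    try:
        cur.execute(query)
        # cursor.description has one entry per result column; item 0 is the column name
        header = [col[0] for col in cur.description]
        rows = cur.fetchall()
    finally:
        cur.close()
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)  # the line that COPY ... CSV HEADER skips
        writer.writerows(rows)   # no index column, so field count == column count
    return len(rows)
```

With psycopg2, the call would be `export_query_to_csv(psycopg2.connect(**connection_parameters), fetch_query, filepath)`; NULL values are written as empty fields, which `COPY ... FORMAT csv` reads back as NULL by default.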

In [15]:
t1 = PythonOperator(
    task_id='query_db_and_export',
    python_callable=export_query,
    dag=dag)

t2 = PythonOperator(
    task_id='import_csv_to_db',
    python_callable=import_query,
    dag=dag)

In [None]:
t1.set_downstream(t2)  # export the CSV first, then import it