# Context Variables

### Introduction

Now that we know how to extract data from RDS, and move it to S3, to eventually move it to airflow, let's see how we can make sure we are not copying over data we have already moved. To do so, we need to know when we last selected our data.

Airflow exposes this kind of information about each task run through context variables.

### Context Variables

We can use context variables in Airflow with something like the following:

```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

def extract_sql(*args, **kwargs):
    # kwargs holds the context dictionary that Airflow passes in
    return f"Date executed: {kwargs['execution_date']}"

dag = DAG(dag_id='etl_dag', start_date=datetime.now() - timedelta(days=1))
task = PythonOperator(
    task_id='sql_task',
    python_callable=extract_sql,
    # ask Airflow to pass the context to the callable
    provide_context=True,
    dag=dag)
```

As the example shows, context variables give us access to metadata about the task being run. To access this metadata, we set `provide_context` to `True` when creating the task. Airflow then passes a dictionary of metadata to the Python callable, which we read through its `kwargs` argument.
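
Because the context arrives as ordinary keyword arguments, a callable like this can also be tried outside Airflow by passing in a hand-built dictionary. The following is only a minimal sketch: `describe_run` and `fake_context` are hypothetical names, and the dictionary merely imitates two of the keys Airflow supplies (`execution_date` and `ds`).

```python
from datetime import datetime

def describe_run(**context):
    # Read two entries from the context dictionary.
    execution_date = context["execution_date"]
    ds = context["ds"]  # the execution date as a YYYY-MM-DD string
    return f"Run for {ds}, executed at {execution_date.isoformat()}"

# Hand-built stand-in for the dictionary Airflow would pass in (an assumption).
fake_context = {
    "execution_date": datetime(2021, 1, 2),
    "ds": "2021-01-02",
}

message = describe_run(**fake_context)
print(message)  # Run for 2021-01-02, executed at 2021-01-02T00:00:00
```

Calling the function directly like this is a quick way to check the formatting logic before wiring it into a `PythonOperator`.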

> In the `extract_sql` callable above, we access the execution date.

If we boot up the DAG and check the logs, we'll see something like the following:

> <img src="./date_executed_1.png" width="90%">

> So we can see that the execution date is in the return value.

Now even more useful to us than the date executed is the previous execution date.

```python
def extract_sql(*args, **kwargs):
    return f"""Current Date: {kwargs['execution_date']}, previous date: {kwargs['prev_execution_date']}"""
```
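
To make the two values concrete, here is a small sketch that calls a similar callable with a hand-built context. It assumes a daily schedule, so the previous execution date is set one day before the current one; `describe_window` and `fake_context` are hypothetical names, not part of Airflow.

```python
from datetime import datetime, timedelta

def describe_window(**context):
    current = context["execution_date"]
    previous = context["prev_execution_date"]
    return f"Current date: {current:%Y-%m-%d}, previous date: {previous:%Y-%m-%d}"

# Hypothetical context for a daily schedule: the previous run is one day earlier.
execution_date = datetime(2021, 1, 2)
fake_context = {
    "execution_date": execution_date,
    "prev_execution_date": execution_date - timedelta(days=1),
}

window_message = describe_window(**fake_context)
print(window_message)  # Current date: 2021-01-02, previous date: 2021-01-01
```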

Having access to the previous execution date is valuable because, when moving our data to S3, we can select only the data created after the previous task run:

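```python
import csv
import io
import os

# Airflow 1.x import paths, matching the PythonOperator import used earlier
from airflow.hooks.postgres_hook import PostgresHook
from airflow.hooks.S3_hook import S3Hook

def extract_sql(*args, **kwargs):
    rds_hook = PostgresHook('rds')
    s3_hook = S3Hook('s3_connection')
    # bind the timestamp as a parameter so it is quoted correctly in the SQL
    query = "SELECT * FROM zipcodes WHERE created_at > %s;"
    prev_run = kwargs['prev_execution_date'].isoformat()
    zipcode_records = rds_hook.get_records(query, parameters=(prev_run,))

    # write the records to an in-memory CSV file
    mem_file = io.StringIO()
    csv_writer = csv.writer(mem_file, lineterminator=os.linesep)
    csv_writer.writerows(zipcode_records)
    # encode into a byte stream
    mem_file_binary = io.BytesIO(mem_file.getvalue().encode())
    s3_hook.load_file_obj(
        file_obj=mem_file_binary,
        bucket_name='jigsaw-sample-data',
        key=f"zipcodes-{kwargs['ds']}.csv",
        replace=True,
    )
```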

Here, when selecting records, we keep only those created after the previous execution date. Then we upload the file under a key that includes the current execution date, available via `ds`.
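
The same select-then-serialize flow can be tried locally without RDS or S3. The sketch below is an illustration under stated assumptions, not the lesson's actual pipeline: it swaps in an in-memory `sqlite3` table for the RDS database (so the placeholder is `?` rather than Postgres's `%s`), stores `created_at` as text, and uses hypothetical helpers `extract_new_rows` and `rows_to_csv_bytes` along with a hand-built `fake_context`.

```python
import csv
import io
import sqlite3
from datetime import datetime

def extract_new_rows(connection, prev_execution_date):
    # Select only rows created after the previous run, binding the timestamp
    # as a query parameter rather than formatting it into the SQL string.
    query = "SELECT zipcode, created_at FROM zipcodes WHERE created_at > ?"
    cutoff = prev_execution_date.isoformat(sep=" ")
    return connection.execute(query, (cutoff,)).fetchall()

def rows_to_csv_bytes(rows):
    # Write rows to an in-memory text buffer, then encode into a byte stream.
    text_buffer = io.StringIO()
    csv.writer(text_buffer, lineterminator="\n").writerows(rows)
    return io.BytesIO(text_buffer.getvalue().encode("utf-8"))

# Stand-in for the RDS table (assumption: timestamps stored as ISO-style text).
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE zipcodes (zipcode TEXT, created_at TEXT)")
connection.executemany(
    "INSERT INTO zipcodes VALUES (?, ?)",
    [("10001", "2021-01-01 09:00:00"), ("11201", "2021-01-02 09:00:00")],
)

# Hand-built stand-in for the Airflow context of a daily run.
fake_context = {"prev_execution_date": datetime(2021, 1, 2), "ds": "2021-01-03"}

new_rows = extract_new_rows(connection, fake_context["prev_execution_date"])
csv_bytes = rows_to_csv_bytes(new_rows).getvalue()
key = f"zipcodes-{fake_context['ds']}.csv"

print(new_rows)   # [('11201', '2021-01-02 09:00:00')]
print(csv_bytes)  # b'11201,2021-01-02 09:00:00\n'
print(key)        # zipcodes-2021-01-03.csv
```

Only the row created after the previous execution date is selected, and the resulting key carries the current run's date, so each run writes its own file.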

### Resources


[Airflow Blog - Zen of Python](https://godatadriven.com/blog/the-zen-of-python-and-apache-airflow/)

[Prev Execution Date](https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html)


[Full Example Engineering Blog](https://medium.com/leboncoin-engineering-blog/data-traffic-control-with-apache-airflow-ab8fd3fc8638)