## Building Data Pipelines in Airflow

Every DAG is going to have some standard, boilerplate code to make it run in Airflow

You will always import the needed libraries, and then any other libraries you need for your
tasks. In the following code block, you import the operator, DAG, and the time libraries
for Airflow. For your tasks, you import the pandas, psycopg2, and elasticsearch
libraries:

In [1]:
import warnings
warnings.filterwarnings('ignore')

In [2]:
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd
import psycopg2 as db
from elasticsearch import Elasticsearch

[[34m2022-08-15 11:14:10,080[0m] {[34mutils.py:[0m159} INFO[0m - NumExpr defaulting to 4 threads.[0m


To query PostgreSQL, you create the connection, execute the sql query using the
pandas read_sql() method, and then use the pandas to_csv() method to write
the data to disk

In [3]:
#Create a connection string that contains the host, database, username and password
conn_string="dbname='Data Engineering' user='postgres' password='KARima@2019?'"

In [4]:
#Create the connection object by passing the connection string to the connect()method
conn=db.connect(conn_string)

In [5]:
#create the cursor from the connection
cur=conn.cursor()

In [6]:
def queryPostgresql():
    conn_string="dbname='Data Engineering' user='postgres' password='KARima@2019?'"
    conn=db.connect(conn_string)
    cur=conn.cursor()
df=pd.read_sql("select name,city from task1",conn)
df.to_csv('postgresqldata.csv')
print("-------Data Saved------")

-------Data Saved------


To insert the data into Elasticsearch, you create the Elasticsearch object connecting
to localhost. Then, read the CSV from the previous task into a DataFrame, iterate
through the DataFrame, converting each row into JSON, and insert the data using the
index method

In [7]:
#Create a connection to elastic search and type your password
es=Elasticsearch('https://localhost:9200', verify_certs=False, basic_auth=('elastic', '5_AQGZ0kKSiqSI_fhaPF'))

In [8]:
def insertElasticsearch():
    es = Elasticsearch('https://localhost:9200', verify_certs=False, basic_auth=('elastic', '5_AQGZ0kKSiqSI_fhaPF'))
df=pd.read_csv("postgresqldata.csv")
df

Unnamed: 0.1,Unnamed: 0,name,city
0,0,Big Bird,Fakeville
1,1,Brittany Mendoza,Nicolechester
2,2,Brenda Bowen,Catherineton
3,3,Keith Yang,Bridgetburgh
4,4,Timothy Mccormick,Tuckerside
...,...,...,...
996,996,Nancy Ramsey,Ryanburgh
997,997,James Jenkins,Paulaport
998,998,Sarah Herrera,Annshire
999,999,Theresa Mcbride,West Andreburgh


In [9]:
for i, r in df.iterrows():
    doc=r.to_json()
res=es.index(index="frompostgresql", body=doc)
print(res)

[[34m2022-08-15 11:14:20,764[0m] {[34m_transport.py:[0m336} INFO[0m - POST https://localhost:9200/frompostgresql/_doc [status:201 duration:0.803s][0m
{'_index': 'frompostgresql', '_id': '133_oIIBM6TF6_YKRlrW', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}


Next, you will specify the arguments for your DAG. Remember that the start time should
be a day behind if you schedule the task to run daily

In [10]:
default_args = {
 'owner': 'tolani',
 'start_date': dt.datetime(2022, 9, 14),
 'retries': 1,
 'retry_delay': dt.timedelta(minutes=5),
}

Now, you can pass the arguments to the DAG, name it, and set the run interval. You will
define your operators here as well. In this example, you will create two Python operators –
one to get data from PostgreSQL and one to insert data in to Elasticsearch. The getData
task will be upstream and the insertData task downstream, so you will use the >> bit
shift operator to specify this

In [12]:
with DAG('MyDBdag',
 default_args=default_args,
 schedule_interval=timedelta(minutes=5),
 # '0 * * * *',
 ) as dag:
    print_starting = BashOperator(task_id="starting" ,bash_command='echo "I am reading the PostgreSQL now....."')
    getData = PythonOperator(task_id='QueryPostgreSQL',python_callable=queryPostgresql)
    insertData = PythonOperator(task_id='InsertDataElasticsearch',python_callable=insertElasticsearch)
print_starting >> getData >> insertData

<Task(PythonOperator): InsertDataElasticsearch>