### **8.5 - Elasticsearch Reminder**

In this video, I’m going to give you a quick reminder of what Elasticsearch is and how it works.

So, what is Elasticsearch? Elasticsearch is an open source, distributed, full-text search engine with an HTTP web interface and schema-free JSON documents.

It allows you to store, search and analyze large volumes of data.

It is well suited for collecting and aggregating log data at scale to look for trends,

statistics, summarizations and more in near real time.

How does it work?

There are two concepts to know in Elasticsearch.

The first one is the document.

A document can be seen as a row in a relational database, and it corresponds to your data stored in JSON format in Elasticsearch.

So basically, a document could be the following JSON data coming from a log file with different fields

such as Id, Name, and Loglevel as well as their associated values.

Since your log file contains many log events, each log event becomes a separate document with its own unique id.
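To make the idea of a document concrete, here is a minimal Python sketch that turns a few log events into (document id, JSON body) pairs, which is the shape Elasticsearch stores. Only the field names Id, Name and Loglevel come from the example above; the values are hypothetical, and using Id as the document id is an assumption for illustration.

```python
import json

# Hypothetical log events; the field names mirror the example above,
# the values are made up for illustration.
log_events = [
    {"Id": 1, "Name": "scheduler", "Loglevel": "INFO"},
    {"Id": 2, "Name": "worker", "Loglevel": "ERROR"},
]


def to_documents(events):
    """Turn each log event into a (document id, JSON source) pair."""
    docs = []
    for event in events:
        doc_id = str(event["Id"])                   # one unique id per document
        source = json.dumps(event, sort_keys=True)  # the document body, as JSON
        docs.append((doc_id, source))
    return docs


if __name__ == "__main__":
    for doc_id, source in to_documents(log_events):
        print(doc_id, source)
```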

Next, these documents are stored in an index, which is a collection of documents. You can think of an index as a database from a relational database perspective. For scalability reasons,

it’s a best practice to create a new index for each day when you are dealing with log events in Elasticsearch.

By doing this, you will be able to query your logs for a given date or range of dates, instead of querying all your data, which would be slow and inefficient.
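As a rough illustration of why daily indices help, the following Python sketch computes the index names covering a date range, so that only those indices need to be searched. The naming scheme `<prefix>-YYYY.MM.DD` is an assumption for this example; use whatever date format your own index names follow.

```python
from datetime import date, timedelta


def daily_index(prefix: str, day: date) -> str:
    # Assumed naming scheme: <prefix>-YYYY.MM.DD, e.g. airflow-logs-2020.01.28
    return f"{prefix}-{day:%Y.%m.%d}"


def indices_for_range(prefix: str, start: date, end: date) -> list[str]:
    """Index names covering every day from start to end, inclusive."""
    days = (end - start).days
    return [daily_index(prefix, start + timedelta(days=i)) for i in range(days + 1)]


if __name__ == "__main__":
    names = indices_for_range("airflow-logs", date(2020, 1, 27), date(2020, 1, 29))
    # Elasticsearch accepts a comma-separated list of indices in the URL path,
    # so a search could target only these three daily indices.
    print(",".join(names))
```

Querying three small daily indices instead of one index holding months of logs is what keeps date-bounded requests fast.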

That’s why I added a date to the index name here. Then, an index can have mapping types that are used to divide documents into logical groups inside the same index. A mapping defines how a document and its fields are indexed and stored.

You can think of a mapping type as a table in a relational database.

Basically, each document has a defined type, as you can see from the field _type. Keep in mind that this picture comes from older versions of Elasticsearch: since version 6.0, an index can hold only a single mapping type, and types are deprecated in 7.x, where _type is always _doc.
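Here is a minimal sketch of what an explicit mapping for the example log documents could look like, written as a Python dict in the Elasticsearch 7.x layout (properties directly under "mappings", with no type level). The field types chosen are assumptions for illustration; Elasticsearch would also infer a mapping on its own if you don't provide one.

```python
import json

# Hypothetical explicit mapping for the example log documents.
mapping_body = {
    "mappings": {
        "properties": {
            "Id": {"type": "integer"},
            # "text" is analyzed for full-text search; the "keyword"
            # sub-field keeps the exact value for filters and aggregations.
            "Name": {
                "type": "text",
                "fields": {"keyword": {"type": "keyword"}},
            },
            "Loglevel": {"type": "keyword"},
        }
    }
}

if __name__ == "__main__":
    # This JSON would be the body of a PUT /<index-name> request.
    print(json.dumps(mapping_body, indent=2))
```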

Finally, once your documents are mapped and stored in an index, you can start querying them through the REST API exposed by Elasticsearch.
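To show what "querying through the REST API" means in practice, this Python sketch prepares (but does not send) a search request with the standard library. The host `http://localhost:9200`, the index pattern and the Loglevel query are assumptions taken from the examples in this section.

```python
import json
import urllib.request


def build_search_request(host: str, index: str, query: dict) -> urllib.request.Request:
    """Prepare a POST <host>/<index>/_search request carrying a query DSL body."""
    url = f"{host.rstrip('/')}/{index}/_search"
    body = json.dumps({"query": query}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Assumes Elasticsearch listens on localhost:9200, as later in this section.
    req = build_search_request(
        "http://localhost:9200",
        "airflow-logs-*",
        {"match": {"Loglevel": "ERROR"}},
    )
    print(req.get_method(), req.full_url)
    # To actually send it: urllib.request.urlopen(req)
```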

All right, one thing you have to know is that when we talk about Elasticsearch, we usually refer to the ELK stack, for Elasticsearch, Logstash and Kibana. Indeed, Elasticsearch is the document-oriented database organizing your data, but how can you actually ship your data into Elasticsearch, and once it is stored, how can you visualize it?

Well, that’s where Logstash and Kibana step in.

Let’s begin with Kibana.

Kibana is an open source data visualization tool used for log analytics, application monitoring and

more.

It gives you a powerful and easy way to make dashboards on top of your data stored in Elasticsearch.

Don’t worry, we will see how to use Kibana in the next videos.

The last component of the ELK stack is Logstash. Logstash is a server-side data processing pipeline that can ingest data from multiple sources simultaneously, transform it and ship it to the output you want, such as Elasticsearch, StatsD, Kafka and so on.

In other words, it allows you to ingest data of different shapes and sources, parse each event into a common format, and finally send the processed data to one or multiple systems.

If you want to apply aggregations, filters or transformations to your log events before forwarding them,

then Logstash could be useful for you.

All right.

Before moving to the practice, there is actually one more tool used in the ELK stack that I want to talk about, which is Filebeat. Filebeat is a lightweight shipper for forwarding and centralizing log data. Installed as an agent on your servers, Filebeat monitors the log files or locations that you specify, collects log events, and forwards them to either Elasticsearch or Logstash for indexing. Like Logstash, you have to define an input and an output, but the transformations you can make on your data are very limited.

Nonetheless, Filebeat has a very small CPU and memory footprint. That’s why it is usually installed on the same node as the logs, forwarding them to Logstash, which runs on another node since it consumes a lot more resources.

All right, that's it for this video.

Now that you have a better idea of what each tool does, let’s move to the practice: we are going to set up everything in order to monitor Airflow with Elasticsearch.

See you in the next video.

### **8.6 - [Practice] Configuring Airflow with Elasticsearch**

After a quick reminder about Elasticsearch, Kibana, Logstash and Filebeat, in this video we are going to configure Airflow to work with Elasticsearch.

Here is the architecture we are going to set up for writing and reading Airflow log events in Elasticsearch.

Basically, we will have 3 docker containers corresponding to Logstash, Elasticsearch, and Kibana

as well as another container where the Airflow worker is running. Inside this container

we will install Filebeat in order to fetch the logs and ship them to Logstash.

One important point to keep in mind is that Airflow will not write the log events directly into Elasticsearch.

I repeat: unlike with AWS S3, where Airflow was able to write the logs into S3, with Elasticsearch it can only read them, so you have to set up Filebeat to import them.

In the end, when a DAG is triggered, the log events of a given task will be stored as JSON in local log files.

These log files will be under the path given by the parameter base_log_folder, which is /usr/local/airflow/logs by default in our Docker setup.
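As a sketch of where a given task's log file ends up, the Python function below builds the path from the dag id, task id, execution date and try number. It assumes the default Airflow 1.10 file name template (`{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log`, where `ts` is the execution date in ISO 8601); if you changed `log_filename_template`, the layout will differ.

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath

BASE_LOG_FOLDER = "/usr/local/airflow/logs"  # base_log_folder in this Docker setup


def task_log_path(dag_id: str, task_id: str, execution_date: datetime, try_number: int) -> PurePosixPath:
    # Assumed layout: <base>/<dag_id>/<task_id>/<ISO 8601 execution date>/<try_number>.log
    return (
        PurePosixPath(BASE_LOG_FOLDER)
        / dag_id
        / task_id
        / execution_date.isoformat()
        / f"{try_number}.log"
    )


if __name__ == "__main__":
    # Hypothetical task attempt: first try of t2 in logger_dag.
    print(task_log_path("logger_dag", "t2", datetime(2020, 1, 28, tzinfo=timezone.utc), 1))
```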

Then, each time a new log file is produced, Filebeat will process it, add an offset to each log event and send the output to Logstash. Logstash will receive the logs, apply some transformations to generate the log_id field required by Airflow, and finally ship them into Elasticsearch.

Once the data is stored in Elasticsearch, we will be able to monitor our DAGs through Kibana by making beautiful dashboards. Before moving to the practice, there are two important points that you must take care of. In order to read the logs from Elasticsearch, Airflow assumes two things. First, each log event should have a field called offset, which will be used to display the logs in the right order.

This offset is automatically created by Filebeat (as log.offset) when a given file is processed.

The second thing is that Airflow assumes that each log event has a field log_id, corresponding to the concatenation of the dag_id, task_id, execution_date and try_number, as shown here. Airflow uses the log_id field to retrieve the logs from Elasticsearch.

This field is not created automatically by Logstash, so you have to define some transformations to generate it; I already made them for you.
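Here is a minimal Python sketch of the log_id concatenation described above. The dash-separated template mirrors the default log_id_template discussed later in airflow.cfg; the execution date is taken as an already formatted string, because its exact format depends on your Airflow version and must match what ends up in the log events.

```python
# Same placeholders, in the same order, as the default log_id_template.
LOG_ID_TEMPLATE = "{dag_id}-{task_id}-{execution_date}-{try_number}"


def make_log_id(dag_id: str, task_id: str, execution_date: str, try_number: int) -> str:
    """Concatenate the four identifiers of a task attempt with dashes.

    execution_date is an already formatted string; its format is an
    assumption and must match the value stored in the log events.
    """
    return LOG_ID_TEMPLATE.format(
        dag_id=dag_id,
        task_id=task_id,
        execution_date=execution_date,
        try_number=try_number,
    )


if __name__ == "__main__":
    # Hypothetical values for the first attempt of task t2 in logger_dag.
    print(make_log_id("logger_dag", "t2", "2020-01-28T00:00:00+00:00", 1))
```

Because the try number is part of the id, each retry of a task gets its own log_id, which is how the Airflow UI can show the log of each attempt separately.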

One last point: in order to use Elasticsearch with Airflow, you have to install the elasticsearch subpackage of Airflow, as we did with S3. All right, without further ado,

let’s move to the practice. From your code editor, check that you are under the folder airflow-materials/airflow-section-8 and open the file docker-compose-CeleryExecutorELK.yml. Nothing new about Airflow, but if you scroll down,

you can see the services Elasticsearch and Kibana. The environment variable SERVER_HOST must be set to 0.0.0.0, otherwise you won’t be able to access the Kibana UI from your web browser. Just below, we have the service Logstash with the following volume. This volume corresponds to the folder containing the Logstash pipelines, through which the logs will be received and processed. If you open the file airflow-logs.conf in the folder mnt/logstash/pipeline,

here you have the pipeline I made for you, which the Airflow logs will go through. This pipeline is divided into three parts. First, the input, where we specify that the log events will come from Filebeat on port 5044. Then, the filter, where we define multiple transformations in order to parse the JSON data, generate the log_id field as shown here, and copy the field offset to the root of the JSON document, since by default it sits under the field log.

Finally, the last part, the output, is where the processed log event is sent to Elasticsearch. Notice the index format here: each index name will begin with airflow-logs followed by the current date. Dividing your logs by day in Elasticsearch is considered a best practice to optimize your requests.
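The filter stage is easier to follow with a Python equivalent of its three steps: parse the JSON line, build log_id, and copy the offset to the root. This is an illustration, not the content of airflow-logs.conf. It assumes that each JSON line written by Airflow carries dag_id, task_id, execution_date and try_number fields, and that Filebeat puts the raw line in message and the byte offset in log.offset; the sample values are hypothetical.

```python
import copy
import json


def transform(event: dict) -> dict:
    """Python sketch of the filter stage: parse, build log_id, copy offset."""
    doc = copy.deepcopy(event)  # leave the incoming event untouched
    # 1. Parse the JSON line written by Airflow and merge its fields in
    #    (its own "message" field replaces the raw line).
    doc.update(json.loads(doc.pop("message")))
    # 2. Build log_id; the format must match log_id_template in airflow.cfg.
    doc["log_id"] = "{dag_id}-{task_id}-{execution_date}-{try_number}".format(**doc)
    # 3. Copy the Filebeat offset from log.offset to the root of the document.
    doc["offset"] = doc["log"]["offset"]
    return doc


if __name__ == "__main__":
    # Hypothetical Filebeat event for one line of a task log file.
    raw = {
        "message": json.dumps({
            "dag_id": "logger_dag",
            "task_id": "t2",
            "execution_date": "2020-01-28T00:00:00+00:00",
            "try_number": "1",
            "levelname": "INFO",
            "message": "Hello from t2",
        }),
        "log": {"offset": 1024},
    }
    print(json.dumps(transform(raw), indent=2))
```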

All right, now let’s configure Airflow to work with Elasticsearch. Open the file airflow.cfg. First, since we are going to use remote storage for the logs, we have to set the parameter remote_logging to True.

Now, unlike with the other remote storages such as S3 or GCP, we are not going to create a connection or set a remote_base_log_folder. Instead, look for the section “elasticsearch”,

where we have the different parameters that we need to define.

Let’s begin with the host, which is http://elasticsearch:9200.

Since we are using Docker, notice that elasticsearch here corresponds to the name of the service running Elasticsearch in the docker-compose file. The next parameter to define is “write_stdout”.

This parameter makes the worker write the task logs to its standard output instead of the default log files. Since we actually want to store the logs in the default files, remove the value and leave the parameter empty.

This is important. Indeed, I don’t know if it is a bug, but if you keep the parameter set to False with Elasticsearch enabled, the log files won’t be produced.

So for now, keep the parameter empty.

Next, set the parameter json_format

to True since Elasticsearch expects JSON data.

Okay.

The parameter json_fields here lists the fields included in each JSON log event.

So we will have the timestamp of the log, the name of the file where it was emitted, and so on.

You can find the list of all possible fields at the link below.
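To picture what json_format and json_fields produce, here is a hypothetical Python logging formatter that emits one JSON object per record, containing only the listed record attributes plus some extra fields. It is a sketch of the idea, not Airflow's own formatter; the field list and the dag_id extra are illustrative assumptions.

```python
import json
import logging


class JsonLineFormatter(logging.Formatter):
    """Hypothetical formatter emitting one JSON object per log record."""

    def __init__(self, json_fields, extras=None):
        super().__init__()
        self.json_fields = json_fields  # LogRecord attribute names to include
        self.extras = extras or {}      # constant fields added to every line

    def format(self, record):
        # Populate record.message and record.asctime as the base class would.
        # Exception info is ignored in this sketch.
        record.message = record.getMessage()
        record.asctime = self.formatTime(record)
        payload = {field: getattr(record, field, None) for field in self.json_fields}
        payload.update(self.extras)
        return json.dumps(payload)


if __name__ == "__main__":
    handler = logging.StreamHandler()
    handler.setFormatter(JsonLineFormatter(
        ["asctime", "filename", "lineno", "levelname", "message"],
        extras={"dag_id": "logger_dag"},
    ))
    log = logging.getLogger("demo")
    log.addHandler(handler)
    log.setLevel(logging.INFO)
    log.info("Task started")
```

One JSON object per line is exactly what lets the Logstash filter parse each event without guessing where fields begin and end.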

Last but not least, the parameter log_id_template here defines the format of the log_id value used by the Airflow UI to retrieve the logs of a given task from Elasticsearch.

By default, it is composed of the dag id, task id, execution date and try number, all separated by dashes.

Keep in mind that the template given here must be the same as the one defined in the Logstash pipeline.
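If you want to sanity-check these settings outside the UI, the sketch below parses a hypothetical excerpt of airflow.cfg with Python's configparser and flags the pitfalls mentioned in this video. The excerpt and the expected template are assumptions based on the values discussed here; adapt them to your own file and pipeline.

```python
import configparser

# Hypothetical excerpt of airflow.cfg with the values discussed above.
AIRFLOW_CFG = """
[core]
remote_logging = True

[elasticsearch]
host = http://elasticsearch:9200
log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
write_stdout =
json_format = True
json_fields = asctime, filename, lineno, levelname, message
"""

# Assumed to be the format the Logstash pipeline builds for log_id.
EXPECTED_LOG_ID_TEMPLATE = "{{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}"


def check_settings(text: str) -> list[str]:
    """Return a list of problems found in the logging-related settings."""
    cfg = configparser.ConfigParser(interpolation=None)
    cfg.read_string(text)
    problems = []
    if not cfg.getboolean("core", "remote_logging"):
        problems.append("remote_logging should be True")
    if not cfg.getboolean("elasticsearch", "json_format"):
        problems.append("json_format should be True")
    if cfg.get("elasticsearch", "write_stdout").strip():
        problems.append("write_stdout should be left empty in this setup")
    if cfg.get("elasticsearch", "log_id_template") != EXPECTED_LOG_ID_TEMPLATE:
        problems.append("log_id_template differs from the one the pipeline builds")
    return problems


if __name__ == "__main__":
    print(check_settings(AIRFLOW_CFG) or "settings look consistent")
```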

Okay, so we are done with the Airflow configuration, and we can start the architecture. From your terminal, start the Docker containers by typing “docker-compose -f docker-compose-CeleryExecutorELK.yml up -d”.

Ok, type “docker ps”.

As you can see, we have three new Docker containers: elasticsearch, kibana and logstash. Now let’s check that everything works fine. [Browser] From your web browser, type localhost:9200.

Enter.

Perfect, we obtain some information about Elasticsearch. Open a new tab and type

localhost:5601,

Enter.

And we got Kibana

as expected. Last, in a new tab again, type localhost:8080,

Enter. And the Airflow UI is running. Perfect, everything works, so let’s move forward. Back in your terminal, connect to the Airflow worker by typing “docker exec -it”, followed by the container id of the worker and “/bin/bash”.

Enter. Okay, at this point, Airflow is configured to read task logs from Elasticsearch.

The last step is to set up Filebeat in order to ship the logs into Logstash to finally store them into

Elasticsearch so that we will be able to read them from the Airflow UI.

First, we need to download Filebeat with the command “curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.5.2-linux-x86_64.tar.gz”.

Enter.

Let’s wait for the download to finish.

Okay.

Extract the files with “tar xvzf filebeat-7.5.2-linux-x86_64.tar.gz”.

Enter.

Then go inside the new folder and type “ls”.

All right, here we have the filebeat binary and the configuration file of Filebeat, called filebeat.yml. Open it with “vim filebeat.yml”.

The first thing we have to configure is where Filebeat should look for the log files and how they should be processed.

To do this, you have to specify an input. Filebeat provides many different inputs such as log files, Kafka, S3, Redis and so on.

You can find the exhaustive list at the link below.

In our case, we keep the input log, which is the one to use when we need to read lines from log files.

Then, just below, we enable the input by setting the parameter enabled to true.

Next, we have to set the path where the log files are. If you remember, when a task is executed, multiple folders are automatically created, corresponding respectively to the dag id, the task id and the execution date, and finally the log file, whose name is defined by the try number of that task.

So here we replace this path with /usr/local/airflow/logs, which is the default path of the log files, followed by “/*/*/*/*.log”.

Here, each wildcard corresponds to the dag id, the task id, the execution date and the file, respectively.

Check that the path is correct, otherwise Filebeat will throw a lot of errors telling you that the JSON format is incorrect, or won’t tell you anything at all if the path points somewhere with no log files.
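A quick way to convince yourself that the pattern matches one folder level per wildcard is the Python sketch below. It uses `PurePosixPath.match`, which compares path components one by one, as an approximation of Filebeat's own glob matching; the sample paths are hypothetical.

```python
from pathlib import PurePosixPath

# The Filebeat input path configured above.
PATTERN = "/usr/local/airflow/logs/*/*/*/*.log"


def would_be_collected(path: str) -> bool:
    # With an absolute pattern, the whole path must match, and each "*"
    # covers exactly one folder level (dag id, task id, execution date).
    return PurePosixPath(path).match(PATTERN)


if __name__ == "__main__":
    for candidate in [
        "/usr/local/airflow/logs/logger_dag/t2/2020-01-28T00:00:00+00:00/1.log",
        "/usr/local/airflow/logs/logger_dag/t2/1.log",  # one level missing
    ]:
        print(candidate, would_be_collected(candidate))
```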

Last but not least, we have to set the output. Look for logstash by typing /logstash.

Enter. Here, uncomment the line output.logstash as well as the line defining the hosts.

Replace localhost with logstash, which is the service name of Logstash in the docker-compose file.

Like that. Then, just above, comment out the lines output.elasticsearch and hosts of the Elasticsearch output, since Filebeat only allows one output to be enabled at a time.

Perfect. Now that everything is set up, we can start Filebeat. Save the file and exit by typing :wq,

then type “./filebeat -e -c filebeat.yml -d "publish"”, with publish in double quotes. This command will start Filebeat based on the configuration file filebeat.yml, send its own logs to the standard error with -e, and enable the debug selector publish to filter the logs of Filebeat. Enter.

Ok, Filebeat is running. Let’s go to the Airflow UI. Turn on the toggle of the DAG logger_dag and wait for the DAGRun to finish.

Perfect.

Now go to Kibana and click on “Explore on my own”. Then, from the left panel, click on the last icon here and “Index Management”. As you can see, we have an index called airflow-logs followed by the current date, corresponding to when the logs were processed.

Now click on “Index Patterns”, just below Kibana, and “Create Index Pattern”. From there, type an index pattern that matches the Elasticsearch index below.

So we type airflow-logs-*.

Ok,

the pattern matches the index. “Next step”.

Here, we select @timestamp, which is the field that will be used to filter the data by time.

Notice that this field does not come from Airflow: it is added automatically when the log event is shipped, since Filebeat sets it when it reads the event. Click on “Create index pattern”.

Perfect.

Here we obtain the mapping of the log documents.

This means we are able to query these fields from Kibana to build filters, aggregations, statistics and more on the data. From the left panel,

click on “Discover” just below the clock. And here they are. The log events of your tasks generated from

Airflow are stored and queryable from Elasticsearch.

Expand the first log by clicking here. If you scroll down, you can see the DAG id of the task,

some info about the host where Filebeat is running, the log level, the log id, the offset, the task id

and so on.

Notice that you can see the transformation we made in Logstash on the field offset.

Here we have log.offset, which is created by default by Filebeat, and thanks to our transformation, we can see the same value under the name offset.

This is required since Airflow queries a field called offset and not log.offset.
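To see why a top-level offset matters, here is a sketch of the kind of search body that fetches the lines of one task attempt in file order: filter on log_id, sort by offset, and optionally only keep lines after an offset already displayed. It is written in the spirit of what Airflow needs and is not copied from Airflow's Elasticsearch task handler; the function names are hypothetical.

```python
def build_log_query(log_id: str, after_offset=None) -> dict:
    """Search body for one task attempt's log lines, in file order."""
    query = {"bool": {"must": [{"match_phrase": {"log_id": log_id}}]}}
    if after_offset is not None:
        # Only lines after the last offset already displayed (e.g. for tailing).
        query["bool"]["filter"] = [{"range": {"offset": {"gt": after_offset}}}]
    # Sorting needs a top-level numeric "offset" field, hence the Logstash copy.
    return {"query": query, "sort": [{"offset": {"order": "asc"}}]}


def order_hits(hits: list[dict]) -> list[str]:
    """Client-side equivalent of the sort: messages ordered by offset."""
    return [hit["message"] for hit in sorted(hits, key=lambda hit: hit["offset"])]


if __name__ == "__main__":
    print(build_log_query("logger_dag-t2-2020-01-28T00:00:00+00:00-1"))
```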

Finally, if you go back to the Airflow UI and click on logger_dag, then “Graph View”,

then “t2” and “View Log”, you can see that Airflow is able to retrieve the logs from Elasticsearch. Perfect.

So now you know how to set up everything to use Elasticsearch with Airflow.

Now it’s time to build a Dashboard in order to monitor your DAGs using Kibana and Elasticsearch. So keep

everything running and see you in the next video.

### **8.7 - [Practice] Monitoring your DAGs with Elasticsearch**

Now that we have learned how to set up Airflow to store and read its log events with Elasticsearch, it’s time to monitor our DAGs with Kibana.

The first thing we are going to do is some cleanup. From the Airflow UI, turn off the toggle of the DAG logger_dag and delete its task instances as well as its DAGRuns.

Like that.

Then, open your code editor and check that you are under the folder airflow-materials/airflow-section-8. From there, open the folder mnt/airflow/logs and delete the folder logger_dag.

Perfect. Now, go to Kibana and click on the last icon from the left panel. Then “Index Management”,

select the index, click on “Manage Index”, “Delete Index” and confirm. Next, click on “Index Pattern”, select the pattern

and click on the red button here to delete it.

Confirm with “Delete”. All right.

Everything is clean, so we can move forward. In order to create a meaningful dashboard, we are going to generate some log events. Back in the Airflow UI, turn on the toggle of the DAG data_dag. This DAG is composed of three tasks that basically do nothing, except the last one, which fails about half of the time depending on the execution date.

We can take a look at it by clicking on the DAG, then “Graph View”, “fail” and “Rendered”. This is the bash command executed by the task.

The value here corresponds to the day of the execution date. Since the start date of the DAG is set 10 days ago and the catchup parameter is enabled, the execution date will change for each DAGRun of the backfill process.

So depending on whether the day is even or odd, either exit 1 or exit 0 will be executed.

All right,

click on Tree View. And the DAGRuns are finished.

Okay, it's done. From Kibana, click on “Index Management” and check that you have an index airflow-logs as shown here.

Notice that the date will be different for you.

So, the log events have been stored in Elasticsearch.

Let's create the index pattern again.

Click on “Index Patterns”, “Create index pattern”. Type “airflow-logs-*”.

The pattern and the index match. Click on “Next step”.

Select the field “@timestamp” to be able to filter the logs by time and validate by clicking on “Create index pattern”. Perfect. Time to create the dashboard. A dashboard in Kibana is composed of multiple visualizations, which are based on Elasticsearch requests.

Let’s create the first one. From the left panel,

click on “Visualize” and “Create new visualization”.

Here, you can choose between multiple visualizations such as Gauge, Goal, Map, Heat Map and so on.

In our case, we choose the Gauge, which indicates whether a given value goes beyond a defined threshold.

Basically, we are going to use it to monitor whether a given DAG exceeds a number of errors, so that we are warned when it may be in trouble. Here, we choose the source airflow-logs-*. Ok,

now on the left you have two options to customize your visualization. The first one is the Metrics, which is Count by default, but you can set an average, a sum and so on.

Just below, you can specify a label for the metric.

Let’s type “Number of failed tasks”.

Then, the Buckets section here lets you split your visualization based on an aggregation. Click on “Add” and “Split group”.

Select the aggregation Terms and the field dag_id.keyword. By doing this, we are going to obtain one gauge for each DAG id existing in the log events.

To make things clearer, open the Airflow UI and turn on the toggle of the DAG logger_dag. Refresh the page until the DAGRun is finished.

Ok, back in Kibana, if you click on the button “Refresh”, you can see that we have a new gauge with the dag id logger_dag, as shown here. The number here corresponds to the total number of log events, since we don’t filter anything yet. So let’s apply a filter to keep only the tasks that failed.

Click on the search bar, type “message” and select message.

The difference between message and message.keyword is that the first one allows partial matching with a given string, whereas .keyword will match only if the string is exactly the same.

Now select the colon and type “Task exited with return code 1”.
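The difference between message and message.keyword can be made concrete with a small Python sketch. The `analyze` function is a rough, simplified stand-in for Elasticsearch's standard analyzer (lowercased word tokens), so treat the matching functions as illustrations; the two query bodies at the end use real Query DSL clauses (match_phrase on the text field, term on the keyword sub-field).

```python
import re


def analyze(text: str) -> list[str]:
    # Simplified stand-in for the standard analyzer: lowercase word tokens.
    return re.findall(r"\w+", text.lower())


def phrase_matches(field_value: str, phrase: str) -> bool:
    """Like match_phrase on a text field: tokens must appear contiguously."""
    tokens, wanted = analyze(field_value), analyze(phrase)
    n = len(wanted)
    return any(tokens[i:i + n] == wanted for i in range(len(tokens) - n + 1))


def keyword_matches(field_value: str, value: str) -> bool:
    """Like a term query on a .keyword field: the whole value must be identical."""
    return field_value == value


# Equivalent request bodies against the real cluster:
phrase_query = {"query": {"match_phrase": {"message": "Task exited with return code 1"}}}
term_query = {"query": {"term": {"message.keyword": "Task exited with return code 1"}}}
```

A log line that only contains the sentence, for example with a prefix in front of it, matches the phrase query on message but not the exact query on message.keyword.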

Finally, since our DAGs are scheduled to run every day, it would be better to have information on the past 7 days. To do this,

click on the calendar here, and “Last 7 days”. Perfect. Since logger_dag doesn’t have any failed tasks, change 1 to 0 here to filter on the tasks that succeeded instead. Enter. We obtain the two gauges as expected.

Now undo your modification and change 0 back to 1. Enter. And we obtain the number of failed tasks for the DAG data_dag. Since the number is below 50, the DAG is still considered to be working fine.
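For reference, the gauge we just built boils down to a query plus a terms aggregation. The Python sketch below writes a roughly equivalent search body and reads the per-DAG counts from a response; Kibana generates its own request, which differs in details, and the sample response values are made up.

```python
def failed_tasks_per_dag_query(days: int = 7) -> dict:
    """Search body: failed-task log lines of the last `days` days, counted per DAG."""
    return {
        "size": 0,  # only the aggregation is needed, not the documents
        "query": {
            "bool": {
                "must": [
                    {"match_phrase": {"message": "Task exited with return code 1"}}
                ],
                "filter": [
                    {"range": {"@timestamp": {"gte": f"now-{days}d", "lte": "now"}}}
                ],
            }
        },
        # One bucket per DAG id, like the "Split group" on dag_id.keyword.
        "aggs": {"per_dag": {"terms": {"field": "dag_id.keyword"}}},
    }


def counts_per_dag(response: dict) -> dict:
    """Read the terms buckets: one gauge value per DAG id."""
    buckets = response["aggregations"]["per_dag"]["buckets"]
    return {bucket["key"]: bucket["doc_count"] for bucket in buckets}


if __name__ == "__main__":
    # Hypothetical response shape returned by a terms aggregation.
    fake = {"aggregations": {"per_dag": {"buckets": [{"key": "data_dag", "doc_count": 12}]}}}
    print(counts_per_dag(fake))
```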

Let’s say you want to define that from 10 errors onwards, the DAG should be considered in trouble.

Well, if you click on “Options”, here you can define the different ranges. Replace the value 50 here with 10, meaning that if the number of errors is greater than or equal to 0 and less than 10, the gauge will be green.

Delete the warning range.

Then here, set 10 instead of 75 so that from 10 errors onwards, the gauge will be red.

Click on this button to apply the modifications. And the gauge is red, as expected.

All right,

now the visualization is set,

click on “Save”

here.

Let's name it,

“dag_id_gauge”

and click on “Save” again.

So we have created our first visualization, let's create a new one.

Click on “Visualizations”, “Create visualization”. Select the vertical bar at the bottom.

We select the index. Here, as we did with the Gauge,

we create a new bucket on the x-axis.

We select the aggregation “Terms” and the field “task_id.keyword”.

Click on “Apply changes”. And we obtain the number of log events per task. Save the visualization,

name it “task_id_vertical_bar”.

“Save” again. Now go to “Dashboard”, “Create new dashboard”. Click on “Add”, then “dag_id_gauge” and “task_id_vertical_bar”. Close the panel. And as you can see, the visualizations have been added to the dashboard. Save it by clicking here and give it the name “Dag Monitoring”.

Turn on “store time with dashboard” and click on “Save”. Perfect, you have just created your first dashboard,

and you are now able to monitor whether a given DAG has too many failed tasks in the last 7 days, as well as the number of logs produced per task id. As you may guess, Elasticsearch is extremely powerful and can take the monitoring of your applications to the next level. I strongly recommend that you take a closer look at it. If you want more content about monitoring Airflow with Elasticsearch, please let me know in the Q&A section; I will be glad to help you. All right, before moving forward, open your terminal and check that you are under the folder airflow-materials/airflow-section-8.

Then type the command “docker-compose -f docker-compose-CeleryExecutorELK.yml down”.

So that's it. I hope you enjoyed what you have learned.

Take a quick break and see you in the next video.
