# Fundamentals of Data Pipelines

## What is a Data Pipeline?

A data pipeline is a unified system that captures events, sometimes from several different sources, and delivers them to value-generating initiatives such as analytics, building products, and monitoring.

![Example of a Data Pipeline](img/data-pipeline.png)

## What can we call an event?

Every executed action can be called an event. An action can be executed by:
- Users;
- Systems;
- Bots;

### Coarse-grained events

This type of event is a kind of *by-product* derived from the main product or operation being tracked. Coarse-grained events are usually stored as text logs that can be used for debugging and analysis.

```json
{
  "id": 3969,
  "ip": "127.0.0.1",
  "timestamp": "2018-01-03 08:48:26.987-02:00",
  "action": "GET / HTTP/2.0",
  "status": 200
}
```

In some cases, the content of an action carries a meaning that must be known at debugging or analysis time. For example, consider the following event from an email app:

```json
{
  "id": 36728,
  "ip": "127.0.0.1",
  "timestamp": "2018-01-03 08:55:26.987-02:00",
  "action": "GET /inbox",
  "status": 200
}
```

This event has two possible meanings:
- The user just loaded the entire app (if this is the first load in this session);
- The user just refreshed their timeline/inbox;

So these implementation details must be documented and shared with everyone who is going to use the events for analysis or debugging.
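One way to make such a documented rule concrete is to encode it next to the data. The sketch below is hypothetical: it assumes the IP address is a good enough stand-in for a session (real apps usually need a session id or an inactivity timeout), uses a made-up log, and labels the first `GET /inbox` of each session as an app load and any later one as a refresh.

```python
def interpret_inbox_requests(events, session_key="ip"):
    """Map each 'GET /inbox' event id to its meaning (hypothetical rule).

    The first request seen for a session is treated as "app loaded"; later
    ones are treated as "inbox refreshed". `session_key` is an assumption:
    coarse-grained events carry no explicit session id.
    """
    seen_sessions = set()
    meanings = {}
    # Sorting on the timestamp string assumes every event uses the same format
    for event in sorted(events, key=lambda e: e["timestamp"]):
        if event["action"] != "GET /inbox":
            continue
        session = event[session_key]
        if session in seen_sessions:
            meanings[event["id"]] = "inbox refreshed"
        else:
            seen_sessions.add(session)
            meanings[event["id"]] = "app loaded"
    return meanings


# Made-up sample log in the coarse-grained format shown above
log = [
    {"id": 36728, "ip": "127.0.0.1", "timestamp": "2018-01-03 08:55:26.987", "action": "GET /inbox", "status": 200},
    {"id": 36731, "ip": "127.0.0.1", "timestamp": "2018-01-03 08:57:02.114", "action": "GET /inbox", "status": 200},
    {"id": 36735, "ip": "127.0.0.1", "timestamp": "2018-01-03 08:57:10.001", "action": "GET / HTTP/2.0", "status": 200},
]
print(interpret_inbox_requests(log))  # {36728: 'app loaded', 36731: 'inbox refreshed'}
```

Whatever the exact rule is, it lives outside the event itself, which is why it has to be written down and shared.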

### Fine-grained events

This type of event records a specific, named action, much like an entry in a record or ticket, for example:
- app opened;
- auto refresh;
- user pull-down refresh;

With fine-grained events, each action is annotated with its contextual information, so there is no risk of the multiple meanings we saw with coarse-grained events.
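A fine-grained event can be sketched as a small constructor that only accepts agreed-upon action names, so every record states its meaning explicitly. The event names come from the list above; the `user_id` and `context` fields, the `make_event` helper, and the validation are illustrative assumptions.

```python
from datetime import datetime, timezone

# Event names from the list above, written as identifiers (illustrative)
KNOWN_EVENTS = {"app_opened", "auto_refresh", "user_pull_down_refresh"}


def make_event(name, user_id, **context):
    """Build a fine-grained event; reject names outside the agreed vocabulary."""
    if name not in KNOWN_EVENTS:
        raise ValueError(f"unknown event name: {name!r}")
    return {
        "event": name,
        "user_id": user_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        # Extra keyword arguments carry the contextual annotations
        "context": context,
    }


refresh = make_event("user_pull_down_refresh", user_id=42, screen="inbox")
print(refresh["event"], refresh["context"])
```

Because the name itself says "pull-down refresh", nobody has to consult an out-of-band rule to interpret it.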

### Coarse vs. fine-grained

Coarse-grained events are usually better suited for logging and debugging, since we don't know in advance the exact context in which an issue or bug will occur. So we need more **generic** data.

Analytics initiatives, on the other hand, have a well-defined scope and context, so it is good practice to create specific events for everything within that context.

The takeaway of this topic is: **decouple** logging and analysis!
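A minimal sketch of that decoupling, assuming a hypothetical `track` helper, an app that itself knows whether a request is the first load, and a plain list standing in for a real analytics collector: the debug log line stays generic and goes through Python's `logging` module, while analytics receives specific, structured events through a separate channel.

```python
import json
import logging

# Debugging side: generic, human-readable lines via the standard logging module
logger = logging.getLogger("email_app")

# Analytics side: a plain list stands in for a real event collector (assumption)
analytics_sink = []


def track(name, **properties):
    """Hypothetical helper: emit one specific, structured analytics event."""
    analytics_sink.append(json.dumps({"event": name, **properties}, sort_keys=True))


def handle_inbox_request(user_id, first_load):
    # The log line stays generic; the analytics event states its meaning
    logger.debug("GET /inbox user=%s", user_id)
    track("app_opened" if first_load else "inbox_refreshed", user_id=user_id)


handle_inbox_request(7, first_load=True)
handle_inbox_request(7, first_load=False)
print(analytics_sink)
```

Either side can now change format or destination without breaking the other.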

## Schema

A schema is the structure used to describe your data: a contract that states which **fields** are collected and what their **data types** are.
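To make the idea of a contract concrete, here is a minimal sketch that writes a schema as a plain mapping from field name to Python type and checks events against it. The mapping format and the `validate` helper are assumptions for illustration; production systems typically rely on a schema language such as Avro, Protocol Buffers, or JSON Schema.

```python
# Illustrative schema for the access-log event above: field name -> expected Python type
ACCESS_LOG_SCHEMA = {"id": int, "ip": str, "timestamp": str, "action": str, "status": int}


def validate(event, schema):
    """Return a list of contract violations; an empty list means the event conforms."""
    errors = []
    for field, expected_type in schema.items():
        if field not in event:
            errors.append(f"missing field: {field}")
        elif not isinstance(event[field], expected_type):
            errors.append(
                f"{field}: expected {expected_type.__name__}, got {type(event[field]).__name__}"
            )
    # Fields outside the contract are reported too (sorted for stable output)
    for field in sorted(event.keys() - schema.keys()):
        errors.append(f"unexpected field: {field}")
    return errors


good = {"id": 3969, "ip": "127.0.0.1", "timestamp": "2018-01-03 08:48:26.987",
        "action": "GET / HTTP/2.0", "status": 200}
bad = dict(good, status="200", referrer="-")  # wrong type plus an extra field
print(validate(good, ACCESS_LOG_SCHEMA))
print(validate(bad, ACCESS_LOG_SCHEMA))
```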

### Schema and schema-less events

Nowadays, schema-less formats such as JSON and CSV are widely used for event collection. We also have the old but still extensively used table format (e.g. SQL/relational), which enforces a schema.

The main advantages of having an event schema are:
- More efficient processing, since you don't have to spend resources checking for new fields or changes in data types;
- You don't need to write parsers;
- Schema changes are easier to manage, since the current schema is known in advance;
- It facilitates **automated analytics**;

But there are drawbacks too:
- Data processing effort moves to the application side;
- Less flexibility for shipping new data;

# Show me the code

## Collecting data

Let's use the Vizgr (www.vizgr.org) web service to request some historical events from Wikipedia.

```python
import json
import urllib2

VIZGR_URL = "http://www.vizgr.org/historical-events/search.php?format=json&begin_date=-3000000&end_date=20171231&lang=pt"

# Fetch the raw response body as a string
response = urllib2.urlopen(VIZGR_URL)
result = response.read()

# Peek at the first 300 characters
print result[:300]
```

```
{"result": {"count": "7879", "event": {"date": "-292", "description": "In\u00edcio da constru\u00e7\u00e3o do '''''colosso de Rodes''''' pelo escultor rodiano Car\u00e9s de Lindos.", "lang": "pt", "granularity": "year"}, "event": {"date": "-280", "description": "T\u00e9rmino da constru\u00e7\u00e3o 
```
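The code above uses `urllib2`, which exists only in Python 2. On Python 3 the same request could be sketched with `urllib.request`, building the query string with `urllib.parse.urlencode` instead of by hand. The helper names `build_url` and `fetch_raw` are hypothetical, and decoding the body as UTF-8 is an assumption about the response.

```python
from urllib.parse import urlencode
from urllib.request import urlopen

BASE_URL = "http://www.vizgr.org/historical-events/search.php"


def build_url(begin_date, end_date, lang):
    """Rebuild the query used above from its parameters (hypothetical helper)."""
    params = {"format": "json", "begin_date": begin_date, "end_date": end_date, "lang": lang}
    return BASE_URL + "?" + urlencode(params)


def fetch_raw(url, timeout=30):
    """Return the response body as text; UTF-8 is an assumed encoding."""
    with urlopen(url, timeout=timeout) as response:
        return response.read().decode("utf-8")


url = build_url(-3000000, 20171231, "pt")
print(url)
# result = fetch_raw(url)  # performs the actual network request
```

Keeping the parameters separate makes it easy to request other date ranges or languages without editing a long URL string.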


## Parsing

Here comes our first issue. The response is poorly structured JSON: instead of a list, all events are stored under a repeated `event` key. JSON syntax does not forbid repeated keys, but Python's `json` module keeps only the last value for each key, so automatic parsing is not going to work.

```python
data = json.loads(result)
data.get('result')
```

```
{u'count': u'7879',
 u'event': {u'category1': u'Novembro',
  u'date': u'2013',
  u'description': u'Elei\xe7\xf5es em 2013',
  u'granularity': u'year',
  u'lang': u'pt'}}
```

As we can see, the automatic JSON parsing returns only one event (the last one). What can we do now?

#### Splitting the string

In the raw JSON data, every event begins with the `event` key, so we can split the entire string on this key to get a list of all entries.

To make the split more precise, we include in the separator all the surrounding characters that must not end up in the pieces we parse as JSON: the comma and space before the key, the quoted key itself, the colon, and the following space. Thus our separator will be **`', "event": '`**.

```python
result_splitted = result.split(", \"event\": ")
```

#### Checking split boundaries

```python
for res in result_splitted[:3]:
    print res
```

```
{"result": {"count": "7879"
{"date": "-292", "description": "In\u00edcio da constru\u00e7\u00e3o do '''''colosso de Rodes''''' pelo escultor rodiano Car\u00e9s de Lindos.", "lang": "pt", "granularity": "year"}
{"date": "-280", "description": "T\u00e9rmino da constru\u00e7\u00e3o do '''colosso de Rodes''' \u2014 uma das '''''sete maravilhas do mundo antigo''''' \u2014 pelo escultor rodiano Car\u00e9s de Lindos.", "lang": "pt", "granularity": "year"}
```


As we can see, the first item is not an event: it only holds metadata about the response (the total `count` of events). So we are going to ignore it.

```python
result_splitted[-3:]
```

```
['{"date": "2013", "description": "Desastres em 2013", "lang": "pt", "category1": "Novembro", "granularity": "year"}',
 '{"date": "2013", "description": "Divis\\u00f5es administrativas em 2013", "lang": "pt", "category1": "Novembro", "granularity": "year"}',
 '{"date": "2013", "description": "Elei\\u00e7\\u00f5es em 2013", "lang": "pt", "category1": "Novembro", "granularity": "year"}}}']
```

The last item is also a problem because it is not valid JSON: it ends with the two extra closing braces of the enclosing `result` object and of the outermost object. So we handle it by dropping the last two characters of the whole string before splitting.

```python
': "year"}}}'[:-2]
```

```
': "year"}'
```

```python
result_splitted = result[:-2].split(", \"event\": ")
```

With those braces removed, the split is safe. Now we can parse every piece except the first (metadata) one to get our event list.

```python
events = [json.loads(res) for res in result_splitted[1:]]
events[:3]
```

```
[{u'date': u'-292',
  u'description': u"In\xedcio da constru\xe7\xe3o do '''''colosso de Rodes''''' pelo escultor rodiano Car\xe9s de Lindos.",
  u'granularity': u'year',
  u'lang': u'pt'},
 {u'date': u'-280',
  u'description': u"T\xe9rmino da constru\xe7\xe3o do '''colosso de Rodes''' \u2014 uma das '''''sete maravilhas do mundo antigo''''' \u2014 pelo escultor rodiano Car\xe9s de Lindos.",
  u'granularity': u'year',
  u'lang': u'pt'},
 {u'date': u'-280',
  u'description': u"Constru\xe7\xe3o do '''farol de Alexandria''' \u2014 uma das '''''sete maravilhas do mundo antigo''''' \u2014 pelo arquiteto e engenheiro S\xf3strato de Cnido.",
  u'granularity': u'year',
  u'lang': u'pt'}]
```
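String splitting works here, but it depends on the exact spacing of the separator and on the trailing braces. An alternative, not the approach taken in this notebook, is to let the `json` module see the repeated keys: its `object_pairs_hook` receives every key/value pair of each decoded object, duplicates included, so a hook can collect all `event` values into a list. A minimal Python 3 sketch (`keep_repeated` is a hypothetical helper name), run on a shortened sample shaped like the response:

```python
import json


def keep_repeated(repeated_key):
    """Build an object_pairs_hook that gathers every value of `repeated_key` in a list."""
    def hook(pairs):
        obj = {}
        for key, value in pairs:
            if key == repeated_key:
                obj.setdefault(key, []).append(value)
            else:
                # Other keys behave as usual: the last value wins
                obj[key] = value
        return obj
    return hook


sample = (
    '{"result": {"count": "2", '
    '"event": {"date": "-292", "lang": "pt", "granularity": "year"}, '
    '"event": {"date": "-280", "lang": "pt", "granularity": "year"}}}'
)
data = json.loads(sample, object_pairs_hook=keep_repeated("event"))
events = data["result"]["event"]
print(len(events), events[0]["date"], events[1]["date"])
```

Applied to the full response, `count` stays available as a sanity check against the number of collected events.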

## Checking the schema

Now that all our events are parsed, we need to check their schema:
- Do they have the same schema?
- What are the fields?
- What are the data types?
- Are the data types homogeneous?
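These questions can be answered for every event at once. A minimal sketch in plain Python (the `profile_fields` helper is hypothetical) counts in how many events each field appears and which value types it holds, shown here on two events taken from the output above:

```python
from collections import Counter, defaultdict


def profile_fields(events):
    """For each field: in how many events it appears, and which value types it holds."""
    presence = Counter()
    types = defaultdict(set)
    for event in events:
        for field, value in event.items():
            presence[field] += 1
            types[field].add(type(value).__name__)
    return {
        field: {"present_in": presence[field], "of": len(events), "types": sorted(types[field])}
        for field in presence
    }


sample = [
    {"date": "-279", "description": "Batalha de Ásculo entre os Romanos e Pirro.",
     "lang": "pt", "granularity": "year"},
    {"date": "2013", "description": "Desastres em 2013", "lang": "pt",
     "category1": "Novembro", "granularity": "year"},
]
for field, stats in profile_fields(sample).items():
    print(field, stats)
```

A field whose `present_in` is lower than `of`, or whose `types` list has more than one entry, points to a non-homogeneous schema.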

The first event has the following fields and values:

```python
events[0].keys()
```

```
[u'date', u'lang', u'description', u'granularity']
```

```python
events[0].values()
```

```
[u'-292',
 u'pt',
 u"In\xedcio da constru\xe7\xe3o do '''''colosso de Rodes''''' pelo escultor rodiano Car\xe9s de Lindos.",
 u'year']
```

So it seems that we have the following schema:

```json
{
  "date": "INTEGER",
  "lang": "STRING",
  "description": "STRING",
  "granularity": "STRING"
}
```

Now let's check all the entries. Let pandas shine for you!

```python
import pandas as pd
```

Now we create a DataFrame (a table-like structure) to manage all events.

```python
df = pd.DataFrame.from_dict(events)
df.head(10)
```

|   | category1 | category2 | date | description | granularity | lang |
|---|---|---|---|---|---|---|
| 0 |  |  | -292 | Início da construção do '''''colosso de Rodes'... | year | pt |
| 1 |  |  | -280 | Término da construção do '''colosso de Rodes''... | year | pt |
| 2 |  |  | -280 | Construção do '''farol de Alexandria''' — uma ... | year | pt |
| 3 |  |  | -280 | Batalha de Heracleia, na qual Pirro enfrenta p... | year | pt |
| 4 |  |  | -279 | Batalha de Ásculo entre os Romanos e Pirro. | year | pt |
| 5 |  |  | -275 | einado de Arquídamo IV rei de Esparta de 305 a... | year | pt |
| 6 |  |  | -275 | einado de Eudâmidas II rei de Esparta entre 27... | year | pt |
| 7 |  |  | -264 | Os Mamertinos pedem ajuda a Roma e Cartago par... | year | pt |
| 8 |  |  | -263 | Primeira guerra púnica - Hiero II é derrotado ... | year | pt |
| 9 |  |  | -263 | Fundação de Belgrado a capital e maior cidade ... | year | pt |


When we load all events, two additional fields appear, `category1` and `category2`, which are not present in the first event. So the events do not share a single, normalized schema.

Let's check what these categories contain:

#### Category 1

```python
df[df.category1.isnull()].shape
```

```
(5272, 6)
```

```python
df[df.category1.notnull()].shape
```

```
(2607, 6)
```

```python
df.groupby(['category1']).count()
```

| category1 | category2 | date | description | granularity | lang |
|---|---|---|---|---|---|
| Abril | 0 | 162 | 162 | 162 | 162 |
| Agosto | 0 | 127 | 127 | 127 | 127 |
| Agosto a Outubro | 0 | 3 | 3 | 3 | 3 |
| América | 0 | 3 | 3 | 3 | 3 |
| Artes e Ciências | 0 | 1 | 1 | 1 | 1 |
| Artes e ciências | 0 | 2 | 2 | 2 | 2 |
| Asia | 0 | 4 | 4 | 4 | 4 |
| Brasil | 0 | 14 | 14 | 14 | 14 |
| Carnaval | 0 | 13 | 13 | 13 | 13 |
| Continente Americano | 0 | 8 | 8 | 8 | 8 |


#### Category 2

```python
df[df.category2.isnull()].shape
```

```
(7813, 6)
```

```python
df[df.category2.notnull()].shape
```

```
(66, 6)
```

```python
df.groupby(['category2']).count()
```

| category2 | category1 | date | description | granularity | lang |
|---|---|---|---|---|---|
| Artes e Ciências | 1 | 1 | 1 | 1 | 1 |
| China | 2 | 2 | 2 | 2 | 2 |
| Europa | 8 | 8 | 8 | 8 | 8 |
| Império Romano | 25 | 26 | 26 | 26 | 26 |
| Império romano | 2 | 2 | 2 | 2 | 2 |
| México | 1 | 1 | 1 | 1 | 1 |
| Pártia | 1 | 1 | 1 | 1 | 1 |
| Religião | 9 | 9 | 9 | 9 | 9 |
| Roma | 7 | 7 | 7 | 7 | 7 |
| Ásia | 8 | 8 | 8 | 8 | 8 |
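Both tables show labels that differ only in capitalization or accents (`Artes e Ciências` / `Artes e ciências`, `Império Romano` / `Império romano`, and `Asia` in `category1` vs. `Ásia` in `category2`), which likely split what is meant to be one category. A minimal normalization sketch, assuming case and accents carry no meaning for these labels (the `normalize_label` helper is hypothetical):

```python
import unicodedata


def normalize_label(label):
    """Casefold a category label and strip accents; non-strings (e.g. NaN) pass through."""
    if not isinstance(label, str):
        return label
    # NFKD splits "Á" into "A" plus a combining accent, which is then dropped
    decomposed = unicodedata.normalize("NFKD", label)
    without_accents = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return without_accents.casefold().strip()


print(normalize_label("Império Romano"), "|", normalize_label("Império romano"))
print(normalize_label("Ásia"), "|", normalize_label("Asia"))
```

On the DataFrame, `df.category1.map(normalize_label)` would produce a normalized column to group by instead of the raw one.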

