In this notebook we go through the process of storing data in Elasticsearch (a process called *ingesting*). We use data from the [TidyTuesday event](https://github.com/rfordatascience/tidytuesday). In particular, we picked the yearly dataset [Week 31 - R and Package download stats](https://github.com/rfordatascience/tidytuesday/tree/master/data/2018/2018-10-30), which contains anonymized package and R language downloads from the RStudio CRAN mirror between October 20, 2017 and October 20, 2018. It consists of the following variables:

* **date**: date of download (y-m-d).
* **time**: time of download (in UTC).
* **size**: size (in bytes).
* **version**: R release version.
* **os**: Operating System.
* **country**: Two-letter ISO country code.
* **ip_id**: Anonymized daily IP code (unique identifier).

To *ingest* the dataset into Elasticsearch we first reformat or remove some columns and save the result to a *.csv* file. Then we use the Atom editor along with the json-converter package to export the dataset to JSON (the format Elasticsearch uses). Finally, we use the Elasticsearch *bulk API* to ingest the dataset.

In [1]:
# Import libraries
import pandas as pd
import json
import numpy as np

In [2]:
df = pd.read_csv("data/r_downloads_year.csv")

In [3]:
df.head()

Unnamed: 0.1,Unnamed: 0,date,time,size,version,os,country,ip_id
0,1,2017-10-23,14:29:18,78171332,3.4.2,win,ES,1
1,2,2017-10-23,14:29:22,20692638,3.4.2,win,PT,2
2,3,2017-10-23,14:29:57,972075,3.4.2,win,PL,3
3,4,2017-10-23,14:30:00,1032203,3.0.3,win,JP,4
4,5,2017-10-23,14:30:18,78171332,3.4.2,win,CN,5


## Data cleaning

### date_time
Elasticsearch accepts ISO 8601 datetimes such as *yyyy-MM-ddTHH:mm:ss.SSSZ*. The trailing *Z* is a time zone designator (Zulu, i.e. UTC), which matches the UTC times in the dataset. More information on this format can be found in this [link](https://en.wikipedia.org/wiki/ISO_8601). We combine the *date* and *time* columns into a single *date_time* column in this format and rename it to *@timestamp*.
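
A minimal sketch of the target format, using only the standard library. The helper name `to_es_timestamp` is hypothetical and not part of the notebook; it only mirrors the string concatenation used in the cleaning cell, then parses the result to confirm that it is a well-formed UTC timestamp.

```python
from datetime import datetime, timezone


def to_es_timestamp(date: str, time: str) -> str:
    """Hypothetical helper: join a date and a time into one ISO 8601 string."""
    return f"{date}T{time}.000Z"


ts = to_es_timestamp("2017-10-23", "14:29:18")

# strptime raises ValueError if the string does not match the pattern,
# so a successful parse is a cheap sanity check on the format.
parsed = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)

print(ts)                  # 2017-10-23T14:29:18.000Z
print(parsed.isoformat())  # 2017-10-23T14:29:18+00:00
```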

### ip_id
We rename this column to *id* so we can use it as an identifier in Elasticsearch.

### date, time
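We drop these columns because *@timestamp* now holds the same information. We also drop the leftover index column *Unnamed: 0*.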

In [4]:
# Create a single column that contains the download date and time (UTC)
df['date_time'] = df['date'].str.cat(df['time'], sep='T')
df['date_time'] = df['date_time'] + ".000Z"

# Rename date_time to @timestamp and ip_id to id
df.rename(columns={'date_time': '@timestamp', 'ip_id': 'id'}, inplace=True)

# Drop the leftover index column and the original date and time columns
df.drop(['Unnamed: 0', 'date', 'time'], axis=1, inplace=True)

In [5]:
#Quick look at data
df.head()

Unnamed: 0,size,version,os,country,id,@timestamp
0,78171332,3.4.2,win,ES,1,2017-10-23T14:29:18.000Z
1,20692638,3.4.2,win,PT,2,2017-10-23T14:29:22.000Z
2,972075,3.4.2,win,PL,3,2017-10-23T14:29:57.000Z
3,1032203,3.0.3,win,JP,4,2017-10-23T14:30:00.000Z
4,78171332,3.4.2,win,CN,5,2017-10-23T14:30:18.000Z


## Ingesting data to Elasticsearch

Ideally, we would have *streaming data* arriving from different sources in real time, especially in a case like this one, where downloads happen all the time. Our data is static, so the process shown here is not how a real-time pipeline would work, but it is a good way to start playing around with Elasticsearch's capabilities. When we get to the point of making a dashboard we'll pretend our data is changing constantly :)

In Elasticsearch each record (row) is called a *document* and is serialised as *JSON*. An *index* contains many documents. To export the dataset to JSON in the layout Elasticsearch expects, we use the *Atom* editor along with the *json-converter* plugin. For detailed instructions on how to do this go to this [link]().
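
To make the document layout concrete, here is a minimal sketch of the newline-delimited JSON that the bulk API expects: one action line followed by one document line per record, with a newline after every line. This is an alternative to the Atom export, not the method used in this notebook. The function name `to_bulk_ndjson` and the index name `r_downloads` are assumptions made for illustration.

```python
import json


def to_bulk_ndjson(rows, index_name):
    """Hypothetical helper: build a bulk API payload from a list of dicts."""
    lines = []
    for row in rows:
        # Action line: tells Elasticsearch to index the next line as a document.
        lines.append(json.dumps({"index": {"_index": index_name}}))
        # Source line: the document itself.
        lines.append(json.dumps(row))
    # Every line, including the last one, must end with a newline.
    return "".join(line + "\n" for line in lines)


# Two rows copied from the cleaned data shown above.
rows = [
    {"size": 78171332, "version": "3.4.2", "os": "win", "country": "ES",
     "id": 1, "@timestamp": "2017-10-23T14:29:18.000Z"},
    {"size": 20692638, "version": "3.4.2", "os": "win", "country": "PT",
     "id": 2, "@timestamp": "2017-10-23T14:29:22.000Z"},
]

payload = to_bulk_ndjson(rows, "r_downloads")  # index name is an assumption
print(payload)
```

With the cleaned DataFrame, `df.to_dict(orient="records")` would produce a list of rows in this shape.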

In [7]:
# Save a random sample of 200,000 rows to CSV (without the index column)
df.sample(n=200000).to_csv("data/r_downloads_year_cleaned.csv", index=False)

After exporting our CSV file to JSON (using Atom), we ingest the data with a *curl* command against the bulk API. Go to this [link]() for more information on this part.
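
For readers who prefer to stay in Python, the same HTTP request can be sketched with the standard library. This is an illustration under assumptions, not the command used here: the cluster address `http://localhost:9200`, the index name `r_downloads`, and the helper name `build_bulk_request` are all hypothetical. The bulk endpoint expects a `POST` to `/_bulk` with the `application/x-ndjson` content type.

```python
import urllib.request


def build_bulk_request(payload: str, base_url: str = "http://localhost:9200"):
    """Hypothetical helper: prepare (but do not send) a bulk API request."""
    return urllib.request.Request(
        url=f"{base_url}/_bulk",
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/x-ndjson"},
        method="POST",
    )


# A tiny payload: one action line and one document line, each newline-terminated.
payload = (
    '{"index": {"_index": "r_downloads"}}\n'
    '{"os": "win", "country": "ES"}\n'
)

request = build_bulk_request(payload)
print(request.get_method(), request.full_url)  # POST http://localhost:9200/_bulk

# Sending needs a running cluster, so it is left commented out:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```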

**Note:** Another approach is to use the *elasticsearch* library for Python, which can send the documents to the bulk API directly.
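
A rough sketch of that approach, assuming the `elasticsearch` package is installed and a cluster is reachable at `http://localhost:9200`. The index name `r_downloads` and the function names `generate_actions` and `ingest` are hypothetical. `helpers.bulk` takes a client and an iterable of action dictionaries.

```python
def generate_actions(rows, index_name):
    """Yield one bulk action per row (a row is a plain dict)."""
    for row in rows:
        yield {"_index": index_name, "_source": row}


def ingest(rows, index_name, url="http://localhost:9200"):
    """Send the rows to Elasticsearch; needs the package and a running cluster."""
    # Imported here so the rest of the sketch runs without the package installed.
    from elasticsearch import Elasticsearch, helpers

    client = Elasticsearch(url)
    return helpers.bulk(client, generate_actions(rows, index_name))


rows = [
    {"size": 78171332, "version": "3.4.2", "os": "win", "country": "ES",
     "id": 1, "@timestamp": "2017-10-23T14:29:18.000Z"},
]

# Building the actions needs no cluster, so we can inspect them locally.
actions = list(generate_actions(rows, "r_downloads"))
print(actions[0])

# ingest(rows, "r_downloads")  # uncomment once a cluster is available
```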