# Chapter 2: Agile Tools

In this chapter we will briefly introduce our software stack, enabling you to see the tools in action and become proficient in their use.

![ads2_0201.png](attachment:ads2_0201.png)

![ads2_0202.png](attachment:ads2_0202.png)

## Working with JSON

We'll be working with JSON throughout the book, both to store and process data in the 'back end' and to send data from a web application to the 'front end' web browser. In agile data science, we will often collect, process and store events in JSON format.

![ads2_0209.png](attachment:ads2_0209.png)



In [1]:
#
# How to read and write JSON and JSON Lines files using Python
#

import os
import json

ary_of_objects = [
  {'name': 'Russell Jurney', 'title': 'CEO'},
  {'name': 'Muhammad Imran', 'title': 'VP of Marketing'},
  {'name': 'Fe Mata', 'title': 'Chief Marketing Officer'},
]

path = "data/test.jsonl"

# Make sure the output directory exists before writing
os.makedirs(os.path.dirname(path), exist_ok=True)

#
# Write our objects to jsonl: one JSON document per line
#

with open(path, 'w', encoding='utf-8') as f:
  for row_object in ary_of_objects:
    json_record = json.dumps(row_object, ensure_ascii=False)
    f.write(json_record + "\n")

print("Wrote JSON Lines file data/test.jsonl\n")

#
# Read this jsonl file back into objects
#

ary_of_objects = []

with open(path, "r", encoding="utf-8") as f:
  for line in f:
    # Strip the trailing newline (and carriage return, if any) before parsing
    record = json.loads(line.rstrip("\r\n"))
    ary_of_objects.append(record)

print(ary_of_objects)

print("\nRead JSON Lines file data/test.jsonl")

Wrote JSON Lines file data/test.jsonl

[{'name': 'Russell Jurney', 'title': 'CEO'}, {'name': 'Muhammad Imran', 'title': 'VP of Marketing'}, {'name': 'Fe Mata', 'title': 'Chief Marketing Officer'}]

Read JSON Lines file data/test.jsonl


In [2]:
%%bash

cat data/test.jsonl

{"name": "Russell Jurney", "title": "CEO"}
{"name": "Muhammad Imran", "title": "VP of Marketing"}
{"name": "Fe Mata", "title": "Chief Marketing Officer"}


### Verifying our Work

Now let's verify that our file, `data/test.jsonl`, has been written. Go to the shell prompt for your virtual machine and type the commands:

```
ls data/test.jsonl
cat data/test.jsonl
```

You should see the JSON that we wrote reproduced in your console.
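If you would rather check the file from Python than from the shell, the sketch below validates JSON Lines input line by line and reports the line numbers of anything that fails to parse. The helper name `validate_jsonl` is hypothetical. It accepts any iterable of lines, so it works on an open file such as `data/test.jsonl` or, as in the usage example, an in-memory `io.StringIO`.

```python
import io
import json


def validate_jsonl(lines):
    """Parse each non-blank line as JSON.

    Returns (records, errors), where errors is a list of
    (line_number, message) pairs for lines that are not valid JSON.
    """
    records, errors = [], []
    for line_number, line in enumerate(lines, start=1):
        stripped = line.strip()
        if not stripped:
            # Blank lines (e.g. a stray empty line) are skipped, not errors
            continue
        try:
            records.append(json.loads(stripped))
        except json.JSONDecodeError as e:
            errors.append((line_number, e.msg))
    return records, errors


# Usage: two valid records, one truncated record and one blank line
sample = io.StringIO(
    '{"name": "Russell Jurney", "title": "CEO"}\n'
    '{"name": "Fe Mata", "title": "Chief Marketing Officer"\n'
    '\n'
    '{"name": "Muhammad Imran", "title": "VP of Marketing"}\n'
)
records, errors = validate_jsonl(sample)
print(len(records), errors)
```

To check the real file, open it with `open("data/test.jsonl", encoding="utf-8")` and pass the file object in place of `sample`.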

## Data Processing with Spark

![ads2_0211.png](attachment:ads2_0211.png)

Once we have our data files on disk, we will use Spark to process them. Spark is a general-purpose computing framework that scales to large datasets by distributing work across many machines. In fact, Spark is the leading general-purpose distributed data processing platform.

Spark works by splitting data processing across networks of commodity machines, each acting on data in its own local disk and RAM. Spark’s job is to coordinate these machines into a single computing platform. Being distributed is what lets Spark scale to data of any size, and Spark does this well: it works in “local mode” on one machine, and it works on clusters of thousands of machines. This meets our requirement that our tools scale to data of any size. Spark is also excellent glue, with connectors to many different systems including Kafka and databases like MongoDB.

Spark for Python is called PySpark. We'll be using PySpark, but note that Spark can also work with other languages like Java, Scala or R.

![ads2_0212.png](attachment:ads2_0212.png)

The image above describes the Spark ecosystem. Spark reads from and writes to storage systems such as HDFS and Amazon S3, and it includes Spark SQL, Spark MLlib, and Spark Streaming. We'll be using Spark SQL and Spark MLlib later in this tutorial.

Spark __local mode__ lets us run Spark on small data locally, for development. We’ll be using Spark local mode throughout this course. The idea is that you can develop locally to learn, and later use a Spark cluster as your data grows. That said, with EC2 instances available with 2 TB of RAM, “local mode” in Spark can still process pretty big datasets! The main reasons to use a cluster, then, are reliability through redundancy and cost: past a certain size, several cheaper machines are less expensive than one monster.

## Initializing PySpark

We will be working with PySpark directly from Jupyter notebooks like this one. In order to initialize PySpark, you will need to run the following code. Give your application a name like `"Introducing PySpark"` and then run the code below. 

This will initialize the two objects you will need to use Spark: a `SparkContext`, which is named `sc`, and a `SparkSession`, which is named `spark`. We will see how to use these two objects to perform work below. We use the builder's `config` method to set `spark.jars.packages`, which loads the MongoDB and Elasticsearch connector packages, and to set the MongoDB input and output URIs. This is the general pattern for integrating databases and other systems with Spark. Note that for more complicated setups, like an external MongoDB server, you will need to check the [configuration documentation](https://docs.mongodb.com/spark-connector/current/configuration/).

In [3]:
from pyspark.sql import SparkSession


# Initialize PySpark with MongoDB support
APP_NAME = "Introducing PySpark"
spark = (
    SparkSession.builder.appName(APP_NAME)
    # Load support for MongoDB and Elasticsearch
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,org.elasticsearch:elasticsearch-spark-30_2.12:7.14.2")
    # Add configuration for MongoDB
    .config("spark.mongodb.input.uri", "mongodb://mongo:27017/test.coll")
    .config("spark.mongodb.output.uri", "mongodb://mongo:27017/test.coll")
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("ERROR")

print("\nPySpark initialized...")



:: loading settings :: url = jar:file:/usr/local/spark-3.2.0-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
org.elasticsearch#elasticsearch-spark-30_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-79481ed5-1fb8-4d65-a47b-cd79a56860fb;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
	found org.elasticsearch#elasticsearch-spark-30_2.12;7.14.2 in central
	found org.scala-lang#scala-reflect;2.12.8 in central
	found org.slf4j#slf4j-api;1.7.6 in central
	found commons-logging#commons-logging;1.1.1 in central
	found javax.xml.bind#jaxb-api;2.3.1 in central
	found com.google.protobuf#protobuf-java;2.5.0 in central
	found org.apache.spark#spark-yarn_2.12;3.0.1 in central
:: resolution repo


PySpark initialized...


You should see the words `PySpark initialized...` appear after some initialization output. The first time you run the notebook, that output includes Ivy downloading the [MongoDB Connector for Apache Spark](https://www.mongodb.com/products/spark-connector) and the Elasticsearch connector packages. Once you see `PySpark initialized...`, you are ready to process data in Spark!

### Our First Spark Job

For our first Spark job, we're going to load some CSV (Comma Separated Value) data and count the number of records present. We're going to do so using the [Spark RDD API](https://spark.apache.org/docs/latest/rdd-programming-guide.html), which has since been superseded by the [Spark SQL DataFrame API](https://spark.apache.org/docs/latest/sql-programming-guide.html). Note the use of the `map` method to run a function on each record (a line of text as input, and a list as output), and the use of the `collect` method to gather data from PySpark's memory (potentially spread across a large compute cluster) into local memory (on the machine you're using).

In [4]:
%%bash

cat ../lib/data/example.csv

Russell Jurney,Relato,CEO
Florian Liebert,Mesosphere,CEO
Don Brown,Rocana,CIO
Steve Jobs,Apple,CEO
Donald Trump,The Trump Organization,CEO
Russell Jurney,Data Syndrome,Principal Consultant


In [5]:
csv_lines = sc.textFile("../lib/data/example.csv")
print(type(csv_lines))

data = csv_lines.map(lambda line: line.split(","))

data.first()

<class 'pyspark.rdd.RDD'>


                                                                                

['Russell Jurney', 'Relato', 'CEO']
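The `map` step above is conceptually the same as Python's built-in `map` applied to a list of lines; Spark simply runs it in parallel across partitions. The plain-Python sketch below (no Spark required) shows that transformation, and also why splitting on commas is a simplification: a quoted field containing a comma, as in the hypothetical `"Acme, Inc."` row here, is split incorrectly by `str.split(",")` but parsed correctly by the standard library's `csv` module.

```python
import csv

lines = [
    "Russell Jurney,Relato,CEO",
    'Jane Doe,"Acme, Inc.",CTO',  # hypothetical row with a quoted comma
]

# Local equivalent of csv_lines.map(lambda line: line.split(","))
naive = list(map(lambda line: line.split(","), lines))

# csv.reader understands quoting, so "Acme, Inc." stays one field
parsed = list(csv.reader(lines))

print(naive[1])
print(parsed[1])
```

The same idea could be used inside a Spark `map`, for example `lambda line: next(csv.reader([line]))`, although the DataFrame CSV reader used later in this chapter already handles quoted fields.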

In [6]:
# This is a lambda
a = lambda x: x * 2

# This is its output
print(a(2))

# This is the equivalent function
def b(x):
    """Multiply a number by 2"""
    return x * 2

# See how they work?
print(b(2))

4
4


In [7]:
data.first()

['Russell Jurney', 'Relato', 'CEO']

In [8]:
data.take(2)

[['Russell Jurney', 'Relato', 'CEO'], ['Florian Liebert', 'Mesosphere', 'CEO']]

In [9]:
data.collect()

[['Russell Jurney', 'Relato', 'CEO'],
 ['Florian Liebert', 'Mesosphere', 'CEO'],
 ['Don Brown', 'Rocana', 'CIO'],
 ['Steve Jobs', 'Apple', 'CEO'],
 ['Donald Trump', 'The Trump Organization', 'CEO'],
 ['Russell Jurney', 'Data Syndrome', 'Principal Consultant']]

You can count the number of records in any RDD with the `count` method.

In [10]:
data.count()

6

## Dicts are Easier to Work With than Lists

It is easier to work with records that have named fields than with lists whose fields are accessed by numeric index. Before we proceed, let's convert our data into dictionaries.

In [11]:
dict_data = data.map(
    lambda record: { 'name': record[0], 'company': record[1], 'title': record[2] }
)

dict_data.collect()

[{'name': 'Russell Jurney', 'company': 'Relato', 'title': 'CEO'},
 {'name': 'Florian Liebert', 'company': 'Mesosphere', 'title': 'CEO'},
 {'name': 'Don Brown', 'company': 'Rocana', 'title': 'CIO'},
 {'name': 'Steve Jobs', 'company': 'Apple', 'title': 'CEO'},
 {'name': 'Donald Trump', 'company': 'The Trump Organization', 'title': 'CEO'},
 {'name': 'Russell Jurney',
  'company': 'Data Syndrome',
  'title': 'Principal Consultant'}]
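Writing out each field name by hand, as in the lambda above, works for three columns. A common alternative is to keep the column names in one list and `zip` them with each record. The sketch below is plain Python; `FIELDS` and `to_dict` are hypothetical names, and the same `to_dict` function could be passed to `data.map(to_dict)` in place of the lambda.

```python
FIELDS = ["name", "company", "title"]


def to_dict(record):
    """Pair each positional field with its column name."""
    return dict(zip(FIELDS, record))


row = ["Russell Jurney", "Relato", "CEO"]
print(to_dict(row))
```

One caveat of this design: `zip` stops at the shorter input, so a record with missing fields silently produces a dictionary with fewer keys instead of raising an error.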

### Printing Data as JSON

Sometimes it is hard to read records without proper formatting. The `json` module can help with its `sort_keys` and `indent` options.

In [12]:
import json


print(
    json.dumps(
        dict_data.collect(),
        indent=4,
        sort_keys=True
    )
)

[
    {
        "company": "Relato",
        "name": "Russell Jurney",
        "title": "CEO"
    },
    {
        "company": "Mesosphere",
        "name": "Florian Liebert",
        "title": "CEO"
    },
    {
        "company": "Rocana",
        "name": "Don Brown",
        "title": "CIO"
    },
    {
        "company": "Apple",
        "name": "Steve Jobs",
        "title": "CEO"
    },
    {
        "company": "The Trump Organization",
        "name": "Donald Trump",
        "title": "CEO"
    },
    {
        "company": "Data Syndrome",
        "name": "Russell Jurney",
        "title": "Principal Consultant"
    }
]


### Grouping Data

`GROUP BY` is a fundamental operation in data processing. Let's see how it works in Spark by grouping the list of executives by job title.

In [13]:
grouped_by_title = dict_data.groupBy(
    lambda record: record['title']
)

grouped_by_title.collect()

[('CEO', <pyspark.resultiterable.ResultIterable at 0x7ffff83f3250>),
 ('CIO', <pyspark.resultiterable.ResultIterable at 0x7ffff83af130>),
 ('Principal Consultant',
  <pyspark.resultiterable.ResultIterable at 0x7ffff83af940>)]

### Inspecting Grouped Data

Note that `RDD.groupBy` produces tuples with two fields: the key we grouped by and a `pyspark.resultiterable.ResultIterable`. If you want to inspect each group, you can convert the `ResultIterable` to a `list`.

In [14]:
record_groups = grouped_by_title.map(
    lambda record: (
        record[0], 
        list(record[1])
    )
)

record_groups.collect()

[('CEO',
  [{'name': 'Russell Jurney', 'company': 'Relato', 'title': 'CEO'},
   {'name': 'Florian Liebert', 'company': 'Mesosphere', 'title': 'CEO'},
   {'name': 'Steve Jobs', 'company': 'Apple', 'title': 'CEO'},
   {'name': 'Donald Trump',
    'company': 'The Trump Organization',
    'title': 'CEO'}]),
 ('CIO', [{'name': 'Don Brown', 'company': 'Rocana', 'title': 'CIO'}]),
 ('Principal Consultant',
  [{'name': 'Russell Jurney',
    'company': 'Data Syndrome',
    'title': 'Principal Consultant'}])]
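Conceptually, `groupBy` collects every record that shares a key into one bucket, which is what the groups listed above show. A single-machine sketch of the same idea with `collections.defaultdict` is below; it is plain Python rather than Spark, and the `records` list is a hypothetical subset of the executives data. Spark performs the equivalent bucketing across partitions, which requires shuffling records between machines.

```python
from collections import defaultdict

records = [
    {"name": "Russell Jurney", "company": "Relato", "title": "CEO"},
    {"name": "Don Brown", "company": "Rocana", "title": "CIO"},
    {"name": "Steve Jobs", "company": "Apple", "title": "CEO"},
]

# Map each key (the title) to the list of records that share it
groups = defaultdict(list)
for record in records:
    groups[record["title"]].append(record)

for title, members in groups.items():
    print(title, [m["name"] for m in members])
```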

Once again, we can use JSON to format the data more clearly. This shows our grouped records: the group key, followed by a `list` of `dict` objects with the fields of our original records. There is nothing special about grouped data; it is simply grouped so it can be processed in its group context.

In [15]:
print(json.dumps(record_groups.collect(), indent=4, sort_keys=True))

[
    [
        "CEO",
        [
            {
                "company": "Relato",
                "name": "Russell Jurney",
                "title": "CEO"
            },
            {
                "company": "Mesosphere",
                "name": "Florian Liebert",
                "title": "CEO"
            },
            {
                "company": "Apple",
                "name": "Steve Jobs",
                "title": "CEO"
            },
            {
                "company": "The Trump Organization",
                "name": "Donald Trump",
                "title": "CEO"
            }
        ]
    ],
    [
        "CIO",
        [
            {
                "company": "Rocana",
                "name": "Don Brown",
                "title": "CIO"
            }
        ]
    ],
    [
        "Principal Consultant",
        [
            {
                "company": "Data Syndrome",
                "name": "Russell Jurney",
                "title": "Principal Consultant"
            }
        ]
    ]
]

## Counting Grouped Data

In the `pyspark.RDD` API, we count the members of each group with plain Python, using the built-in `len` function. We'll see later how the `pyspark.sql` API uses higher-level functions to do the same calculation.

In [16]:
title_counts = grouped_by_title.map(
    lambda record: (
        record[0], 
        len(record[1])
    )
)

title_counts.collect()

[('CEO', 4), ('CIO', 1), ('Principal Consultant', 1)]
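Counting with `len` requires every record of a group to be gathered in one place first. When only the count is needed, a common RDD alternative is to emit `(title, 1)` pairs and sum them with `reduceByKey`, e.g. `dict_data.map(lambda r: (r['title'], 1)).reduceByKey(lambda a, b: a + b)`, which lets Spark combine partial counts on each partition before shuffling. The plain-Python sketch below mimics that pattern on a local list so you can follow the arithmetic; `reduce_by_key` is a hypothetical helper, not a Spark function.

```python
from operator import add


def reduce_by_key(pairs, func):
    """Fold the values of each key together, in the spirit of RDD.reduceByKey."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


# The titles from example.csv, in file order
titles = ["CEO", "CEO", "CIO", "CEO", "CEO", "Principal Consultant"]
pairs = [(title, 1) for title in titles]
print(reduce_by_key(pairs, add))
```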

### PySpark Summary

We'll come back to PySpark's RDD API later on, but first we'll introduce MongoDB and the DataFrame API!

## Introducing MongoDB

![ads2_0215.png](attachment:ads2_0215.png)

Once we have processed data, we need to __publish__ it somewhere so it can be accessed and used. We use MongoDB to accomplish this. 

### Opening a Terminal

To access MongoDB, we will use a terminal. Switch to the Jupyter Home tab in your browser, or click [here](/).

Now select the `New` menu item at the top right, and then select `Terminal`. 

![New_Terminal.png](attachment:New_Terminal.png)

This will open up a new shell terminal in a new tab.

![Terminal.png](attachment:Terminal.png)

At the terminal prompt, now bring up the mongo shell by typing: 

```
mongosh mongo:27017/agile_data_science
```

This will bring up a prompt ending in `>`; mongosh shows the name of the current database in front of it. This is where you enter commands to be executed by MongoDB.

Begin by listing the collections in the database. 

```
show collections
```

In a new database this lists nothing (at most a few system collections), because we haven't created any collections yet. Let's create one now. In MongoDB you create a collection simply by inserting a record into it. Let's insert a record describing me, Russell Jurney!

```
db.my_collection.insertOne({"name": "Russell Jurney"});
```

`db` refers to the database we have open, in this case `agile_data_science`. Next comes `my_collection`, which references the collection (MongoDB's equivalent of a table) inside the database we want to access. Last comes a collection method. In this case we call `insertOne` to insert a single record, which we enter as JSON.

Now that the record is inserted and the collection exists, we can find the record.

```
db.my_collection.find({"name": "Russell Jurney"});
```

And it will return the record you entered! As you can see, using MongoDB is simple. That is why it is part of our stack.

## Publishing Data to MongoDB from PySpark

Pushing data to MongoDB from PySpark is easy with the [MongoDB Connector for Apache Spark](https://www.mongodb.com/products/spark-connector).

You can set up PySpark to push to your own MongoDB cluster by setting the `spark.mongodb.input.uri` and `spark.mongodb.output.uri` config keys and loading the `org.mongodb.spark:mongo-spark-connector_2.12` Maven package, as shown below. You can set a username and password with a URI like `mongodb://root:admin@mongo:27017/my_db.my_collection`.

```python
spark = (
    SparkSession.builder.appName(APP_NAME)
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .config("spark.mongodb.input.uri", "mongodb://mongo:27017/test.coll")
    .config("spark.mongodb.output.uri", "mongodb://mongo:27017/test.coll")
    .getOrCreate()
)
```
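If a username or password contains characters such as `@`, `:` or `/`, it has to be percent-encoded before it goes into the connection string, or the URI will be misparsed. The sketch below builds a connector URI in the `database.collection` form used above with the standard library's `urllib.parse.quote_plus`. The function name `mongo_uri` and the example credentials are hypothetical.

```python
from urllib.parse import quote_plus


def mongo_uri(user, password, host, port, database, collection):
    """Build a mongodb:// URI with percent-encoded credentials.

    The path uses the database.collection form expected by the
    spark.mongodb.input.uri / spark.mongodb.output.uri settings.
    """
    credentials = "{}:{}@".format(quote_plus(user), quote_plus(password))
    return "mongodb://{}{}:{}/{}.{}".format(credentials, host, port, database, collection)


uri = mongo_uri("root", "p@ss:word", "mongo", 27017, "agile_data_science", "executives")
print(uri)
```

The resulting string could then be passed to `.config("spark.mongodb.output.uri", uri)`. Depending on where the user account is defined on your server, you may also need to add an `authSource` option to the URI.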

Note that we have already configured PySpark to connect to MongoDB in the `SparkSession` setup above, so we can run PySpark as normal and write our results to MongoDB.

In [16]:
csv_lines = spark.read.csv("../lib/data/example.csv")

# Convert to a Pandas DataFrame to display well in a Jupyter Notebook
csv_lines.toPandas()

|   | _c0 | _c1 | _c2 |
|---|---|---|---|
| 0 | Russell Jurney | Relato | CEO |
| 1 | Florian Liebert | Mesosphere | CEO |
| 2 | Don Brown | Rocana | CIO |
| 3 | Steve Jobs | Apple | CEO |
| 4 | Donald Trump | The Trump Organization | CEO |
| 5 | Russell Jurney | Data Syndrome | Principal Consultant |


In [17]:
import pyspark.sql.functions as F


executives = csv_lines.select(
    F.col("_c0").alias("name"),
    F.col("_c1").alias("company"),
    F.col("_c2").alias("title")
)

executives.toPandas()

|   | name | company | title |
|---|---|---|---|
| 0 | Russell Jurney | Relato | CEO |
| 1 | Florian Liebert | Mesosphere | CEO |
| 2 | Don Brown | Rocana | CIO |
| 3 | Steve Jobs | Apple | CEO |
| 4 | Donald Trump | The Trump Organization | CEO |
| 5 | Russell Jurney | Data Syndrome | Principal Consultant |


In [18]:
(
    executives.write.format("mongo")
    .mode("append")
    .option("database", "agile_data_science")
    .option("collection", "executives")
    .save()
)

print("Wrote to MongoDB!")

Wrote to MongoDB!


### Verifying Data in MongoDB

Now jump back to your terminal tab and run the following command:

```
db.executives.find()
```

You should see that the records we loaded and computed are now in MongoDB. It only takes a single chained statement to send data to MongoDB. Nice, right?

## Searching Data with Elasticsearch

If we want to search data, we use Elasticsearch, which provides a robust, easy-to-use search solution that lowers the barrier to entry for individuals wanting to search their data, large or small. Elasticsearch has a simple RESTful JSON interface, so we can use it from the command line or from any language. We’ll be using Elasticsearch to search our data, to make it easy to find the records we’ll be working so hard to create.

### Creating an Elasticsearch Index
To create an Elasticsearch index for our test documents, we will use the `%%bash` cell magic to run `curl` from this notebook, sending Elasticsearch JSON data that describes our index. The first command deletes any existing index called `test`, so the cell can safely be rerun.

In [19]:
%%bash

# Delete any old indexes called 'test'
curl -XDELETE "elastic:9200/test?pretty"

curl -XPUT 'elastic:9200/test?pretty' \
  -H 'Content-Type: application/json' -d'
{
    "settings" : {
        "index" : {
            "number_of_shards" : 1,
            "number_of_replicas" : 1
        }
    }
}
'

{
  "acknowledged" : true
}
{
  "acknowledged" : true,
  "shards_acknowledged" : true,
  "index" : "test"
}




The response to creating the index should look like this:

```
{
  "acknowledged" : true,
  "shards_acknowledged" : true,
  "index" : "test"
}
```

### Storing to Elasticsearch with `curl`

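Now let's try inserting a record, and then reading it back. Note that in the URL `test/1`, Elasticsearch 7 treats `1` as a (deprecated) mapping type name rather than a document ID; that is why the response reports `"_type" : "1"` along with an auto-generated `_id`. The recommended endpoint for indexing documents in Elasticsearch 7 is `test/_doc`. To insert a record, run: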

In [20]:
%%bash
curl -XPOST 'elastic:9200/test/1?pretty' \
  -H 'Content-Type: application/json' -d'
{
    "name" : "Russell Jurney",
    "message" : "trying out Elasticsearch"
}
'

{
  "_index" : "test",
  "_type" : "1",
  "_id" : "YlOa64wBrenEcs-N8m2Y",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}




### Searching Elasticsearch with `curl`
To read the record back, run:

In [21]:
%%bash

curl -XGET 'elastic:9200/test/_search?q=name:Russell&pretty'

{
  "took" : 68,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 0.2876821,
    "hits" : [
      {
        "_index" : "test",
        "_type" : "1",
        "_id" : "YlOa64wBrenEcs-N8m2Y",
        "_score" : 0.2876821,
        "_source" : {
          "name" : "Russell Jurney",
          "message" : "trying out Elasticsearch"
        }
      }
    ]
  }
}
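Query-string searches like `q=name:Russell` are passed in the URL, so a query containing spaces, quotes or other reserved characters needs URL encoding. The sketch below shows how such a URL could be assembled from Python using only `urllib.parse.urlencode`; it does not contact the server. `search_url` is a hypothetical helper, and the `http://elastic:9200` base assumes this book's Docker hostname.

```python
from urllib.parse import quote, urlencode


def search_url(base, index, query, pretty=True):
    """Assemble an Elasticsearch _search URL for a Lucene query string."""
    params = {"q": query}
    if pretty:
        params["pretty"] = "true"
    # quote_via=quote encodes spaces as %20 rather than '+'
    return "{}/{}/_search?{}".format(base.rstrip("/"), index, urlencode(params, quote_via=quote))


url = search_url("http://elastic:9200", "test", 'name:"Russell Jurney"')
print(url)
```

Passing the result to `urllib.request.urlopen(url)`, or to `curl`, would actually send the search.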




## Elasticsearch and PySpark

To write data from PySpark to Elasticsearch (or read data from Elasticsearch to PySpark), we’ll need to use a project called Elasticsearch for Hadoop, which includes Spark support. We have already preconfigured PySpark to use this project, so you won’t need to do anything special to load this library.

### Storing Data in Elasticsearch

To store data in Elasticsearch, we call the `DataFrame.write.format("org.elasticsearch.spark.sql")` API and configure it to write to the Elasticsearch Docker container at hostname `elastic`, in the index called `executives`. Configuring your `SparkSession` to load the Elasticsearch connector looks like this:

```python
spark = (
    SparkSession.builder.appName(APP_NAME)
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:7.14.2")
    .getOrCreate()
)
```

In [22]:
csv_lines = spark.read.csv("../lib/data/example.csv")

csv_lines.toPandas()

|   | _c0 | _c1 | _c2 |
|---|---|---|---|
| 0 | Russell Jurney | Relato | CEO |
| 1 | Florian Liebert | Mesosphere | CEO |
| 2 | Don Brown | Rocana | CIO |
| 3 | Steve Jobs | Apple | CEO |
| 4 | Donald Trump | The Trump Organization | CEO |
| 5 | Russell Jurney | Data Syndrome | Principal Consultant |


In [23]:
import pyspark.sql.functions as F


executives = csv_lines.select(
    F.col("_c0").alias("name"),
    F.col("_c1").alias("company"),
    F.col("_c2").alias("title")
)

executives.toPandas()

|   | name | company | title |
|---|---|---|---|
| 0 | Russell Jurney | Relato | CEO |
| 1 | Florian Liebert | Mesosphere | CEO |
| 2 | Don Brown | Rocana | CIO |
| 3 | Steve Jobs | Apple | CEO |
| 4 | Donald Trump | The Trump Organization | CEO |
| 5 | Russell Jurney | Data Syndrome | Principal Consultant |


In [24]:
(
    executives.write.format("org.elasticsearch.spark.sql")
    .option("es.nodes", "elastic")
    .option("es.port", "9200")
    .option("es.nodes.wan.only", "true")
    .mode("Overwrite").save("executives/_doc")
)

print("Saved data to Elasticsearch!")

Saved data to Elasticsearch!


### Querying Elasticsearch

We can again use `curl` to query Elasticsearch and verify that our data was stored successfully.

In [26]:
%%bash

curl 'elastic:9200/executives/_search?q=name:Russell*&pretty'

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "executives",
        "_type" : "_doc",
        "_id" : "Y1Ob64wBrenEcs-N0G2n",
        "_score" : 1.0,
        "_source" : {
          "name" : "Russell Jurney",
          "company" : "Relato",
          "title" : "CEO"
        }
      },
      {
        "_index" : "executives",
        "_type" : "_doc",
        "_id" : "aFOb64wBrenEcs-N0G2n",
        "_score" : 1.0,
        "_source" : {
          "name" : "Russell Jurney",
          "company" : "Data Syndrome",
          "title" : "Principal Consultant"
        }
      }
    ]
  }
}




### Inspecting our Records

Note the records now look like this (for example):

```json
{
  "_index" : "executives",
  "_type" : "_doc",
  "_id" : "AWL1bEp0GvFw0fJ74-O4",
  "_score" : 1.0,
  "_source" : {
    "company" : "Data Syndrome",
    "name" : "Russell Jurney",
    "title" : "Principal Consultant"
  }
}
```

Elasticsearch has generated an _id for us and has added other metadata to each record returned, including a score on how well the record matched the query.
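Application code usually wants the documents themselves rather than the surrounding metadata. The sketch below pulls `_source` out of each hit and keeps `_id` and `_score` alongside it. `sources_from_response` is a hypothetical helper, and the `response` dict is a trimmed copy of the `curl` output above; the same function should also work on the dict returned by the Python client's `search` call.

```python
def sources_from_response(response):
    """Return each hit's _source, annotated with its _id and _score."""
    docs = []
    for hit in response.get("hits", {}).get("hits", []):
        doc = dict(hit["_source"])  # copy so the response is left unchanged
        doc["_id"] = hit["_id"]
        doc["_score"] = hit.get("_score")
        docs.append(doc)
    return docs


response = {
    "hits": {
        "total": {"value": 2, "relation": "eq"},
        "hits": [
            {"_id": "Y1Ob64wBrenEcs-N0G2n", "_score": 1.0,
             "_source": {"name": "Russell Jurney", "company": "Relato", "title": "CEO"}},
            {"_id": "aFOb64wBrenEcs-N0G2n", "_score": 1.0,
             "_source": {"name": "Russell Jurney", "company": "Data Syndrome",
                         "title": "Principal Consultant"}},
        ],
    }
}

for doc in sources_from_response(response):
    print(doc["company"], doc["title"], doc["_score"])
```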

This is a good time to point out that Elasticsearch also works well as a key/value or document store! It could replace MongoDB in our stack, and doing so could simplify our architecture and improve scalability by reducing the number of components. Remember, simplicity is key to scalability. That being said, MongoDB has features we’ll be thankful for later, so don’t write it off.

## Python and Elasticsearch

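In our web applications we will need to access Elasticsearch from within Python but outside Spark. To do so we use the Python Elasticsearch client library, available from PyPI under the `elasticsearch` package name. Here we import it from `elasticsearch7`, the version-specific release of the same client for Elasticsearch 7.x, which can be installed alongside other versions.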

Using `elasticsearch` is easy. Running the code below, you should see much the same content as was returned from Elasticsearch via `curl`.

In [27]:
import json
from elasticsearch7 import Elasticsearch


es = Elasticsearch('http://elastic:9200/')

print(
    json.dumps(
        es.search(
            index='executives',
            body={"query": {"match": {"name": "Russell"}}},
        ),
        indent=4,
        sort_keys=True,
    )
)

{
    "_shards": {
        "failed": 0,
        "skipped": 0,
        "successful": 1,
        "total": 1
    },
    "hits": {
        "hits": [
            {
                "_id": "Y1Ob64wBrenEcs-N0G2n",
                "_index": "executives",
                "_score": 1.0296195,
                "_source": {
                    "company": "Relato",
                    "name": "Russell Jurney",
                    "title": "CEO"
                },
                "_type": "_doc"
            },
            {
                "_id": "aFOb64wBrenEcs-N0G2n",
                "_index": "executives",
                "_score": 1.0296195,
                "_source": {
                    "company": "Data Syndrome",
                    "name": "Russell Jurney",
                    "title": "Principal Consultant"
                },
                "_type": "_doc"
            }
        ],
        "max_score": 1.0296195,
        "total": {
            "relation": "eq",
            "value": 2
       



## Distributed Streams with Apache Kafka

According to its website, “Kafka™ is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.” We’ll be using Kafka streams to make predictions in “sub real time,” using Spark Streaming. Kafka can also be used to collect data and aggregate it to bulk storage like HDFS or Amazon S3.

Kafka is already installed via the `docker-compose.yaml` file in this project.

### Creating Kafka Topics

Kafka organizes data into topics. Let's use the [confluent_kafka.admin.AdminClient](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.admin.AdminClient) class, included with Confluent's Kafka Python library, to create a topic called `test_topic`; the same client can also list existing topics with its `list_topics` method. We give the client the address of our Kafka service at the hostname `kafka`.

For a guide on `confluent-kafka` check out the [documentation](https://docs.confluent.io/clients-confluent-kafka-python/current/overview.html).

In [28]:
!pip install confluent-kafka



In [29]:
from confluent_kafka.admin import AdminClient, NewTopic


a = AdminClient({'bootstrap.servers': "kafka"})

In [30]:
new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in ["test_topic"]]

fs = a.create_topics(new_topics)

In [31]:
# Wait for each operation to finish.
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))

Failed to create topic test_topic: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'test_topic' already exists."}


#### Using the Console Consumer

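Next we'll use the Kafka console consumer to read the messages in `test_topic`. The `--from-beginning` option makes it print every message already in the topic, such as the ones we produce from Python later in this chapter, rather than only new ones. We also need to give the `--bootstrap-server` and `--topic` arguments to specify the Kafka server address and topic. The consumer runs until you press Ctrl+C; prefixing the command with `timeout 5` stops it automatically after five seconds.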

From your own computer's shell you can connect to the `kafka` container with the following command:

```bash
docker-compose exec kafka /bin/bash
```

Once there, run the console consumer:

```bash
kafka-console-consumer.sh \
--topic test_topic \
--bootstrap-server localhost:9092 \
--from-beginning
```

## Realtime Versus Batch Computing

Using Kafka is straightforward, but we’ll see later how this simple framework can create complex dataflows in a way that is simple to operate. The global queue abstraction Kafka provides is extremely powerful. We’ll only be using Kafka to deploy predictions using Spark Streaming, but it can do much more.

Despite Kafka’s power, we’ll spend most of our time in this book doing batch processing. The rule is, “If you can do it in batch, you should do it in batch.” Operating a Spark cluster is much simpler than operating a pool of realtime workers using Kafka. While you can replay Kafka’s history to do the equivalent of batch operations, batch computing is optimized for the process of applied research that constitutes data science work.

If you do decide to move from batch computing to realtime streams, though, PySpark has you covered! With Spark's streaming support you can often reuse much of the code you wrote for batch processing to process messages from Kafka. It is quite natural to prototype streaming applications in batch and then convert them to streams later.

## Kafka with Python via `confluent-kafka`

Kafka can be accessed through libraries in all popular languages. The `confluent-kafka` library comes from Confluent, the company founded by Kafka's original creators, and is built on the `librdkafka` C client. We'll be using it to deploy machine learning models in realtime because it is reliable and well supported.

### Producing Kafka Messages

#### Setting up a `Producer`

First we import the `Producer` class from the `confluent_kafka` module. Then we instantiate a `Producer`, passing it a configuration dictionary whose `bootstrap.servers` setting points at our Kafka service at the hostname `kafka`.

In [32]:
import json
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "kafka",
})
print("Producer instantiated!")

Producer instantiated!


#### Producing Messages

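Next we will use our producer to produce (transmit) ten JSON messages. Kafka message values are bytes, not strings, so we encode each JSON string as bytes with `encode("utf-8")` (UTF-8 is also the default for Python's `str.encode`). The `delivery_report` callback defined first is called once per message to report whether delivery succeeded.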

In [33]:
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

In [34]:
names = ["Alan", "Bob", "Carol", "Delia", "Edward", "Fu", "Greg", "Howard", "Irene", "Justin"]

for i, name in enumerate(names):
    # Trigger any available delivery report callbacks from previous produce() calls
    producer.poll(0)

    # Create a fun message
    message = {
        "i": i,
        "message": "Hello, {}!".format(name)
    }
    json_message = json.dumps(message)

    # Asynchronously produce a message, the delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    producer.produce("test_topic", json_message.encode("utf-8"), callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery report callbacks to be triggered.
producer.flush()

Message delivered to test_topic [0]
Message delivered to test_topic [0]
Message delivered to test_topic [0]
Message delivered to test_topic [0]
Message delivered to test_topic [0]
Message delivered to test_topic [0]
Message delivered to test_topic [0]
Message delivered to test_topic [0]
Message delivered to test_topic [0]
Message delivered to test_topic [0]


0

### Consuming Kafka Messages

#### Setting up a `Consumer`

We first import the `Consumer` class from the `confluent_kafka` package. Next we instantiate a `Consumer`, giving it the bootstrap server, a consumer group id (`mygroup`) and `'auto.offset.reset': 'earliest'`, and then subscribe it to `test_topic`. The `earliest` setting tells a consumer group that has no committed offsets to start reading at the beginning of the topic. We need this because a notebook runs one cell at a time: we produced our messages before the consumer existed, so it must start from the beginning of the topic to see them.
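
The offset rules are easier to reason about with a simplified model. The function below is a hypothetical, single-partition sketch of how a consumer group chooses where to start reading, not Kafka's actual implementation: a committed offset wins, and `auto.offset.reset` only applies when the group has none. It treats the earliest offset as 0, ignoring retention, which can delete old messages.

```python
from typing import Dict


def starting_offset(
    committed: Dict[str, int],
    group_id: str,
    log_end_offset: int,
    auto_offset_reset: str = "latest",
) -> int:
    """Simplified model of where a consumer group starts in one partition.

    Hypothetical sketch: treats the earliest available offset as 0,
    ignoring retention, which can move it past 0.
    """
    # A group that has already committed an offset resumes from it
    if group_id in committed:
        return committed[group_id]
    # Otherwise the reset policy decides
    if auto_offset_reset == "earliest":
        return 0
    if auto_offset_reset == "latest":
        return log_end_offset
    raise ValueError("unsupported auto.offset.reset: " + auto_offset_reset)


# Ten messages were produced, so the log end offset is 10
print(starting_offset({}, "mygroup", 10, "earliest"))
print(starting_offset({}, "mygroup", 10, "latest"))
print(starting_offset({"mygroup": 10}, "mygroup", 10, "earliest"))
```

With the ten messages produced above, a fresh `mygroup` starts at offset 0 and sees all of them. Because `confluent-kafka` commits offsets automatically by default, re-running the consumer cell with the same group id would typically resume after those ten messages and print nothing new; changing `group.id` is one way to read the topic from the start again.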

In [35]:
from confluent_kafka import Consumer


consumer = Consumer({
    'bootstrap.servers': 'kafka',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['test_topic'])

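# Poll until the cell is interrupted (for example, with the notebook's Stop button).
# The finally block closes the consumer cleanly even after an interrupt.
try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        json_string = msg.value().decode('utf-8')
        print('Received message: {}'.format(json_string))

        message = json.loads(json_string)
        print(json.dumps(message))
finally:
    consumer.close()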

Received message: {"i": 0, "message": "Hello, Alan!"}
{"i": 0, "message": "Hello, Alan!"}
Received message: {"i": 1, "message": "Hello, Bob!"}
{"i": 1, "message": "Hello, Bob!"}
Received message: {"i": 2, "message": "Hello, Carol!"}
{"i": 2, "message": "Hello, Carol!"}
Received message: {"i": 3, "message": "Hello, Delia!"}
{"i": 3, "message": "Hello, Delia!"}
Received message: {"i": 4, "message": "Hello, Edward!"}
{"i": 4, "message": "Hello, Edward!"}
Received message: {"i": 5, "message": "Hello, Fu!"}
{"i": 5, "message": "Hello, Fu!"}
Received message: {"i": 6, "message": "Hello, Greg!"}
{"i": 6, "message": "Hello, Greg!"}
Received message: {"i": 7, "message": "Hello, Howard!"}
{"i": 7, "message": "Hello, Howard!"}
Received message: {"i": 8, "message": "Hello, Irene!"}
{"i": 8, "message": "Hello, Irene!"}
Received message: {"i": 9, "message": "Hello, Justin!"}
{"i": 9, "message": "Hello, Justin!"}


KeyboardInterrupt: 
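
An endless loop is convenient for watching a live topic, but in a notebook it is often handier for the loop to stop on its own. Here is a minimal sketch, assuming you want at most `max_messages` JSON messages and are happy to give up after a few empty polls. `consume_json` is a hypothetical helper; it relies only on the `poll()`, `error()` and `value()` calls used in the cell above, so it should accept a `confluent_kafka.Consumer`.

```python
import json
from typing import Any, List


def consume_json(consumer: Any, max_messages: int,
                 poll_timeout: float = 1.0, max_idle_polls: int = 5) -> List[dict]:
    """Poll until max_messages JSON messages arrive or the topic goes quiet.

    `consumer` is anything with a poll(timeout) method that returns None
    or a message object with error() and value() methods.
    """
    received: List[dict] = []
    idle_polls = 0
    while len(received) < max_messages and idle_polls < max_idle_polls:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            # Nothing arrived within poll_timeout seconds
            idle_polls += 1
            continue
        idle_polls = 0
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        # Kafka hands us bytes: decode them, then parse the JSON
        received.append(json.loads(msg.value().decode("utf-8")))
    return received
```

In the notebook you could call `consume_json(consumer, max_messages=10)` after `consumer.subscribe(['test_topic'])`, and still close the consumer in a `finally` block as before.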

## Lightweight Web Applications

The next step is turning our published data into an interactive application. As shown in Figure 2-18, we’ll use lightweight web frameworks to do that.

We choose lightweight web frameworks because they are simple and fast to work with. Unlike with CRUD applications, mined data is the star of the show here. We use read-only databases and simple application frameworks because that fits with the applications we build and how we offer value.

![ads2_0217.png](attachment:ads2_0217.png)

Given the following examples in Python/Flask, you can easily implement a solution in Sinatra, Rails, Django, Node.js, or your favorite language and web framework.

### Web Applications in Jupyter Notebooks

A web server is a continuous process, and running one inside a Jupyter Notebook would block the notebook until the server stops. Therefore in this section we will display the code in the notebook but direct you to a terminal to run and interact with the web applications directly.

If you haven't already, open a terminal using the directions earlier in this chapter. Then change to the `web` subdirectory of `ch02`.

### Echo Microservice in Flask

The first web application we will build will be a Flask microservice that returns whatever text you give it as part of the url. The code for this service is simple:

```python
from flask import Flask
app = Flask(__name__)

@app.route("/<text>")
def echo(text):
  # Return whatever text follows the final / in the URL
  return text

if __name__ == "__main__":
  app.run(debug=True)
```

To run this web application, go to the `ch02/web` directory in your terminal and execute the `test_flask.py` file with Python:

```bash
cd ch02/web
python ./test_flask.py
```

Now, in a new browser tab, open the following URL: [http://localhost:5000/echo test](http://localhost:5000/echo%20test). You will see that it echoes back any text that you put after the final / in the URL.

Let's use `curl` to verify things are working:

So, I learned a few things:
1. I must run the app in my LOCAL terminal, not one in this environment.
2. I can leave the localhost URL as-is. It appears Flask/the computer is smart enough to get away from the localhost:5000 port that is used by AirPlay on my Mac.
3. I need to be OK feeling stupid when I'm trying a new paradigm, especially app dev or other paradigms I'm less familiar with.

In [None]:
%%bash
curl http://127.0.0.1:5000/What%20the%20heck!
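
The same request can be made from Python with the standard library instead of `curl`. This sketch assumes `test_flask.py` is running on port 5000; `build_echo_url` and `echo_text` are hypothetical helper names. `urllib.parse.quote` percent-encodes the text, so a space becomes `%20` as in the URLs above.

```python
from urllib.parse import quote
from urllib.request import urlopen


def build_echo_url(text: str, base_url: str = "http://127.0.0.1:5000") -> str:
    """Build the echo service URL, percent-encoding the text (a space becomes %20)."""
    return "{}/{}".format(base_url, quote(text))


def echo_text(text: str, base_url: str = "http://127.0.0.1:5000") -> str:
    """Send `text` to the echo service and return the response body."""
    with urlopen(build_echo_url(text, base_url), timeout=5) as response:
        return response.read().decode("utf-8")


print(build_echo_url("What the heck!"))
# With test_flask.py running, this should print the text back:
# print(echo_text("What the heck!"))
```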

Normally you would press CTRL-C to stop the Flask server process, but this does not work in a Jupyter Terminal, so to shut down this web process, visit [http://localhost:5000/shutdown](http://localhost:5000/shutdown). This is how we will shut down all of our web applications.

### `pymongo` and Flask

Let's use the `pymongo` module we used earlier to return some data as JSON in response to a web request. The code looks like this:

```python
from flask import Flask
from pymongo import MongoClient
import bson.json_util

# Set up Flask
app = Flask(__name__)

# Set up Mongo
client = MongoClient("mongo") # connect to the host named "mongo"; MongoClient() alone defaults to localhost
db = client.agile_data_science

# Fetch executives by name
@app.route("/executive/<name>")
def executive(name):
  executives = db.executives.find({"name": name})
  return bson.json_util.dumps(list(executives))

if __name__ == "__main__": app.run(debug=True)
```

First we import `flask.Flask`, `pymongo.MongoClient` and `bson.json_util`. We set up an application object, which we'll use to set up routes and to run the Flask test web server. Next we instantiate our `MongoClient` and set up the database handle.

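Then we set up a route for the URL `/executive/<name>`, where `name` is a URL variable passed to the `executive` function. The function queries the `executives` collection for matching records and serializes them with `bson.json_util.dumps`, which, unlike the standard library's `json.dumps`, knows how to handle MongoDB types such as `ObjectId`.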
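
Records returned by MongoDB can contain values the standard `json` module cannot serialize, such as `ObjectId` and `datetime`, which is why we use `bson.json_util`. The sketch below shows the underlying mechanism with a `datetime`, using only the standard library's `default` hook. `to_jsonable` is a hypothetical helper, and its `{"$date": ...}` shape is only loosely modeled on MongoDB Extended JSON; it is not `bson.json_util`'s exact output.

```python
import json
from datetime import datetime, timezone


def to_jsonable(value):
    """Fallback that json.dumps calls for types it cannot serialize natively."""
    if isinstance(value, datetime):
        return {"$date": value.isoformat()}
    raise TypeError("Not JSON serializable: {!r}".format(type(value)))


record = {"name": "Russell Jurney", "stored_at": datetime(2017, 1, 1, tzinfo=timezone.utc)}

try:
    json.dumps(record)
except TypeError as err:
    print("stdlib json alone fails:", err)

print(json.dumps(record, default=to_jsonable))
```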

From the same `ch02/web` directory in your terminal, run the `test_flask_mongo.py` script:

```bash
python ./test_flask_mongo.py
```

Now visit [http://localhost:5000/executive/Russell Jurney](http://localhost:5000/executive/Russell%20Jurney) and see Flask use Mongo to retrieve a record about your teacher :) You should see the following JSON, which we stored before:

<img src="./images/json.png" alt="Two JSON Records of Executive Russell Jurney" style="width: 40%;"/>

Making Mongo work with Flask is as easy as that! Note how MongoDB has added an `_id` field to each record. Now let's make it presentable.

## Bootstrap

Design and presentation impact the value of your work. In fact, one way to think of Agile Data Science is as iterative data design. The output of our data models matches our views, and in that sense design and data processing are not distinct. Instead, they are part of the same collaborative activity: data design. With that in mind, it is best that we start out with a solid, clean design for our data and work from there.

![ads2_0219.png](attachment:ads2_0219.png)

Let's try wrapping our previous example in an HTML table, styled with Bootstrap.

```python
from flask import Flask, render_template
from pymongo import MongoClient
import bson.json_util

# Set up Flask
app = Flask(__name__)

# Set up Mongo
client = MongoClient() # defaults to localhost
db = client.agile_data_science

# Fetch executives by name and render them in an HTML table
@app.route("/executive/<name>")
def executive(name):
  executives = db.executives.find({"name": name})
  return render_template('table.html', executives=list(executives))

if __name__ == "__main__":
  app.run(
    debug=True,
    host='0.0.0.0'
  )
```

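Next we need to create the HTML template `table.html`, which wraps our data in a presentable form. Flask's `render_template` looks for it in a `templates` directory next to the application script. The template first creates a title for our page, then a table with a header row and successive data rows.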

```html
<div class="container">
  <div class="page-header">
    <h1>Agile Data Science</h1>
  </div>
  <p class="lead">Executives</p>
  <table class="table">
    <thead>
      <tr>
        <th>Name</th>
        <th>Company</th>
        <th>Title</th>
      </tr>
    </thead>
    <tbody>
      {% for executive in executives -%}
      <tr>
        <td>{{executive.name}}</td>
        <td>{{executive.company}}</td>
        <td>{{executive.title}}</td>
      </tr>
      {% endfor -%}
    </tbody>
  </table>
</div>
```
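
To see what the loop in `table.html` produces, here is a plain-Python stand-in for the template; it is not Jinja2 itself. `render_rows` is a hypothetical helper. It emits one `<tr>` per executive, leaves missing fields (such as a record without `company`) as empty cells, much as Jinja2 renders undefined attributes, and escapes values with `html.escape`, much as Flask autoescapes `.html` templates.

```python
from html import escape
from typing import Dict, List


def render_rows(executives: List[Dict[str, str]]) -> str:
    """Build the table rows that the template's for-loop would produce."""
    rows = []
    for executive in executives:
        # Missing fields become empty cells; escape() neutralizes HTML in the data
        cells = "".join(
            "<td>{}</td>".format(escape(str(executive.get(field, ""))))
            for field in ("name", "company", "title")
        )
        rows.append("<tr>{}</tr>".format(cells))
    return "\n".join(rows)


print(render_rows([{"name": "Russell Jurney", "title": "CEO"}]))
```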

Now go to your terminal and run the file [`ch02/web/test_flask_bootstrap.py`](web/test_flask_bootstrap.py) from the `ch02/web` directory:

```bash
cd /home/vagrant/Agile_Data_Code_2
cd ch02/web
python ./test_flask_bootstrap.py
```

And now visit the same url: [http://localhost:5000/executive/Russell Jurney](http://localhost:5000/executive/Russell%20Jurney)

![ads2_0220.png](attachment:ads2_0220.png)

Congratulations, you've created a data-backed web page!

## Conclusion

We’ve toured our environment and have executed “Hello, World!” in each tool. Together, these tools form a data pipeline of distributed systems capable of collecting, processing, publishing, and decorating data of any size. This pipeline is easy to modify at every stage with one line of code. This pipeline will scale without our worrying about optimization at each step; optimization will be one concern, but not our main concern.

As we’ll see in the next chapter, because we’ve created an arbitrarily scalable pipeline where every stage is easily modifiable, it is possible to return to agility. We won’t quickly hit a wall as soon as we need to switch from a relational database to something else that “scales better,” and we aren’t subjecting ourselves to the limitations imposed by tools designed for other tasks, like online transaction processing.

We now have total freedom to use best-of-breed tools within this framework to solve hard problems and produce value. We can choose any language, any framework, and any library and glue it together to get things built.