&copy; 2019 by Pearson Education, Inc. All Rights Reserved. The content in this notebook is based on the book [**Python for Programmers**](https://amzn.to/2VvdnxE).

# 16. Big Data: Hadoop, Spark, NoSQL and IoT 

In [None]:
# enable high-res images in notebook 
%config InlineBackend.figure_format = 'retina'
%matplotlib inline

# 16.1 Introduction
### Big Data
* Previous data-science case studies all focused on AI
* Here, we focus on the **big-data infrastructure that supports AI solutions**
* As **data grows exponentially**, we want to **learn** from that data&mdash;and at **blazing speed**
* Achieving this requires **sophisticated algorithms**, hardware, software and networking designs
* With **big data**, **machine learning** and **deep learning** can be even **more effective**

<hr style="height:2px; border:none; color:#000; background-color:#000;">

# 16.3 NoSQL Big-Data Databases (1 of 2)
* **Relational databases** store data in rectangular **tables**
* Become **less efficient** as **data volume** and the numbers of **tables** and **relationships between them** increase
* Most data produced today is 
    * **Unstructured**&mdash;**photos**, **videos** and **natural language** (social-media posts, texts, ...), or
    * **Semi-structured**&mdash;**JSON** and **XML** documents
* **Metadata** adds structure to **unstructured data**, making it **semi-structured**
    * **Tweets** (as you saw earlier)
    * **YouTube videos**&mdash;**who posted** and **when**, **title**, **description**, ...

<hr style="height:2px; border:none; color:#000; background-color:#000;">

# 16.3 NoSQL Big-Data Databases (2 of 2)
* **NoSQL databases** are designed for 
    * **unstructured** and **semi-structured big-data** 
    * big data **storage and processing demands**
* **Big data** requires **massive databases**, which can be spread across data centers worldwide in huge **clusters** of commodity computers
* The name **NoSQL** originally meant exactly what it says: no SQL
* With the **growing use of SQL in big data**, such as **SQL on Hadoop** and **Spark SQL**, it's now said to stand for **“Not Only SQL”** 

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Four Major Types of **NoSQL Databases**
* **key–value**
* **document**
* **columnar**
* **graph**
* Our NoSQL case study uses the **MongoDB document database** &mdash; the most popular NoSQL database
* **Overviews** of the **NoSQL database types** 
    * [**Python Fundamentals LiveLessons videos**](https://learning.oreilly.com/videos/python-fundamentals/9780135917411/9780135917411-PFLL_Lesson16_20) 
    * [**Python for Programmers, Section 16.3**](https://learning.oreilly.com/library/view/python-for-programmers/9780135231364/ch16.xhtml#ch16lev1sec3)

<hr style="height:2px; border:none; color:#000; background-color:#000;">

# 16.4 Case Study: A MongoDB JSON Document Database 
* Store and search **JSON** for **streamed tweets** about **100 U.S. senators**
* Summarize **top 10** by **tweet count**
* Display **interactive map** containing **tweet count summaries**
* I **pre-executed this example** because we stream 10,000 tweets, which can take substantial time 
* **Possible enhancement** &mdash; Use **sentiment analysis** to count **positive**, **negative** and **neutral tweets** mentioning each senator’s **handle**

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Free Cloud-Based MongoDB Atlas Cluster 
* Requires **no installation** 
* Store up to **512MB of data**
* Can store more with
    * [**Free MongoDB Community Server**](https://www.mongodb.com/download-center/community), or 
    * **Paid MongoDB Atlas account**
* **Creating your MongoDB Atlas cluster**
    * I discuss the details of **signing up** for a MongoDB account, **creating the MongoDB Atlas Cluster**, **configuring** it and getting your **connection string** in my [**Python Fundamentals LiveLessons videos**](https://learning.oreilly.com/videos/python-fundamentals/9780135917411/9780135917411-PFLL_Lesson16_22) and in [**Python for Programmers, Section 16.4.1**](https://learning.oreilly.com/library/view/Python+for+Programmers,+First+Edition/9780135231364/ch16.xhtml#ch16lev2sec14)

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Python Libraries Required for Interacting with MongoDB
```
conda install -c conda-forge pymongo
conda install -c conda-forge dnspython
```
* **`pymongo` library** &mdash; interact with **MongoDB databases** from Python
* **`dnspython` library** &mdash; used as part of connecting to a **MongoDB Atlas Cluster**

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### keys.py 
* **`keys.py`** must contain 
    * your **Twitter credentials** 
    * your **OpenMapQuest key** 
    * your **MongoDB connection string** 
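A minimal sketch of what `keys.py` might look like. The variable names are the ones this notebook reads (`keys.consumer_key`, `keys.consumer_secret`, `keys.access_token`, `keys.access_token_secret`, `keys.bearer_token`, `keys.mapquest_key` and `keys.mongo_connection_string`); every value is a hypothetical placeholder, and the connection-string format is only an assumed example of an Atlas `mongodb+srv://` string. Copy your real values from your Twitter developer account, MapQuest account and MongoDB Atlas cluster.

```python
# keys.py -- hypothetical template; replace every placeholder with your own values

# Twitter credentials: TweetListener uses bearer_token; the older,
# unexecuted TweetListener cell also references the four OAuth values
consumer_key = 'YourConsumerKey'
consumer_secret = 'YourConsumerSecret'
access_token = 'YourAccessToken'
access_token_secret = 'YourAccessTokenSecret'
bearer_token = 'YourBearerToken'

# key for geopy's OpenMapQuest geocoder
mapquest_key = 'YourOpenMapQuestKey'

# MongoDB Atlas connection string (placeholder format only)
mongo_connection_string = (
    'mongodb+srv://<username>:<password>@<cluster-address>/'
    '?retryWrites=true&w=majority')
```

Because this file holds secrets, keep it out of any public repository.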

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.4.2 Streaming Tweets into MongoDB
### Importing Tweepy and the `keys` Module

In [1]:
import tweepy, keys

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Loading the Senators’ Data (1 of 2)
* **`senators.csv`** (provided in notebook's folder) contains each senator's 
    * two-letter state code
    * name
    * party 
    * Twitter handle
    * Twitter ID
* **Twitter handle and ID** used to track tweets **to**, **from** and **mentioning** each senator 
* When following users via **numeric Twitter IDs**, you must submit the IDs as **strings**

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Loading the Senators’ Data (2 of 2)

In [2]:
import pandas as pd

In [3]:
senators_df = pd.read_csv('senators.csv')

In [None]:
#senators_df['TwitterID'] = senators_df['TwitterID'].astype(str)

In [4]:
senators_df.head()

|   | State | Name | Party | TwitterHandle | TwitterID |
|---|---|---|---|---|---|
| 0 | AL | Richard Shelby | R | SenShelby | 21111098 |
| 1 | AL | Tommy Tuberville | R | TTuberville | 110798061 |
| 2 | AK | Lisa Murkowski | R | lisamurkowski | 18061669 |
| 3 | AK | Dan Sullivan | R | SenDanSullivan | 2891210047 |
| 4 | AZ | Mark Kelly | D | SenMarkKelly | 1324926274888888320 |


<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Configuring the `pymongo` `MongoClient` 

In [5]:
from pymongo import MongoClient

In [6]:
atlas_client = MongoClient(keys.mongo_connection_string)

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Get **`pymongo` `Database`** Object Representing the `senators` Database
* MongoDB **creates the database** when you first store data in it, if it does not already exist
* Will be used to store the collection of **tweet JSON documents**

In [7]:
db = atlas_client.senators 

### Setting up Tweet Stream
* **`TweetListener`** uses the **`db` object** representing the **senators database** to store tweets 
    * Depending on the rate at which people are tweeting about the senators, it may take **minutes to hours** to get **10,000 tweets**

In [8]:
from tweetlistener import TweetListener

In [9]:
tweet_limit = 10000

In [None]:
twitter_stream = TweetListener(keys.consumer_key, keys.consumer_secret,
    keys.access_token, keys.access_token_secret, db, tweet_limit)

In [10]:
tweet_listener = TweetListener(keys.bearer_token, db, tweet_limit)

---

### Redirecting the Standard Error Stream to the Standard Output Stream

In [11]:
import sys

sys.stderr = sys.stdout

### Deleting Existing Stream Rules

* Get the `StreamRule`s by calling your `StreamingClient`’s `get_rules` method
    * `Response`’s `data` attribute contains a `list` of `StreamRule`s

In [12]:
# data is None when no rules exist, so fall back to an empty list
rules = tweet_listener.get_rules().data or []

* Get the rule IDs

In [13]:
rule_ids = [rule.id for rule in rules]

* Call `StreamingClient`’s `delete_rules` method with a list of rule IDs to delete
    * The returned `Response`’s `meta` attribute contains a `'summary'` dictionary with the numbers of deleted and not-deleted rules

In [14]:
if rule_ids:  # skip the request when there are no rules to delete
    print(tweet_listener.delete_rules(rule_ids))

Response(data=None, includes={}, errors=[], meta={'sent': '2022-09-26T03:58:22.065Z', 'summary': {'deleted': 10, 'not_deleted': 0}})

### Creating and Adding Stream Rules
* Create rules to filter the live tweet stream looking for tweets from the senators and mentioning the senators
* The following rules will locate tweets **from** each senator

In [15]:
handles = senators_df.TwitterHandle

for i in range(5):
    strings = [f'from:{handle}' for handle in handles[i * 20:20 + i * 20]]
    response = tweet_listener.add_rules(tweepy.StreamRule(' OR '.join(strings)))
    print(response)

Response(data=[StreamRule(value='from:SenShelby OR from:TTuberville OR from:lisamurkowski OR from:SenDanSullivan OR from:SenMarkKelly OR from:SenatorSinema OR from:SenTomCotton OR from:JohnBoozman OR from:SenFeinstein OR from:SenKamalaHarris OR from:SenBennetCO OR from:Hickenlooper OR from:ChrisMurphyCT OR from:SenBlumenthal OR from:SenatorCarper OR from:ChrisCoons OR from:SenRickScott OR from:marcorubio OR from:ossoff OR from:ReverendWarnock', tag=None, id='1574247004779253760')], includes={}, errors=[], meta={'sent': '2022-09-26T03:58:30.193Z', 'summary': {'created': 1, 'not_created': 0, 'valid': 1, 'invalid': 0}})
Response(data=[StreamRule(value='from:brianschatz OR from:maziehirono OR from:MikeCrapo OR from:SenatorRisch OR from:SenDuckworth OR from:SenatorDurbin OR from:SenatorBraun OR from:SenToddYoung OR from:ChuckGrassley OR from:joniernst OR from:RogerMarshallMD OR from:JerryMoran OR from:SenateMajLdr OR from:RandPaul OR from:BillCassidy OR from:SenJohnKennedy OR from:SenAngusK

* The following rules will locate tweets **mentioning** each senator

In [16]:
handles = senators_df.TwitterHandle

for i in range(5):
    strings = [f'@{handle}' for handle in handles[i * 20:20 + i * 20]]
    response = tweet_listener.add_rules(tweepy.StreamRule(' OR '.join(strings)))
    print(response)

Response(data=[StreamRule(value='@SenShelby OR @TTuberville OR @lisamurkowski OR @SenDanSullivan OR @SenMarkKelly OR @SenatorSinema OR @SenTomCotton OR @JohnBoozman OR @SenFeinstein OR @SenKamalaHarris OR @SenBennetCO OR @Hickenlooper OR @ChrisMurphyCT OR @SenBlumenthal OR @SenatorCarper OR @ChrisCoons OR @SenRickScott OR @marcorubio OR @ossoff OR @ReverendWarnock', tag=None, id='1574247037956231169')], includes={}, errors=[], meta={'sent': '2022-09-26T03:58:38.074Z', 'summary': {'created': 1, 'not_created': 0, 'valid': 1, 'invalid': 0}})
Response(data=[StreamRule(value='@brianschatz OR @maziehirono OR @MikeCrapo OR @SenatorRisch OR @SenDuckworth OR @SenatorDurbin OR @SenatorBraun OR @SenToddYoung OR @ChuckGrassley OR @joniernst OR @RogerMarshallMD OR @JerryMoran OR @SenateMajLdr OR @RandPaul OR @BillCassidy OR @SenJohnKennedy OR @SenAngusKing OR @SenatorCollins OR @ChrisVanHollen OR @SenatorCardin', tag=None, id='1574247044432240642')], includes={}, errors=[], meta={'sent': '2022-09-2
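The cells above hard-code five groups of 20 handles, presumably so that each rule stays under Twitter's maximum rule length. A minimal sketch of a more general approach, assuming a 512-character limit (an assumption; check the limit for your API access level): the hypothetical helper `build_rules` packs as many `prefix + handle` terms into each `' OR '`-joined rule string as will fit.

```python
def build_rules(handles, prefix, max_length=512):
    """Hypothetical helper: join prefix+handle terms with ' OR ' into rule
    strings, starting a new rule whenever the next term would push the
    current rule past max_length characters."""
    rules = []
    current = ''
    for handle in handles:
        term = f'{prefix}{handle}'
        if len(term) > max_length:
            raise ValueError(f'term {term!r} is longer than max_length')
        candidate = f'{current} OR {term}' if current else term
        if len(candidate) > max_length:
            rules.append(current)  # current rule is full, so start a new one
            current = term
        else:
            current = candidate
    if current:
        rules.append(current)
    return rules

print(build_rules(['SenShelby', 'TTuberville', 'lisamurkowski'], 'from:',
                  max_length=40))
# ['from:SenShelby OR from:TTuberville', 'from:lisamurkowski']
```

Each resulting string could then be wrapped in `tweepy.StreamRule` and passed to `add_rules` as the cells above do, for example with `build_rules(senators_df.TwitterHandle, 'from:')`.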

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Starting the Live Tweet Stream

In [17]:
tweet_listener.filter(expansions=['author_id'], tweet_fields=['created_at'])

    Screen name: PappyWhiddy
     Created at: 2022-09-26
Tweets received: 10000
Stream connection closed by Twitter


<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Class `TweetListener` Located in `tweetlistener.py`

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Counting Tweets for Each Senator (1 of 2)
* MongoDB **text search** requires a **text index** specifying **document field(s) to search** 
	* MongoDB [index types](https://docs.mongodb.com/manual/indexes), [text indexes](https://docs.mongodb.com/manual/core/index-text) and [operators](https://docs.mongodb.com/manual/reference/operator)
* A **text index** is specified as a **list of tuples**, each containing a **field name** to search and the **index type** (`'text'`)
* **Wildcard field name (\$\*\*)** indexes **all** text fields for a **full-text search**

In [18]:
db.tweets.create_index([('$**', 'text')])

'$**_text'
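`create_index` returns the new index's name, shown above as `'$**_text'`. When you don't supply a name, MongoDB's default name joins each field name and index type with underscores. A small sketch reproducing that naming convention; `default_index_name` is a hypothetical helper, not part of pymongo's API.

```python
def default_index_name(index_spec):
    """Sketch of the default MongoDB index-name convention: each
    (field, type) pair becomes 'field_type', and pairs are joined with '_'."""
    return '_'.join(f'{field}_{index_type}' for field, index_type in index_spec)

print(default_index_name([('$**', 'text')]))            # $**_text
print(default_index_name([('State', 1), ('Name', -1)]))  # State_1_Name_-1
```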

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Counting Tweets for Each Senator (2 of 2)
* Use **`tweets` `Collection`’s `count_documents` method** and **full-text search** to count the total number of documents in the collection that contain the specified text
    * Search for each **Twitter handle** in the `senators_df.TwitterHandle` column
    * `{"$text": {"$search": senator}}` indicates that we’re **using the `text` index** to **`search`** for the value of **`senator`**

In [19]:
tweet_counts = []

In [20]:
for senator in senators_df.TwitterHandle:
    tweet_counts.append(db.tweets.count_documents(
        {"$text": {"$search": senator}}))

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Show Tweet Counts for Each Senator 
* Create a copy of **`DataFrame` `senators_df`** with a new **`Tweets`** column containing the **`tweet_counts`** 
* Display the **top-10 senators by tweet count**

In [21]:
tweet_counts_df = senators_df.assign(Tweets=tweet_counts)  

In [22]:
tweet_counts_df.sort_values(by='Tweets', ascending=False).head(10)

|   | State | Name | Party | TwitterHandle | TwitterID | Tweets |
|---|---|---|---|---|---|---|
| 83 | TN | Marsha Blackburn | R | MarshaBlackburn | 278145569 | 2370 |
| 19 | GA | Raphael Warnock | D | ReverendWarnock | 1221242033530195970 | 1264 |
| 77 | RI | Sheldon Whitehouse | D | SenWhitehouse | 242555999 | 906 |
| 33 | KY | Rand Paul | R | RandPaul | 216881337 | 734 |
| 84 | TX | John Cornyn | R | JohnCornyn | 13218102 | 536 |
| 85 | TX | Ted Cruz | R | SenTedCruz | 1074480192 | 317 |
| 12 | CT | Christopher Murphy | D | ChrisMurphyCT | 150078976 | 250 |
| 41 | MA | Elizabeth Warren | D | SenWarren | 970207298 | 192 |
| 78 | SC | Lindsey Graham | R | LindseyGrahamSC | 432895323 | 187 |
| 62 | NY | Chuck Schumer | D | SenSchumer | 17494010 | 177 |


<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Get the State Locations for Plotting Markers (1 of 3)
* Get each **state’s latitude and longitude** coordinates for **plotting on a map**
* **`state_codes.py`** contains a dictionary that maps **two-letter state codes** to **full state names**
    * Used with **`geopy`** to look up the location of each state

In [23]:
from geopy import OpenMapQuest

In [24]:
import time

In [26]:
from state_codes import state_codes

* Get the **`geocoder` object** to **translate location names** into **`Location` objects**

In [27]:
geo = OpenMapQuest(api_key=keys.mapquest_key) 

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Get the State Locations for Plotting Markers (2 of 3)
* Get and sort the unique two-letter state codes

In [28]:
states = tweet_counts_df.State.unique()  # get unique state codes

In [29]:
states.sort() 

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Get the State Locations for Plotting Markers (3 of 3)
* Look up **each state’s location**
* Call `geocode` with state name followed by `', USA'` 
    * Ensures that we get United States locations

In [30]:
locations = []

In [31]:
from IPython.display import clear_output

for state in states:
    processed = False
    delay = .1 
    while not processed:
        try: 
            locations.append(geo.geocode(state_codes[state] + ', USA'))
            clear_output()  # clear cell's current output before showing next one
            print(locations[-1])  
            processed = True
        except:  # timed out, so wait before trying again
            print('OpenMapQuest service timed out. Waiting.')
            time.sleep(delay)
            delay += .1

Wyoming, United States of America
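The loop above retries indefinitely, waiting a little longer after each failure. A minimal sketch of a bounded alternative: the hypothetical helper `call_with_retry` uses the same linearly increasing delay but re-raises the error after `max_attempts` tries. In this notebook you might call it as `call_with_retry(geo.geocode, state_codes[state] + ', USA', retry_on=(GeocoderTimedOut,))`, importing `GeocoderTimedOut` from `geopy.exc`; the demonstration below uses a stand-in function so it runs without a network connection.

```python
import time

def call_with_retry(func, *args, max_attempts=5, delay=0.1, increment=0.1,
                    retry_on=(TimeoutError,)):
    """Hypothetical helper: call func(*args), retrying when it raises one of
    the exception types in retry_on. It waits a little longer after each
    failure and gives up after max_attempts tries."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func(*args)
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts, so let the caller handle the error
            time.sleep(delay)
            delay += increment

# Demonstration with a stand-in for geo.geocode that times out twice
calls = []

def flaky_geocode(query):
    calls.append(query)
    if len(calls) < 3:
        raise TimeoutError('simulated timeout')
    return f'location for {query}'

location = call_with_retry(flaky_geocode, 'Wyoming, USA', delay=0.01)
print(location)  # location for Wyoming, USA
```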


<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Grouping the Tweet Counts by State 
* **Tweet total** for a state's two senators is used to **color the map**
    * **Darker colors** represent **higher tweet counts**
* Use **`DataFrame` method `groupby`** to group the senators by state 
    * **`as_index=False`**&mdash;state codes become a column in the `DataFrame` that `sum` returns, rather than the index of its rows
* **`GroupBy`** object's **`sum`** method (with `numeric_only=True`) totals the numeric columns by state; only the `Tweets` totals are meaningful (summed Twitter IDs are not)

In [32]:
tweets_counts_by_state = tweet_counts_df.groupby(
    'State', as_index=False).sum(numeric_only=True)

In [33]:
tweets_counts_by_state.head()

|   | State | TwitterID | Tweets |
|---|---|---|---|
| 0 | AK | 2909271716 | 68 |
| 1 | AL | 131909159 | 4 |
| 2 | AR | 974208674 | 166 |
| 3 | AZ | 1324926275969733102 | 138 |
| 4 | CA | 803694179555715760 | 28 |


<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Creating the Map 

In [34]:
import folium

In [35]:
usmap = folium.Map(location=[39.8283, -98.5795], 
                   zoom_start=4, detect_retina=True,
                   tiles='cartodbpositron')  # Stamen's free tile hosting has ended; this is a similar light basemap

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Creating a Choropleth to Color the Map
* A **choropleth** shades areas in a map using magnitudes of numerical values to determine color
* For a **detailed description of the arguments** below, see 
    * [**Python Fundamentals LiveLessons videos**](https://learning.oreilly.com/videos/python-fundamentals/9780135917411/9780135917411-PFLL_Lesson16_23) 
    * [**Python for Programmers, Section 16.4.2** (under the heading "Creating a Choropleth to Color the Map")](https://learning.oreilly.com/library/view/python-for-programmers/9780135231364/ch16.xhtml#ch16lev2sec15)

In [36]:
choropleth = folium.Choropleth(
    geo_data='us-states.json',
    name='choropleth',
    data=tweets_counts_by_state,
    columns=['State', 'Tweets'],
    key_on='feature.id',
    fill_color='YlOrRd',
    fill_opacity=0.7,
    line_opacity=0.2,
    legend_name='Tweets by State'
).add_to(usmap)

layer = folium.LayerControl().add_to(usmap)

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Creating the Map Markers for Each State (1 of 1)
* Sort senators in **descending order** by **tweet count**
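* **`groupby`** maintains **original row order** in each group, and it sorts the groups by state code
* **`index`** &mdash; used to look up each state’s location in the **`locations` list**; this works because both the groups and `locations` are in sorted state-code order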
* **name** &mdash; two-letter **state code**
* **`group`** &mdash; collection of a **state's two senators**

In [37]:
sorted_df = tweet_counts_df.sort_values(by='Tweets', ascending=False)

for index, (name, group) in enumerate(sorted_df.groupby('State')):
    strings = [state_codes[name]]  # used to assemble popup text

    for s in group.itertuples():
        strings.append(f'{s.Name} ({s.Party}); Tweets: {s.Tweets}')
        
    text = '<br>'.join(strings)  
    popup = folium.Popup(text, max_width=200)
    marker = folium.Marker(
        (locations[index].latitude, locations[index].longitude), 
        popup=popup)
    marker.add_to(usmap) 

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Saving and Displaying the Map 

In [None]:
usmap.save('SenatorsTweets.html')

In [38]:
usmap 
#from IPython.display import IFrame
#IFrame(src="./SenatorsTweets.html", width=800, height=450)

<hr style="height:2px; border:none; color:#000; background-color:#000;">

# 16.5 Hadoop
\[We'll cover this example if we have time. The complete presentation can be viewed in my **[Python Fundamentals](https://learning.oreilly.com/videos/python-fundamentals/9780135917411/9780135917411-PFLL_Lesson16_24) (10 videos)**\]
&nbsp;  
&nbsp;  

* **Apache Hadoop** and **Apache Spark** deal with **big-data storage and processing challenges** via 
    * huge **clusters** of computers 
    * **distributed data storage** 
    * **massively parallel processing**
    * Hadoop **MapReduce** programming
    * Spark **in-memory processing** techniques
* **Hadoop** also serves as the **foundation** for many recent advancements in **big-data processing** and an entire **ecosystem of software tools**

<!--

* For a list of **Hadoop ecosystem components**, see my [**Python Fundamentals LiveLessons videos**](https://learning.oreilly.com/videos/python-fundamentals/9780135917411/) (big data lesson coming soon) or [**Python for Programmers, Section 16.5.1**](https://learning.oreilly.com/library/view/python-for-programmers/9780135231364/ch16.xhtml#ch16lev2sec16)

-->

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.5.1 Hadoop Overview (1 of 2)
* Early on, Google knew it needed to **return search results quickly**
* The only practical way &mdash; **store and index the entire web** 
* Late-90s computers couldn’t store and analyze such a **large volume of data economically and fast enough**
* Google developed a **clustering** system with vast numbers of computers (**nodes**)
    * More computers mean a **greater chance of hardware failures**, so Google built in high levels of **redundancy** 
    * Data distributed across all these **commodity computers**
* For a **search request**, all computers searched their portion of the web **in parallel**
    * Then results were **gathered and returned**

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.5.1 Hadoop Overview (2 of 2)
* Google developed **clustering hardware and software** and **distributed storage**
* **Published its designs**
* Programmers at Yahoo!, working from Google’s [**“Google File System” paper**](http://static.googleusercontent.com/media/research.google.com/en//archive/gfs-sosp2003.pdf), built their own system
* They **open-sourced** their work and **Apache.org** implemented **Hadoop**
* Named for an **elephant** stuffed animal that belonged to a child of one of Hadoop’s creators
    * Inspiration for our textbook cover
* Two **additional Google papers** contributed to evolution of Hadoop
	* [**“MapReduce: Simplified Data Processing on Large Clusters”**](http://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf) 
	* [**“Bigtable: A Distributed Storage System for Structured Data”**](http://static.googleusercontent.com/media/research.google.com/en//archive/bigtable-osdi06.pdf) 
		* Basis for Apache HBase (a NoSQL key–value and column-based database).

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### HDFS, `MapReduce` and YARN (1 of 2)
* Hadoop’s key components are: 
	* **HDFS** (Hadoop Distributed File System) for **storing massive amounts of data** throughout a **cluster** 
	* **MapReduce** for implementing the **tasks that process the data**
        * Like **filter/map/reduce** in functional-style programming, but **massively parallel**
* **MapReduce** performs two steps—**mapping** and **reduction**
    * **Mapping** (and **filtering**) &mdash; processes original data across **entire cluster** and **maps** it into tuples of **key–value pairs**
    * **Reduction** &mdash; **combines** those tuples to **produce the results**
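To make the two steps concrete, here is a single-machine sketch in plain Python (not Hadoop code) of the word-length task this section later runs on a cluster. The map step turns each word into a `(length, 1)` key–value pair; the reduce step combines pairs with the same key by totaling their values. As in the mapper script later in this section, words are split on whitespace, so punctuation counts toward a word's length. The sample lines are just an illustration.

```python
from functools import reduce

lines = ['O Romeo, Romeo!', 'wherefore art thou Romeo?']

# Mapping: each word becomes a (word length, 1) key-value pair
pairs = [(len(word), 1) for line in lines for word in line.split()]

# Reduction: combine the pairs, totaling the values for each key
def combine(totals, pair):
    key, value = pair
    totals[key] = totals.get(key, 0) + value
    return totals

word_length_counts = reduce(combine, pairs, {})
print(sorted(word_length_counts.items()))
# [(1, 1), (3, 1), (4, 1), (6, 3), (9, 1)]
```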

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### HDFS, `MapReduce` and YARN (2 of 2)
* Hadoop divides data into **batches** and **distributes** them across the **cluster's nodes**
* Also **distributes MapReduce code** to **execute in parallel on every node**
    * Each **node** processes the **batch of data** stored **on that node**
* **Reduction combines results** from **all nodes** to produce **final result**
* **YARN** (“yet another resource negotiator”) **manages all resources** in the **cluster** and **schedules tasks** for execution
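A rough single-machine analogy, offered only as an illustration and not as how Hadoop is implemented: split the data into batches as if each lived on a different node, process each batch independently, then combine the partial results into the final result. Here `concurrent.futures.ThreadPoolExecutor` stands in for the cluster's nodes and adding `collections.Counter` objects stands in for the reduction.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

def count_word_lengths(batch):
    """Process one batch (one 'node's' data): count words of each length."""
    return Counter(len(word) for line in batch for word in line.split())

def split_into_batches(lines, num_batches):
    """Divide the lines round-robin into num_batches batches."""
    return [lines[i::num_batches] for i in range(num_batches)]

lines = ['O Romeo, Romeo!', 'wherefore art thou Romeo?',
         'Deny thy father', 'and refuse thy name']

batches = split_into_batches(lines, 2)  # pretend we have two worker nodes

# Process the batches concurrently, one per worker thread
with ThreadPoolExecutor(max_workers=2) as executor:
    partial_counts = list(executor.map(count_word_lengths, batches))

# Combine the partial results from all "nodes" into the final result
total = sum(partial_counts, Counter())
print(sorted(total.items()))  # [(1, 1), (3, 4), (4, 3), (6, 5), (9, 1)]
```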

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Hadoop Ecosystem 
* **Hadoop began** with **HDFS** and **MapReduce**, followed closely by **YARN**
* Now it's a **large ecosystem**, including **Hadoop 3** (3.2 released in January 2019), **Spark** and many other Apache projects[\[1\]](https://hortonworks.com/ecosystems/),[\[2\]](https://readwrite.com/2018/06/26/complete-guide-of-hadoop-ecosystem-components/),[\[3\]](https://www.janbasktraining.com/blog/introduction-architecture-components-hadoop-ecosystem/)
* [See our table of Hadoop Ecosystem Components](https://learning.oreilly.com/library/view/Python+for+Programmers,+First+Edition/9780135231364/ch16.xhtml#ch16lev2sec16)

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.5.2 Summarizing Word Lengths in _Romeo and Juliet_ via MapReduce
* To execute this example
    * We created a **cloud-based, multi-node cluster of computers** using **Microsoft Azure HDInsight**, which provides **Hadoop as a service**
    * Ran our **Hadoop MapReduce** code on that **cluster**
* **MapReduce task** 
    * Determines the **length of each word in `RomeoAndJuliet.txt`** (from the NLP presentation)
    * Summarizes **number of words of each length** 

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.5.3 Creating an Apache Hadoop Cluster in Microsoft Azure HDInsight (1 of 2)
* **Multi-node cloud-based clusters** typically are **paid** services
* We used [**Microsoft Azure’s HDInsight service**](https://azure.microsoft.com/en-us/free) (for Hadoop and Spark) and **new account credit** to create **cloud-based clusters of computers** in which to test our examples
    * The **new account credit** was more than enough to test our Hadoop and Spark examples
* HDInsight service **requires a credit card** for **identity verification**
    * When your **new account credit runs out** or **30 days pass**, you cannot use paid services unless you authorize Microsoft to **charge your card**
        * **Prevents accidental large bills** for people who are just experimenting

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.5.3 Creating an Apache Hadoop Cluster in Microsoft Azure HDInsight (2 of 2)
* Details on configuring a **low-cost cluster** to try **Hadoop** 
    * [**Python Fundamentals LiveLessons videos**](https://learning.oreilly.com/videos/python-fundamentals/9780135917411/9780135917411-PFLL_Lesson16_27_pt1) (two videos starting with this one) 
    * [**Python for Programmers, Section 16.5.3**](https://learning.oreilly.com/library/view/Python+for+Programmers,+First+Edition/9780135231364/ch16.xhtml#ch16lev2sec18) 
    * Also [see Microsoft’s **recommended cluster configurations**](https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-component-versioning#default-node-configuration-and-virtual-machine-sizes-for-clusters)
* **Caution: Once you allocate a cluster, it incurs costs whether you’re using it or not. So, when you complete this case study, be sure to delete your cluster(s) and other resources, so you don’t incur additional charges.** 

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.5.4 Hadoop Streaming
* Hadoop is **Java-based**
* Languages like **Python** that are **not natively supported** must use **Hadoop streaming** 
* **Python MapReduce scripts** communicate with Hadoop via **redirected standard I/O streams**
	* Hadoop redirects **input** to **mapper** script, which reads input from **standard input stream**
	* **Mapper** writes results to **standard output stream**
	* Hadoop redirects **mapper’s output** as **input** to **reducer** script, which reads from the **standard input stream**
	* **Reducer** writes results to **standard output stream** 
	* Hadoop writes **reducer’s output** to **HDFS**   

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.5.5 Implementing the Mapper (1 of 2)
* **Mapper** takes **lines of text** as input and **maps** them to **key–value pairs**, each containing a **word length** and **`1`**
* **Reducer** will total these **key–value pairs** by key
* **Hadoop streaming** expects **mapper’s output** and **reducer’s input/output** to have the form 
> **_key_`\t`_value_**
* In **`length_mapper.py`**, the `#!/usr/bin/env python3` line tells the system to run the script with Python 3
    * Must be the first line in the file
    * **HDInsight** currently includes **Python 2.7.12** and **Python 3.5.2** 
    * So you **cannot use f-strings**, which require **Python 3.6+**

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.5.5 Implementing the Mapper (2 of 2)

```python
#!/usr/bin/env python3
# length_mapper.py
"""Maps lines of text to key-value pairs of word lengths and 1."""
import sys

def tokenize_input():  # generator function
    """Split each line of standard input into a list of strings."""
    for line in sys.stdin:
        yield line.split()  

# read each line in the standard input and for every word 
# produce a key-value pair containing the word's length, a tab and 1
for line in tokenize_input():
    for word in line:
        print(str(len(word)) + '\t1')

```

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.5.6 Implementing the Reducer 
* In **`length_reducer.py`**, function **`tokenize_input`** is a **generator function** that reads and splits the **key–value pairs** produced by the mapper
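* **Hadoop streaming** supplies the **standard input**
* **`groupby` function** (**`itertools` module**) groups inputs by their **keys** (the **word lengths**) 
    * `groupby` groups only **consecutive** items with the same key; this works because Hadoop **sorts the mapper’s output by key** before passing it to the reducer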
* **Total** all the **counts** for a given **key**
* Output a new **key–value pair** consisting of the **word length** and its **total**
* **MapReduce** takes the **final word-count outputs** and **writes** them to a file in **HDFS**

<hr style="height:2px; border:none; color:#000; background-color:#000;">

```python
#!/usr/bin/env python3
# length_reducer.py
"""Counts the number of words with each length."""
import sys
from itertools import groupby
from operator import itemgetter

def tokenize_input():
    """Split each line of standard input into a key and a value."""
    for line in sys.stdin:
        yield line.strip().split('\t')

# produce key-value pairs of word lengths and counts separated by tabs
for word_length, group in groupby(tokenize_input(), itemgetter(0)):
    try:
        total = sum(int(count) for word_length, count in group)
        print(word_length + '\t' + str(total))
    except ValueError:
        pass  # ignore word if its count was not an integer

```
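Before uploading the scripts, you might test their logic locally. The sketch below is an illustration, not part of the original example: it re-expresses the two scripts' logic as functions that accept any iterable of lines instead of reading `sys.stdin`, and a `sorted` call between them stands in for the sort Hadoop performs on the mapper output before reduction. With a Unix shell, piping a file through `python3 length_mapper.py | sort -n | python3 length_reducer.py` performs a similar check with the real scripts.

```python
from itertools import groupby
from operator import itemgetter

def map_lengths(lines):
    """Mapper logic from length_mapper.py: emit 'length\t1' for each word."""
    for line in lines:
        for word in line.split():
            yield str(len(word)) + '\t1'

def reduce_lengths(lines):
    """Reducer logic from length_reducer.py: total the counts for each key."""
    pairs = (line.strip().split('\t') for line in lines)
    for word_length, group in groupby(pairs, itemgetter(0)):
        try:
            total = sum(int(count) for _, count in group)
            yield word_length + '\t' + str(total)
        except ValueError:
            pass  # ignore a key whose counts were not all integers

text = ['O Romeo, Romeo!', 'wherefore art thou Romeo?']

# Stand-in for Hadoop's sort: order the mapper output numerically by key
shuffled = sorted(map_lengths(text), key=lambda kv: int(kv.split('\t')[0]))

# prints 1<TAB>1, 3<TAB>1, 4<TAB>1, 6<TAB>3 and 9<TAB>1 on separate lines
for output_line in reduce_lengths(shuffled):
    print(output_line)
```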

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.5.7 Preparing to Run the MapReduce Example
* You must upload the scripts and `RomeoAndJuliet.txt` into the **HDInsight cluster's file system**
* For detailed instructions on doing this, see 
    * [**Python Fundamentals LiveLessons videos**](https://learning.oreilly.com/videos/python-fundamentals/9780135917411/9780135917411-PFLL_Lesson16_31) 
    * [**Python for Programmers, Section 16.5.7**](https://learning.oreilly.com/library/view/python-for-programmers/9780135231364/ch16.xhtml#ch16lev2sec22)

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.5.8 Running the MapReduce Job (1 of 4)
* Once files are in the cluster, **run MapReduce job** for **`RomeoAndJuliet.txt`** on your **cluster** by executing the following command in the cluster
    * You can copy/paste the command from `yarn.txt` located with this example
    * We reformatted the command here for readability: 

```
yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar 
   -D mapred.output.key.comparator.class=
      org.apache.hadoop.mapred.lib.KeyFieldBasedComparator    
   -D mapred.text.key.comparator.options=-n   
   -files length_mapper.py,length_reducer.py    
   -mapper length_mapper.py 
   -reducer length_reducer.py    
   -input /example/data/RomeoAndJuliet.txt    
   -output /example/wordlengthsoutput    
```

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.5.8 Running the MapReduce Job (2 of 4)
* The **`yarn` command** invokes **Hadoop’s YARN** (“yet another resource negotiator”) tool to manage and coordinate access to the Hadoop resources the **MapReduce task** uses
* **`hadoop-streaming.jar`** contains the Java-based **Hadoop streaming utility** that allows you to use Python to implement the **mapper** and **reducer**
* The two **`-D` options** set Hadoop properties that enable it to 
    * **sort the final key–value pairs by key** (**`KeyFieldBasedComparator`**) 
    * **numerically** (the **`-n`** comparator option) in ascending order, rather than alphabetically
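Why the numeric option matters: Hadoop streaming keys are text, and comparing them as strings puts `10` before `2`. A quick illustration of the two orderings using Python's `sorted` (not Hadoop's comparator):

```python
word_lengths = ['2', '10', '1', '3', '11']

alphabetical = sorted(word_lengths)      # compares the keys as strings
numeric = sorted(word_lengths, key=int)  # compares the keys as numbers
print(alphabetical)  # ['1', '10', '11', '2', '3']
print(numeric)       # ['1', '2', '3', '10', '11']
```

The comparator options follow Unix `sort` conventions, so adding `r` (that is, `-nr`) would reverse the numeric order, analogous to `sorted(word_lengths, key=int, reverse=True)`.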

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.5.8 Running the MapReduce Job (3 of 4)
* Other command-line arguments:
    * **`-files`**: comma-separated list of scripts that Hadoop copies to every node in the cluster so they can **execute locally on each node**
    * **`-mapper`**: the mapper's script file
    * **`-reducer`**: the reducer's script file
    * **`-input`**: **file** or **directory of files** to supply as **mapper input**
    * **`-output`**: **HDFS directory** where the final results will be stored
        * The job **fails** if this directory **already exists**
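
You can also build these arguments programmatically. This is a minimal sketch that assumes Python 3.8+ (for `shlex.join`). `build_streaming_command` and `submit_job` are hypothetical helpers, not part of Hadoop. Building an argument list avoids shell line-continuation problems. It also keeps Hadoop's generic options (`-D`, `-files`) ahead of the streaming options, which Hadoop streaming requires.

```python
import shlex
import shutil
import subprocess

# Path copied from the command above; it may differ on other Hadoop distributions.
STREAMING_JAR = "/usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar"


def build_streaming_command(mapper, reducer, input_path, output_path,
                            jar=STREAMING_JAR):
    """Hypothetical helper: return the yarn streaming command as an argument list."""
    return [
        "yarn", "jar", jar,
        # Generic options must precede the streaming options.
        "-D", "mapred.output.key.comparator.class="
              "org.apache.hadoop.mapred.lib.KeyFieldBasedComparator",
        "-D", "mapred.text.key.comparator.options=-n",
        "-files", f"{mapper},{reducer}",
        # Streaming options.
        "-mapper", mapper,
        "-reducer", reducer,
        "-input", input_path,
        "-output", output_path,  # must not already exist in HDFS
    ]


def submit_job(cmd):
    """Hypothetical helper: run cmd if a yarn client is on the PATH; else return None."""
    if shutil.which("yarn") is None:
        return None
    return subprocess.run(cmd).returncode


cmd = build_streaming_command("length_mapper.py", "length_reducer.py",
                              "/example/data/RomeoAndJuliet.txt",
                              "/example/wordlengthsoutput")
print(shlex.join(cmd))  # one-line form suitable for pasting into a shell
```

On the cluster's head node, `submit_job(cmd)` would launch the job. Elsewhere it returns `None` without running anything.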

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## 16.5.8 Running the MapReduce Job (4 of 4)
* Sample **output** shows some **Hadoop feedback** produced as the **MapReduce job executes**
    * We replaced omitted lines with **`...`** to save space
* Several lines of interest:
	* The total number of **“input paths to process”**—the **`1` source of input** in this example is **`RomeoAndJuliet.txt`**
	* The **“number of splits”**—`2` in this example, based on the **number of worker nodes** in our **HDInsight cluster**
    * The **percentage-completion** messages for the map and reduce phases, with their dates and times; **big data jobs** can take minutes, hours, days or longer
	* **`File System Counters`** showing numbers of **bytes read and written**
	* **`Job Counters`** showing the **numbers of mapping and reduction tasks used** 
	* **`Map-Reduce Framework`** showing stats about the **steps performed**

<hr style="height:2px; border:none; color:#000; background-color:#000;">

## Output
<pre>
packageJobJar: [] [/usr/hdp/2.6.5.3004-13/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.5.3004-13.jar] /tmp/streamjob2764990629848702405.jar tmpDir=null
...
18/12/05 16:46:25 INFO mapred.FileInputFormat: <strong>Total input paths to process : 1</strong>
18/12/05 16:46:26 INFO mapreduce.JobSubmitter: <strong>number of splits:2</strong>
...
18/12/05 16:46:26 INFO mapreduce.Job: The url to track the job: http://hn0-paulte.y3nghy5db2kehav5m0opqrjxcb.cx.internal.cloudapp.net:8088/proxy/application_1543953844228_0025/
...
18/12/05 16:46:35 INFO mapreduce.Job:  <strong>map 0% reduce 0%</strong>
18/12/05 16:46:43 INFO mapreduce.Job:  <strong>map 50% reduce 0%</strong>
18/12/05 16:46:44 INFO mapreduce.Job:  <strong>map 100% reduce 0%</strong>
18/12/05 16:46:48 INFO mapreduce.Job:  <strong>map 100% reduce 100%</strong>
18/12/05 16:46:50 INFO mapreduce.Job: Job job_1543953844228_0025 <strong>completed successfully</strong>
</pre>

<pre>
18/12/05 16:46:50 INFO mapreduce.Job: Counters: 49
	<strong>File System Counters</strong>
		FILE: Number of bytes read=156411
		FILE: Number of bytes written=813764
...
	<strong>Job Counters</strong> 
		Launched map tasks=2
		Launched reduce tasks=1
...
	<strong>Map-Reduce Framework</strong>
		Map input records=5260
		Map output records=25956
		Map output bytes=104493
		Map output materialized bytes=156417
		Input split bytes=346
		Combine input records=0
		Combine output records=0
		Reduce input groups=19
		Reduce shuffle bytes=156417
		Reduce input records=25956
		Reduce output records=19
		Spilled Records=51912
		Shuffled Maps =2
		Failed Shuffles=0
		Merged Map outputs=2
		GC time elapsed (ms)=193
		CPU time spent (ms)=4440
		Physical memory (bytes) snapshot=1942798336
		Virtual memory (bytes) snapshot=8463282176
		Total committed heap usage (bytes)=3177185280
...
18/12/05 16:46:50 INFO streaming.StreamJob: Output directory: /example/wordlengthsoutput
</pre>
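
You can reproduce the counters above in miniature. The following is a minimal local sketch, not Hadoop code. `map_phase` and `reduce_phase` are hypothetical stand-ins for `length_mapper.py` and `length_reducer.py` (Section 16.5.7). They assume the mapper splits each line on whitespace and counts every token, punctuation included. Python's `sorted` stands in for Hadoop's shuffle-and-sort step.

```python
from itertools import groupby
from operator import itemgetter


def map_phase(lines):
    """Hypothetical stand-in mapper: emit (word length, 1) for each token."""
    for line in lines:
        for word in line.split():
            yield len(word), 1


def reduce_phase(sorted_pairs):
    """Hypothetical stand-in reducer: sum the 1s in each run of equal keys."""
    for length, group in groupby(sorted_pairs, key=itemgetter(0)):
        yield length, sum(count for _, count in group)


def run_job(lines):
    """Simulate map -> shuffle/sort -> reduce and tally Hadoop-style counters."""
    lines = list(lines)
    mapped = list(map_phase(lines))
    shuffled = sorted(mapped, key=itemgetter(0))  # integer keys sort numerically
    reduced = list(reduce_phase(shuffled))
    counters = {
        "Map input records": len(lines),        # one record per input line
        "Map output records": len(mapped),      # one pair per word
        "Reduce input records": len(shuffled),
        "Reduce input groups": len(reduced),    # distinct word lengths
        "Reduce output records": len(reduced),
    }
    return reduced, counters


sample = ["Two households, both alike in dignity,",
          "In fair Verona, where we lay our scene,"]
results, counters = run_job(sample)
for length, count in results:
    print(f"{length}\t{count}")
for name, value in counters.items():
    print(f"{name}={value}")
```

As in the real job, "Reduce input groups" equals "Reduce output records" because the reducer emits one line per distinct key. The sum of the reduced counts equals "Map output records".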

<hr style="height:2px; border:none; color:#000; background-color:#000;">

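### Viewing the Word-Length Counts
* **Hadoop MapReduce** saves its output into **HDFS**
* To **view the final word-length counts**, display the **output file in the cluster's HDFS**; each line contains a word length, a tab and the number of words with that length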
```
hdfs dfs -text /example/wordlengthsoutput/part-00000
```

```
18/12/05 16:47:19 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
18/12/05 16:47:19 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev b5efb3e531bc1558201462b8ab15bb412ffa6b89]
1	4699
2	3869
3	5651
4	3668
5	2719
6	1624
7	1140
8	1062
9	855
10	317
11	189
12	95
13	35
14	13
15	9
16	6
17	3
18	1
23	1
```
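
A quick consistency check ties these results back to the job counters. This minimal sketch parses the `length<TAB>count` lines and skips log lines such as the `INFO` messages. `parse_length_counts` and `read_hdfs_output` are hypothetical helpers. The latter assumes a Hadoop client with the `hdfs` command available. The pasted data is the output shown above. The counts sum to 25956, matching "Map output records" and "Reduce input records". The 19 entries match "Reduce output records".

```python
import subprocess


def parse_length_counts(text):
    """Return {word length: count} from 'length<TAB>count' lines."""
    counts = {}
    for line in text.splitlines():
        fields = line.split("\t")
        if len(fields) != 2:
            continue  # skip log lines and blank lines
        try:
            length, count = int(fields[0]), int(fields[1])
        except ValueError:
            continue  # skip anything that isn't two integers
        counts[length] = count
    return counts


def read_hdfs_output(path="/example/wordlengthsoutput/part-00000"):
    """Hypothetical helper: run `hdfs dfs -text` and return its standard output."""
    result = subprocess.run(["hdfs", "dfs", "-text", path],
                            capture_output=True, text=True, check=True)
    return result.stdout


# Copied from the output shown above, including one INFO line to be skipped.
OUTPUT = (
    "18/12/05 16:47:19 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library\n"
    "1\t4699\n2\t3869\n3\t5651\n4\t3668\n5\t2719\n6\t1624\n7\t1140\n"
    "8\t1062\n9\t855\n10\t317\n11\t189\n12\t95\n13\t35\n14\t13\n"
    "15\t9\n16\t6\n17\t3\n18\t1\n23\t1\n"
)

counts = parse_length_counts(OUTPUT)
print(len(counts), sum(counts.values()))
print("Most common word length:", max(counts, key=counts.get))
```

On the cluster, `parse_length_counts(read_hdfs_output())` would parse the live results instead of the pasted copy.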

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### IMPORTANT: Deleting Your Cluster So You Do Not Incur Charges
* **Be sure to delete your cluster(s) and associated resources (like storage) so you don’t incur additional charges** 
    * For details on deleting the cluster, see my [**Python Fundamentals LiveLessons videos**](https://learning.oreilly.com/videos/python-fundamentals/9780135917411/9780135917411-PFLL_Lesson16_32) (deleting is discussed around the 9:00 mark) and [**Python for Programmers, Section 16.5.8**](https://learning.oreilly.com/library/view/python-for-programmers/9780135231364/ch16.xhtml#ch16lev2sec23)
* [More information on managing Azure resource groups in the portal](https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-portal)

<hr style="height:2px; border:none; color:#000; background-color:#000;">

### Lecture Note: Switch to **jupyter/pyspark-notebook Docker stack** for Spark presentation

<hr style="height:2px; border:none; color:#000; background-color:#000;">

# More Info 
* See Lesson 16 in [**Python Fundamentals LiveLessons** here on O'Reilly Online Learning](https://learning.oreilly.com/videos/python-fundamentals/9780135917411)
* See Chapter 16 in [**Python for Programmers** on O'Reilly Online Learning](https://learning.oreilly.com/library/view/python-for-programmers/9780135231364/)
* See Chapter 17 in [**Intro Python for Computer Science and Data Science** on O'Reilly Online Learning](https://learning.oreilly.com/library/view/intro-to-python/9780135404799/)
* Interested in a print book? Check out:

| Python for Programmers<br>(640-page professional book) | Intro to Python for Computer<br>Science and Data Science<br>(880-page college textbook)
| :------ | :------
| <a href="https://amzn.to/2VvdnxE"><img alt="Python for Programmers cover" src="../images/PyFPCover.png" width="150" border="1"/></a> | <a href="https://amzn.to/2LiDCmt"><img alt="Intro to Python for Computer Science and Data Science: Learning to Program with AI, Big Data and the Cloud" src="../images/IntroToPythonCover.png" width="159" border="1"></a>

>Please **do not** purchase both books&mdash;_Python for Programmers_ is a subset of _Intro to Python for Computer Science and Data Science_

<hr style="height:2px; border:none; color:#000; background-color:#000;">

&copy; 2019 by Pearson Education, Inc. All Rights Reserved. The content in this notebook is based on the book [**Python for Programmers**](https://amzn.to/2VvdnxE).