# Leveraging Python in the World of Big Data

In [4]:
%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import hadoopy

We generate more data every day. We have already generated more data this century than in the whole of the previous one, and we are only 15 years into it. Big Data is the new buzzword, and everyone is talking about it because it brings new possibilities. Google Translate can translate between many languages thanks to Big Data. We are able to decode the human genome. We can predict the failure of a turbine and carry out the required maintenance in advance, all thanks to Big Data.

There are three Vs of Big Data, defined as follows:

1. Volume - The size of the data. Facebook, for example, has petabytes of data about its users.
2. Velocity - The rate at which the data is generated.
3. Variety - Data is not only in tabular form. There is data from text, images, and sound, and it comes in formats such as JSON and XML.

In this chapter, we'll learn how to use Python in the world of Big Data by:
1. Understanding Hadoop
2. Writing a Map-Reduce program in Python
3. Using Hadoopy
4. Understanding Spark
5. Writing a Spark program

# What is Hadoop?

According to the Apache Hadoop website, Hadoop is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.

<img src="files/images/hadoop_architecture.png">

## Programming Model

Map-Reduce is a programming paradigm that expresses a large distributed computation as a sequence of distributed operations on large data sets of key-value pairs. The Map-Reduce framework takes a cluster of machines and executes Map-Reduce jobs across the machines in the cluster. A Map-Reduce job has two phases: a mapping phase and a reduce phase. The input to a Map-Reduce job is a data set of key-value pairs.

In the mapping phase, Hadoop splits the input data set into a large number of fragments and assigns each fragment to a mapping task. Hadoop also distributes the many map tasks across the cluster of machines on which it operates. Each mapping task takes the key-value pairs from its assigned fragment and generates a set of intermediate key-value pairs. For each input key-value pair, the map task invokes a user-defined mapping function that transforms the input into a different key-value pair.

Following the mapping phase, Hadoop sorts the intermediate data set by key and produces a set of key-value tuples so that all the values associated with a particular key appear together. It also divides the set of tuples into a number of fragments equal to the number of reduce tasks.

In the reduce phase, each reduce task consumes the fragment of key-value tuples assigned to it. For each such tuple, it invokes a user-defined reduce function that transforms the tuple into an output key-value pair. The Hadoop framework distributes the many reduce tasks across the cluster of machines and delivers the appropriate fragment of intermediate data to each reduce task.
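The three steps described above (map, sort and group by key, reduce) can be imitated on a single machine. The following is only a sketch of the idea, not how Hadoop is implemented: the function names `map_phase`, `shuffle_phase`, and `reduce_phase` are hypothetical, and everything runs in one process instead of being distributed.

```python
from collections import defaultdict


def map_phase(records, map_fn):
    """Apply map_fn to every input key-value pair and collect the intermediate pairs."""
    intermediate = []
    for key, value in records:
        intermediate.extend(map_fn(key, value))
    return intermediate


def shuffle_phase(pairs):
    """Bring all values of the same key together and sort by key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return sorted(grouped.items())


def reduce_phase(grouped, reduce_fn):
    """Apply reduce_fn to every (key, list of values) tuple."""
    return [reduce_fn(key, values) for key, values in grouped]


def word_map(_line_no, line):
    # One intermediate pair (word, 1) per word in the line
    return [(word, 1) for word in line.split()]


def word_reduce(word, counts):
    return (word, sum(counts))


# Input key-value pairs: (line number, line text)
records = [(0, "dolly dolly max"), (1, "max jack tim max")]
result = reduce_phase(shuffle_phase(map_phase(records, word_map)), word_reduce)
print(result)  # [('dolly', 2), ('jack', 1), ('max', 3), ('tim', 1)]
```

In a real cluster, each of these phases runs as many parallel tasks, and the framework moves the intermediate data between machines.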

<img src="files/images/MapReduce.png">

## Map Reduce Architecture 

Map-Reduce has a master/slave architecture, with a single master server, the jobtracker, and several slave servers, the tasktrackers, one per machine (node) in the cluster. The jobtracker is the point of communication between users and the framework. Users submit Map-Reduce jobs to the jobtracker, which puts them in a queue of pending jobs and executes them on a first-come, first-served basis. The jobtracker manages the assignment of map and reduce tasks to the tasktrackers. The tasktrackers execute tasks upon instruction from the jobtracker and also handle data motion between the map and reduce phases.



## Hadoop DFS

Hadoop's Distributed File System (HDFS) is designed to store very large files across machines in a large cluster. It was inspired by the Google File System. HDFS stores each file as a sequence of blocks; all blocks in a file except the last block are the same size. Blocks belonging to a file are replicated for fault tolerance. The block size and replication factor are configurable per file. Files in HDFS are "write once" and have strictly one writer at any time.
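To make the block layout concrete, here is a small arithmetic sketch. The 128 MB block size and the 300 MB file are assumptions chosen for illustration (the actual block size depends on the cluster configuration), and `split_into_blocks` is a hypothetical helper, not part of any Hadoop API.

```python
def split_into_blocks(file_size, block_size):
    """Return the sizes of the blocks that a file of file_size bytes occupies."""
    full_blocks, remainder = divmod(file_size, block_size)
    sizes = [block_size] * full_blocks
    if remainder:
        # Only the last block may be smaller than the block size
        sizes.append(remainder)
    return sizes


MB = 1024 * 1024
block_size = 128 * MB   # assumed block size
replication = 3         # assumed replication factor

blocks = split_into_blocks(300 * MB, block_size)
print("%d blocks of sizes %s MB" % (len(blocks), [b // MB for b in blocks]))
print("raw storage used: %d MB" % (sum(blocks) * replication // MB))
```

With these assumed numbers, the file occupies two full blocks plus one 44 MB block, and with three replicas the cluster stores 900 MB in total.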

## Hadoop DFS Architecture

Like Hadoop Map/Reduce, HDFS follows a master/slave architecture. An HDFS installation consists of a single Namenode, a master server that manages the filesystem namespace and regulates access to files by clients. In addition, there are a number of Datanodes, one per node in the cluster, which manage storage attached to the nodes that they run on. The Namenode makes filesystem namespace operations such as opening, closing, and renaming files and directories available via an RPC interface. It also determines the mapping of blocks to Datanodes. The Datanodes are responsible for serving read and write requests from filesystem clients. They also perform block creation, deletion, and replication upon instruction from the Namenode.

# Python Map-Reduce

The installation of Hadoop won't be covered in this book, but you can install it by following the instructions at this link:

http://www.cloudera.com/content/cloudera/en/documentation/cdh4/latest/CDH4-Quick-Start/cdh4qs_topic_3_2.html

We'll be using the Hadoop Streaming API to execute our Python Map-Reduce programs in Hadoop. The Hadoop Streaming API lets any program that reads from standard input and writes to standard output act as a Map-Reduce program.

We'll be writing three Map-Reduce programs with Python:

1. A Basic Word Count
2. Getting the Sentiment Score of each review
3. Getting overall sentiment score from all the reviews



## Basic word count

We'll start with the word count Map-Reduce program. Save the following code in a word_mapper.py file.

In [None]:
#!/usr/bin/env python

import sys

for l in sys.stdin:
    
    # Trailing and Leading white space is removed
    l = l.strip()
    
    # words in the line is split
    word_tokens = l.split()
    
    # Key Value pair is outputted
    for w in word_tokens:
        print '%s\t%s' % (w, 1)

In the above mapper code, each line of the file is stripped of its leading and trailing white space. The line is then split into word tokens, and each token is output as a key-value pair of the form <word> 1.

Save the following code in a word_reducer.py file.

In [None]:
#!/usr/bin/env python

import sys

current_word_token = None
current_counter = 0

# STDIN Input
for l in sys.stdin:
    # Trailing and Leading white space is removed
    l = l.strip()

    # input from the mapper is parsed
    word_token, counter = l.split('\t', 1)

    # count is converted to int
    try:
        counter = int(counter)
    except ValueError:
        # if count is not a number then ignore the line
        continue

    #Since hadoop sorts the mapper output by key, the following
    # if else statement works
    if current_word_token == word_token:
        current_counter += counter
    else:
        if current_word_token:
            print '%s\t%s' % (current_word_token, current_counter)
        
        current_counter = counter
        current_word_token = word_token

# The last word is output, provided at least one line was read
if current_word_token is not None:
    print '%s\t%s' % (current_word_token, current_counter)

In the above code, we use current_word_token to keep track of the word that is currently being counted. In the for loop, we use word_token and counter to hold the key and value of each key-value pair. We then convert counter to the int type.

In the if else statement, if word_token is the same as the previous word, which is held in current_word_token, we keep counting; otherwise, a new word has arrived, so we output the previous word and its count. The last if statement outputs the last word.
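The same "consecutive equal keys" logic can also be expressed with `itertools.groupby`, which groups adjacent items that share a key and therefore relies on the input being sorted, just as the reducer does. This is an alternative sketch rather than the script used in this chapter; `reduce_counts` is a hypothetical name, and unlike the reducer above it does not skip lines whose count is not a number.

```python
from itertools import groupby


def reduce_counts(lines):
    """Yield (word, total) pairs from sorted 'word<TAB>count' lines."""
    # Split every non-empty line into [word, count]
    parsed = (line.rstrip("\n").split("\t", 1) for line in lines if line.strip())
    # groupby only merges adjacent items, so the lines must be sorted by word
    for word, group in groupby(parsed, key=lambda pair: pair[0]):
        yield word, sum(int(count) for _, count in group)


sorted_mapper_output = ["dolly\t1", "dolly\t1", "jack\t1",
                        "max\t1", "max\t1", "max\t1", "tim\t1"]

for word, total in reduce_counts(sorted_mapper_output):
    print("%s\t%d" % (word, total))
```

In a streaming reducer, `sys.stdin` would be passed in place of the sample list.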

We can check whether the mapper is working with the following command:

In [22]:
%%bash
echo 'dolly dolly max max jack tim max' | ./BigData/word_mapper.py

dolly	1
dolly	1
max	1
max	1
jack	1
tim	1
max	1


Now we can check that the reducer is also working by piping the sorted mapper output into the reducer.

In [25]:
%%bash
echo "dolly dolly max max jack tim max" | ./BigData/word_mapper.py | sort -k1,1  | ./BigData/word_reducer.py

dolly	2
jack	1
max	3
tim	1


Now, let's apply the same pipeline to a local file containing a summary of Moby Dick.

In [27]:
%%bash
cat ./Data/mobydick_summary.txt | ./BigData/word_mapper.py | sort -k1,1  | ./BigData/word_reducer.py

a	28
A	2
abilities	1
aboard	3
about	2
adorned	1
Africa	1
Africa,	1
after	1
after,	2
After	1
again	3
again,	1
Ahab	13
Ahab,	1
Ahab’s	6
ahead.	1
All	1
alone	1
also	1
always	1
American	1
an	4
and	36
angry	1
announces	2
another	1
anticipation	1
anyone	2
appearance	2
appreciate	1
approaches	1
are	7
arm	1
as	7
As	1
at	2
At	1
atop	1
attacks	3
away	1
back	1
bad	1
balancing	1
baptizes	1
bargain	1
be	3
because	1
becomes	2
bed	1
Bedford,	1
before	1
beginning	1
begins	1
behind	1
berths	1
between	1
Bildad	1
Bildad,	1
black	1
blood	1
boat	2
boat,	1
boat.	1
boats	3
bones	1
Boomer,	2
both	1
boy,	1
buoy.	1
but	4
by	6
cabin	1
can	1
cannot	1
capital	1
captain,	1
Captain	1
captains	1
captains.	1
captured	1
carpenter	1
carries	1
carry	1
caught	3
chase,	1
Christmas	1
coffin	2
coffin,	1
cold	1
comes	1
companion.	1
confrontation	1
considers	1
constant	1
constitute	1
continues	1
corpse	1
countries	1
covered	1
crazed	1
crazy	1
created	1
crew	2
crew,	1
crewmen	1
cutting	1
day,	2
Day	1
death,	1
death.	3
deaths.	1

## Sentiment Score for each review

In the previous chapter, we wrote a program to calculate the sentiment score. We'll extend it into a Map-Reduce program that determines the sentiment score of each review. Write the following code in senti_mapper.py.

In [None]:
#!/usr/bin/env python

import sys
import re

positive_words = open('positive-words.txt').read().split('\n')
negative_words = open('negative-words.txt').read().split('\n')

def sentiment_score(text, pos_list, neg_list):
    positive_score = 0
    negative_score = 0

    for w in text.split(' '):
        if w in pos_list: positive_score+=1
        if w in neg_list: negative_score+=1

    return positive_score - negative_score


for l in sys.stdin:
    
    # Trailing and Leading white space is removed
    l = l.strip()

    #Convert to lower case
    l = l.lower()

    #Getting the sentiment score	
    score = sentiment_score(l, positive_words, negative_words)
    
    # Key Value pair is outputted
    print '%s\t%s' % (l, score)

In the above code, we reuse the sentiment score function from the previous chapter. For each line, we strip the leading and trailing white space, convert it to lower case, and then get the sentiment score of the review. Finally, we output the review and its score.

For this program, we don't require a reducer as we are calculating the sentiment in the mapper itself and we just have to output the sentiment score.
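As a side note on the scoring function, membership tests on a list scan the whole list for every word, which can become slow with a large lexicon. A possible variation, sketched below, keeps the word lists in sets. The two tiny sets here are hypothetical stand-ins for the positive-words.txt and negative-words.txt files, and `split()` without an argument is used so that repeated spaces do not produce empty tokens.

```python
def sentiment_score(text, positive, negative):
    """Count positive words minus negative words in the text."""
    words = text.lower().split()
    # (w in positive) and (w in negative) are booleans, which count as 1 or 0
    return sum((w in positive) - (w in negative) for w in words)


# Hypothetical miniature lexicons, for illustration only
positive = {"fine", "entertaining", "great"}
negative = {"bad", "disaster"}

print(sentiment_score("A perfectly fine movie and entertaining enough", positive, negative))  # 2
print(sentiment_score("too bad the genre dictates disaster", positive, negative))  # -2
```

With the real files, the sets could be built from the lists read in the mapper, for example with `set(positive_words)`.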

Let's test that the mapper works locally with a file containing the reviews of Jurassic World.

In [28]:
%%bash
cat ./Data/jurassic_world_review.txt | ./BigData/senti_mapper.py 

there is plenty here to divert, but little to leave you enraptored. such is the fate of the sequel: bigger. louder. fewer teeth.	0
if you limit your expectations for jurassic world to "more teeth," it will deliver on that promise. if you dare to hope for anything more-relatable characters, narrative coherence-you'll only set yourself up for disappointment.	-1
there's a problem when the most complex character in a film is the dinosaur	-2
not so much another bloated sequel as it is the fruition of dreams deferred in the previous films. too bad the genre dictates that those dreams are once again destined for disaster.	-2
a perfectly fine movie and entertaining enough to keep you watching until the closing credits.	4
this fourth installment of the jurassic park film series shows some wear and tear, but there is still some gas left in the tank. time is spent to set up the next film in the series. they will keep making more of these until we stop watching.	0
an angry movie with a tragic mora

We can see that our program is able to calculate the sentiment score well.

## Overall Sentiment Score

To calculate the overall sentiment score, we need a reducer, and we'll use the same mapper with slight modifications.

The following is the mapper code, which is stored in overall_senti_mapper.py.

In [None]:
#!/usr/bin/env python

import sys
import hashlib

positive_words = open('./Data/positive-words.txt').read().split('\n')
negative_words = open('./Data/negative-words.txt').read().split('\n')

def sentiment_score(text, pos_list, neg_list):
    positive_score = 0
    negative_score = 0

    for w in text.split(' '):
        if w in pos_list: positive_score+=1
        if w in neg_list: negative_score+=1

    return positive_score - negative_score


for l in sys.stdin:
    
    # Trailing and Leading white space is removed
    l = l.strip()

    #Convert to lower case
    l = l.lower()

    #Getting the sentiment score	
    score = sentiment_score(l, positive_words, negative_words)

    # Hashing the review to use it as the key
    hash_object = hashlib.md5(l)
    
    # Key Value pair is outputted
    print '%s\t%s' % (hash_object.hexdigest(), score)

The mapper code is similar to the previous mapper, but here we MD5-hash the review and output the hash as the key.
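To see what such a key looks like, here is a short sketch of hashing a review with `hashlib.md5`. The helper name `review_key` is hypothetical. The text is encoded to bytes before hashing, which Python 3 requires and Python 2 also accepts for plain ASCII text.

```python
import hashlib


def review_key(review):
    """Return the MD5 hex digest of a review, usable as a fixed-length key."""
    return hashlib.md5(review.encode("utf-8")).hexdigest()


key = review_key("a perfectly fine movie")
print(key)
print(len(key))  # 32
```

The digest is always 32 hexadecimal characters, and the same review always produces the same key, so the key stays short and free of tabs no matter how long the review is.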

The following is the reducer code that we use to determine the overall sentiment about the movie. Store the following code in overall_senti_reducer.py.

In [None]:
#!/usr/bin/env python

import sys

total_score = 0

# STDIN Input
for l in sys.stdin:
   
    # input from the mapper is parsed
    key, score = l.split('\t', 1)

    # score is converted to int
    try:
        score = int(score)
    except ValueError:
        # if score is not a number then ignore the line
        continue

    #Updating the total score	
    total_score += score


print '%s' % (total_score,)

In the above code, we split each line to extract the score and add it to the total_score variable. Finally, we output total_score, which shows the overall sentiment of the movie.

Let's test the overall sentiment locally on Jurassic World, which is a good movie, and then on the movie Unfinished Business, which was poorly received by critics.

In [31]:
%%bash
cat ./Data/jurassic_world_review.txt | ./BigData/overall_senti_mapper.py | sort -k1,1  | ./BigData/overall_senti_reducer.py

19


In [47]:
%%bash
cat ./Data/unfinished_business_review.txt | ./BigData/overall_senti_mapper.py | sort -k1,1  | ./BigData/overall_senti_reducer.py

-8


We can see that our code is working well. Jurassic World has a positive score, which suggests that people liked it, and Unfinished Business has a negative score, which suggests that people didn't like it much.

## Deploying Map Reduce code on Hadoop

We'll create directories for the Moby Dick, Jurassic World, and Unfinished Business data in the HDFS tmp folder.

In [34]:
%%bash
hadoop fs -mkdir /tmp/moby_dick
hadoop fs -mkdir /tmp/jurassic_world
hadoop fs -mkdir /tmp/unfinished_business

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


Let's check if the folders are created.

In [35]:
%%bash
hadoop fs -ls /tmp/

Found 6 items
drwxrwxrwx   - mapred hadoop          0 2014-11-14 15:42 /tmp/hadoop-mapred
drwxr-xr-x   - samzer hadoop          0 2015-06-18 18:31 /tmp/jurassic_world
drwxrwxrwx   - hdfs   hadoop          0 2014-11-14 15:41 /tmp/mapred
drwxr-xr-x   - samzer hadoop          0 2015-06-18 18:31 /tmp/moby_dick
drwxr-xr-x   - samzer hadoop          0 2015-06-16 18:17 /tmp/temp635459726
drwxr-xr-x   - samzer hadoop          0 2015-06-18 18:31 /tmp/unfinished_business


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


Once the folders are created, let's copy the data files to their respective folders.

In [37]:
%%bash
hadoop fs -copyFromLocal ./Data/mobydick_summary.txt /tmp/moby_dick
hadoop fs -copyFromLocal ./Data/jurassic_world_review.txt /tmp/jurassic_world
hadoop fs -copyFromLocal ./Data/unfinished_business_review.txt /tmp/unfinished_business

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


Let's verify that the files are copied.

In [39]:
%%bash
hadoop fs -ls /tmp/moby_dick
hadoop fs -ls /tmp/jurassic_world
hadoop fs -ls /tmp/unfinished_business

Found 1 items
-rw-r--r--   3 samzer hadoop       5973 2015-06-18 18:34 /tmp/moby_dick/mobydick_summary.txt
Found 1 items
-rw-r--r--   3 samzer hadoop       3185 2015-06-18 18:34 /tmp/jurassic_world/jurassic_world_review.txt
Found 1 items
-rw-r--r--   3 samzer hadoop       2294 2015-06-18 18:34 /tmp/unfinished_business/unfinished_business_review.txt


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


We can see that files have been copied successfully.

With the following command, we'll execute our mapper and reducer scripts in Hadoop. In the command, we define the mapper, the reducer, and the input and output locations, and then use Hadoop streaming to execute our scripts.
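Because the streaming command is long, it can help to assemble it in Python from its parts. The sketch below only builds and prints the command line using the options that appear in this chapter (-file, -mapper, -reducer, -input, -output); `streaming_command` is a hypothetical helper, and the jar path is the one from this chapter's installation and will differ on other systems.

```python
import os


def streaming_command(jar, mapper, reducer, input_path, output_path, extra_files=()):
    """Build the argument list for a Hadoop streaming job."""
    cmd = ["hadoop", "jar", jar]
    # Ship the scripts and any side files (such as word lists) with the job
    for path in [mapper, reducer] + list(extra_files):
        cmd += ["-file", path]
    # The shipped scripts are referred to by file name only
    cmd += ["-mapper", os.path.basename(mapper),
            "-reducer", os.path.basename(reducer),
            "-input", input_path,
            "-output", output_path]
    return cmd


cmd = streaming_command(
    "/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-*streaming*.jar",
    "./BigData/word_mapper.py", "./BigData/word_reducer.py",
    "/tmp/moby_dick/*", "/tmp/moby_output")
print(" ".join(cmd))
```

The printed line is meant to be pasted into a shell, which expands the wildcard in the jar path. Passing the list directly to a process launcher would not expand it.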

Let's execute the word count program first.

In [41]:
%%bash

hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-*streaming*.jar -file ./BigData/word_mapper.py -mapper word_mapper.py -file ./BigData/word_reducer.py -reducer word_reducer.py -input /tmp/moby_dick/* -output /tmp/moby_output 

packageJobJar: [./BigData/word_mapper.py, ./BigData/word_reducer.py, /tmp/hadoop-samzer/hadoop-unjar3615191027855941954/] [] /tmp/streamjob7745994398502096828.jar tmpDir=null


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
15/06/18 18:39:12 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/06/18 18:39:13 INFO mapred.FileInputFormat: Total input paths to process : 1
15/06/18 18:39:14 INFO streaming.StreamJob: getLocalDirs(): [/data/1/mapred/local, /data/2/mapred/local, /data/3/mapred/local]
15/06/18 18:39:14 INFO streaming.StreamJob: Running job: job_201506181207_0001
15/06/18 18:39:14 INFO streaming.StreamJob: To kill this job, run:
15/06/18 18:39:14 INFO streaming.StreamJob: UNDEF/bin/hadoop job  -Dmapred.job.tracker=localhost:8021 -kill job_201506181207_0001
15/06/18 18:39:14 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201506181207_0001
15/06/18 18:39:15 INFO streaming.StreamJo

Let's verify that the word count map reduce program is working successfully.

In [43]:
%%bash

hadoop fs -cat /tmp/moby_output/*

(Queequeg	1
A	2
Africa	1
Africa,	1
After	1
Ahab	13
Ahab,	1
Ahab’s	6
All	1
American	1
As	1
At	1
Bedford,	1
Bildad	1
Bildad,	1
Boomer,	2
Captain	1
Christmas	1
Day	1
Delight,	1
Dick	6
Dick,	2
Dick.	6
During	2
Enderby,	1
Fedallah	1
Fedallah,	1
Fedallah.	1
Fedallah’s	2
From	1
Gabriel,	1
He	7
His	1
Indian	1
Ishmael	1
Ishmael,	2
Issuing	1
Jeroboam,	1
Massachusetts,	1
Moby	14
Nantucket	1
Nantucket,	1
New	1
Not	1
Ocean.	1
On	1
One	1
Pacific	1
Peleg	1
Peleg.	1
Pequod	9
Pequod,	2
Pequod’s	5
Pip	1
Pip,	1
Quaker	1
Queequeg	2
Queequeg.	1
Queequeg’s	2
Rachel	1
Rachel,	1
Samuel	1
Since	1
Soon	2
South	1
Starbuck	1
Starbuck,	1
Tashtego	1
Tashtego,	1
The	11
There	1
These	1
They	2
While	1
a	28
abilities	1
aboard	3
about	2
adorned	1
after	1
after,	2
again	3
again,	1
ahead.	1
alone	1
also	1
always	1
an	4
and	36
angry	1
announces	2
another	1
anticipation	1
anyone	2
appearance	2
appreciate	1
approaches	1
are	7
arm	1
as	7
at	2
atop	1
attacks	3
away	1
back	1
bad	1
balancing	1
baptizes	1
bargain	1
be	3
because	1

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
cat: `/tmp/moby_output/_logs': Is a directory


The program is working as intended. Now, we'll deploy the program that calculates the sentiment score of each review. Note that we also pass the positive and negative word list files to Hadoop streaming with the -file option.

In [44]:
%%bash

hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-*streaming*.jar -file ./BigData/senti_mapper.py -mapper senti_mapper.py -file ./BigData/senti_reducer.py -reducer senti_reducer.py -input /tmp/jurassic_world/* -output /tmp/jurassic_output -file ./positive-words.txt -file negative-words.txt  

packageJobJar: [./BigData/senti_mapper.py, ./BigData/senti_reducer.py, ./positive-words.txt, negative-words.txt, /tmp/hadoop-samzer/hadoop-unjar7337551354059438377/] [] /tmp/streamjob3150495844828357124.jar tmpDir=null


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
15/06/18 18:54:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/06/18 18:54:00 INFO mapred.FileInputFormat: Total input paths to process : 1
15/06/18 18:54:01 INFO streaming.StreamJob: getLocalDirs(): [/data/1/mapred/local, /data/2/mapred/local, /data/3/mapred/local]
15/06/18 18:54:01 INFO streaming.StreamJob: Running job: job_201506181207_0002
15/06/18 18:54:01 INFO streaming.StreamJob: To kill this job, run:
15/06/18 18:54:01 INFO streaming.StreamJob: UNDEF/bin/hadoop job  -Dmapred.job.tracker=localhost:8021 -kill job_201506181207_0002
15/06/18 18:54:01 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201506181207_0002
15/06/18 18:54:02 INFO streaming.StreamJo

Let's check whether it scores the sentiment of the reviews.

In [45]:
%%bash

hadoop fs -cat /tmp/jurassic_output/*

"jurassic world," like its predecessors, fills up the screen with roaring, slathering, earth-shaking dinosaurs, then fills in mere humans around the edges. it's a formula that works as well in 2015 as it did in 1993.	3
	
a perfectly fine movie and entertaining enough to keep you watching until the closing credits.	4
	
an angry movie with a tragic moral ... meta-adoration and criticism ends with a genetically modified dinosaur fighting off waves of dinosaurs.	-3
	
if you limit your expectations for jurassic world to "more teeth," it will deliver on that promise. if you dare to hope for anything more-relatable characters, narrative coherence-you'll only set yourself up for disappointment.	-1
	
it can't be the amazing revelation the first film was, but "jurassic world" is a great amusement park ride...and the best sequel to it's predecessor we've had.	4
	
it combines first class effects, a genetically engineered deadly dinosaur, outstanding action, well defined characters and a screenplay

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
cat: `/tmp/jurassic_output/_logs': Is a directory


This program is also working as intended. Now we'll try out the overall sentiment of a movie.

In [49]:
%%bash

hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-*streaming*.jar -file ./BigData/overall_senti_mapper.py -mapper overall_senti_mapper.py -file ./BigData/overall_senti_reducer.py -reducer overall_senti_reducer.py -input /tmp/unfinished_business/* -output /tmp/unfinished_business_output -file ./positive-words.txt -file negative-words.txt  

packageJobJar: [./BigData/overall_senti_mapper.py, ./BigData/overall_senti_reducer.py, ./positive-words.txt, negative-words.txt, /tmp/hadoop-samzer/hadoop-unjar4062599583189912126/] [] /tmp/streamjob5575281473245925063.jar tmpDir=null


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
15/06/18 19:04:50 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/06/18 19:04:50 INFO mapred.FileInputFormat: Total input paths to process : 1
15/06/18 19:04:52 INFO streaming.StreamJob: getLocalDirs(): [/data/1/mapred/local, /data/2/mapred/local, /data/3/mapred/local]
15/06/18 19:04:52 INFO streaming.StreamJob: Running job: job_201506181207_0005
15/06/18 19:04:52 INFO streaming.StreamJob: To kill this job, run:
15/06/18 19:04:52 INFO streaming.StreamJob: UNDEF/bin/hadoop job  -Dmapred.job.tracker=localhost:8021 -kill job_201506181207_0005
15/06/18 19:04:52 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201506181207_0005
15/06/18 19:04:53 INFO streaming.StreamJo

Let's verify the result.

In [50]:
%%bash

hadoop fs -cat /tmp/unfinished_business_output/*

-8	


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
cat: `/tmp/unfinished_business_output/_logs': Is a directory


We can see that the overall sentiment score is coming out correctly from Map Reduce. 

Below is a screenshot of the Jobtracker status page.

<img src="files/images/jobtracker.png">

The image shows a portal where the jobs submitted to the jobtracker and their status can be viewed. This page is served on port 50030 of the master system, as the tracking URL in the job output above shows.

In the image, we can see a running job, and the results above confirm that the job completed successfully.

## File Handling with Hadoopy

Hadoopy is a Python library that provides an API to interact with Hadoop, manage files, and run Map-Reduce jobs. Hadoopy can be downloaded from the following location:

http://www.hadoopy.com/en/latest/tutorial.html#installing-hadoopy

Let's try to put a few files into Hadoop through Hadoopy, in a directory called data that we create within HDFS.

In [19]:
%%bash
hadoop fs -mkdir data

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


The following code puts the data into HDFS:

In [20]:
#!/usr/bin/env python
import hadoopy
import os

hdfs_path = ''


def read_local_dir(local_path):
    for fn in os.listdir(local_path):
        path = os.path.join(local_path, fn)
        if os.path.isfile(path):
            yield path


def main():
    local_path = './BigData/dummy_data'
    # Copy every local file into the 'data' directory in HDFS
    for local_file in read_local_dir(local_path):
        hadoopy.put(local_file, 'data')
        print "The file %s has been put into hdfs" % (local_file,)

if __name__ == '__main__':
    main()

The file ./BigData/dummy_data/test9 has been put into hdfs
The file ./BigData/dummy_data/test7 has been put into hdfs
The file ./BigData/dummy_data/test1 has been put into hdfs
The file ./BigData/dummy_data/test8 has been put into hdfs
The file ./BigData/dummy_data/test6 has been put into hdfs
The file ./BigData/dummy_data/test5 has been put into hdfs
The file ./BigData/dummy_data/test3 has been put into hdfs
The file ./BigData/dummy_data/test4 has been put into hdfs
The file ./BigData/dummy_data/test2 has been put into hdfs


In the above code, we list all the files in a directory and then put each file into Hadoop using the put method of Hadoopy.

Let's check whether all the files have been put into HDFS.

In [21]:
%%bash
hadoop fs -ls data

Found 9 items
-rw-r--r--   3 samzer hadoop          0 2015-06-23 00:19 data/test1
-rw-r--r--   3 samzer hadoop          0 2015-06-23 00:19 data/test2
-rw-r--r--   3 samzer hadoop          0 2015-06-23 00:19 data/test3
-rw-r--r--   3 samzer hadoop          0 2015-06-23 00:19 data/test4
-rw-r--r--   3 samzer hadoop          0 2015-06-23 00:19 data/test5
-rw-r--r--   3 samzer hadoop          0 2015-06-23 00:19 data/test6
-rw-r--r--   3 samzer hadoop          0 2015-06-23 00:19 data/test7
-rw-r--r--   3 samzer hadoop          0 2015-06-23 00:19 data/test8
-rw-r--r--   3 samzer hadoop          0 2015-06-23 00:19 data/test9


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


So we have successfully been able to put files into hdfs.

## Pig

<img src="files/images/pig-logo.gif">

Pig is a platform that provides a very expressive language for data transformation and querying. Code written in Pig Latin takes the form of a script, which is compiled into Map-Reduce programs that execute on Hadoop.

Pig reduces the complexity of writing raw Map-Reduce programs and enables the user to perform transformations quickly.

Pig Latin can be learned from this link: http://pig.apache.org/docs/r0.7.0/piglatin_ref2.html

We'll cover how to find the top 10 most frequently occurring words with Pig, and then we'll show how you can create a function in Python that can be used in Pig.

Let's start with the word count. The following is the Pig Latin code, which you can save in the pig_wordcount.pig file.

In [None]:
data = load '/tmp/moby_dick/';

word_token = foreach data generate flatten(TOKENIZE((chararray)$0)) as word;

group_word_token = group word_token by word;

count_word_token = foreach group_word_token generate COUNT(word_token) as cnt, group;

sort_word_token = ORDER count_word_token by cnt DESC;

top10_word_count = LIMIT sort_word_token 10; 

DUMP top10_word_count;

In the above code, we load the summary of Moby Dick and tokenize it line by line, which splits each line into individual words. The flatten function converts the collection of word tokens in each line into one row per word. We then group the rows by word and count the occurrences of each word. Finally, we sort the counts in descending order and limit the result to the first 10 rows to get the top 10 most frequently occurring words.
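For comparison, the same sequence of steps (tokenize, group, count, sort, limit) can be sketched in plain Python with `collections.Counter`. This runs on a single machine and is not what Pig executes; `top_words` and the sample lines are hypothetical, and splitting on white space may tokenize slightly differently from Pig's TOKENIZE.

```python
from collections import Counter


def top_words(lines, n=10):
    """Return the n most frequent words as (word, count) pairs."""
    # Tokenize every line and flatten the tokens into one stream of words
    counts = Counter(word for line in lines for word in line.split())
    # Sort by count in descending order and keep the first n
    return counts.most_common(n)


lines = ["the whale and the ship",
         "the captain and the whale",
         "the whale"]
print(top_words(lines, 3))  # [('the', 5), ('whale', 3), ('and', 2)]
```

The Pig script expresses the same pipeline declaratively and lets Hadoop distribute each step across the cluster.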

Let's execute the above pig script.

In [53]:
%%bash

pig ./BigData/pig_wordcount.pig

(83,the)
(36,and)
(28,a)
(25,of)
(24,to)
(15,his)
(14,Ahab)
(14,Moby)
(14,is)
(14,in)


2015-06-20 14:31:43,395 [main] INFO  org.apache.pig.Main - Apache Pig version 0.11.0-cdh4.7.0 (rexported) compiled May 28 2014, 11:06:21
2015-06-20 14:31:43,396 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/samzer/Coding/p/pythonDataScienceBook/pig_1434790903392.log
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2015-06-20 14:31:43,963 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /home/samzer/.pigbootup not found
2015-06-20 14:31:44,145 [main] WARN  org.apache.hadoop.conf.Configuration - fs.default.name is deprecated. Instead, use fs.defaultFS
2015-06-20 14:31:44,145 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:8020
2015-06-20 14:31:44,721 [main] INFO  org.apache.pig.backend.hadoop.executionengine.

We are able to get our top 10 words. Let's now create a user-defined function (UDF) in Python that will be used in Pig.

We'll define two user-defined functions to score the positive and negative sentiment of a sentence.

The following is the UDF for scoring the positive sentiment; it is available in positive_sentiment.py.

In [None]:
positive_words = ['a+', 'abound', 'abounds', 'abundance', 'abundant', 'accessable', 'accessible', 'acclaim', 'acclaimed', 'acclamation', 'acco$



@outputSchema("pnum:int")
def sentiment_score(text):
    positive_score = 0

    for w in text.split(' '):
        if w in positive_words: positive_score+=1

    return positive_score


In the above code, we define the positive word list that is used by the sentiment_score function. The function checks each word of a sentence against that list and returns the number of positive words found. The outputSchema decorator tells Pig the type of the returned data, which in our case is int.
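The UDF can also be tried out in a plain Python interpreter before it is registered in Pig. The sketch below assumes that the outputSchema decorator is supplied by Pig's Jython engine (the script above uses it without importing it), so it adds a stand-in decorator for local runs. The fallback and the three-word list are assumptions made for this illustration and are not part of positive_sentiment.py.

```python
# Fallback for local testing only: define a stand-in decorator when the
# name outputSchema is not already available in the environment.
try:
    outputSchema
except NameError:
    def outputSchema(schema):
        def wrap(func):
            func.outputSchema = schema  # remember the declared schema
            return func
        return wrap

# Hypothetical three-word list standing in for the full positive word list.
positive_words = ['fine', 'entertaining', 'good']


@outputSchema("pnum:int")
def sentiment_score(text):
    positive_score = 0
    for w in text.split(' '):
        if w in positive_words:
            positive_score += 1
    return positive_score


print(sentiment_score("a perfectly fine movie and entertaining enough"))
```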

The following is the code for scoring the negative sentiment; it is available in negative_sentiment.py. The code is almost identical to the positive sentiment code, except that each negative word subtracts 1 from the score.

In [None]:
negative_words = ['2-faced', '2-faces', 'abnormal', 'abolish', 'abominable', 'abominably', 'abominate', 'abomination', 'abort', 'aborted', 'ab$


@outputSchema("nnum:int")
def sentiment_score(text):
    negative_score = 0

    for w in text.split(' '):
        if w in negative_words: negative_score-=1

    return  negative_score

The following is the Pig script that scores the sentiments of the Jurassic World reviews; it is available in pig_sentiment.pig.

In [None]:
register 'positive_sentiment.py' using org.apache.pig.scripting.jython.JythonScriptEngine as positive;
register 'negative_sentiment.py' using org.apache.pig.scripting.jython.JythonScriptEngine as negative;

data = load '/tmp/jurassic_world/*';

feedback_sentiments = foreach data generate LOWER((chararray)$0) as feedback, positive.sentiment_score(LOWER((chararray)$0)) as psenti , 
negative.sentiment_score(LOWER((chararray)$0)) as nsenti;

average_sentiments = foreach feedback_sentiments generate feedback, psenti + nsenti;

dump average_sentiments;

In the above Pig script, we first register the Python UDF scripts using the register command and give each an appropriate name. We then load our Jurassic World reviews, convert them to lower case, and score the positive and negative sentiment of each review. Finally, we add the two scores to get the overall sentiment of each review.

Let's execute the Pig script and see the result.

In [5]:
%%bash
pig ./BigData/pig_sentiment.pig

(there is plenty here to divert, but little to leave you enraptored. such is the fate of the sequel: bigger. louder. fewer teeth.,0)
(if you limit your expectations for jurassic world to "more teeth," it will deliver on that promise. if you dare to hope for anything more-relatable characters, narrative coherence-you'll only set yourself up for disappointment.,-1)
(there's a problem when the most complex character in a film is the dinosaur,-2)
(not so much another bloated sequel as it is the fruition of dreams deferred in the previous films. too bad the genre dictates that those dreams are once again destined for disaster.,-2)
(a perfectly fine movie and entertaining enough to keep you watching until the closing credits.,4)
(this fourth installment of the jurassic park film series shows some wear and tear, but there is still some gas left in the tank. time is spent to set up the next film in the series. they will keep making more of these until we stop watching.,0)
(an angry movie with 

2015-06-22 00:10:49,026 [main] INFO  org.apache.pig.Main - Apache Pig version 0.11.0-cdh4.7.0 (rexported) compiled May 28 2014, 11:06:21
2015-06-22 00:10:49,027 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/samzer/Coding/p/pythonDataScienceBook/pig_1434912049024.log
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2015-06-22 00:11:28,969 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /home/samzer/.pigbootup not found
2015-06-22 00:11:29,154 [main] WARN  org.apache.hadoop.conf.Configuration - fs.default.name is deprecated. Instead, use fs.defaultFS
2015-06-22 00:11:29,154 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:8020
2015-06-22 00:11:29,723 [main] INFO  org.apache.pig.backend.hadoop.executionengine.

We have successfully scored the sentiments of the Jurassic World reviews using Python UDFs in Pig.

## Python with Apache Spark

<img src="files/images/spark.png">


Apache Spark is a computing framework that works on top of HDFS and provides an alternative way of computing, similar to Map-Reduce. It was developed by the AMPLab at UC Berkeley. Spark does most of its computation in memory, which makes it much faster than Map-Reduce, and it is well suited for machine learning because it handles iterative workloads well.


Spark uses the programming abstraction of RDDs (Resilient Distributed Datasets), in which data is logically distributed into partitions and transformations can be performed on top of this data.
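The idea of partitions and transformations can be modelled in a few lines of plain Python. This is only a toy model and not the Spark API: the helper names partition and map_partitions are invented for this sketch, everything runs in a single process, and the work is done eagerly, whereas Spark spreads partitions across machines and evaluates transformations lazily.

```python
def partition(data, num_partitions):
    """Split a list into num_partitions roughly equal, contiguous chunks."""
    size, extra = divmod(len(data), num_partitions)
    chunks, start = [], 0
    for i in range(num_partitions):
        # The first `extra` chunks receive one additional element.
        end = start + size + (1 if i < extra else 0)
        chunks.append(data[start:end])
        start = end
    return chunks


def map_partitions(chunks, func):
    """Apply func to every element, one partition at a time."""
    return [[func(x) for x in chunk] for chunk in chunks]


chunks = partition(list(range(10)), 3)
squared = map_partitions(chunks, lambda x: x * x)
# Flatten the partitions back into one list, as a collect-style step would.
print([x for chunk in squared for x in chunk])
```

Because each partition is transformed independently of the others, the same function could in principle be applied to different partitions on different machines.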

Python is one of the languages used for interacting with Apache Spark. We'll create a program that scores the sentiment of each review of Jurassic World, as well as the overall sentiment.

You can install Spark by following the instructions at the following link.

https://spark.apache.org/docs/1.0.1/spark-standalone.html

The following is the Python code for scoring the sentiment.

In [None]:
from __future__ import print_function

import sys
from operator import add

from pyspark import SparkContext

positive_words = open('positive-words.txt').read().split('\n')
negative_words = open('negative-words.txt').read().split('\n')


def sentiment_score(text, pos_list, neg_list):
    positive_score = 0
    negative_score = 0

    for w in text.split(' '):
        if w in pos_list: positive_score+=1
        if w in neg_list: negative_score+=1

    return positive_score - negative_score

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: sentiment <file>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="PythonSentiment")
    lines = sc.textFile(sys.argv[1], 1)
    scores = lines.map(lambda x: (x, sentiment_score(x.lower(), positive_words, negative_words)))
    output = scores.collect()
    for (key, score) in output:
        print("%s: %i" % (key, score))

    sc.stop()


In the above code, we define our standard sentiment_score function, which we'll be reusing. The if statement checks that the text file is given as an argument to the Python script. The sc variable is a SparkContext object with the app name "PythonSentiment". The filename from the argument is passed to Spark through the textFile method of sc. In the map function of Spark, we define a lambda function that receives each line of the text file and returns the line together with its sentiment score. The output variable collects the result, and finally we print the result on the screen.
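The scoring function can be exercised without a Spark cluster. The following sketch is a slightly more defensive variant, not the code of spark_sentiment.py: it keeps the words in sets for faster lookups, skips blank lines when loading the lists, and uses split() without an argument so that repeated spaces do not produce empty tokens. The load_words helper and the tiny word lists are assumptions made for this example.

```python
def load_words(lines):
    """Build a set of words from an iterable of lines, skipping blank lines."""
    return {line.strip() for line in lines if line.strip()}


def sentiment_score(text, pos_set, neg_set):
    """Return the number of positive words minus the number of negative words."""
    score = 0
    # split() with no argument drops empty tokens caused by repeated spaces.
    for w in text.lower().split():
        if w in pos_set:
            score += 1
        if w in neg_set:
            score -= 1
    return score


# Hypothetical word lists; the real ones come from the two text files.
pos = load_words(["fine\n", "entertaining\n", "\n"])
neg = load_words(["bloated\n", "disaster\n"])
print(sentiment_score("A perfectly fine  movie", pos, neg))
```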

Let's score the sentiment of each of the reviews of Jurassic World. Replace the hostname placeholder in the command below with your own hostname.

In [5]:
%%bash
~/spark-1.3.0-bin-cdh4/bin/spark-submit --master spark://<hostname>:7077 ./BigData/spark_sentiment.py hdfs://localhost:8020/tmp/jurassic_world/*

There is plenty here to divert, but little to leave you enraptored. Such is the fate of the sequel: Bigger. Louder. Fewer teeth.: 0
If you limit your expectations for Jurassic World to "more teeth," it will deliver on that promise. If you dare to hope for anything more-relatable characters, narrative coherence-you'll only set yourself up for disappointment.: -1
There's a problem when the most complex character in a film is the dinosaur: -2
not so much another bloated sequel as it is the fruition of dreams deferred in the previous films. Too bad the genre dictates that those dreams are once again destined for disaster.: -2
A perfectly fine movie and entertaining enough to keep you watching until the closing credits.: 4
This fourth installment of the Jurassic Park film series shows some wear and tear, but there is still some gas left in the tank. Time is spent to set up the next film in the series. They will keep making more of these until we stop watching.: 0
An angry movie with a tragi

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/22 18:23:55 INFO SparkContext: Running Spark version 1.3.0
15/06/22 18:23:55 WARN Utils: Your hostname, samzer resolves to a loopback address: 127.0.0.1; using 192.168.75.156 instead (on interface wlan0)
15/06/22 18:23:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/06/22 18:23:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/22 18:23:55 INFO SecurityManager: Changing view acls to: samzer
15/06/22 18:23:55 INFO SecurityManager: Changing modify acls to: samzer
15/06/22 18:23:55 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(samzer); users with modify permissions: Set(samzer)
15/06/22 18:23:56 INFO Slf4jLogger: Slf4jLogger started
15/06/22 18:23:

We can see that our Spark program was able to score the sentiment of each review. We use the spark-submit command, in which we specify the Spark master, the Python script to execute, and the location of the Jurassic World reviews in HDFS.

Below is a Spark program to score the overall sentiment of all the reviews.


In [None]:
from __future__ import print_function

import sys
from operator import add

from pyspark import SparkContext

positive_words = open('positive-words.txt').read().split('\n')
negative_words = open('negative-words.txt').read().split('\n')


def sentiment_score(text, pos_list, neg_list):
    positive_score = 0
    negative_score = 0

    for w in text.split(' '):
        if w in pos_list: positive_score+=1
        if w in neg_list: negative_score+=1

    return positive_score - negative_score

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: Overall Sentiment <file>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="PythonOverallSentiment")
    lines = sc.textFile(sys.argv[1], 1)
    scores = lines.map(lambda x: ("Total", sentiment_score(x.lower(), positive_words, negative_words)))\
                  .reduceByKey(add)
    output = scores.collect()
    for (key, score) in output:
        print("%s: %i" % (key, score))

    sc.stop()

In the above code, we have added the reduceByKey method, which reduces the values by adding them. We have also defined the key as "Total" so that all the scores are reduced under a single key.
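What reduceByKey does with a single key can be sketched in plain Python. The reduce_by_key helper below is a local stand-in written for this illustration, not Spark's implementation, and the per-review scores are made-up values.

```python
from operator import add


def reduce_by_key(pairs, func):
    """Combine the values of each key with func, like a local reduceByKey."""
    result = {}
    for key, value in pairs:
        if key in result:
            # Fold the new value into the running result for this key.
            result[key] = func(result[key], value)
        else:
            result[key] = value
    return result


# Hypothetical per-review scores, all tagged with the single key "Total".
scores = [("Total", 0), ("Total", -1), ("Total", -2), ("Total", 4)]
print(reduce_by_key(scores, add))
```

Since every pair carries the same key, the whole data set collapses into one sum. With several distinct keys, the same function would return one total per key.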

Let's try out the above code to get the overall sentiment of Jurassic World. Replace the hostname placeholder in the command below with your own hostname.

In [6]:
%%bash
~/spark-1.3.0-bin-cdh4/bin/spark-submit --master spark://<hostname>:7077 ./BigData/spark_overall_sentiment.py hdfs://localhost:8020/tmp/jurassic_world/*

Total: 19


Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/22 18:40:47 INFO SparkContext: Running Spark version 1.3.0
15/06/22 18:40:48 WARN Utils: Your hostname, samzer resolves to a loopback address: 127.0.0.1; using 192.168.75.156 instead (on interface wlan0)
15/06/22 18:40:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/06/22 18:40:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/22 18:40:49 INFO SecurityManager: Changing view acls to: samzer
15/06/22 18:40:49 INFO SecurityManager: Changing modify acls to: samzer
15/06/22 18:40:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(samzer); users with modify permissions: Set(samzer)
15/06/22 18:40:50 INFO Slf4jLogger: Slf4jLogger started
15/06/22 18:40:

We can see that Spark gave an overall sentiment score of 19.

The applications executed on Spark can be viewed in the browser on port 8080 of the Spark master. The following is a screenshot of it.

<img src="files/images/spark_monitor.png">


We can see the number of Spark nodes, the applications that are currently being executed, and the applications that have completed execution.