
## Big Data Every Day
More than 550 million tweets, 105 billion emails and 85 million WhatsApp messages are sent in a single day, and Facebook alone generates 4.2 petabytes of data every 24 hours. How do we process data at this scale and build machine learning models from it?

## What is Spark?
Apache Spark is an open-source, distributed cluster-computing framework used for fast processing, querying and analysis of Big Data.

- one of the most widely used data processing frameworks in enterprises today
- comparatively costly to run, since in-memory computation requires a lot of RAM (yet it is a favourite in the data science and Big Data communities)
- up to 100 times faster than Hadoop MapReduce for in-memory workloads
- a multi-language engine for data engineering, data science and machine learning on single-node machines or clusters
- processes data both in batches and as real-time streams, using languages such as Python, SQL, Scala, Java or R
- executes fast, distributed ANSI SQL queries for dashboarding and ad-hoc reporting
- performs EDA on petabyte-scale data without having to resort to downsampling
- runs the same machine learning algorithms and code on fault-tolerant clusters of thousands of machines

## Spark Applications

A Spark application is represented by a SparkContext (SC) instance, which the driver creates. The application consists of a driver process and a set of executor processes.

- The driver process: It maintains information about the Spark application, responds to the user's program, and distributes and schedules work across the executors. It is the heart of a Spark application and maintains all relevant information during the application's lifetime.

- The executors: They carry out the work that the driver assigns to them. Each executor is responsible for only two things:
    - Executing code assigned to it by the driver
    - Reporting the state of the computation back to the driver node

- SparkSession: The driver process makes itself available to the user as an object called the SparkSession. The SparkSession instance is the entry point through which Spark executes user-defined manipulations across the cluster.
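To make the driver/executor split concrete, here is a single-machine analogy in plain Python (it does not use Spark): a hypothetical `driver` function hands one task per partition to a pool of worker threads standing in for executors, and each task returns its result as a report to the driver. The function names and the `x >= 375` predicate are illustrative assumptions.

```python
from concurrent.futures import ThreadPoolExecutor


def executor_task(partition_id, partition):
    """Runs the code the 'driver' assigned (count values >= 375) on one partition."""
    count = sum(1 for x in partition if x >= 375)
    # the returned tuple plays the role of the status report sent back to the driver
    return partition_id, count


def driver(partitions, num_executors):
    """Schedules one task per partition on a pool of 'executors' and combines the reports."""
    with ThreadPoolExecutor(max_workers=num_executors) as pool:
        futures = [pool.submit(executor_task, pid, part)
                   for pid, part in enumerate(partitions)]
        reports = [future.result() for future in futures]
    total = sum(count for _, count in reports)
    return total, reports


# Hypothetical data already split into three partitions
total, reports = driver([[10, 400, 999], [375, 374], [500]], num_executors=2)
print(total)    # 4
print(reports)  # [(0, 2), (1, 1), (2, 1)]
```

Threads only mimic the scheduling pattern here; real Spark executors are separate JVM processes, usually running on different machines.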


## Partitions in Spark
Partitioning means that the complete dataset is not stored in a single place. Instead, it is divided into multiple chunks (partitions), and these chunks are placed on different nodes to be processed in parallel.

If we have only one partition, Spark can only have a parallelism of one, even if we have thousands of executors. Likewise, if we have many partitions but only one executor with a single core, Spark still only has a parallelism of one, because there is only one computation resource.

In Spark, the lower-level RDD API lets us set the number of partitions explicitly, for example through the `numSlices` argument of `sc.parallelize` or the `minPartitions` argument of `sc.textFile`.
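The plain-Python sketch below splits a list into contiguous, near-equal partitions, similar in spirit to what `sc.parallelize(data, numSlices)` does (PySpark additionally batches elements for serialization, so treat the exact layout as an approximation). It also expresses the parallelism rule above, assuming the default of one CPU per task (`spark.task.cpus=1`). The helper names are hypothetical.

```python
def slice_positions(length, num_slices):
    """(start, end) offsets of contiguous, near-equal slices of a sequence."""
    return [((i * length) // num_slices, ((i + 1) * length) // num_slices)
            for i in range(num_slices)]


def partition_list(data, num_slices):
    """Split a list into num_slices partitions."""
    return [data[start:end] for start, end in slice_positions(len(data), num_slices)]


def effective_parallelism(num_partitions, num_cores):
    # one task per partition, and at most one running task per core
    return min(num_partitions, num_cores)


parts = partition_list(list(range(10)), 3)
print([len(p) for p in parts])          # [3, 3, 4]
print(effective_parallelism(1, 1000))   # 1: one partition, a thousand cores
print(effective_parallelism(5, 1))      # 1: five partitions, one core
```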

## How to run Spark on Google Colab?

In [1]:
!pwd  # SKH

/content


In [2]:
from google.colab import drive  # SKH
drive.mount('/content/drive')  # SKH
DIRECTORY_NAME = '/content/drive/My Drive/Data Science/'  # SKH

Mounted at /content/drive


In [3]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:2 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Ign:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy Release [5,713 B]
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy Release.gpg [793 B]
Get:8 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Pack

In [4]:
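# older Spark releases are moved from downloads.apache.org to https://archive.apache.org/dist/spark/
!wget https://downloads.apache.org/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz  # SKH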

--2024-08-07 23:34:23--  https://downloads.apache.org/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz
Resolving downloads.apache.org (downloads.apache.org)... 135.181.214.104, 88.99.208.237, 2a01:4f8:10a:39da::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|135.181.214.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 388930980 (371M) [application/x-gzip]
Saving to: ‘spark-3.4.3-bin-hadoop3.tgz’


2024-08-07 23:35:00 (10.1 MB/s) - ‘spark-3.4.3-bin-hadoop3.tgz’ saved [388930980/388930980]



In [5]:
!tar -xzf spark-3.4.3-bin-hadoop3.tgz  # SKH

In [6]:
!pip install -q findspark

In [7]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.3-bin-hadoop3"  # SKH

In [8]:
import findspark
findspark.init()

In [9]:
findspark.find()

'/content/spark-3.4.3-bin-hadoop3'

In [10]:
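from pyspark.sql import SparkSession

# Create (or reuse) a local SparkSession that uses all available cores.
# Spark's UI defaults to port 4040, the same port as ngrok's local web interface,
# so the UI is moved to port 4050.
spark = (SparkSession.builder
         .master('local[*]')
         .config('spark.ui.port', '4050')
         .getOrCreate())

# SparkContext used by the RDD examples below
sc = spark.sparkContext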

In [11]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')

--2024-08-07 23:35:46--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.161.241.46, 54.237.133.81, 18.205.222.128, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.161.241.46|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13921656 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2024-08-07 23:35:47 (18.6 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13921656/13921656]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [12]:
# ngrok's local web API (port 4040) lists the public URL that tunnels to the Spark UI on port 4050
!curl -s http://localhost:4040/api/tunnels

In [13]:
# a simple example to understand how partitioning affects execution
# create a list of 20 million random integers between 10 and 1000 (inclusive)
# and count those for which (x*2)/2.5 >= 300, i.e. x >= 375

from random import randint

my_large_list = [randint(10, 1000) for _ in range(20000000)]


In [14]:
# create one partition of the list
my_large_list_one_partition = sc.parallelize(my_large_list, numSlices=1)

# check the number of partitions
print(my_large_list_one_partition.getNumPartitions())

# keep numbers where (x*2)/2.5 >= 300, i.e. x >= 375 (lazy: nothing runs yet)
my_large_list_one_partition = my_large_list_one_partition.filter(lambda x: (x * 2) / 2.5 >= 300)

# count() is the action that actually runs the filter, so time this line
%time print(my_large_list_one_partition.count())

1
12635232


In [15]:
%%time
# keep numbers where (x*2)/2.5 >= 300 using plain Python (no Spark)
result = list(filter(lambda x: (x * 2) / 2.5 >= 300, my_large_list))

# count the number of elements in the filtered list
print(len(result))

12635232


In [16]:
%%time
# the same count with an explicit Python loop
count = 0
for x in my_large_list:
    if x * 2 / 2.5 >= 300:
        count += 1
print(count)

12635232


In [17]:
# create five partitions of the list
my_large_list_with_five_partition = sc.parallelize(my_large_list, numSlices=5)

# keep numbers where (x*2)/2.5 >= 300 (lazy: nothing runs yet)
my_large_list_with_five_partition = my_large_list_with_five_partition.filter(lambda x: (x * 2) / 2.5 >= 300)

# count() runs the filter as five tasks, one per partition, so time this line
%time print(my_large_list_with_five_partition.count())

12635232


## Transformations in Spark
Data structures such as RDDs are immutable in Spark: they cannot be changed once created. Instead, we tell Spark how we would like to derive new data from existing data. These instructions are called transformations. There are two types of transformations in Spark:
- Narrow transformation: all the elements required to compute the results of a single partition live in a single partition of the parent RDD. For example, keeping only the numbers less than 100 can be done on each partition separately, so each new partition depends on only one parent partition.
- Wide transformation: the elements required to compute the results of a single partition may live in many partitions of the parent RDD. For example, in a word count, the count for one word depends on all the partitions, so the data must be shuffled across the cluster to compute the final result.
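The difference can be sketched in plain Python (no Spark; the data and helper names are hypothetical). A narrow step such as a filter works partition by partition, while a word count first computes partial counts per partition and then shuffles them so that all counts for the same word end up in the same output partition, roughly what `reduceByKey` does with its map-side combine. The `zlib.crc32` partitioner is an illustrative stand-in for Spark's hash partitioner.

```python
import zlib
from collections import Counter, defaultdict

# Hypothetical toy data: lines of text already split into three "partitions"
partitions = [
    ["spark is fast", "spark is lazy"],
    ["rdds are immutable"],
    ["spark uses partitions", "lazy is good"],
]


def narrow_filter(parts, predicate):
    """Narrow: output partition i is computed from input partition i alone."""
    return [[line for line in part if predicate(line)] for part in parts]


def wide_word_count(parts, num_out_partitions=2):
    """Wide: the total for one word needs data from every input partition."""
    # map side: partial word counts, computed independently per input partition
    partial_counts = [Counter(word for line in part for word in line.split())
                      for part in parts]
    # shuffle: send each (word, count) pair to the output partition chosen by a
    # deterministic hash of the word, so all pairs for one word meet in one place
    shuffled = [defaultdict(int) for _ in range(num_out_partitions)]
    for counts in partial_counts:
        for word, n in counts.items():
            target = zlib.crc32(word.encode("utf-8")) % num_out_partitions
            shuffled[target][word] += n  # reduce side: merge counts per word
    return [dict(part) for part in shuffled]


print(narrow_filter(partitions, lambda line: "spark" in line))
# [['spark is fast', 'spark is lazy'], [], ['spark uses partitions']]

merged = {}
for out_part in wide_word_count(partitions):
    merged.update(out_part)
print(merged["spark"], merged["is"], merged["lazy"])  # 3 3 2
```

The shuffle is what makes wide transformations expensive: on a real cluster, those pairs travel over the network between executors.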



## Lazy Evaluation
Suppose we have a very large data file containing millions of rows, and we need to analyze it through manipulations such as mapping, filtering, random splits, or even very basic addition or subtraction. For large datasets, even a basic transformation takes millions of operations to execute.

It is essential to optimize these operations when working with Big Data, and Spark does this through lazy evaluation. We only tell Spark which transformations we want to apply to the dataset, and Spark records them as a lineage of transformations (a DAG) without executing them. Only when we ask for a result, through an action such as `count()`, `first()` or `collect()`, does Spark work out an efficient execution path, perform the required transformations and return the result.
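The following plain-Python class is a toy, single-machine analogy of this idea, not how Spark is implemented: transformations such as `map` and `filter` return a new object that only records the step, and nothing runs until an action such as `count` or `first` is called. `LazyDataset` and its methods are hypothetical names chosen to mirror the RDD calls used below.

```python
class LazyDataset:
    """Toy analogy of an RDD: transformations only record steps; actions run them."""

    def __init__(self, source, steps=()):
        self._source = source        # underlying data, never modified
        self._steps = tuple(steps)   # recorded transformations as (kind, function)

    # transformations: return a NEW dataset and compute nothing
    def map(self, fn):
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, predicate):
        return LazyDataset(self._source, self._steps + (("filter", predicate),))

    def lineage(self):
        return " -> ".join(["source"] + [kind for kind, _ in self._steps])

    def _iterate(self):
        # chain lazy iterators so elements flow through all steps one at a time
        it = iter(self._source)
        for kind, fn in self._steps:
            it = map(fn, it) if kind == "map" else filter(fn, it)
        return it

    # actions: trigger the actual computation
    def count(self):
        return sum(1 for _ in self._iterate())

    def first(self):
        return next(self._iterate())


map_calls = {"n": 0}


def add_five_point_five(x):
    map_calls["n"] += 1  # record every time the function really runs
    return x + 5.5


toy_source = LazyDataset(range(1, 10_000_000))
toy_result = toy_source.map(add_five_point_five).map(lambda x: x + 20)
print(map_calls["n"])        # 0: defining transformations computed nothing
print(toy_result.lineage())  # source -> map -> map
print(toy_result.first())    # 26.5
print(map_calls["n"])        # 1: first() pulled only one element through the steps
```

Note that `toy_source` is unchanged after `map` is called on it, echoing the immutability of RDDs.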

In [18]:
# create a sample list
my_list = [i for i in range(1,10000000)]

# parallelize the data
rdd_0 = sc.parallelize(my_list,3)

rdd_0

ParallelCollectionRDD[4] at readRDDFromFile at PythonRDD.scala:287

In [19]:
# add value 5.5 to each number
rdd_1 = rdd_0.map(lambda x : x+5.5)

# RDD object
print(rdd_1)

# get the RDD Lineage
print(rdd_1.toDebugString())

PythonRDD[5] at RDD at PythonRDD.scala:53
b'(3) PythonRDD[5] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[4] at readRDDFromFile at PythonRDD.scala:287 []'


In [20]:
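# add 20 to each number
rdd_2 = rdd_1.map(lambda x: x + 20)

# RDD object
print(rdd_2)

# get the RDD lineage; PySpark pipelines consecutive map() calls into a single
# PythonRDD, so only one step appears above the source ParallelCollectionRDD
print(rdd_2.toDebugString())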

PythonRDD[6] at RDD at PythonRDD.scala:53
b'(3) PythonRDD[6] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[4] at readRDDFromFile at PythonRDD.scala:287 []'


In [21]:
from google.colab import files
# uploaded = files.upload()  # SKH Upload the file manually
# file_name = list(uploaded.keys())[0]  # SKH Get the file name
file_name = 'spark_input_text.txt'  # SKH use previous two lines instead if manually uploading
file_name = DIRECTORY_NAME + file_name  # SKH
my_text_file = sc.textFile(file_name, minPartitions=4)  # SKH

# RDD Object
print(my_text_file)

# convert to lower case
my_text_file = my_text_file.map(lambda x : x.lower())

# Updated RDD Object
print(my_text_file)

# Get the RDD Lineage
print(my_text_file.toDebugString())

/content/drive/My Drive/Data Science/spark_input_text.txt MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0
PythonRDD[9] at RDD at PythonRDD.scala:53
b'(4) PythonRDD[9] at RDD at PythonRDD.scala:53 []\n |  /content/drive/My Drive/Data Science/spark_input_text.txt MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0 []\n |  /content/drive/My Drive/Data Science/spark_input_text.txt HadoopRDD[7] at textFile at NativeMethodAccessorImpl.java:0 []'


In [22]:
# keep only the first two characters of each line
my_text_file = my_text_file.map(lambda x : x[:2])

# RDD Object
print(my_text_file)

# Get the RDD Lineage
print(my_text_file.toDebugString())

# Get the first element after all the transformations
print(my_text_file.first())
#print(my_text_file.collect())


PythonRDD[10] at RDD at PythonRDD.scala:53
b'(4) PythonRDD[10] at RDD at PythonRDD.scala:53 []\n |  /content/drive/My Drive/Data Science/spark_input_text.txt MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0 []\n |  /content/drive/My Drive/Data Science/spark_input_text.txt HadoopRDD[7] at textFile at NativeMethodAccessorImpl.java:0 []'
fa


In [23]:
# approximate number of distinct elements (a HyperLogLog-based estimate)
print(my_text_file.countApproxDistinct())

190


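## Data Types in Spark MLlib
MLlib is Spark's scalable machine learning library. It includes common machine learning algorithms for regression, classification and dimensionality reduction, as well as utilities for basic statistical operations on the data. The examples below use the RDD-based `pyspark.mllib` API, which has been in maintenance mode since Spark 2.0; the DataFrame-based `pyspark.ml` API is now the primary one.
We will explore some of the data types that MLlib provides.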

### Local Vector
MLlib supports two types of local vectors: dense and sparse. Sparse vectors are used when most of the values are zero. To create a sparse vector, you provide the length of the vector, the indices of the non-zero values (which must be strictly increasing), and the non-zero values themselves.
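As a plain-Python illustration of what the three pieces of a sparse vector mean (not MLlib's implementation), the hypothetical helpers below expand the representation into a dense list and compute a dot product. The merge-style walk in `sparse_dot` shows one reason sorted indices are useful: two sparse vectors can be combined in a single pass over their non-zero values.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a dense list of floats."""
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    if any(later <= earlier for earlier, later in zip(indices, indices[1:])):
        raise ValueError("indices must be strictly increasing")
    if indices and (indices[0] < 0 or indices[-1] >= size):
        raise ValueError("indices must lie in [0, size)")
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = float(v)
    return dense


def sparse_dot(a_indices, a_values, b_indices, b_values):
    """Dot product of two sparse vectors by walking both sorted index lists once."""
    i = j = 0
    total = 0.0
    while i < len(a_indices) and j < len(b_indices):
        if a_indices[i] == b_indices[j]:
            total += a_values[i] * b_values[j]
            i += 1
            j += 1
        elif a_indices[i] < b_indices[j]:
            i += 1
        else:
            j += 1
    return total


print(sparse_to_dense(10, [0, 1, 2, 4, 5], [1.0, 5.0, 3.0, 5.0, 7]))
# [1.0, 5.0, 3.0, 0.0, 5.0, 7.0, 0.0, 0.0, 0.0, 0.0]
print(sparse_dot([0, 2, 5], [1.0, 2.0, 3.0], [2, 3, 5], [4.0, 5.0, 6.0]))  # 26.0
```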

In [24]:
from pyspark.mllib.linalg import Vectors

## Dense Vector
print(Vectors.dense([1,2,3,4,5,6,0]))

v = Vectors.dense([1.0, 2.0])

u = Vectors.dense([3.0, 4.0])

print(v + u)

### SPARSE VECTOR
### Vectors.sparse( length, index_of_non_zero_values, non_zero_values)
### Indices values should be strictly increasing

print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]))

print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]).toArray())

[1.0,2.0,3.0,4.0,5.0,6.0,0.0]
[4.0,6.0]
(10,[0,1,2,4,5],[1.0,5.0,3.0,5.0,7.0])
[1. 5. 3. 0. 5. 7. 0. 0. 0. 0.]


### Labeled Point
A LabeledPoint pairs a local vector of features (dense or sparse) with a label. It is used for supervised problems, where each feature vector has a corresponding target value.

In [25]:
from pyspark.mllib.regression import LabeledPoint

# set a Label against a Dense Vector
point_1 = LabeledPoint(1,Vectors.dense([1,2,3,4,5]))

# Features
print(point_1.features)

# Label
print(point_1.label)

[1.0,2.0,3.0,4.0,5.0]
1.0


### Local Matrix
Local matrices are stored on a single machine. MLlib supports both dense and sparse local matrices. Dense matrix values are stored in column-major order; in a sparse matrix, the non-zero entries are stored in Compressed Sparse Column (CSC) format, also in column-major order.
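The plain-Python sketch below (hypothetical helper names, no Spark) builds the three CSC arrays for the 3x3 matrix used in the next cell. They correspond to the `colPtrs`, `rowIndices` and `values` arguments of `Matrices.sparse(numRows, numCols, colPtrs, rowIndices, values)`: column `j` owns the slice `values[colPtrs[j]:colPtrs[j + 1]]`.

```python
def dense_to_csc(values_col_major, num_rows, num_cols):
    """Convert column-major dense values (as passed to Matrices.dense) to CSC arrays."""
    col_ptrs, row_indices, values = [0], [], []
    for j in range(num_cols):
        for i in range(num_rows):
            v = values_col_major[j * num_rows + i]  # column-major: entry (i, j)
            if v != 0:
                row_indices.append(i)
                values.append(float(v))
        col_ptrs.append(len(values))  # column j occupies values[col_ptrs[j]:col_ptrs[j + 1]]
    return col_ptrs, row_indices, values


def csc_get(col_ptrs, row_indices, values, i, j):
    """Look up entry (i, j) by scanning only the stored non-zeros of column j."""
    for k in range(col_ptrs[j], col_ptrs[j + 1]):
        if row_indices[k] == i:
            return values[k]
    return 0.0


# the same 3x3 matrix as in the next cell
col_ptrs, row_idx, csc_values = dense_to_csc([1, 2, 3, 4, 5, 6, 0, 0, 9], 3, 3)
print(col_ptrs)    # [0, 3, 6, 7]
print(row_idx)     # [0, 1, 2, 0, 1, 2, 2]
print(csc_values)  # [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 9.0]
print(csc_get(col_ptrs, row_idx, csc_values, 2, 2))  # 9.0
```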

In [26]:
# import the Matrices
from pyspark.mllib.linalg import Matrices

# create a dense 3x3 matrix; values are listed column by column (column-major)
matrix_1 = Matrices.dense(3, 3, [1,2,3,4,5,6,0,0,9])

print(matrix_1)

print(matrix_1.toArray())

# create a sparse matrix
matrix_2 =  matrix_1.toSparse()

print(matrix_2)

print(matrix_2.toArray())

DenseMatrix([[1., 4., 0.],
             [2., 5., 0.],
             [3., 6., 9.]])
[[1. 4. 0.]
 [2. 5. 0.]
 [3. 6. 9.]]
3 X 3 CSCMatrix
(0,0) 1.0
(1,0) 2.0
(2,0) 3.0
(0,1) 4.0
(1,1) 5.0
(2,1) 6.0
(2,2) 9.0
[[1. 4. 0.]
 [2. 5. 0.]
 [3. 6. 9.]]


### Distributed Matrix
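Distributed matrices are stored in one or more RDDs. Choosing the right format for a distributed matrix is very important, because converting it to a different format may require an expensive global shuffle. Four types of distributed matrices have been implemented so far: RowMatrix, IndexedRowMatrix, CoordinateMatrix and BlockMatrix. The examples below cover all of them except CoordinateMatrix.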

#### Row Matrix
- Each row is a local vector, and the rows can be stored across multiple partitions. The rows have no meaningful indices.
- Because rows are independent, work that splits by rows parallelizes naturally. Algorithms such as Random Forest, whose trees are built from different samples of the rows and do not depend on each other, can take advantage of this distributed architecture to process Big Data in parallel. `RowMatrix` itself also provides distributed operations such as `computeColumnSummaryStatistics()`, `computeSVD()` and `computePrincipalComponents()`.
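Since rows are independent, statistics over a row-oriented matrix can be computed as partial results per partition that are then merged, the general aggregate pattern behind calls such as `computeColumnSummaryStatistics()`. The plain-Python sketch below (hypothetical helper names, no Spark) computes column means for the rows used in the next cell, assuming they are split across two partitions.

```python
from functools import reduce


def partition_stats(partition):
    """Partial result for one partition: (number of rows, per-column sums)."""
    sums = []
    for row in partition:
        if not sums:
            sums = [0.0] * len(row)
        for j, value in enumerate(row):
            sums[j] += value
    return len(partition), sums


def merge_stats(a, b):
    """Combine two partial results; merging order does not change the answer."""
    count_a, sums_a = a
    count_b, sums_b = b
    if not sums_a:
        return b
    if not sums_b:
        return a
    return count_a + count_b, [x + y for x, y in zip(sums_a, sums_b)]


# Hypothetical layout: the four rows of the next cell split across two partitions
row_partitions = [[[1, 2, 3], [4, 5, 6]], [[7, 8, 9], [10, 11, 12]]]

partials = [partition_stats(p) for p in row_partitions]  # independent: could run in parallel
total_rows, column_sums = reduce(merge_stats, partials)
column_means = [s / total_rows for s in column_sums]
print(total_rows, column_means)  # 4 [5.5, 6.5, 7.5]
```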

In [27]:
# Distributed Data Type - Row Matrix
from pyspark.mllib.linalg.distributed import RowMatrix

# create RDD
rows = sc.parallelize([[1,2,3], [4,5,6], [7,8,9], [10,11,12]])

# create a distributed Row Matrix
row_matrix = RowMatrix(rows)


print(row_matrix)

print(row_matrix.numRows())

print(row_matrix.numCols())

<pyspark.mllib.linalg.distributed.RowMatrix object at 0x7831c098ece0>
4
3


#### Indexed Row Matrix
- It is similar to a RowMatrix, with rows stored across multiple partitions, but each row has a meaningful index, so the rows have a defined order. It is useful where order matters, such as time-series data.
- It can be created from an RDD of `IndexedRow` objects.
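When the number of rows is not given, Spark's IndexedRowMatrix takes it to be the largest row index plus one. The plain-Python sketch below (hypothetical data with no rows stored at indices 2 and 3, no Spark) shows how the index restores row order even when rows arrive from different partitions, and why `numRows` can exceed the number of stored rows.

```python
# Hypothetical (index, row) pairs arriving from two partitions, out of order,
# with no rows stored for indices 2 and 3
partition_a = [(4, [2, 2, 5]), (0, [0, 1, 2])]
partition_b = [(1, [1, 2, 3]), (5, [4, 5, 5])]
toy_indexed_rows = partition_a + partition_b

# the index, not the physical position, defines the row order
toy_ordered = sorted(toy_indexed_rows, key=lambda pair: pair[0])

# with no explicit size: rows = largest index + 1; all rows here have the same length
toy_num_rows = max(index for index, _ in toy_indexed_rows) + 1
toy_num_cols = len(toy_indexed_rows[0][1])

print([index for index, _ in toy_ordered])  # [0, 1, 4, 5]
print(toy_num_rows, toy_num_cols)           # 6 3
```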

In [28]:
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# create RDD
indexed_rows = sc.parallelize([
    IndexedRow(0, [0,1,2]),
    IndexedRow(1, [1,2,3]),
    IndexedRow(2, [3,4,5]),
    IndexedRow(3, [4,2,3]),
    IndexedRow(4, [2,2,5]),
    IndexedRow(5, [4,5,5])
])

# create IndexedRowMatrix
indexed_rows_matrix = IndexedRowMatrix(indexed_rows)

print(indexed_rows_matrix.numRows())

print(indexed_rows_matrix.numCols())

6
3


#### Block Matrix
- In a BlockMatrix, different sub-matrices (blocks) of a large matrix can be stored on different machines.
- We need to specify the block dimensions. In the example below, each block is 3x3 (`rowsPerBlock=3`, `colsPerBlock=3`), and each block is placed in the larger matrix by its (block row, block column) coordinates.
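The plain-Python sketch below (hypothetical helper, no Spark) shows how blocks keyed by (block row, block column) map onto the full matrix, similar in effect to `toLocalMatrix()` in the next cell. Block values are given in column-major order, as with `Matrices.dense`, and the overall size is assumed to follow from the largest block coordinates.

```python
def blocks_to_dense(blocks, rows_per_block, cols_per_block):
    """Place column-major blocks keyed by (block_row, block_col) into one dense matrix."""
    n_block_rows = max(bi for bi, _ in blocks) + 1
    n_block_cols = max(bj for _, bj in blocks) + 1
    dense = [[0.0] * (n_block_cols * cols_per_block)
             for _ in range(n_block_rows * rows_per_block)]
    for (bi, bj), values in blocks.items():
        for c in range(cols_per_block):
            for r in range(rows_per_block):
                # entry (r, c) of the block is values[c * rows_per_block + r] (column-major)
                dense[bi * rows_per_block + r][bj * cols_per_block + c] = float(
                    values[c * rows_per_block + r])
    return dense


# same blocks as in the next cell
toy_blocks = {
    (0, 0): [1, 2, 1, 2, 1, 2, 1, 2, 1],
    (1, 1): [3, 4, 5, 3, 4, 5, 3, 4, 5],
    (2, 0): [1, 1, 1, 1, 1, 1, 1, 1, 1],
}
assembled = blocks_to_dense(toy_blocks, 3, 3)
for row in assembled:
    print(row)
```

The result is a 9x6 matrix: three block rows and two block columns of 3x3 blocks, with zeros wherever no block was supplied.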

In [29]:
# import the libraries
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix

# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 3, [1, 2, 1, 2, 1, 2, 1, 2, 1])),
                         ((1, 1), Matrices.dense(3, 3, [3, 4, 5, 3, 4, 5, 3, 4, 5])),
                         ((2, 0), Matrices.dense(3, 3, [1, 1, 1, 1, 1, 1, 1, 1, 1]))])

# Create a BlockMatrix from the RDD of 3x3 sub-matrix blocks (rowsPerBlock=3, colsPerBlock=3)
b_matrix = BlockMatrix(blocks, 3, 3)

# columns per block
print(b_matrix.colsPerBlock)


# rows per block
print(b_matrix.rowsPerBlock)


# convert the block matrix to local matrix
local_mat = b_matrix.toLocalMatrix()

# print local matrix
print(local_mat.toArray())

3
3
[[1. 2. 1. 0. 0. 0.]
 [2. 1. 2. 0. 0. 0.]
 [1. 2. 1. 0. 0. 0.]
 [0. 0. 0. 3. 3. 3.]
 [0. 0. 0. 4. 4. 4.]
 [0. 0. 0. 5. 5. 5.]
 [1. 1. 1. 0. 0. 0.]
 [1. 1. 1. 0. 0. 0.]
 [1. 1. 1. 0. 0. 0.]]


In [30]:
# uploaded = files.upload()  # Upload the file manually  # SKH
# file_name = list(uploaded.keys())[0]  # Get the file name  # SKH
file_name = '1972-Nixon.txt'  # SKH
file_name = DIRECTORY_NAME + file_name  # SKH

rdd = sc.textFile(file_name, minPartitions=6)  # SKH
rdd.take(10)

['Address on the State of the Union Delivered Before a Joint Session of the Congress. January 20, 1972',
 '',
 'Mr. Speaker, Mr. President, my colleagues in the Congress, our distinguished guests, my fellow Americans:',
 '�@Twenty-five years ago I sat here as a freshman Congressman�Xalong with Speaker Albert�Xand listened for the first time to the President address the State of the Union.',
 '  I shall never forget that moment. The Senate, the diplomatic corps, the Supreme Court, the Cabinet entered the Chamber, and then the President of the United States. As all of you are aware, I had some differences with President Truman. He had some with me. But I remember that on that day�Xthe day he addressed that joint session of the newly elected Republican 80th Congress, he spoke not as a partisan, but as President of all the people�Xcalling upon the Congress to put aside partisan considerations in the national interest.',
 '  The Greek-Turkish aid program, the Marshall Plan, the great foreig

In [31]:
import nltk
nltk.download('punkt')

# word tokenizer (tokens keep their original case)
def word_tokenize1(x):
    return nltk.word_tokenize(x)

words = rdd.flatMap(word_tokenize1)
# print(words.collect())
print(words.count())


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


4472


In [32]:
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords

# the stopword list is lowercase, so capitalized tokens such as 'The' or 'I' are kept
stop_words = set(stopwords.words('english'))
stopW = words.filter(lambda word: word not in stop_words and word != '')
print(stopW.collect())
print(stopW.count())


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


['Address', 'State', 'Union', 'Delivered', 'Before', 'Joint', 'Session', 'Congress', '.', 'January', '20', ',', '1972', 'Mr.', 'Speaker', ',', 'Mr.', 'President', ',', 'colleagues', 'Congress', ',', 'distinguished', 'guests', ',', 'fellow', 'Americans', ':', '�', '@', 'Twenty-five', 'years', 'ago', 'I', 'sat', 'freshman', 'Congressman�Xalong', 'Speaker', 'Albert�Xand', 'listened', 'first', 'time', 'President', 'address', 'State', 'Union', '.', 'I', 'shall', 'never', 'forget', 'moment', '.', 'The', 'Senate', ',', 'diplomatic', 'corps', ',', 'Supreme', 'Court', ',', 'Cabinet', 'entered', 'Chamber', ',', 'President', 'United', 'States', '.', 'As', 'aware', ',', 'I', 'differences', 'President', 'Truman', '.', 'He', '.', 'But', 'I', 'remember', 'day�Xthe', 'day', 'addressed', 'joint', 'session', 'newly', 'elected', 'Republican', '80th', 'Congress', ',', 'spoke', 'partisan', ',', 'President', 'people�Xcalling', 'upon', 'Congress', 'put', 'aside', 'partisan', 'considerations', 'national', 'in

In [33]:
import string

# drop tokens that are a single punctuation character
punctuation = set(string.punctuation)
filtered_data = stopW.filter(lambda word: word not in punctuation)
print(filtered_data.collect())

['Address', 'State', 'Union', 'Delivered', 'Before', 'Joint', 'Session', 'Congress', 'January', '20', '1972', 'Mr.', 'Speaker', 'Mr.', 'President', 'colleagues', 'Congress', 'distinguished', 'guests', 'fellow', 'Americans', '�', 'Twenty-five', 'years', 'ago', 'I', 'sat', 'freshman', 'Congressman�Xalong', 'Speaker', 'Albert�Xand', 'listened', 'first', 'time', 'President', 'address', 'State', 'Union', 'I', 'shall', 'never', 'forget', 'moment', 'The', 'Senate', 'diplomatic', 'corps', 'Supreme', 'Court', 'Cabinet', 'entered', 'Chamber', 'President', 'United', 'States', 'As', 'aware', 'I', 'differences', 'President', 'Truman', 'He', 'But', 'I', 'remember', 'day�Xthe', 'day', 'addressed', 'joint', 'session', 'newly', 'elected', 'Republican', '80th', 'Congress', 'spoke', 'partisan', 'President', 'people�Xcalling', 'upon', 'Congress', 'put', 'aside', 'partisan', 'considerations', 'national', 'interest', 'The', 'Greek-Turkish', 'aid', 'program', 'Marshall', 'Plan', 'great', 'foreign', 'policy',

In [34]:
nltk.download('averaged_perceptron_tagger')

# tag each word on its own, without its sentence context
def pos_tag_word(x):
    return nltk.pos_tag([x])

pos_word = filtered_data.map(pos_tag_word)
print(pos_word.collect())


[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.


[[('Address', 'NN')], [('State', 'NN')], [('Union', 'NNP')], [('Delivered', 'VBN')], [('Before', 'IN')], [('Joint', 'JJ')], [('Session', 'NN')], [('Congress', 'NNP')], [('January', 'NNP')], [('20', 'CD')], [('1972', 'CD')], [('Mr.', 'NNP')], [('Speaker', 'NN')], [('Mr.', 'NNP')], [('President', 'NNP')], [('colleagues', 'NNS')], [('Congress', 'NNP')], [('distinguished', 'VBN')], [('guests', 'NNS')], [('fellow', 'NN')], [('Americans', 'NNS')], [('�', 'NN')], [('Twenty-five', 'JJ')], [('years', 'NNS')], [('ago', 'RB')], [('I', 'PRP')], [('sat', 'NN')], [('freshman', 'NN')], [('Congressman�Xalong', 'NN')], [('Speaker', 'NN')], [('Albert�Xand', 'NN')], [('listened', 'VBN')], [('first', 'RB')], [('time', 'NN')], [('President', 'NNP')], [('address', 'NN')], [('State', 'NN')], [('Union', 'NNP')], [('I', 'PRP')], [('shall', 'MD')], [('never', 'RB')], [('forget', 'NN')], [('moment', 'NN')], [('The', 'DT')], [('Senate', 'NNP')], [('diplomatic', 'JJ')], [('corps', 'NN')], [('Supreme', 'NNP')], [('

In [35]:
# count how often each word occurs, then sort by frequency (descending)
text_Freq = filtered_data.flatMap(lambda x: nltk.FreqDist(x.split(",")).most_common())\
            .reduceByKey(lambda x, y: x + y)\
            .sortBy(lambda x: x[1], ascending=False)

# take() runs the job, so time this line; keep the 100 most common words
%time topcommon_data = text_Freq.take(100)
print(topcommon_data)

[('I', 31), ('We', 26), ('new', 23), ('Congress', 22), ('The', 20), ('America', 19), ('Nation', 18), ('years', 16), ('us', 16), ('year', 16), ('world', 16), ('great', 14), ('But', 13), ('one', 13), ('meet', 13), ('As', 11), ('American', 10), ('help', 10), ('must', 9), ('--', 9), ('believe', 9), ('also', 9), ('President', 8), ('full', 8), ('Federal', 8), ('war', 8), ('goal', 8), ('peace', 8), ('proposals', 8), ('toward', 7), ('first', 7), ('today', 7), ('They', 7), ('still', 7), ('look', 7), ('time', 7), ('Our', 7), ('people', 7), ('policy', 6), ('action', 6), ('make', 6), ('Now', 6), ('technology', 6), ('program', 6), ('future', 6), ('property', 6), ('Americans', 6), ('would', 6), ('capacity', 6), ('others', 6), ('differences', 6), ('And', 6), ('shall', 6), ('problems', 6), ('rising', 5), ('spirit', 5), ('improve', 5), ('schools', 5), ('change', 5), ('1972', 5), ('freedom', 5), ('back', 5), ('life', 5), ('lives', 5), (

In [36]:
# split each line on single spaces, emit every adjacent word pair, count pairs, most frequent first
# (runs of spaces yield empty strings, hence pairs such as ('', ''))
bigrams = rdd.map(lambda s: s.split(" "))\
             .flatMap(lambda s: [((s[i], s[i + 1]), 1) for i in range(len(s) - 1)])\
             .reduceByKey(lambda x, y: x + y)\
             .sortBy(lambda x: x[1], ascending=False)

# collect() runs the job, so time this line
%time bigrams.collect()


[(('', ''), 100),
 (('of', 'the'), 43),
 (('in', 'the'), 34),
 (('to', 'the'), 22),
 (('for', 'the'), 15),
 (('will', 'be'), 12),
 (('', 'We'), 12),
 (('', 'The'), 12),
 (('we', 'are'), 11),
 (('I', 'have'), 10),
 (('that', 'we'), 9),
 (('have', 'been'), 9),
 (('of', 'our'), 9),
 (('the', 'Congress'), 9),
 (('', 'I'), 8),
 (('', 'As'), 8),
 (('in', 'this'), 8),
 (('and', 'to'), 7),
 (('', 'Our'), 7),
 (('a', 'new'), 7),
 (('the', 'world.'), 7),
 (('believe', 'in'), 7),
 (('We', 'believe'), 7),
 (('on', 'the'), 7),
 (('of', 'this'), 7),
 (('with', 'the'), 7),
 (('and', 'the'), 7),
 (('those', 'who'), 7),
 (('of', 'a'), 7),
 (('as', 'the'), 7),
 (('has', 'been'), 6),
 (('we', 'have'), 6),
 (('as', 'a'), 6),
 (('in', 'their'), 6),
 (('which', 'I'), 6),
 (('be', 'the'), 6),
 (('is', 'not'), 6),
 (('the', 'Nation'), 6),
 (('to', 'meet'), 6),
 (('will', 'help'), 6),
 (('can', 'be'), 5),
 (('As', 'we'), 5),
 (('are', 'not'), 5),
 (('--We', 'will'), 5),
 (('more', 'than'), 5),
 (('it', 'is'), 

In [37]:
nltk.download('maxent_ne_chunker')
nltk.download('words')
from nltk import word_tokenize, pos_tag, ne_chunk

# tokenize, POS-tag and chunk each line; binary=True labels every named entity 'NE'
def ne_list(input_text):
    ne_tree = ne_chunk(pos_tag(word_tokenize(input_text)), binary=True)
    return ne_tree

nes = rdd.flatMap(ne_list)
# collect() runs the job, so time this line
%time print(nes.collect())


[nltk_data] Downloading package maxent_ne_chunker to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping chunkers/maxent_ne_chunker.zip.
[nltk_data] Downloading package words to /root/nltk_data...
[nltk_data]   Unzipping corpora/words.zip.

[Tree('NE', [('Address', 'NN')]), ('on', 'IN'), ('the', 'DT'), ('State', 'NNP'), ('of', 'IN'), ('the', 'DT'), Tree('NE', [('Union', 'NNP'), ('Delivered', 'NNP')]), ('Before', 'IN'), ('a', 'DT'), ('Joint', 'JJ'), ('Session', 'NN'), ('of', 'IN'), ('the', 'DT'), Tree('NE', [('Congress', 'NNP')]), ('.', '.'), ('January', 'NNP'), ('20', 'CD'), (',', ','), ('1972', 'CD'), Tree('NE', [('Mr.', 'NNP'), ('Speaker', 'NNP')]), (',', ','), Tree('NE', [('Mr.', 'NNP')]), ('President', 'NNP'), (',', ','), ('my', 'PRP$'), ('colleagues', 'NNS'), ('in', 'IN'), ('the', 'DT'), Tree('NE', [('Congress', 'NNP')]), (',', ','), ('our', 'PRP$'), ('distinguished', 'JJ'), ('guests', 'NNS'), (',', ','), ('my', 'PRP$'), ('fellow', 'JJ'), ('Americans', 'NNPS'), (':', ':'), ('�', 'JJ'), ('@', 'NNP'), ('Twenty-five', 'CD'), ('years', 'NNS'), ('ago', 'RB'), ('I', 'PRP'), ('sat', 'VBP'), ('here', 'RB'), ('as', 'IN'), ('a', 'DT'), ('freshman', 'NN'), ('Congre

In [38]:
# print the words of every named-entity ('NE') chunk
for x in nes.collect():
    if isinstance(x, nltk.Tree) and x.label() == 'NE':
        print(" ".join(word for word, tag in x))

Address
Union Delivered
Congress
Mr. Speaker
Mr.
Congress
Speaker
Senate
Supreme Court
Cabinet
United States
Truman
Congress
Marshall Plan
Congress
Congress
Congress
Nation
Nation
America
Nation
District
Columbia
America
Vietnam
Soviet Union
Strong
America
John Kennedy
United States
China
Soviet Union
America
Industrial
Armed Forces
Congress
European
Canada
Japan
American
American
America
American
American
American
America
American
American
Congress
America
American
Congress
Congress
Congress
America
State
Treasury
Advisory Commission
Intergovernmental Relations
America
Congress
American
New York Harbor
Liberty
France
United States
America
America
America
America
America
America
America
America
Look
America
Supreme Court
Senate
House
Congress
America
Congress
American
NOTE
House
Capitol
Carl Albert
House
