## Overview
A simple example of using RDD to make meaningfull extract from a text document

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("textSpark").getOrCreate()
sc = spark.sparkContext

In [4]:
# File location and type
file = "../data/Moby-Dick.txt"
file_type = file.split('.')[-1]

# # CSV options
# infer_schema = "false"
# first_row_is_header = "false"
# delimiter = " "

# # The applied options are for CSV files. For other file types, these will be ignored.
# df = spark.read.format(file_type) \
#   .option("inferSchema", infer_schema) \
#   .option("header", first_row_is_header) \
#   .option("sep", delimiter) \
#   .load(file_location)

rdd = sc.textFile(file_location)

rdd.collect()

['The Project Gutenberg EBook of Moby Dick; or The Whale, by Herman Melville',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.org',
 '',
 '',
 'Title: Moby Dick; or The Whale',
 '',
 'Author: Herman Melville',
 '',
 'Last Updated: January 3, 2009',
 'Posting Date: December 25, 2008 [EBook #2701]',
 'Release Date: June, 2001',
 '',
 'Language: English',
 '',
 '',
 '*** START OF THIS PROJECT GUTENBERG EBOOK MOBY DICK; OR THE WHALE ***',
 '',
 '',
 '',
 '',
 'Produced by Daniel Lazarus and Jonesey',
 '',
 '',
 '',
 '',
 '',
 'MOBY DICK; OR THE WHALE',
 '',
 'By Herman Melville',
 '',
 '',
 '',
 '',
 "Original Transcriber's Notes:",
 '',
 'This text is a combination of etexts, one from the now-defunct ERIS',
 "project at Virginia Tech and one from Project Gutenberg's archives.

In [5]:
import re

def replace_n_mark(str):

    replace_char = ['.', '"', '-', ',', '*', '(', ')', ';', '[', ']']
    str = re.sub(f"{replace_char}", '', str)
    
    return (str.lower(), 1)

In [6]:
words = rdd.flatMap(lambda x: x.split(' ')) \
            .filter(lambda x: x != '') \
            .map(replace_n_mark) \
            .reduceByKey(lambda x, y: x + y)
words.collect()

[('project', 86),
 ('gutenberg', 23),
 ('ebook', 8),
 ('of', 6668),
 ('moby', 82),
 ('melville', 4),
 ('this', 1283),
 ('is', 1605),
 ('use', 38),
 ('anyone', 5),
 ('anywhere', 11),
 ('at', 1312),
 ('no', 490),
 ('restrictions', 2),
 ('whatsoever.', 5),
 ('may', 227),
 ('it,', 237),
 ('give', 76),
 ('away', 119),
 ('re-use', 2),
 ('online', 4),
 ('www.gutenberg.org', 2),
 ('author:', 1),
 ('last', 209),
 ('january', 1),
 ('3,', 2),
 ('posting', 1),
 ('date:', 2),
 ('#2701]', 1),
 ('june,', 3),
 ('language:', 2),
 ('***', 6),
 ('start', 26),
 ('produced', 9),
 ("transcriber's", 1),
 ('notes:', 1),
 ('combination', 2),
 ('now-defunct', 1),
 ('eris', 1),
 ('virginia', 2),
 ("gutenberg's", 2),
 ('version', 5),
 ('are', 594),
 ('university', 1),
 ('adelaide', 1),
 ('library', 3),
 ('preserving', 4),
 ('version.', 1),
 ('resulting', 3),
 ('was', 1577),
 ('compared', 9),
 ('public', 14),
 ('domain', 9),
 ('in', 4115),
 ('89,', 1),
 ('we', 433),
 ('l', 1),
 ('symbol', 10),
 ('currency.', 1),
 

In [7]:
# converting RDD to DF
cols = ['word', 'count']
df = words.toDF(cols)
df.show()

+------------+-----+
|        word|count|
+------------+-----+
|     project|   86|
|   gutenberg|   23|
|       ebook|    8|
|          of| 6668|
|        moby|   82|
|    melville|    4|
|        this| 1283|
|          is| 1605|
|         use|   38|
|      anyone|    5|
|    anywhere|   11|
|          at| 1312|
|          no|  490|
|restrictions|    2|
| whatsoever.|    5|
|         may|  227|
|         it,|  237|
|        give|   76|
|        away|  119|
|      re-use|    2|
+------------+-----+
only showing top 20 rows



In [8]:
df.createOrReplaceTempView('moby_dick')

In [15]:
spark.sql(
'''
select * from moby_dick
order by count desc
'''
).show()

+----+-----+
|word|count|
+----+-----+
| the|14413|
|  of| 6668|
| and| 6309|
|   a| 4658|
|  to| 4595|
|  in| 4115|
|that| 2759|
| his| 2485|
|  it| 1776|
|with| 1750|
|   i| 1724|
|  as| 1713|
|  he| 1683|
| but| 1672|
|  is| 1605|
| was| 1577|
| for| 1557|
| all| 1359|
|  at| 1312|
|this| 1283|
+----+-----+
only showing top 20 rows



In [11]:
spark.sql(
'''
select * from moby_dick
where word in ('a', 'an', 'the', 'of', 'he', 'she')
sort by count asc
'''
).show()

+----+-----+
|word|count|
+----+-----+
|  an|  592|
|  he| 1683|
|  of| 6668|
| she|  114|
|   a| 4658|
| the|14413|
+----+-----+



In [12]:
# Save file to HDFS
filepath = '../output/moby_dick.parquet'
df.write.format("parquet").mode("overwrite").save(filepath)