# Assignment 2: Term-Level Inverted Index

# Spark - Standalone deployment

Spark can run in standalone mode, similarly to MapReduce, but this requires some slight configuration changes. More specifically, the lines that define the <code>PYSPARK_DRIVER_PYTHON</code> and <code>PYSPARK_DRIVER_PYTHON_OPTS</code> parameters must be commented out (prefixed with <code>#</code>), as in the configuration below, so that PySpark does not launch Jupyter as its driver:

<ol>
    <li style="padding:10px 0 5px 0">Open the <code>bashrc</code> file for editing: <code>sudo gedit ~/.bashrc</code>
    After line 134, apply the following changes:<br><br>
        <code>#Hadoop Related Options
export HADOOP_HOME=/home/hdoop/hadoop-3.2.1
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop</code>

<code>export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export SBT_HOME=/usr/share/sbt-launcher-packaging/bin/sbt-launch.jar  
export SPARK_HOME=/usr/lib/spark
export PATH=$PATH:$JAVA_HOME/bin
export PATH=$PATH:$SBT_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin
#export PYSPARK_DRIVER_PYTHON=jupyter
#export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
export PYSPARK_PYTHON=/usr/bin/python3.8
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH</code>
    </li>
    <li style="padding:10px 0 5px 0">Apply the changes to <code>bashrc</code> immediately: <code>source ~/.bashrc</code>.</li>
    <li style="padding:10px 0 5px 0">We <b>do not have</b> to write the Python code into a <code>.py</code> file.</li>
    <li style="padding:10px 0 5px 0">The Python code can be executed by PySpark directly from this notebook.</li>
</ol>
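After sourcing <code>~/.bashrc</code>, it can help to confirm from inside the notebook that the kernel actually sees the exported variables: a Jupyter kernel inherits the environment of the process that launched it, so Jupyter has to be restarted from a shell where the new variables are set. The sketch below is a minimal, hypothetical helper (<code>missing_env_vars</code> and the choice of variables in <code>REQUIRED_VARS</code> are assumptions, not an official requirement list).

```python
import os

# Variables exported in ~/.bashrc above that this notebook relies on.
# This selection is an illustrative assumption, not an official list.
REQUIRED_VARS = ["JAVA_HOME", "SPARK_HOME", "PYSPARK_PYTHON", "HADOOP_CONF_DIR"]


def missing_env_vars(env=None, required=REQUIRED_VARS):
    """Return the names in `required` that are unset or empty in `env`.

    `env` defaults to the current process environment (os.environ).
    """
    if env is None:
        env = os.environ
    return [name for name in required if not env.get(name)]


# Usage inside the notebook (checks the kernel's real environment):
# print(missing_env_vars())
```

An empty list means every listed variable is visible to the kernel.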

# <code>SparkSession</code>

In [1]:
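import pyspark
from pyspark.sql import SparkSession

# Note: despite its name, `sc` holds a SparkSession, not a SparkContext;
# the SparkContext is reachable as sc.sparkContext (used further below).
# 'local[4]' runs Spark on this machine with 4 worker threads.
sc = SparkSession \
        .builder \
        .master('local[4]') \
        .appName("IHU_Spark_Standalone_TestApp") \
        .getOrCreate()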

print("Spark Version: " + sc.version)
print("PySpark Version: " + pyspark.__version__)

Spark Version: 3.1.1
PySpark Version: 3.1.1


# Test Data: posts.csv

<b>Due to the large size of posts.csv, unnecessary columns were removed manually with LibreOffice.</b>

The data come from The Unofficial Apple Weblog (TUAW). The inverted index will be constructed from the titles of the blog posts only.

Copy the input data file to HDFS. The <code>-f</code> switch of <code>copyFromLocal</code> overwrites the destination file if it already exists.

In [2]:
!hdfs dfs -copyFromLocal -f /home/bdccuser/notebooks/spark/data/posts.csv /user/bdccuser/

2021-06-11 13:26:20,806 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false


# Working with DataFrames

## 1. Creating DataFrames

### 1.1. Creating DataFrames from CSV files

In [3]:
# Create a DataFrame from an external CSV file (in HDFS)
df_hdfs = sc.read.option('delimiter', ',').option('header', 'true')\
    .csv("/user/bdccuser/posts.csv")

# Create a DataFrame from an external CSV file (in the local file system)
df_local = sc.read.option('delimiter', ',').option('header', 'true')\
    .csv("file:///home/bdccuser/notebooks/spark/data/posts.csv")

In [4]:
# print the schema of the dataframe (columns and column names - headers)
df_hdfs.printSchema()

root
 |-- Title: string (nullable = true)



In [5]:
# print a small number of records of the Dataframe
df_hdfs.show(10)

+--------------------+
|               Title|
+--------------------+
|First Look: Guita...|
|TUAW Tip: use the...|
|Boxee is updated ...|
|Found Footage: A ...|
|Concept: the iPod...|
|Talkcast live ton...|
|Black Friday: Bes...|
|uTorrent for Mac ...|
|$1.7 million for ...|
|Details emerge on...|
+--------------------+
only showing top 10 rows



In [6]:
print(df_hdfs.count())

17831


### 1.2. Creating RDDs from DataFrames

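The PySpark DataFrame API does not provide a <code>map</code> transformation. Therefore, to apply <code>map</code>, the DataFrame must first be converted to an RDD through its <code>rdd</code> attribute (each element of the resulting RDD is a <code>Row</code> object).

The opposite conversion, from an RDD to a DataFrame, can be performed with the RDD's <code>toDF()</code> method or with <code>SparkSession.createDataFrame()</code>.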


In [7]:
df_hdfs = sc.read.option('delimiter', ',').option('header', 'true')\
    .csv("/user/bdccuser/posts*")

rdd = df_hdfs.rdd

print(rdd.take(20))

[Row(Title='First Look: Guitar Rock Tour for iPhone'), Row(Title='TUAW Tip: use the Help menu to search Safari bookmarks and history'), Row(Title='Boxee is updated for Apple TV 2.3'), Row(Title='Found Footage: A working NeXT Cube'), Row(Title='Concept: the iPod shuffle bracelet'), Row(Title='Talkcast live tonight at 10pm ET'), Row(Title='Black Friday: Best Buy discounting up to $150 off, Apple retail will match prices'), Row(Title='uTorrent for Mac beta officially released'), Row(Title='$1.7 million for Greensboro Apple Store permit'), Row(Title='Details emerge on future Apple Stores'), Row(Title='First picture of the Greensboro, NC Apple Store under construction'), Row(Title='Ask TUAW: Reinstalling, auto-saving, license keeping and more'), Row(Title='Sneak Preview - Freeverse Flick Fishing 1.2'), Row(Title='Freeverse goes Flick Fishing'), Row(Title='Last chance for two App Store freebies'), Row(Title='iPhone hackers achieve a milestone: Linux boot'), Row(Title='Discounts, doorbusters,

In [8]:
# Create an RDD from an external text file (in HDFS)
rdd_posts_hdfs = sc.sparkContext.textFile("/user/bdccuser/posts*")

print(rdd_posts_hdfs.take(20))

['Title', 'First Look: Guitar Rock Tour for iPhone', 'TUAW Tip: use the Help menu to search Safari bookmarks and history', 'Boxee is updated for Apple TV 2.3', 'Found Footage: A working NeXT Cube', 'Concept: the iPod shuffle bracelet', 'Talkcast live tonight at 10pm ET', '"Black Friday: Best Buy discounting up to $150 off, Apple retail will match prices"', 'uTorrent for Mac beta officially released', '$1.7 million for Greensboro Apple Store permit', 'Details emerge on future Apple Stores', '"First picture of the Greensboro, NC Apple Store under construction"', '"Ask TUAW: Reinstalling, auto-saving, license keeping and more"', 'Sneak Preview - Freeverse Flick Fishing 1.2', 'Freeverse goes Flick Fishing', 'Last chance for two App Store freebies', 'iPhone hackers achieve a milestone: Linux boot', '"Discounts, doorbusters, and more: TUAW sampler of holiday savings"', 'Love that yellow sticky note...', 'Apple Store Australia posts Apple holiday sale discounts']


## 2. Repartitioning Dataframes

A DataFrame can be redistributed across the nodes of the cluster with <code>repartition(T)</code>. This command performs a full shuffle, i.e. it moves data between all nodes, and splits the data into <code>T</code> partitions.
<br><br>
<b>Note:</b> <code>repartition()</code> is a very expensive operation because of this full shuffle.
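As a rough mental model, <code>repartition(T)</code> called without column arguments deals the rows round-robin across the <code>T</code> new partitions, whereas <code>repartition(T, col)</code> hash-partitions them by the given column. The sketch below models only the round-robin case in plain Python; the helper name <code>round_robin_partition</code> is hypothetical, and Spark's real implementation differs in details (for instance, the partition from which each input partition starts dealing).

```python
def round_robin_partition(records, num_partitions):
    """Simplified model of round-robin repartitioning: record i goes to
    partition i % num_partitions. Not Spark's actual implementation."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1")
    partitions = [[] for _ in range(num_partitions)]
    for i, record in enumerate(records):
        partitions[i % num_partitions].append(record)
    return partitions


# Ten records dealt into 8 partitions, as in the repartition(8) cell below.
parts = round_robin_partition(list(range(10)), 8)
```

When the goal is only to reduce the number of partitions, <code>coalesce(T)</code> merges existing partitions and avoids a full shuffle.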


In [9]:
print("Partitions before repartitioning:" + str(df_hdfs.rdd.getNumPartitions()))

df_hdfs_2 = df_hdfs.repartition(8)

print("Partitions after repartitioning:" + str(df_hdfs_2.rdd.getNumPartitions()))

Partitions before repartitioning:1
Partitions after repartitioning:8


## 3. Transformations

A DataFrame can also be given a custom schema. A schema is a structure that describes the columns of the DataFrame, including the column names, data types, and metadata. It provides much greater control over the columns of a DataFrame and makes them comply with database-like rules.

Applying an explicit schema therefore helps queries, filters, sort operations, etc. behave as expected: without a schema (and without the <code>inferSchema</code> option), every CSV column is read as a string.
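To make the point about types concrete, here is a plain-Python illustration (no Spark involved; the values are made up) of what happens when numbers are kept as strings, as they would be in a CSV column read without a schema: sorting compares characters, not magnitudes. In PySpark, declaring such a column as <code>IntegerType()</code> in the <code>StructType</code> avoids this.

```python
# Numbers stored as strings, as in a CSV column read without a schema.
values_as_strings = ["9", "100", "17"]

# The same values with a proper numeric type.
values_as_ints = [int(v) for v in values_as_strings]

lexicographic = sorted(values_as_strings)  # character-by-character order
numeric = sorted(values_as_ints)           # order by magnitude
```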

In [10]:
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType, DoubleType

custom_schema = StructType([
#     StructField("DocId", IntegerType(), True, metadata={"desc": "document id"}),
    StructField("Title", StringType(), True),
])

df = sc.read.option('delimiter', ',').option('header', 'true') \
    .schema(custom_schema) \
    .csv("/user/bdccuser/posts*")

df.show(10)

+--------------------+
|               Title|
+--------------------+
|First Look: Guita...|
|TUAW Tip: use the...|
|Boxee is updated ...|
|Found Footage: A ...|
|Concept: the iPod...|
|Talkcast live ton...|
|Black Friday: Bes...|
|uTorrent for Mac ...|
|$1.7 million for ...|
|Details emerge on...|
+--------------------+
only showing top 10 rows



### MAP Transformation

In [11]:
rdd_posts_hdfs.take(10)

['Title',
 'First Look: Guitar Rock Tour for iPhone',
 'TUAW Tip: use the Help menu to search Safari bookmarks and history',
 'Boxee is updated for Apple TV 2.3',
 'Found Footage: A working NeXT Cube',
 'Concept: the iPod shuffle bracelet',
 'Talkcast live tonight at 10pm ET',
 '"Black Friday: Best Buy discounting up to $150 off, Apple retail will match prices"',
 'uTorrent for Mac beta officially released',
 '$1.7 million for Greensboro Apple Store permit']

In [12]:
# An example with a lambda function: split each title on spaces
rdd_posts = rdd_posts_hdfs.map(lambda x: x.split(" "))
print("RDD Posts: ", rdd_posts.take(10))

RDD Posts:  [['Title'], ['First', 'Look:', 'Guitar', 'Rock', 'Tour', 'for', 'iPhone'], ['TUAW', 'Tip:', 'use', 'the', 'Help', 'menu', 'to', 'search', 'Safari', 'bookmarks', 'and', 'history'], ['Boxee', 'is', 'updated', 'for', 'Apple', 'TV', '2.3'], ['Found', 'Footage:', 'A', 'working', 'NeXT', 'Cube'], ['Concept:', 'the', 'iPod', 'shuffle', 'bracelet'], ['Talkcast', 'live', 'tonight', 'at', '10pm', 'ET'], ['"Black', 'Friday:', 'Best', 'Buy', 'discounting', 'up', 'to', '$150', 'off,', 'Apple', 'retail', 'will', 'match', 'prices"'], ['uTorrent', 'for', 'Mac', 'beta', 'officially', 'released'], ['$1.7', 'million', 'for', 'Greensboro', 'Apple', 'Store', 'permit']]


In [13]:
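# An example with a custom function
def upper_list(x):
    # Return a new list of upper-case words instead of modifying x in place
    return [word.upper() for word in x]

# Named-function equivalent of the lambda in the previous cell (not used below)
def equiv_fun(x):
    return x.split(" ")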

rdd_posts_2 = rdd_posts.map(upper_list)
print("RDD Posts: ", rdd_posts_2.take(10))

RDD Posts:  [['TITLE'], ['FIRST', 'LOOK:', 'GUITAR', 'ROCK', 'TOUR', 'FOR', 'IPHONE'], ['TUAW', 'TIP:', 'USE', 'THE', 'HELP', 'MENU', 'TO', 'SEARCH', 'SAFARI', 'BOOKMARKS', 'AND', 'HISTORY'], ['BOXEE', 'IS', 'UPDATED', 'FOR', 'APPLE', 'TV', '2.3'], ['FOUND', 'FOOTAGE:', 'A', 'WORKING', 'NEXT', 'CUBE'], ['CONCEPT:', 'THE', 'IPOD', 'SHUFFLE', 'BRACELET'], ['TALKCAST', 'LIVE', 'TONIGHT', 'AT', '10PM', 'ET'], ['"BLACK', 'FRIDAY:', 'BEST', 'BUY', 'DISCOUNTING', 'UP', 'TO', '$150', 'OFF,', 'APPLE', 'RETAIL', 'WILL', 'MATCH', 'PRICES"'], ['UTORRENT', 'FOR', 'MAC', 'BETA', 'OFFICIALLY', 'RELEASED'], ['$1.7', 'MILLION', 'FOR', 'GREENSBORO', 'APPLE', 'STORE', 'PERMIT']]


In [14]:
# flatMap transformation (preparation: reload the posts; the DataFrame's RDD holds Row objects)

df_hdfs = sc.read.option('delimiter', ',').option('header', 'true')\
    .csv("/user/bdccuser/posts*")

rdd = df_hdfs.rdd

print(rdd.take(10))

[Row(Title='First Look: Guitar Rock Tour for iPhone'), Row(Title='TUAW Tip: use the Help menu to search Safari bookmarks and history'), Row(Title='Boxee is updated for Apple TV 2.3'), Row(Title='Found Footage: A working NeXT Cube'), Row(Title='Concept: the iPod shuffle bracelet'), Row(Title='Talkcast live tonight at 10pm ET'), Row(Title='Black Friday: Best Buy discounting up to $150 off, Apple retail will match prices'), Row(Title='uTorrent for Mac beta officially released'), Row(Title='$1.7 million for Greensboro Apple Store permit'), Row(Title='Details emerge on future Apple Stores')]
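The difference between <code>map</code> and <code>flatMap</code> is easiest to see side by side. The plain-Python model below (not Spark code; <code>split_words</code> is a hypothetical helper) mimics both on two titles from the data: <code>map</code> produces exactly one output element per input element, here a list of words, while <code>flatMap</code> concatenates the returned lists into one flat sequence of words.

```python
from itertools import chain

titles = [
    "First Look: Guitar Rock Tour for iPhone",
    "Boxee is updated for Apple TV 2.3",
]


def split_words(title):
    return title.split(" ")


# Model of rdd.map(split_words): one list per title.
mapped = [split_words(t) for t in titles]

# Model of rdd.flatMap(split_words): all words in a single flat list.
flat_mapped = list(chain.from_iterable(split_words(t) for t in titles))
```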


In [15]:
import re

def tokenize(x):
    posts = x.split()
    for k in range(len(posts)):
        # Convert to lower case
        posts[k] = posts[k].lower()

        # Remove punctuation (tokens made only of punctuation become '')
        posts[k] = re.sub(r'[^\w\s]', '', posts[k])

    return posts

rdd_posts_flat_2 = rdd_posts_hdfs.flatMap( tokenize )

print("Flat RDD Posts:", rdd_posts_flat_2.take(50))

Flat RDD Posts: ['title', 'first', 'look', 'guitar', 'rock', 'tour', 'for', 'iphone', 'tuaw', 'tip', 'use', 'the', 'help', 'menu', 'to', 'search', 'safari', 'bookmarks', 'and', 'history', 'boxee', 'is', 'updated', 'for', 'apple', 'tv', '23', 'found', 'footage', 'a', 'working', 'next', 'cube', 'concept', 'the', 'ipod', 'shuffle', 'bracelet', 'talkcast', 'live', 'tonight', 'at', '10pm', 'et', 'black', 'friday', 'best', 'buy', 'discounting', 'up']
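The output above shows two side effects of this tokenizer. Tokens made only of punctuation, such as the dash in "Sneak Preview - Freeverse Flick Fishing 1.2", become empty strings (they appear as the <code>('', 487)</code> entry in the word counts), and the CSV header line <code>'Title'</code> is tokenized too, because <code>textFile()</code> does not skip headers. A minimal variant that drops empty tokens is sketched below; the name <code>tokenize_nonempty</code> is hypothetical, and in the notebook it could be used as <code>rdd_posts_hdfs.flatMap(tokenize_nonempty)</code>.

```python
import re


def tokenize_nonempty(title):
    """Lower-case each whitespace-separated token, strip punctuation,
    and drop tokens that end up empty."""
    tokens = (re.sub(r"[^\w\s]", "", word.lower()) for word in title.split())
    return [t for t in tokens if t]
```

Note that stripping punctuation also merges version numbers and prices, e.g. <code>2.3</code> becomes <code>23</code>.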


In [16]:
# Combining flatMap and map
# (tokenize() is the function defined in the previous cell)

rdd_posts_flat = rdd_posts_hdfs.flatMap( tokenize )
rdd_posts = rdd_posts_flat.map(lambda x: (x,1))

print("Posts with 1s:", rdd_posts.take(50))

Posts with 1s: [('title', 1), ('first', 1), ('look', 1), ('guitar', 1), ('rock', 1), ('tour', 1), ('for', 1), ('iphone', 1), ('tuaw', 1), ('tip', 1), ('use', 1), ('the', 1), ('help', 1), ('menu', 1), ('to', 1), ('search', 1), ('safari', 1), ('bookmarks', 1), ('and', 1), ('history', 1), ('boxee', 1), ('is', 1), ('updated', 1), ('for', 1), ('apple', 1), ('tv', 1), ('23', 1), ('found', 1), ('footage', 1), ('a', 1), ('working', 1), ('next', 1), ('cube', 1), ('concept', 1), ('the', 1), ('ipod', 1), ('shuffle', 1), ('bracelet', 1), ('talkcast', 1), ('live', 1), ('tonight', 1), ('at', 1), ('10pm', 1), ('et', 1), ('black', 1), ('friday', 1), ('best', 1), ('buy', 1), ('discounting', 1), ('up', 1)]


In [17]:
# The reduceByKey transformation: sum the 1s of each word
# to obtain (word, frequency) pairs

rdd_reduced = rdd_posts.reduceByKey(lambda a, b: a + b)

print("Posts counts:", rdd_reduced.take(50))

Posts counts: [('title', 4), ('look', 161), ('tour', 27), ('use', 116), ('help', 48), ('search', 45), ('is', 532), ('footage', 170), ('working', 37), ('cube', 27), ('live', 89), ('at', 385), ('10pm', 5), ('friday', 49), ('best', 194), ('150', 1), ('match', 6), ('utorrent', 2), ('mac', 1766), ('17', 19), ('store', 636), ('details', 48), ('stores', 91), ('of', 1243), ('ask', 128), ('reinstalling', 2), ('more', 409), ('sneak', 26), ('preview', 54), ('', 487), ('freeverse', 26), ('12', 61), ('goes', 139), ('last', 38), ('chance', 5), ('two', 82), ('freebies', 7), ('hackers', 9), ('boot', 68), ('doorbusters', 1), ('holiday', 28), ('savings', 4), ('love', 49), ('yellow', 11), ('australia', 13), ('sale', 42), ('757', 1), ('issues', 28), ('says', 64), ('paul', 11)]
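<code>reduceByKey(f)</code> combines all values that share a key using <code>f</code>. The plain-Python model below (not Spark code; <code>reduce_by_key</code> is a hypothetical name) reproduces that result on a handful of <code>(word, 1)</code> pairs. In Spark, values are additionally pre-combined within each partition before the shuffle, much like a MapReduce combiner, which is why <code>f</code> should be associative and commutative.

```python
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, func):
    """Plain-Python model of rdd.reduceByKey(func) on (key, value) pairs."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    # Fold each key's values with func, e.g. addition for word counts.
    return {key: reduce(func, values) for key, values in grouped.items()}


pairs = [("for", 1), ("apple", 1), ("for", 1), ("tv", 1)]
counts = reduce_by_key(pairs, lambda a, b: a + b)
```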


In [19]:
# Transform our reduced RDD from its (Word, Frequency) format, to (Frequency, Word)
rdd_reduced_rev = rdd_reduced.map(lambda x: ( x[1], x[0] )) 
print("Reversed RDD:", rdd_reduced_rev.take(50))

Reversed RDD: [(4, 'title'), (161, 'look'), (27, 'tour'), (116, 'use'), (48, 'help'), (45, 'search'), (532, 'is'), (170, 'footage'), (37, 'working'), (27, 'cube'), (89, 'live'), (385, 'at'), (5, '10pm'), (49, 'friday'), (194, 'best'), (1, '150'), (6, 'match'), (2, 'utorrent'), (1766, 'mac'), (19, '17'), (636, 'store'), (48, 'details'), (91, 'stores'), (1243, 'of'), (128, 'ask'), (2, 'reinstalling'), (409, 'more'), (26, 'sneak'), (54, 'preview'), (487, ''), (26, 'freeverse'), (61, '12'), (139, 'goes'), (38, 'last'), (5, 'chance'), (82, 'two'), (7, 'freebies'), (9, 'hackers'), (68, 'boot'), (1, 'doorbusters'), (28, 'holiday'), (4, 'savings'), (49, 'love'), (11, 'yellow'), (13, 'australia'), (42, 'sale'), (1, '757'), (28, 'issues'), (64, 'says'), (11, 'paul')]


In [21]:
# Sort our reduced RDD in decreasing frequency order
rdd_sorted_freq = rdd_reduced_rev.sortByKey(False)
print("Frequency sorted Word counts:", rdd_sorted_freq.take(50))

Frequency sorted Word counts: [(2785, 'the'), (2278, 'apple'), (2015, 'to'), (1916, 'for'), (1766, 'mac'), (1615, 'iphone'), (1556, 'ipod'), (1315, 'on'), (1258, 'a'), (1243, 'of'), (1199, 'and'), (1179, 'your'), (1094, 'in'), (1035, 'itunes'), (1025, 'with'), (775, 'tuaw'), (731, 'new'), (636, 'store'), (532, 'is'), (487, ''), (481, 'os'), (461, 'x'), (426, 'from'), (409, 'more'), (397, 'update'), (385, 'at'), (385, 'an'), (375, 'free'), (371, 'you'), (367, 'video'), (344, 'now'), (344, 'macbook'), (327, 'widget'), (309, 'leopard'), (306, 'available'), (303, 'beta'), (291, 'pro'), (260, 'mini'), (259, 'watch'), (252, 'released'), (250, 'up'), (249, 'releases'), (248, 'get'), (239, 'first'), (220, 'macworld'), (217, '101'), (215, '2'), (214, 'updated'), (212, '10'), (211, 'tv')]
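The cells above stop at word frequencies, while the goal of the assignment is an inverted index. One possible term-level structure maps each term to the sorted ids of the titles it occurs in. The sketch below builds it in plain Python under the assumption that a title's id is its 0-based line number, as <code>zipWithIndex()</code> would assign; the function names are hypothetical and this is not presented as the assignment's reference solution.

```python
import re
from collections import defaultdict

# A possible PySpark counterpart (an assumption, not run here), reusing the
# notebook's tokenize(); the header line 'Title' would also be indexed as
# document 0 unless it is filtered out:
#   rdd_posts_hdfs.zipWithIndex() \
#       .flatMap(lambda t: [(term, t[1]) for term in set(tokenize(t[0])) if term]) \
#       .groupByKey() \
#       .mapValues(sorted)


def index_terms(title):
    """Lower-case, strip punctuation, drop empty tokens."""
    tokens = (re.sub(r"[^\w\s]", "", w.lower()) for w in title.split())
    return [t for t in tokens if t]


def build_inverted_index(titles):
    """Map each term to the sorted list of ids of the titles containing it."""
    index = defaultdict(set)
    for doc_id, title in enumerate(titles):  # mirrors zipWithIndex()
        for term in index_terms(title):
            index[term].add(doc_id)
    return {term: sorted(ids) for term, ids in index.items()}


titles = [
    "Freeverse goes Flick Fishing",
    "Sneak Preview - Freeverse Flick Fishing 1.2",
    "Last chance for two App Store freebies",
]
idx = build_inverted_index(titles)
```

Using a set per term keeps each title id once even if a word repeats within a title.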
