Installing and Setting Up Spark

In [2]:
# Environment bootstrap (shell escapes):
#  1. install a headless Java 8 JDK, discarding apt's standard output;
#  2. download the Spark 3.3.0 / Hadoop 3 binary distribution;
#  3. unpack it into the current working directory;
#  4. install findspark, which the following cells use to locate that Spark install.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
!tar xf spark-3.3.0-bin-hadoop3.tgz
!pip install -q findspark

In [3]:
import os
# Point the JVM and Spark tooling at the installs made above.
# SPARK_HOME is a relative path: it names the directory unpacked from the
# tarball, so it only resolves from the working directory used for the install.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="spark-3.3.0-bin-hadoop3",
)

In [4]:
import findspark
# Let findspark locate the Spark install (SPARK_HOME is set in the previous cell)
# so that `pyspark` can be imported below -- presumably by extending sys.path; TODO confirm.
findspark.init()

In [5]:
# Sanity check: display the Spark home findspark resolved
# (the recorded output equals the SPARK_HOME value set earlier).
findspark.find()

'spark-3.3.0-bin-hadoop3'

In [6]:
from pyspark.sql import SparkSession
# Get (or create) a SparkSession on the "local[*]" master and keep its
# SparkContext in `sc`; the rest of the notebook works with the RDD API via `sc`.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext
sc

Cloud Computing Assignment 2

In [7]:
#reading all text files
# `path` is the directory holding the whole corpus; the per-month and per-day
# cells further down read subsets of the same directory via globs / file names.

path = '/content/assignment_2'

# Every element of this RDD is later parsed with json.loads, i.e. the input is
# expected to contain one JSON document per line.
allfiles_RDD = sc.textFile(path)

In [8]:
#parsing every input line as one JSON document
import json
# json.loads already has the one-argument shape map() needs, so it is passed directly.
all_files_json = allfiles_RDD.map(json.loads)

In [9]:
#keeping only the "text" field (the article body) of each parsed record
all_text = all_files_json.map(lambda article: article["text"])
# Preview the first ten article bodies.
all_text.take(10)

['INDORE: Its time \'independence\' is redefined. As India celebrates its 66th Independence Day, Indore youngsters feel they can not enjoy the \'freedom\' for which the freedom fighters had laid their lives for.Abhishek Vyas, 20-year-old engineer feels that burden of poverty and unemployment has reduced the youth to slaves of the old days. "Rural India still has the social evil of addiction to country liquor leading to number of deaths everyday, government benefits reaches to someone else other than beneficiaries." He added, "We don\'t have political leaders who are not involved in some scam or the other whom we can vote. These leaders can not fulfill our expectations. India still needs years to get freedom that the freedom strugglers dreamt of."Mayur Gehlot, an 18-year-old engineering student, says, "Even at educational institutes, we have to go through manipulations in our results. To get good score and placements, we have to run after the professors concerned. This is no freedom whe

In [10]:
#removing punctuation
import string

# Build the deletion table once on the driver instead of once per article inside
# the lambda; the closure ships the same table to every task.
# NOTE: punctuation is deleted, not replaced by a space, so tokens separated only
# by punctuation are fused (e.g. "forAbhishek", "20yearold" in the preview below).
punctuation_table = str.maketrans('', '', string.punctuation)
new_all_text = all_text.map(lambda text: text.translate(punctuation_table))
new_all_text.take(10)

['INDORE Its time independence is redefined As India celebrates its 66th Independence Day Indore youngsters feel they can not enjoy the freedom for which the freedom fighters had laid their lives forAbhishek Vyas 20yearold engineer feels that burden of poverty and unemployment has reduced the youth to slaves of the old days Rural India still has the social evil of addiction to country liquor leading to number of deaths everyday government benefits reaches to someone else other than beneficiaries He added We dont have political leaders who are not involved in some scam or the other whom we can vote These leaders can not fulfill our expectations India still needs years to get freedom that the freedom strugglers dreamt ofMayur Gehlot an 18yearold engineering student says Even at educational institutes we have to go through manipulations in our results To get good score and placements we have to run after the professors concerned This is no freedom where we cannot even express our opinions

In [11]:
#converting to lower case and splitting on whitespace
# Two pipelined steps: lower-case each article, then flatten it into its
# whitespace-separated tokens (one RDD element per token).
all_words = new_all_text.map(str.lower).flatMap(str.split)
all_words.take(20)

['indore',
 'its',
 'time',
 'independence',
 'is',
 'redefined',
 'as',
 'india',
 'celebrates',
 'its',
 '66th',
 'independence',
 'day',
 'indore',
 'youngsters',
 'feel',
 'they',
 'can',
 'not',
 'enjoy']

In [12]:
#pairing every token with an initial count of 1, ready for reduceByKey
mapping_words = all_words.map(lambda word: (word, 1))
mapping_words.take(20)

[('indore', 1),
 ('its', 1),
 ('time', 1),
 ('independence', 1),
 ('is', 1),
 ('redefined', 1),
 ('as', 1),
 ('india', 1),
 ('celebrates', 1),
 ('its', 1),
 ('66th', 1),
 ('independence', 1),
 ('day', 1),
 ('indore', 1),
 ('youngsters', 1),
 ('feel', 1),
 ('they', 1),
 ('can', 1),
 ('not', 1),
 ('enjoy', 1)]

In [13]:
#summing the per-token 1s to get the total count of each distinct word
# The reducer receives two partial counts for the same word (both are values,
# neither is the key) and returns their sum.
reduced_words = mapping_words.reduceByKey(lambda left, right: left + right)
print(reduced_words.take(10))
# Number of distinct words before any frequency filtering.
print(reduced_words.count())

[('also', 155647), ('asylumthe', 3), ('state', 102566), ('nscn', 255), ('acceptable', 565), ('ive', 2164), ('cow', 610), ('ugcs', 64), ('beijing', 753), ('pradhan', 1057)]
854537


In [14]:
#keeping only words whose total frequency is at least 10
# cache(): this RDD feeds several separate actions below (the overall count and
# the individual word lookups). Without persistence each of those actions would
# re-read and re-tokenise the entire corpus; the filtered result itself is small.
filtered_words = reduced_words.filter(lambda x: x[1]>=10).cache()

Question-1

In [15]:
#count of all words after filtering
# i.e. the number of distinct words that occur at least 10 times in the corpus.
filtered_words.count()

77151

Question-2

In [16]:
#getting the counts of the words congress, london, washington and football
def _entries_for(word):
  """Return the RDD of (word, count) pairs in filtered_words whose key is `word`."""
  return filtered_words.filter(lambda pair: pair[0] == word)

# Each collect() yields a list holding the matching pair, or an empty list when
# the word did not survive the frequency filter.
congress_count = _entries_for('congress')
print(congress_count.collect())

london_count = _entries_for('london')
print(london_count.collect())

washington_count = _entries_for('washington')
print(washington_count.collect())

football_count = _entries_for('football')
print(football_count.collect())

[('congress', 31709)]
[('london', 4201)]
[('washington', 1589)]
[('football', 1604)]


Question-3

In [17]:
#assigning the glob path for each month's files
# One template for all twelve globs; the month number is zero-padded to two
# digits so that e.g. month 1 becomes '.../2012-01*'.
_MONTH_GLOB = '/content/assignment_2/2012-{:02d}*'

(month1_path, month2_path, month3_path, month4_path,
 month5_path, month6_path, month7_path, month8_path,
 month9_path, month10_path, month11_path, month12_path) = (
    _MONTH_GLOB.format(month_number) for month_number in range(1, 13)
)

In [18]:
#reading each month's files into its own RDD
# Despite the `_words` suffix these RDDs hold raw input lines; the helper
# functions below parse each line with json.loads and tokenise it.
(month1_words, month2_words, month3_words, month4_words,
 month5_words, month6_words, month7_words, month8_words,
 month9_words, month10_words, month11_words, month12_words) = (
    sc.textFile(month_glob)
    for month_glob in (
        month1_path, month2_path, month3_path, month4_path,
        month5_path, month6_path, month7_path, month8_path,
        month9_path, month10_path, month11_path, month12_path,
    )
)

In [19]:
#defining a function to return the most frequent word of a month
def max_freq_month(month_data):
  """Return the most frequent word in `month_data` as a one-element list.

  Args:
    month_data: RDD of lines, each a JSON document with a "text" field.

  Returns:
    ``[(word, count)]`` for the word with the highest count, or ``[]`` when the
    input contains no words. Ties are broken arbitrarily.

  The text is stripped of ASCII punctuation, lower-cased and split on whitespace
  before counting.
  """
  # Built once per call rather than once per article inside the lambda.
  punctuation_table = str.maketrans('', '', string.punctuation)
  word_counts = (
      month_data
      .map(json.loads)
      .map(lambda article: article["text"])
      .map(lambda text: text.translate(punctuation_table))
      .flatMap(lambda text: text.lower().split())
      .map(lambda word: (word, 1))
      .reduceByKey(lambda left, right: left + right)
  )
  # top(1) keeps only the best candidate per partition and merges those, which
  # avoids the full distributed sort that sortBy(...).take(1) would perform.
  return word_counts.top(1, key=lambda pair: pair[1])

In [20]:
#printing the max count word for each month
print("Word with Max count in Month Jan: ", max_freq_month(month1_words))
print("Word with Max count in Month Feb: ", max_freq_month(month2_words))
print("Word with Max count in Month Mar: ", max_freq_month(month3_words))
print("Word with Max count in Month Apr: ", max_freq_month(month4_words))
print("Word with Max count in Month May: ", max_freq_month(month5_words))
print("Word with Max count in Month Jun: ", max_freq_month(month6_words))
print("Word with Max count in Month Jul: ", max_freq_month(month7_words))
print("Word with Max count in Month Aug: ", max_freq_month(month8_words))
print("Word with Max count in Month Sep: ", max_freq_month(month9_words))
print("Word with Max count in Month Oct: ", max_freq_month(month10_words))
print("Word with Max count in Month Nov: ", max_freq_month(month11_words))
print("Word with Max count in Month Dec: ", max_freq_month(month12_words))

Word with Max count in Month Jan:  [('the', 248283)]
Word with Max count in Month Feb:  [('the', 270072)]
Word with Max count in Month Mar:  [('the', 286395)]
Word with Max count in Month Apr:  [('the', 236932)]
Word with Max count in Month May:  [('the', 312504)]
Word with Max count in Month Jun:  [('the', 270382)]
Word with Max count in Month Jul:  [('the', 298191)]
Word with Max count in Month Aug:  [('the', 285629)]
Word with Max count in Month Sep:  [('the', 267490)]
Word with Max count in Month Oct:  [('the', 283019)]
Word with Max count in Month Nov:  [('the', 308822)]
Word with Max count in Month Dec:  [('the', 308955)]


Question-4

In [21]:
#inputs for finding the words that occur on 2012-09-01 but not on 2012-08-01
# Each day is a single file; keep every path next to the RDD read from it.
file_path_2012_09_01 = '/content/assignment_2/2012-09-01'
words_2012_09_01 = sc.textFile(file_path_2012_09_01)

file_path_2012_08_01 = '/content/assignment_2/2012-08-01'
words_2012_08_01 = sc.textFile(file_path_2012_08_01)

def words_count(data):
  """Return an RDD of (word, count) pairs for the articles in `data`.

  Args:
    data: RDD of lines, each a JSON document with a "text" field.

  Returns:
    RDD of ``(word, count)`` tuples, one per distinct word. Words are obtained
    by deleting ASCII punctuation, lower-casing and splitting on whitespace.
  """
  # Built once per call rather than once per article inside the lambda.
  punctuation_table = str.maketrans('', '', string.punctuation)
  return (
      data
      .map(json.loads)
      .map(lambda article: article["text"])
      .map(lambda text: text.translate(punctuation_table))
      .flatMap(lambda text: text.lower().split())
      .map(lambda word: (word, 1))
      # Both reducer arguments are partial counts of the same word.
      .reduceByKey(lambda left, right: left + right)
  )

# Per-day (word, count) pairs.
word_list_09 = words_count(words_2012_09_01)
word_list_08 = words_count(words_2012_08_01)

# Left outer join keeps every 2012-09-01 word; its value is
# (count_on_09_01, count_on_08_01), with None on the right when the word is
# absent from 2012-08-01.
join_result = word_list_09.leftOuterJoin(word_list_08)

# Identity test against None (rather than ==) is the idiomatic missing-value check.
final_list = join_result.filter(lambda x: x[1][1] is None)

# Drop the counts; only the words themselves are the answer.
final_words = final_list.map(lambda x: x[0])

print("Words in 2012-09-01 not in 2012-08-01: ", final_words.collect())



Question-5

In [22]:
#function returning the count of the word "monsoon" in one month
def count_of_monsoon(month):
  """Count occurrences of the token 'monsoon' in `month`.

  Args:
    month: RDD of lines, each a JSON document with a "text" field.

  Returns:
    ``[('monsoon', count)]`` when the word occurs at least once, otherwise ``[]``.
    Matching happens after deleting ASCII punctuation, lower-casing and
    splitting on whitespace, so only tokens exactly equal to 'monsoon' count.
  """
  # Built once per call rather than once per article inside the lambda.
  punctuation_table = str.maketrans('', '', string.punctuation)
  tokens = (
      month
      .map(json.loads)
      .map(lambda article: article["text"])
      .map(lambda text: text.translate(punctuation_table))
      .flatMap(lambda text: text.lower().split())
  )
  # Filter before reducing: only the 'monsoon' tokens are shuffled and summed,
  # instead of counting every distinct word and discarding all but one.
  monsoon_count = (
      tokens
      .filter(lambda word: word == 'monsoon')
      .map(lambda word: (word, 1))
      .reduceByKey(lambda left, right: left + right)
  )
  return monsoon_count.collect()

#printing the monsoon count for each month, January through December

for month_label, month_rdd in [
    ("Jan", month1_words), ("Feb", month2_words), ("Mar", month3_words),
    ("Apr", month4_words), ("May", month5_words), ("Jun", month6_words),
    ("Jul", month7_words), ("Aug", month8_words), ("Sep", month9_words),
    ("Oct", month10_words), ("Nov", month11_words), ("Dec", month12_words),
]:
  print("Monsoon count in " + month_label + ": ", count_of_monsoon(month_rdd))

Monsoon count in Jan:  [('monsoon', 71)]
Monsoon count in Feb:  [('monsoon', 93)]
Monsoon count in Mar:  [('monsoon', 114)]
Monsoon count in Apr:  [('monsoon', 179)]
Monsoon count in May:  [('monsoon', 474)]
Monsoon count in Jun:  [('monsoon', 1189)]
Monsoon count in Jul:  [('monsoon', 1203)]
Monsoon count in Aug:  [('monsoon', 673)]
Monsoon count in Sep:  [('monsoon', 515)]
Monsoon count in Oct:  [('monsoon', 326)]
Monsoon count in Nov:  [('monsoon', 170)]
Monsoon count in Dec:  [('monsoon', 95)]
