In [None]:
#imports
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
import math

# Spark configuration for a local run that uses every available core.
# The memory sizes are Spark *properties*, so they are registered with
# SparkConf.set(). setExecutorEnv() only exports OS environment variables to
# executor processes, which left both memory properties unset.
# NOTE(review): spark.driver.memory only takes effect when the driver JVM has
# not been started yet; confirm for the environment this notebook runs in.
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("WordCount")
    .set("spark.executor.memory", "1g")
    .set("spark.driver.memory", "1g")
)

# Build the Spark session from that configuration (or reuse an existing one).
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [None]:
# SparkContext behind the session; used below to read the text file as an RDD
# and to install Python packages.
sc=spark.sparkContext

In [None]:
# Install pandas, pinned to 0.25.1; later cells call DataFrame.toPandas().
# NOTE(review): install_pypi_package is presumably provided by the hosting
# notebook environment rather than stock PySpark — confirm before running elsewhere.
sc.install_pypi_package("pandas==0.25.1")

In [None]:
# Install boto3 (no version pinned); the final cell imports it to upload the
# report to S3.
sc.install_pypi_package("boto3")

In [None]:
# Run parameters:
#   textfile        - S3 URI of the input text to analyse
#   out_text        - S3 object key under which the report is stored
#   out_file_header - banner line printed at the top of the report
textfile="s3://assignmentbkt/sample-a.txt"
out_text= "sample-a-out.txt"
out_file_header = "              Output for Sample - a                  "

In [None]:
# Read the input file as an RDD of strings, one element per line of text
word_rdd=sc.textFile(textfile)

In [None]:
#Lowercase a line and blank out its punctuation
def lower_clean_str(x):
    """Return ``x`` lowercased, with each ASCII punctuation character replaced by a space.

    Punctuation is replaced rather than deleted, so the words on either side
    of a punctuation mark stay separated.
    """
    punc = '!"#$%&\'()*+,./:;<=>?@[\\]^_`{|}~-'
    # Translation table that maps every punctuation character to one space
    to_space = str.maketrans(punc, ' ' * len(punc))
    return x.lower().translate(to_space)

In [None]:
# Normalise every line: lowercase it and turn punctuation into spaces
filtered_rdd = word_rdd.map(lower_clean_str)

In [None]:
# Split every cleaned line into words.
# str.split() with no separator splits on any run of whitespace (spaces, tabs,
# carriage returns, ...) and never yields empty strings. split(" ") split on
# the space character only, so tabs or a trailing "\r" stayed attached to
# words and whitespace-only tokens could be counted as words.
separatedword_rdd = filtered_rdd.flatMap(lambda line: line.split())

In [None]:
# Discard empty-string tokens so they are not counted as words
separatedword_rdd = separatedword_rdd.filter(lambda x:x!='')

In [None]:
# Pair every token with an initial count of 1 -> (word, 1)
word_with_value = separatedword_rdd.map(lambda token: (token, 1))

# Total number of word occurrences (one pair per token)
total_words = word_with_value.count()

In [None]:
# Sum the 1s per word to get (word, frequency), then order the pairs by word
word_with_value_red = (
    word_with_value
    .reduceByKey(lambda left, right: left + right)
    .sortByKey()
)

# Number of distinct words (one pair per word after the reduction)
distinct_word_count = word_with_value_red.count()

In [None]:
# Swap each pair to (frequency, word) so the frequency becomes the sort key
word_count = word_with_value_red.map(lambda pair: (pair[1], pair[0]))

In [None]:
# Order by frequency, highest first, and bring the result to the driver as a list
wc_sort = (
    word_count
    .sortByKey(ascending=False)
    .collect()
)

In [None]:
# Turn the sorted (frequency, word) list into a DataFrame
columns = ["Frequency", "Word"]
word_count_rdd = spark.sparkContext.parallelize(wc_sort)
word_count_df = word_count_rdd.toDF(columns)

In [None]:
# Add a 1-based "rank" column that numbers the rows by generated row id.
# NOTE(review): assumes the generated ids follow the order of the sorted list
# the DataFrame was built from, so rank 1 is the most frequent word — confirm.
wc = word_count_df.withColumn(
    "rank",
    row_number().over(Window.orderBy(monotonically_increasing_id())),
)

In [None]:
# Preview the ranked word table
wc.show()

In [None]:
# Number of rows in the ranked word DataFrame; the percentage thresholds in the
# following cells are computed from it
wc_val = wc.count()

In [None]:
# Popular words: every word whose rank is within the first 5% of the table
print("Popular words")

# Highest rank still counted as popular (5% of the row count, rounded up)
popthreshold = math.ceil(wc_val * 5 /100)
print(popthreshold)

popularwords = (
    wc.select('rank', 'Word', 'Frequency')
      .filter(col('rank') <= popthreshold)
)
popularwords.show()

# pandas copy, printed into the report later
popularwordspd = popularwords.toPandas()

In [None]:
# Common words: ranks in the band from 47.5% to 52.5% of the table
print("Common words")

# Band limits: lower bound rounded down, upper bound rounded up
lowerthreshold = math.floor(wc_val * 47.5 /100)
upperthreshold = math.ceil(wc_val * 52.5 /100)
print(lowerthreshold)
print(upperthreshold)

# between() includes both ends of the band
commonwords = (
    wc.select('rank', 'Word', 'Frequency')
      .filter(col('rank').between(lowerthreshold, upperthreshold))
)
commonwords.show()

# pandas copy, printed into the report later
commonwordspd = commonwords.toPandas()


In [None]:
# Rare words: every word whose rank is at or beyond 95% of the table
print("Rare words")

# Lowest rank counted as rare (95% of the row count, rounded down)
rarethreshold = math.floor(wc_val * 95 /100)
print(rarethreshold)

rarewords = (
    wc.select('rank', 'Word', 'Frequency')
      .filter(col('rank') >= rarethreshold)
)
rarewords.show()

# pandas copy, printed into the report later
rarewordspd = rarewords.toPandas()


In [None]:
# Letters 

In [None]:
# Count how often each character occurs across all word occurrences
char_counts_with_value_red = (
    word_with_value
    .flatMap(lambda pair: pair[0])      # iterating a word yields its characters
    .map(lambda ch: (ch, 1))
    .reduceByKey(lambda a, b: a + b)
)


In [None]:
# Swap each pair to (frequency, character) so the frequency becomes the sort key
char_count = char_counts_with_value_red.map(lambda pair: (pair[1], pair[0]))

In [None]:
# Order by frequency, highest first, and bring the result to the driver as a list
cc_sort = (
    char_count
    .sortByKey(ascending=False)
    .collect()
)

In [None]:
# Turn the sorted (frequency, character) list into a DataFrame
columns = ["Frequency", "Letter"]
char_count_rdd = spark.sparkContext.parallelize(cc_sort)
char_count_df = char_count_rdd.toDF(columns)

In [None]:
# Add a 1-based "Rank" column that numbers the rows by generated row id.
# NOTE(review): assumes the generated ids follow the order of the sorted list
# the DataFrame was built from, so Rank 1 is the most frequent character — confirm.
cc = char_count_df.withColumn(
    "Rank",
    row_number().over(Window.orderBy(monotonically_increasing_id())),
)

# Print up to 26 rows of the ranked table
cc.show(26)

In [None]:
# Number of rows in the ranked letter DataFrame; the percentage thresholds in
# the following cells are computed from it
cc_val = cc.count()

In [None]:
# Popular letters: every letter whose rank is within the first 5% of the table
print("Popular Letters")

# Highest rank still counted as popular (5% of the row count, rounded up)
popthresholdcc = math.ceil(cc_val * 5 /100)
print(popthresholdcc)

popularchars = (
    cc.select('Rank', 'Letter', 'Frequency')
      .filter(col('Rank') <= popthresholdcc)
)
popularchars.show()

# pandas copy, printed into the report later
popularcharspd = popularchars.toPandas()


In [None]:
# Common letters: ranks in the band from 47.5% to 52.5% of the table
print("Common Letters")

# Band limits: lower bound rounded down, upper bound rounded up
lowerthresholdcc = math.floor(cc_val * 47.5 /100)
upperthresholdcc = math.ceil(cc_val * 52.5 /100)
print(lowerthresholdcc)
print(upperthresholdcc)

# between() includes both ends of the band
commonchars = (
    cc.select('Rank', 'Letter', 'Frequency')
      .filter(col('Rank').between(lowerthresholdcc, upperthresholdcc))
)
commonchars.show()

# pandas copy, printed into the report later
commoncharspd = commonchars.toPandas()


In [None]:
# Rare letters: every letter whose rank is at or beyond 95% of the table
# The label previously read "Rare words", copied over from the words section.
print("Rare Letters")

# Lowest rank counted as rare (95% of the row count, rounded down)
rarethresholdcc = math.floor(cc_val * 95 /100)
print(rarethresholdcc)

rareletters = cc.select('Rank','Letter','Frequency').filter(cc.Rank >= rarethresholdcc)
rareletters.show()

# pandas copy for the report. The name keeps its original spelling ("rate...")
# because the report-building cell reads it under that name.
rateletterspd = rareletters.toPandas()

In [None]:
#Printing into output file 

In [None]:
# boto3 is imported here, not in the first cell, because an earlier cell
# installs it at runtime.
import boto3

# Assemble the plain-text report: banner, word statistics and tables, then
# letter statistics and tables. The pieces are joined in order with no separator.
value = "".join([
    "---------------------------------------------\n",
    out_file_header, "\n",
    "---------------------------------------------\n\n",
    # --- word statistics ---
    "total number of words = ", str(total_words), "\n",
    "total number of distinct words = ", str(distinct_word_count), "\n",
    "popular_threshold_word = ", str(popthreshold), "\n",
    "common_threshold_l_word = ", str(lowerthreshold), "\n",
    "common_threshold_u_word = ", str(upperthreshold), "\n",
    "rare_threshold_word = ", str(rarethreshold), "\n",
    "---------------------------------------------\n\n",
    "Popular words \n", str(popularwordspd), "\n\n",
    "Common words \n", str(commonwordspd), "\n\n",
    "Rare words \n", str(rarewordspd), "\n\n",
    "---------------------------------------------\n\n",
    # --- letter statistics ---
    "total number of distinct letters = ", str(cc_val), "\n",
    "popular_threshold_letters = ", str(popthresholdcc), "\n",
    "common_threshold_l_letters = ", str(lowerthresholdcc), "\n",
    "common_threshold_u_letters = ", str(upperthresholdcc), "\n",
    "rare_threshold_letters = ", str(rarethresholdcc), "\n",
    "---------------------------------------------\n\n",
    "Popular Letters \n", str(popularcharspd), "\n\n",
    "Common Letters \n", str(commoncharspd), "\n\n",
    "Rare Letters \n", str(rateletterspd), "\n\n",
])

# Upload the report to the bucket under the configured object key
s3 = boto3.client('s3')
s3.put_object(Body=value, Bucket="assignmentbkt", Key=out_text)
print("Saved in S3")