# ENSF-612 Quiz 1

## Preparation

I created a dummy file and uploaded it to `dbfs:/FileStore/quiz_1/data.csv`. Below, I load this file into a PySpark DataFrame.

*Citation: the data in the dummy file is adapted from the [Amazon product data – Musical Instruments 5-core reviews](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Musical_Instruments_5.json.gz) hosted by Stanford SNAP.*

In [0]:
def read_CSV_to_DF(filepath):
  """
  Read a CSV file with a header row into a Spark DataFrame.

  Quoted fields may span several lines ("multiline"), and the escape
  character is the quote character itself, so a doubled quote ("") inside
  a quoted field is read as a literal quote.
  No schema inference is requested, so every column is loaded as a string.

  :param filepath: path of the CSV file, e.g. '/FileStore/quiz_1/data.csv'
  :return: the loaded pyspark DataFrame
  """
  # "escape" is set exactly once: setting the same reader option twice just
  # lets the last call win, which previously hid a dead '\\' setting.
  return (spark.read
          .option("multiline", "true")
          .option("quote", '"')
          .option("header", "true")
          .option("escape", '"')
          .csv(filepath))

# load the dummy reviews file; without schema inference every column is a string
df = read_CSV_to_DF('/FileStore/quiz_1/data.csv')

# the two numeric columns are used in arithmetic later, so cast them to int
df = (df
      .withColumn('NumberOfProductBought', df['NumberOfProductBought'].cast('int'))
      .withColumn('ReviewRating', df['ReviewRating'].cast('int')))

The functions below emulate the helper functionality that this quiz asks us to assume is available.

In [0]:
import random
from datetime import datetime, timezone


@udf
def getSentiment(ReviewText):
  """
  Emulate a sentiment classifier for a single review.

  :param ReviewText: the review body; None for a null cell
  :return: 'p' for positive, 'n' for negative or 'o' for neutral polarity
           (chosen deterministically from the text length), or None when
           ReviewText is None
  """
  if ReviewText is None:
    # len(None) would raise and fail the whole Spark task
    return None
  # A private generator seeded like random.seed(...) yields the same choice
  # without reseeding the module-wide generator other code may rely on.
  rng = random.Random(len(ReviewText))
  return rng.choice(['n', 'o', 'p'])
      

@udf
def getUserCountry(UserId):
  """
  Emulate a lookup of the country a user lives in.

  :param UserId: the user's id (read from the CSV as a string); None for a null cell
  :return: one of the five country names below, chosen deterministically
           from UserId, or None when UserId is None
  """
  if UserId is None:
    # seeding with None draws from system entropy, which would give a
    # different, meaningless country on every run
    return None
  userCountry = ['Israel', 'United States', 'Germany', 'Canada', 'Denmark']
  # private generator: same result as random.seed(UserId) + random.choice,
  # but leaves the module-wide generator untouched
  return random.Random(UserId).choice(userCountry)


@udf
def getUserCity(UserId):
  """
  Emulate a lookup of the city a user lives in.

  :param UserId: the user's id (read from the CSV as a string); None for a null cell
  :return: one of the ten city names below, chosen deterministically from
           UserId, or None when UserId is None
  """
  if UserId is None:
    # seeding with None draws from system entropy, which would give a
    # different, meaningless city on every run
    return None
  # NOTE(review): cities are listed two per country, in the same order as
  # getUserCountry's country list; with the same seed the chosen city appears
  # to match the chosen country (the sample output agrees) — confirm before
  # relying on it.
  userCity = ['Tel Aviv', 'Jaffa', 'New York', 'Chicago', 'Munich', 'Berlin',
              'Vancouver', 'Calgary', 'Copenhagen', 'Odense']
  # private generator: same result as random.seed(UserId) + random.choice,
  # but leaves the module-wide generator untouched
  return random.Random(UserId).choice(userCity)


def _formatReviewTime(ReviewTime, fmt):
  """
  Format a Unix-epoch ReviewTime (seconds, int or numeric string) with
  strftime pattern `fmt`, interpreting the instant in UTC.

  UTC is used explicitly so the result does not depend on the timezone
  configured on whichever executor runs the UDF.
  Returns None when ReviewTime is None.
  """
  if ReviewTime is None:
    return None
  return datetime.fromtimestamp(int(ReviewTime), tz=timezone.utc).strftime(fmt)


@udf
def getYear(ReviewTime):
  """
  returns the four-digit year of the ReviewTime (UTC), e.g. '2014'
  """
  return _formatReviewTime(ReviewTime, '%Y')


@udf
def getMonth(ReviewTime):
  """
  returns the zero-padded month number of the ReviewTime (UTC), e.g. '02'
  """
  return _formatReviewTime(ReviewTime, '%m')


@udf
def getDay(ReviewTime):
  """
  returns the locale's full weekday name of the ReviewTime (UTC),
  like Monday, Tuesday, Sunday
  """
  return _formatReviewTime(ReviewTime, '%A')

## Task 1

In [0]:
# adding the emulated columns in one projection; the resulting column order is
# the same as appending them one select at a time
df = df.select(
  "*",
  getSentiment("ReviewText").alias("SentimentPolarity"),
  getUserCountry("UserId").alias("UserCountry"),
  getUserCity("UserId").alias("UserCity"),
  getYear("ReviewTime").alias("ReviewYear"),
  getMonth("ReviewTime").alias("ReviewMonth"),
  getDay("ReviewTime").alias("ReviewDay"),
)

# showing a preview: limit() before toPandas() so only 5 rows are sent to the
# driver, rather than converting the entire DataFrame just to call head()
df.limit(5).toPandas()

Unnamed: 0,UserId,ReviewText,ReviewTime,ReviewRating,NumberOfProductBought,SentimentPolarity,UserCountry,UserCity,ReviewYear,ReviewMonth,ReviewDay
0,452526,"Not much to write about here, but it does exac...",1393545600,5,7,p,Germany,Berlin,2014,2,Friday
1,412630,The product does exactly as it should and is q...,1363392000,5,7,p,Canada,Vancouver,2013,3,Saturday
2,169420,The primary job of this device is to block the...,1377648000,5,1,o,Germany,Munich,2013,8,Wednesday
3,470797,Nice windscreen protects my MXL mic and preven...,1392336000,5,6,n,United States,New York,2014,2,Friday
4,412630,This pop filter is great. It looks and perform...,1392940800,5,2,o,Canada,Vancouver,2014,2,Friday


## Task 2

In [0]:
# total number of products
numberOfProducts = df.select('NumberOfProductBought').rdd.flatMap(lambda x: x).reduce(lambda x, y: x + y)

# printing the results
print("Total number of products = {}".format(numberOfProducts))

## Task 3

#### Subtask 1

In [0]:
# sum all the ratings
ratingSum = df.select('ReviewRating').rdd.flatMap(lambda x: x).reduce(lambda x, y: x+y)

# divide by total to get average
ratingAverage = ratingSum/df.select('ReviewRating').count()

# printing the average rating by all users
print("Average rating = {}".format(ratingAverage))

#### Subtask 1 - alternate

In [0]:
# sum all the ratings and number of ratings by each user
rdd_sumRating = df.select(['UserId', 'ReviewRating']).rdd.map(lambda x: (x['UserId'], (x['ReviewRating'], 1))).reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))

# get the average of ratings
rdd_avgRating = rdd_sumRating.map(lambda x: (x[0], x[1][0]/x[1][1]))

# pretty print into table using df.show()
spark.createDataFrame(rdd_avgRating, ['UserID', 'Average Rating']).show(n=100)

#### Subtask 2a

In [0]:
# total number of reviews by polarity
rdd_reviewPolarity = df.select('SentimentPolarity').rdd.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a+b)
reviewsByPolarity = rdd_reviewPolarity.collect()

# printing the results
print("Number of positive reviews = {}\nNumber of neutral reviews  = {}\nNumber of negative Reviews = {}".format(reviewsByPolarity[0][1], reviewsByPolarity[1][1], reviewsByPolarity[2][1]))

#### Subtask 2b

In [0]:
# total number of reviews by country
rdd_reviewsByCountry = df.select(['UserCountry']).rdd.map(lambda x: (x['UserCountry'], 1)).reduceByKey(lambda a, b: a+b).sortByKey()

# pretty print into table using df.show()
spark.createDataFrame(rdd_reviewsByCountry, ['Country', 'Total Reviews']).show(n=100)

In [0]:
# total number of reviews by user
rdd_reviewsByUser = df.select(['UserId']).rdd.map(lambda x: (x['UserId'], 1)).reduceByKey(lambda a, b: a+b).sortByKey()

# pretty print into table using df.show()
spark.createDataFrame(rdd_reviewsByUser, ['UserId', 'Total Reviews']).show(n=100)

#### Subtask 2b - alternate

In [0]:
# total number of reviews by country and by user
rdd = df.select(['UserCountry', 'UserId']).rdd
reviewsByUserCountry = rdd.map(lambda x: [(x['UserCountry'], x['UserId']), 1]).reduceByKey(lambda a, b: a+b).sortByKey()

# pretty print into table using custom code
iter = reviewsByUserCountry.toLocalIterator()
print("+---------------+---------+----------------+")
print("| {:>13} | {:>7} | {:>14} |".format("Country", "UserId", "Total Reviews"))
print("+---------------+---------+----------------+")
for row in iter:
  print("| {:>13} | {:>7} | {:>14} |".format(row[0][0], row[0][1], row[1]))
print("+---------------+---------+----------------+")