# ENSF-612 Midterm

## Preparation for Q2

I created a dummy file, uploaded it to `dbfs:/FileStore/midterm/q2_data.csv`, and loaded it into a PySpark DataFrame.

*Citation: I adapted the data for the dummy file from [here](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Musical_Instruments_5.json.gz).*

In [0]:
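def read_CSV_to_DF(filepath, isHeader):
  """
  Read a CSV file into a Spark DataFrame.
  Quoted fields may span multiple lines, and a doubled quote ("")
  inside a quoted field is read as a literal quote character.
  """
  df = (spark.read
        .option("multiline", "true")
        .option("quote", '"')
        .option("header", isHeader)
        # a later .option() call with the same key overrides an earlier one,
        # so "escape" is set only once
        .option("escape", '"')
        .csv(filepath)
        )
  
  return df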

# creating the dataframe
df = read_CSV_to_DF('/FileStore/midterm/q2_data.csv', True)

# CSV columns are read as strings; cast the numeric columns to integers
df = df.withColumn('IssuePriority', df['IssuePriority'].cast('int'))
df = df.withColumn('NumberOfComponentsAffected', df['NumberOfComponentsAffected'].cast('int'))
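The reader's effective `quote` and `escape` characters are both `"`, which is the RFC 4180 convention: a quoted field may contain commas and newlines, and a literal quote inside it is written as `""`. As a hedged illustration outside Spark, Python's standard `csv` module follows the same convention by default, so the sketch below shows how such a row is expected to parse. The sample text is made up and is not taken from `q2_data.csv`.

```python
import csv
import io

# Hypothetical sample: the quoted description spans two lines and
# contains doubled quotes ("") that should become literal quotes.
raw = (
    'IssueId,IssueDescription,IssuePriority\n'
    '1,"Pop filter says ""great""\nand fits my mic",5\n'
)

# csv.reader keeps quoted newlines and collapses "" to " by default
rows = list(csv.reader(io.StringIO(raw)))
header, record = rows[0], rows[1]
print(header)
print(record)
```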

I created the functions below to emulate the helper functions that the midterm asks us to assume are available.

In [0]:
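import random
from datetime import datetime

from pyspark.sql.functions import udf


@udf
def getIssueType(IssueDescription):
  """
  returns 'b' for bug, 'f' for new feature, and 
  'e' for feature enhancement
  """
  
  issue_type = ['b', 'f', 'e']
  # seed a private generator with the description length so the dummy
  # label is deterministic without touching the global random state
  return random.Random(len(IssueDescription)).choice(issue_type)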


@udf
def getYear(CreationTime):
  """
  returns year of the CreationTime
  """
  
  CreationTime = int(CreationTime)
  creationYear = datetime.fromtimestamp(CreationTime).strftime('%Y')
  return creationYear
 
 
@udf
def getMonth(CreationTime):
  """
  returns month of the CreationTime
  """
  
  CreationTime = int(CreationTime)
  creationMonth = datetime.fromtimestamp(CreationTime).strftime('%m')
  return creationMonth
 
 
@udf
def getDay(CreationTime):
  """
  returns day of a week like Monday, Tuesday, Sunday
  """
  
  CreationTime = int(CreationTime)
  creationDay = datetime.fromtimestamp(CreationTime).strftime('%A')
  return creationDay
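`datetime.fromtimestamp` without a `tz` argument converts `CreationTime` using the local timezone of whichever machine runs the UDF, so the year, month, and weekday can shift if that timezone is not UTC. Below is a minimal plain-Python sketch that pins the conversion to UTC; `creation_parts` is a hypothetical helper, and treating `CreationTime` as Unix seconds in UTC is an assumption based on the sample values.

```python
from datetime import datetime, timezone


def creation_parts(creation_time):
    """Hypothetical helper: return (year, month, weekday) for a Unix
    timestamp in seconds, converted in UTC rather than local time."""
    ts = datetime.fromtimestamp(int(creation_time), tz=timezone.utc)
    # same format codes as getYear, getMonth and getDay above
    return ts.strftime('%Y'), ts.strftime('%m'), ts.strftime('%A')


print(creation_parts('1393545600'))
print(creation_parts('1363392000'))
```

The two timestamps are the first two `CreationTime` values in the Task 2.1 output, and their UTC results agree with the `IssueYear` and `IssueDay` shown there.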

## Task 2.1

In [0]:
# adding additional columns
df = df.select("*", getIssueType("IssueDescription").alias("IssueType"))
df = df.select("*", getYear("CreationTime").alias("IssueYear"))
df = df.select("*", getMonth("CreationTime").alias("IssueMonth"))
df = df.select("*", getDay("CreationTime").alias("IssueDay"))

# show the first five rows; limit() first so only those rows are collected to the driver
df.limit(5).toPandas()   # df.show(n=100)

|   | IssueId | CreationTime | IssueDescription | IssuePriority | NumberOfComponentsAffected | IssueType | IssueYear | IssueMonth | IssueDay |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 452526 | 1393545600 | Not much to write about here, but it does exac... | 5 | 7 | e | 2014 | 2 | Friday |
| 1 | 412630 | 1363392000 | The product does exactly as it should and is q... | 5 | 7 | e | 2013 | 3 | Saturday |
| 2 | 169420 | 1377648000 | The primary job of this device is to block the... | 5 | 1 | f | 2013 | 8 | Wednesday |
| 3 | 470797 | 1392336000 | Nice windscreen protects my MXL mic and preven... | 5 | 6 | b | 2014 | 2 | Friday |
| 4 | 412630 | 1392940800 | This pop filter is great. It looks and perform... | 5 | 2 | f | 2014 | 2 | Friday |


## Task 2.2

In [0]:
# total number of components: unwrap each single-column Row into its value, then sum
numberOfComponents = (df.select('NumberOfComponentsAffected')
                        .rdd
                        .flatMap(lambda x: x)
                        .reduce(lambda x, y: x + y))

# printing the results
print("Total number of components affected by all the issues = {}".format(numberOfComponents))

Total number of components affected by all the issues = 293
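`flatMap(lambda x: x)` unwraps each single-column `Row` into its value, and `reduce` then folds the values pairwise with `+`. Spark requires the function passed to `reduce` to be commutative and associative because partitions are combined in no fixed order; addition satisfies both. The same two steps can be mimicked in plain Python; the one-element tuples below stand in for `Row` objects and hold the `NumberOfComponentsAffected` values of the five rows shown in Task 2.1.

```python
from functools import reduce
from itertools import chain

# Stand-ins for Row objects from df.select('NumberOfComponentsAffected')
rows = [(7,), (7,), (1,), (6,), (2,)]

# flatMap(lambda x: x): each one-element row expands to its single value
values = list(chain.from_iterable(rows))

# reduce(lambda x, y: x + y): fold the values pairwise into one total
total = reduce(lambda x, y: x + y, values)
print(total)  # 23
```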


## Task 2.3

#### Subtask 1

In [0]:
# sum the priorities and count the issues per IssueType: (type, (sum, count))
rdd_sumPriority = (df.select(['IssueType', 'IssuePriority'])
                     .rdd
                     .map(lambda x: (x['IssueType'], (x['IssuePriority'], 1)))
                     .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])))

# divide the sum by the count to get the average
rdd_avgPriority = rdd_sumPriority.map(lambda x: (x[0], x[1][0] / x[1][1]))

# printing the average IssuePriority per IssueType
spark.createDataFrame(rdd_avgPriority, ['IssueType', 'Average Priority']).show(n=100)

+---------+------------------+
|IssueType|  Average Priority|
+---------+------------------+
|        e|               4.7|
|        f|4.6521739130434785|
|        b| 4.235294117647059|
+---------+------------------+
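Averaging with `reduceByKey` carries a `(sum, count)` pair because partial sums and counts combine exactly, whereas averaging partial averages is wrong in general when partitions hold different numbers of rows. A plain-Python sketch of the same map, reduce-by-key, and divide steps follows; `reduce_by_key` is a hypothetical helper that only imitates `RDD.reduceByKey` on a local list, and the records are made up.

```python
def reduce_by_key(pairs, func):
    """Hypothetical helper mimicking RDD.reduceByKey on a local list."""
    grouped = {}
    for key, value in pairs:
        # combine with the running value for this key, if one exists
        grouped[key] = func(grouped[key], value) if key in grouped else value
    return grouped


# Made-up (IssueType, IssuePriority) records
records = [('e', 5), ('f', 4), ('e', 3), ('b', 2), ('f', 5), ('e', 4)]

# map: (type, (priority, 1))
pairs = [(t, (p, 1)) for t, p in records]

# reduceByKey: add sums and counts element-wise
sums = reduce_by_key(pairs, lambda a, b: (a[0] + b[0], a[1] + b[1]))

# map: divide each sum by its count
averages = {t: s / c for t, (s, c) in sums.items()}
print(averages)
```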



#### Subtask 2

In [0]:
# total number of issues by IssueType
rdd_issuesByType = df.select(['IssueType']).rdd.map(lambda x: (x['IssueType'], 1)).reduceByKey(lambda a, b: a+b).sortByKey()

# pretty print into table using df.show()
spark.createDataFrame(rdd_issuesByType, ['IssueType', 'Total Issues']).show(n=100)

+---------+------------+
|IssueType|Total Issues|
+---------+------------+
|        b|          17|
|        e|          10|
|        f|          23|
+---------+------------+
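This subtask and Subtasks 3a to 3c share one pattern: map every row to `(key, 1)`, add the ones per key with `reduceByKey`, then order the pairs with `sortByKey`. Locally, `collections.Counter` performs the same count-per-key aggregation, so a plain-Python sketch with made-up `IssueType` values looks like this:

```python
from collections import Counter

# Made-up IssueType values standing in for df.select(['IssueType'])
issue_types = ['f', 'b', 'e', 'f', 'b', 'f']

# map(lambda x: (x, 1)) + reduceByKey(lambda a, b: a + b) == count each key
counts = Counter(issue_types)

# sortByKey(): order the (key, count) pairs by key, ascending
by_type = sorted(counts.items())
print(by_type)  # [('b', 2), ('e', 1), ('f', 3)]
```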



#### Subtask 3a

In [0]:
# total number of issues by year
rdd_issuesByYear = df.select(['IssueYear']).rdd.map(lambda x: (x['IssueYear'], 1)).reduceByKey(lambda a, b: a+b).sortByKey()

# pretty print into table using df.show()
spark.createDataFrame(rdd_issuesByYear, ['IssueYear', 'Reported Issues']).show(n=100)

+---------+---------------+
|IssueYear|Reported Issues|
+---------+---------------+
|     2008|              1|
|     2010|              1|
|     2012|             15|
|     2013|             21|
|     2014|             12|
+---------+---------------+



#### Subtask 3b

In [0]:
# total number of issues by month
rdd_issuesByMonth = df.select(['IssueMonth']).rdd.map(lambda x: (x['IssueMonth'], 1)).reduceByKey(lambda a, b: a+b).sortByKey()

# pretty print into table using df.show()
spark.createDataFrame(rdd_issuesByMonth, ['IssueMonth', 'Reported Issues']).show(n=100)

+----------+---------------+
|IssueMonth|Reported Issues|
+----------+---------------+
|        01|              7|
|        02|              5|
|        03|              6|
|        04|              2|
|        05|              2|
|        06|              3|
|        07|              6|
|        08|              5|
|        09|              4|
|        10|              2|
|        11|              3|
|        12|              5|
+----------+---------------+
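`getMonth` returns zero-padded strings (`%m`), which is why `sortByKey` puts the months in calendar order even though the keys are strings. Without the padding, lexicographic order would place `'10'` before `'2'`. A quick plain-Python check of both cases, where the unpadded list is hypothetical:

```python
# String keys as produced by strftime('%m') (zero-padded)
padded = ['12', '01', '10', '02']
# Hypothetical unpadded month strings for comparison
unpadded = ['12', '1', '10', '2']

print(sorted(padded))    # ['01', '02', '10', '12']: calendar order
print(sorted(unpadded))  # ['1', '10', '12', '2']: '2' sorts last
```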



#### Subtask 3c

In [0]:
# total number of issues by day
rdd_issuesByDay = df.select(['IssueDay']).rdd.map(lambda x: (x['IssueDay'], 1)).reduceByKey(lambda a, b: a+b).sortByKey()

# pretty print into table using df.show()
spark.createDataFrame(rdd_issuesByDay, ['IssueDay', 'Reported Issues']).show(n=100)

+---------+---------------+
| IssueDay|Reported Issues|
+---------+---------------+
|   Friday|              8|
|   Monday|              5|
| Saturday|              7|
|   Sunday|              6|
| Thursday|              5|
|  Tuesday|              7|
|Wednesday|             12|
+---------+---------------+
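`sortByKey` orders `IssueDay` alphabetically (Friday, Monday, and so on). If calendar order is preferred, one option is to sort on each day's position in the week; on the RDD, `.sortBy(lambda kv: WEEKDAYS.index(kv[0]))` could take the place of `.sortByKey()`, where `WEEKDAYS` is a hypothetical list like the one below. A plain-Python sketch using the counts from the table above:

```python
# Counts copied from the Subtask 3c output above
day_counts = {'Friday': 8, 'Monday': 5, 'Saturday': 7, 'Sunday': 6,
              'Thursday': 5, 'Tuesday': 7, 'Wednesday': 12}

# Monday-first order, matching the English names produced by strftime('%A')
WEEKDAYS = ['Monday', 'Tuesday', 'Wednesday', 'Thursday',
            'Friday', 'Saturday', 'Sunday']

# sort by position in the week instead of alphabetically
ordered = sorted(day_counts.items(), key=lambda kv: WEEKDAYS.index(kv[0]))
for day, count in ordered:
    print(day, count)
```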



## Preparation for Q4

I created a dummy file, uploaded it to `dbfs:/FileStore/midterm/q4_data.csv`, and loaded it into a PySpark DataFrame.

In [0]:
# creating the dataframe
df = read_CSV_to_DF('/FileStore/midterm/q4_data.csv', True)

## Task 4.1.a

In [0]:
from pyspark.sql.functions import col, explode, regexp_replace, split

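def list_splitter(pyspark_df):
  """
  Splits the list of friends and explodes it into rows
  """
  
  # strip the surrounding brackets (raw string keeps the regex escapes intact),
  # split on commas, then explode so each friend id gets its own row
  return pyspark_df.withColumn(
    "Friend_profile_id_list",
    explode(split(regexp_replace(col("Friend_profile_id_list"), r"(^\[)|(\]$)", ""), ","))
  )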
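Spark's `split` treats its second argument as a (Java) regular expression, so if the stored lists contain spaces, as in `[1, 2, 3]`, a pattern such as `\s*,\s*` would absorb them instead of leaving ids like `" 2"`. Whether the data has such spaces is an assumption. The plain-Python sketch below mirrors the bracket-stripping and splitting for one cell value; `split_friend_list` is a hypothetical helper, not part of the notebook.

```python
import re


def split_friend_list(raw):
    """Hypothetical local equivalent of list_splitter for one cell value:
    strip the surrounding brackets, then split on commas."""
    inner = re.sub(r"(^\[)|(\]$)", "", raw)
    # \s*,\s* also tolerates spaces around commas (assumed data format)
    return re.split(r"\s*,\s*", inner)


print(split_friend_list("[466,500,552]"))    # ['466', '500', '552']
print(split_friend_list("[466, 500, 552]"))  # ['466', '500', '552']
```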

In [0]:
def friend_count(pyspark_df, profile_id):
  """
  Returns the count of friends of the given
  profile_id in the dataframe pyspark_df
  """
  
  # filter the profile_id from the dataframe
  pyspark_df = pyspark_df.filter(pyspark_df['Profile_id'] == profile_id)
  
  # explode the friend list
  pyspark_df = list_splitter(pyspark_df)
  
  # compute the friend count; return 0 if profile_id is not in the dataframe
  counts = pyspark_df.rdd.map(lambda x: (x['Profile_id'], 1)).reduceByKey(lambda a, b: a+b).collect()
  return counts[0][1] if counts else 0

In [0]:
friend_count(df, '416')

Out[13]: 19
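Because `friend_count` counts exploded rows, two edge cases are worth checking: a profile id that is absent, and a profile whose list is empty (`[]`). In Python, `''.split(',')` still returns one empty element, so empty entries have to be filtered out to count such a profile as having zero friends. A minimal local sketch, assuming lists are stored as bracketed comma-separated strings; `profiles` and `friend_count_local` are hypothetical.

```python
import re

# Hypothetical mini dataset: Profile_id -> raw Friend_profile_id_list string
profiles = {
    '416': '[466,500,552,601]',
    '777': '[]',
}


def friend_count_local(profiles, profile_id):
    """Hypothetical local version of friend_count: parse the list and
    count non-empty ids; unknown profiles count as 0."""
    raw = profiles.get(profile_id)
    if raw is None:
        return 0
    inner = re.sub(r"(^\[)|(\]$)", "", raw)
    # drop empty strings, e.g. the single '' produced by splitting '[]'
    return len([f for f in inner.split(",") if f.strip()])


print(friend_count_local(profiles, '416'))  # 4
print(friend_count_local(profiles, '777'))  # 0
print(friend_count_local(profiles, '999'))  # 0
```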

## Task 4.1.b

In [0]:
def common_friend_list(pyspark_df, profile_1, profile_2):
  """
  Returns the dataframe of common friends of given profiles
  profile_1 and profile_2
  """
  
  # keep only the rows of the two profiles
  pyspark_df = pyspark_df.filter((pyspark_df['Profile_id'] == profile_1) | (pyspark_df['Profile_id'] == profile_2))
  
  # explode the friend list
  pyspark_df = list_splitter(pyspark_df)
  
  # count how many of the two profiles list each friend; a friend listed by
  # both ends up with a count of 2 (assumes one row per profile and no
  # duplicate ids within a single list)
  df_counts = pyspark_df.rdd.map(lambda x: (x['Friend_profile_id_list'], 1)).reduceByKey(lambda a, b: a+b)
  
  # swap to (count, friend) and convert to a DataFrame
  df_counts = df_counts.map(lambda x: (x[1], x[0]))
  df_list = spark.createDataFrame(df_counts, ['Count', 'Common Friend'])

  # keep only the friends whose count is 2
  return df_list.filter(df_list['Count'] == 2).select('Common Friend')

In [0]:
common_friend_list(df, '501', '416').show(n=100)

+-------------+
|Common Friend|
+-------------+
|          466|
|          500|
|          552|
+-------------+
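The count-equals-2 test in `common_friend_list` relies on each profile listing a friend at most once; a duplicated id inside one list would also reach a count of 2. Set intersection does not depend on that assumption (in PySpark, `DataFrame.intersect` is a set-based alternative). A plain-Python comparison with made-up friend lists; both helper names are hypothetical.

```python
def common_by_count(a, b):
    """Counting approach, as in common_friend_list: ids seen twice are shared."""
    counts = {}
    for fid in a + b:
        counts[fid] = counts.get(fid, 0) + 1
    return sorted(fid for fid, n in counts.items() if n == 2)


def common_by_set(a, b):
    """Set intersection: ids present in both lists, duplicates ignored."""
    return sorted(set(a) & set(b))


# Made-up friend lists for two profiles
friends_501 = ['466', '500', '552', '610']
friends_416 = ['420', '466', '500', '552']
print(common_by_count(friends_501, friends_416))  # ['466', '500', '552']
print(common_by_set(friends_501, friends_416))    # ['466', '500', '552']

# A duplicated id within one list fools the counting approach
print(common_by_count(['700', '700'], ['466']))   # ['700']
print(common_by_set(['700', '700'], ['466']))     # []
```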

