# ENSF-612 Quiz 2

### Question 1


1. In Hadoop, each stage involves disk I/O, so jobs run at the speed of the disk rather than the speed of the processor. Apache Spark executes faster than Hadoop because it provides in-memory computing: its design transforms data held in memory, which drastically reduces the time spent on disk.

2. False. RDDs are immutable.

3. False. Transformations are lazily evaluated; actions are what trigger execution.

4. Spark automatically creates closures for the functions and global variables used by the workers, and sends these closures with every task. If a closure captures a large lookup table, that table is shipped to the workers again for each task. In addition, changes that workers make to global variables are not sent back to the driver.
   
   Broadcast and accumulator variables address these problems. Broadcast variables send large read-only values to workers efficiently by shipping them to each worker once instead of with each task. Accumulators aggregate values from the workers back to the driver, which is useful for implementing parallel counters and sums.
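
The shipping cost described in item 4 can be put into rough numbers. The sketch below is plain Python, not Spark: it uses `pickle` as a stand-in for closure serialization, and the table size, worker count, and task count are made-up values chosen only for illustration.

```python
import pickle

# Hypothetical lookup table that every task needs to read.
lookup_table = {i: f"label-{i}" for i in range(10_000)}
payload_bytes = len(pickle.dumps(lookup_table))

num_workers = 4    # assumed cluster size
num_tasks = 200    # assumed number of tasks in the job

# Captured in a closure: the table is serialized and shipped with every task.
closure_bytes = payload_bytes * num_tasks

# Broadcast: the table is shipped to each worker once and reused by its tasks.
broadcast_bytes = payload_bytes * num_workers

print(f"per-task closure: {closure_bytes:,} bytes")
print(f"broadcast:        {broadcast_bytes:,} bytes")
print(f"reduction factor: {closure_bytes // broadcast_bytes}x")
```

In PySpark itself, the corresponding calls are `sc.broadcast(value)`, read on the workers through `.value`, and `sc.accumulator(0)`, updated on the workers with `.add(...)` and read on the driver through `.value`.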

### Question 2

I created a dummy file and uploaded it to `dbfs:/FileStore/quiz_2/data.csv`. I then load this file into a PySpark DataFrame.

```
def read_CSV_to_DF(filepath):
  """
  Read a CSV file into a Spark DataFrame.
  """
  df = (spark.read
        .option("multiline", "true")
        .option("quote", '"')
        .option("header", "true")
        # "escape" was set twice; only the last value takes effect, so it is set once here
        .option("escape", '"')
        .csv(filepath)
        )
  
  return df

# create the DataFrame
df = read_CSV_to_DF('/FileStore/quiz_2/data.csv')

# cast the numeric columns, which are read from the CSV as strings
df = df.withColumn('QuestionId', df['QuestionId'].cast('int'))
df = df.withColumn('Score', df['Score'].cast('int'))

# show the contents of the file
df.show()
```

```
+----------+-----------------+-----+
|QuestionId|HasAcceptedAnswer|Score|
+----------+-----------------+-----+
|         1|             True|   25|
|         2|            False|   26|
|         3|            False|   24|
|         4|             True|   11|
|         5|             True|    3|
|         6|            False|   29|
|         7|             True|   22|
|         8|             True|   16|
|         9|            False|    5|
|        10|            False|   26|
|        11|             True|   30|
|        12|             True|   21|
|        13|            False|   17|
|        14|            False|   15|
|        15|            False|   26|
|        16|             True|   28|
|        17|             True|   12|
|        18|            False|   12|
|        19|            False|   13|
|        20|            False|    7|
+----------+-----------------+-----+
only showing top 20 rows
```



#### Task 1

```
# keep accepted rows, key each score by the flag, then sum the scores per key
rdd_sumAcceptedScore = (df.rdd
                        .filter(lambda x: x['HasAcceptedAnswer'] == "True")
                        .map(lambda x: (x['HasAcceptedAnswer'], x['Score']))
                        .reduceByKey(lambda a, b: a + b)
                        .collect())

print(f'Total score of accepted answers = {rdd_sumAcceptedScore[0][1]}')
```

Total score of accepted answers = 384


#### Task 2

```
# same pipeline as Task 1, but for rows without an accepted answer
rdd_sumNotAcceptedScore = (df.rdd
                           .filter(lambda x: x['HasAcceptedAnswer'] == "False")
                           .map(lambda x: (x['HasAcceptedAnswer'], x['Score']))
                           .reduceByKey(lambda a, b: a + b)
                           .collect())

print(f'Difference of accepted answers and non-accepted answers = {rdd_sumAcceptedScore[0][1] - rdd_sumNotAcceptedScore[0][1]}')
```

Difference of accepted answers and non-accepted answers = -120
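
As a sanity check on the pipeline shape used in both tasks, the same filter, map, and reduce-by-key steps can be traced in plain Python over the 20 rows displayed by `df.show()`. This is only a sketch: `reduce_by_key` is a hypothetical helper that mimics what `reduceByKey` computes, and because the displayed output is truncated to the top 20 rows, these subset totals are not expected to match the full-file results.

```python
# (QuestionId, HasAcceptedAnswer, Score) for the 20 rows shown by df.show().
rows = [
    (1, "True", 25), (2, "False", 26), (3, "False", 24), (4, "True", 11),
    (5, "True", 3), (6, "False", 29), (7, "True", 22), (8, "True", 16),
    (9, "False", 5), (10, "False", 26), (11, "True", 30), (12, "True", 21),
    (13, "False", 17), (14, "False", 15), (15, "False", 26), (16, "True", 28),
    (17, "True", 12), (18, "False", 12), (19, "False", 13), (20, "False", 7),
]


def reduce_by_key(pairs, func):
    """Combine the values of each key with func, mimicking RDD.reduceByKey."""
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals


# map step: key each score by its HasAcceptedAnswer flag
pairs = [(accepted, score) for _, accepted, score in rows]
totals = reduce_by_key(pairs, lambda a, b: a + b)

accepted_total = totals["True"]
not_accepted_total = totals["False"]
print(accepted_total, not_accepted_total, accepted_total - not_accepted_total)
# prints: 168 200 -32
```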


### Question 3

1. When the RDD is created from the text file, no minimum number of partitions is supplied, so Spark falls back to its default minimum, which is at most two. We can pass a larger value to request more partitions. With more partitions, the job can use all of the cores available in the cluster.
    ```
    fileRDD = sc.textFile("BigLog.txt", 8)
    ```
    
2. We can cache the RDD before applying any transformations, so that it persists in memory once the first action computes it and the next query does not have to read the file again.
    ```
    fileRDD.cache()
    ```

The final code would look like this:
```
fileRDD = sc.textFile("BigLog.txt", 8)
fileRDD.cache()

def filter1(record):
  ...

def filter2(record):
  ...


result1RDD = fileRDD.filter(filter1)
print(result1RDD.take(5))
result2RDD = fileRDD.filter(filter2)
print(result2RDD.take(5))
```
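
Why caching helps here can be seen with a toy model. The class below is plain Python and entirely hypothetical (`ToyRDD` is not a Spark API): it re-runs its loader on every action unless `cache()` was called, and counts how often the source is read. Real Spark caches per partition and only materializes the partitions an action actually computes, so this is a simplification.

```python
class ToyRDD:
    """Hypothetical stand-in for an RDD that recomputes unless cached."""

    def __init__(self, loader):
        self._loader = loader      # function that "reads the file"
        self._use_cache = False
        self._cached = None
        self.loads = 0             # how many times the source was read

    def cache(self):
        self._use_cache = True     # lazy: nothing is read yet
        return self

    def _records(self):
        if self._cached is not None:
            return self._cached
        self.loads += 1
        records = list(self._loader())
        if self._use_cache:
            self._cached = records
        return records

    def filter_take(self, predicate, n):
        """Model of rdd.filter(predicate).take(n) as a single action."""
        return [r for r in self._records() if predicate(r)][:n]


def load_log():
    # Stand-in for reading BigLog.txt; the contents are made up.
    return (f"line {i}" for i in range(100))


# Without cache(): each of the two queries reads the source again.
uncached = ToyRDD(load_log)
uncached.filter_take(lambda r: r.endswith("0"), 5)
uncached.filter_take(lambda r: r.endswith("5"), 5)

# With cache(): the first query reads the source, the second reuses it.
cached = ToyRDD(load_log).cache()
cached.filter_take(lambda r: r.endswith("0"), 5)
cached.filter_take(lambda r: r.endswith("5"), 5)

print(uncached.loads, cached.loads)
# prints: 2 1
```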

### Question 4

![Q4_Pipeline.png](/files/quiz_2/Q4_Pipeline.png)

1. We use a supervised machine learning algorithm.

2. Assume that for each consumer we group the data so that we know which products were bought, when (month of purchase), and where (region of purchase). For each consumer we also have the first name, last name, gender, country of origin, and last four digits of the credit card.

   We tokenize the consumer data (first name, last name, gender, country of origin, and the last four digits of the credit card), which together uniquely identify a consumer.

   Region, country of origin, and month of purchase are categorical, so we first convert them to numeric indices using StringIndexer and then encode them using OneHotEncoder.

   We also encode all product codes using OneHotEncoder.

   Since gender is categorical, we index it as 0 and 1 using StringIndexer and then apply OneHotEncoder.

   For the rest of the tokenized textual data (first name, last name, and last four digits of the credit card), we generate a TF-IDF matrix. We also use CountVectorizer to generate extra term-count features from the first name and last name.

   Finally, we use VectorAssembler to combine all of the features into a single vector and feed it into the ML model.

3. Since we have data on all the products purchased by consumers, we can group the data by consumer to see which products are frequently bought together. We train the model on the existing purchase history to predict each consumer's purchases in the next month. The region and month-of-purchase features play an important part in influencing the model's output. Purchases by the same consumer can be traced using the first name, last name, country of origin, and last four digits of the credit card.
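
To make the categorical encoding in item 2 concrete, here is a plain-Python model of what the StringIndexer and OneHotEncoder stages compute. It is a sketch, not the `pyspark.ml` API: `string_index` and `one_hot` are hypothetical helpers, the region values are invented, and the behavior modeled assumes those stages' default settings (the most frequent label gets index 0, and the last category is dropped from the one-hot vector).

```python
from collections import Counter


def string_index(values):
    """Map each label to an index: most frequent first, ties alphabetical."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 vector, dropping the last slot by default."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == int(index) else 0.0 for i in range(size)]


regions = ["west", "east", "west", "north", "east", "west"]  # made-up data
mapping = string_index(regions)
encoded = [one_hot(mapping[r], len(mapping)) for r in regions]

print(mapping)
# prints: {'west': 0.0, 'east': 1.0, 'north': 2.0}
print(encoded[:3])
# prints: [[1.0, 0.0], [0.0, 1.0], [1.0, 0.0]]
```

With only two categories, such as gender in this data, dropping the last slot leaves a one-element vector, which is already the 0/1 encoding.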