# IST 718: Big Data Analytics

- Professor: Daniel Acuna <deacuna@syr.edu>

## General instructions:

- You are welcome to discuss the problems with your classmates but __you are not allowed to copy any part of your answers either from your classmates or from the internet__
- You can put the homework files anywhere you want in your https://jupyterhub.ischool.syr.edu/ workspace but _do not change_ the file names. The TAs and the professor use these names to grade your homework.
- Remove or comment out code that contains `raise NotImplementedError`. This is mainly to make the `assert` statement fail if nothing is submitted.
- The tests shown in some cells (i.e., `assert` and `np.testing.` statements) are used to grade your answers. **However, the professor and TAs will use __additional__ tests for your answer. Think about cases where your code should still run correctly, even if it passes all the tests you see.**
- Before downloading and submitting your work through Blackboard, remember to save and press `Validate` (or go to `Kernel`$\rightarrow$`Restart and Run All`).
- Good luck!

# Part 1: MapReduce with Spark

In [None]:
# Run this code to create the Spark session
import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('map-reduce').getOrCreate()
sc = spark.sparkContext

In [None]:
# this RDD will be used throughout this part of the homework
gpa_rdd = sc.parallelize([
 ['2020', 'Fall', 'IST687', 1, 'A', 4],
 ['2020', 'Fall', 'IST659', 3, 'A', 4],
 ['2020', 'Fall', 'IST615', 3, 'B+', 3.3],
 ['2020', 'Fall', 'SOC101', 3, 'A-', 3.7],
 ['2020', 'Fall', 'MAT221', 3, 'C', 2],
 ['2020', 'Fall', 'IST346', 3, 'A', 4],
 ['2021', 'Spring', 'IST681', 3, 'B+', 3.3],
 ['2021', 'Spring', 'MAT222', 3, 'A', 4],
 ['2021', 'Spring', 'IST719', 3, 'C+', 2.3],
 ['2021', 'Spring', 'MBC638', 3, 'B-', 2.7],
 ['2021', 'Fall', 'IST718', 3, 'A-', 3.7],
 ['2021', 'Fall', 'PSY520', 3, 'B+', 3.3],
 ['2021', 'Fall', 'IST700', 3, 'A', 4],
 ['2021', 'Fall', 'IST722', 3, 'B-', 2.7]])

## Question 1: Cumulative GPA with MapReduce (15 pts)

The cumulative GPA reported in any transcript is the average of the numerical scores of the courses, weighted by the credits of each course. Construct a MapReduce job that takes the `gpa_rdd` RDD and returns the cumulative GPA *per semester*, that is, per combination of year and season.

Each record in `gpa_rdd` contains:
- The year
- The season
- The course code
- The number of credits
- The letter grade
- The number grade

**Hint:** In class, we discussed the MapReduce job for computing an average. In this case, the key-value pairs will be similar, but instead of counting the number of elements considered in the average so far, we need to count the credits. The key needs to combine the year and the season, so that each semester is reduced separately.
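
As a reminder of the averaging pattern mentioned in the hint, here is a minimal plain-Python sketch that uses `functools.reduce` in place of Spark. The names `map_average` and `reduce_average` and the toy list `values` are illustrative assumptions, not part of the homework, and the sketch computes an unweighted average over a single group only.

```python
from functools import reduce

def map_average(x):
    # Each element starts as "an average of itself over one element".
    return [x, 1]

def reduce_average(v1, v2):
    # Merge two partial averages, weighting each by the number of
    # elements it already covers.
    total = v1[1] + v2[1]
    return [(v1[0] * v1[1] + v2[0] * v2[1]) / total, total]

values = [2.0, 4.0, 6.0, 8.0]  # toy data, not the homework dataset
average, count = reduce(reduce_average, map(map_average, values))
print(average, count)
```

Carrying the count next to the partial average is what lets the reduce function be applied in any order; for the weighted case, the count is replaced by the credits accumulated so far.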

In [None]:
def map_weighted_gpa(record):   
    # YOUR CODE HERE
    raise NotImplementedError()

def reduce_weighted_gpa(value1, value2):    
    # YOUR CODE HERE
    raise NotImplementedError()

The map step should produce, for each record, a key made of the year and season joined into a single string, and a value that is a tuple or list with the number grade and the credits.

For example, the first element of the map of `gpa_rdd` should be

```python
gpa_rdd.\
    map(map_weighted_gpa).\
    first()
```

```python
['2020Fall', [4, 1]]
```

In [None]:
gpa_rdd.\
    map(map_weighted_gpa).\
    first()

In [None]:
# 15 pts
##### first result
assert gpa_rdd.\
    map(map_weighted_gpa).\
    first() == ['2020Fall', [4, 1]]
# the key should be a string
assert gpa_rdd.map(map_weighted_gpa).map(lambda x: type(x[0])).distinct().count() == 1
assert gpa_rdd.map(map_weighted_gpa).map(lambda x: type(x[0])).distinct().first() == str
# all values should be of length 2
assert gpa_rdd.map(map_weighted_gpa).map(lambda x: len(x[1])).distinct().count() == 1
assert gpa_rdd.map(map_weighted_gpa).map(lambda x: len(x[1])).distinct().first() == 2

In [None]:
# there should be three results in the map reduce because there are three semesters
gpa_rdd.\
    map(map_weighted_gpa).\
    reduceByKey(reduce_weighted_gpa).count()

In [None]:
# 5 pts
assert (gpa_rdd.\
    map(map_weighted_gpa).\
    reduceByKey(reduce_weighted_gpa).collect()[0] == ('2021Spring', [3.0749999999999997, 12]) or \
    (gpa_rdd.\
    map(map_weighted_gpa).\
    reduceByKey(reduce_weighted_gpa).collect()[0] == ('2021Fall', [3.4250000000000003, 12])) or \
    (gpa_rdd.\
    map(map_weighted_gpa).\
    reduceByKey(reduce_weighted_gpa).collect()[0] == ('2020Fall', [3.4375, 16])))

## Time series

## Question 2: Stock Price (10 pts)
To make a good investment decision, we have to know the company well. In this question, we have the stock price of a company over the past 3 months. We want to obtain the `minimum` and `maximum` of the stock price over that period using MapReduce. The first element of the value at the end of the computation should be the minimum stock price and the second element should be the maximum stock price.
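
The idea of tracking two statistics in a single pass can be sketched in plain Python, again with `functools.reduce` standing in for Spark. The names `to_min_max` and `merge_min_max` and the toy list `prices` are illustrative assumptions. In Spark, `reduceByKey` merges values only within a key, so obtaining a single result presumably requires every record to share the same key.

```python
from functools import reduce

def to_min_max(price):
    # A single price is both the smallest and the largest seen so far.
    return [price, price]

def merge_min_max(v1, v2):
    # Keep the smaller of the two minima and the larger of the two maxima.
    return [min(v1[0], v2[0]), max(v1[1], v2[1])]

prices = [132.3, 130.46, 133.98, 131.79]  # toy data, not the homework dataset
lowest, highest = reduce(merge_min_max, map(to_min_max, prices))
print(lowest, highest)
```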

In [None]:
# stock price dataset
stock_price = sc.parallelize([['2021-06-16', 130.15], ['2021-06-17', 131.79], ['2021-06-18', 130.46], 
                             ['2021-06-21', 132.3],
['2021-06-22', 133.98], ['2021-06-23', 133.7], ['2021-06-24', 133.41], ['2021-06-25', 133.11],
['2021-06-28', 134.78], ['2021-06-29', 136.33], ['2021-06-30', 136.96], ['2021-07-01', 137.27],
['2021-07-02', 139.96], ['2021-07-06', 142.02] ,['2021-07-07', 144.57] ,['2021-07-08', 143.24] ,
['2021-07-09', 145.11], ['2021-07-12', 144.5] ,['2021-07-13', 145.64] ,['2021-07-14', 149.15] ,
['2021-07-15', 148.48], ['2021-07-16', 146.39] ,['2021-07-19', 142.45] ,['2021-07-20', 146.15] ,
['2021-07-21', 145.4], ['2021-07-22', 146.8] ,['2021-07-23', 148.56] ,['2021-07-26', 148.99] ,
['2021-07-27', 146.77], ['2021-07-28', 144.98] ,['2021-07-29', 145.64] ,['2021-07-30', 145.86] ,
['2021-08-02', 145.52], ['2021-08-03', 147.36] ,['2021-08-04', 146.95] ,['2021-08-05', 147.06] ,
['2021-08-06', 146.14], ['2021-08-09', 146.09] ,['2021-08-10', 145.6] ,['2021-08-11', 145.86] ,
['2021-08-12', 148.89], ['2021-08-13', 149.1] ,['2021-08-16', 151.12] ,['2021-08-17', 150.19] ,
['2021-08-18', 146.36], ['2021-08-19', 146.7] ,['2021-08-20', 148.19] ,['2021-08-23', 149.71] ,
['2021-08-24', 149.62], ['2021-08-25', 148.36] ,['2021-08-26', 147.54] ,['2021-08-27', 148.6] ,
['2021-08-30', 153.12], ['2021-08-31', 151.83] ,['2021-09-01', 152.51] ,['2021-09-02', 153.65] ,
['2021-09-03', 154.3], ['2021-09-07', 156.69] ,['2021-09-08', 155.11] ,['2021-09-09', 154.07] ,
['2021-09-10', 148.97],['2021-09-13', 149.55] ,['2021-09-14', 148.12] ,['2021-09-15', 149.03] ,
['2021-09-16', 148.79]])

In [None]:
# define the map operation
def stock_map(kv):  
    # YOUR CODE HERE
    raise NotImplementedError()

def stock_reduce(v1, v2):   
    # YOUR CODE HERE
    raise NotImplementedError()

In [None]:
# 10 pts
# there should be 1 result only
assert stock_price.map(stock_map).reduceByKey(stock_reduce).count() == 1
# the resulting value should have two elements (stock price 1, stock price 2)
assert len(stock_price.map(stock_map).reduceByKey(stock_reduce).values().first()) == 2
# lowest stock price was 130.15
assert stock_price.map(stock_map).reduceByKey(stock_reduce).values().first()[0] == 130.15
# highest stock price was 156.69
assert stock_price.map(stock_map).reduceByKey(stock_reduce).values().first()[1] == 156.69

## Temperature Deviation
Temperature change over time is a highly noisy process. Still, we can use long-term trends to understand these fluctuations and perhaps relate them to external factors such as changes in the oceans, the magnetic fields of the earth, or human activity (CO2).

Luckily, we have data to explore how temperatures have changed over the years. In the dataset below, we have obtained a set of key-value pairs where the key is the year $y$ and the value is a temperature deviation $t_y$ from a long-term historical record (e.g., inferred from the Arctic's permafrost).

In [None]:
global_avg_temp = sc.parallelize([[1880, -0.06], [1881, -0.09], [1882, 0.08], [1883, -0.28], 
                                  [1884, -0.24], 
                   [1885, -0.45], [1886, -0.18], [1887, -0.53], [1888, -0.33], [1889, -0.03],
                   [1890, -0.34], [1891, -0.4], [1892, -0.25], [1893, -0.74], [1894, -0.42],
                   [1895, -0.44], [1896, -0.15], [1897, -0.13], [1898, -0.01], [1899, -0.14],
                   [1900, -0.35], [1901, -0.13], [1902, -0.1], [1903, -0.24], [1904, -0.62],
                   [1905, -0.35], [1906, -0.14], [1907, -0.37], [1908, -0.38], [1909, -0.58],
                    [1910, -0.36], [1911, -0.55], [1912, -0.28], [1913, -0.35], [1914, 0.11],
                   [1915, -0.15], [1916, -0.13], [1917, -0.53], [1918, -0.32], [1919, -0.23],
                    [1920, -0.2], [1921, -0.07], [1922, -0.3], [1923, -0.24], [1924, -0.22],
                    [1925, -0.4], [1926, 0.13], [1927, -0.23], [1928, -0.08], [1929, -0.49],
                    [1930, -0.34], [1931, -0.03], [1932, 0.15], [1933, -0.29], [1934, -0.3],
                    [1935, -0.28], [1936, -0.27], [1937, -0.18], [1938, 0.02], [1939, -0.11],
                    [1940, 0.01], [1941, 0.27], [1942, 0.29], [1943, -0.03], [1944, 0.39],
                    [1945, 0.16], [1946, 0.24], [1947, -0.16], [1948, 0.06], [1949, 0.12],
                   [1950, -0.29], [1951, -0.28], [1952, 0.13], [1953, 0.09], [1954, -0.2],
                    [1955, 0.14], [1956, -0.14], [1957, -0.09], [1958, 0.32], [1959, 0.16],
                   [1960, 0.04], [1961, 0.13], [1962, 0.12], [1963, 0.06], [1964, 0.01],
                    [1965, -0.07], [1966, -0.05], [1967, -0.09], [1968, -0.2], [1969, -0.09],
                    [1970, 0.14], [1971, 0.01], [1972, -0.24], [1973, 0.28], [1974, -0.19],
                    [1975, 0.11], [1976, -0.02], [1977, 0.13], [1978, 0.16], [1979, 0.15],
                    [1980, 0.33], [1981, 0.51], [1982, 0.14], [1983, 0.53], [1984, 0.3],
                    [1985, 0.22], [1986, 0.31], [1987, 0.32], [1988, 0.56], [1989, 0.17],
                    [1990, 0.36], [1991, 0.43], [1992, 0.46], [1993, 0.36], [1994, 0.27],
                    [1995, 0.56], [1996, 0.25], [1997, 0.34], [1998, 0.6], [1999, 0.51],
                   [2000, 0.34], [2001, 0.47], [2002, 0.71], [2003, 0.72], [2004, 0.61],
                    [2005, 0.65], [2006, 0.51], [2007, 0.92], [2008, 0.27], [2009, 0.6],
                    [2010, 0.73], [2011, 0.47], [2012, 0.44], [2013, 0.62],
                   [2014, 0.7], [2015, 0.83], [2016, 1.12], [2017, 0.98], [2018, 0.76],
                   [2019, 0.94], [2020, 1.15], [2021, 0.95]])

## Question 3: Running average of temperature deviation (25 pts)

Sometimes, time series are too noisy and require smoothing. We can do this smoothing by computing a running average of values. In this question, you will compute a simple moving average where the resulting RDD will have as key the year $y$ and as the first element of the value $m_y$, the average temperature deviation of the last $n$ years in the data (including the current year):

$$m_y = \frac{\sum_{i=y-n+1}^{y} t_i}{n}$$

For example, for the input data and for $n=2$

```
(1880, -0.06)
(1881, -0.09)
(1882, 0.08)
(1883, -0.28)
```

The first element for key 1883 will be

$$
\begin{array}{rl}
m_{1883} &=\frac{\sum_{i=1883-2+1}^{1883} t_i}{2}\\
& =\frac{\sum_{i=1882}^{1883} t_i}{2}\\
& =\frac{0.08 - 0.28}{2}\\
& =-0.1
\end{array}$$
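
To check the $n=2$ arithmetic by hand, the following plain-Python sketch computes the moving average directly with a sliding window. It is not a MapReduce job; the helper `moving_average` is a hypothetical name, and the sketch assumes the years are consecutive with no gaps.

```python
def moving_average(pairs, n):
    """Return {year: mean of the n most recent values ending at that year}."""
    pairs = sorted(pairs)  # order by year
    result = {}
    for end in range(n - 1, len(pairs)):
        # Values for the n consecutive years ending at position `end`.
        window = [value for _, value in pairs[end - n + 1:end + 1]]
        result[pairs[end][0]] = sum(window) / n
    return result

sample = [(1880, -0.06), (1881, -0.09), (1882, 0.08), (1883, -0.28)]
averages = moving_average(sample, n=2)
print(averages[1883])  # the mean of 0.08 and -0.28, approximately -0.1
```

Note that 1880 has no entry in `averages`, because there is no earlier year to complete its window.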

Below, define a function that computes the moving average for $n=5$. Notice that you will be able to compute the moving average **only** starting in year 1884 and ending in year 2021. Because each input element should likely generate several key-value pairs, use `flatMap` instead of `map` as your map step.
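
The difference between `map` and `flatMap` can be illustrated in plain Python with `itertools.chain`. The function `emit_two` below is a hypothetical example that returns two key-value pairs per input; it is only meant to show how the returned lists are handled, not how `ma_map` should be written.

```python
from itertools import chain

def emit_two(kv):
    # Hypothetical function that returns a list of pairs for one input.
    year, value = kv
    return [(year, value), (year + 1, value)]

records = [(1880, -0.06), (1881, -0.09)]  # first two records of the dataset

# Like rdd.map: one output element (a whole list) per input element.
mapped = [emit_two(kv) for kv in records]

# Like rdd.flatMap: the returned lists are spliced into one flat sequence.
flat = list(chain.from_iterable(mapped))

print(len(mapped), len(flat))
```

With `map`, a later `reduceByKey` would see lists of pairs rather than individual key-value pairs, which is why `flatMap` is the natural choice when one input contributes to several keys.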

In [None]:
# define the map operation
def ma_map(kv):    
    # YOUR CODE HERE
    raise NotImplementedError()

def ma_reduce(v1, v2):   
    # YOUR CODE HERE
    raise NotImplementedError()

In [None]:
# the first result of the moving average should be the average of the first five temperature deviations
sum(global_avg_temp.values().take(5))/5

In [None]:
# confirm that you get the same results
sc.parallelize(global_avg_temp.take(5)).\
    flatMap(ma_map).\
    reduceByKey(ma_reduce).\
    filter(lambda kv: kv[0] == 1884).\
    map(lambda kv: (kv[0], kv[1][0])).\
    collect()

If you were to run the following code, the output should look like this:

```python
global_avg_temp.\
    flatMap(ma_map).\
    reduceByKey(ma_reduce).\
    filter(lambda kv: kv[0] >= 1884 and kv[0] <= 2021).\
    map(lambda kv: [kv[0], kv[1][0]]).\
    sortByKey().\
    take(10)
```

```
[(1884, -0.11800000000000002),
 (1885, -0.196),
 (1886, -0.21400000000000002),
 (1887, -0.33599999999999997),
 (1888, -0.346),
 (1889, -0.30400000000000005),
 (1890, -0.28200000000000003),
 (1891, -0.32600000000000007),
 (1892, -0.27),
 (1893, -0.352)]
 ```

In [None]:
# try it here
global_avg_temp.\
    flatMap(ma_map).\
    reduceByKey(ma_reduce).\
    filter(lambda kv: kv[0] >= 1884 and kv[0] <=2021).\
    map(lambda kv: [kv[0], kv[1][0]]).\
    sortByKey().\
    take(10)

In [None]:
# 25 pts
np.testing.assert_almost_equal(global_avg_temp.\
    flatMap(ma_map).\
    reduceByKey(ma_reduce).\
    filter(lambda kv: kv[0] >= 1884 and kv[0] <= 2021).\
    map(lambda kv: [kv[0], kv[1][0]]).\
    sortByKey().\
    values().sum(), 7.837999999999998)