# IST 718: Big Data Analytics

- Professor: Daniel Acuna <deacuna@syr.edu>

## General instructions:

- You are welcome to discuss the problems with your classmates but __you are not allowed to copy any part of your answers either from your classmates or from the internet__
- You can put the homework files anywhere you want in your http://notebook.acuna.io workspace but _do not change_ the file names. The TAs and the professor use these names to grade your homework.
- Remove or comment out code that contains `raise NotImplementedError`. This is mainly to make the `assert` statement fail if nothing is submitted.
- The tests shown in some cells (i.e., `assert` and `np.testing.` statements) are used to grade your answers. **However, the professor and TAs will use __additional__ test for your answer. Think about cases where your code should run even if it passess all the tests you see.**
- Before downloading and submitting your work through Blackboard, remember to save and press `Validate` (or go to 
`Kernel`$\rightarrow$`Restart and Run All`). 
- Good luck!

In [5]:
# this code creates the spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
import numpy as np

# Part 1: Map-Reduce: Gradient descent

Throughout this assignment, you should use vanilla Python and not Numpy.

Some statistical models $f(x)$ are learned by optimizing a loss function $L(\Theta)$ that depends on a set of parameters $\Theta$. There are several ways of finding the optimal $\Theta$ for the loss function, one of which is to iteratively update following the gradient:

$$
\nabla L = 
\begin{pmatrix} 
    \frac{\partial L}{\partial \theta_0}\\ 
    \frac{\partial L}{\partial \theta_1} \\ 
    \vdots\\ 
    \frac{\partial L}{\partial \theta_p}
\end{pmatrix}
$$

To then, compute the update
$$\Theta^{t+1} = \Theta^t - \eta \nabla L$$

Because we assume independence between data points, the gradient becomes a summation:

$$\nabla L = \sum_{i=1}^{n} \nabla L_i$$
where $L_i$ is the loss function for the $i$-th data point.

Take as example, the statistical model $f(x) = b_0 + b_1 x$ and loss function $L(\Theta) = (f(x) - y)^2$. If we have a set of three datapoints $D=\{ (x=1,y=2), (x=-2, y=-1), (x=4, y = 3)\}$

Then the loss function for each of them is
$L_1 = \left(b_{0} + b_{1} - 2\right)^{2}$, 
$L_2 = \left(b_{0} - 2 b_{1} + 1\right)^{2}$, and
$L_3 = \left(b_{0} + 4 b_{1} - 3\right)^{2}$

with 
$$\nabla L_i = \left[\begin{matrix}2 b_{0} + 2 b_{1} x_i - 2 y_i\\2 x_i \left(b_{0} + b_{1} x_i - y_i\right)\end{matrix}\right]$$

if we start with a solution $b_0 = 0, b_1 = 1$, then the gradients are:

$$\nabla L_1 = \left[\begin{matrix}-2\\-2\end{matrix}\right]$$
$$\nabla L_2 = \left[\begin{matrix}-2\\4\end{matrix}\right]$$
$$\nabla L_3 = \left[\begin{matrix}2\\8\end{matrix}\right]$$

which after accumulation would yield
$$\nabla L = \left[\begin{matrix}-2\\10\end{matrix}\right]$$

## Question 1 (5 pts)

Create a function `f_linear(b, x)` that receives the parameters `b` and one data point `x` as lists and return the prediction for that data point. Assume that the first element of `b` is the intercept.

In [6]:
# define below the function `f_linear` which performs a linear prediction based on parameters as data point
def f_linear(b, x):
    # YOUR CODE HERE
    res=b[0]
    i=1
    while i<len(b):
        res=res+b[i]*x[i-1]
        i=i+1
    return res
    #raise NotImplementedError()

In [7]:
# for the example above, if we assume b = [0, 1], and the first data point x = [1], y = 2
f_linear([0, 1], [1])

1

In [8]:
# test (5 pts)
assert f_linear([1, 1, 2, 3], [2, 1, 3]) == 14
assert f_linear([1], []) == 1
assert f_linear([0, 1, 0, 1, 0, 1], [0, 10, 10, 10 , 10]) == 20

## Question 2 (5 pts)
Define the function `L(y_pred, y)` that receives a prediction `y_pred` and the actual value `y` and returns the squared error between them.

In [9]:
def L(y_pred, y):
    # YOUR CODE HERE
    return (y_pred-y)**2
    #raise NotImplementedError()

In [10]:
# there should be not error here
L(1, 1)

0

In [11]:
# 2^2 error
L(0, 2)

4

In [12]:
# (5 pts)
assert L(1, 1) == 0
assert L(0, 4) == 16

## Question 3 (10 pts)
Create a function `gf_linear(f, b, x, y)` which returns the gradient of the linear function `f` with parameter `b` with respect to the squared loss function, evaluated at `x` and the actual outcome `y`. This function should return a vector with each element $j$ corresponding to the gradient with respect $b_j$, with $j = \{0, 1, \dots, p\}$.

In [13]:
# define `gf_linear`
#∇𝐿𝑖=[2𝑏0+2𝑏1𝑥𝑖−2𝑦𝑖 2𝑥𝑖(𝑏0+𝑏1𝑥𝑖−𝑦𝑖)]
def gf_linear(f, b, x, y):
    # YOUR CODE HERE
    
    #I am writing this code in a format, as general as possible
    
    #checking if I have 1).integers for 'b' and 'x' 2). Lists of granularity 1 (just 1 set of x-variables) 3). 'x' being a list of lists (more than 1 data points)
    #For case 3, 'b' and 'y' would still be lists with granularity 1 however. My output in this case should be a list of lists; each internal list being gradient for that data point.
    #Eg. op[1]=gradient for data point x[1] and y[1]. x[1] is a list though, thus there are more than 1 independent variables:
    
    #a). Checking for case 1. Gives a message for improper format, since 'b' will be intercept, no parameter for the input variable 'x':
    if (str(type(b))=="<class 'int'>" and str(type(x))=="<class 'int'>"):
        print("Wrong format; 'b' is the inetercept, but no parameter found for the input variable 'x'.")
        
    #b). Checking if 'b' is just an int and 'x' is a list:
    elif (str(type(b))=="<class 'int'>" and str(type(x))=="<class 'list'>"):
        
        #If 'x' is not an empty list, then I don't have a parameter for the 'x', just an intercept in the form of the integer 'b', so it is an error:
        if len(x)>0:
            print("More number of x-variables than parameters. Invalid format.")
    
    #c). Checking for the case where I am having a list of parameters and just 1 input 'x' variable:
    elif (str(type(b))=="<class 'list'>" and str(type(x))=="<class 'int'>"):
        
        #1). If I am having more than 2 parameters for this case, I'd have more parameters than x-variables, so an improper format:
        if len(b)>2:
            print("More number of parameters than x-variables. Invalid format.")
        
        #2). If there is just 1 element in 'b', this will be a case similar to situation b). 
        elif len(b)==1:
            print("Just intercept found; no parameter for input variable x.")
        
        #3). If b is a list, but an empty one:
        elif len(b)==0:
            print("Parameter list is empty. Incorrect format.")
            
        #4). Now I'd have 1 intercept and 1 parameter for the input variable 'x':
        elif len(b)==2:
            res=f(b,x)
            fv_r=res-y
            grad=[2*fv_r,2*fv_r*x]
            return grad
            
    #d). If both 'b' and 'x' are lists:
    elif (str(type(b))=="<class 'list'>" and str(type(x))=="<class 'list'>"):
        grad=[]
        if len(x)==0:
            if len(b)==1:
                res=f(b,x)
                fv_r=res-y
                return([2*fv_r])
            else:
                print("Cannot match number of parameters and number of input variables. 'x' is empty.")
        
        elif type(x[0])=="<class 'list'>":
            j=0
            
            while j<len(x):
                if len(b)==len(x[j])+1:
                    gr=[]
                    res=f(b,x[j])
                    fv_r=res-y[j]
                    gr.append(2*fv_r)
                    i=0
                    while i<len(x[j]):
                        gr.append(2*fv_r*x[j][i])
                        i=i+1
                    grad.append(gr)
                else:
                    print("Incorrect input; not enough parameters or x-variables in entry number "+str(j+1))
                    return -1
                j=j+1
        
        elif str(type(x[0]))=="<class 'int'>":
           
            if len(b)==len(x)+1:
                res=f(b,x)
                
                if str(type(y))=="<class 'list'>":
                    fv_r=res-y[0]
                elif str(type(y))=="<class 'int'>":
                    fv_r=res-y
                    
                grad.append(2*fv_r)
                i=0
                while i<len(x):
                    grad.append(2*fv_r*x[i])
                    i=i+1
            
            else:
                print("Incorrect input; not enough parameters or x-variables.")
                return -1
        
        return grad
            
    #raise NotImplementedError()

In [14]:
# for the example above and first data point
x = [1]
y = 2
b = [0, 1]
gf_linear(f_linear, b, x, y)

[-2, -2]

In [15]:
# for the example above and second data point
x = [-2]
y = -1
b = [0, 1]
gf_linear(f_linear, b, x, y)

[-2, 4]

In [16]:
## (10 pts)
np.testing.assert_array_equal(gf_linear(f_linear, [0, 1], [1], 2), [-2, -2])
np.testing.assert_array_equal(gf_linear(f_linear, [0, 1], [-2], -1), [-2, 4])
np.testing.assert_array_equal(gf_linear(f_linear, [1], [], 0), [2])

## Question 4 (15 pts)

Develop a map-reduce job that produces a value so that the first element of the value is the mean loss function across all the data. You might use other pieces of information as part of the value to create your computation.

You will implement your map function as `map_mse(f, b, L, xy)` where `f` is the function `b` are the parameters of the function `L` is the loss function and `xy` is the data. Assume that the data will come as an RDD where each element is of the format:

`[x, y]` where `x` is a list and `y` is a scalar.

Since the key does not matter for this map reduce job, just put a constant of your choice.

In [17]:
# data
rdd_data = sc.parallelize([
    [[1, 2], 3],
    [[3, 1], 4],
    [[-1, 1.5], 0],
    [[-9, 3], 0]
])

In [18]:
# create function `map_mse` below
def map_mse(f, b, L, xy):
    # YOUR CODE HERE
    x,y=xy
    res=f(b,x)
    return [0,[L(res,y),1]]
    #raise NotImplementedError()

In [19]:
rdd_data.map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).collect()

[[0, [9, 1]], [0, [16, 1]], [0, [0.0, 1]], [0, [0, 1]]]

You should apply the map function in the following way:

```python
# for an example set of `b = [0, 0, 0]`
rdd_data.map(lambda x: map_generator(f_linear, [0, 0, 0], L, x))
```


In [20]:
# try it here
rdd_data.map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).collect()

[[0, [9, 1]], [0, [16, 1]], [0, [0.0, 1]], [0, [0, 1]]]

In [21]:
# (10 pts)
assert rdd_data.map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).count() == 4
assert rdd_data.map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).map(lambda x: len(x)).\
    distinct().\
    first() == 2

assert rdd_data.map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).count() == 4
# the first element should be a number
assert isinstance((rdd_data.map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).first()[1][0]), 
                  (int, float, complex))
# try with other initializations
assert isinstance((rdd_data.map(lambda x: map_mse(f_linear, [1, 2, 3], L, x)).first()[1][0]), 
                  (int, float, complex))

You will now create a reduce job that receives two values of a previous reduce (or map) and merge them appropriately. Remember that at the end of the reduce job, the first element of the value should be the mean squared error. Create the function `reduce_mse(v1, v2)` below.

In [22]:
# create function `reduce_mse` below
def reduce_mse(v1, v2):
    # YOUR CODE HERE
    return [((v1[0]*v1[1])+(v2[0]*v2[1]))/(v1[1]+v2[1]),v1[1]+v2[1]]
    #raise NotImplementedError()

In [23]:
# the following function call should return the mean squared error
rdd_data.\
    map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).\
    reduceByKey(reduce_mse).first()[1][0]

6.25

In [24]:
# the following function call should return the mean squared error
rdd_data.\
    map(lambda x: map_mse(f_linear, [2, 2, 3], L, x)).\
    reduceByKey(reduce_mse).first()[1][0]

41.8125

In [25]:
# (5 pts)
assert rdd_data.\
    map(lambda x: map_mse(f_linear, [0, 0, 0], L, x)).\
    reduceByKey(reduce_mse).first()[1][0] == 6.25

assert rdd_data.\
    map(lambda x: map_mse(f_linear, [2, 0, 0], L, x)).\
    reduceByKey(reduce_mse).first()[1][0] == 3.25

assert rdd_data.\
    map(lambda x: map_mse(f_linear, [2, 2, 3], L, x)).\
    reduceByKey(reduce_mse).first()[1][0] == 41.8125

## Question 5 (10 pts)

In this question, you will compute the cumulative gradient of a model on the data. You will define a map function `map_gradient(f, gf, b, xy)` that would receive a function `f`, its gradient `gf`, its parameters `b`, and a data point `xy = [x, y]`. Also you will define a function `reduce_gradient(v1, v2)` that combines the two values appropriately. In the map function, you probably do not need to keep extra values beyond the actual gradient.

In [26]:
# define the function `map_gradient` below
def map_gradient(f, gf, b, xy):
    # YOUR CODE HERE
    x,y=xy
    grad=gf(f,b,x,y)
    return [0,grad]
    #raise NotImplementedError()

In [27]:
# (5 pts)
assert len(rdd_data.map(lambda xy: map_gradient(f_linear, gf_linear, [0, 0, 0], xy)[1]).first()) == 3

In [28]:
map_gradient(f_linear, gf_linear, [0, 0, 0], [[1,3],5])

[0, [-10, -10, -30]]

In [29]:
# define the function `reduce_gradient` below
def reduce_gradient(v1, v2):
    # YOUR CODE HERE
    red_grad=[]
    i=0
    while i<len(v1):
        val=v1[i]+v2[i]
        red_grad.append(val)
        #red_grad[i]=v1[i]+v2[i]: This piece doesn't work. Always must use append()
        i=i+1
    return red_grad
    #raise NotImplementedError()

In [30]:
rdd_data.map(lambda xy: map_gradient(f_linear, gf_linear, [0, 0, 0], xy)).\
    reduceByKey(reduce_gradient).first()[1]

[-14.0, -30.0, -20.0]

In [31]:
# (5 pts)
np.testing.assert_array_equal(
    rdd_data.map(lambda xy: map_gradient(f_linear, gf_linear, [0, 0, 0], xy)).\
    reduceByKey(reduce_gradient).first()[1],
    [-14.0, -30.0, -20.0])

np.testing.assert_array_equal(
    rdd_data.map(lambda xy: map_gradient(f_linear, gf_linear, [0, 0, 0], xy)).\
    reduceByKey(reduce_gradient).first()[1],
    [-14.0, -30.0, -20.0])

if all your answers are correct, then we can run an optimization below, and the MSE should decrease with each iteration

In [32]:
b = [0, 0, 0]
learning_rate = 0.01
print("Initial solution: \t", b)
for _ in range(10):
    print("New iteration")
    print("=============")
    gradient = rdd_data.map(lambda xy: map_gradient(f_linear, gf_linear, b, xy)).\
        reduceByKey(reduce_gradient).first()[1]
    b = [b0 - learning_rate*g0 for b0, g0 in zip(b, gradient)]
    print("Current solution: \t", b)
    mse = rdd_data.\
        map(lambda x: map_mse(f_linear, b, L, x)).\
        reduceByKey(reduce_mse).first()[1][0]
    print("Current MSE: \t\t", mse)
    
    

Initial solution: 	 [0, 0, 0]
New iteration
Current solution: 	 [0.14, 0.3, 0.2]
Current MSE: 		 4.036099999999999
New iteration
Current solution: 	 [0.27480000000000004, 0.15880000000000005, 0.455]
Current MSE: 		 2.8077335025
New iteration
Current solution: 	 [0.34362200000000004, 0.41343399999999997, 0.540541]
Current MSE: 		 2.124745162297563
New iteration
Current solution: 	 [0.42466317000000003, 0.24800435, 0.707635855]
Current MSE: 		 1.7435970621414336
New iteration
Current solution: 	 [0.45430526015000006, 0.47522477825, 0.730516771125]
Current MSE: 		 1.529538732230305
New iteration
Current solution: 	 [0.50541029705925, 0.29867069991675, 0.848308677264375]
Current MSE: 		 1.4080112827808984
New iteration
Current solution: 	 [0.5135716556948637, 0.5084709260312962, 0.8371720415554381]
Current MSE: 		 1.3377595648707117
New iteration
Current solution: 	 [0.5479266281297145, 0.32798388034815074, 0.9270367149304003]
Current MSE: 		 1.2959552690942715
New iteration
Current soluti

**(5 pts)** In the code, above, play with the value of `learning_rate` less than 1.0 until the optimizer diverges (the loss function goes down and then goes *up*). What is this learning rate?

In [39]:
b = [0, 0, 0]
learning_rate = 0.0089
print("Initial solution: \t", b)
for _ in range(10):
    print("New iteration")
    print("=============")
    gradient = rdd_data.map(lambda xy: map_gradient(f_linear, gf_linear, b, xy)).\
        reduceByKey(reduce_gradient).first()[1]
    b = [b0 - learning_rate*g0 for b0, g0 in zip(b, gradient)]
    print("Current solution: \t", b)
    mse = rdd_data.\
        map(lambda x: map_mse(f_linear, b, L, x)).\
        reduceByKey(reduce_mse).first()[1][0]
    print("Current MSE: \t\t", mse)

Initial solution: 	 [0, 0, 0]
New iteration
Current solution: 	 [0.1246, 0.267, 0.178]
Current MSE: 		 4.1302238099999995
New iteration
Current solution: 	 [0.24508108, 0.18452548, 0.3995655]
Current MSE: 		 2.867466506436631
New iteration
Current solution: 	 [0.318596634118, 0.34265946194600005, 0.506459863229]
Current MSE: 		 2.115207415373838
New iteration
Current solution: 	 [0.38949619256355966, 0.2943986083757235, 0.6387681500672706]
Current MSE: 		 1.6670371339312942
New iteration
Current solution: 	 [0.43253028699358087, 0.3880863578385662, 0.7031536588366425]
Current MSE: 		 1.4000046583047048
New iteration
Current solution: 	 [0.473910740122105, 0.35987954838441216, 0.7823601931883728]
Current MSE: 		 1.2408719109427595
New iteration
Current solution: 	 [0.49875834540221853, 0.415415735805836, 0.8213330385915345]
Current MSE: 		 1.1460130288731225
New iteration
Current solution: 	 [0.522565191141674, 0.3989619281819948, 0.8689466203553182]
Current MSE: 		 1.0894411795892902
N

In [40]:
b = [0, 0, 0]
learning_rate = 0.009
print("Initial solution: \t", b)
for _ in range(10):
    print("New iteration")
    print("=============")
    gradient = rdd_data.map(lambda xy: map_gradient(f_linear, gf_linear, b, xy)).\
        reduceByKey(reduce_gradient).first()[1]
    b = [b0 - learning_rate*g0 for b0, g0 in zip(b, gradient)]
    print("Current solution: \t", b)
    mse = rdd_data.\
        map(lambda x: map_mse(f_linear, b, L, x)).\
        reduceByKey(reduce_mse).first()[1][0]
    print("Current MSE: \t\t", mse)

Initial solution: 	 [0, 0, 0]
New iteration
Current solution: 	 [0.126, 0.26999999999999996, 0.18]
Current MSE: 		 4.120141
New iteration
Current solution: 	 [0.247788, 0.182628, 0.40454999999999997]
Current MSE: 		 2.85570663639025
New iteration
Current solution: 	 [0.321056838, 0.34808178599999995, 0.510019389]
Current MSE: 		 2.1049292297785462
New iteration
Current solution: 	 [0.392680961037, 0.29207068843500006, 0.6447346400655]
Current MSE: 		 1.6590588708536282
New iteration
Current solution: 	 [0.4349123897844735, 0.3935339249263425, 0.7066837293143512]
Current MSE: 		 1.394203448815854
New iteration
Current solution: 	 [0.476698058154599, 0.35773950084501305, 0.7877304161128424]
Current MSE: 		 1.2368252572203948
New iteration
Current solution: 	 [0.5006680578834956, 0.42001624374210045, 0.8242888404064057]
Current MSE: 		 1.1432699247936993
New iteration
Current solution: 	 [0.5247027185851659, 0.39721567384850914, 0.8732610378761687]
Current MSE: 		 1.0876196079919576
New i

In [48]:
b = [0, 0, 0]
learning_rate = 0.3
print("Initial solution: \t", b)
for _ in range(10):
    print("New iteration")
    print("=============")
    gradient = rdd_data.map(lambda xy: map_gradient(f_linear, gf_linear, b, xy)).\
        reduceByKey(reduce_gradient).first()[1]
    b = [b0 - learning_rate*g0 for b0, g0 in zip(b, gradient)]
    print("Current solution: \t", b)
    mse = rdd_data.\
        map(lambda x: map_mse(f_linear, b, L, x)).\
        reduceByKey(reduce_mse).first()[1][0]
    print("Current MSE: \t\t", mse)

Initial solution: 	 [0, 0, 0]
New iteration
Current solution: 	 [4.2, 9.0, 6.0]
Current MSE: 		 1267.54
New iteration
Current solution: 	 [3.7199999999999935, -379.08, 61.49999999999998]
Current MSE: 		 3602067.069024999
New iteration
Current solution: 	 [-1642.4459999999997, 21435.677999999993, -5893.893]
Current MSE: 		 12338338354.600441
New iteration
Current solution: 	 [105994.58369999996, -1250821.4444999993, 361211.6305499999]
Current MSE: 		 42376138027663.06
New iteration
Current solution: 	 [-6276797.754854997, 73269195.78397495, -21274153.761412486]
Current MSE: 		 1.4554630449581718e+17
New iteration
Current solution: 	 [368290317.8054631, -4293752442.444835, 1247490101.8632536]
Current MSE: 		 4.9989775679427615e+20
New iteration
Current solution: 	 [-21586820691.9137, 251636837969.8816, -73114754253.90024]
Current MSE: 		 1.7169640245103839e+24
New iteration
Current solution: 	 [1265130559807.0037, -14747347207429.465, 4284974208216.569]
Current MSE: 		 5.897136807721217e

Multiple executions of the same code, but with different learning rate values, showed me that the lowest Mean Squared Error was encountered for a learning rate of 0.009. As I tried moving the learning rate in either directions of 0.009, the Mean Squared Error went up. I can thus conclude that the learning curve is probably a parabolic curve, with the minima at the value of 0.009 (roughly) for the learning rate.