In [106]:
#importing CSV File and Reading it
import pandas as pd
import csv
import pyspark
from pyspark import SparkContext, SparkConf
import numpy as np
import tensorflow as tf
import plotly.plotly as py
from sklearn.utils import shuffle
from sklearn.metrics import mean_absolute_error
from scipy.stats import pearsonr, zscore

In [4]:
#Starting the Spark Session
conf=SparkConf().setAppName("CSE545 Project").set("spark.driver.memory", "12g").set("spark.executor.memory", "6g").set("spark.driver.maxResultSize", "6g")
sc=SparkContext(conf=conf)

In [35]:
#Loading WDI Dataset
WDI_rdd1 = sc.textFile("WDI_GDP_Growth.csv").map(lambda line: line.split(",")).filter(lambda line: len(line)>1)

In [46]:
print(WDI_rdd1.collect())

[[u'Country Name', u'Country Code', u'Indicator Name', u'Indicator Code', u'1960', u'1961', u'1962', u'1963', u'1964', u'1965', u'1966', u'1967', u'1968', u'1969', u'1970', u'1971', u'1972', u'1973', u'1974', u'1975', u'1976', u'1977', u'1978', u'1979', u'1980', u'1981', u'1982', u'1983', u'1984', u'1985', u'1986', u'1987', u'1988', u'1989', u'1990', u'1991', u'1992', u'1993', u'1994', u'1995', u'1996', u'1997', u'1998', u'1999', u'2000', u'2001', u'2002', u'2003', u'2004', u'2005', u'2006', u'2007', u'2008', u'2009', u'2010', u'2011', u'2012', u'2013', u'2014', u'2015', u'2016', u'Unnamed: 61'], [u'Arab World', u'ARB', u'GDP growth (annual %)', u'NY.GDP.MKTP.KD.ZG', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'15.9031516079944', u'8.50550096656832', u'-0.7984761869222149', u'11.605057353708', u'9.145067518629618', u'2.68708383254845', u'-8.9925725353927', u'-6.573568995782581', u'1.06640451994264', u'-2.16460950525548', u'4.46288865462701', u'-0.62

In [45]:
#Filtering required data
WDI_rdd2 = WDI_rdd1.map(lambda x: [x[0],x[29:60]])

print(WDI_rdd2.collect())

[[u'Country Name', [u'1985', u'1986', u'1987', u'1988', u'1989', u'1990', u'1991', u'1992', u'1993', u'1994', u'1995', u'1996', u'1997', u'1998', u'1999', u'2000', u'2001', u'2002', u'2003', u'2004', u'2005', u'2006', u'2007', u'2008', u'2009', u'2010', u'2011', u'2012', u'2013', u'2014', u'2015']], [u'Arab World', [u'-2.16460950525548', u'4.46288865462701', u'-0.626386706730514', u'5.58877530570663', u'2.12100068254981', u'13.141355032793902', u'1.52961578176148', u'4.79539500272912', u'3.20093873984074', u'3.209841598949', u'2.7552678606214602', u'4.478689158267731', u'4.18729746437975', u'5.1001984195861505', u'1.80769841826131', u'5.37168948140877', u'1.61383675424491', u'0.5844055138085339', u'5.3192667581860595', u'9.335660113583051', u'5.71629757147758', u'6.495442352866861', u'4.57174871901114', u'5.81595309929904', u'0.43183141309322104', u'4.77378908129691', u'3.57043337524128', u'5.16139739851323', u'3.64693465504243', u'2.9113462019072203', u'3.4848081657532104']], [u'Carib

In [42]:
# headers = WDI_rdd2.collect()[0]
# WDI_rdd2=WDI_rdd2.filter(lambda x: x[0]!='Country Name')

# df0 = pd.DataFrame(WDI_rdd2.collect(), columns=headers)

In [59]:
#Converting String Values to Integer Values
def conv_x(x):
    growth=[]
    count=0
    temp=[]
    if x[0] != "Country Name":
        for num in x[1]:
            if num == '':
                temp.append(0)
                count+=1
                if(count==8):
                    growth.append(temp)
                    temp=[]
                    count=0
            else:
                temp.append(round(float(num),2))
                count+=1
                if(count==8):
                    growth.append(temp)
                    temp=[]
                    count=0
        growth.append(temp)
    else:
        growth=x[1]
    return [x[0],growth]

In [62]:
#Calculating growth over 8 years
def calc_growth8(x):
    wdi=[]
    if x[0] != "Country Name":
        for l in x[1]:
            gr=0
            for i in range(len(l)):
                gr=round(float(l[i] + gr + (l[i]*gr/100)),2)
            wdi.append(gr)
    else:
        wdi=x[1]
    return [x[0],wdi]

In [63]:
#Call to the conversion function
WDI_rdd3=WDI_rdd2.map(lambda x: conv_x(x)).map(lambda x: calc_growth8(x))
print(WDI_rdd3.collect())

[[u'Country Name', [u'1985', u'1986', u'1987', u'1988', u'1989', u'1990', u'1991', u'1992', u'1993', u'1994', u'1995', u'1996', u'1997', u'1998', u'1999', u'2000', u'2001', u'2002', u'2003', u'2004', u'2005', u'2006', u'2007', u'2008', u'2009', u'2010', u'2011', u'2012', u'2013', u'2014', u'2015']], [u'Arab World', [31.84, 34.34, 46.63, 26.49]], [u'Caribbean small states', [10.89, 28.57, 35.85, 2.36]], [u'Central Europe and the Baltics', [-11.33, 31.25, 45.23, 10.02]], [u'Early-demographic dividend', [31.07, 30.85, 43.77, 34.2]], [u'East Asia & Pacific', [51.04, 33.18, 42.52, 34.86]], [u'East Asia & Pacific (excluding high income)', [82.47, 82.5, 101.66, 67.99]], [u'East Asia & Pacific (IDA & IBRD countries)', [82.47, 82.49, 101.66, 67.99]], [u'Euro area', [25.94, 20.04, 15.53, 1.01]], [u'Europe & Central Asia', [19.94, 17.95, 22.41, 5.03]], [u'Europe & Central Asia (excluding high income)', [-16.31, -6.74, 62.53, 15.25]], [u'Europe & Central Asia (IDA & IBRD countries)', [-15.41, -1.2

In [136]:
def calc_beta(betas, X_test, y_test):
    y_pred = np.matmul(X_test, betas)[:,0]
    print("Mean Absolute Error:", mean_absolute_error(y_test, y_pred))
    print("Pearson Correlation:", pearsonr(y_test, y_pred))

In [137]:
def RidgeRegression(penalty_value = 1.0, learning_rate = 0.0001, n_epochs = 100):

    #Transforming rdd to tensors for applying Machine Learning Models
    X_wdi=WDI_rdd3.filter(lambda x: x[0]!="Country Name").map(lambda x: x[1])
    X_wdi=np.array(X_wdi.collect())
    print(X_wdi)

    #Dividing into training and test data
    offset = int(int(X_wdi.shape[0]) * 0.8)
    X_wdi_tf_test, Y_wdi_tf_test = X_wdi[offset:,:3], X_wdi[offset:,3:]
    X_wdi_tf, Y_wdi_tf = X_wdi[:offset,:3], X_wdi[:offset,3:]

    # Conversion to tensors
    X_wdi_tf = tf.constant(X_wdi_tf, dtype=tf.float32, name="WDI_X")
    Y_wdi_tf = tf.constant(Y_wdi_tf.reshape(-1,1), dtype=tf.float32, name="WDI_Y")
    Xt_wdi_tf = tf.transpose(X_wdi_tf)
    penalty = tf.constant(1.0, dtype=tf.float32, name="penalty")
    I = tf.constant(np.identity(int(X_wdi_tf.shape[1])), dtype=tf.float32, name="I")
    beta = tf.Variable(tf.random_uniform([int(X_wdi_tf.shape[1]), 1], -1., 1.), name = "beta")
    y_pred = tf.matmul(X_wdi_tf, beta, name="predictions")
    penalizedCost = tf.reduce_sum(tf.square(Y_wdi_tf - y_pred)) + penalty * tf.reduce_sum(tf.square(beta))
    optimizer = tf.train.GradientDescentOptimizer(learning_rate = learning_rate)
    training_op = optimizer.minimize(penalizedCost)
    init = tf.global_variables_initializer()
    with tf.Session() as sess:
        sess.run(init)
        for epoch in range(n_epochs):
            if epoch %10 == 0: #print debugging output
                print("Epoch", epoch, "; penalizedCost =", penalizedCost.eval())
            sess.run(training_op)
        #done training, get final beta: 
        best_beta = beta.eval()
    print(best_beta)
    calc_beta(best_beta, X_wdi_tf_test, Y_wdi_tf_test)

RidgeRegression(1)
RidgeRegression(.1)#adjusting penalty: worse
RidgeRegression(10)#better
RidgeRegression(100)#worse
RidgeRegression(10, .01) #gives error because learning rate is too high
RidgeRegression(10, .00001, 10000) #lower learning rate, more epochs: finds finer-grained solution

[[ 31.84  34.34  46.63  26.49]
 [ 10.89  28.57  35.85   2.36]
 [-11.33  31.25  45.23  10.02]
 ..., 
 [  6.29  53.73  39.12   8.41]
 [  8.08  19.92  70.22  54.9 ]
 [ 28.37  23.95 -47.84  82.01]]
('Epoch', 0, '; penalizedCost =', 563823.94)
('Epoch', 10, '; penalizedCost =', inf)
('Epoch', 20, '; penalizedCost =', nan)
('Epoch', 30, '; penalizedCost =', nan)
('Epoch', 40, '; penalizedCost =', nan)
('Epoch', 50, '; penalizedCost =', nan)
('Epoch', 60, '; penalizedCost =', nan)
('Epoch', 70, '; penalizedCost =', nan)
('Epoch', 80, '; penalizedCost =', nan)
('Epoch', 90, '; penalizedCost =', nan)
[[ nan]
 [ nan]
 [ nan]]


ValueError: Input contains NaN, infinity or a value too large for dtype('float64').