# Fundamental Programming Basics

In [1]:
from IPython.display import Markdown, display
def printmd(string):
    display(Markdown('# <span style="color:red">'+string+'</span>'))


if ('sc' in locals() or 'sc' in globals()):
    printmd('<<<<<!!!!! It seems that you are running in a IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>>')


In [2]:
!pip install pyspark==2.3.0

  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes


In [3]:
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    printmd('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')

In [4]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

# Exercise 2
## Part 1
Calculate covariance and correlation using ApacheSpark

Create two random RDD’s, which shouldn't correlate at all.

In [5]:
import random
rddX = sc.parallelize(random.sample(list(range(100)),100))
rddY = sc.parallelize(random.sample(list(range(100)),100))

Calculate the mean, note that explicitly cast the denominator to float in order to obtain a float instead of int

In [6]:
meanX = rddX.sum()/float(rddX.count())
meanY = rddY.sum()/float(rddY.count())
print (meanX)
print (meanY)

49.5
49.5


Calculate the covariance

In [7]:
rddXY = rddX.zip(rddY)
covXY = rddXY.map(lambda x_y : (x_y[0]-meanX)*(x_y[1]-meanY)).sum()/rddXY.count()
covXY

-0.89

Covariance is not a normalized measure. Therefore it is used to calculate correlation. First is to calculate the indivicual standard deviations 

In [8]:
from math import sqrt
n = rddXY.count()
sdX = sqrt(rddX.map(lambda x : pow(x-meanX,2)).sum()/n)
sdY = sqrt(rddY.map(lambda x : pow(x-meanY,2)).sum()/n)
print (sdX)
print (sdY)

28.86607004772212
28.86607004772212


Calculate the correlation

In [9]:
corrXY = covXY / (sdX * sdY)
corrXY

-0.0010681068106810682

Calculate the skewness

In [10]:
sumX = rddX.sum()
sumY = rddY.sum()

nX = rddX.count()
nY = rddY.count()

In [11]:
from math import sqrt

In [12]:
skewX = rddX.map(lambda x: pow(x-meanX,3)/pow(sdX,3)).sum()
skewX

-1.5543122344752192e-15

rddX is perfectly symmetrical

In [13]:
skewY = rddY.map(lambda x: pow(x-meanY,3)/pow(sdY,3)).sum()
skewY

-2.4424906541753444e-15

rddY is negative skewed

Calculate the kurtosis

In [14]:
kurtosisX = rddX.map(lambda x: pow(x-meanX,4)/pow(sdX,4)).sum()/(1-float(rddX.count()))
kurtosisX

-1.8179393696945467

Negative values of kurtosis indicate that X distribution is flat and has thin tails = Platykurtic distributions 

In [15]:
kurtosisY = rddY.map(lambda x: pow(x-meanY,3)/pow(sdY,3)).sum()/(1-float(rddY.count()))
kurtosisY

2.4671622769447922e-17

Positive values of kurtosis indicate that Y distribution is peaked and possess thick tails = Leptokurtic distributions

## Part 2
Create a correlation matrix out of the four RDDs

In [16]:
from pyspark.mllib.stat import Statistics
import random
column1 = sc.parallelize(range(100))
column2 = sc.parallelize(range(100,200))
column3 = sc.parallelize(list(reversed(range(100))))
column4 = sc.parallelize(random.sample(range(100),100))
data = column1.zip(column2).zip(column3).zip(column4).map(lambda a_b_c_d : (a_b_c_d[0][0][0],a_b_c_d[0][0][1],a_b_c_d[0][1],a_b_c_d[1]) ).map(lambda a_b_c_d : [a_b_c_d[0],a_b_c_d[1],a_b_c_d[2],a_b_c_d[3]])
print(Statistics.corr(data))

[[ 1.          1.         -1.         -0.09682568]
 [ 1.          1.         -1.         -0.09682568]
 [-1.         -1.          1.          0.09682568]
 [-0.09682568 -0.09682568  0.09682568  1.        ]]
