In [2]:
import pyspark
sc = pyspark.SparkContext('local[*]')

In [3]:
import numpy as np
from functools import reduce
from scipy.stats import kurtosis, skew

# Statistics with PySpark and RDDs

In this exercise we will use functional programming with Python to compute from a list of  N integers:

  1.  the mean
   
        $\mu = \frac{1}{N} \sum_{i=1}^N x_i.$
        
        
  
  2.  the standard deviation
  
        $\sigma = \sqrt{\frac{1}{N} \sum_{i=1}^N (x_i - \mu)^2}$
        
        
        
  3.  the skew 
   
        $\gamma_1 = \frac{1}{N} \sum_{i=1}^N \left[\frac{x_i - \mu}{\sigma}\right]^3$
        
        
        
  4.  the kurtosis
  
         $g_2 = \Big\{   \frac{1}{N} \sum_{i=1}^N \left[\frac{x_i - \mu}{\sigma}\right]^4  \Big\} - 3$
         
         
  For each case above we will define a set of transformations followed by reduction to obtain the results
  
         
         
         

In [4]:
def test_assert():
    """Test that assert is working.

    Args:
        None
    Returns:
        x with value 5
    """
    x = 5
    assert x == 5
    return x
    
y = test_assert() 
assert y == 5

# Convert a list into an RDD


In [6]:
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

[819, 401, 655, 397, 587]

# First Moment

### Define a function, first_moment_term, to  compute the MEAN

 $\mu = \frac{1}{N} \sum_{i=1}^N x_i.$
 
 We will:
 
 1.  Map each term of the list to the correct moment
 2.  Sum the transformed elements over N
 

####  Define the transformation for the first moment terms 
    hint - it is the identity 

In [10]:
def first_moment_term(x):
    """Define the first moment term

    Args:
        x - An element of a list - a term
    Returns:
        transformed term - an identity in this case, x
    """
    
    return x   
    


In [11]:
x = 1

new_term = first_moment_term(x)

print(new_term)

assert new_term == 1

1


In [14]:
def my_mean(rdd):
    """Compute the mean

    Args:
        rdd - an rdd of  integers
        
    Returns:
        the mean of the rdd
    """
    

    transformed_rdd = rdd.map(lambda x : first_moment_term(x))
    mean = transformed_rdd.reduce(lambda x, y : x + y)/rdd.count()
    
    return mean 
    

In [15]:
rdd = sc.parallelize(range(1000))
test_mean = my_mean(rdd)
print(test_mean)
a = np.array(range(1000))
assert test_mean == np.mean(a)

499.5


# Second Moment

### Define a function, second_moment_term, to compute the standard deviation

$\sigma = \sqrt{\frac{1}{N} \sum_{i=1}^N (x_i - \mu)^2}$
 
 We will:
 
 1.  Map each term of the list to the correct moment
 2.  Sum the transformed elements over N
 
 


####  Define the transformation for the first moment terms 

$(x_i - \mu)^2$
   

In [26]:
def second_moment_term(x, mu):
    """Define the first moment term

    Args:
        x - An element of a list - a term
        mu - the mean
    Returns:
        transformed term - an identity in this case, (x - mu)**2
    """
    
    return (x - mu) ** 2  
  

In [27]:
val = 5
mu = 3
term = second_moment_term(x, mu)
assert term == 4


### Transform and reduce to get the standard deviation


   $\sigma = \sqrt{\frac{1}{N} \sum_{i=1}^N (x_i - \mu)^2}$
   
    
   N = len(list)
   
   $\mu$ is the mean
   
   
##### Note:  Use Numpy for the square root  - 

      np.sqrt(x)
 

In [30]:
def my_stdev(rdd, mu):
    """Define a function to compute the standard devivation for an RDD

    Args:
        rdd - a list of integers
        mu - the mean of the rdd
        
    Returns:
        the standard deviation of the list
    """
    
    
    transformed_rdd = rdd.map(lambda x : second_moment_term(x, mu))
    transformed_rdd.collect()
    
    variance = transformed_rdd.reduce(lambda x, y : x + y)/rdd.count()
    
    return np.sqrt(variance)
    

In [32]:
rdd = sc.parallelize(range(1000))
test_mean = my_mean(rdd)
print(test_mean)
test_stdev = my_stdev(rdd, test_mean)
print(test_stdev)
a = np.array(range(1000))
assert test_mean == np.mean(a)
assert test_stdev == np.std(a)
print(np.std(a))

499.5
288.6749902572095
288.6749902572095


# Third Moment



## Define a function, third_moment_term, to compute the Skew

$\gamma_1 = \frac{1}{N} \sum_{i=1}^N \left[\frac{x_i - \mu}{\sigma}\right]^3$

We will:

Map each term of the list to the correct moment
Sum the transformed elements over N


In [33]:
def third_moment_term(x, mu, sigma):
    """Define the third moment term

    Args:
        x - An element of a list - a term
        mu - the mean
        sigma - the standard deviation
    Returns:
        transformed term - an identity in this case, ((x - mu)/sigma)**3
    """
    
    return ((x - mu)/sigma) ** 3  
  

In [34]:
x = 5
mu = 3
sigma = 1
y = third_moment_term(x, mu, sigma)
assert y == 8

### Transform and reduce to obtain the skew


   $\gamma_1 = \frac{1}{N} \sum_{i=1}^N \left[\frac{x_i - \mu}{\sigma}\right]^3$
   
    
   N = len(list)
   
   $\mu$ is the mean
   
   
   $\sigma$ is the standard deviation
   
   

 

In [48]:
def my_skew(rdd, mu, sigma):
    """Compute the third moment, the skew

    Args:
        rdd - an rdd of integers
        mu - the mean of the rdd
        sigma - the standard deviation
        
    Returns:
        the skew of the list
    """
    
    transformed_rdd = rdd.map(lambda x : third_moment_term(x, mu, sigma))
    skew = transformed_rdd.reduce(lambda x, y : x + y)/rdd.count()
    
    return skew
    

In [42]:
rdd = sc.parallelize(range(1000))
test_mean = my_mean(rdd)
print(test_mean)
test_stdev = my_stdev(rdd, test_mean)
print(test_stdev)
test_skew = my_skew(rdd, test_mean, test_stdev)
print(test_skew)
print(skew(a))
a = np.array(range(1000))
assert test_mean == np.mean(a)
assert test_stdev == np.std(a)
assert test_skew - skew(a) < np.finfo(np.float).eps

499.5
288.6749902572095
1.1368683772161603e-16
0.0


# Fourth Moment

## Define a function, fourth_moment_term, to compute the kurtosis

$g_2 = \Big\{   \frac{1}{N} \sum_{i=1}^N \left[\frac{x_i - \mu}{\sigma}\right]^4  \Big\} - 3$

We will:

Map each term of the list to the correct moment
Sum the transformed elements over N

In [44]:
def fourth_moment_term(x, mu, sigma):
    """Define the fourth moment term

    Args:
        x - An element of a list - a term
        mu - the mean
        sigma - the standard deviation
    Returns:
        transformed term - an identity in this case, ((x - mu)/sigma)**4
    """
    
    return ((x - mu)/sigma) ** 4  
  

In [45]:
x = 5
mu = 3
sigma = 1
y = fourth_moment_term(x, mu, sigma)
assert y == 16

 ### Transform and reduce to obtain the kurtosis

   $g_2 = \Big\{   \frac{1}{N} \sum_{i=1}^N \left[\frac{x_i - \mu}{\sigma}\right]^4  \Big\} - 3$

    
   N = len(list)
   
   $\mu$ is the mean
   
   
   $\sigma$ is the standard deviation
   

In [46]:
def my_kurtosis(rdd, mu, sigma):
    """Define the fourth moment, the kurtosis

    Args:
        rdd - an rdd of integers
        mu - the mean of the list
        sigma - the standard deviation
        
    Returns:
        the skew of the list
    """
    
    
    transformed_rdd = rdd.map(lambda x : fourth_moment_term(x, mu, sigma))
    kurtosis = transformed_rdd.reduce(lambda x, y : x + y)/rdd.count()
    
    return kurtosis - 3
    

In [47]:
rdd  = sc.parallelize(range(1000))
test_mean = my_mean(rdd)
print(test_mean)
test_stdev = my_stdev(rdd, test_mean)
print(test_stdev)
test_skew = my_skew(rdd, test_mean, test_stdev)
print(test_skew)
test_kurtosis = my_kurtosis(rdd, test_mean, test_stdev)
print(test_kurtosis)
a = np.array(range(1000))
assert test_mean == np.mean(a)
assert test_stdev == np.std(a)
assert test_skew  - skew(a) <  np.finfo(np.float32).eps
assert test_kurtosis - kurtosis(a) < np.finfo(np.float32).eps
#assert test_kurtosis == kurtosis(a)


499.5
288.6749902572095
1.1368683772161603e-16
-1.2000024000024
