### Computing PCA using RDDs

### PCA

The vectors that we want to analyze have length, or dimension, of 365, corresponding to the number of 
days in a year.

We want to perform [Principal component analysis (PCA)](https://en.wikipedia.org/wiki/Principal_component_analysis)
on these vectors. There are two steps to this process:

1. Computing the covariance matrix: this is a simple computation. However, it takes a long time and benefits from using an RDD because it involves all of the input vectors.
2. Computing the eigenvector decomposition: this is a more complex computation, but it takes a fraction of a second because the size of the covariance matrix is $365 \times 365$, which is quite small. We do it on the head node using `linalg`.
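
One way to see why the first step suits an RDD is that the sums involved are additive: each worker can sum its own vectors and the partial results are combined by plain addition. The sketch below imitates this with numpy only. The toy data, the three chunks and the helper name `partial_sums` are assumptions made for illustration, not part of the notebook's code.

```python
import numpy as np

rng = np.random.RandomState(0)
X = rng.normal(size=(90, 4))             # toy data: 90 vectors of dimension 4

def partial_sums(chunk):
    """Return (sum of outer products, sum of vectors, count) for one chunk."""
    return chunk.T.dot(chunk), chunk.sum(axis=0), chunk.shape[0]

# Pretend that each chunk lives on a different worker
parts = [partial_sums(c) for c in np.array_split(X, 3)]

# Combining the partial results only needs element-wise addition
S = sum(p[0] for p in parts)
s = sum(p[1] for p in parts)
n = sum(p[2] for p in parts)

cov = S / n - np.outer(s / n, s / n)     # same result as a single pass over X
```

The combined `cov` agrees with `np.cov(X.T, bias=True)`, which computes the same quantity in one pass.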

### Computing the covariance matrix
Suppose that the data vectors are column vectors denoted $x$. Then the covariance matrix is defined to be
$$
E(x x^T)-E(x)E(x)^T
$$

where $x x^T$ is the **outer product** of $x$ with itself.

If the data that we have is $x_1,x_2,\ldots,x_n$ then the estimates we use are:
$$
\hat{E}(x x^T) = \frac{1}{n} \sum_{i=1}^n x_i x_i^T,\;\;\;\;\;
\hat{E}(x) = \frac{1}{n} \sum_{i=1}^n x_i
$$
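
As a small check of these estimates, the sketch below applies them literally to three made-up vectors (the values are arbitrary) and forms $\hat{E}(x x^T)-\hat{E}(x)\hat{E}(x)^T$.

```python
import numpy as np

xs = [np.array([1.0, 2.0, 0.0]),
      np.array([3.0, 0.0, 1.0]),
      np.array([2.0, 4.0, 5.0])]
n = len(xs)

E_xxT = sum(np.outer(x, x) for x in xs) / n   # estimate of E(x x^T)
E_x = sum(xs) / n                             # estimate of E(x)
cov = E_xxT - np.outer(E_x, E_x)              # covariance estimate
```

Note that these estimates divide by $n$, so the result matches `np.cov(..., bias=True)` rather than the default `np.cov`, which divides by $n-1$.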

### `nan`s in arithmetic operations
* We store all of the measurements as a single `bytearray` in a single column, instead of using 365 columns.
* Why?
  1. Because serializing and deserializing is faster that way.
  1. Because numpy treats `nan` entries correctly:
      * In `numpy.nansum` `5+nan=5` while in dataframes `5+nan=nan`
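
The idea can be sketched as follows. The notebook's real helpers are `packArray` and `unpackArray` from `lib/numpy_pack.py`, whose implementation is not shown here. The functions `pack_array` and `unpack_array` below are hypothetical stand-ins, written only to illustrate how a vector can travel as bytes and still be summed with `nan` entries skipped.

```python
import numpy as np

def pack_array(a):
    """Hypothetical stand-in: serialize a numpy array into a bytearray."""
    return bytearray(a.tobytes())

def unpack_array(buf, dtype):
    """Hypothetical stand-in: rebuild the numpy array from the raw bytes."""
    return np.frombuffer(buf, dtype=dtype)

v = np.array([5.0, np.nan, 2.0], dtype=np.float16)
packed = pack_array(v)                      # 3 values, 2 bytes each
restored = unpack_array(packed, np.float16)

total = np.nansum(restored)                 # the nan entry is skipped
plain_total = restored.sum()                # ordinary addition propagates nan
```

Here `total` is 7 while `plain_total` is `nan`.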

In [1]:
import numpy as np
X=np.array([1,1,1,2])
print 'mean of',X,'=',np.mean(X)
print 'nanmean of',X,'=',np.nanmean(X)
X=np.array([1,1,np.NaN,2])
print 'mean of',X,'=',np.mean(X)
print 'nanmean of',X,'=',np.nanmean(X)

mean of [1 1 1 2] = 1.25
nanmean of [1 1 1 2] = 1.25
mean of [  1.   1.  nan   2.] = nan
nanmean of [  1.   1.  nan   2.] = 1.33333333333


#### When should you not use `np.nanmean`?
Using `np.nanmean` is equivalent to assuming that the choice of which elements to remove is independent of the values of the elements.
* Example of a bad case: suppose the larger elements have a higher probability of being `nan`. In that case `np.nanmean` will underestimate the mean.
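
The bad case can be simulated directly. In the sketch below, the uniform distribution and the rule "drop a value with probability proportional to its size" are assumptions chosen only to make the effect visible.

```python
import numpy as np

rng = np.random.RandomState(1)
x = rng.uniform(0, 10, size=100000)        # true mean is close to 5

# A value x is dropped with probability x/10, so large values
# go missing more often than small ones
missing = rng.uniform(0, 10, size=x.size) < x
observed = np.where(missing, np.nan, x)

true_mean = x.mean()
nan_mean = np.nanmean(observed)            # mean of the surviving values only
```

With this missingness rule, `nan_mean` comes out well below `true_mean`, because the surviving values are mostly the small ones.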

#### Computing Cov matrix on vectors with NaNs
As it happens, we often get vectors $x$ in which some, but not all, of the entries are `nan`. 
Suppose that we want to compute the mean of the elements of $x$. If we use `np.mean` we will get the result `nan`. A useful alternative is to use `np.nanmean` which removes the `nan` elements and takes the mean of the rest.

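#### Computing the covariance when there are `nan`s
The covariance is a mean of outer products, so the same idea applies: each entry of the matrix is averaged only over the outer products in which that entry is defined.

If the data that we have is $x_1,x_2,\ldots,x_n$ then the estimates we use are: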
$$
\hat{E}(x x^T) = \frac{1}{n} \sum_{i=1}^n x_i x_i^T,\;\;\;\;\;
\hat{E}(x) = \frac{1}{n} \sum_{i=1}^n x_i
$$

In [2]:
x1=np.array([1,np.NaN,3,4,5])
x2=np.array([2,3,4,np.NaN,6])
stacked=np.array([np.outer(x1,x1),np.outer(x2,x2)])
stacked

array([[[  1.,  nan,   3.,   4.,   5.],
        [ nan,  nan,  nan,  nan,  nan],
        [  3.,  nan,   9.,  12.,  15.],
        [  4.,  nan,  12.,  16.,  20.],
        [  5.,  nan,  15.,  20.,  25.]],

       [[  4.,   6.,   8.,  nan,  12.],
        [  6.,   9.,  12.,  nan,  18.],
        [  8.,  12.,  16.,  nan,  24.],
        [ nan,  nan,  nan,  nan,  nan],
        [ 12.,  18.,  24.,  nan,  36.]]])

In [3]:
np.nanmean(stacked,axis=0)



array([[  2.5,   6. ,   5.5,   4. ,   8.5],
       [  6. ,   9. ,  12. ,   nan,  18. ],
       [  5.5,  12. ,  12.5,  12. ,  19.5],
       [  4. ,   nan,  12. ,  16. ,  20. ],
       [  8.5,  18. ,  19.5,  20. ,  30.5]])
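
The same result can be obtained by keeping a running sum and a running count for each entry, which is the form that fits a reduce operation. A minimal sketch using the two vectors above:

```python
import numpy as np

x1 = np.array([1, np.nan, 3, 4, 5])
x2 = np.array([2, 3, 4, np.nan, 6])
outers = [np.outer(x1, x1), np.outer(x2, x2)]

total = np.zeros((5, 5))
count = np.zeros((5, 5))
for O in outers:
    total += np.nan_to_num(O)      # a nan entry contributes 0 to the sum
    count += ~np.isnan(O)          # 1 where the entry is defined, 0 otherwise

with np.errstate(invalid='ignore'):
    mean_outer = total / count     # 0/0 gives nan where no vector defines the entry
```

`mean_outer` has the same entries as `np.nanmean(stacked,axis=0)`, including the `nan` at positions where both outer products are undefined.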

### Loading Data into Dataframe

In [4]:
import findspark
findspark.init()

from pyspark import SparkContext
#sc.stop()
## add after creating spark_PCA.py
sc = SparkContext(master="local[3]",pyFiles=['lib/numpy_pack.py','lib/computeStats.py','lib/spark_PCA.py'])

from pyspark.sql import *
sqlContext = SQLContext(sc)

In [5]:
import sys
sys.path.append('./lib')

import numpy as np
from numpy_pack import packArray,unpackArray
# add after creating spark_PCA.py
# from spark_PCA import computeCov
from computeStats import computeOverAllDist, STAT_Descriptions

### Climate data

The data we will use here comes from [NOAA](https://www.ncdc.noaa.gov/). Specifically, it was downloaded from this [FTP site](ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/).

There is a large variety of measurements from all over the world, from 1870 until 2012.
In the directory `../../Data/Weather` you will find the following useful files:

* data-source.txt: the source of the data
* ghcnd-readme.txt: A description of the content and format of the data
* ghcnd-stations.txt: A table describing the Meteorological stations.



### Data cleaning

* Most measurements exist only for a tiny fraction of the stations and years. We therefore restrict our use to the following measurements:
```python
['TMAX', 'SNOW', 'SNWD', 'TMIN', 'PRCP', 'TOBS']
```

* We consider only measurement-years that have at most 50 `NaN` entries

* We consider only measurements in the continental USA

* We partition the stations into 256 geographical rectangles, indexed from BBBBBBBB to SSSSSSSS, each containing about 12,000 (station, year) pairs.
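
The count of 256 is consistent with an index made of 8 letters, each either `B` or `S`, since $2^8 = 256$. That reading of the index is an assumption based on the endpoints `BBBBBBBB` and `SSSSSSSS` and on the file index used below. Under that assumption, the labels can be enumerated as follows:

```python
from itertools import product

# Assumption: every index is a string of 8 characters drawn from 'B' and 'S'
labels = [''.join(p) for p in product('BS', repeat=8)]

n_labels = len(labels)                 # 2**8 combinations
first, last = labels[0], labels[-1]
```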

In [6]:
file_index='SSSBSBBB'
filebase='US_Weather_%s'%file_index
filename='US_Weather_%s.csv.gz'%file_index
data_dir='../../Data/Weather'
del_files='%s/%s*'%(data_dir,filename[:-3])
print del_files
#!rm $del_files
command="curl https://mas-dse-open.s3.amazonaws.com/Weather/small/%s > %s/%s"%(filename,data_dir,filename)
print command
#!$command
!ls -lh $data_dir/$filename

../../Data/Weather/US_Weather_SSSBSBBB.csv*
curl https://mas-dse-open.s3.amazonaws.com/Weather/small/US_Weather_SSSBSBBB.csv.gz > ../../Data/Weather/US_Weather_SSSBSBBB.csv.gz


ls: cannot access ../../Data/Weather/US_Weather_SSSBSBBB.csv.gz: No such file or directory


In [7]:
!gzip -d $data_dir/$filename
filename=data_dir+'/US_Weather_SSSBSBBB.csv'
!ls -lh $filename

gzip: ../../Data/Weather/US_Weather_SSSBSBBB.csv.gz: No such file or directory


---------- 1 wenyan mkpasswd 13M May 14 14:28 ../../Data/Weather/US_Weather_SSSBSBBB.csv




In [8]:
import pickle
# Note: despite the .csv extension, the file is read here as a pickled Python list
with open(filename,'rb') as f:
    List=pickle.load(f)
len(List)

12930

In [69]:
df=sqlContext.createDataFrame(List)
print df.count()
df.show(5)

12930
+---------+--------+---------+-----------+-----------+------+--------------------+------+--------+
|elevation|latitude|longitude|measurement|    station|undefs|              vector|  year|   label|
+---------+--------+---------+-----------+-----------+------+--------------------+------+--------+
|   1368.2| 39.6372|-119.7094|       TMAX|USC00267691|     7|[80 4D 90 55 80 5...|2010.0|SSSBSBBB|
|   1368.2| 39.6372|-119.7094|       TMIN|USC00267691|    10|[A0 D3 80 D4 90 D...|2010.0|SSSBSBBB|
|   1369.5| 39.5333|-119.8167|       TMAX|USC00266782|    32|[80 54 30 55 90 5...|1893.0|SSSBSBBB|
|   1369.5| 39.5333|-119.8167|       TMAX|USC00266782|     0|[40 56 90 55 80 5...|1896.0|SSSBSBBB|
|   1369.5| 39.5333|-119.8167|       TMAX|USC00266782|     4|[30 55 90 55 30 5...|1897.0|SSSBSBBB|
+---------+--------+---------+-----------+-----------+------+--------------------+------+--------+
only showing top 5 rows



In [10]:
#store dataframe as parquet file
filebase='US_Weather_%s'%file_index
outfilename=data_dir+'/'+filebase+'.parquet'
!rm -rf $outfilename
df.write.save(outfilename)

In [4]:
dom = [31,28,31,30,31,30,31,31,30,31,30,31]
[sum(dom[:i]) for i in range(12)]

[0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334]

In [90]:
def averageByMonth(vector):
    dom = [31,28,31,30,31,30,31,31,30,31,30,31]
    i = 0
    monthly = []
    for m in dom:
        monthly.append(np.nanmean(vector[i:i+m]))
        i+=m
    return np.array(monthly)

def to_monthly(row):
    # Row objects are immutable, so build a modified copy from a dict
    row_new = row.asDict()
    row_new['vector'] = packArray(averageByMonth(unpackArray(row['vector'],np.float16)))
    return Row(**row_new)
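
To see what the monthly averaging produces, the sketch below applies the same logic to a made-up year in which every day of month `m` has the value `m`. The function `average_by_month` is a self-contained restatement of `averageByMonth` written for this example, and the toy vector is an assumption.

```python
import numpy as np

DAYS_IN_MONTH = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]

def average_by_month(vector):
    """Nan-aware mean of each calendar month of a 365-day vector."""
    bounds = np.cumsum([0] + DAYS_IN_MONTH)          # 0, 31, 59, ..., 365
    return np.array([np.nanmean(vector[a:b])
                     for a, b in zip(bounds[:-1], bounds[1:])])

# Toy year: every day in month m has the value m, with a few missing days
year = np.repeat(np.arange(12, dtype=np.float64), DAYS_IN_MONTH)
year[[0, 40, 200]] = np.nan
monthly = average_by_month(year)
```

Because `np.nanmean` skips the missing days, `monthly` is the vector of month indices 0 through 11 even though three days are `nan`.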

In [116]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType

daily_to_monthly = udf(lambda vector: packArray(averageByMonth(unpackArray(vector,np.float16))), BinaryType())
#df.withColumn("day", day(df.date_time))

df_monthly = df.withColumn('vector', daily_to_monthly(df.vector))

In [122]:
#store dataframe as parquet file
filebase='US_Weather_monthly_%s'%file_index
outfilename=data_dir+'/'+filebase+'.parquet'
!rm -rf $outfilename
df_monthly.write.save(outfilename)



In [119]:
sqlContext.registerDataFrameAsTable(df,'weather') #using older sqlContext instead of newer (V2.0) sparkSession
sqlContext.registerDataFrameAsTable(df_monthly,'weather_monthly') #using older sqlContext instead of newer (V2.0) sparkSession

In [12]:
from time import time
t=time()

N=sc.defaultParallelism
print 'Number of executors=',N
print 'took',time()-t,'seconds'

Number of executors= 3
took 0.000999927520752 seconds


### Computing PCA for each measurement

In [13]:
measurements=['TMAX', 'SNOW', 'SNWD', 'TMIN', 'PRCP', 'TOBS']

#### Homework
The code below computes the covariance matrix using RDDs.
The code allows undefined entries and calculates the covariance without bias.

Your homework is to complete the missing parts in the code (marked with `...`) so that it calculates the covariance correctly.

In [14]:
#%%writefile lib/spark_PCA.py #once this works correctly, you should add it to the `lib` directory
# and use import

import numpy as np
from numpy import linalg as LA

def outerProduct(X):
    """Compute the outer product and indicate which locations in the matrix are undefined"""
    O=np.outer(X,X)
    N=1-np.isnan(O)
    return (O,N)

def sumWithNan(M1,M2):
    """Add two pairs of (matrix,count)"""
    (X1,N1)=M1
    (X2,N2)=M2
    N=N1+N2
    X=np.nansum(np.dstack((X1,X2)),axis=2)
    return (X,N)

def computeCov(RDDin):
    """computeCov receives as input an RDD of np arrays, all of the same length, 
    and computes the covariance matrix for that set of vectors"""
    RDD=RDDin.map(lambda v:np.array(np.insert(v,0,1),dtype=np.float64)) 
                     # insert a 1 at the beginning of each vector so that the same 
                     # calculation also yields the mean vector
    OuterRDD=RDD.map(outerProduct)  # separating the map and the reduce does not matter because Spark uses lazy execution.
    (S,N)=OuterRDD.reduce(sumWithNan)
    E=S[0,1:]
    NE=np.float64(N[0,1:])
    print 'shape of E=',E.shape,'shape of NE=',NE.shape
    Mean=E/NE
    O=S[1:,1:]
    NO=np.float64(N[1:,1:])
    Cov=O/NO - np.outer(Mean,Mean)
    # Output also the diagonal, which is the variance for each day
    Var=np.array([Cov[i,i] for i in range(Cov.shape[0])])
    return {'E':E,'NE':NE,'O':O,'NO':NO,'Cov':Cov,'Mean':Mean,'Var':Var}
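
The logic of `computeCov` can be checked locally without Spark by replacing the RDD with a Python list and `RDD.reduce` with `functools.reduce`. The sketch below is such a check on made-up data without `nan` entries. The two helper functions are copied from the cell above so that the block runs on its own.

```python
from functools import reduce
import numpy as np

def outerProduct(X):
    O = np.outer(X, X)
    N = 1 - np.isnan(O)            # 1 where defined, 0 where nan
    return (O, N)

def sumWithNan(M1, M2):
    (X1, N1) = M1
    (X2, N2) = M2
    return (np.nansum(np.dstack((X1, X2)), axis=2), N1 + N2)

rng = np.random.RandomState(0)
vectors = [rng.normal(size=4) for _ in range(50)]    # toy data, no nan entries

augmented = [np.insert(v, 0, 1.0) for v in vectors]  # leading 1, as in computeCov
S, N = reduce(sumWithNan, map(outerProduct, augmented))

Mean = S[0, 1:] / N[0, 1:]                           # first row holds the sums of x
Cov = S[1:, 1:] / N[1:, 1:] - np.outer(Mean, Mean)
```

On data without missing entries, `Cov` agrees with `np.cov(np.array(vectors).T, bias=True)` and `Mean` with the ordinary mean, which gives some confidence in the reduce step before running it on an RDD.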

In [17]:
from numpy import linalg as LA
STAT={}  # dictionary storing the statistics for each measurement
Clean_Tables={}

for meas in measurements:
    t=time()
    Query="SELECT * FROM weather\n\tWHERE measurement='%s' "%(meas)
    print Query
    df1 = sqlContext.sql(Query)
    data=df1.rdd.map(lambda row: unpackArray(row['vector'],np.float16))
    #get basic statistics
    STAT[meas]=computeOverAllDist(data)   # Compute the statistics 

    # compute covariance matrix
    OUT=computeCov(data)

    #find PCA decomposition
    eigval,eigvec=LA.eig(OUT['Cov'])

    # collect all of the statistics in STAT[meas]
    STAT[meas]['eigval']=eigval
    STAT[meas]['eigvec']=eigvec
    STAT[meas].update(OUT)

    print 'time for',meas,'is',time()-t

SELECT * FROM weather
	WHERE measurement='TMAX' 
shape of E= (365L,) shape of NE= (365L,)
time for TMAX is 34.2880001068
SELECT * FROM weather
	WHERE measurement='SNOW' 
shape of E= (365L,) shape of NE= (365L,)
time for SNOW is 29.6659998894
SELECT * FROM weather
	WHERE measurement='SNWD' 
shape of E= (365L,) shape of NE= (365L,)
time for SNWD is 25.5550000668
SELECT * FROM weather
	WHERE measurement='TMIN' 
shape of E= (365L,) shape of NE= (365L,)
time for TMIN is 32.8420000076
SELECT * FROM weather
	WHERE measurement='PRCP' 
shape of E= (365L,) shape of NE= (365L,)
time for PRCP is 35.4539999962
SELECT * FROM weather
	WHERE measurement='TOBS' 
shape of E= (365L,) shape of NE= (365L,)
time for TOBS is 23.6119999886
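
The loop above uses `LA.eig`, which does not guarantee any particular order of the eigenvalues. The sketch below shows one possible follow-up step on a made-up symmetric matrix: it swaps in `np.linalg.eigh`, which is intended for symmetric matrices and returns eigenvalues in ascending order, then sorts them in descending order and computes the cumulative fraction of variance explained. The toy matrix and the variable names are assumptions, not part of the notebook.

```python
import numpy as np

rng = np.random.RandomState(0)
A = rng.normal(size=(6, 6))
cov = A.dot(A.T) / 6                      # toy symmetric positive semi-definite matrix

eigval, eigvec = np.linalg.eigh(cov)      # eigh assumes a symmetric matrix
order = np.argsort(eigval)[::-1]          # eigh returns ascending order; flip it
eigval, eigvec = eigval[order], eigvec[:, order]

# Cumulative fraction of the total variance captured by the top-k eigenvectors
explained = np.cumsum(eigval) / np.sum(eigval)
```

Column `k` of `eigvec` is the eigenvector for `eigval[k]`, and `explained[k]` is the fraction of variance captured by the first `k+1` eigenvectors.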


In [21]:
STAT['TMIN']

{'Cov': array([[ 3845.68476627,  2952.35342334,  2031.12085187, ...,
           961.7690815 ,  1037.77936595,  1259.0198822 ],
        [ 2952.35342334,  4064.01905519,  3060.93253379, ...,
          1026.76997668,  1122.07990915,  1374.36361552],
        [ 2031.12085187,  3060.93253379,  4492.4231735 , ...,
          1232.78658599,  1292.24520079,  1543.53302237],
        ..., 
        [  961.7690815 ,  1026.76997668,  1232.78658599, ...,
          3479.72757828,  2791.34961772,  2304.25063747],
        [ 1037.77936595,  1122.07990915,  1292.24520079, ...,
          2791.34961772,  3718.45413395,  2912.2451099 ],
        [ 1259.0198822 ,  1374.36361552,  1543.53302237, ...,
          2304.25063747,  2912.2451099 ,  3831.16574049]]),
 'E': array([-157922., -164322., -168709., -165125., -165648., -165866.,
        -148950., -145902., -141158., -134723., -148891., -149337.,
        -148110., -140731., -142112., -146280., -150503., -145253.,
        -147857., -156370., -157452., -149018., 

In [120]:
from numpy import linalg as LA
STAT_by_month={}  # dictionary storing the statistics for each measurement
Clean_Tables={}

for meas in measurements:
    t=time()
    Query="SELECT * FROM weather_monthly\n\tWHERE measurement='%s' "%(meas)
    #print Query
    df1 = sqlContext.sql(Query)
    data_by_month=df1.rdd.map(lambda row: unpackArray(row['vector'],np.float16))
    #get basic statistics
    #data_by_month = data.map(averageByMonth)
    STAT_by_month[meas]=computeOverAllDist(data_by_month)   # Compute the statistics 

    # compute covariance matrix
    OUT=computeCov(data_by_month)

    #find PCA decomposition
    eigval,eigvec=LA.eig(OUT['Cov'])

    # collect all of the statistics in STAT_by_month[meas]
    STAT_by_month[meas]['eigval']=eigval
    STAT_by_month[meas]['eigvec']=eigvec
    STAT_by_month[meas].update(OUT)

    print 'time for',meas,'is',time()-t

shape of E= (12L,) shape of NE= (12L,)
time for TMAX is 18.4509999752
shape of E= (12L,) shape of NE= (12L,)
time for SNOW is 17.6240000725
shape of E= (12L,) shape of NE= (12L,)
time for SNWD is 17.5130000114
shape of E= (12L,) shape of NE= (12L,)
time for TMIN is 17.9160001278
shape of E= (12L,) shape of NE= (12L,)
time for PRCP is 18.0879998207
shape of E= (12L,) shape of NE= (12L,)
time for TOBS is 17.4059998989


In [65]:
STAT_by_month['TMIN']

{'Cov': array([[ 1826.45812838,  1242.73157485,   975.55213565,   820.58884001,
           793.17918125,   875.41244748,   957.70559601,   978.7063259 ,
          1092.04147546,  1090.66617933,  1064.27300513,  1203.84986485],
        [ 1242.73157485,  1548.61679906,  1020.55446696,   901.30597043,
           895.12474067,   934.05112263,   969.77670229,   985.4988463 ,
          1029.64457092,  1053.71704481,  1012.52784748,  1113.80959044],
        [  975.55213565,  1020.55446696,  1198.93684099,   917.57522517,
           925.94688728,   926.82489638,   952.3182791 ,   969.83998356,
           947.69985052,   935.5247331 ,   825.31791318,   864.4206385 ],
        [  820.58884001,   901.30597043,   917.57522517,  1076.50026236,
           919.63901191,   948.2412117 ,   987.41824383,   943.55297149,
           889.05819633,   836.23111555,   721.56403621,   761.24944661],
        [  793.17918125,   895.12474067,   925.94688728,   919.63901191,
          1165.15908791,  1030.37655287,

In [22]:
from pickle import dump
filename=data_dir+'/STAT_%s.pickle'%file_index
dump((STAT,STAT_Descriptions),open(filename,'wb'))
!ls -lrth $data_dir

total 137M
---------- 1 wenyan mkpasswd  17M May  8 21:13 STAT_BBBSBBBB.pickle.gz
---------- 1 wenyan mkpasswd  13M May  8 21:13 US_Weather_BBBSBBBB.csv
d--------- 1 wenyan mkpasswd    0 May  8 21:13 US_Weather_BBBSBBBB.parquet
---------- 1 wenyan mkpasswd  226 May  8 21:13 data-source.txt
d--------- 1 wenyan mkpasswd    0 May  8 21:14 decon_BBBSBBBB.parquet
d--------- 1 wenyan mkpasswd    0 May  8 21:14 decon_BBBSBBBB_SNWD.parquet
d--------- 1 wenyan mkpasswd    0 May  8 21:14 decon_BBBSBBBB_TOBS.parquet
---------- 1 wenyan mkpasswd  23K May  8 21:14 ghcnd-readme.txt
---------- 1 wenyan mkpasswd 8.1M May  8 21:14 ghcnd-stations.txt
---------- 1 wenyan mkpasswd 7.5M May  8 21:14 ghcnd-stations_buffered.txt
---------- 1 wenyan mkpasswd  272 May  8 21:14 ghcnd-version.txt
---------- 1 wenyan mkpasswd 2.1M May  8 21:14 stations.pkl.gz
---------- 1 wenyan mkpasswd  13M May  9 20:47 US_Weather_SSSBSBBB.csv
d--------- 1 wenyan mkpasswd    0 May  9 20:47 US_Weather_SSSBSBBB.parquet
----------



In [67]:
from pickle import dump
filename=data_dir+'/STAT_monthly_%s.pickle'%file_index
dump((STAT_by_month,STAT_Descriptions),open(filename,'wb'))
!ls -lrth $data_dir

total 121M
---------- 1 wenyan mkpasswd  226 May  8 21:13 data-source.txt
---------- 1 wenyan mkpasswd  23K May  8 21:14 ghcnd-readme.txt
---------- 1 wenyan mkpasswd 8.1M May  8 21:14 ghcnd-stations.txt
---------- 1 wenyan mkpasswd 7.5M May  8 21:14 ghcnd-stations_buffered.txt
---------- 1 wenyan mkpasswd  272 May  8 21:14 ghcnd-version.txt
---------- 1 wenyan mkpasswd  14M May  8 21:14 stations.pkl
d--------- 1 wenyan mkpasswd    0 May  9 20:47 US_Weather_SSSBSBBB.parquet
---------- 1 wenyan mkpasswd  78M May  9 21:12 STAT_SSSBSBBB.pickle
d--------- 1 wenyan mkpasswd    0 May  9 22:08 decon_SSSBSBBB_PRCP.parquet
---------- 1 wenyan mkpasswd 7.8K May 12 22:04 PRCP_residuals_PCA.pickle
d--------- 1 wenyan mkpasswd    0 May 14 11:36 decon_SSSBSBBB_SNOW.parquet
d--------- 1 wenyan mkpasswd    0 May 14 11:58 decon_SSSBSBBB_TOBS.parquet
---------- 1 wenyan mkpasswd  20K May 14 11:58 TOBS_min_coeffs.pickle
---------- 1 wenyan mkpasswd  18K May 14 11:58 TOBS_max_coeffs.pickle
---------- 1 we



In [23]:

sc.stop()