## Computing the statistics

In [None]:
%%time
%pwd
from glob import glob
from pyspark import SparkContext

In [38]:
sc.stop()

In [39]:
lib_dir='/mnt/workspace/edX-Micro-Master-in-Data-Science/big-data-analytics-using-spark/notebooks/Section2-PCA/PCA/data_preparation/lib'
libfiles=glob(lib_dir+'/*.py')

sc = SparkContext(pyFiles=libfiles)

In [41]:
N=sc.defaultParallelism
print('Number of executors=',N)

from pyspark import SparkContext
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

('Number of executors=', 2)


In [42]:
#import unpacking routine
import sys
sys.path.append('./lib')
from numpy_pack import unpackArray

import numpy as np

In [43]:
STAT_Descriptions=[
('SortedVals', 'Sample of values', 'vector whose length varies between measurements'),
 ('UnDef', 'sample of number of undefs per row', 'vector whose length varies between measurements'),
 ('mean', 'mean value', ()),
 ('std', 'std', ()),
 ('low100', 'bottom 1%', ()),
 ('high100', 'top 1%', ()),
 ('low1000', 'bottom 0.1%', ()),
 ('high1000', 'top 0.1%', ()),
 ('E', 'Sum of values per day', (365,)),
 ('NE', 'count of values per day', (365,)),
 ('Mean', 'E/NE', (365,)),
 ('O', 'Sum of outer products', (365, 365)),
 ('NO', 'counts for outer products', (365, 365)),
 ('Cov', 'O/NO', (365, 365)),
 ('Var', 'The variance per day = diagonal of Cov', (365,)),
 ('eigval', 'PCA eigen-values', (365,)),
 ('eigvec', 'PCA eigen-vectors', (365, 365))
  ]

In [44]:
US_Weather_parquet='/weather/US_weather.parquet'
measurements=['TMAX','TMIN','TOBS','SNOW','SNWD','PRCP']
Query="SELECT * FROM parquet.`%s`"%US_Weather_parquet
print(Query)

from time import time
t=time()
df = sqlContext.sql(Query).cache()
print(df.count())
print('took',time()-t,'seconds')

SELECT * FROM parquet.`/weather/US_weather.parquet`
3259494
('took', 21.63391399383545, 'seconds')


In [45]:
US_stations_parquet='/weather/US_stations.parquet'
Query="SELECT * FROM parquet.`%s`"%US_stations_parquet
print(Query)

t=time()
stations = sqlContext.sql(Query).cache()
print(stations.count())
print('took',time()-t,'seconds')

SELECT * FROM parquet.`/weather/US_stations.parquet`
12140
('took', 3.763619899749756, 'seconds')


In [46]:
station_state=stations.select(['Station','State'])
station_state.show(4)

+-----------+-----+
|    Station|State|
+-----------+-----+
|USC00341900|   OK|
|USC00428114|   UT|
|USC00165926|   LA|
|USC00411974|   TX|
+-----------+-----+
only showing top 4 rows



In [47]:
df.show(2)

+-----------+-----------+----+--------------------+
|    Station|Measurement|Year|              Values|
+-----------+-----------+----+--------------------+
|CA001126150|       PRCP|1941|[00 7E 00 7E 00 7...|
|CA001126150|       PRCP|1942|[00 00 80 4A 00 0...|
+-----------+-----------+----+--------------------+
only showing top 2 rows



In [48]:
df1=df.join(station_state,on='Station', how='inner')
df1.show(2)

+-----------+-----------+----+--------------------+-----+
|    Station|Measurement|Year|              Values|State|
+-----------+-----------+----+--------------------+-----+
|CA001126150|       PRCP|1941|[00 7E 00 7E 00 7...|  NaN|
|CA001126150|       PRCP|1942|[00 00 80 4A 00 0...|  NaN|
+-----------+-----------+----+--------------------+-----+
only showing top 2 rows



In [49]:
records_per_state_pdf=df1.groupby('State').count().toPandas()
records_per_state_pdf = records_per_state_pdf.sort_values('count')
records_per_state_pdf

Unnamed: 0,State,count
54,NB,120
5,DC,149
29,MB,193
44,AB,594
43,SK,740
8,QC,1218
12,BC,1366
42,ON,1857
9,RI,2608
25,DE,5277


In [50]:
def Unpack(V):
    return unpackArray(V,data_type=np.float16)


In [51]:
%%time
rdd0=df.rdd.map(lambda row:(row.Station,row.Measurement,row.Year,Unpack(row.Values))).cache()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 17.9 ms


### Crashes in the next cell if run using Python3

In [52]:
%%time

rdd0.count()

CPU times: user 8 ms, sys: 4 ms, total: 12 ms
Wall time: 37.7 s


3259494

In [53]:
rdd0.take(1)

[(u'CA001126150',
  u'PRCP',
  1941,
  array([ nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
          nan,  nan,   0.,  61.,  36.,  91.,   0.,   0.,   0.,   0.,  36.,
          91.,   0.,   0.,   0.,  18.,   3.,   0.,   0.,   0.,   0.,   0.,
           0.,   0.,   0.,   0.,   0.,   0.,  41.,   0.,   0., 188.,   8.,
           0.,  28.,  20.,   0.,   0.,   0.,   0.,   0.,   0.,  20.,   0.,
           0.,   0.,   0.,   0.,  53.,   0.,   0.,   0.,   0., 

In [54]:
t=time()
print rdd0.count()
print 'took',time()-t,'seconds'

3259494
took 1.96101808548 seconds


In [22]:
# Compute the overall distribution of values and the distribution of the number of nan per year
def find_percentiles(SortedVals,percentile):
  L=len(SortedVals)/percentile
  return SortedVals[L],SortedVals[-L]
  
def computeOverAllDist(rdd0):
  UnDef=np.array(rdd0.map(lambda row:sum(np.isnan(row))).sample(False,0.01).collect())
  flat=rdd0.flatMap(lambda v:list(v)).filter(lambda x: not np.isnan(x)).cache()
  count,S1,S2=flat.map(lambda x: np.float64([1,x,x**2]))\
                  .reduce(lambda x,y: x+y)
  mean=S1/count
  std=np.sqrt(S2/count-mean**2)
  Vals=flat.sample(False,0.0001).collect()
  SortedVals=np.array(sorted(Vals))
  low100,high100=find_percentiles(SortedVals,100)
  low1000,high1000=find_percentiles(SortedVals,1000)
  return {'UnDef':UnDef,\
          'mean':mean,\
          'std':std,\
          'SortedVals':SortedVals,\
          'low100':low100,\
          'high100':high100,\
          'low1000':low100,\
          'high1000':high1000
          }

In [60]:
# %load lib/spark_PCA.py
import numpy as np
from numpy import linalg as LA

def outerProduct(X):
    """Computer outer product and indicate which locations in matrix are undefined"""
    O=np.outer(X,X)
    N=1-np.isnan(O)
    return (O,N)

def sumWithNan(M1,M2):
    """Add two pairs of (matrix,count)"""
    (X1,N1)=M1
    (X2,N2)=M2
    N=N1+N2
    X=np.nansum(np.dstack((X1,X2)),axis=2)
    return (X,N)

def computeCov(RDDin):
    """computeCov recieves as input an RDD of np arrays, all of the same length, 
    and computes the covariance matrix for that set of vectors"""
    RDD=RDDin.map(lambda v:np.array(np.insert(v,0,1),dtype=np.float64)) # insert a 1 at the beginning of each vector so that the same 
                                           #calculation also yields the mean vector
    OuterRDD=RDD.map(outerProduct)   # separating the map and the reduce does not matter because of Spark uses lazy execution.
    (S,N)=OuterRDD.reduce(sumWithNan)
    # Unpack result and compute the covariance matrix
    # print('RDD=',RDD.collect())
    # print('shape of S=',S.shape,'shape of N=',N.shape)
    # print('S=',S)
    # print('N=',N)
    E=S[0,1:]
    NE=np.float64(N[0,1:])
    print('shape of E=',E.shape,'shape of NE=',NE.shape)
    Mean=E/NE
    O=S[1:,1:]
    NO=np.float64(N[1:,1:])
    Cov=O/NO - np.outer(Mean,Mean)
    # Output also the diagnal which is the variance for each day
    Var=np.array([Cov[i,i] for i in range(Cov.shape[0])])
    return {'E':E,'NE':NE,'O':O,'NO':NO,'Cov':Cov,'Mean':Mean,'Var':Var}

if __name__=="__main__":
    # create synthetic data matrix with j rows and rank k
    
    V=2*(np.random.random([2,10])-0.5)
    data_list=[]
    for i in range(1000):
        f=2*(np.random.random(2)-0.5)
        data_list.append(np.dot(f,V))
    # compute covariance matrix
    RDD=sc.parallelize(data_list)
    OUT=computeCov(RDD)

    #find PCA decomposition
    eigval,eigvec=LA.eig(OUT['Cov'])
    print('eigval=',eigval)
    print('eigvec=',eigvec)

('shape of E=', (10,), 'shape of NE=', (10,))
('eigval=', array([ 1.52150512e+00+0.00000000e+00j, -1.73472348e-16+0.00000000e+00j,
        4.21481279e-01+0.00000000e+00j, -5.65743323e-17+0.00000000e+00j,
       -7.55673997e-17+0.00000000e+00j,  3.40199536e-17+2.62411719e-17j,
        3.40199536e-17-2.62411719e-17j,  3.37003138e-17+0.00000000e+00j,
       -2.10396305e-17+0.00000000e+00j,  1.71760135e-19+0.00000000e+00j]))
('eigvec=', array([[ 4.42175481e-01+0.        j,  8.78490286e-01+0.        j,
         1.80929992e-01+0.        j,  5.48283151e-02+0.        j,
        -2.77083972e-01+0.        j, -1.38838568e-01+0.11262092j,
        -1.38838568e-01-0.11262092j, -3.27840667e-02+0.        j,
         1.25681273e-01+0.        j, -2.98963442e-03+0.        j],
       [ 2.53835606e-01+0.        j, -1.19754390e-01+0.        j,
        -3.88924626e-02+0.        j,  2.05310085e-02+0.        j,
        -3.25215328e-03+0.        j, -6.36014218e-01+0.        j,
        -6.36014218e-01-0.        

In [61]:
from numpy import linalg as LA

STAT={}  # dictionary storing the statistics for each measurement
Clean_Tables={}

#for meas in measurements:
meas=measurements[0]
t=time()
Query="SELECT * FROM parquet.`%s`\n\tWHERE measurement = '%s'"%(US_Weather_parquet,meas)
print Query
df = sqlContext.sql(Query)

SELECT * FROM parquet.`/weather/US_weather.parquet`
	WHERE measurement = 'TMAX'


In [62]:
rdd0=df.rdd.map(lambda row:((row.Station,row.Measurement,row.Year),Unpack(row.Values))).cache()
print rdd0.take(1)
rdd1=rdd0.sample(False,1)\
    .map(lambda (key,val): val)\
    .cache()\
    .repartition(N)
print rdd1.take(1)
print rdd1.count()

[((u'CA001126150', u'TMAX', 1941), array([ nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan, 183., 144., 128.,  78., 144., 167., 183., 178., 156.,
       172., 161., 183., 172., 200., 117., 122., 156., 167., 194., 228.,
       244., 250., 228., 206., 211., 250., 222., 244., 206., 206., 194.,
       194., 144., 167., 161., 178., 167., 178., 211., 194., 244., 217.,
       211., 217., 194., 194., 178., 161., 167., 217., 200., 206., 306.,
       250., 239

In [63]:
#get basic statistics
STAT[meas]=computeOverAllDist(rdd1)   # Compute the statistics 
low1000 = STAT[meas]['low1000']  # unpack the extreme values statistics
high1000 = STAT[meas]['high1000']

In [None]:
#clean up table from extreme values and from rows with too many undefinde entries.
rdd2=rdd1.map(lambda V: np.array([x if (x>low1000-1) and (x<high1000+1) else np.nan for x in V]))
rdd3=rdd2.filter(lambda row:sum(np.isnan(row))<50)
Clean_Tables[meas]=rdd3.cache().repartition(N)
C=Clean_Tables[meas].count()
print 'for measurement %s, we get %d clean rows'%(meas,C)

In [None]:
# compute covariance matrix
OUT=computeCov(Clean_Tables[meas])

In [None]:
#find PCA decomposition
eigval,eigvec=LA.eig(OUT['Cov'])

In [59]:
# collect all of the statistics in STAT[meas]
STAT[meas]['eigval']=eigval
STAT[meas]['eigvec']=eigvec
STAT[meas].update(OUT)

# print summary of statistics
print 'the statistics for %s consists of:'%meas
for key in STAT[meas].keys():
    e=STAT[meas][key]
    if type(e)==list:
      print key,'list',len(e)
    elif type(e)==np.ndarray:
      print key,'ndarray',e.shape
    elif type(e)==np.float64:
      print key,'scalar'
    else:
      print key,'Error type=',type(e)
print 'time for',meas,'is',time()-t
break

SELECT * FROM parquet.`/weather/US_weather.parquet`
	WHERE measurement = 'TMAX'
[((u'CA001126150', u'TMAX', 1941), array([ nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,
        nan,  nan, 183., 144., 128.,  78., 144., 167., 183., 178., 156.,
       172., 161., 183., 172., 200., 117., 122., 156., 167., 194., 228.,
       244., 250., 228., 206., 211., 250., 222., 244., 206., 206., 194.,
       194., 144., 167., 161., 178., 167., 178., 211., 194., 244., 217.,
       21

KeyboardInterrupt: 

In [None]:
from pickle import dump
def dumpS3(object,S3dir,filename):
    dump(object,open(filename,'wb'))
    !ls -l $filename
    s3helper.local_to_s3(filename, S3dir+filename)
dumpS3((STAT,STAT_Descriptions),'/Weather/','STAT1.pickle')


In [None]:
STAT.keys()

### Sample Stations
Generate a sample of stations, for each one store all available year X measurement pairs.

In [None]:
rdd0.take(10) # test output

In [None]:
groups=rdd0.groupByKey().cache()
print 'number of stations=',groups.count()

groups1=groups.sample(False,0.01).collect()
groups2=[(g[0],[e for e in g[1]]) for g in groups1]

dumpS3(groups2,'/Weather/','SampleStations.pickle')

In [None]:
group_Sample=rdd0.sample(False,0.001).groupByKey().mapValues(list).cache()
group_Sample.first()

In [None]:
Means=rdd0.aggregateByKey((np.zeros(365),1),\
                          lambda S,D: sumWithNan(S,(D[1],1)),\
                          lambda S1,S2: sumWithNanithNan(S1,S2))\
.cache()#.repartition(N)

In [None]:
Means.take(5)

In [None]:
groups=rdd0.groupByKey().cache()
print 'number of stations=',groups.count()
