In [56]:
import numpy as np
import pandas as pd
import uuid
import random
import string
import pyspark.sql

In [110]:
# Seed both RNGs so "Restart & Run All" reproduces the same data, weights and
# (via `random`, used by gen_random_string) the same fake domain names.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

# Synthetic inputs: 10 rows x 170 features, plus random weights/biases
# presumably meant as a 170 -> 15 -> 15 dense stack (only w1/b1 are used by
# process_row; w2/b2 are just broadcast).
data = np.random.rand(10, 170)
w1 = np.random.rand(170, 15)
b1 = np.random.rand(1, 15)
w2 = np.random.rand(15, 15)
# NOTE(review): b2 has a single element, so it would broadcast as a scalar bias
# over a 15-wide layer; the saved output of `bb2.value` shows 15 values — confirm
# which shape is intended.
b2 = np.random.rand(1)

In [36]:
def gen_random_string(s=6):
    """Return a random string of ``s`` ASCII letters (upper- and lower-case).

    Uses ``range`` instead of the Python-2-only ``xrange`` so the notebook also
    runs on Python 3.
    """
    return ''.join([random.choice(string.ascii_letters) for _ in range(s)])

def gen_domain_names(n):
    """Build ``n`` fake email-style strings shaped like ``abc@defghijk.com``.

    Each local part is 3 random letters and each host part is 8 random letters.
    """
    names = []
    for _ in range(n):
        local_part = gen_random_string(3)
        host_part = gen_random_string(8)
        names.append(local_part + "@" + host_part + ".com")
    return names

def gen_column_names(n):
    """Return the feature column labels ``['col0', 'col1', ...]`` (``n`` of them)."""
    return list(map('col{}'.format, range(n)))

In [46]:
# Join a "domain_name" id column onto the numeric features. Row and column counts
# come from `data` itself so the ids, labels and values cannot drift apart.
pdf = pd.DataFrame({"domain_name": gen_domain_names(data.shape[0])}).join(
    pd.DataFrame(data, columns=gen_column_names(data.shape[1]))
)

In [47]:
# Inspect the joined pandas frame: a "domain_name" column followed by the
# generated feature columns (col0, col1, ...).
pdf

Unnamed: 0,domain_name,col0,col1,col2,col3,col4,col5,col6,col7,col8,...,col160,col161,col162,col163,col164,col165,col166,col167,col168,col169
0,deW@jsuFQMpL.com,0.73163,0.622644,0.930286,0.767651,0.874389,0.06675,0.570905,0.67098,0.193517,...,0.055866,0.913569,0.479245,0.234269,0.349505,0.863073,0.489527,0.108635,0.719782,0.641626
1,MQv@xvBlWLIt.com,0.142349,0.979741,0.579209,0.257627,0.356324,0.050332,0.338703,0.201736,0.219748,...,0.280167,0.840519,0.545947,0.577371,0.409859,0.135914,0.553756,0.110668,0.149902,0.237696
2,UjW@RHLZCyCB.com,0.856417,0.529134,0.605024,0.40361,0.862446,0.179642,0.593527,0.59655,0.101543,...,0.403078,0.820586,0.509496,0.76923,0.417305,0.874566,0.44254,0.420021,0.939898,0.800755
3,yhb@uUmrOetI.com,0.74018,0.796776,0.773875,0.576437,0.11585,0.308389,0.839504,0.473207,0.212981,...,0.492815,0.327248,0.419873,0.330981,0.2144,0.570782,0.655442,0.892885,0.410818,0.962335
4,qnP@OmZOscil.com,0.528439,0.228079,0.890415,0.738422,0.949623,0.932025,0.992216,0.110466,0.609361,...,0.536185,0.120766,0.781428,0.471502,0.032502,0.84467,0.764518,0.582775,0.811452,0.051578
5,zoA@WPfRtqxh.com,0.97191,0.18935,0.521078,0.933077,0.18974,0.766224,0.593429,0.356585,0.26728,...,0.196795,0.103541,0.243113,0.905907,0.503339,0.766814,0.034027,0.442544,0.098086,0.868724
6,dzI@kDGWMlDh.com,0.574308,0.475546,0.252854,0.237101,0.450879,0.198107,0.886848,0.041466,0.790353,...,0.634693,0.671066,0.468238,0.488639,0.908752,0.175751,0.058188,0.48415,0.093107,0.580877
7,Bib@cojngdmo.com,0.699552,0.525776,0.404001,0.05496,0.034833,0.275862,0.091469,0.540325,0.349273,...,0.789546,0.122829,0.170607,0.56478,0.471794,0.849263,0.346988,0.402362,0.87086,0.071549
8,alM@RMKJVmdJ.com,0.508926,0.624261,0.914394,0.947247,0.748565,0.782728,0.22371,0.001266,0.485259,...,0.638043,0.194616,0.079684,0.880985,0.711668,0.668892,0.121675,0.608067,0.717765,0.288322
9,FaV@taWBlOih.com,0.586216,0.267127,0.353271,0.691287,0.4488,0.919428,0.939731,0.392881,0.231547,...,0.727488,0.129731,0.500636,0.746492,0.188504,0.935719,0.852644,0.333591,0.375858,0.367487


In [48]:
# Hand the pandas frame to Spark; no schema is passed, so Spark infers it from pdf.
df = spark.createDataFrame(data=pdf)

In [59]:
# `DataFrame.stat` is the public accessor that returns DataFrameStatFunctions for df.
sf = df.stat

In [63]:
# Sample covariance between two feature columns, via the stat-functions accessor.
df.stat.cov('col1', 'col2')

0.013604262970479996

### Broadcast the weights and biases to the executors

In [111]:
# Broadcast each weight/bias array so executors receive a read-only copy once,
# rather than having it captured in every task closure.
bw1, bb1, bw2, bb2 = (sc.broadcast(arr) for arr in (w1, b1, w2, b2))

In [67]:
# Read the broadcast second-layer bias back on the driver.
# NOTE(review): b2 is created with np.random.rand(1) (one element), yet the saved
# output below shows 15 values — presumably produced by an earlier definition of
# b2; re-run the notebook top to bottom to refresh it.
bb2.value

array([ 0.14321851,  0.79578977,  0.10523382,  0.80417474,  0.31393053,
        0.69748726,  0.23200199,  0.04553523,  0.17474875,  0.44196285,
        0.85301598,  0.88399546,  0.24759187,  0.00144347,  0.70919579])

In [73]:
def f(x):
    """Print a single record."""
    print(x)


# rdd.foreach(f) would run `f` inside the executors, so its prints land in the
# executor/worker stdout instead of this notebook. Pull a bounded sample back to
# the driver and print it here.
for record in df.rdd.take(10):
    f(record)

In [146]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

def _feature_values(row, exclude=('domain_name',)):
    """Return the row's values in schema order, skipping the columns in ``exclude``.

    Iterates ``row.__fields__`` (the field list ``Row.asDict`` is built from)
    instead of ``row.asDict()``. A plain dict is not guaranteed to keep column
    order before Python 3.7, and the feature order must line up with the rows of
    ``w1`` and with the col0..colN labels of the correlation matrix.
    """
    return [value for name, value in zip(row.__fields__, row) if name not in exclude]


def process_row(row):
    """Project one row's features through the first layer, ``x . w1 + b1``.

    Reads the broadcast ``bw1``/``bb1`` and returns ``Row(features=DenseVector)``.
    """
    colvalues = _feature_values(row)
    v = (np.array(colvalues)[None, :].dot(bw1.value) + bb1.value)[0, :]
    return Row(features=Vectors.dense(v.tolist()))


def map_raw_features(row):
    """Pack one row's raw feature columns into ``Row(features=DenseVector)``."""
    return Row(features=Vectors.dense(_feature_values(row)))


# One-column DataFrame of raw feature vectors, the input used for Correlation.corr.
new_df = spark.createDataFrame(df.rdd.map(map_raw_features), ["features"])
        

### Write the DataFrame to disk

In [108]:
# The CSV source cannot store the vector (UDT) `features` column, so write parquet,
# which keeps the schema. "overwrite" makes the cell safe to re-run; the default
# save mode errors when "processed" already exists.
new_df.write.mode("overwrite").parquet("processed")

### Calculate the Pearson correlation matrix

In [147]:
from pyspark.ml.stat import Correlation

# Correlation.corr yields a one-row, one-column DataFrame; its single cell is the matrix.
pearsonCorr = Correlation.corr(new_df, 'features', method='pearson').head()[0]

In [153]:
# DenseMatrix.values is stored column-major, so a row-major numpy reshape yields
# the transpose (only harmless here because a correlation matrix is symmetric).
# toArray() handles the layout; the size comes from the matrix, not a hard-coded 170.
corr_labels = gen_column_names(pearsonCorr.numRows)
corr = pd.DataFrame(pearsonCorr.toArray(), index=corr_labels, columns=corr_labels)


In [154]:
# The full matrix is features x features (170 x 170 here); preview the first rows
# rather than dumping the whole frame.
corr.head(10)

Unnamed: 0,col0,col1,col2,col3,col4,col5,col6,col7,col8,col9,...,col160,col161,col162,col163,col164,col165,col166,col167,col168,col169
col0,1.000000,-0.180478,-0.284838,0.659813,0.310663,-0.468041,-0.286572,-0.295380,0.220054,0.302310,...,-0.116522,0.373725,0.508032,0.454191,0.396754,0.044587,0.034504,0.391227,-0.179311,-0.041708
col1,-0.180478,1.000000,0.424210,0.159457,-0.324974,0.115274,0.107144,0.358331,-0.278319,-0.071546,...,-0.581390,-0.302622,-0.298493,0.408714,0.254485,-0.310542,0.215905,0.559517,0.194134,0.398677
col2,-0.284838,0.424210,1.000000,0.342570,-0.128012,0.201777,-0.215586,-0.024286,-0.659627,-0.209363,...,-0.176912,-0.028708,-0.163513,-0.297370,-0.301697,-0.217084,-0.009549,0.235928,0.142256,0.303351
col3,0.659813,0.159457,0.342570,1.000000,0.354941,0.078085,-0.286725,-0.284586,-0.505819,0.270285,...,-0.115118,0.226742,0.125839,0.294134,0.135647,-0.278808,0.207547,0.294829,0.058109,0.340110
col4,0.310663,-0.324974,-0.128012,0.354941,1.000000,0.201560,-0.350064,0.130434,-0.312211,-0.225750,...,-0.277473,0.321773,-0.146891,-0.148218,-0.131879,-0.609102,0.324980,-0.200489,0.370980,0.118571
col5,-0.468041,0.115274,0.201777,0.078085,0.201560,1.000000,0.390142,0.108246,-0.490183,0.071883,...,0.015731,-0.390372,-0.401260,-0.190736,-0.194533,-0.340781,-0.181287,-0.530982,0.529930,0.008050
col6,-0.286572,0.107144,-0.215586,-0.286725,-0.350064,0.390142,1.000000,0.162074,0.218928,0.524157,...,0.305644,-0.815728,0.118601,0.478395,0.561417,0.079824,-0.479337,-0.541828,0.570194,-0.536612
col7,-0.295380,0.358331,-0.024286,-0.284586,0.130434,0.108246,0.162074,1.000000,-0.255810,-0.434988,...,-0.313698,0.040560,-0.597194,0.273437,0.435488,-0.312277,0.196786,0.141819,0.238276,0.027817
col8,0.220054,-0.278319,-0.659627,-0.505819,-0.312211,-0.490183,0.218928,-0.255810,1.000000,0.194469,...,0.008790,-0.176475,0.583699,0.130980,0.199665,0.442807,-0.421431,0.001849,-0.164980,-0.526427
col9,0.302310,-0.071546,-0.209363,0.270285,-0.225750,0.071883,0.524157,-0.434988,0.194469,1.000000,...,0.305941,-0.275117,0.664993,0.441681,0.380246,0.463183,-0.395540,-0.211260,0.170777,-0.503443
