In [1]:
import pyspark
from pyspark.mllib import linalg as mllib_linalg
from pyspark.mllib.linalg.distributed import MatrixEntry, CoordinateMatrix
from pyspark.ml import linalg as ml_linalg
from pyspark.ml import feature
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml import Pipeline

In [2]:
import numpy as np 
import pandas as pd
from scipy import sparse

The minimum supported version is 2.4.6



In [3]:
n = 150
a = 2.5
z = np.linspace(0,3,n)
z_m = -z 

def draw(n, k, draws):
    sample = np.random.choice(range(n), replace=False, size=draws).tolist()
    return [k if i in sample else float('NAN') for i in range(n)]

def generate_string_data(z,a,k, draws=1):
    return list(zip(draw(len(z),float(k),draws), np.array([a*np.sin(z), a*np.cos(z), z]).T))

def generate_springs(n,a,draws=1):
    tmp = generate_string_data(z,a,0,draws)
    tmp2 = generate_string_data(z_m,a,1,draws)
    tmp3 = map(lambda x : (x[0],*x[1]) ,enumerate(tmp+tmp2))
    return list(tmp3)

In [4]:
spring_data = generate_springs(n,a,1)
spring_data

[(0, nan, array([ 0. ,  2.5,  0. ])),
 (1, nan, array([ 0.05033217,  2.49949328,  0.02013423])),
 (2, nan, array([ 0.10064394,  2.49797334,  0.04026846])),
 (3, nan, array([ 0.1509149 ,  2.49544078,  0.06040268])),
 (4, nan, array([ 0.20112469,  2.49189664,  0.08053691])),
 (5, nan, array([ 0.25125296,  2.48734235,  0.10067114])),
 (6, nan, array([ 0.30127936,  2.48177975,  0.12080537])),
 (7, nan, array([ 0.35118364,  2.47521111,  0.1409396 ])),
 (8, nan, array([ 0.40094556,  2.46763909,  0.16107383])),
 (9, nan, array([ 0.45054495,  2.45906674,  0.18120805])),
 (10, nan, array([ 0.49996169,  2.44949756,  0.20134228])),
 (11, nan, array([ 0.54917577,  2.43893542,  0.22147651])),
 (12, nan, array([ 0.59816722,  2.4273846 ,  0.24161074])),
 (13, nan, array([ 0.64691619,  2.41484978,  0.26174497])),
 (14, nan, array([ 0.69540292,  2.40133604,  0.28187919])),
 (15, nan, array([ 0.74360776,  2.38684886,  0.30201342])),
 (16, nan, array([ 0.79151115,  2.37139413,  0.32214765])),
 (17, nan, 

In [5]:
def _compute_bfs(vec_1, vec_2, sigma=0.42):
    return np.exp(-vec_1.squared_distance(vec_2)/sigma**2)

def _tolerence_cut(value, tol=10e-10):
    if value <= tol:
        return 0
    else:
        return value
    
def _scale_data_frame(df, vector=None):
    if vector:
        to_dense = lambda x: ml_linalg.DenseVector(x.toArray())
        df = df.withColumn(vector, F.udf(to_dense, ml_linalg.VectorUDT())(vector) )
        
        scaler = feature.StandardScaler(
            withMean=True, withStd=True,
            inputCol='vector', outputCol='std_vector')
        model = scaler.fit(df)
        return (model
                .transform(df)
                .select([i for i in df.columns if i != vector]+[scaler.getOutputCol()])
                .withColumnRenamed(existing=scaler.getOutputCol(), new=vector))
    
def do_cartesian(sc, df, index=None, vector=None, **kwargs):
    sigma = kwargs.get('sigma', 0.42)
    tol = kwargs.get('tolerance', 10e-10)
    
    scaled_df = _scale_data_frame(df, vector=vector)
    bc_vec = sc.broadcast(scaled_df.select(index, vector).rdd.collectAsMap())
    if index:
        print(bc_vec.value[0])
        index_rdd = df.rdd.map(lambda x: x[index]).cache()
        cartesian_demon = index_rdd.cartesian(index_rdd).filter(lambda x: x[0] >= x[1])
        cartesian_distance_demon = cartesian_demon.map(lambda x: MatrixEntry(x[0],x[1], _compute_bfs(
        bc_vec.value.get(x[0]),
        bc_vec.value.get(x[1]),
        sigma)))
        
    return cartesian_distance_demon.filter(lambda x: _tolerence_cut(x.value) )

def triangle_mat_summation(mat_element):
    if mat_element.j == mat_element.i:
        return (mat_element.i, mat_element.value),
    else:
        return (mat_element.i, mat_element.value), (mat_element.j, mat_element.value)

In [6]:
spring_data = generate_springs(n,a,1)
string_rdd = sc.parallelize(spring_data).map(lambda x: pyspark.Row(id=x[0],label=x[1], vector=ml_linalg.DenseVector(x[2])))
string_df = string_rdd.toDF()
# rdd = sc.range(10).map(lambda x: pyspark.Row(id=x, vector=ml_linalg.DenseVector(np.random.randint(0,9,size=10))))
# df = rdd.toDF()

In [7]:
string_df.show(300,False)

+---+-----+---------------------------------------------------------------+
|id |label|vector                                                         |
+---+-----+---------------------------------------------------------------+
|0  |NaN  |[0.0,2.5,0.0]                                                  |
|1  |NaN  |[0.05033216963986689,2.499493283187483,0.020134228187919462]   |
|2  |NaN  |[0.10064393595448022,2.4979733381594746,0.040268456375838924]  |
|3  |NaN  |[0.15091490388955273,2.495440781061335,0.060402684563758385]   |
|4  |NaN  |[0.20112469492937707,2.491896638524472,0.08053691275167785]    |
|5  |NaN  |[0.25125295535773495,2.487342347250174,0.10067114093959731]    |
|6  |NaN  |[0.3012793645087535,2.4817797534272055,0.12080536912751677]    |
|7  |NaN  |[0.35118364300436306,2.475211111983417,0.14093959731543623]    |
|8  |NaN  |[0.40094556097501977,2.467639085671652,0.1610738255033557]     |
|9  |NaN  |[0.4505449462603564,2.45906674399034,0.18120805369127516]      |
|10 |NaN  |[

In [8]:
demon = do_cartesian(sc, string_df,'id', 'vector')
demon.take(10)

[-6.10797542368e-17,1.37652733992,3.17294190766e-17]


[MatrixEntry(0, 0, 1.0),
 MatrixEntry(1, 0, 0.9948465948193068),
 MatrixEntry(2, 0, 0.979546467464139),
 MatrixEntry(1, 1, 1.0),
 MatrixEntry(2, 1, 0.9948462833543696),
 MatrixEntry(2, 2, 1.0),
 MatrixEntry(3, 0, 0.9545715578949986),
 MatrixEntry(4, 0, 0.9206812925488386),
 MatrixEntry(5, 0, 0.8788839464495921),
 MatrixEntry(6, 0, 0.8303864948563296)]

In [9]:
demon_matrix = CoordinateMatrix(demon, 300, 300) 
row_summed_matrix = demon_matrix.entries.flatMap(triangle_mat_summation).reduceByKey(lambda x,y: x+y).collectAsMap()
bc_row_summed = sc.broadcast(row_summed_matrix)

In [10]:
transition_rdd = demon.map(lambda x: MatrixEntry(x.i, x.j, x.value/bc_row_summed.value.get(x.j) ))
col_summed_matrix = transition_rdd.flatMap(triangle_mat_summation).reduceByKey(lambda x,y: x+y).collectAsMap()
bc_col_summed = sc.broadcast(col_summed_matrix)

In [11]:
hat_transition_rdd = transition_rdd.map(lambda x: MatrixEntry(x.i,x.j, x.value/bc_col_summed.value.get(x.i)))
hat_transition_rdd.take(25)

[MatrixEntry(0, 0, 0.03885626426621346),
 MatrixEntry(1, 0, 0.03865634180049757),
 MatrixEntry(2, 0, 0.03806368333210306),
 MatrixEntry(1, 1, 0.038864896355216096),
 MatrixEntry(2, 1, 0.03866647926373562),
 MatrixEntry(2, 2, 0.03889149963010969),
 MatrixEntry(3, 0, 0.0370975477380816),
 MatrixEntry(4, 0, 0.035787897381467496),
 MatrixEntry(5, 0, 0.03417381210218183),
 MatrixEntry(6, 0, 0.03230156446721945),
 MatrixEntry(3, 1, 0.03807621921677726),
 MatrixEntry(3, 2, 0.0386955784355721),
 MatrixEntry(4, 1, 0.037112976697733155),
 MatrixEntry(4, 2, 0.03810821923499331),
 MatrixEntry(5, 1, 0.0358062355961513),
 MatrixEntry(5, 2, 0.0371478086029921),
 MatrixEntry(6, 1, 0.034194599635376646),
 MatrixEntry(6, 2, 0.03584334262962627),
 MatrixEntry(3, 3, 0.03893652382769003),
 MatrixEntry(4, 3, 0.03874383722620631),
 MatrixEntry(4, 4, 0.038999766747288854),
 MatrixEntry(5, 3, 0.03815955761840735),
 MatrixEntry(5, 4, 0.038810729125191366),
 MatrixEntry(5, 5, 0.03908034393490406),
 MatrixEntry(6

In [12]:
def generate_label_matrix(df):
    Y_L = df.filter(~F.isnan(F.col('label'))).select('id','label').cache()
    Y_U = df.filter(F.isnan(F.col('label'))).select('id').cache()
    Y_max = Y_L.groupby().max('label').collect()[0][0]
    
    Y_L_rdd = Y_L.rdd.map(lambda x: MatrixEntry(i=x['id'], j=x['label'], value=1.0))
    
    Y_L_mat = CoordinateMatrix(Y_L_rdd, numRows=df.count(), numCols=Y_max+1)
    
    Y_U_rdd = Y_U.rdd.flatMap(lambda x: [MatrixEntry(i=x['id'], j=idx, value=.50) for idx in range(int(Y_max+1))])
    #print(Y_U_rdd.take(5))
    Y_U_mat = CoordinateMatrix(Y_U_rdd, numRows=df.count(), numCols=Y_max+1)
    return Y_L_mat, Y_U_mat


In [23]:
y_l, y_u = generate_label_matrix(string_df)
print(y_l.entries.take(5))
print(y_u.entries.take(5))
print(y_l.numRows(),y_l.numCols())
print(y_u.numRows(),y_u.numCols())


[MatrixEntry(34, 0, 1.0), MatrixEntry(202, 1, 1.0)]
[MatrixEntry(0, 0, 0.5), MatrixEntry(0, 1, 0.5), MatrixEntry(1, 0, 0.5), MatrixEntry(1, 1, 0.5), MatrixEntry(2, 0, 0.5)]
300 2
300 2


In [14]:
mat_hatter_rdd = CoordinateMatrix(hat_transition_rdd,numCols=2*n, numRows=2*n)

In [15]:
def naive_multiplication(A:CoordinateMatrix, B:CoordinateMatrix, is_triangle=False):
    """
    A is the left matrix
    B is the right matix
    """
    if is_triangle:
        left_rdd = (A.entries
                    .flatMap(lambda x: [((x.j, x.i), x.value),((x.i, x.j), x.value)])
                    .aggregateByKey(
                        zeroValue=(0.0,0.0),
                        seqFunc=lambda x,y: (x[0]+y, x[1] + 1 ),
                        combFunc=lambda a,b: (x[0] + y[0], x[1] + y[1]))
                    .mapValues(lambda x: x[0]/x[1])
                    .map(lambda x: (x[0][0], (x[0][1], x[1])))
                   )
    else:
        left_rdd = A.entries.map(lambda x: (x.j, (x.i, x.value))) 
    right_rdd = B.entries.map(lambda x: (x.i, (x.j, x.value)))
    combined_rdd = (left_rdd
                    .join(right_rdd)
                    .map(lambda x: x[1])
                    .map(lambda x: ((x[0][0], x[1][0]), x[0][1]*x[1][1]))
                    .reduceByKey(lambda x,y: x+y)
                    .map(lambda x: MatrixEntry(i=x[0][0], j=x[0][1], value=x[1]))
                   )
    return combined_rdd

In [17]:
new_y_l = naive_multiplication(mat_hatter_rdd, y_l, is_triangle=True)
n

268

In [24]:
print(mat_hatter_rdd.numCols(),mat_hatter_rdd.numRows())
print(y_l.numCols(), y_l.numRows())

300 300
2 300


In [30]:
d = (mat_hatter_rdd.toBlockMatrix()).multiply(y_l.toBlockMatrix())
dd = d.toLocalMatrix()
dd.toArray()

array([[  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  0.00000000e+00,   0.00000000e+00],
       [  