In [1]:
spark


In [30]:
#import pandas as pd, numpy as np
import numpy as np
from pyspark.mllib.linalg.distributed import BlockMatrix
from pyspark.mllib.linalg import DenseMatrix, Matrices
from pyspark.sql.functions import randn
import datetime

## Generate random Matrix data and put in Spark DataFrame

In [31]:
NUM_ROWS = int(1e5)
NUM_COLS = 150

In [32]:
np.random.seed(13)
seeds = np.random.choice(range(int(1e6)),NUM_COLS,replace=False).tolist()

In [33]:
sprk_df = spark.range(0,NUM_ROWS,numPartitions=100)

for i in range(len(seeds)):
    sprk_df = sprk_df.withColumn('X'+str(i+1),randn(seed=seeds[i]))
    
sprk_df = sprk_df.drop('id')

In [34]:
sprk_df.limit(10).select(['X1','X10','X149']).show(10)  #toPandas()

+--------------------+--------------------+--------------------+
|                  X1|                 X10|                X149|
+--------------------+--------------------+--------------------+
| -0.7875995423413541| -1.8496231710670317| 0.14319168635324628|
| -0.8802204696629468|  1.0227987046234572|-0.16021299654078477|
|-0.13052186437684288| -0.6377570013161057| 0.17379403751949127|
| -0.9819029238098963| -1.3914036254270399|-0.07337543730763679|
| -1.1528200936909183|  0.7931504380379168| -1.1181662576458717|
|  1.4381143086517836| -0.5923779893720164| 0.46605793183654703|
| -0.8443907079566332|-0.14327078654999217|  1.9005297473603961|
|  1.3409502738035992|  0.4590974616850282| -1.1233419119648724|
|  0.7712124278965381|  1.4994883973477136|   1.614813091130632|
|  -0.631149997027938| -0.3391861350329733| -1.8154550466148764|
+--------------------+--------------------+--------------------+



In [35]:
print('{:,d}'.format(sprk_df.count()))

100,000


## Read matrix data from Spark Dataframe and perform X.T @ X matrix operation

In [36]:
start_time = datetime.datetime.now()
A = np.array(sprk_df.collect())

print(A.shape)

B = np.dot(A.T, A)

print(B.shape)
print('time for numpy: {:.3f}'.format((datetime.datetime.now() - start_time).total_seconds()))

(100000, 150)
(150, 150)
time for numpy: 9.448


## Create BlockMatrix from Spark dataframe and perform X.T @ X matrix operation

In [37]:
start_time = datetime.datetime.now()
rdd= sprk_df.rdd.map(list)
COLUMNS = len(rdd.take(1)[0])

In [38]:
# create closure for partitioning function to consolidate single row dense matrix into larger blocks
# Based on number of dense matrices and specified rows per block, calculate number of blocks to make up the 
# block matrix and return a partition function to be used in partitionBy() function

def createPartitionFunction(num_items,block_rows):
    # Calculate number of blocks that will make up the BlockMatrix
    number_of_blocks = int(np.ceil(num_items/block_rows))
    
    # return number of blocks and closure to be used by partitionBy() function
    return number_of_blocks, lambda k:int(k/block_rows)

In [39]:
ROWS_PER_BLOCK = 1600
number_of_blocks, partition_function = createPartitionFunction(rdd.count(),ROWS_PER_BLOCK)

In [40]:
# function to stack single row dense matrices into larger block based on partitionBy()

def stackDenseMatrices(iter_x):
    # convert each dense matrix in this partition to an element in a list
    vector_list = [dm[1].toArray() for dm in iter_x]
    
    # stack all the elements in the list to a single array
    combined_array = np.vstack(vector_list)

    # convert the single array to a larger dense matrix
    values = [x for x in combined_array.flatten('F').tolist()]
    den_mat = DenseMatrix(combined_array.shape[0],combined_array.shape[1],
                          values)
    
    # return larger dense matrix
    return [den_mat]

In [41]:
# convert rdd of single row dense matrices to rdd of larger dense matrices to form the BlockMatrix

# create key-value rdd with index value of single-row dense matrix as key
rdd2 = rdd.map(lambda x: DenseMatrix(1,COLUMNS,x)).zipWithIndex().map(lambda x:(x[1],x[0]))

In [42]:
# repartition rdd such that all single-row dense matrices that make up a block are in a single partition
rdd2 = rdd2.partitionBy(number_of_blocks,partition_function)


# Based on number of blocks, addd partition number to key and ensure within partition rows are in order
def addPartitionIdToKey(x):
    partition_id = x[1]
    dm_list = [((partition_id,y[0]),y[1]) for y in x[0]]
    return dm_list
    
rdd2 = rdd2.mapPartitions(lambda x:[list(x)]).zipWithIndex().map(addPartitionIdToKey).flatMap(lambda x:x)

# create partitions by block and ensure rows are sorted correctly
rdd2 =  rdd2.repartitionAndSortWithinPartitions(number_of_blocks,lambda k:k[0])


# stack all single-row dense matrices in a partition into one larger dense matrix
rdd2 = rdd2.mapPartitions(stackDenseMatrices)

# create rdd of block matrix structure
rdd2 = rdd2.zipWithIndex().map(lambda x:((x[1],0),x[0]))



In [43]:
Block_A = BlockMatrix(rdd2,ROWS_PER_BLOCK,COLUMNS)

In [44]:
Block_B = Block_A.transpose().multiply(Block_A)

In [45]:
B2 = Block_B.toLocalMatrix().toArray()
print('time for BlockMatrix: {:.3f}'.format((datetime.datetime.now() - start_time).total_seconds()))

time for BlockMatrix: 42.136


## Compare BlockMatrix answer with Numpy Answer and calculate relative error

In [46]:
np.array_equal(B,B2)

False

In [47]:
np.max(np.abs(B2-B))

1.0186340659856796e-10

### Relative Error

In [48]:
np.max(np.abs((B2-B)/B))

2.2590965251122286e-11