# Massive matrix multiplication with spark

In [1]:
# Copyright © 2021 Jaakko Paavola

Task: Calculate A x A^T x A and provide the first row of the resulting matrix as the answer in the
same format as the input matrix. The input file (data.txt) contains a matrix A, which may have dimensions
up to 1000000 x 1000. The file is stored in the text format, and each line represents a row vector.
Each row contains 1000 float numbers which are separated by white spaces.

Solution: As matrix multiplication is associative by nature, the problem can be approached either as (1) (A * A^T) * A, i.e., starting by calculating the left-hand side multiplication first, or (2) as A * (A^T * A), i.e., starting by calculating the right-hand side multiplication first.

Let's say we are talking about a 1,000,000 x 1,000 matrix.

In approach (1), computing the product A x A^T requires calculating the dot product of two 1,000-element vectors, i.e., doing 1,000 multiplications for each element in the resulting (intermediate) matrix. The resulting matrix is a 1,000,000 x 1,000,000 matrix, so the number of multiplications needed is 1,000 x 10^12 = 10^15. Then, the second step is (A x A^T) x A, which requires calculating the dot product of two 1,000,000-element vectors, i.e., doing 1,000,000 multiplications for each element in the resulting matrix. The resulting matrix is a 1,000,000 x 1,000 matrix, so the number of multiplications needed is 1,000,000 x 10^9 = 10^15.

In approach (2), computing the product A^T x A requires calculating the dot product of two 1,000,000-element vectors, i.e., doing 1,000,000 multiplications for each element in the resulting (intermediate) matrix. The resulting matrix is a 1,000 x 1,000 matrix, so the number of multiplications needed is 1,000,000 x 1,000,000 = 10^12. Then, the second step is A x (A^T x A), which requires calculating the dot product of two 1,000-element vectors, i.e., doing 1,000 multiplications for each element in the resulting matrix. The resulting matrix is a 1,000,000 x 1,000 matrix, so the number of multiplications needed is 1,000 x 10^9 = 10^12.

In other words, in approach (2) both of the steps require a factor of 1,000 less multiplications than in approach (1).

## Program code with comments

### Code block 1:
In this block I set two configuration variables, rows_to_calculate and partitions. The first one sets the number of rows needed as the output from final matrix product. As the assignment is to provide only the first row as the output, this is set to 1. The second one determines the number of partitions that the dataframes are occasionally repartitioned into. The value of this one should be decided based on whether the code is ran against the small sample dataset or the large dataset.

In this block we also initialize the SparkSession object needed for creating dataframes, suppress the level of logging to warnings and errors, read the data from the file and cast it to float type (or to double type for getting more precision out of the multiplication operations) and drop the 1,001st column, which is all nulls.

In [8]:
# !pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql.window import Window

##
## SETTINGS and INITIALIZATION:
##
rows_to_calculate = 1 # This is the number of rows to be calculated at the very end, 
# although only the first one is printed on the screen. # Setting this to only the value 1 is 
# already sufficient to get the first row of the result matrix printed out correctly at the end.
partitions = 10 # This variable determines the number of partitions we use in repartitioning in a few spots in the code
spark = SparkSession\
    .builder\
    .appName("massive-matrix-multiplication") \
    .config("spark.rdd.compress", "true") \
    .config("spark.broadcast.compress", "true") \
    .config("spark.cores.max", "10") \
    .config("spark.driver.maxResultSize", "8g") \
    .config("spark.pyspark.python", "python3") \
    .config("spark.driver.memoryOverhead", "2g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "10g") \
    .config("spark.numExecutors", "true") \
    .config("spark.dynamicAllocation.enable", "true") \
    .master("local[*]") \
    .getOrCreate()
    # .master("spark:HOST:PORT") \
spark.sparkContext.setLogLevel("WARN")

df = spark.read.option("delimiter"," ").option("header","false").option("multiline", "true") \
    .csv("./data-small-sample.txt")

df1 = df.select([f.col(coln).cast("float") for coln in df.columns[:-1]]) # Drop the last, 1001st column that only gets null values
df_A = df1 # Use .limit(x) here for testing with less rows if needed

In [9]:
spark.conf.get("spark.executor.memory")

'2g'

In [10]:
spark.conf.get("spark.driver.memory")

'10g'

### Code block 2:

Here we define a function that transposes any dataframe that contains {rows} number of rows, and pass the matrix to it for transposition. The transposition logic makes use of a few different Spark SQL API functions.

In [11]:
def transpose(df_to_be_transposed, rows):
    df_to_be_transposed_with_row = df_to_be_transposed \
        .withColumn('row', f.row_number().over(Window.partitionBy(f.lit(0)).orderBy(f.monotonically_increasing_id())) - 1)
    df_to_be_transposed = df_to_be_transposed_with_row \
        .withColumn("Rows_of_A_as_arrays", f.array(*df_to_be_transposed.columns)) \
        .select("Rows_of_A_as_arrays", "row") \
        .withColumn("Rows_of_A_columnized", f.explode(f.col("Rows_of_A_as_arrays"))).drop("Rows_of_A_as_arrays")
    if rows == 1:
        return df_to_be_transposed.drop("row")
    else:
        df_to_be_transposed_pivoted = df_to_be_transposed.groupBy().pivot("row").agg(*[f.collect_list(f.col("Rows_of_A_columnized")) \
            .alias(f"{c.strip('_')}") for c in df_to_be_transposed.columns]) \
            .drop(*[f"{i}_Rows_of_A_columnized" for i in range(0, rows)])
        df_transposed = df_to_be_transposed_pivoted \
            .withColumn("Rows_of_A_columnized", f.arrays_zip(*[f"{c}" for c in df_to_be_transposed_pivoted.columns])) \
            .select("Rows_of_A_columnized") \
            .withColumn("Rows_of_A_columnized", f.explode(f.col("Rows_of_A_columnized"))) \
            .select(*["Rows_of_A_columnized." + i for i in [f"{c}" for c in df_to_be_transposed_pivoted.columns]])
        return df_transposed

# %%
print(f"{datetime.now()}: Transposing matrix A")
rows = df_A.count()
df_A_transposed = transpose(df_A, rows)
df_A_transposed.cache()

2024-03-10 12:38:35.871015: Transposing matrix A


24/03/10 12:38:43 WARN CacheManager: Asked to cache already cached data.


DataFrame[0_row: float, 1_row: float, 2_row: float, 3_row: float, 4_row: float, 5_row: float, 6_row: float, 7_row: float, 8_row: float, 9_row: float, 10_row: float, 11_row: float, 12_row: float, 13_row: float, 14_row: float, 15_row: float, 16_row: float, 17_row: float, 18_row: float, 19_row: float, 20_row: float, 21_row: float, 22_row: float, 23_row: float, 24_row: float, 25_row: float, 26_row: float, 27_row: float, 28_row: float, 29_row: float, 30_row: float, 31_row: float, 32_row: float, 33_row: float, 34_row: float, 35_row: float, 36_row: float, 37_row: float, 38_row: float, 39_row: float, 40_row: float, 41_row: float, 42_row: float, 43_row: float, 44_row: float, 45_row: float, 46_row: float, 47_row: float, 48_row: float, 49_row: float, 50_row: float, 51_row: float, 52_row: float, 53_row: float, 54_row: float, 55_row: float, 56_row: float, 57_row: float, 58_row: float, 59_row: float, 60_row: float, 61_row: float, 62_row: float, 63_row: float, 64_row: float, 65_row: float, 66_row: fl

### Code block 3:

Here we calculate the first step of the calculation, arriving at an intermediate matrix product. The calculation of the cross-product sums makes use of aggregation functionalities of Spark SQL API. After the aggregation, the resulting one-row dataframe must be reshaped to the correct shape by "melting"/"unpivoting"/"stacking".

In [12]:
# %%
# Calculate A^T * A with cross-product in a one-liner + reshaping of the result into the intermediate matrix 
# product
print(f"{datetime.now()}: Calculating the cross-product sums of A^T * A")
# Vectorized multiplication happens here, resulting in a one-row dataframe containing the sums that can be 
# reshaped into the actual intermediate matrix product
print(f"{datetime.now()}: Num of partitions before calculating the cross-product sums:", df_A.rdd.getNumPartitions())
df_intermediate_matrix_product = df_A.agg(*[f.sum((f.col(left_column)*f.col(right_column))).alias(f"{left_column}-{right_column}") \
    for left_column in df_A.columns for right_column in df_A.columns])

# %%
df_intermediate_matrix_product = df_intermediate_matrix_product.repartition(partitions)
print(f"{datetime.now()}: Num of partitions before reshaping: ", df_intermediate_matrix_product.rdd.getNumPartitions())
print(f"{datetime.now()}: Reshaping into a A * A^T matrix")
vars_and_vals = f.array(*(f.struct(f.lit(c[:c.find("-")]).alias("variable"), f.col(c).alias("value")) 
    for c in df_intermediate_matrix_product.columns))
cols = [f.col("vars_and_vals")[x].alias(x) for x in ["variable", "value"]]
column_list = df_A.columns

# Melt/unpivot/stack the sums from the columns to the final shape of the matrix product:
df_intermediate_matrix_product = df_intermediate_matrix_product.withColumn("vars_and_vals", f.explode(vars_and_vals)).select(*cols) \
        .groupBy().pivot("variable").agg(f.collect_list("value")) \
        .withColumn("Columns_of_matrix", f.arrays_zip(*column_list)) \
        .withColumn("Columns_of_matrix", f.explode(f.col("Columns_of_matrix"))) \
        .select(*["Columns_of_matrix." + c for c in column_list])
df_intermediate_matrix_product.cache()
print(f"{datetime.now()}: Num of partitions after reshaping: ", df_intermediate_matrix_product.rdd.getNumPartitions())


2024-03-10 12:38:49.716912: Calculating the cross-product sums of A^T * A
2024-03-10 12:38:49.717438: Num of partitions before calculating the cross-product sums: 1


24/03/10 12:41:37 WARN DAGScheduler: Broadcasting large task binary with size 8.5 MiB
24/03/10 12:42:45 WARN DAGScheduler: Broadcasting large task binary with size 34.5 MiB
[Stage 37:>                                                         (0 + 1) / 1]

2024-03-10 12:41:30.306117: Num of partitions before reshaping:  10
2024-03-10 12:43:27.616453: Reshaping into a A * A^T matrix


24/03/10 12:47:51 WARN DAGScheduler: Broadcasting large task binary with size 8.5 MiB
24/03/10 12:48:27 WARN DAGScheduler: Broadcasting large task binary with size 34.6 MiB
24/03/10 12:48:50 WARN DAGScheduler: Broadcasting large task binary with size 54.0 MiB
24/03/10 12:49:16 WARN DAGScheduler: Broadcasting large task binary with size 60.3 MiB
[Stage 66:>                                                         (0 + 1) / 1]

2024-03-10 12:47:49.192937: Num of partitions after reshaping:  1


### Clode block 4:

In this step we prepare for the final calculation step by taking the first row of matrix A and the intermediate matrix product and joining them into a single dataframe.

In [13]:
# %%
print(f"{datetime.now()}: Joining the dataframes for the latter calculation step")
# Join the first {rows_to_calculate} rows of matrix A with the matrix product A^T * A
df_joint = df_intermediate_matrix_product.withColumn('row', f.row_number().over(Window.partitionBy(f.lit(0)).orderBy(f.monotonically_increasing_id())) - 1) \
    .join(df_A_transposed.select(df_A_transposed.columns[:rows_to_calculate]).withColumn('row', f.row_number().over(Window.partitionBy(f.lit(0)).orderBy(f.monotonically_increasing_id())) - 1), on=["row"]).drop("row")

df_joint.cache()

2024-03-10 12:51:11.875534: Joining the dataframes for the latter calculation step


DataFrame[_c0: double, _c1: double, _c2: double, _c3: double, _c4: double, _c5: double, _c6: double, _c7: double, _c8: double, _c9: double, _c10: double, _c11: double, _c12: double, _c13: double, _c14: double, _c15: double, _c16: double, _c17: double, _c18: double, _c19: double, _c20: double, _c21: double, _c22: double, _c23: double, _c24: double, _c25: double, _c26: double, _c27: double, _c28: double, _c29: double, _c30: double, _c31: double, _c32: double, _c33: double, _c34: double, _c35: double, _c36: double, _c37: double, _c38: double, _c39: double, _c40: double, _c41: double, _c42: double, _c43: double, _c44: double, _c45: double, _c46: double, _c47: double, _c48: double, _c49: double, _c50: double, _c51: double, _c52: double, _c53: double, _c54: double, _c55: double, _c56: double, _c57: double, _c58: double, _c59: double, _c60: double, _c61: double, _c62: double, _c63: double, _c64: double, _c65: double, _c66: double, _c67: double, _c68: double, _c69: double, _c70: double, _c71: 

### Code block 5:

In this step we calculate the elements of the final matrix product much like in the previous calculation step by using aggregation functions and then doing reshaping into the actual shape of the final matrix product.

In [14]:
# %%
print(f"{datetime.now()}: Calculating the cross-product sums of A * (A^T * A)")
# Calculate A * (A^T * A) with cross-product in a one-liner + reshaping of the result into the final matrix product

df_joint = df_joint.repartition(partitions)
print(f"{datetime.now()}: Num of partitions before calculating the cross-product sums: ", df_joint.rdd.getNumPartitions())
# Vectorized multiplication happens here, resulting in a one-row dataframe containing the sums that can be reshaped into 
# the actual intermediate matrix product
df_joint_aggregated = df_joint.agg(*[f.sum((f.col(left_column)*f.col(right_column))).alias(f"{right_column}-{left_column}") \
    for right_column in df_joint.columns[:-rows_to_calculate] for left_column in df_joint.columns[-rows_to_calculate:]])

# %%
df_joint_aggregated = df_joint_aggregated.repartition(partitions)
print(f"{datetime.now()}: Num of partitions before reshaping: ", df_joint_aggregated.rdd.getNumPartitions())

print(f"{datetime.now()}: Reshaping into a A * (A^T * A) matrix")
vars_and_vals = f.array(*(f.struct(f.lit(c[:c.find("-")]).alias("variable"), f.col(c).alias("value")) 
    for c in df_joint_aggregated.columns))
cols = [f.col("vars_and_vals")[x].alias(x) for x in ["variable", "value"]]
# Melt/unpivot/stack the sums from the columns to the final shape of the matrix product:
# df_joint_aggregated = df_joint_aggregated.repartition(partitions)
df_A_x_A_T_x_A_pivoted = df_joint_aggregated.withColumn("vars_and_vals", f.explode(vars_and_vals)).select(*cols) \
        .groupBy().pivot("variable").agg(f.collect_list("value"))
df_A_x_A_T_x_A = df_A_x_A_T_x_A_pivoted \
        .withColumn("Columns_of_matrix", f.arrays_zip(*df_A_x_A_T_x_A_pivoted.columns)) \
        .withColumn("Columns_of_matrix", f.explode(f.col("Columns_of_matrix"))) \
        .select(*["Columns_of_matrix." + c for c in df_A_x_A_T_x_A_pivoted.columns])
df_A_x_A_T_x_A.cache()
print(f"{datetime.now()}: Num of partitions after reshaping: ", df_A_x_A_T_x_A.rdd.getNumPartitions())


2024-03-10 12:51:23.147632: Calculating the cross-product sums of A * (A^T * A)


24/03/10 12:51:38 WARN DAGScheduler: Broadcasting large task binary with size 1456.7 KiB
24/03/10 12:51:51 WARN DAGScheduler: Broadcasting large task binary with size 60.3 MiB
24/03/10 12:52:00 WARN DAGScheduler: Broadcasting large task binary with size 61.9 MiB
24/03/10 12:52:05 WARN DAGScheduler: Broadcasting large task binary with size 61.9 MiB
[Stage 82:>                                                         (0 + 1) / 1]

2024-03-10 12:51:24.135421: Num of partitions before calculating the cross-product sums:  10


24/03/10 12:52:14 WARN DAGScheduler: Broadcasting large task binary with size 61.9 MiB
24/03/10 12:52:22 WARN DAGScheduler: Broadcasting large task binary with size 62.0 MiB
24/03/10 12:52:45 WARN DAGScheduler: Broadcasting large task binary with size 62.3 MiB
[Stage 94:>                                                         (0 + 1) / 1]

2024-03-10 12:52:10.887859: Num of partitions before reshaping:  10
2024-03-10 12:52:50.291852: Reshaping into a A * (A^T * A) matrix


24/03/10 12:53:06 WARN DAGScheduler: Broadcasting large task binary with size 61.9 MiB
24/03/10 12:53:19 WARN DAGScheduler: Broadcasting large task binary with size 61.9 MiB
24/03/10 12:54:07 WARN DAGScheduler: Broadcasting large task binary with size 61.9 MiB
24/03/10 12:54:12 WARN DAGScheduler: Broadcasting large task binary with size 61.9 MiB
24/03/10 12:54:35 WARN DAGScheduler: Broadcasting large task binary with size 61.9 MiB
24/03/10 12:54:50 WARN DAGScheduler: Broadcasting large task binary with size 61.9 MiB
24/03/10 12:55:00 WARN DAGScheduler: Broadcasting large task binary with size 62.0 MiB
24/03/10 12:55:34 WARN DAGScheduler: Broadcasting large task binary with size 62.3 MiB
24/03/10 12:55:40 WARN DAGScheduler: Broadcasting large task binary with size 62.4 MiB
24/03/10 12:56:02 WARN DAGScheduler: Broadcasting large task binary with size 62.6 MiB
[Stage 159:>                                                        (0 + 1) / 1]

2024-03-10 12:54:45.941880: Num of partitions after reshaping:  1


### Results:

In [15]:
# %%
# Print the first row of the final matrix product (vertically)
df_A_x_A_T_x_A.show(1, False, True)

24/03/10 12:56:41 WARN DAGScheduler: Broadcasting large task binary with size 62.7 MiB
[Stage 166:>                                                        (0 + 1) / 1]

-RECORD 0----------------------
 _c0   | 2.776537347167101E10  
 _c1   | 2.7662610012919544E10 
 _c10  | 2.819077476647991E10  
 _c100 | 2.8263425572226547E10 
 _c101 | 2.820833058779611E10  
 _c102 | 2.7405150162877968E10 
 _c103 | 2.659017721078386E10  
 _c104 | 2.779007738135295E10  
 _c105 | 2.7500772838454655E10 
 _c106 | 2.7911826974865772E10 
 _c107 | 2.647262503447497E10  
 _c108 | 2.7313896194002274E10 
 _c109 | 2.7599594839650387E10 
 _c11  | 2.763260252884661E10  
 _c110 | 2.8117858774354767E10 
 _c111 | 2.8200326407856846E10 
 _c112 | 2.7443651439188602E10 
 _c113 | 2.675779077443574E10  
 _c114 | 2.741163091779049E10  
 _c115 | 2.7994711562477608E10 
 _c116 | 2.78412241953937E10   
 _c117 | 2.6973861868808025E10 
 _c118 | 2.700770239611181E10  
 _c119 | 2.7840728612576603E10 
 _c12  | 2.5996237880158318E10 
 _c120 | 2.7221752436339066E10 
 _c121 | 2.7399963289500042E10 
 _c122 | 2.795252525090532E10  
 _c123 | 2.7386895391522438E10 
 _c124 | 2.716684720949635E10  
 _c125 |

                                                                                