In [1]:
# Enable IPython's autoreload extension; mode 2 re-imports modules before each
# cell executes, so edits to imported source files take effect without a restart.
%load_ext autoreload
%autoreload 2

# Preparation

* run the Docker container as explained in the `README.md`
* install and run Spark

```bash
# install java
# NOTE(review): the man1 directory is created up front, presumably as a
# workaround for slim base images where the JRE package install fails
# without it; confirm for your image.
mkdir -p /usr/share/man/man1
apt update
apt install -y procps default-jre

# install spark libraries (-y keeps conda non-interactive, like apt above)
conda install -y -c conda-forge pyspark
pip install joblibspark

# download spark
# Superseded releases are removed from dlcdn.apache.org; the Apache archive
# keeps every released version, so a pinned version is fetched from there.
wget https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
tar -xf spark-3.3.0-bin-hadoop3.tgz
cd spark-3.3.0-bin-hadoop3

# start master
./sbin/start-master.sh -h "$HOSTNAME"

# start worker (same spark://host:port master URL the notebook connects to)
./sbin/start-worker.sh "spark://$HOSTNAME:7077"
```

* you can then either run Jupyter within the container or execute the following lines in an `ipython` environment

# Load data

In [1]:
%%time

# make sure we are not oversubscribing CPUs
# by adjusting `n_threads`, `corals.cor_matrix` can be parallelized
# NOTE(review): this call is placed before the numpy import below, presumably
# so the thread limits are in place when the numerical libraries load.
# Keep this order unless confirmed otherwise.
from corals.threads import set_threads_for_external_libraries
set_threads_for_external_libraries(n_threads=1)

# imports
import numpy as np
import corals

# create random data
# A fixed seed makes `X` (and every result derived from it) identical across
# kernel restarts; change SEED to draw a different matrix.
SEED = 0
n_features = 16000
n_samples = 32
rng = np.random.default_rng(SEED)
X = rng.random((n_samples, n_features))  # float64 values, uniform in [0, 1)


CPU times: user 77.9 ms, sys: 12.3 ms, total: 90.2 ms
Wall time: 96.5 ms


# Start Spark session

In [8]:
import socket

# Look up this machine's name once; the Spark master URL is derived from it.
hostname = socket.gethostname()

# Echo the value as the cell output.
hostname

'anes-nalab2'

In [9]:
%%time

from joblib import parallel_backend
from joblibspark import register_spark
register_spark() # register spark backend

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master(f"spark://{hostname}:7077") \
    .appName("Corals") \
    .getOrCreate()


ModuleNotFoundError: No module named 'joblibspark'

# CorALS top-k in Spark

In [76]:
from corals.correlation.topk import cor_topk

In [None]:
%%time
n_jobs = 64  # change this an observe runtime differences
with parallel_backend('spark'):
    result = cor_topk(
        X, 
        k=0.001,
        approximation_factor=10,
        correlation_type="spearman", 
        n_batches=n_jobs,
        n_jobs=n_jobs)

# Spark correlation function
Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.stat.Correlation.html

In [None]:
from pyspark.ml.linalg import DenseMatrix, Vectors
from pyspark.ml.stat import Correlation

n_partitions = 12

# One single-column record per row of X, holding that row as a dense vector.
Xspark = [[Vectors.dense(sample)] for sample in X]

# Build the DataFrame and spread it over the requested number of partitions.
dataset = (
    spark
    .createDataFrame(Xspark, ['features'])
    .repartition(n_partitions)
)

# Show the resulting partition count as the cell output.
dataset.rdd.getNumPartitions()

In [None]:
%%time
# * may throw errors (possibly heap issues; needs Spark configuration?)
# * slow
# * results in full correlation matrix in memory
cor = Correlation.corr(dataset, 'features', 'pearson').collect()[0][0].toArray()

In [None]:
%%time

from corals.correlation.utils import derive_k, argtopk

# derive a valid top-k
k = derive_k(X, Y=None, k=0.001)

# sort correlations
cor = cor.flatten()

topk_idx_flat = argtopk(-np.abs(cor), k=k, threshold=None)   

# derive topk correlation and index 
topk_cor = cor[topk_idx_flat]
topk_idx = np.unravel_index(topk_idx_flat, (X.shape[1], X.shape[1]))
    

CPU times: user 48.5 ms, sys: 0 ns, total: 48.5 ms
Wall time: 47.9 ms
