1. [Replace NaN with mode](#NaN2Mode)
* [Use sample builtin function to create sample from matrix](#sample)
* [Count of Matching Values in two Matrices/Vectors](#MatchinRows)
* [Cross Validation](#CrossValidation)
* [Value-based join of two Matrices](#JoinMatrices)
* [Filter Matrix to include only Frequent Column Values](#FilterMatrix)
* [Construct (sparse) Matrix from (rowIndex, colIndex, values) triplets](#Construct_sparse_Matrix)
* [Find and remove duplicates in columns or rows](#Find_and_remove_duplicates)
* [Set based Indexing](#Set_based_Indexing)
* [Group by Aggregate using Linear Algebra](#Multi_column_Sorting)
* [Cumulative Summation with Decay Multiplier](#CumSum_Product)
* [Invert Lower Triangular Matrix](#Invert_Lower_Triangular_Matrix)

In [15]:
from systemml import MLContext, dml, jvm_stdout
ml = MLContext(sc)

print (ml.buildTime())

2017-09-22 07:57:57 UTC


## Replace NaN with mode<a id="NaN2Mode" />

This functions replaces NaN in column with mode of column

In [13]:
prog="""
# Function for NaN-aware replacement with mode
replaceNaNwithMode = function (matrix[double] X, integer colId) 
      return (matrix[double] X) 
{
   Xi = replace (target=X[,colId], pattern=0/0, replacement=max(X[,colId])+1)   # replace NaN with largest value + 1
   agg = aggregate (target=Xi, groups=Xi, fn="count")                           # count each distinct value
   mode = as.scalar (rowIndexMax(t(agg[1:nrow(agg)-1, ])))                      # mode is max frequent value except last value
   X[,colId] = replace (target=Xi, pattern=max(Xi), replacement=mode)           # fill in mode
}

X = matrix('1 NaN 1 NaN 1 2 2 1 1 2', rows = 5, cols = 2)

Y = replaceNaNwithMode (X, 2)

print ("Before: \n" + toString(X))
print ("After: \n" + toString(Y))
"""
with jvm_stdout(True):
    ml.execute(dml(prog))

Before: 
1.000 NaN
1.000 NaN
1.000 2.000
2.000 1.000
1.000 2.000

After: 
1.000 2.000
1.000 2.000
1.000 2.000
2.000 1.000
1.000 2.000

SystemML Statistics:
Total execution time:		0.001 sec.
Number of executed Spark inst:	0.




## Use sample builtin function to create sample from matrix<a id="sample" />

Use sample() function, create permutation matrix using table(), and pull sample from X.

In [18]:
prog="""
X = matrix ('2 1 8 3 5 6 7 9 4 4', rows = 5, cols = 2 )

nbrSamples = 2

sv = order (target = sample (nrow (X), nbrSamples, FALSE))  # samples w/o replacement, and order            
P = table (seq (1, nbrSamples), sv, nbrSamples, nrow(X))    # permutation matrix
samples = P %*% X;                                          # apply P to perform selection


print ("X: \n" + toString(X))
print ("sv: \n" + toString(sv))
print ("samples: \n" + toString(samples))
"""
with jvm_stdout(True):
    ml.execute(dml(prog))

X: 
2.000 1.000
8.000 3.000
5.000 6.000
7.000 9.000
4.000 4.000

sv: 
1.000
4.000

samples: 
2.000 1.000
7.000 9.000

SystemML Statistics:
Total execution time:		0.001 sec.
Number of executed Spark inst:	0.




## Count of Matching Values in two Matrices/Vectors<a id="MatchingRows" />

Given two matrices/vectors X and Y, get a count of the rows where X and Y have the same value.

In [19]:
prog="""
X = matrix('8 4 5 4 9 10', rows = 6, cols = 1)
Y = matrix('4 9 5 1 9 7 ', rows = 6, cols = 1)

matches = sum (X == Y)

print ("t(X): " + toString(t(X)))
print ("t(Y): " + toString(t(Y)))
print ("Number of Matches: " + matches + "\n")
"""
with jvm_stdout(True):
    ml.execute(dml(prog))

t(X): 8.000 4.000 5.000 4.000 9.000 10.000

t(Y): 4.000 9.000 5.000 1.000 9.000 7.000

Number of Matches: 2.0

SystemML Statistics:
Total execution time:		0.001 sec.
Number of executed Spark inst:	0.




## Cross Validation<a id="CrossValidation" />

Perform kFold cross validation by running in parallel fold creation, training algorithm, test algorithm, and evaluation.

In [4]:
prog = """
holdOut = 1/3
kFolds = 1/holdOut

nRows = 6; nCols = 3; 

X = matrix(seq(1, nRows * nCols), rows = nRows, cols = nCols)             # X data
y = matrix(seq(1, nRows), rows = nRows, cols = 1)                         # y label data
Xy = cbind (X,y)                                                          # Xy Data for CV

sv = rand (rows = nRows, cols = 1, min = 0.0, max = 1.0, pdf = "uniform") # sv selection vector for fold creation 
sv = (order(target=sv, by=1, index.return=TRUE)) %% kFolds + 1            #    with numbers between 1 .. kFolds 

stats = matrix(0, rows=kFolds, cols=1)                                    # stats per kFolds model on test data

parfor (i in 1:kFolds)
{
   # Skip empty training data or test data. 
   if  ( sum (sv == i) > 0 & sum (sv == i) < nrow(X) )    
   {
      Xyi  = removeEmpty(target = Xy, margin = "rows", select = (sv == i))  # Xyi fold, i.e. 1/k of rows (test data)
      Xyni = removeEmpty(target = Xy, margin = "rows", select = (sv != i))  # Xyni data, i.e. (k-1)/k of rows (train data)

      # Skip extreme label inbalance
      distinctLabels = aggregate( target = Xyni[,1], groups = Xyni[,1], fn = "count")
      if ( nrow(distinctLabels) > 1)
      {
         wi = trainAlg (Xyni[ ,1:ncol(Xy)-1], Xyni[ ,ncol(Xy)])             # wi Model for i-th training data
         pi = testAlg  (Xyi [ ,1:ncol(Xy)-1], wi)                           # pi Prediction for i-th test data
         ei = evalPrediction (pi, Xyi[ ,ncol(Xy)])                          # stats[i,] evaluation of prediction of i-th fold
         stats[i,] = ei
    
         print (  "Test data Xyi" + i + "\n" + toString(Xyi)  
               + "\nTrain data Xyni" + i + "\n" + toString(Xyni)  
               + "\nw" + i + "\n" + toString(wi) 
               + "\nstats" + i + "\n" + toString(stats[i,]) 
               + "\n")
      }
      else
      {
        print ("Training data for fold " + i + " has only " + nrow(distinctLabels) + " distinct labels. Needs to be > 1.")
      }    
   } 
   else 
   {
      print ("Training data or test data for fold " + i + " is empty. Fold not validated.")
   }

}

print ("SV selection vector:\n" + toString(sv))

trainAlg = function (matrix[double] X, matrix[double] y)
  return (matrix[double] w)
{
   w = t(X) %*% y
}

testAlg = function (matrix[double] X, matrix[double] w)
  return (matrix[double] p)
{
   p = X %*% w
}

evalPrediction = function (matrix[double] p, matrix[double] y)
  return (matrix[double] e)
{
   e = as.matrix(sum (p - y))
}
"""

with jvm_stdout(True):
    ml.execute(dml(prog))

Test data Xyi2
10.000 11.000 12.000 4.000
16.000 17.000 18.000 6.000

Train data Xyni2
1.000 2.000 3.000 1.000
4.000 5.000 6.000 2.000
7.000 8.000 9.000 3.000
13.000 14.000 15.000 5.000

w2
95.000
106.000
117.000

stats2
8938.000


Test data Xyi3
1.000 2.000 3.000 1.000
7.000 8.000 9.000 3.000

Train data Xyni3
4.000 5.000 6.000 2.000
10.000 11.000 12.000 4.000
13.000 14.000 15.000 5.000
16.000 17.000 18.000 6.000

w3
209.000
226.000
243.000

stats3
6844.000


Test data Xyi1
4.000 5.000 6.000 2.000
13.000 14.000 15.000 5.000

Train data Xyni1
1.000 2.000 3.000 1.000
7.000 8.000 9.000 3.000
10.000 11.000 12.000 4.000
16.000 17.000 18.000 6.000

w1
158.000
172.000
186.000

stats1
9853.000


SV selection vector:
3.000
1.000
3.000
2.000
1.000
2.000

SystemML Statistics:
Total execution time:		0.024 sec.
Number of executed Spark inst:	0.




## Value-based join of two Matrices<a id="JoinMatrices"/>

Given matrix M1 and M2, join M1 on column 2 with M2 on column 2, and return matching rows of M1.

In [55]:
prog = """
M1 = matrix ('1 1 2 3 3 3 4 4 5 3 6 4 7 1 8 2 9 1', rows = 9, cols = 2)
M2 = matrix ('1 1 2 8 3 3 4 3 5 1', rows = 5, cols = 2)

I = rowSums (outer (M1[,2], t(M2[,2]), "=="))                  # I : indicator matrix for M1
M12 = removeEmpty (target = M1, margin = "rows", select = I)   # apply filter to retrieve join result

print ("M1 \n" + toString(M1))
print ("M2 \n" + toString(M2))
print ("M1[,2] joined with M2[,2], and return matching M1 rows\n" + toString(M12))
"""
with jvm_stdout():
    ml.execute(dml(prog))

M1 
1.000 1.000
2.000 3.000
3.000 3.000
4.000 4.000
5.000 3.000
6.000 4.000
7.000 1.000
8.000 2.000
9.000 1.000

M2 
1.000 1.000
2.000 8.000
3.000 3.000
4.000 3.000
5.000 1.000

M1[,2] joined with M2[,2], and return matching M1 rows
1.000 1.000
2.000 3.000
3.000 3.000
5.000 3.000
7.000 1.000
9.000 1.000

SystemML Statistics:
Total execution time:		0.001 sec.
Number of executed Spark inst:	0.




## Filter Matrix to include only Frequent Column Values <a id="FilterMatrix"/>

Given a matrix, filter the matrix to only include rows with column values that appear more often than MinFreq.

In [53]:
prog = """
MinFreq = 3                                                           # minimum frequency of tokens

M = matrix ('1 1 2 3 3 3 4 4 5 3 6 4 7 1 8 2 9 1', rows = 9, cols = 2)

gM = aggregate (target = M[,2], groups = M[,2], fn = "count")         # gM: group by and count (grouped matrix)
gv = cbind (seq(1,nrow(gM)), gM)                                      # gv: add group values to counts (group values)
fg = removeEmpty (target = gv * (gv[,2] >= MinFreq), margin = "rows") # fg: filtered groups
I = rowSums (outer (M[,2] ,t(fg[,1]), "=="))                          # I : indicator of size M with filtered groups
fM = removeEmpty (target = M, margin = "rows", select = I)            # FM: filter matrix

print (toString(M))
print (toString(fM))
"""
with jvm_stdout():
    ml.execute(dml(prog))

1.000 1.000
2.000 3.000
3.000 3.000
4.000 4.000
5.000 3.000
6.000 4.000
7.000 1.000
8.000 2.000
9.000 1.000

1.000 1.000
2.000 3.000
3.000 3.000
5.000 3.000
7.000 1.000
9.000 1.000

SystemML Statistics:
Total execution time:		0.001 sec.
Number of executed Spark inst:	0.




## Construct (sparse) Matrix from (rowIndex, colIndex, values) triplets<a id="Construct_sparse_Matrix"></a>

Given rowIndex, colIndex, and values as column vectors, construct (sparse) matrix.

In [None]:
prog = """
I = matrix ("1 3 3 4 5", rows = 5, cols = 1)
J = matrix ("2 3 4 1 6", rows = 5, cols = 1)
V = matrix ("10 20 30 40 50", rows = 5, cols = 1)

M = table (I, J, V)
print (toString (M))
"""
ml.execute(dml(prog).output('M')).get('M').toNumPy()

## Find and remove duplicates in columns or rows<a id="Find_and_remove_duplicates"></a>

### Assuming values are sorted.

In [None]:
prog = """
X = matrix ("1 2 3 3 3 4 5 10", rows = 8, cols = 1)

I = rbind (matrix (1,1,1), (X[1:nrow (X)-1,] != X[2:nrow (X),]));             # compare current with next value
res = removeEmpty (target = X, margin = "rows", select = I);                  # select where different
"""
ml.execute(dml(prog).output('res')).get('res').toNumPy()

### No assumptions on values.

In [None]:
prog = """
X = matrix ("3 2 1 3 3 4 5 10", rows = 8, cols = 1)

I = aggregate (target = X, groups = X[,1], fn = "count")                                 # group and count duplicates
res = removeEmpty (target = seq (1, max (X[,1])), margin = "rows", select = (I != 0));   # select groups
"""
ml.execute(dml(prog).output('res')).get('res').toNumPy()

### Order the values and then remove duplicates.

In [None]:
prog = """
X = matrix ("3 2 1 3 3 4 5 10", rows = 8, cols = 1)

X = order (target = X, by = 1)                                      # order values
I = rbind (matrix (1,1,1), (X[1:nrow (X)-1,] != X[2:nrow (X),]));
res = removeEmpty (target = X, margin = "rows", select = I);
"""
ml.execute(dml(prog).output('res')).get('res').toNumPy()

## Set based Indexing<a id="Set_based_Indexing"></a>

Given a matrix X, and a indicator matrix J with indices into X. 
Use J to perform operation on X, e.g. add value 10 to cells in X indicated by J. 

In [None]:
prog = """
X = matrix (1, rows = 1, cols = 100)
J = matrix ("10 20 25 26 28 31 50 67 79", rows = 1, cols = 9)

res = X + table (matrix (1, rows = 1, cols = ncol (J)), J, 10)

print (toString (res))
"""
ml.execute(dml(prog).output('res')).get('res').toNumPy()

## Group by Aggregate using Linear Algebra<a id="Multi_column_Sorting"></a>

Given a matrix PCV as (Position, Category, Value), sort PCV by category, and within each category by value in descending order. Create indicator vector for category changes, create distinct categories, and perform linear algebra operations.

In [None]:
prog = """
C = matrix ('50 40 20 10 30 20 40 20 30', rows = 9, cols = 1)                              # category data
V = matrix ('20 11 49 33 94 29 48 74 57', rows = 9, cols = 1)                              # value data

PCV = cbind (cbind (seq (1, nrow (C), 1), C), V);                                      # PCV representation
PCV = order (target = PCV, by = 3, decreasing = TRUE,  index.return = FALSE);
PCV = order (target = PCV, by = 2, decreasing = FALSE, index.return = FALSE);

# Find all rows of PCV where the category has a new value, in comparison to the previous row

is_new_C = matrix (1, rows = 1, cols = 1);
if (nrow (C) > 1) {
  is_new_C = rbind (is_new_C, (PCV [1:nrow(C) - 1, 2] < PCV [2:nrow(C), 2]));
}

# Associate each category with its index

index_C = cumsum (is_new_C);                                                          # cumsum

# For each category, compute:
#   - the list of distinct categories
#   - the maximum value for each category
#   - 0-1 aggregation matrix that adds records of the same category

distinct_C  = removeEmpty (target = PCV [, 2], margin = "rows", select = is_new_C);
max_V_per_C = removeEmpty (target = PCV [, 3], margin = "rows", select = is_new_C);
C_indicator = table (index_C, PCV [, 1], max (index_C), nrow (C));                    # table

sum_V_per_C = C_indicator %*% V
"""

res = ml.execute(dml(prog).output('PCV','distinct_C', 'max_V_per_C', 'C_indicator', 'sum_V_per_C'))
print (res.get('PCV').toNumPy())
print (res.get('distinct_C').toNumPy())
print (res.get('max_V_per_C').toNumPy())
print (res.get('C_indicator').toNumPy())
print (res.get('sum_V_per_C').toNumPy())

## Cumulative Summation with Decay Multiplier<a id="CumSum_Product"></a>

Given matrix X, compute:

    Y[i] = X[i]
         + X[i-1] * C[i]
         + X[i-2] * C[i] * C[i-1]
         + X[i-3] * C[i] * C[i-1] * C[i-2]
         + ...


In [None]:
cumsum_prod_def = """
cumsum_prod = function (Matrix[double] X, Matrix[double] C, double start)
  return (Matrix[double] Y)
# Computes the following recurrence in log-number of steps:
#   Y [1, ] = X [1, ] + C [1, ] * start;
#   Y [i+1, ] = X [i+1, ] + C [i+1, ] * Y [i, ]
{
   Y = X;  P = C;  m = nrow(X);  k = 1;
   Y [1,] = Y [1,] + C [1,] * start;
   while (k < m) {
     Y [k + 1:m,] = Y [k + 1:m,] + Y [1:m - k,] * P [k + 1:m,];
     P [k + 1:m,] = P [1:m - k,] * P [k + 1:m,];
     k = 2 * k;
   }
}
"""

In this example we use cumsum_prod for cumulative summation with "breaks", that is, multiple cumulative summations in one.

In [None]:
prog = cumsum_prod_def + """
X = matrix ("1 2 3 4 5 6 7 8 9", rows = 9, cols = 1);

#Zeros in C cause "breaks" that restart the cumulative summation from 0
C = matrix ("0 1 1 0 1 1 1 0 1", rows = 9, cols = 1);

Y = cumsum_prod (X, C, 0);

print (toString(Y))
"""
with jvm_stdout():
    ml.execute(dml(prog))

In this example, we copy selected rows downward to all consecutive non-selected rows.

In [None]:
prog = cumsum_prod_def + """
X = matrix ("1 2 3 4 5 6 7 8 9", rows = 9, cols = 1);

# Ones in S represent selected rows to be copied, zeros represent non-selected rows
S = matrix ("1 0 0 1 0 0 0 1 0", rows = 9, cols = 1);

Y = cumsum_prod (X * S, 1 - S, 0);

print (toString(Y))
"""
with jvm_stdout():
    ml.execute(dml(prog))

This is a naive implementation of cumulative summation with decay multiplier.

In [None]:
cumsum_prod_naive_def = """
cumsum_prod_naive = function (Matrix[double] X, Matrix[double] C, double start)
  return (Matrix[double] Y)
{
  Y = matrix (0, rows = nrow(X), cols = ncol(X));
  Y [1,] = X [1,] + C [1,] * start;
  for (i in 2:nrow(X))
  {
    Y [i,] = X [i,] + C [i,] * Y [i - 1,]
  }
}
"""

There is a significant performance difference between the <b>naive</b> implementation and the <b>tricky</b> implementation.

In [None]:
prog = cumsum_prod_def + cumsum_prod_naive_def + """
X = rand (rows = 20000, cols = 10, min = 0, max = 1, pdf = "uniform", sparsity = 1.0);
C = rand (rows = 20000, cols = 10, min = 0, max = 1, pdf = "uniform", sparsity = 1.0);

Y1 = cumsum_prod_naive (X, C, 0.123);
"""
with jvm_stdout():
    ml.execute(dml(prog))

In [None]:
prog = cumsum_prod_def + cumsum_prod_naive_def + """
X = rand (rows = 20000, cols = 10, min = 0, max = 1, pdf = "uniform", sparsity = 1.0);
C = rand (rows = 20000, cols = 10, min = 0, max = 1, pdf = "uniform", sparsity = 1.0);

Y2 = cumsum_prod (X, C, 0.123);
"""
with jvm_stdout():
    ml.execute(dml(prog))

## Invert Lower Triangular Matrix<a id="Invert_Lower_Triangular_Matrix"></a>

In this example, we invert a lower triangular matrix using a the following divide-and-conquer approach. Given lower triangular matrix L, we compute its inverse X which is also lower triangular by splitting both matrices in the middle into 4 blocks (in a 2x2 fashion), and multiplying them together to get the identity matrix:

\begin{equation}
L \text{ %*% } X = \left(\begin{matrix} L_1 & 0 \\ L_2 & L_3 \end{matrix}\right)
\text{ %*% } \left(\begin{matrix} X_1 & 0 \\ X_2 & X_3 \end{matrix}\right)
= \left(\begin{matrix} L_1 X_1 & 0 \\ L_2 X_1 + L_3 X_2 & L_3 X_3 \end{matrix}\right)
= \left(\begin{matrix} I & 0 \\ 0 & I \end{matrix}\right)
\nonumber
\end{equation}

If we multiply blockwise, we get three equations: 

$
\begin{equation}
L1 \text{ %*% } X1 = 1\\ 
L3 \text{ %*% } X3 = 1\\
L2 \text{ %*% } X1 + L3 \text{ %*% } X2 = 0\\
\end{equation}
$

Solving these equation gives the following formulas for X:

$
\begin{equation}
X1 = inv(L1) \\
X3 = inv(L3) \\
X2 = - X3 \text{ %*% } L2 \text{ %*% } X1 \\
\end{equation}
$

If we already recursively inverted L1 and L3, we can invert L2.  This suggests an algorithm that starts at the diagonal and iterates away from the diagonal, involving bigger and bigger blocks (of size 1, 2, 4, 8, etc.)  There is a logarithmic number of steps, and inside each step, the inversions can be performed in parallel using a parfor-loop.

Function "invert_lower_triangular" occurs within more general inverse operations and matrix decompositions.  The divide-and-conquer idea allows to derive more efficient algorithms for other matrix decompositions.


In [None]:
invert_lower_triangular_def = """
invert_lower_triangular = function (Matrix[double] LI)
  return   (Matrix[double] LO)
{
  n = nrow (LI);
  LO = matrix (0, rows = n, cols = n);
  LO = LO + diag (1 / diag (LI));
  
  k = 1;
  while (k < n)
  {
    LPF = matrix (0, rows = n, cols = n);
    parfor (p in 0:((n - 1) / (2 * k)), check = 0)
    {
    i = 2 * k * p;
    j = i + k;
    q = min (n, j + k);
    if (j + 1 <= q) {
      L1 = LO [i + 1:j, i + 1:j];
      L2 = LI [j + 1:q, i + 1:j];
      L3 = LO [j + 1:q, j + 1:q];
      LPF [j + 1:q, i + 1:j] = -L3 %*% L2 %*% L1;
    }
    }
    LO = LO + LPF;
    k = 2 * k;
  }
}
"""

In [None]:
prog =  invert_lower_triangular_def + """
n = 1000;
A = rand (rows = n, cols = n, min = -1, max = 1, pdf = "uniform", sparsity = 1.0);
Mask = cumsum (diag (matrix (1, rows = n, cols = 1)));

L = (A %*% t(A)) * Mask;     # Generate L for stability of the inverse

X = invert_lower_triangular (L);

print ("Maximum difference between X %*% L and Identity = " + max (abs (X %*% L - diag (matrix (1, rows = n, cols = 1)))));
"""
with jvm_stdout():
    ml.execute(dml(prog))

This is a naive implementation of inverting a lower triangular matrix.

In [None]:
invert_lower_triangular_naive_def = """
invert_lower_triangular_naive = function (Matrix[double] LI)
  return   (Matrix[double] LO)
{
  n = nrow (LI);
  LO = diag (matrix (1, rows = n, cols = 1));
  for (i in 1:n - 1)
  {
    LO [i,] = LO [i,] / LI [i, i];
    LO [i + 1:n,] = LO [i + 1:n,] - LI [i + 1:n, i] %*% LO [i,];
  }
  LO [n,] = LO [n,] / LI [n, n];
}
"""

The naive implementation is significantly slower than the divide-and-conquer implementation.

In [None]:
prog =  invert_lower_triangular_naive_def + """
n = 1000;
A = rand (rows = n, cols = n, min = -1, max = 1, pdf = "uniform", sparsity = 1.0);
Mask = cumsum (diag (matrix (1, rows = n, cols = 1)));

L = (A %*% t(A)) * Mask;     # Generate L for stability of the inverse

X = invert_lower_triangular_naive (L);

print ("Maximum difference between X %*% L and Identity = " + max (abs (X %*% L - diag (matrix (1, rows = n, cols = 1)))));
"""
with jvm_stdout():
    ml.execute(dml(prog))