In [1]:
from sklearn import datasets
import math as ma
import numpy as np
from pyspark.sql import types as t
from pyspark.sql import functions as f

In [2]:
# Load the digits dataset restricted to classes 0-5; `data` is the feature
# matrix with one image per row.
digits = datasets.load_digits(n_class=6)
data = digits["data"]

In [3]:
# Build a Spark DataFrame of images: "features" holds each image's values and
# "id" is its row index in `data` (assigned by zipWithIndex).
df = (
    spark.createDataFrame(sc.parallelize(data.tolist()).zipWithIndex())
    .toDF("features", "id")
    .repartition("id")
)
df.cache()


DataFrame[features: array<double>, id: bigint]

In [4]:
def euclidean(x, y):
    """Euclidean distance between two array-likes (not referenced by later cells shown)."""
    return ma.sqrt(np.sum((np.array(x) - np.array(y)) ** 2))


# Broadcast every image's features, ordered by id, to all executors.
data_bc = sc.broadcast(
    df.sort("id").select("features").rdd.collect()
)

In [5]:
# Distances from one image to every broadcast image, in id order; used to
# build the target-distance column "D".
def pairwise_metric1(y):
    """Return the Euclidean distance from ``y`` to each row of ``data_bc.value``.

    ``data_bc`` is a global broadcast looked up at call time.
    """
    target = np.array(y)
    return [ma.sqrt(np.sum((np.array(row) - target) ** 2)) for row in data_bc.value]

# Wrap pairwise_metric1 as an array<double> UDF and store each image's
# distances to all images in column "D".
udf_dist1 = f.udf(pairwise_metric1, returnType=t.ArrayType(t.DoubleType()))

df = df.withColumn("D", udf_dist1(f.col("features")))

In [6]:
# n images with p features each; they are embedded in `dim` dimensions.
n, p = data.shape
dim = 2
# Seeded generator so the random starting configuration (and therefore the
# whole iterative run) is reproducible.
SEED = 0
rng = np.random.default_rng(SEED)
# Uniform [0, 1) initial embedding, shape (n, dim).
X = rng.random((n, dim))

In [7]:
# Random initial embedding: one `dim`-dimensional point per image (X is
# n x dim), keyed by its row index "id2" so it can be joined onto df by id.
dfrand = (
    spark.createDataFrame(sc.parallelize(X.tolist()).zipWithIndex())
    .toDF("X", "id2")
    .repartition("id2")
)

In [8]:
# Combine the random initial embedding with the original rows, matching their
# ids. After the join "id2" duplicates "id", so drop it (there is no "id1").
df = df.join(dfrand, df.id == dfrand.id2, "inner").drop("id2")

In [9]:
def pairwise_metric2(y):
    """Return the Euclidean distance from ``y`` to each row of ``X_bc.value``.

    ``X_bc`` is a global looked up at call time; it is assigned in later cells.
    """
    origin = np.array(y)
    distances = []
    for point in X_bc.value:
        distances.append(ma.sqrt(np.sum((np.array(point) - origin) ** 2)))
    return distances

In [10]:
# Row `id` of the SMACOF matrix B, built from target and current distances.
def B(id, x, y):
    """Return row ``id`` of B as a list of floats.

    Off-diagonal entries are ``b_j = -x_j / y_j``, taken as 0 where ``y_j == 0``.
    The diagonal entry is ``b_id = -sum_{j != id} b_j``, so the row sums to 0.

    Parameters
    ----------
    id : int
        Index of this row; also the position of its diagonal entry.
    x : sequence of numbers
        Target distances from point ``id`` to every point (column "D" at the
        call site).
    y : sequence of numbers
        Current embedding distances from point ``id`` to every point (column
        "di" at the call site). Not modified.
    """
    x = np.asarray(x, dtype=float)
    # Copy as float: integer input would otherwise reject the inf assignment.
    y = np.array(y, dtype=float)
    # Dividing by +inf turns zero-distance entries into 0 with no warning.
    y[y == 0.0] = np.inf
    z = -x / y
    # Replace the diagonal with minus the sum of the off-diagonal entries.
    z[id] = -(np.sum(z) - z[id])
    return z.tolist()

In [11]:
# Distributed "matrix product" expressed as a sum of per-row outer products.
def df_mult(df, col1, col2, n1, n2, matrix=True):
    """Sum ``outer(row[col1], row[col2])`` over all rows of ``df``.

    If A has the ``col1`` arrays (length ``n1``) as rows and C has the ``col2``
    arrays (length ``n2``) as rows, the sum equals ``A.T @ C`` (which equals
    ``A @ C`` when A is symmetric).

    Returns
    -------
    If ``matrix`` is False, a one-row DataFrame whose "mult" column is the
    flattened (row-major) ``n1 * n2`` sum, unscaled. Otherwise, one row per
    result row with columns "id2" (row index) and "X_min" (that row divided by
    ``n1``), repartitioned by "id2".
    """
    double_array = t.ArrayType(t.DoubleType())

    def _outer_flat(left, right):
        # Per-row outer product, flattened row-major to n1 * n2 values.
        return np.outer(np.array(left), np.array(right)).flatten().tolist()

    products = df.withColumn("mult", f.udf(_outer_flat, double_array)(col1, col2))
    # Element-wise sums over all rows, packed back into one array column.
    totals = products.agg(
        f.array([f.sum(f.col("mult")[k]) for k in range(n1 * n2)])
    ).toDF("mult")
    if not matrix:
        return totals

    row_schema = t.ArrayType(t.StructType([
        t.StructField("id", t.LongType()),
        t.StructField("row", double_array),
    ]))

    def _split_rows(flat):
        # Reshape to n1 x n2, scale by 1/n1 and tag every row with its index.
        scaled = np.array(flat).reshape(n1, n2) / n1
        return [(idx, vals.tolist()) for idx, vals in enumerate(scaled)]

    exploded = (
        totals
        .withColumn("mult", f.udf(_split_rows, row_schema)("mult"))
        .select(f.explode("mult").alias("mult"))
    )
    return (
        exploded
        .select(f.col("mult.id").alias("id2"), f.col("mult.row").alias("X_min"))
        .repartition("id2")
    )

In [12]:
# UDF wrappers used by the iteration below.
udf_B = f.udf(B, returnType=t.ArrayType(t.DoubleType()))
# Per-row sum of squared differences between two distance arrays (D vs. di).
udf_sigma = f.udf(
    lambda lhs, rhs: float(np.sum((np.array(lhs) - np.array(rhs)) ** 2)),
    returnType=t.DoubleType(),
)
# Previous stress value (starts at +inf), stopping tolerance, iteration cap.
sigma_old = np.inf
tol = 1e-4
max_iter = 1000


In [None]:
# Iterate: measure the stress of the current embedding X, then replace X with
# (1/n) * B @ X as computed by df_mult. Stops early once the stress stalls.
for i in range(max_iter):
    # list of all x vals sorted by id, broadcast to every executor
    X_bc = sc.broadcast(df.sort("id").select("X").rdd.collect())

    # Redefined and re-wrapped each iteration so the new UDF serializes this
    # iteration's X_bc.
    def pairwise_metric2(y):
        dist = []
        for x in X_bc.value:
            dist += [ma.sqrt(np.sum((np.array(x)-np.array(y))**2))]
        return(dist)
    udf_dist2 = f.udf(pairwise_metric2, t.ArrayType(t.DoubleType()))

    # di: distances in the current embedding; sigma: per-row squared error
    # between D and di. Cached so di is computed once and reused for both the
    # stress sum and the B matrix.
    df_dist = (df.withColumn("di", udf_dist2("X"))
                 .withColumn("sigma", udf_sigma("D", "di"))
                 .cache())
    sigma_new = df_dist.agg({"sigma": "sum"}).collect()[0][0]
    print(sigma_old, sigma_new)

    # Converged when the stress drops by no more than `tol` relative to its
    # previous value (or rises). The first pass (sigma_old = inf) never stops.
    converged = ma.isfinite(sigma_old) and (sigma_old - sigma_new) <= tol * sigma_old
    sigma_old = sigma_new

    if not converged:
        # compute matrix B from D and di, then the updated embedding
        df_B = df_dist.withColumn("B", udf_B("id", "D", "di")).drop("di")
        X_min = df_mult(df_B, "B", "X", n, dim)
        # localCheckpoint (eager) materializes the new embedding and truncates
        # its lineage; cache() alone would keep a plan that grows by one join
        # per iteration.
        df = (df_B.join(X_min, df_B.id == X_min.id2)
                  .select("id", "D", f.col("X_min").alias("X"))
                  .localCheckpoint())

    # Release this iteration's intermediates; df no longer depends on them.
    df_dist.unpersist()
    X_bc.unpersist()

    if converged:
        break

In [13]:
# Broadcast the current embedding rows (ordered by id) for one manual iteration.
X_bc = sc.broadcast(
    df.sort(f.col("id")).select(f.col("X")).rdd.collect()
)

In [14]:
def pairwise_metric2(y):
    """Return the Euclidean distance from ``y`` to each row of ``X_bc.value``.

    NOTE(review): identical redefinition of the earlier pairwise_metric2;
    ``X_bc`` is a global looked up at call time.
    """
    here = np.array(y)
    return [ma.sqrt(np.sum((np.array(other) - here) ** 2)) for other in X_bc.value]

In [15]:
# "di": distance from each row's X to every broadcast point in X_bc.
udf_dist2 = f.udf(pairwise_metric2, returnType=t.ArrayType(t.DoubleType()))
df = df.withColumn("di", udf_dist2(f.col("X")))

In [16]:
# Per-row squared error between target distances D and embedding distances di.
df = df.withColumn("sigma", udf_sigma(f.col("D"), f.col("di")))

In [17]:
# Row of B for every image; di is not needed afterwards.
df = (
    df.withColumn("B", udf_B(f.col("id"), f.col("D"), f.col("di")))
    .drop("di")
)

In [18]:
# Updated embedding rows (id2, X_min) = (1/n) * B @ X.
X_min = df_mult(df, col1="B", col2="X", n1=n, n2=dim)

In [19]:
# Replace X with the updated embedding, keeping only id and D alongside it.
df = (
    df.join(X_min, on=df.id == X_min.id2)
    .select("id", "D", f.col("X_min").alias("X"))
)

In [20]:
# Mark df for caching; the call returns df, whose repr is shown below.
df.cache()

DataFrame[id: bigint, D: array<double>, X: array<double>]

In [22]:
# Display df's repr.
# NOTE(review): the recorded output contains a Py4J connection error (the
# Python side lost its connection to the JVM); outputs of later cells may come
# from a different session.
df

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:54874)
Traceback (most recent call last):
  File "C:\spark\spark-3.1.1-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1193, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\spark\spark-3.1.1-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "C:\spark\spark-3.1.1-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1196, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:54874)

DataFrame[id: bigint, D: array<double>, X: array<double>]

In [None]:
# First broadcast embedding row (a Row holding one X point).
X_bc.value[0]

In [None]:
# All broadcast embedding rows. NOTE(review): prints every row; consider slicing.
X_bc.value

In [46]:
# Collect the embedding rows, ordered by id, to the driver.
df_collect1 = (
    df.sort(f.col("id"))
    .select(f.col("X"))
    .rdd
    .collect()
)

In [47]:
# Display every collected embedding row. NOTE(review): this dumps all rows; a
# slice such as df_collect1[:5] would keep the output readable.
df_collect1

[Row(X=[0.3021614460216844, 0.389234008578758]),
 Row(X=[0.281942197876458, 0.9470245547885052]),
 Row(X=[0.2304309236140778, 0.3998166285538577]),
 Row(X=[0.8811913658799089, 0.5847002645249081]),
 Row(X=[0.8645921362801918, 0.9450042062663913]),
 Row(X=[0.161339613250853, 0.11477844361755207]),
 Row(X=[0.12976102329824002, 0.4839044247858091]),
 Row(X=[0.614337105299198, 0.7175342558350419]),
 Row(X=[0.7580417160739605, 0.786139518272637]),
 Row(X=[0.1195431197937592, 0.464277565137982]),
 Row(X=[0.6693623831177395, 0.595148409791477]),
 Row(X=[0.008290015966010977, 0.26385069085869306]),
 Row(X=[0.1026297733432594, 0.2644045450325565]),
 Row(X=[0.8434605204936574, 0.5309702102447524]),
 Row(X=[0.05457071667329749, 0.24078129339871535]),
 Row(X=[0.009596349232605106, 0.4149218914229653]),
 Row(X=[0.47777352304495024, 0.5512209463208159]),
 Row(X=[0.10734762713911195, 0.6892074630493771]),
 Row(X=[0.3280628931110091, 0.17001502275907276]),
 Row(X=[0.9835621667210093, 0.274460551446097

In [5]:
# Collect all rows of df to the driver.
# NOTE(review): execution count In [5] is out of sequence with the cells above,
# and the outputs below show df with "features"/"id" columns, so this appears to
# come from an earlier kernel session; rerun top to bottom to reproduce.
dfcollect = df.collect()

In [7]:
# Shape of the collected rows as numpy sees them (rows x fields per Row).
np.shape(dfcollect)

  return array(a, dtype, copy=False, order=order)


(1083, 2)

In [8]:
# Display all collected rows. NOTE(review): large output; consider slicing.
dfcollect

[Row(features=[0.0, 1.0, 15.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 16.0, 16.0, 16.0, 14.0, 2.0, 0.0, 0.0, 6.0, 16.0, 11.0, 8.0, 8.0, 3.0, 0.0, 0.0, 5.0, 16.0, 11.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 11.0, 14.0, 14.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 16.0, 7.0, 0.0, 0.0, 0.0, 0.0, 6.0, 16.0, 16.0, 4.0, 0.0, 0.0, 0.0, 0.0, 14.0, 14.0, 4.0, 0.0, 0.0, 0.0], id=26),
 Row(features=[0.0, 0.0, 1.0, 15.0, 13.0, 1.0, 0.0, 0.0, 0.0, 0.0, 7.0, 16.0, 14.0, 8.0, 0.0, 0.0, 0.0, 8.0, 12.0, 9.0, 2.0, 13.0, 2.0, 0.0, 0.0, 7.0, 9.0, 1.0, 0.0, 6.0, 6.0, 0.0, 0.0, 5.0, 9.0, 0.0, 0.0, 3.0, 9.0, 0.0, 0.0, 0.0, 15.0, 2.0, 0.0, 8.0, 12.0, 0.0, 0.0, 0.0, 9.0, 15.0, 13.0, 16.0, 6.0, 0.0, 0.0, 0.0, 0.0, 13.0, 14.0, 8.0, 0.0, 0.0], id=29),
 Row(features=[0.0, 0.0, 6.0, 16.0, 16.0, 8.0, 0.0, 0.0, 0.0, 2.0, 16.0, 8.0, 9.0, 16.0, 3.0, 0.0, 0.0, 8.0, 16.0, 1.0, 0.0, 9.0, 9.0, 0.0, 0.0, 9.0, 12.0, 0.0, 0.0, 8.0, 12.0, 0.0, 0.0, 10.0, 12.0, 0.0, 0.0, 8.0, 10.0, 0.0, 0.0, 8.0, 13.0, 0.0, 0.0, 9.0, 8.0, 0.0, 0.0, 2.0, 16.0, 

In [11]:
# First collected row.
dfcollect[0]

Row(features=[0.0, 1.0, 15.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 16.0, 16.0, 16.0, 14.0, 2.0, 0.0, 0.0, 6.0, 16.0, 11.0, 8.0, 8.0, 3.0, 0.0, 0.0, 5.0, 16.0, 11.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 11.0, 14.0, 14.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 16.0, 7.0, 0.0, 0.0, 0.0, 0.0, 6.0, 16.0, 16.0, 4.0, 0.0, 0.0, 0.0, 0.0, 14.0, 14.0, 4.0, 0.0, 0.0, 0.0], id=26)

In [15]:
# The partition count is read from the DataFrame's underlying RDD.
df.rdd.getNumPartitions()

200

In [27]:
# Third field of the first row. In the session whose output is shown, that is
# the "D" distance vector (note the 0.0 self-distance at the row's own id).
df.first()[2]

[47.0,
 44.31703961232068,
 54.04627646748664,
 51.22499389946279,
 57.245087125446844,
 43.57751713900185,
 48.67237409455183,
 53.38539126015655,
 42.9767378938886,
 50.49752469181039,
 52.10566188045211,
 35.79106033634656,
 49.8196748283246,
 54.064775963653084,
 46.22769732530488,
 50.92150822589606,
 54.80875842417888,
 26.94438717061496,
 52.10566188045211,
 42.68489194082609,
 26.40075756488817,
 24.24871130596428,
 42.941821107167776,
 48.61069841094653,
 49.58830507286975,
 53.59104402789705,
 0.0,
 46.9148164229596,
 48.662100242385755,
 51.51698748956503,
 53.86093203798092,
 56.480084985771754,
 58.54912467321779,
 49.0,
 49.37610758251404,
 53.842362503887216,
 45.44227107000705,
 55.344376408086845,
 52.65928218272634,
 45.0111097397076,
 54.29548784199291,
 44.27188724235731,
 45.155287619502545,
 38.23610858861032,
 49.35585071701227,
 33.689761055846034,
 59.0508255657785,
 61.48983655857283,
 51.43928459844674,
 49.90991885387112,
 51.332251070842396,
 49.08156476723