In [0]:
import numpy as np
from operator import add

edgesRDD = sc.textFile('/FileStore/tables/comp4651-project/musae_squirrel_edges.csv', 8)
header = edgesRDD.first()
edgesRDD = (
  edgesRDD.filter(lambda line: line != header)
      .map(lambda line: tuple(map(int, line.split(','))))
)

def join_list(list1, list2):
  if list1 is None:
    return list2
  if list2 is None:
    return list1
  else:
    return list1+list2

nodesRDD = (
  edgesRDD.groupByKey().mapValues(list)
    .fullOuterJoin(edgesRDD.map(lambda kv: (kv[1], kv[0])).groupByKey().mapValues(list))
    .mapValues(lambda v: join_list(v[0], v[1]))
).cache()

In [0]:
# PART 2.1 - node distance estimation by landmark approach

allNodes = nodesRDD.sortByKey().map(lambda kv: kv[0]).collect()
numNodes = len(allNodes)
nodeIdToPos = sc.broadcast(dict(zip(allNodes, range(numNodes))))
allNodes = sc.broadcast(np.array(allNodes))

landmarkNodes = nodesRDD.mapValues(len).sortBy(lambda kv: kv[1], ascending=False).map(lambda kv: kv[0]).take(100) # use top 100 nodes with highest degree as 
landmarkNodes = np.array(landmarkNodes)

In [0]:
def BFS(node_neighbors_dist):
  node = node_neighbors_dist[0]
  neighbors = node_neighbors_dist[1][0]
  dist = node_neighbors_dist[1][1]
  for n in neighbors:
    yield (n, dist+1)
  yield (node, dist)

In [0]:
dists = nodesRDD.map(lambda node: (node[0], np.where(landmarkNodes == node[0], 0, np.inf)))

for i in range(15):
  update = (
    nodesRDD
      .join(dists, 8)
      .flatMap(BFS)
  )
  dists = update.reduceByKey(np.minimum, 8)

In [0]:
def cartesian_to_dist_pairs(node_dists):
  node1 = node_dists[0][0]
  node2 = node_dists[1][0]
  if (node1 == node2):
    return (node1, (node2, 0))

  dists1 = node_dists[0][1]
  dists2 = node_dists[1][1]
  dist = np.min(dists1+dists2)
  return (node1, (node2, dist))

def seqOp(x, y):
  node_pos = nodeIdToPos.value[y[0]]
  dist = y[1]
  x[node_pos] = dist
  return x

distToLandmarks = dists.coalesce(4)
allDistEst = (
  distToLandmarks
    .cartesian(distToLandmarks)
    .map(cartesian_to_dist_pairs)
    .aggregateByKey(np.zeros(numNodes), seqOp, lambda x,y: x+y, 8)
).cache()

coverage = (
  allDistEst
    .mapValues(lambda dists: np.ma.masked_invalid(dists).count()/numNodes)
    .map(lambda kv: kv[1])
    .reduce(add)
)/numNodes

print('estimation coverage:', coverage)

In [0]:
# PART 2.2 - Actual Node distance

dists = nodesRDD.map(lambda node: (node[0], np.where(allNodes.value == node[0], 0, np.inf)))

for i in range(15):
  update = (
    nodesRDD
      .join(dists, 8)
      .flatMap(BFS)
  )
  dists = update.reduceByKey(np.minimum, 8)
  
allDist = dists.cache()

coverage = (
  allDist
    .mapValues(lambda dists: np.ma.masked_invalid(dists).count()/numNodes)
    .map(lambda kv: kv[1])
    .reduce(add)
)/numNodes

print('regular coverage:', coverage)

In [0]:
# PART 2.3 - accuracy of estimation

allDistEst_Full = allDistEst.join(allDist, 8).cache()

ME = (
  allDistEst_Full
    .mapValues(lambda value: np.ma.masked_invalid(value[0] - value[1]).mean())
    .map(lambda kv: kv[1])
    .reduce(add)
) / numNodes

errorSD = (
  allDistEst_Full
    .mapValues(lambda value: np.ma.masked_invalid(value[0] - value[1]).std())
    .map(lambda kv: kv[1])
    .reduce(add)
) / numNodes

print('mean error is:', ME)
print('sd of error:', errorSD)

In [0]:
allDistCombined = allDistEst_Full.mapValues(lambda dist1_dist2: np.minimum(dist1_dist2[0], dist1_dist2[1])).cache()

coverage = (
  allDistCombined
    .mapValues(lambda dists: np.ma.masked_invalid(dists).count()/numNodes)
    .map(lambda kv: kv[1])
    .reduce(add)
)/numNodes

meanDist = (
  allDistCombined
    .mapValues(lambda dists: np.ma.masked_invalid(dists).mean())
)

networkAvgDist = (
  meanDist
    .map(lambda kv: kv[1])
    .reduce(add)
)/numNodes

print('mean distance of network', networkAvgDist)

In [0]:
#PART 2.3 Distribution of mean dist
from pyspark.sql.functions import expr, udf, stddev, mean, percentile_approx
import re

distDF = meanDist.mapValues(float).toDF(['id', 'dist']).withColumn('closeness', expr('1/dist'))

networkDistSd = distDF.select(stddev('dist')).collect()[0][0]
print('sd of mean distance of network', networkDistSd)
networkDistMd = distDF.select(percentile_approx('dist', 0.5)).collect()[0][0]
print('median distance of network', networkDistMd)

from scipy.stats import gamma

distStat = (
  distDF
    .withColumn('group', expr('int(dist/0.1)'))
    .groupBy('group')
    .count()
    .selectExpr('group * 0.1 as dist','count as freq')
    .orderBy('dist')
    .withColumn('logD', expr('log10(dist)'))
    .withColumn('p', expr('freq/{}'.format(numNodes)))
    .withColumn('pdf', expr('p*10'))
    .withColumn('cdf', expr('sum(p) over (order by dist)'))
)


In [0]:
fitgamma_pdf = udf(lambda x: float(gamma.pdf(x, ((networkDistMd-1)/networkDistSd) ** 2, 1, scale = (networkDistSd**2)/(networkDistMd-1))))
fitgamma_cdf = udf(lambda x: float(gamma.cdf(x, ((networkDistMd-1)/networkDistSd) ** 2, 1, scale = (networkDistSd**2)/(networkDistMd-1))))

display(
  distStat
    .withColumn('fitted_pdf', fitgamma_pdf(distStat['dist'].cast('double')))
    .withColumn('fitted_cdf', fitgamma_cdf(distStat['dist'].cast('double')))
)

dist,freq,logD,p,pdf,cdf,fitted_pdf,fitted_cdf
1.9,2,0.2787536009528289,0.0003845414343395501,0.0038454143433955,0.0003845414343395501,0.1937808421150148,0.0330853250586766
2.1,9,0.3222192947339193,0.0017304364545279,0.0173043645452797,0.0021149778888675,0.3665438269004806,0.0887381114301517
2.2,54,0.3424226808222063,0.0103826187271678,0.1038261872716785,0.0124975966160353,0.4537500485766229,0.1298017919169618
2.3,117,0.3617278360175928,0.0224956739088636,0.2249567390886367,0.034993270524899,0.531614472209085,0.1791750131559525
2.4,658,0.380211241711606,0.126514131897712,1.26514131897712,0.161507402422611,0.5943110989671333,0.2356167274131552
2.5,197,0.3979400086720376,0.0378773312824456,0.3787733128244568,0.1993847337050567,0.6380873355659751,0.297404053411192
2.6,546,0.414973347970818,0.1049798115746971,1.0497981157469718,0.3043645452797539,0.6613978699485185,0.3625497668769113
2.7,617,0.4313637641589873,0.1186310324937512,1.186310324937512,0.4229955777735051,0.6646855216359194,0.4290144628505045
2.8,484,0.4471580313422192,0.0930590271101711,0.9305902711017112,0.5160546048836763,0.6499448541682575,0.4948847879149574
2.9,266,0.4623979978989561,0.0511440107671601,0.5114401076716015,0.5671986156508364,0.62019806121389,0.5585026665109263


In [0]:
closenessStat = (
  distDF
    .withColumn('group', expr('int(closeness/0.02)'))
    .groupBy('group')
    .count()
    .selectExpr('group * 0.02 as closeness','count as freq')
    .orderBy('closeness')
    .withColumn('logC', expr('log10(closeness)'))
    .withColumn('p', expr('freq/{}'.format(numNodes)))
    .withColumn('pdf', expr('p*50'))
    .withColumn('cdf', expr('sum(p) over (order by closeness)'))
)

In [0]:
display(closenessStat)

closeness,freq,logC,p,pdf,cdf
0.14,4,-0.8538719643217619,0.0007690828686791001,0.038454143433955,0.0007690828686791001
0.16,7,-0.7958800173440752,0.0013458950201884,0.0672947510094212,0.0021149778888675
0.18,47,-0.744727494896694,0.0090367237069794,0.4518361853489713,0.0111517015958469
0.2,141,-0.6989700043360187,0.0271101711209382,1.3555085560469142,0.0382618727167852
0.22,167,-0.6575773191777937,0.0321092097673524,1.6054604883676216,0.0703710824841376
0.24,302,-0.619788758288394,0.058065756585272,2.9032878292636037,0.1284368390694097
0.26,336,-0.585026652029182,0.0646029609690444,3.2301480484522207,0.1930398000384541
0.28,413,-0.5528419686577808,0.079407806191117,3.970390309555855,0.2724476062295712
0.3,503,-0.5228787452803376,0.0967121707363968,4.8356085368198425,0.369159776965968
0.32,459,-0.494850021680094,0.0882522591809267,4.412612959046338,0.4574120361468947


In [0]:
# meanDist.map(lambda kv: '{} {}'.format(kv[0], kv[1])).coalesce(1).saveAsTextFile('/FileStore/tables/comp4651-project/output')

In [0]:
((networkDistMd-1)/networkDistSd) ** 2, 1, (networkDistSd**2)/(networkDistMd-1)