# Ingesting data

This notebook ingests the Facebook ego-network data from

https://snap.stanford.edu/data/egonets-Facebook.html

into Spark. The dataset is an ego network, which http://www.analytictech.com/networks/egonet.htm defines as follows:

Ego networks consist of a focal node ("ego") and the nodes to whom ego is directly connected to (these are called "alters") plus the ties, if any, among the alters. Of course, each alter in an ego network has his/her own ego network, and all ego networks interlock to form The human social network.
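
To make the definition concrete, here is a minimal sketch that extracts an ego network from an undirected edge list. The helper name `ego_network` and the toy edge list are hypothetical; they are not part of the dataset or of this notebook's code.

```python
def ego_network(edges, ego):
    """Return (alters, ties) for `ego` in an undirected edge list.

    Illustrative helper only; the name and the toy data are assumptions.
    """
    # alters: every node directly connected to the ego
    alters = set()
    for u, v in edges:
        if u == ego:
            alters.add(v)
        elif v == ego:
            alters.add(u)
    alters.discard(ego)  # ignore self-loops on the ego

    # ties: edges whose two endpoints are both alters
    ties = [(u, v) for u, v in edges if u in alters and v in alters]
    return alters, ties


toy_edges = [(0, 1), (0, 2), (0, 3), (1, 2), (3, 4), (4, 5)]
alters, ties = ego_network(toy_edges, ego=0)
print(sorted(alters))  # [1, 2, 3]
print(ties)            # [(1, 2)]
```

Nodes 4 and 5 are not adjacent to the ego, so they and their edges fall outside the ego network of node 0.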

A description of the dataset can be found here https://snap.stanford.edu/data/readme-Ego.txt.

Retrieve the names corresponding to the features.

In [1]:
features = dict()

# the text before the first ';' on each line has the form "<index> <category>"
with open("./facebook/0.featnames", "r") as featNames:
    for line in featNames:
        featCat = line.split(";")[0]

        featInd = int(featCat.split(" ")[0])
        featName = featCat.split(" ")[1]

        # group the feature indices by category name
        features.setdefault(featName, []).append(featInd)

print(features["birthday"])

[0, 1, 2, 3, 4, 5, 6, 7]


Now, store an N x [# features] matrix describing the vertices and their corresponding features.

In [2]:
featMat = []

with open("./facebook/0.feat", "r") as nodeDetails:
    for line in nodeDetails:
        data = line.split()

        # remove the vertex index (first column)
        data.pop(0)

        # the remaining columns are the 0/1 feature indicators
        featMat.append([int(d) for d in data])

In [3]:
numNeighbors = len(featMat)
numFeatures = len(featMat[0])
print("%d   %d" % (numNeighbors, numFeatures))

347   224


So, our starting node has 347 neighbors, each having zero or more of the 224 available features.
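
As a quick sanity check on that claim, the number of features a neighbor has is simply the sum of its row. The sketch below uses a small hypothetical matrix (`toyFeatMat`) rather than the real `featMat`.

```python
from collections import Counter

# Hypothetical 4 x 5 binary matrix standing in for featMat
toyFeatMat = [
    [0, 0, 0, 0, 0],
    [1, 0, 0, 0, 0],
    [1, 0, 1, 0, 0],
    [0, 1, 1, 0, 1],
]

# number of features held by each neighbor (row sum)
featuresPerNode = [sum(row) for row in toyFeatMat]
print(featuresPerNode)  # [0, 1, 2, 3]

# how many neighbors hold a given number of features
histogram = Counter(featuresPerNode)
print(sorted(histogram.items()))  # [(0, 1), (1, 1), (2, 1), (3, 1)]
```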

Next, we need to process the circle data. A circle contains the ego node and all the other nodes appearing on a specific line. We'll store this data as a dict.

In [4]:
circles = dict()

with open("./facebook/0.circles", "r") as circleDetails:
    for ind, line in enumerate(circleDetails):
        data = line.split()

        # remove the circle label (first column)
        data.pop(0)

        circles[ind] = [int(d) for d in data]

print(circles[0])
print(len(circles))

[71, 215, 54, 61, 298, 229, 81, 253, 193, 97, 264, 29, 132, 110, 163, 259, 183, 334, 245, 222]
24


We found 24 circles.

We've now parsed all the data: feature and circle information.

To recap:

- feature names are stored in 'features'. features["feature_name"] returns the list of feature indices (columns of 'featMat') that belong to that feature category
- feature information is stored in the binary matrix 'featMat'. featMat[i][j] = 0 if the node in row i doesn't have feature j and 1 otherwise
- circle information is stored in 'circles'. circles[i] returns the list of vertices that are part of circle i
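
The sketch below shows how the three structures fit together, using hypothetical toy data (all `toy*` names and values, and the helper `categoriesOf`, are made up for illustration). It lists, for each member of a circle, the feature categories in which that member has at least one feature. It assumes circle members can be used directly as row indices, which need not hold for the real files because the vertex index column was dropped when building the matrix.

```python
# Hypothetical stand-ins for features, featMat and circles
toyFeatures = {"birthday": [0, 1], "gender": [2, 3]}
toyFeatMat = [
    [1, 0, 0, 1],  # row 0
    [0, 0, 1, 0],  # row 1
    [0, 1, 0, 0],  # row 2
]
toyCircles = {0: [0, 2], 1: [1]}


def categoriesOf(row, features, featMat):
    """Feature categories in which the node at `row` has at least one feature."""
    return sorted(
        name for name, cols in features.items()
        if any(featMat[row][j] == 1 for j in cols)
    )


for circleId, members in sorted(toyCircles.items()):
    for row in members:
        print(circleId, row, categoriesOf(row, toyFeatures, toyFeatMat))
```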

# Setting up the two stage submodular problem

For this graph, we'll set up $F$ as a sum of coverage functions.

Each $f_i$ will be a coverage function corresponding to feature name $i$, for example $f_{birthday}$. In this dataset, the $birthday$ category contains $8$ features. So, for a given set $S$ of vertices, the value of $f_{birthday}(S)$ is the number of features from $0$ to $7$ that are held by at least one vertex in $S$.
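
A minimal plain-Python sketch of these coverage functions, assuming $S$ is a collection of row indices into the feature matrix. The names `coverage` and `F` and the toy data are illustrative assumptions, not the notebook's final PySpark implementation.

```python
def coverage(featInds, featMat, S):
    """Number of features in `featInds` held by at least one vertex in S."""
    return sum(1 for j in featInds if any(featMat[v][j] == 1 for v in S))


def F(features, featMat, S):
    """Sum of the per-category coverage functions f_i(S)."""
    return sum(coverage(inds, featMat, S) for inds in features.values())


# Hypothetical toy data: 3 vertices, 2 feature categories
toyFeatures = {"birthday": [0, 1, 2], "gender": [3, 4]}
toyFeatMat = [
    [1, 0, 0, 1, 0],
    [1, 1, 0, 0, 0],
    [0, 0, 0, 0, 1],
]

print(coverage(toyFeatures["birthday"], toyFeatMat, {0}))     # 1
print(coverage(toyFeatures["birthday"], toyFeatMat, {0, 1}))  # 2
print(F(toyFeatures, toyFeatMat, {0, 1, 2}))                  # 4
```

In the toy data, adding vertex 1 to $\{0\}$ raises the birthday coverage by only one, because both vertices share feature $0$; a feature already covered is never counted twice.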

So, we are ignoring circles for now.

The implementation will be done in PySpark.

In [5]:
import numpy as np

In [6]:
import findspark
import os

In [7]:
findspark.init()

In [8]:
import pyspark

In [9]:
sc = pyspark.SparkContext()

In [10]:
test = sc.parallelize(range(10), 3)

Here is how to build a function that takes extra arguments and passes them down to the function given to mapPartitions:

In [11]:
def g(const):
    def newf(partition):
        fres = []
        
        for num in partition:
            if num % 2 == 0:
                fres.append(num * const)

        return fres

    return newf

In [12]:
print(test.mapPartitions(g(1)).collect())

[0, 2, 4, 6, 8]


In [13]:
print(test.mapPartitions(g(6)).collect())

[0, 12, 24, 36, 48]
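
The same pattern can also be written with `functools.partial` instead of a nested function. The sketch below is an alternative to `g`, not this notebook's code: `scaleEvens` is a hypothetical name, and the partitions are simulated with plain lists so the example runs without a Spark context.

```python
from functools import partial


def scaleEvens(const, partition):
    """Yield const * num for every even num in the partition."""
    for num in partition:
        if num % 2 == 0:
            yield num * const


# partial fixes `const`, leaving a one-argument function of the partition,
# which is the shape mapPartitions expects
timesSix = partial(scaleEvens, 6)

# Simulated partitions of range(10); the real split is chosen by Spark
fakePartitions = [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
result = [x for part in fakePartitions for x in timesSix(part)]
print(result)  # [0, 12, 24, 36, 48]
```

With a live context, the equivalent call would presumably be `test.mapPartitions(partial(scaleEvens, 6)).collect()`.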


Now, we'll write our algorithm inside a curried function. 

In [None]:
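def doComputations(features, featMat, k, l):
    n = len(featMat)
    m = len(features)
    
    # a callable f for each feature name
    # (signature assumed: coverage of category featName by the rows in S)
    def f(featName, S):
        return sum(1 for j in features[featName]
                   if any(featMat[v][j] == 1 for v in S))
    
    # a 
    def novel(partition):
        # body missing in the source; left as a placeholder
        raise NotImplementedError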