#Design and Analysis of Algorithms I
##Programming Assignment 4
###Computing strongly connected components (SCCs) in a directed graph.

This problem involves finding the five biggest SCCs in a directed graph. The graph is provided as a 70MB text file in which each row represents one edge: the left entry is the "from" node and the right entry is the "to" node.

Before solving the actual problem (where the given graph is a 70MB file), I created a small test graph. The first few rows of the full input file, `SCC.txt`, are shown below.

    [cloudera@localhost Programming Assignment 4]$ head SCC.txt

    1 1 
    1 2 
    1 5 
    1 6 
    1 7 
    1 3 
    1 8 
    1 4 
    2 47646 
    2 47647

####Pre-processing:
Three things need to be done before we can start implementing the algorithm itself.

1) First, notice that any sink node (i.e. a node that doesn't have any edges leaving it) has no row of its own in the above representation of the graph, so it would be missing from the adjacency list built from it. Therefore we need to include these sink nodes (if any) in our graph. We achieve this by appending "ghost edges" (i.e. edges that have a sink node as their "from" node and the placeholder `x` in place of a "to" node) at the end of the above graph. The same is done for nodes with no incoming edges, which get a ghost edge with `x` as the "from" node, so that they also get a row in the reversed graph.

2) We want to change the representation of the graph to an adjacency list. This saves a good deal of storage. The saving isn't apparent on a tiny graph, but it is clear on the original graph: in the listings below the edge list is 75,321,172 bytes and the adjacency list is 41,197,060 bytes.

3) We (naively) want to create a new graph that is the reverse of the original graph. There are more efficient ways of doing this, but we will start off with a simplistic approach.

We will write MapReduce jobs on Hadoop for points 2) and 3).
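The three steps above can be sketched on a tiny in-memory edge list. This is only an illustration under stated assumptions: the toy `edges` list and the variable names are hypothetical, and the actual work below is done on `SCC.txt` with Hadoop.

```python
from collections import defaultdict

# Hypothetical toy edge list of (tail, head) pairs, i.e. tail -> head.
# Node "4" is a sink (no outgoing edges), node "5" is a source (no incoming edges).
edges = [("1", "2"), ("2", "3"), ("3", "1"), ("3", "4"), ("5", "1")]

tails = set(t for t, _ in edges)
heads = set(h for _, h in edges)

# Step 1: ghost edges, using "x" as the placeholder for the missing endpoint.
ghost_edges = [(v, "x") for v in sorted(heads - tails)]   # sinks
ghost_edges += [("x", v) for v in sorted(tails - heads)]  # sources
all_edges = edges + ghost_edges

# Steps 2 and 3: forward and reversed adjacency lists.
adj = defaultdict(list)
rev = defaultdict(list)
for tail, head in all_edges:
    adj[tail].append(head)
    rev[head].append(tail)

print(dict(adj))
print(dict(rev))
```

With the ghost edges in place, every node (plus the placeholder `x`) appears as a key in both `adj` and `rev`.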
#####Step 1: adding ghost edges

In [1]:
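v1 = set()  # nodes that appear as a tail ("from" node)
v2 = set()  # nodes that appear as a head ("to" node)

with open(r'/home/cloudera/Desktop/educational/ADA1/Programming Assignment 4/SCC.txt') as f:
    for line in f:
        fields = line.strip().split()
        if len(fields) != 2:
            continue  # skip blank or malformed rows
        tail, head = fields
        v1.add(tail)
        v2.add(head)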

In [2]:
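# Append the ghost edges to the graph, using 'x' as the placeholder node
with open(r'/home/cloudera/Desktop/educational/ADA1/Programming Assignment 4/SCC.txt', 'a') as f:
    # v2 - v1: nodes that are never a tail, i.e. sink nodes
    for v in (v2 - v1):
        f.write(v + ' x' + '\n')
    # v1 - v2: nodes that are never a head, i.e. nodes with no incoming edges
    for v in (v1 - v2):
        f.write('x ' + v + '\n')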

#####Step 2: changing the graph representation to adjacency list
Create a mapper file **adj_list_mapper.py** with the below code:

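    import sys

    # Emit each edge as "tail<TAB>head" so that Hadoop groups the records by tail node
    for line in sys.stdin:
        try:
            key, value = line.strip().split()
            print('%s\t%s' % (key, value))
        except ValueError:
            # the row does not have exactly two entries: report it and abort
            sys.stderr.write("%s\n" % line)
            sys.exit(1)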

Create a reducer file **adj_list_reducer.py** with the below code:

    import sys

    nodes = []      # heads collected so far for the current key
    oldkey = None

    for line in sys.stdin:
        data = line.strip().split("\t")
        if len(data) != 2:
            continue  # ignore malformed records
        thiskey, thisnode = data

        # Records arrive sorted by key, so a new key means the previous one is complete
        if oldkey is not None and oldkey != thiskey:
            print("%s,%s" % (oldkey, ",".join(nodes)))
            nodes = []

        oldkey = thiskey
        nodes.append(thisnode)

    # flush the last key
    if oldkey is not None:
        print("%s,%s" % (oldkey, ",".join(nodes)))

    [cloudera@localhost Programming Assignment 4]$ hadoop fs -put SCC.txt
    [cloudera@localhost Programming Assignment 4]$ hadoop fs -ls

    -rw-r--r--   3 cloudera cloudera   75321172 2015-08-01 15:32 SCC.txt
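Before submitting the job, the mapper and reducer logic can be checked locally. The sketch below is not part of the original workflow: it imitates the two scripts in plain Python, with `sorted()` standing in for the sort that Hadoop performs between the map and reduce phases. The names `map_edges`, `reduce_edges` and `sample` are hypothetical.

```python
from itertools import groupby

def map_edges(lines):
    """Mimic adj_list_mapper.py: emit 'tail<TAB>head' for each edge row."""
    for line in lines:
        key, value = line.strip().split()
        yield "%s\t%s" % (key, value)

def reduce_edges(sorted_pairs):
    """Mimic adj_list_reducer.py: join all heads that share the same tail."""
    split_pairs = (pair.split("\t") for pair in sorted_pairs)
    for key, group in groupby(split_pairs, key=lambda kv: kv[0]):
        yield "%s,%s" % (key, ",".join(value for _, value in group))

# Hypothetical sample rows in the same format as SCC.txt (including a ghost edge).
sample = ["1 2", "2 3", "1 3", "3 x"]

result = list(reduce_edges(sorted(map_edges(sample))))
print(result)  # ['1,2,3', '2,3', '3,x']
```

Note that `sorted()` also orders the values within each key, whereas Hadoop only guarantees that records with the same key arrive together, so the order of the heads within a row may differ in the real output.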
    

Run the below command:

    [cloudera@localhost Programming Assignment 4]$ hadoop jar $STREAMING -input SCC.txt -output adj_list_graph -mapper "python adj_list_mapper.py" -reducer "python adj_list_reducer.py" -file adj_list_mapper.py -file adj_list_reducer.py

    packageJobJar: [adj_list_mapper.py, adj_list_reducer.py, /tmp/hadoop-cloudera/hadoop-unjar5038415595277101068/] [] /tmp/streamjob4092261144970766109.jar tmpDir=null
    15/07/30 21:50:31 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
    15/07/30 21:50:32 INFO mapred.FileInputFormat: Total input paths to process : 1
    15/07/30 21:50:32 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-cloudera/mapred/local]
    15/07/30 21:50:32 INFO streaming.StreamJob: Running job: job_201507302114_0001
    15/07/30 21:50:32 INFO streaming.StreamJob: To kill this job, run:
    15/07/30 21:50:32 INFO streaming.StreamJob: UNDEF/bin/hadoop job  -Dmapred.job.tracker=localhost.localdomain:8021 -kill job_201507302114_0001
    15/07/30 21:50:32 INFO streaming.StreamJob: Tracking URL: http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201507302114_0001
    15/07/30 21:50:33 INFO streaming.StreamJob:  map 0%  reduce 0%
    15/07/30 21:50:45 INFO streaming.StreamJob:  map 100%  reduce 0%
    15/07/30 21:50:50 INFO streaming.StreamJob:  map 100%  reduce 100%
    15/07/30 21:50:53 INFO streaming.StreamJob: Job complete: job_201507302114_0001
    15/07/30 21:50:53 INFO streaming.StreamJob: Output: adj_list_graph

    [cloudera@localhost Programming Assignment 4]$ hadoop fs -ls adj_list_graph

    Found 3 items
    -rw-r--r--   3 cloudera cloudera          0 2015-08-01 15:35 adj_list_graph/_SUCCESS
    drwxr-xr-x   - cloudera cloudera          0 2015-08-01 15:33 adj_list_graph/_logs
    -rw-r--r--   3 cloudera cloudera   41197060 2015-08-01 15:35 adj_list_graph/part-00000

    [cloudera@localhost Programming Assignment 4]$ hadoop fs -cat adj_list_graph/part* > SCC_adj.txt
    [cloudera@localhost Programming Assignment 4]$ head SCC_adj.txt

    1,4,8,2,1,5,6,7,3	
    10,23,171168,104816,27,13,12,26,5,30555,1,8,7,32,19,18,30,104784,171171,171170,29,171169,17,24,28	
    100,48,64,46,45,44,43,387408,57,387407,75250,42,41,40,39,112824,47	
    1000,24876,24868,24832,24872,24877,24878,265325,24900,24904,24908,24909,907,24916,24835,24928,24932,24946,24833	
    10000,221257,56930,221262,9971,10013,9978,10014,221263,9999,9998,221256,9883,9882,221255,9846,217444,9872,144710,10015,9982,10009,133589,49255,221261,133588,9952,221260,10007,159023,9939,9849,221259,10006,9928,9925,10004,9922,10003,9918,9916,10002,221258	
    100000,176535	
    100001,x	
    100002,176526,99996,99995,99994,176523,100006,100005,100003,176538,100000,99999	
    100003,176518,176517,99995,99994,176512,176536,100006,176540,176539,176531,176534,176530,100005,100002,176538,176516,100000,99999,176529,176528,176537,176527,176526,176525,176515,176514,176513,176524,176523,99997,176520,176519,99996	
    100004,416294,416292,416293,416304,416303,416302,416301,416300,416299,416298,416297,416296,416295	

#####Step 3: reversing the original graph
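Now that we have the graph in the adjacency list format, let's proceed to reverse it. The reversal is run on the edge list (`SCC.txt`): the mapper swaps the two entries of each row, and the reducer then groups the rows as before. Create the file **rev_mapper.py** with the below code: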

    import sys

    # Swap tail and head so that every edge is emitted in the reverse direction
    for line in sys.stdin:
        key, value = line.strip().split()
        print('%s\t%s' % (value, key))

We do not need to create another reducer. We can reuse the **adj_list_reducer.py** file. Run the below command:

    [cloudera@localhost Programming Assignment 4]$ hadoop jar $STREAMING -input SCC.txt -output rev_graph -mapper "python rev_mapper.py" -reducer "python adj_list_reducer.py" -file rev_mapper.py -file adj_list_reducer.py

    packageJobJar: [rev_mapper.py, adj_list_reducer.py, /tmp/hadoop-cloudera/hadoop-unjar1949625360006895700/] [] /tmp/streamjob2353632039566807504.jar tmpDir=null
    .....................
    .....................
    15/07/30 22:09:58 INFO streaming.StreamJob:  map 0%  reduce 0%
    15/07/30 22:10:07 INFO streaming.StreamJob:  map 100%  reduce 0%
    15/07/30 22:10:13 INFO streaming.StreamJob:  map 100%  reduce 100%
    15/07/30 22:10:16 INFO streaming.StreamJob: Job complete: job_201507302114_0002
    15/07/30 22:10:16 INFO streaming.StreamJob: Output: rev_graph

    [cloudera@localhost Programming Assignment 4]$ hadoop fs -ls rev_graph
    Found 3 items
    -rw-r--r--   3 cloudera cloudera          0 2015-08-01 15:42 rev_graph/_SUCCESS
    drwxr-xr-x   - cloudera cloudera          0 2015-08-01 15:41 rev_graph/_logs
    -rw-r--r--   3 cloudera cloudera   42808289 2015-08-01 15:42 rev_graph/part-00000

    [cloudera@localhost Programming Assignment 4]$ hadoop fs -cat rev_graph/part* > SCC_rev.txt
    [cloudera@localhost Programming Assignment 4]$ head SCC_rev.txt
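Point 3) of the pre-processing notes that there are more efficient ways to reverse the graph. One possible alternative, sketched here as an assumption rather than as the approach used in this write-up, is to build the reversed adjacency dictionary in memory from the forward one. The function name `reverse_graph` and the toy graph are hypothetical.

```python
def reverse_graph(adj):
    """Return the reversed adjacency dict: every edge u -> v becomes v -> u."""
    # Give every known node a (possibly empty) row up front.
    rev = {node: [] for node in adj}
    for tail, heads in adj.items():
        for head in heads:
            rev.setdefault(head, []).append(tail)
    return rev

# Hypothetical toy graph: 1 -> 2 -> 3 -> 1 and 3 -> 4, where "4" is a sink.
toy = {"1": ["2"], "2": ["3"], "3": ["1", "4"]}
toy_rev = reverse_graph(toy)
print(toy_rev)  # {'1': ['3'], '2': ['1'], '3': ['2'], '4': ['3']}
```

Because every node ends up with a key in the result, this variant would not need ghost edges, at the cost of holding both dictionaries in memory at once.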

####Implementing Kosaraju's algorithm

In [1]:
# Reading in the reversed graph: each row is "tail,head1,head2,..."
g_rev = {}
with open(r'/home/cloudera/Desktop/educational/ADA1/Programming Assignment 4/SCC_rev.txt') as f:
    for line in f:
        fields = line.strip().split(',')
        g_rev[fields[0]] = fields[1:]

In [2]:
print(len(g_rev))

875715


In [3]:
d = {}
is_first_pass = 1

In [4]:
def DFS_Loop(G):
    # The global dictionary d is used for bookkeeping. For every node it keeps track of:
    # - whether the node has been explored or not
    # - the finishing time 't' of the node in the first pass of DFS on the reversed graph G_rev
    # - the leader of the node in the second pass of DFS on the forward graph G
    global t, S
    t = 0   # number of nodes finished so far
    S = ''  # node from which the current DFS was started (the leader)
    if is_first_pass == 1:
        # create the dict for bookkeeping
        for key in G.keys():
            d[key] = {'explored': 0, 't': 0, 'leader': ''}

        # In the first pass the order of picking the nodes doesn't matter
        sort_key = None
    else:
        # mark all nodes as unexplored
        for key in G.keys():
            d[key]['explored'] = 0
        # in the second pass, the nodes must be picked in decreasing order of their finishing times 't'
        sort_key = lambda x: d[x]['t']

    for i in sorted(G.keys(), key=sort_key, reverse=True):
        if i == 'x':
            continue  # skip the ghost node
        if d[i]['explored'] != 1:
            S = i
            DFS(G, i)

In [5]:
def DFS(G, i):
    global t
    # mark i as explored
    d[i]['explored'] = 1
    # set the leader of i to S, the node from which DFS_Loop started this search
    d[i]['leader'] = S
    for j in G[i]:
        if j == 'x':
            # We have reached the ghost node, meaning 'i' has no outgoing edges in G. No further recursion to be done.
            continue
        if d[j]['explored'] != 1:
            DFS(G, j)

    # everything reachable from i has been processed, so record the finishing time of i
    t = t + 1
    d[i]['t'] = t


In [7]:
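import resource, sys
# The recursive DFS can go very deep on a graph of this size, so raise both the
# OS stack size limit (soft limit of 2**29 bytes) and Python's recursion limit.
resource.setrlimit(resource.RLIMIT_STACK, (2**29, -1))
sys.setrecursionlimit(10**6)

# First pass: DFS on the reversed graph to compute the finishing times
DFS_Loop(g_rev)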

In [8]:
print(len(d))

875715


In [9]:
is_first_pass = 0

Reading in the forward graph for the second pass of DFS. For reference, the small test graph in adjacency list format looks like this:

    [cloudera@localhost Programming Assignment 4]$ cat SCC_test.txt

    1,4
    2,8
    3,6
    4,7
    5,2
    6,9
    7,1
    8,6,5
    9,7,3
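As a cross-check, the test graph listed above can be run through a compact, iterative version of the two-pass algorithm. This is a sketch under stated assumptions, not the code used in this write-up: the helper names `dfs_order` and `scc_sizes` are hypothetical, the reversed graph is built in memory, and an explicit stack replaces recursion so that no recursion limit has to be raised.

```python
from collections import Counter

def dfs_order(graph, nodes):
    """Iterative DFS started from `nodes` in order; returns (finish_order, leader)."""
    explored = set()
    finish_order = []
    leader = {}
    for start in nodes:
        if start in explored:
            continue
        explored.add(start)
        leader[start] = start
        stack = [(start, iter(graph.get(start, [])))]
        while stack:
            node, neighbours = stack[-1]
            for nxt in neighbours:
                if nxt not in explored:
                    explored.add(nxt)
                    leader[nxt] = start
                    stack.append((nxt, iter(graph.get(nxt, []))))
                    break
            else:
                # no unexplored neighbour left: node is finished
                finish_order.append(node)
                stack.pop()
    return finish_order, leader

def scc_sizes(graph):
    """Return the SCC sizes of `graph` in decreasing order."""
    # Build the reversed graph and collect every node.
    rev = {}
    nodes = set(graph)
    for tail, heads in graph.items():
        for head in heads:
            nodes.add(head)
            rev.setdefault(head, []).append(tail)
    # Pass 1: finishing order on the reversed graph.
    finish_order, _ = dfs_order(rev, sorted(nodes))
    # Pass 2: forward graph, visiting nodes in decreasing finishing time.
    _, leader = dfs_order(graph, reversed(finish_order))
    return sorted(Counter(leader.values()).values(), reverse=True)

# The test graph from SCC_test.txt, keyed by tail node.
test_graph = {
    "1": ["4"], "2": ["8"], "3": ["6"], "4": ["7"], "5": ["2"],
    "6": ["9"], "7": ["1"], "8": ["6", "5"], "9": ["7", "3"],
}
sizes = scc_sizes(test_graph)
print(sizes)  # [3, 3, 3]
```

The three components are {1, 4, 7}, {3, 6, 9} and {2, 5, 8}.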

In [10]:
# Reading in the forward graph in adjacency list form: each row is "tail,head1,head2,..."
g = {}
with open(r'/home/cloudera/Desktop/educational/ADA1/Programming Assignment 4/SCC_adj.txt') as f:
    for line in f:
        fields = line.strip().split(',')
        g[fields[0]] = fields[1:]

In [11]:
DFS_Loop(g)

In [12]:
import pandas as pd

df = pd.DataFrame.from_dict(d, orient = 'index')

In [23]:
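# Write the columns in a fixed order so that 'leader' is always the third field of the CSV
df.to_csv(r'/home/cloudera/Desktop/educational/ADA1/Programming Assignment 4/final_df.csv', columns=['explored', 't', 'leader'], index=False)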

    [cloudera@localhost Programming Assignment 4]$ cat final_df.csv | cut -d, -f3 | sort | uniq -c | sort -g -r > leaders.txt

    [cloudera@localhost Programming Assignment 4]$ head leaders.txt

     434821 99999
        968 14448
        459 69681
        313 33604
        211 97132
        205 828413
        197 525409
        177 448227
        162 842535
        152 747580
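The shell pipeline above counts how many nodes share each leader, and the size of an SCC is exactly that count. The same tally can be sketched in Python with `collections.Counter`. The dictionary `d_example` below is a hypothetical stand-in with the same shape as the bookkeeping dictionary `d`, and its values are made up for illustration.

```python
from collections import Counter

# Hypothetical bookkeeping dict in the same shape as `d`:
# node -> {'explored': ..., 't': ..., 'leader': ...}
d_example = {
    "1": {"explored": 1, "t": 9, "leader": "1"},
    "4": {"explored": 1, "t": 7, "leader": "1"},
    "7": {"explored": 1, "t": 8, "leader": "1"},
    "9": {"explored": 1, "t": 6, "leader": "9"},
    "3": {"explored": 1, "t": 4, "leader": "9"},
    "x": {"explored": 0, "t": 0, "leader": ""},  # ghost node, never explored
}

# Count the nodes per leader, leaving out the ghost node.
leader_counts = Counter(
    info["leader"] for node, info in d_example.items() if node != "x"
)

# The five largest SCCs as (leader, size) pairs, largest first.
top_five = leader_counts.most_common(5)
print(top_five)  # [('1', 3), ('9', 2)]
```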