## Imports des librairies

In [0]:
from pyspark.sql import SparkSession
from time import time

#### Définir l'emplacement de notre fichier `file_location`

In [0]:
# Input: SNAP web-Stanford edge list — '#'-prefixed header lines followed by one
# "FromNodeId<TAB>ToNodeId" pair per line. Change this path if the file lives elsewhere.
file_location = "/FileStore/tables/web_Stanford.txt"

#### Créer une session Spark

#### Configuration de l'environnement de traitement distribué

In [0]:
# Create a new Spark session, or return the one that is already active.
spark = (
   SparkSession.builder
   .appName("ConnectedComponentsFinder")
   .getOrCreate()
)

# The SparkContext gives access to the low-level RDD API used by the rest of the notebook.
sc = spark.sparkContext

# Load the raw text file as an RDD of lines and report how it was partitioned.
lines = sc.textFile(file_location)
print(lines.getNumPartitions())


2


#### Afficher les premières lignes de mon fichier

In [0]:
# Preview the first raw lines, header comments included.
lines.take(num=20)

Out[33]: ['# Directed graph (each unordered pair of nodes is saved once): web-Stanford.txt ',
 '# Stanford web graph from 2002',
 '# Nodes: 281903 Edges: 2312497',
 '# FromNodeId\tToNodeId',
 '1\t6548',
 '1\t15409',
 '6548\t57031',
 '15409\t13102',
 '2\t17794',
 '2\t25202',
 '2\t53625',
 '2\t54582',
 '2\t64930',
 '2\t73764',
 '2\t84477',
 '2\t98628',
 '2\t100193',
 '2\t102355',
 '2\t105318',
 '2\t105730']

#### Filtrer pour ne contenir que les lignes non commentées

In [0]:
def is_data_line(line):
  """Return True for edge lines; False for '#' comment lines and blank lines.

  Blank or whitespace-only lines must be dropped as well: the parsing cell
  would otherwise call int('') on them and fail.
  """
  stripped = line.strip()
  return bool(stripped) and not stripped.startswith("#")

lines = lines.filter(is_data_line)
lines.take(5)

Out[34]: ['1\t6548', '1\t15409', '6548\t57031', '15409\t13102', '2\t17794']

#### Transformer les données en tuples

In [0]:
# Parse each "FromNodeId<sep>ToNodeId" line into an (int, int) edge tuple.
# split() with no argument splits on any run of whitespace, so tab- and
# space-separated files both work and repeated separators never yield an
# empty field (which int() would reject).
edges = lines.map(lambda line: tuple(int(node) for node in line.split()[:2]))

#### Initialisation des variables

In [0]:
# Driver-side counter: itRedCCF adds 1 per newly emitted pair, and the main
# loop reads it to decide whether another CCF iteration is needed.
accum = sc.accumulator(0)

# Iteration counter and loop flag for the main CCF loop.
iterationID, newPair = 0, True

#### Reduce function of the CCF-Iterate job

In [0]:
def itRedCCF(pair, counter=None):
  """CCF-Iterate reducer: propagate the smallest node id seen around `key`.

  Args:
    pair: a ``(key, values)`` tuple as produced by ``groupByKey``; ``key`` is a
      node id and ``values`` an iterable of the node ids it is linked to.
    counter: object exposing ``add(n)``, incremented once per newly emitted
      ``(value, min)`` pair. Defaults to the global accumulator ``accum``
      (looked up only when something is emitted). Pass another counter to use
      the function without the global, e.g. in a unit test.

  Yields:
    Nothing when ``key`` is already the smallest id in its neighbourhood.
    Otherwise ``(key, min_node)`` first, then ``(value, min_node)`` for every
    value different from ``min_node``, in the order the values were received.
  """
  key, values = pair
  # Materialise the values: they are traversed twice (minimum, then emission).
  neighbours = list(values)
  # Named min_node rather than `min` so the builtin is not shadowed.
  min_node = min([key, *neighbours])
  if min_node < key:
    if counter is None:
      counter = accum
    yield (key, min_node)
    for value in neighbours:
      if value != min_node:
        # Each such pair is new information for the next iteration.
        counter.add(1)
        yield (value, min_node)

#### Main program : data processing with the 2 MapReduce jobs

In [0]:
# Timer on: measures the iterative CCF phase (each iteration is forced by an action).
begin = time()

# Reset the loop state here so that re-running this cell never skips the loop
# because of a stale newPair == False left over from a previous run.
iterationID = 0
newPair = True
while newPair:
  iterationID += 1
  accum.value = 0
  previousEdges = edges

  # CCF-Iterate (MAP): emit every edge in both directions (graph treated as undirected).
  firstJobMap = edges.flatMap(lambda e: (e, e[::-1]))

  # CCF-Iterate (REDUCE): itRedCCF scans all values to find the minimum itself,
  # so no sortByKey is needed (it only added a sampling job and a shuffle).
  firstJobReduce = firstJobMap.groupByKey().flatMap(lambda pair: itRedCCF(pair))

  # CCF-Dedup (MAP & REDUCE): drop duplicate (node, min) pairs.
  # Spark transformations are lazy: cache and force this iteration with count()
  # so that accum is filled by this iteration's reducers before it is read, and
  # the next iteration reuses the cached result instead of recomputing (and
  # re-counting) this one from lineage. Accumulator updates inside transformations
  # are not exactly-once on task retries, which is harmless here since only > 0 matters.
  debupEdges = firstJobReduce.distinct().cache()
  debupEdges.count()
  previousEdges.unpersist()

  edges = debupEdges
  newPair = bool(accum.value)
  print("Iteration: ", iterationID, ", newPair? ", newPair)

ending = time() - begin
print("Execution time of data processing: ", ending)

# Final edges are (node, componentMin) pairs; flip them to (componentMin, node).
# The driver-side list is kept in case later cells rely on `results`.
results = edges.map(lambda e: e[::-1]).collect()

# Group directly from the cached RDD instead of collecting the edges to the
# driver and re-parallelizing them.
resultsAgg = edges.map(lambda e: e[::-1]).groupByKey().mapValues(list).collect()
print("Number of connected components in graph: ", len(resultsAgg))

for componentId, members in resultsAgg:
  # The component id (its smallest node) never appears among its own members,
  # hence the + 1.
  print("Component id:", componentId, "| Number of nodes: ", len(members) + 1)

Iteration:  1 , newPair?  True
Iteration:  2 , newPair?  True
Iteration:  3 , newPair?  True
Iteration:  4 , newPair?  True
Iteration:  5 , newPair?  True
Iteration:  6 , newPair?  True
Iteration:  7 , newPair?  True
Iteration:  8 , newPair?  True
Iteration:  9 , newPair?  True
Iteration:  10 , newPair?  True
Iteration:  11 , newPair?  True
Iteration:  12 , newPair?  True
Iteration:  13 , newPair?  True
Iteration:  14 , newPair?  True
Iteration:  15 , newPair?  False
Execution time of data processing:  123.28433585166931
Number of connected components in graph:  365
Component id: 48 | Number of nodes:  670
Component id: 40 | Number of nodes:  261
Component id: 136 | Number of nodes:  755
Component id: 272 | Number of nodes:  382
Component id: 936 | Number of nodes:  147
Component id: 800 | Number of nodes:  112
Component id: 488 | Number of nodes:  53
Component id: 320 | Number of nodes:  8
Component id: 208 | Number of nodes:  268
Component id: 4248 | Number of nodes:  105
Component i