# Diplomatura en Ciencia de Datos, Aprendizaje Automático y sus Aplicaciones
## Programación Distribuida sobre Grandes Volúmenes de Datos

Damián Barsotti 

### Facultad de Matemática Astronomía Física y Computación
## Universidad Nacional de Córdoba

<img src="http://program.ar/wp-content/uploads/2018/07/logo-UNC-FAMAF.png" alt="Drawing" style="width:80%;"/>

# Grandes Grafos Sociales
---
### Ejemplo Red de Usuarios Twitter

In [1]:
!ls -l ../inputs/ds/tweets.pqt

total 7603
-rw-r--r-- 1 ssulca ssulca 1178897 Nov  1 20:47 part-00000-30555478-6859-4a3b-a8e1-6135ac631066.snappy.parquet
-rw-r--r-- 1 ssulca ssulca 1170279 Nov  1 20:47 part-00001-30555478-6859-4a3b-a8e1-6135ac631066.snappy.parquet
-rw-r--r-- 1 ssulca ssulca 1156050 Nov  1 20:47 part-00002-30555478-6859-4a3b-a8e1-6135ac631066.snappy.parquet
-rw-r--r-- 1 ssulca ssulca 1141217 Nov  1 20:47 part-00003-30555478-6859-4a3b-a8e1-6135ac631066.snappy.parquet
-rw-r--r-- 1 ssulca ssulca 1134385 Nov  1 20:47 part-00004-30555478-6859-4a3b-a8e1-6135ac631066.snappy.parquet
-rw-r--r-- 1 ssulca ssulca 1121361 Nov  1 20:47 part-00005-30555478-6859-4a3b-a8e1-6135ac631066.snappy.parquet
-rw-r--r-- 1 ssulca ssulca  620041 Nov  1 20:47 part-00006-30555478-6859-4a3b-a8e1-6135ac631066.snappy.parquet
-rw-r--r-- 1 ssulca ssulca       0 Nov  1 20:47 _SUCCESS


In [2]:
import os
from pyspark.sql import SparkSession

Spark usa graphs de manera interna pero esta basada en RDD

[Databricks](https://databricks.com/) tiene una version de graph que usa dataframes llamda `graphframes`, Entonces vamos a usar esa, por cuestiones de comodidad .

In [3]:
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell"
)

Para jupyter lab si tiene problemas en importar la libreria usar

ver la version de la libreria correspondiente a la version de spark

```sh
pyspark --packages graphframes:graphframes:0.8.1-spark3.0-s_2.1
```

In [4]:
spark = SparkSession.builder.appName("07_grafos").getOrCreate()
sc = spark.sparkContext

#sc.addPyFile('/users/ssulca/.ivy2/jars/graphframes_graphframes-0.8.1-spark3.0-s_2.12.jar')

### Load Tweets

* `pqt` - formato parket, 
* alamacena el file en partes separadas.
* `RT_times` es la cantidad de veces que el usuario retweeteo el tweet

In [5]:
tweets = spark.read.parquet("../inputs/ds/tweets.pqt")

print("Cantidadtweets de tweets:", tweets.count())

tweets.printSchema()

Cantidadtweets de tweets: 172040
root
 |-- timestamp: long (nullable = true)
 |-- user: string (nullable = true)
 |-- RT_by: string (nullable = true)
 |-- RT_times: long (nullable = true)
 |-- text: string (nullable = true)



In [6]:
tweets.limit(10).toPandas()

Unnamed: 0,timestamp,user,RT_by,RT_times,text
0,1487970896,ierrejon,teresapititesa,2,"RT @ierrejon: Aulas masificadas, falta de prof..."
1,1487971010,RobiBaradel,itoferrero,1,RT @RobiBaradel: Nos parece muy sano que los c...
2,1487971052,vilma_ripoll,PabloIannuzzi,3,"RT @vilma_ripoll: Los ""voluntarios""contra el p..."
3,1487971121,fargosi,PiaAsao,3,RT @fargosi: Hay otros medios de protesta. Las...
4,1487971177,GFrondizi,tepergmailcomP1,3,RT @GFrondizi: Pido algún fiscal por incumplim...
5,1487971184,RobiBaradel,Analiavc1,41,RT @RobiBaradel: Nos parece muy sano que los c...
6,1487971345,SuperSecretario,ari2271,12,"RT @SuperSecretario: #Cambiemos sacó solo 1,5%..."
7,1487971368,MarinaVolpe12,BocaLocura312,1,RT @MarinaVolpe12: No se ofrezcan nunca como M...
8,1487971408,lanacionmas,susanamartimari,2,RT @lanacionmas: ¿Qué opinás de la propuesta #...
9,1487971469,aeronauticosCFK,andrea7533,17,RT @aeronauticosCFK: #ApoyoALosMaestros\nPara ...


### Definición de la red de usuarios:

La idea es generar una rede donde cada nodo sea si un usuriao
y la relacion un usuario retwitteo a otro.

* **Usuario A está conectado con usuario B** si B retweeteó un mensaje de A.
* Queremos también la cantidad de retweets que hizo B de mensajes de A **distintos o no**.

#### Generamos las conecciones por usuarios

In [7]:
from pyspark.sql.functions import sum, count

twitterConnections = tweets \
                        .groupBy("user", "RT_by") \
                        .agg(sum("RT_times").alias("count_RT"), count("*").alias("count_distinct_RT"))


## GraphFrames

Esta api es utilizada para el manejo de grafos, ya que esta facilita 
el uso de los datos cuando no tenemos los datos almacenados en 
tablas.

* Utiliza la interface Spark SQL (Dataframes).
* Creada por [Databricks](https://databricks.com/).
* Por ahora no viene en la distribución estandard de Spark (solo [GraphX](http://spark.apache.org/graphx/) sobre RDD's).
* Algoritmos ya implementados como: 
    - Breadth-first search (BFS)
    - Componentes (fuertemente) conexas
    - Label Propagation Algorithm (para detectar comunidades)
    - PageRank
    - Shortest paths
    - Triangle count.
* **Agregación de mensajes** para hacer algoritmos.
* **Motif finding** para queries. 

### Documentación De `Graphframes`:

* [User Guide](https://graphframes.github.io/graphframes/docs/_site/user-guide.html).
* [User Guide - Python](https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-python.html).
* [Documentación de la API](https://graphframes.github.io/graphframes/docs/_site/api/python/index.html).

Tambine permite la conexion con diferentes fuentes, en las que se incluye base de datos para grafos.

### Creación de grafo
Existen diferentes formas de almacenar grafos en tablas

* Se crean a partir de `DataFrame` de aristas y/o vértices.
  - Diferentes nodos en diferentes diferentes particiones (Edge Cut) 
    El problema es que pasar los datos de nodos a tablas, existe 
    redundacia de los datos. Las aristas puede estar almacenadas
    de forma repetida.
  - Spark almacena usando Vertex Cut, Almacena por aristas, 
    Si hay vertices que tienen distintas aristas a distintos nodos
    se repite ese vertice. 
    > Ejm. Imagen el vertice central de almacena 3 veces.
* DataFrame de aristas debe tener las columnas `src` y `dst` (por lo menos).
  - Alamcena por defecto grafos dirigidos
* DataFrame de vértices debe tener la columna `id` (por lo menos).
  - Idetificador del Nodo.
* Los grafos son dirigidos.
* Se almacenan con redundancia (vertex cut):
  ![](http://spark.apache.org/docs/latest/img/edge_cut_vs_vertex_cut.png)
  
La informacion extra relacionada con la conexion o con el nodo, se 
almacena en columas extra.

### Ejemplo:

* Para los vertices es obligatorio tener la columna `id`
* para las aristas es obligatorio `src` `dst`

In [8]:
from graphframes import *

# DataFrame de vértices
v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
],["id", "name", "age"])

# DataFrame de aristas
e = spark.createDataFrame([
  ("a", "b", "amigo"),
  ("b", "c", "sigue"),
  ("c", "b", "sigue"),
  ("f", "c", "sigue"),
  ("e", "f", "sigue"),
  ("e", "d", "amigo"),
  ("d", "a", "amigo"),
  ("a", "e", "amigo"),
  ("a", "h", "amigo")
], ["src", "dst", "relationship"])

# Creacion de GraphFrame
g = GraphFrame(v, e)

In [9]:
g.vertices.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  a|  Alice| 34|
|  b|    Bob| 36|
|  c|Charlie| 30|
|  d|  David| 29|
|  e| Esther| 32|
|  f|  Fanny| 36|
|  g|  Gabby| 60|
+---+-------+---+



In [10]:
g.edges.show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|       amigo|
|  b|  c|       sigue|
|  c|  b|       sigue|
|  f|  c|       sigue|
|  e|  f|       sigue|
|  e|  d|       amigo|
|  d|  a|       amigo|
|  a|  e|       amigo|
|  a|  h|       amigo|
+---+---+------------+



`inDegrees` cuantas aristas llegan a cada uno de los vertices.

In [11]:
g.inDegrees.show()

+---+--------+
| id|inDegree|
+---+--------+
|  f|       1|
|  e|       1|
|  h|       1|
|  d|       1|
|  c|       2|
|  b|       2|
|  a|       1|
+---+--------+



In [12]:
# Query: Count the number of "follow" connections in the graph.

print("Cantidad de amistades:", 
        g.edges.filter("relationship = 'amigo'").count())

Cantidad de amistades: 5


#### PageRank

Algorimo de Google, Que simula como los usuaros navegan de pagina en 
pagina, siguiendo links.

Es un proceso iterativo, en donde cada pagina tiene un importancia
incial pagina, y en cada iteracion un pagina transmite su importancia
a otras paginas.

En cada iteracion se va cambiando la importancia de cada nodo, hasta 
que el algorimo converge, y dando como resultado la importancia de
cada pagina.

In [13]:
# Run PageRank algorithm, and show results.
results = g.pageRank(resetProbability=0.01, maxIter=10)

results.vertices.select("id", "pagerank").show()

+---+--------------------+
| id|            pagerank|
+---+--------------------+
|  g|0.011647254575707162|
|  b|  3.6199424699159937|
|  e| 0.03848486478566412|
|  a| 0.05421739436354941|
|  f|  0.0347707909956736|
|  d|  0.0347707909956736|
|  c|   3.206166434367738|
+---+--------------------+



#### Cambios para visualizar con Guepi

`coalese(1)`, es para guardar los arhivos como una sola parte
ya que spark siempre almacena los files por partes.


In [14]:
from pyspark.sql.functions import col

g.edges.printSchema()

g.edges \
        .select(col("src").alias("Source"), 
                col("dst").alias("Target"), 
                "relationship") \
        .coalesce(1) \
        .write \
        .csv("../outputs/g_edges.csv", mode="overwrite", header=True)
# Ver que pasa sin coalesce

root
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- relationship: string (nullable = true)



#### Show using gepi
<img src="../docs/grafo_1.png" alt="Drawing" style="width:60%;"/>

### Volvamos a los tweets
#### Hay que renombrar las conexiones de tweeter:

In [15]:
from pyspark.sql.functions import col

edgesDF = twitterConnections \
    .select(col("user").alias("src"), 
            col("RT_by").alias("dst"),
            "count_RT", "count_distinct_RT")
                                       
edgesDF.limit(10).show()

+---------------+--------------+--------+-----------------+
|            src|           dst|count_RT|count_distinct_RT|
+---------------+--------------+--------+-----------------+
|     WolffWaldo| xenia43sailor|      69|                1|
|  atlanticsurff|     paulobeni|       8|                1|
|Ruben_Rischmann|PPperonoelsapo|      23|                1|
|     WolffWaldo|    MariePes72|      14|                1|
|   romerodiario|     RcaRawson|       4|                1|
|     pablen2012|  NattyJuliAle|       4|                1|
|MaestroPortenio|  samuelevyARG|      14|                1|
|   LaAlfareraOk|    aka_Mister|      33|                1|
|    ertenembaum|        fertso|       4|                1|
| BrujaUniversal|        O_Fer_|       6|                1|
+---------------+--------------+--------+-----------------+



#### Hay que crear el DF de vértices
generar un dataframe solamente con los usuarios.

In [16]:
srcs = edgesDF.select(col("src").alias("id"))

dsts = edgesDF.select(col("dst").alias("id"))
    
vertexDF = srcs.unionAll(dsts).distinct()

vertexDF.limit(10).show()

+---------------+
|             id|
+---------------+
|     ElGorilaje|
|   flordecaribe|
|          l_p_o|
|     Piru_laura|
| budano_ignacio|
|   RepettoLidia|
|    LilianaTren|
| defensoria_bol|
|   MIGUE67MIGUE|
|carlito60322427|
+---------------+



#### Creación de GraphFrame:
Los grae frame tambien se puede almancenar en memoria

In [17]:
graph = GraphFrame(vertexDF, edgesDF).cache()

### Operaciones básicas sobre grafos

#### A continuación veremos como calcular: 
* la cantidad de vértices (usuarios)
* la cantidad de arístas (conexiones)
* el grado de cada vértice (cantidad de conexiones por usuario)

#### Tamaño del grafo

In [18]:
verticesCount = graph.vertices.count()
print("Cantidad de nodos:", verticesCount)

edgesCount = graph.edges.count()
print("Cantidad de conecciones: ", edgesCount)

Cantidad de nodos: 57138
Cantidad de conecciones:  152613


#### Cantidad de conecciones por usuario
Se obtiene usando el atributo `degree`. 

Esto me da una idea de la importancia de cada nodo. Esto una importancia local, de acuerdo a la cantidad de conexiones.

Un proximo paso sera analizar la calidad de las conexiones.

In [19]:
nodesDegree = graph.degrees

sortedByDegree = nodesDegree.orderBy("degree", ascending=False)

sortedByDegree.limit(10).show()

+---------------+------+
|             id|degree|
+---------------+------+
|Winston_Dunhill|  2325|
|fernandocarnota|  1745|
|   santosjorgeh|  1657|
|lanatoparatodos|  1529|
|  JorgeFavaloro|  1483|
|    RobiBaradel|  1483|
|     elcoya1977|  1471|
|   eldestapeweb|  1441|
| lucaslauriente|  1422|
|     betovaldez|  1407|
+---------------+------+



## Agregación de Mensajes
---
### Influencia Colectiva
Me da importancia del usuario viendo sus conexiones y las conexiones 
de sus conexiones.

#### Aplicaciones: 
* Marketing viral (influencia de opinión)
* Detección de focos de dispersión de enfermedades
* ...

#### Cómo funciona:
* Primero se calcula la **Influencia Colectiva (CI)**
  - rank segun cantida de conexiones y segun las conexiones de las 
    conexiones.
* En cada iteración elimina del grafo el nodo de mayor CI y recalcula el CI de los nodos restantes.
  - Saco de la red al usuario mas importante.
* Termina cuando la componente gigante se destruye
* Solución básica: \\(O(N2)\\). Optimizada: \\(O(NlogN)\\)

> Influence maximization in complex networks through optimal percolation” (Flaviano Morone, Hernán A. Makse, 27 Jun 2015) https://arxiv.org/abs/1506.08326

Cuando la red se divide en partes, todos los usuarios que saque son 
los mas importantes, por lo tanto son los objetivos para viralizar un 
contenido.

### Fórmula de CI

La formula de collective influence para un vértice \\(i\\) es:

$$ CI(i) = (degree_i -  1) \times \sum\limits_{j \in vecinos(i)} (degree_j -  1)$$

Se supone un grafo **no dirigido**.

Es decir, degree, es el la cantidad de aristas de entrada y salida 
menos uno multiplicado por los grados menos 1 de todos los vecinos que 
estan al rededor.

### Pasos para calcular collective influence son:

* agregar el grado (degree) de cada nodo al grafo.
* importar `AggregateMessages`.
* calcular collective influence de cada individuo usando `AggregateMessages` de `GraphFrame`
* ordenar los individuos por su collective influence de mayor a menor
* imprimir el listado de individuos


#### Documentación para implementar con GraphFrames:

* Documentación sobre [AggregateMessages](https://graphframes.github.io/graphframes/docs/_site/user-guide.html#message-passing-via-aggregatemessages) con ejemplo.
* Documentación de [API AggregateMessages](https://graphframes.github.io/graphframes/docs/_site/api/python/graphframes.html).


Cree un nuevo grafo usando los nodos con su degree disponible en nodesDegree y las aristas originales

Aristas y vértices están bien particionadas

In [20]:
from pyspark.sql.functions import sum
from graphframes.lib import AggregateMessages as AM

#Cree un nuevo grafo usando los nodos con su degree disponible en nodesDegree y las aristas originales
degreeGraph = GraphFrame(nodesDegree, graph.edges )
# Aristas y vértices están bien particionadas

Comenzamos a calcular CI: Por cada nodo calcula la sumatoria del degree-1 de sus vecinos.

> Lo que hago es generar mensajes, cada nodo va a enviar a sus
vecionos cual es el grado el tiene. Esto tanto para conexiones
entrantes como para conexiones salientes.

> Despues cada nodo recibe ese mensaje y hace algo con esos mensaje.


In [21]:
msgToSrc = AM.dst["degree"] - 1
msgToDst = AM.src["degree"] - 1

Luego defino la operacion de agregacion sobre los mensajes

El DataFrame sumNeighborDegrees debe tener 2 columnas: 
* `id` identficador del nodo
* `sum_neighbor_degree` la suma de los grados menos 1 de los vecions

In [22]:
sumNeighborDegrees = degreeGraph.aggregateMessages(
    sum(AM.msg).alias("sum_neighbor_degree"), # Hacer aggregation sobre el valor de AM.msg
    sendToSrc=msgToSrc, #  send destination user's age to source
    sendToDst=msgToDst) # send source user's age to destination

Join de sumNeighborDegrees con degrees para tener todas las columnas necesarias para calcular CI.

> Se hace el join  para multiplicar mi grado con la sumatoria de los grados de los vecinos. Entonces tengo en una columa mi grado y entra la suma de los grados de los vecions.


In [23]:
nodesDegreeJoin = nodesDegree.join(sumNeighborDegrees,"id")

Calculamos CI: para terminar de calcularlo debemos multiplicar la sumatoria del paso anterior por degree - 1 de cada nodo.
El DataFrame collectiveInfluence debe tener 2 columnas: id, ci y degree

Por ultimo ordena descendentemente por ci 

In [24]:
collectiveInfluence = nodesDegreeJoin \
                          .select( ((col("degree") - 1) * col("sum_neighbor_degree")).alias("ci"), 
                                    "id", 
                                    "degree") \
                          .orderBy("ci", ascending=False) \
                          .cache()

collectiveInfluence.limit(20).show()

+--------+---------------+------+
|      ci|             id|degree|
+--------+---------------+------+
|71551312|Winston_Dunhill|  2325|
|57822552|   santosjorgeh|  1657|
|55026688|fernandocarnota|  1745|
|45848634|  JorgeFavaloro|  1483|
|42052088|lanatoparatodos|  1529|
|37856910|     elcoya1977|  1471|
|33925374|     betovaldez|  1407|
|31175045|     LaBelgrana|   606|
|30129480|        fargosi|  1271|
|29494764|    RobiBaradel|  1483|
|27043120|         copi__|  1041|
|23379462|      NunkMasKs|  1027|
|18770024| ConCiencia2015|   533|
|17316472|     WolffWaldo|   812|
|16165470|      HugoYasky|   987|
|15975882|   mis2centavos|  1020|
|15693120|   eldestapeweb|  1441|
|15118908|Carlos_AD_Viola|   533|
|14284499|          CTAok|   830|
|14149350|  EdgardoRovira|  1176|
+--------+---------------+------+



Para el caso de `LaBelgrana` por mas que no tenga tantas conexiones
estas conexiones son importantes.

## Motif Finding

* Es un Domain-Specific Language (DSL) para expresar queries.
* La unidad básica es la expresión de una arista

### Nomesclatura
 * `()` nodos
 * `[]` aristas
 * `-->` arista dirigida 

### Ejemplos

* `graph.find("(a)-[e]->(b); (b)-[e2]->(a)")` expresa vértices unidos en ambas direcciones.
    - Devuelve un DataFrame con columnas `"a"`, `"b"`, `"e"` y `"e2"`.
* `graph.find("(a)-[e]->(b); (b)-[e2]->(c)")` expresa dos aristas pegadas con un vértice común `b`. 

### Documentacion y ejemplo

* [GraphFrame User Guide](https://graphframes.github.io/graphframes/docs/_site/user-guide.html#motif-finding).

devolver las aristas en ambas direcciones

In [25]:
g.find("(a)-[e]->(b); (b)-[e2]->(a)").show()

+----------------+-------------+----------------+-------------+
|               a|            e|               b|           e2|
+----------------+-------------+----------------+-------------+
|[c, Charlie, 30]|[c, b, sigue]|    [b, Bob, 36]|[b, c, sigue]|
|    [b, Bob, 36]|[b, c, sigue]|[c, Charlie, 30]|[c, b, sigue]|
+----------------+-------------+----------------+-------------+



Buscar pares de vértices en ambas direccione:

In [26]:
motifs = g.find("(a)-[e1]->(b); (b)-[e2]->(a)")

motifs.printSchema()

root
 |-- a: struct (nullable = false)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: long (nullable = true)
 |-- e1: struct (nullable = false)
 |    |-- src: string (nullable = true)
 |    |-- dst: string (nullable = true)
 |    |-- relationship: string (nullable = true)
 |-- b: struct (nullable = false)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: long (nullable = true)
 |-- e2: struct (nullable = false)
 |    |-- src: string (nullable = true)
 |    |-- dst: string (nullable = true)
 |    |-- relationship: string (nullable = true)



In [27]:
motifs.show()

+----------------+-------------+----------------+-------------+
|               a|           e1|               b|           e2|
+----------------+-------------+----------------+-------------+
|[c, Charlie, 30]|[c, b, sigue]|    [b, Bob, 36]|[b, c, sigue]|
|    [b, Bob, 36]|[b, c, sigue]|[c, Charlie, 30]|[c, b, sigue]|
+----------------+-------------+----------------+-------------+



Despues se pueden aplicar filtros para queries mas complejos.

In [28]:
motifs.filter("b.age > 30").show()

+----------------+-------------+------------+-------------+
|               a|           e1|           b|           e2|
+----------------+-------------+------------+-------------+
|[c, Charlie, 30]|[c, b, sigue]|[b, Bob, 36]|[b, c, sigue]|
+----------------+-------------+------------+-------------+



## Visualización

A continuación visualizaremos con [Gephi](https://gephi.org/) la parte del grafo (subgrafo) que contiene los mayores influencers.

Para ello generaremos archivos *csv* como entrada de esta herramienta 

In [29]:
minCI = 29000000
minDegree = 600

# tomo los usarios con mayor influencia o con más conecciones
ciTops = collectiveInfluence \
    .filter((col("ci") >= minCI) | (col("degree") >= minDegree))

gInfluencers = GraphFrame(ciTops, graph.edges)

# Tomo solo las aristas que tengan vértices
veTops = gInfluencers.find("(a)-[e]->(b)") \
                .cache()
      
                
print("Cantidad de top influencers:", veTops.count())

veTops.printSchema()

Cantidad de top influencers: 35
root
 |-- a: struct (nullable = false)
 |    |-- ci: long (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- degree: integer (nullable = false)
 |-- e: struct (nullable = false)
 |    |-- src: string (nullable = true)
 |    |-- dst: string (nullable = true)
 |    |-- count_RT: long (nullable = true)
 |    |-- count_distinct_RT: long (nullable = false)
 |-- b: struct (nullable = false)
 |    |-- ci: long (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- degree: integer (nullable = false)



In [30]:
eTops = veTops.select("e.*")

eTops.printSchema()

# Grafo de los tops
gTops = GraphFrame(ciTops, eTops)


# guardo un csv con los vertices para Gephi 
gTopsInf = gTops.vertices \
        .select("*", col("id").alias("Label"))

gTopsInf.limit(10).toPandas()

root
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- count_RT: long (nullable = true)
 |-- count_distinct_RT: long (nullable = false)



Unnamed: 0,ci,id,degree,Label
0,71551312,Winston_Dunhill,2325,Winston_Dunhill
1,57822552,santosjorgeh,1657,santosjorgeh
2,55026688,fernandocarnota,1745,fernandocarnota
3,45848634,JorgeFavaloro,1483,JorgeFavaloro
4,42052088,lanatoparatodos,1529,lanatoparatodos
5,37856910,elcoya1977,1471,elcoya1977
6,33925374,betovaldez,1407,betovaldez
7,31175045,LaBelgrana,606,LaBelgrana
8,30129480,fargosi,1271,fargosi
9,29494764,RobiBaradel,1483,RobiBaradel


In [31]:
# guardo un csv con los vertices para Gephi 
gTopsInf.coalesce(1) \
        .write \
        .csv("../outputs/top_influencers.csv", mode="overwrite", header=True)

In [32]:
# guardo un csv con las aristas para Gephi
gTopsEdg = gTops.edges \
        .select(col("src").alias("Source"), 
                col("dst").alias("Target"), 
                "count_RT","count_distinct_RT")
gTopsEdg.limit(10).toPandas()

Unnamed: 0,Source,Target,count_RT,count_distinct_RT
0,todonoticias,LaBelgrana,170,1
1,Winston_Dunhill,LaBelgrana,340,2
2,betovaldez,LaBelgrana,510,3
3,fargosi,LaBelgrana,170,1
4,WolffWaldo,WolffWaldo,35,5
5,JonatanViale,LaBelgrana,170,1
6,HugoYasky,FernandezAnibal,13,1
7,fernandocarnota,LaBelgrana,170,1
8,CTAok,CasaRosadaAR,36,2
9,NunkMasKs,LaBelgrana,170,1


In [33]:
# guardo un csv con las aristas para Gephi
gTopsEdg.coalesce(1) \
        .write \
        .csv("../outputs/top_edges.csv", mode="overwrite", header=True)

#### Visualizacion

<img src="../docs/grafo_2.png" alt="Drawing" style="width:50%;"/>

Aqui se puede observar que la belgrana tiene un degree bajo
pero tiene un ci alto debido a las conexiones sus conexiones.

### Gephi

Lanzar el programa desde una terminal ejecutando:

```sh
cd
spark/gephi-0.9.2/bin/gephi
```

Seguir las instrucciones del profesor.

### Ejercicio

Complete el siguiente programa para calcular el grafo de todas las posibles conexiones a los 5 mayores influenciadores y graficar el resultado con Gephi. 

#### Ayuda
* Busque en la documentación [Api Datasets](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset) algún método que devuelva las *n* primeras files de un Dataset equivalente a la directiva SQL `LIMIT`. 
* Para llenar el comando `find` busque la documentación de [Motif finding](https://graphframes.github.io/graphframes/docs/_site/user-guide.html#motif-finding).

In [34]:
fiveInfluence = [(row['id']) for row in collectiveInfluence.limit(5).collect()]
fiveInfluence

['Winston_Dunhill',
 'santosjorgeh',
 'fernandocarnota',
 'JorgeFavaloro',
 'lanatoparatodos']

In [35]:
edgesFive = edgesDF.filter(
    col("dst").isin(fiveInfluence)
)
edgesFive.show()

+---------------+---------------+--------+-----------------+
|            src|            dst|count_RT|count_distinct_RT|
+---------------+---------------+--------+-----------------+
|        au9beat|  JorgeFavaloro|      79|                1|
| RODRIGOSOLER64|  JorgeFavaloro|      79|                1|
|      Pitinardy|  JorgeFavaloro|      79|                1|
|KTEPASACALABAZA|  JorgeFavaloro|      79|                1|
|   santosjorgeh|  JorgeFavaloro|      79|                1|
|       Andy4500|   santosjorgeh|      30|                1|
|       LAZZUBIA|   santosjorgeh|      30|                1|
|     vatica2015|   santosjorgeh|      30|                1|
|        infobae|fernandocarnota|      12|                1|
|Lauramoreiras52|   santosjorgeh|      30|                1|
|fernandocarnota|fernandocarnota|      24|                2|
|   edgyportenio|  JorgeFavaloro|      79|                1|
+---------------+---------------+--------+-----------------+



In [36]:
srcFive = edgesFive.select(col("src").alias("id"))
vertexFive = srcFive.unionAll(collectiveInfluence.select("id").limit(5)).distinct()
vertexFive.show()

+---------------+
|             id|
+---------------+
|KTEPASACALABAZA|
|     vatica2015|
|Lauramoreiras52|
|Winston_Dunhill|
|       LAZZUBIA|
|        infobae|
|fernandocarnota|
|  JorgeFavaloro|
|lanatoparatodos|
|      Pitinardy|
|   edgyportenio|
| RODRIGOSOLER64|
|       Andy4500|
|   santosjorgeh|
|        au9beat|
+---------------+



In [37]:
veFive = collectiveInfluence.filter(
    col("id").isin([(row['id']) for row in vertexFive.collect()])
)
veFive.show()

+--------+---------------+------+
|      ci|             id|degree|
+--------+---------------+------+
|71551312|Winston_Dunhill|  2325|
|57822552|   santosjorgeh|  1657|
|55026688|fernandocarnota|  1745|
|45848634|  JorgeFavaloro|  1483|
|42052088|lanatoparatodos|  1529|
| 9260920|        infobae|   681|
| 6188952|     vatica2015|   198|
| 2967154|       Andy4500|   114|
|  979336|   edgyportenio|    69|
|  969651|KTEPASACALABAZA|    82|
|  635629|        au9beat|    54|
|  442060|Lauramoreiras52|    47|
|  439988|       LAZZUBIA|    59|
|  259653| RODRIGOSOLER64|    42|
|   37791|      Pitinardy|    18|
+--------+---------------+------+



In [38]:
veFive.coalesce(1) \
        .write \
        .csv("../outputs/five_vertex.csv", mode="overwrite", header=True)

edgesFive.select(
    col("src").alias("Source"), 
    col("dst").alias("Target"), 
    "count_RT","count_distinct_RT")\
        .coalesce(1) \
        .write \
        .csv("../outputs/five_edges.csv", mode="overwrite", header=True)

### Visualizacion
<img src="../docs/grafo_3.png" alt="Drawing" style="width:60%;"/>


### Ejercicio

0. Tomar el grafo de tweets y crear un dataframe con todas las aristas y vértices que forman triángulos dirigidos. O sea, todos los casos donde un usuario *A* retuitea a *B* que retuitea a *C* que retuitea a *A*.

0. Hay casos donde los tres usuarios sean distintos?

0. Graficar con Gephi el resultado.

In [39]:
triangles = graph.find(
    "(a)-[ab]->(b); (b)-[bc]->(c); (c)-[ca]->(a)")

triangles = triangles.filter(
    (col("a") != col("b")) &
    (col("c") != col("b")) &
    (col("a") != col("c")))
# limito a 20
triangles = triangles.limit(20)
triangles.show()

+-----------------+--------------------+-----------------+--------------------+-----------------+--------------------+
|                a|                  ab|                b|                  bc|                c|                  ca|
+-----------------+--------------------+-----------------+--------------------+-----------------+--------------------+
|    [dccolectivo]|[dccolectivo, jud...|          [judrh]|[judrh, mamibel09...|      [mamibel09]|[mamibel09, dccol...|
|     [agapearg51]|[agapearg51, Gabr...|[GabrielaMasista]|[GabrielaMasista,...|[adrianapellaca1]|[adrianapellaca1,...|
|   [rodolfoCeniz]|[rodolfoCeniz, Ru...|      [RuthPizzi]|[RuthPizzi, Pablo...|    [PabloMGener]|[PabloMGener, rod...|
|  [SandraDocente]|[SandraDocente, C...|    [C_Echegoyen]|[C_Echegoyen, Rip...|  [RipamontiRita]|[RipamontiRita, S...|
|  [donofriojorge]|[donofriojorge, w...|[waltervazquez21]|[waltervazquez21,...|     [GurtzDario]|[GurtzDario, dono...|
|  [donofriojorge]|[donofriojorge, w...|[walterv

In [40]:
triEdges = triangles.select("ab.*")
triEdges = triEdges.unionAll(triangles.select("bc.*"))
triEdges = triEdges.unionAll(triangles.select("ca.*"))

triEdges.limit(10).toPandas()

Unnamed: 0,src,dst,count_RT,count_distinct_RT
0,dccolectivo,judrh,2,1
1,agapearg51,GabrielaMasista,58,1
2,rodolfoCeniz,RuthPizzi,5,1
3,SandraDocente,C_Echegoyen,108,3
4,donofriojorge,waltervazquez21,18,3
5,donofriojorge,waltervazquez21,18,3
6,lizzieyn,ConCiencia2015,76,1
7,Carlos_AD_Viola,PatriciaConti9,36,1
8,ajfernandez2001,interinosMAD,52,4
9,ajfernandez2001,interinosMAD,52,4


In [41]:
srcs = triEdges.select(col("src").alias("id"))

dsts = triEdges.select(col("dst").alias("id"))
    
triVertex = srcs.unionAll(dsts).distinct()

triVertex.limit(10).show()

+---------------+
|             id|
+---------------+
|payasobarricada|
|  Laura_Capurro|
| AccesojustoNAD|
|     LaBelgrana|
|    dccolectivo|
|     LiluzLisam|
|     agapearg51|
|    C_Echegoyen|
|  Fte_Izquierda|
|     GurtzDario|
+---------------+



In [42]:
triVertex.select("*")\
        .coalesce(1) \
        .write \
        .csv("../outputs/tri_vertex.csv", mode="overwrite", header=True)

triEdges.select(
    col("src").alias("Source"), 
    col("dst").alias("Target"), 
    "count_RT","count_distinct_RT")\
        .coalesce(1) \
        .write \
        .csv("../outputs/tri_edges.csv", mode="overwrite", header=True)

### Visualizacion

<img src="../docs/grafo_4.png" alt="Drawing" style="width:50%;"/>

Fin