In [1]:
from streamgraph.graph import spark, sc, Graph

In [2]:
# Build the graph from the CSV snapshot at this path.
harvey_csv_path = '/mnt/d/Datasets/harvey_streams/2017-08-25 09_00_00.csv'
g = Graph(harvey_csv_path)

[2.31] Loaded 13624 nodes.
[4.89] Parsed lists within hashtags and mentions.
[5.12] Found 3507876 edges by topic.
[11.56] Found 4174765 edges by authority.
[15.68] Found 510427 edges by hashtags.
[20.37] Found 3865403 edges by mentions.
[27.57] Found 7998447 edges in total.


In [9]:
from pyspark.sql.functions import collect_set, collect_list, array_union, concat, array, col, sum, pandas_udf, PandasUDFType, udf
from pyspark.sql.types import ArrayType, LongType, StructType, StructField
from pyspark.ml.linalg import VectorUDT, SparseVector
import pandas as pd
import numpy as np

def getNodeAdjacency(edges, num_nodes):
    """Build one sparse adjacency vector per node from an edge list.

    Every edge is treated as bi-directional: an edge (src, dst) makes dst a
    neighbor of src and src a neighbor of dst. Duplicate neighbors are
    collapsed, so each neighbor contributes a single 1.0 entry.

    Parameters
    ----------
    edges : Spark DataFrame with at least the columns 'src' and 'dst'.
        NOTE(review): presumably both hold integer node ids in
        [0, num_nodes), since they are used as SparseVector indices --
        verify against how Graph assigns ids.
    num_nodes : int
        Size (dimension) of every returned SparseVector.

    Returns
    -------
    Spark DataFrame with the columns
        node      -- long, the node id
        neighbors -- SparseVector of size num_nodes with 1.0 at the index
                     of every neighbor of the node
    Only nodes that appear in at least one edge get a row.
    """
    # Symmetrise the edge list: emit each edge once per direction. `union`
    # matches columns by position, so both selects use the same column order.
    forward = edges.select(
        col('src').cast('long').alias('node'),
        col('dst').cast('long').alias('neighbor'))
    backward = edges.select(
        col('dst').cast('long').alias('node'),
        col('src').cast('long').alias('neighbor'))

    # A single built-in aggregation gathers and de-duplicates the neighbors;
    # this replaces the previous per-direction collect_list followed by a
    # GROUPED_MAP pandas UDF that concatenated the lists and ran np.unique.
    neighbors = forward.union(backward) \
        .groupby('node') \
        .agg(collect_set('neighbor').alias('neighbors'))

    # convert to sparse vectors
    @udf(VectorUDT())
    def toSparse(neighbor_ids):
        # collect_set gives no ordering guarantee, while the
        # (size, indices, values) constructor needs increasing indices.
        indices = sorted(neighbor_ids)
        return SparseVector(num_nodes, indices, [1.0] * len(indices))

    return neighbors.withColumn('neighbors', toSparse('neighbors'))

In [10]:
# Compute the adjacency vectors for the loaded graph and preview them
# (show() prints only the first rows).
na = getNodeAdjacency(edges=g.edges, num_nodes=g.num_nodes.value)
na.show()

+----+--------------------+
|node|           neighbors|
+----+--------------------+
|  26|(13624,[5,18,36,4...|
|  29|(13624,[21,58,122...|
| 474|(13624,[9,23,35,5...|
| 964|(13624,[5,6,9,10,...|
|1677|(13624,[5,7,8,11,...|
|1697|(13624,[5,7,8,11,...|
|1806|(13624,[21,23,35,...|
|1950|(13624,[93,95,116...|
|2040|(13624,[20,61,79,...|
|2214|(13624,[114,162,1...|
|2250|(13624,[7,11,14,2...|
|2453|(13624,[28,87,110...|
|2509|(13624,[6,10,39,1...|
|2529|(13624,[3,33,36,6...|
|2927|(13624,[21,37,43,...|
|3091|(13624,[4,77,112,...|
|3506|(13624,[1,3,14,67...|
|3764|(13624,[1,3,14,69...|
|4590|(13624,[40,64,84,...|
|4823|(13624,[7,58,61,8...|
+----+--------------------+
only showing top 20 rows



In [11]:
# Bring every adjacency vector back to the driver as a list of Rows.
neighbor_vectors = na.select('neighbors')
t = neighbor_vectors.collect()

In [13]:
# Inspect the first collected Row; its repr lists every non-zero entry
# of that node's SparseVector.
t[0]

Row(neighbors=SparseVector(13624, {5: 1.0, 18: 1.0, 36: 1.0, 45: 1.0, 60: 1.0, 63: 1.0, 65: 1.0, 93: 1.0, 94: 1.0, 97: 1.0, 99: 1.0, 116: 1.0, 119: 1.0, 125: 1.0, 127: 1.0, 129: 1.0, 143: 1.0, 174: 1.0, 180: 1.0, 184: 1.0, 192: 1.0, 197: 1.0, 218: 1.0, 220: 1.0, 230: 1.0, 240: 1.0, 259: 1.0, 261: 1.0, 263: 1.0, 267: 1.0, 271: 1.0, 273: 1.0, 277: 1.0, 296: 1.0, 301: 1.0, 303: 1.0, 308: 1.0, 317: 1.0, 321: 1.0, 326: 1.0, 329: 1.0, 340: 1.0, 352: 1.0, 368: 1.0, 373: 1.0, 376: 1.0, 377: 1.0, 392: 1.0, 393: 1.0, 394: 1.0, 398: 1.0, 405: 1.0, 431: 1.0, 443: 1.0, 450: 1.0, 452: 1.0, 469: 1.0, 481: 1.0, 492: 1.0, 520: 1.0, 521: 1.0, 547: 1.0, 561: 1.0, 567: 1.0, 570: 1.0, 577: 1.0, 593: 1.0, 624: 1.0, 627: 1.0, 628: 1.0, 632: 1.0, 635: 1.0, 636: 1.0, 678: 1.0, 681: 1.0, 710: 1.0, 746: 1.0, 785: 1.0, 799: 1.0, 814: 1.0, 815: 1.0, 820: 1.0, 842: 1.0, 844: 1.0, 854: 1.0, 863: 1.0, 866: 1.0, 867: 1.0, 889: 1.0, 905: 1.0, 908: 1.0, 909: 1.0, 917: 1.0, 925: 1.0, 949: 1.0, 962: 1.0, 991: 1.0, 1001: 1

In [37]:
# Look up the node whose id is 168576.
nodes = g.nodes
nodes.filter(nodes.id == 168576).show()

+------+------------------+-------------------+----------+--------------------+-----+-----+-------------+---------+--------+----------+
|    id|         twitterID|          timestamp|      user|        originalText|topic|reply|inReplyToUser|authority|hashtags|  mentions|
+------+------------------+-------------------+----------+--------------------+-----+-----+-------------+---------+--------+----------+
|168576|901273847550410752|2017-08-26 02:43:24|ulovejamie|@iCyclone Thx for...|   17|false|         none| iCyclone|    null|[iCyclone]|
+------+------------------+-------------------+----------+--------------------+-----+-----+-------------+---------+--------+----------+



In [38]:
# List every node assigned to topic 17.
nodes = g.nodes
nodes.filter(nodes.topic == 17).show()

+------+------------------+-------------------+----------+--------------------+-----+-----+-------------+---------+--------+----------+
|    id|         twitterID|          timestamp|      user|        originalText|topic|reply|inReplyToUser|authority|hashtags|  mentions|
+------+------------------+-------------------+----------+--------------------+-----+-----+-------------+---------+--------+----------+
|168576|901273847550410752|2017-08-26 02:43:24|ulovejamie|@iCyclone Thx for...|   17|false|         none| iCyclone|    null|[iCyclone]|
+------+------------------+-------------------+----------+--------------------+-----+-----+-------------+---------+--------+----------+



In [7]:
# Count how many distinct node ids the graph holds.
# NOTE(review): this cell's execution count (In [7]) is out of order and its
# result differs from the node count logged when the graph was loaded --
# re-run on a fresh kernel to confirm.
distinct_ids = g.nodes.select('id').distinct()
distinct_ids.count()

2399