In [1]:
from pyspark.sql import SparkSession

# Entry point for the notebook: reuse an already-running session if there is
# one, otherwise start a new session named "PySpark Intro".
spark = (
    SparkSession.builder
    .appName("PySpark Intro")
    .getOrCreate()
)

In [2]:
# Load every JSON file under tweet-data/ into a DataFrame. samplingRatio is
# passed to the JSON reader so that only a fraction (0.5%) of the input is
# used for schema inference.
dat = (
    spark.read
    .options(samplingRatio=0.005)
    .json('tweet-data/')
)

In [None]:
# Show the column tree of the loaded DataFrame.
dat.printSchema()

In [None]:
# Number of rows in the loaded DataFrame. count() is a Spark action, so this
# triggers a job over the input rather than returning instantly.
dat.count()

In [3]:
from itertools import permutations

# Build an RDD of ordered hashtag pairs (tag_a, tag_b), one pair per ordered
# combination of two hashtags appearing in the same row.
pairs = (
    dat.rdd
    # Keep only the `entities` field of every row.
    .map(lambda row: row.entities)
    # Drop rows without entities and rows with fewer than two hashtags.
    .filter(lambda ent: ent and len(ent.hashtags) > 1)
    # Reduce each row to the list of its hashtag texts.
    .map(lambda ent: [tag.text for tag in ent.hashtags])
    # Emit every ordered pair, so both (a, b) and (b, a) are produced.
    .flatMap(lambda texts: list(permutations(texts, 2)))
)

In [4]:
# Cache the pair RDD: it is traversed here and again when the co-occurrence
# matrix is built.
pairs.cache()

# A tag is "popular" when it is the first element of strictly more than this
# many pairs. Kept under its own name so that the filter closure below does
# not depend on `N`, which is (re)bound to the tag count at the end of the cell.
MIN_PAIR_COUNT = 2

popular_tags = set(
    pairs
    # (tag_a, tag_b) -> (tag_a, 1): count pairs per leading tag.
    .mapValues(lambda v: 1)
    .reduceByKey(lambda a, b: a + b)
    .filter(lambda t: t[1] > MIN_PAIR_COUNT)
    .keys()
    .collect()
)

# Number of popular tags; used as the side length of the co-occurrence matrix.
N = len(popular_tags)

In [5]:
# Display the number of popular tags (N was set to len(popular_tags)).
N

47939

In [6]:
# Map every popular tag to its row/column index in the co-occurrence matrix.
# Iteration order of a set of strings changes between interpreter runs (string
# hashing is randomised), so the tags are sorted first to make the index
# assignment reproducible across kernel restarts.
lookup = {tag: idx for idx, tag in enumerate(sorted(popular_tags))}

In [7]:
# Sanity check: lookup has one entry per popular tag, so this should equal N.
len(lookup)

47939

In [8]:
def _agg(t,m):
    return t + m

def local_reducer(m, t):
    """Fold one ``((i, j), count)`` record into the matrix ``m``.

    Args:
        m: Mutable 2-D matrix indexed as ``m[i, j]`` (a ``dok_matrix`` where
            this is used with ``aggregate``). Modified in place.
        t: Pair ``((i, j), count)`` giving the cell and the value to add.

    Returns:
        The same ``m`` object, so it can be used as an aggregate seqOp.
    """
    i, j = t[0]
    # Accumulate rather than overwrite: the result is identical when every
    # (i, j) arrives once, and stays correct if a key is seen more than once.
    m[i, j] += t[1]
    return m

In [9]:
from scipy.sparse import dok_matrix
import numpy as np 

# Empty N x N matrix used as the zero value of the aggregation below.
m = dok_matrix((N, N), dtype=np.int32)

# Broadcast the tag -> index mapping so it is shipped to the workers once
# instead of being serialised into every task closure. Its keys are exactly
# the popular tags, so it also serves as the membership test.
lookup_bc = pairs.context.broadcast(lookup)

out = (
    pairs
    # Keep only pairs where both tags are popular.
    .filter(lambda t: t[0] in lookup_bc.value and t[1] in lookup_bc.value)
    # (tag_a, tag_b) -> ((row, col), 1)
    .map(lambda t: ((lookup_bc.value[t[0]], lookup_bc.value[t[1]]), 1))
    # Total number of occurrences of each (row, col) pair.
    .reduceByKey(lambda a, b: a + b)
    # Write the counts into per-partition matrices, then sum the partials.
    .aggregate(m, local_reducer, _agg)
)

In [10]:
# Convert the aggregated matrix to Compressed Sparse Row format.
out = out.tocsr()

In [11]:
# Pair count stored for the tags 'NewGreenDeal' and 'Brexit'; raises KeyError
# if either tag is not in lookup (i.e. is not a popular tag).
out[lookup['NewGreenDeal'], lookup['Brexit']]

13

In [12]:
# Pair count stored for the tags 'Memes' and 'Merkel'; raises KeyError if
# either tag is not in lookup (i.e. is not a popular tag).
out[lookup['Memes'], lookup['Merkel']]

25

In [13]:
# Show the matrix summary (shape, dtype, number of stored elements, format).
out

<47939x47939 sparse matrix of type '<class 'numpy.int32'>'
	with 482357 stored elements in Compressed Sparse Row format>