In [1]:
#  Replace with your container and storage account:  "wasbs://<container>@<storage account>.blob.core.windows.net/"
pathPrefix = "wasbs://marvel@vpldb.blob.core.windows.net/"
#  Fetch porgat.txt from storage account
file = sc.textFile(pathPrefix + "porgat.txt")

In [2]:
#  Remove the headers from the file:  lines starting with a star
noHeaders = file.filter(lambda x: len(x)>0 and x[0]!='*')
#  Extract a pair from each line:  the leading integer and a string for the rest of the line
paired = noHeaders.map(lambda l:  l.partition(' ')).filter(lambda t:  len(t)==3 and len(t[0])>0 and len(t[2])>0).map(lambda t: (int(t[0]), t[2]))
#  Filter relationships as they do not start with quotes, then split the integer list
scatteredRelationships = paired.filter(lambda (charId, text):  text[0]!='"').map(lambda (charId, text): (charId, [int(x) for x in text.split(' ')]))
#  Relationships for the same character id sometime spans more than a line in the file, so let's group them together
relationships = scatteredRelationships.reduceByKey(lambda pubList1, pubList2: pubList1 + pubList2)
#  Filter non-relationships as they start with quotes ; remove the quotes
nonRelationships = paired.filter(lambda (index, text):  text[0]=='"').map(lambda (index, text):  (index, text[1:-1].strip()))
#  Characters stop at a certain line (part of the initial header ; we hardcode it here)
characters = nonRelationships.filter(lambda (charId, name): charId<=6486)
#  Publications starts after the characters
publications = nonRelationships.filter(lambda (charId, name): charId>6486)

In [3]:
from pyspark.sql import Row

#  Let's create dataframes out of the RDDs and register them as temporary views for SQL to use

#  Relationships has a list as a component, let's flat that
flatRelationships = relationships.flatMap(lambda (charId, pubList):  [(charId, pubId) for pubId in pubList])
#  Let's map the relationships to an RDD of rows in order to create a data frame out of it
relationshipsDf = spark.createDataFrame(flatRelationships.map(lambda t: Row(charId=t[0], pubId=t[1])))
#  Register relationships as a temporary view
relationshipsDf.createOrReplaceTempView("relationships")

#  Let's do the same for characters
charactersDf = spark.createDataFrame(characters.map(lambda t:  Row(charId=t[0], name=t[1])))
charactersDf.createOrReplaceTempView("characters")

#  and for publications
publicationsDf = spark.createDataFrame(publications.map(lambda t:  Row(pubId=t[0], name=t[1])))
publicationsDf.createOrReplaceTempView("publications")


In [4]:
%sql

SELECT c1.name AS name1, c2.name AS name2, sub.charId1, sub.charId2, sub.pubCount
FROM
(
  SELECT r1.charId AS charId1, r2.charId AS charId2, COUNT(r1.pubId, r2.pubId) AS pubCount
  FROM relationships AS r1
  CROSS JOIN relationships AS r2
  WHERE r1.charId < r2.charId
  AND r1.pubId=r2.pubId
  GROUP BY r1.charId, r2.charId
) AS sub
INNER JOIN characters c1 ON c1.charId=sub.charId1
INNER JOIN characters c2 ON c2.charId=sub.charId2
ORDER BY sub.pubCount DESC
LIMIT 10

In [5]:
%sql

SELECT c1.name AS name1, c2.name AS name2, c3.name AS name3, sub.charId1, sub.charId2, sub.charId3, sub.pubCount
FROM
(
  SELECT r1.charId AS charId1, r2.charId AS charId2, r3.charId AS charId3, COUNT(r1.pubId, r2.pubId, r3.pubId) AS pubCount
  FROM relationships AS r1
  CROSS JOIN relationships AS r2
  CROSS JOIN relationships AS r3
  WHERE r1.charId < r2.charId
  AND r2.charId < r3.charId
  AND r1.pubId=r2.pubId
  AND r2.pubId=r3.pubId
  GROUP BY r1.charId, r2.charId, r3.charId
) AS sub
INNER JOIN characters c1 ON c1.charId=sub.charId1
INNER JOIN characters c2 ON c2.charId=sub.charId2
INNER JOIN characters c3 ON c3.charId=sub.charId3
ORDER BY sub.pubCount DESC
LIMIT 10