# Social characteristics of the Marvel Universe

Adapted from [https://vincentlauzon.com/2018/01/24/azure-databricks-spark-sql-data-frames/](https://vincentlauzon.com/2018/01/24/azure-databricks-spark-sql-data-frames/)

Run the cell that matches the platform you are using.

For AWS EMR use:

In [3]:
file = sc.textFile("s3://bigdatateaching/marvel/porgat.txt")

For Databricks on Azure use:

In [None]:
file = sc.textFile("wasbs://marvel@bigdatateaching.blob.core.windows.net/porgat.txt")

## The Data

The single data file contains three parts:

* Marvel Characters
* Publications
* Relationships between the two

We need to pre-process the file before we can use it. The following operations are all RDD operations.

In [4]:
#  Remove the headers from the file: lines starting with a star
noHeaders = file.filter(lambda x: len(x) > 0 and x[0] != '*')

#  Extract a pair from each line: the leading integer and a string for the rest of the line
paired = (noHeaders
          .map(lambda l: l.partition(' '))
          .filter(lambda t: len(t) == 3 and len(t[0]) > 0 and len(t[2]) > 0)
          .map(lambda t: (int(t[0]), t[2])))

#  In the lambdas below, t[0] is the integer id and t[1] is the text; indexing the tuple
#  (rather than unpacking it in the lambda signature) runs on both Python 2 and Python 3

#  Keep relationships, which do not start with a quote, then split the integer list
scatteredRelationships = (paired
                          .filter(lambda t: t[1][0] != '"')
                          .map(lambda t: (t[0], [int(x) for x in t[1].split(' ')])))

#  Relationships for the same character id sometimes span more than one line in the file, so let's group them together
relationships = scatteredRelationships.reduceByKey(lambda pubList1, pubList2: pubList1 + pubList2)

#  Keep non-relationships, which start with a quote, and remove the quotes
nonRelationships = (paired
                    .filter(lambda t: t[1][0] == '"')
                    .map(lambda t: (t[0], t[1][1:-1].strip())))

#  Characters stop at a certain index (part of the initial header; we hardcode it here)
characters = nonRelationships.filter(lambda t: t[0] <= 6486)

#  Publications start after the characters
publications = nonRelationships.filter(lambda t: t[0] > 6486)

In [12]:
noHeaders.take(5)

[u'1 "24-HOUR MAN/EMMANUEL"',
 u'2 "3-D MAN/CHARLES CHAN"',
 u'3 "4-D MAN/MERCURIO"',
 u'4 "8-BALL/"',
 u'5 "A"']
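
The parsing rules used in the pre-processing cell can be tried on a handful of lines without a cluster. The following is a minimal plain-Python sketch of the same logic, not the notebook's code: `parse_line`, `classify`, and `sample_lines` are names introduced here for illustration, and the header, publication, and relationship lines in the sample are invented rather than copied from `porgat.txt`.

```python
# Plain-Python sketch of the pre-processing rules; no Spark required.
# Only the first two character names come from the notebook output;
# every other sample line is hypothetical.
sample_lines = [
    '* hypothetical header',
    '1 "24-HOUR MAN/EMMANUEL"',
    '2 "3-D MAN/CHARLES CHAN"',
    '6487 "HYPOTHETICAL COMIC 1"',
    '',
    '1 6487',
    '2 6487 6488',
]


def parse_line(line):
    """Return (id, rest_of_line), or None for headers, blank and malformed lines."""
    if not line or line[0] == '*':
        return None
    head, _, rest = line.partition(' ')
    if not head or not rest:
        return None
    return int(head), rest


def classify(pair):
    """Quoted text is a name; anything else is a list of publication ids."""
    ident, text = pair
    if text[0] == '"':
        return ('name', ident, text[1:-1].strip())
    return ('relationship', ident, [int(x) for x in text.split(' ')])


parsed = [classify(p) for p in map(parse_line, sample_lines) if p is not None]
for item in parsed:
    print(item)
```

Names with an id up to 6486 would then be treated as characters and the rest as publications, as in the cell above.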

The following cells begin to use Spark SQL.

Spark SQL works with Data Frames, which are a kind of “structured” RDD, or an “RDD with schema”.

The two are integrated by building an RDD of `Row` objects (a type from `pyspark.sql`) and then creating a Data Frame from it.

Data Frames can then be registered as views. Those views are what we query with Spark SQL.

In [10]:
from pyspark.sql import Row

#  Let's create dataframes out of the RDDs and register them as temporary views for SQL to use

#  Relationships has a list as a component, so let's flatten it into one (charId, pubId) pair per publication
flatRelationships = relationships.flatMap(lambda t: [(t[0], pubId) for pubId in t[1]])

#  Let's map the relationships to an RDD of rows in order to create a data frame out of it
relationshipsDf = spark.createDataFrame(flatRelationships.map(lambda t: Row(charId=t[0], pubId=t[1])))

#  Register relationships as a temporary view
relationshipsDf.createOrReplaceTempView("relationships")

#  Let's do the same for characters
charactersDf = spark.createDataFrame(characters.map(lambda t:  Row(charId=t[0], name=t[1])))
charactersDf.createOrReplaceTempView("characters")

#  and for publications
publicationsDf = spark.createDataFrame(publications.map(lambda t:  Row(pubId=t[0], name=t[1])))
publicationsDf.createOrReplaceTempView("publications")

In [11]:
relationshipsDf.take(10)

[Row(charId=2, pubId=6488),
 Row(charId=2, pubId=6489),
 Row(charId=2, pubId=6490),
 Row(charId=2, pubId=6491),
 Row(charId=2, pubId=6492),
 Row(charId=2, pubId=6493),
 Row(charId=2, pubId=6494),
 Row(charId=2, pubId=6495),
 Row(charId=2, pubId=6496),
 Row(charId=4, pubId=6506)]
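
To see how the grouping and flattening steps produce rows like these, here is a small plain-Python sketch of what `reduceByKey` with list concatenation followed by `flatMap` does. It is an illustration only: the `scattered` input is a hypothetical sample, and the result is sorted by character id for readability, an ordering Spark itself does not guarantee.

```python
from collections import defaultdict

# Hypothetical (charId, pubList) pairs; character 2 is split across two input lines
scattered = [(2, [6488, 6489]), (4, [6506]), (2, [6490])]

# Equivalent of reduceByKey(lambda a, b: a + b): concatenate the lists per character
grouped = defaultdict(list)
for char_id, pub_list in scattered:
    grouped[char_id] += pub_list

# Equivalent of flatMap: emit one (charId, pubId) row per publication
flat = [(char_id, pub_id)
        for char_id, pubs in sorted(grouped.items())
        for pub_id in pubs]

print(flat)  # [(2, 6488), (2, 6489), (2, 6490), (4, 6506)]
```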

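The following cell shows the standard way to run a SQL query in Spark: pass the query text to `spark.sql`, which returns a Data Frame. The query finds the ten pairs of characters that appear together in the most publications.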

In [15]:
df1 = spark.sql("""SELECT c1.name AS name1, c2.name AS name2, sub.charId1, sub.charId2, sub.pubCount
FROM
(
  SELECT r1.charId AS charId1, r2.charId AS charId2, COUNT(r1.pubId, r2.pubId) AS pubCount
  FROM relationships AS r1
  CROSS JOIN relationships AS r2
  WHERE r1.charId < r2.charId
  AND r1.pubId=r2.pubId
  GROUP BY r1.charId, r2.charId
) AS sub
INNER JOIN characters c1 ON c1.charId=sub.charId1
INNER JOIN characters c2 ON c2.charId=sub.charId2
ORDER BY sub.pubCount DESC
LIMIT 10""")


In [16]:
df1.take(10)

[Row(name1=u'HUMAN TORCH/JOHNNY S', name2=u'THING/BENJAMIN J. GR', charId1=2557, charId2=5716, pubCount=744),
 Row(name1=u'HUMAN TORCH/JOHNNY S', name2=u'MR. FANTASTIC/REED R', charId1=2557, charId2=3805, pubCount=713),
 Row(name1=u'MR. FANTASTIC/REED R', name2=u'THING/BENJAMIN J. GR', charId1=3805, charId2=5716, pubCount=708),
 Row(name1=u'INVISIBLE WOMAN/SUE', name2=u'MR. FANTASTIC/REED R', charId1=2650, charId2=3805, pubCount=701),
 Row(name1=u'HUMAN TORCH/JOHNNY S', name2=u'INVISIBLE WOMAN/SUE', charId1=2557, charId2=2650, pubCount=694),
 Row(name1=u'INVISIBLE WOMAN/SUE', name2=u'THING/BENJAMIN J. GR', charId1=2650, charId2=5716, pubCount=668),
 Row(name1=u'SPIDER-MAN/PETER PAR', name2=u'WATSON-PARKER, MARY', charId1=5306, charId2=6166, pubCount=616),
 Row(name1=u'JAMESON, J. JONAH', name2=u'SPIDER-MAN/PETER PAR', charId1=2959, charId2=5306, pubCount=526),
 Row(name1=u'CAPTAIN AMERICA', name2=u'IRON MAN/TONY STARK', charId1=859, charId2=2664, pubCount=446),
 Row(name1=u'SCARLET WIT
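
What the self-join computes can be sketched in plain Python on a toy input. This is an illustrative equivalent under stated assumptions, not the way Spark executes the query: `co_appearances` and the sample `rows` are hypothetical, and the sketch assumes each (charId, pubId) row occurs only once, whereas the SQL would count duplicated rows more than once.

```python
from collections import Counter, defaultdict
from itertools import combinations


def co_appearances(rows, size=2):
    """Count, for every group of `size` characters, the publications they share."""
    chars_by_pub = defaultdict(set)
    for char_id, pub_id in rows:
        chars_by_pub[pub_id].add(char_id)

    counts = Counter()
    for chars in chars_by_pub.values():
        # Combinations of the sorted ids yield each group once, in ascending
        # order, which mirrors the r1.charId < r2.charId condition in the query
        counts.update(combinations(sorted(chars), size))
    return counts


# Hypothetical (charId, pubId) rows
rows = [(1, 100), (2, 100), (3, 100), (1, 101), (2, 101), (3, 102)]

pair_counts = co_appearances(rows)
print(pair_counts.most_common(1))  # [((1, 2), 2)]
```

Calling `co_appearances(rows, size=3)` counts triples of characters in the same way.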

The following cell runs the same query as above, **but it works only in Azure Databricks notebooks, through the `%sql` [magic function](https://docs.databricks.com/user-guide/notebooks/index.html)**. On other platforms it fails with a syntax error, as the output under the cell shows.

In [12]:
%sql

SELECT c1.name AS name1, c2.name AS name2, sub.charId1, sub.charId2, sub.pubCount
FROM
(
  SELECT r1.charId AS charId1, r2.charId AS charId2, COUNT(r1.pubId, r2.pubId) AS pubCount
  FROM relationships AS r1
  CROSS JOIN relationships AS r2
  WHERE r1.charId < r2.charId
  AND r1.pubId=r2.pubId
  GROUP BY r1.charId, r2.charId
) AS sub
INNER JOIN characters c1 ON c1.charId=sub.charId1
INNER JOIN characters c2 ON c2.charId=sub.charId2
ORDER BY sub.pubCount DESC
LIMIT 10

SyntaxError: invalid syntax (<ipython-input-12-a54ca3fe8031>, line 3)

The next query extends the same idea to triples of characters that appear together in the same publication.

In [18]:
df2 = spark.sql("""
SELECT c1.name AS name1, c2.name AS name2, c3.name AS name3, sub.charId1, sub.charId2, sub.charId3, sub.pubCount
FROM
(
  SELECT r1.charId AS charId1, r2.charId AS charId2, r3.charId AS charId3, COUNT(r1.pubId, r2.pubId, r3.pubId) AS pubCount
  FROM relationships AS r1
  CROSS JOIN relationships AS r2
  CROSS JOIN relationships AS r3
  WHERE r1.charId < r2.charId
  AND r2.charId < r3.charId
  AND r1.pubId=r2.pubId
  AND r2.pubId=r3.pubId
  GROUP BY r1.charId, r2.charId, r3.charId
) AS sub
INNER JOIN characters c1 ON c1.charId=sub.charId1
INNER JOIN characters c2 ON c2.charId=sub.charId2
INNER JOIN characters c3 ON c3.charId=sub.charId3
ORDER BY sub.pubCount DESC
LIMIT 10
""")


In [None]:
df2.take(10)

In [None]:
%sql

SELECT c1.name AS name1, c2.name AS name2, c3.name AS name3, sub.charId1, sub.charId2, sub.charId3, sub.pubCount
FROM
(
  SELECT r1.charId AS charId1, r2.charId AS charId2, r3.charId AS charId3, COUNT(r1.pubId, r2.pubId, r3.pubId) AS pubCount
  FROM relationships AS r1
  CROSS JOIN relationships AS r2
  CROSS JOIN relationships AS r3
  WHERE r1.charId < r2.charId
  AND r2.charId < r3.charId
  AND r1.pubId=r2.pubId
  AND r2.pubId=r3.pubId
  GROUP BY r1.charId, r2.charId, r3.charId
) AS sub
INNER JOIN characters c1 ON c1.charId=sub.charId1
INNER JOIN characters c2 ON c2.charId=sub.charId2
INNER JOIN characters c3 ON c3.charId=sub.charId3
ORDER BY sub.pubCount DESC
LIMIT 10