In [1]:
# findspark.init() is expected to put the local Spark installation on sys.path,
# so it has to run before the `import pyspark` in the next cell.
import findspark
findspark.init()

In [2]:
import pyspark

In [3]:
# Obtain the session used by every later cell: getOrCreate() hands back an
# already-running session if there is one, otherwise it starts a new one
# under the application name below.
spark = (
    pyspark.sql.SparkSession.builder
    .appName('HeroAppearances')
    .getOrCreate()
)

In [4]:
# Load the hero id -> hero name lookup table.
# The file is split on the double-quote character, which produces three
# columns (_c0, _c1, _c2); the last one is discarded and the first two are
# given meaningful names. Note that heroId is read as text.
heroNames = (
    spark.read.csv('old/Marvel-Names.txt', sep='\"')
    .drop('_c2')
    .withColumnRenamed('_c0', 'heroId')
    .withColumnRenamed('_c1', 'heroName')
)
heroNames.show(5)

+------+--------------------+
|heroId|            heroName|
+------+--------------------+
|    1 |24-HOUR MAN/EMMANUEL|
|    2 |3-D MAN/CHARLES CHAN|
|    3 |    4-D MAN/MERCURIO|
|    4 |             8-BALL/|
|    5 |                   A|
+------+--------------------+
only showing top 5 rows



In [5]:
# Alternative to the previous cell: selectExpr picks and renames the two
# wanted columns in a single step, so the unused third column never needs
# an explicit drop.
heroNames2 = (
    spark.read.csv('old/Marvel-Names.txt', sep='\"')
    .selectExpr('_c0 as heroId', '_c1 as heroName')
)
heroNames2.show(5)

+------+--------------------+
|heroId|            heroName|
+------+--------------------+
|    1 |24-HOUR MAN/EMMANUEL|
|    2 |3-D MAN/CHARLES CHAN|
|    3 |    4-D MAN/MERCURIO|
|    4 |             8-BALL/|
|    5 |                   A|
+------+--------------------+
only showing top 5 rows



In [6]:
# Read the graph file with '|' as the separator and keep the first column
# under the name 'occurs'.
# NOTE(review): this presumably relies on '|' never appearing in the file so
# that each whole line lands in _c0 - confirm against the data.
occurs = spark.read.csv('old/Marvel-Graph.txt', sep='|').withColumnRenamed('_c0', 'occurs')

# Replace the raw text with an array of the space-separated tokens.
occursSplit = pyspark.sql.functions.split(occurs['occurs'], ' ')
occurs = occurs.withColumn('occurs', occursSplit)
occurs.show(5)

+--------------------+
|              occurs|
+--------------------+
|[5988, 748, 1722,...|
|[5989, 4080, 4264...|
|[5982, 217, 595, ...|
|[5983, 1165, 3836...|
|[5980, 2731, 3712...|
+--------------------+
only showing top 5 rows



In [7]:
# Id of the hero whose occurrences are counted below.
targetId = 5020

# A UDF is called with columns, not plain Python values, so the id is wrapped
# in a literal column. It is converted to text first because the tokens
# produced by split() are strings.
targetCol = pyspark.sql.functions.lit(str(targetId))

In [8]:
def findTarget(line, targetCol):
    """Count how many entries of ``line`` are equal to ``targetCol``.

    The function is wrapped as a Spark UDF in this notebook, so it is invoked
    once per row with that row's values.

    Args:
        line: Sequence of tokens for one row (anything with a ``count``
            method), or ``None`` when the row's value is null.
        targetCol: The value to look for. Despite the name, inside the
            function this is the per-row value, not a Column object.

    Returns:
        The number of matching entries; 0 when ``line`` is ``None``.
    """
    # A null array reaches Python as None; treat it as "no occurrences"
    # rather than letting an AttributeError abort the whole Spark job.
    if line is None:
        return 0
    return line.count(targetCol)
# Register findTarget as a Spark UDF. udf() defaults to a string return type
# when none is given, so the count is declared as an integer explicitly; a
# later .cast('integer') on the result remains valid and becomes a no-op.
udfFindTarget = pyspark.sql.functions.udf(findTarget, pyspark.sql.types.IntegerType())

In [9]:
# Add a per-row count of how many times the target id appears in the row's
# token array, stored as an integer column.
occurs = occurs.withColumn(
    'targetOccurs',
    udfFindTarget(occurs['occurs'], targetCol).cast('integer'),
)
occurs.show(5)

+--------------------+------------+
|              occurs|targetOccurs|
+--------------------+------------+
|[5988, 748, 1722,...|           0|
|[5989, 4080, 4264...|           0|
|[5982, 217, 595, ...|           0|
|[5983, 1165, 3836...|           0|
|[5980, 2731, 3712...|           0|
+--------------------+------------+
only showing top 5 rows



In [10]:
# Add up the per-row counts. A global aggregate always yields exactly one
# row, but its value is null (None) when there was nothing to sum, so fall
# back to 0 in that case.
targetSum = occurs.groupBy().sum('targetOccurs').collect()[0][0]
if targetSum is None:
    targetSum = 0

# Look up the hero's name. heroId was read as text and carries trailing
# whitespace, so trim and cast it explicitly instead of relying on implicit
# string-to-number coercion in the comparison.
_heroIdAsInt = pyspark.sql.functions.trim(heroNames.heroId).cast('integer')
_heroRow = heroNames.where(_heroIdAsInt == targetId).select('heroName').first()
if _heroRow is None:
    # first() returns None on an empty result; fail with a clear message
    # instead of an IndexError from indexing an empty list.
    raise ValueError('No hero with id {0} found in the names file'.format(targetId))
targetHero = _heroRow[0]

In [11]:
# Report the hero's name together with the summed occurrence count.
print(
    '{0} has {1} co-appearances'.format(
        targetHero,
        targetSum,
    )
)

SHANGA has 27 co-appearances


In [None]:
# Shut the Spark session down now that the analysis is finished.
spark.stop()