In [55]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func

In [2]:
spark = SparkSession \
        .builder \
        .appName("SuperheroAnalysis") \
        .getOrCreate()
sc = spark.sparkContext

In [103]:
def get_heroname_lookup() -> dict:
    with open(
        r"/home/jovyan/data/marvel-names.txt","r", 
        encoding="utf-8", errors="surrogateescape"
    ) as heronames:
        heroname_lookup = {}
        for line in heronames:
            fields = line.split(' ', 1)
            hero_id = fields[0]
            hero_name = fields[1].replace('"','').strip()
            heroname_lookup[hero_id] = hero_name
        return heroname_lookup

def heroname_map(hero_id:str) -> str:
    return broadcast.value[hero_id]

def process_hero_connections(line) -> tuple:
    fields = line.split(" ", 1)
    hero_id = fields[0]
    connections = len(line.split(" ")) - 1
    return (hero_id, connections)

def identify_popular_heroes(rdd):
    return rdd.map(process_hero_connections) \
            .reduceByKey(lambda x, y: x+y) \
            .map(lambda x: (x[0], heroname_map(x[0]), x[1])) \
            .sortBy(lambda x: x[2], ascending=False)

def popular_heroes_df(connection_df):
    return connection_df \
        .withColumn(
            "hero_id", func.split(func.col("value"), " ")[0]
        ) \
        .withColumn(
            "hero_name", udf_heroname_lookup(func.col("hero_id"))
        ) \
        .withColumn (
            "connections", func.size(func.split(func.col("value"), " "))-1
        ) \
        .groupBy(["hero_id", "hero_name"]) \
        .agg(func.sum("connections").alias("connections")) \
        .sort(func.col("connections").desc())

In [104]:
broadcast = sc.broadcast(get_heroname_lookup())
udf_heroname_lookup = func.udf(heroname_map)

In [105]:
hero_graph = sc.textFile("file:///home/jovyan/data/marvel-graph.txt")

In [106]:
top10_popularheroes = identify_popular_heroes(hero_graph).take(10)

In [107]:
print(f"hero_name \t # of connections")
for line in top10_popularheroes:
    print(f"{line[1]} \t {line[2]}")

hero_name 	 # of connections
CAPTAIN AMERICA 	 1937
SPIDER-MAN/PETER PAR 	 1745
IRON MAN/TONY STARK 	 1532
THING/BENJAMIN J. GR 	 1429
WOLVERINE/LOGAN 	 1397
MR. FANTASTIC/REED R 	 1389
HUMAN TORCH/JOHNNY S 	 1374
SCARLET WITCH/WANDA 	 1348
THOR/DR. DONALD BLAK 	 1292
BEAST/HENRY &HANK& P 	 1283


In [89]:
connections_df = spark.read.text("file:///home/jovyan/data/marvel-graph.txt")

In [94]:
hero_connections = popular_heroes_df(connections_df)

In [97]:
hero_connections.select("hero_name","connections").show(10)

+--------------------+-----------+
|           hero_name|connections|
+--------------------+-----------+
|     CAPTAIN AMERICA|       1937|
|SPIDER-MAN/PETER PAR|       1745|
| IRON MAN/TONY STARK|       1532|
|THING/BENJAMIN J. GR|       1429|
|     WOLVERINE/LOGAN|       1397|
|MR. FANTASTIC/REED R|       1389|
|HUMAN TORCH/JOHNNY S|       1374|
| SCARLET WITCH/WANDA|       1348|
|THOR/DR. DONALD BLAK|       1292|
|BEAST/HENRY &HANK& P|       1283|
+--------------------+-----------+
only showing top 10 rows

