# The Marvel Comics Social Network
### Data Engineering Capstone Project

#### Project Summary
This project analyzes the relationships between characters in the Marvel comics as a social network. The source data shows how often characters meet throughout the comics and in which comics they appear. The goal is to use this data to make predictions about future character appearances in Marvel Cinematic Universe (MCU) movies, based on each character's "popularity" in the original source comics. In addition, we will seek to derive social conclusions from the character relationships.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [109]:
# Do all imports and installs here
import pandas as pd
import os
import csv
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

### Step 1: Scope the Project and Gather Data

#### Scope 
`Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.`

This project analyzes the relationships between characters in the Marvel comics as if they were people in a real social network. The source data shows how often characters meet throughout the comics and in which comics they appear. This information can be used to assess which characters are the most "popular", measured by which characters make the most connections and appear in the most comics. In a "social network" context, we could also make "People You May Know" recommendations for characters with common connections (edges). This output resembles functionality offered by social media platforms such as Facebook, Instagram, LinkedIn, and Twitter.
Also, by cross-referencing this information with character appearances in the Marvel Cinematic Universe (MCU), we may be able to project which characters are most likely to appear next in upcoming Marvel movies, based on character "popularity" in the comics.

#### Describe and Gather Data 
`Describe the data sets you're using. Where did it come from? What type of information is included?`

The source data is located in the `marvel_data` directory. The three main CSV files were sourced from [The Marvel Universe Social Network dataset on Kaggle](https://www.kaggle.com/csanhueza/the-marvel-universe-social-network). The files and their contents are:
- edges.csv: Indicates which comics each hero appears in.
- hero-network.csv: Indicates which heroes appear together in the comics.
- nodes.csv: Describes the node types (i.e. either "hero" or "comic").

The final CSV (screentime.csv) was gathered from [an MCU quiz on Sporcle](https://www.sporcle.com/games/wearevenom2/mcu-screen-time-2021). It contains the screentime for each comic book character in the Marvel Cinematic Universe (MCU) movies.

In [110]:
# Read in the data here

# Get source data folder
source_file_path = os.getcwd() + '/marvel_data'
edges_file_path = source_file_path + '/edges.csv'
hero_network_file_path = source_file_path + '/hero-network.csv'
nodes_file_path = source_file_path + '/nodes.csv'
screentime_file_path = source_file_path + '/screentime.csv'

spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

comic_appearances_df = spark.read.csv(edges_file_path, header='true')
directed_hero_network_df = spark.read.csv(hero_network_file_path, header='true')
screentime_df = spark.read.csv(screentime_file_path, header='true')

comic_appearances_df.printSchema()
directed_hero_network_df.printSchema()
screentime_df.printSchema()

root
 |-- hero: string (nullable = true)
 |-- comic: string (nullable = true)

root
 |-- hero1: string (nullable = true)
 |-- hero2: string (nullable = true)

root
 |-- Screen Time: string (nullable = true)
 |-- Answer: string (nullable = true)
 |-- MCU Debut: string (nullable = true)



### Step 2: Explore and Assess the Data
#### Explore the Data 
`Identify data quality issues, like missing values, duplicate data, etc.`

1. Hero names don't match up between the "Screentime" table and the other tables, due to them coming from different sources.

2. Hero names in the hero-network.csv source are truncated.

3. Some characters in the "Screentime" table are not present as "Heroes" in the other tables because they are supporting characters, not heroes.

#### Cleaning Steps
`Document steps necessary to clean the data`

1. I provide a map that converts the hero names in the screentime table to the names used in the other tables, so that hero names can be cross-referenced as foreign keys between tables. To save time, the map does not yet cover all names; I will complete it as time permits.

2. Because the truncated names will not match exactly, queries must use prefix matching (`LIKE` with a trailing `%`) instead of "=". Names also don't always align across tables. For instance, nodes.csv has Spider-Man as "SPIDER-MAN/PETER PARKERKER" (a typo, with "KER" repeated at the end), edges.csv has "SPIDER-MAN/PETER PARKER" (correct spelling), and hero-network.csv has "SPIDER-MAN/PETER PAR" (truncated). This will require some tweaking of the data along the way. If results seem unintuitive, I will compare the hero and comic names and correct them as I go.

3. This does not require cleanup, but is just additional information. It may result in some NULL values in derived tables, but that is expected. We will have to filter out NULL values in any analyses.
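
The Spider-Man example in step 2 suggests that hero-network.csv cuts names off at 20 characters ("SPIDER-MAN/PETER PAR" is exactly 20 characters long). A minimal sketch of a normalization helper built on that observation follows. The 20-character width is an assumption that should be verified against the data, and `normalize_hero_name` and `width` are hypothetical names that do not appear elsewhere in this notebook.

```python
def normalize_hero_name(name, width=20):
    """Trim whitespace, then cut the name to `width` characters.

    Assumption: hero-network.csv truncates names to 20 characters,
    as "SPIDER-MAN/PETER PAR" suggests. Verify this against the data.
    """
    return name.strip()[:width].rstrip()


# All three spellings of Spider-Man collapse to the same key.
variants = [
    "SPIDER-MAN/PETER PARKERKER",  # nodes.csv (typo)
    "SPIDER-MAN/PETER PARKER",     # edges.csv
    "SPIDER-MAN/PETER PAR",        # hero-network.csv (truncated)
]
keys = {normalize_hero_name(v) for v in variants}
print(keys)
```

One caveat with this approach: two different characters whose names share the same first 20 characters would collapse into one key, so the result should be spot-checked before it is used as a join key.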


In [111]:
# Performing cleaning tasks here

# Cleaning Step #1
def map_names(old_name):
    """
    Allows for mapping hero names from the Screentime source data table to those used in the Nodes source data table.
    """
    
    name_map = {
        'Iron Man': 'IRON MAN/TONY STARK',
        'Captain America': 'CAPTAIN AMERICA',
        'Loki': 'LOKI [ASGARDIAN]',
        'Thor': 'THOR/DR. DONALD BLAK',
        'Black Widow': 'BLACK WIDOW/NATASHA',
        'Spider-Man': 'SPIDER-MAN/PETER PAR',
        'Scarlet Witch': 'SCARLET WITCH/WANDA',
        'Falcon': 'FALCON/SAM WILSON',
        'Winter Soldier': 'BUCKY/BUCKY BARNES',
        'Ant-Man': 'ANT-MAN II/SCOTT HAR',
        'Vision': 'VISION',
        'Doctor Strange': 'DR. STRANGE/STEPHEN',
        'Gamora': 'GAMORA',
        'Captain Marvel': 'CAPTAIN MARVEL/CAPTA',
        'Pepper Potts': 'POTTS, VIRGINIA PEPP',
        'Nick Fury': 'FURY, COL. NICHOLAS',
        'Black Panther': "BLACK PANTHER/T'CHAL",
        'War Machine': 'RHODES, MR.',
        'Rocket Raccoon': 'ROCKET RACCOON',
        'Wasp': 'WASP/JANET VAN DYNE',
        'Hawkeye': 'HAWKEYE/CLINT BARTON',
        'Thanos': 'THANOS',
        
#         'Tinkerer': 'TINKERER',
#         'Ayesha': 'AYESHA',
#         'Minn-Erva': 'MINERVA [KREE]',
#         'Supreme Intelligence': 'SUPREME INTELLIGENCE',
#         'The Other': 'OTHER',
#         'Morgan Stark': 'STARK, MORGAN',
#         'Ellen Brandt': 'BRANDT, ELLEN',
#         'Surtur': 'SURTUR'
    }
    return name_map.get(old_name, old_name)


map_names_udf = f.udf(map_names)
screentime_df = screentime_df.withColumn("Answer", map_names_udf("Answer"))


# "Cleaning" Step #2
print("Spiderman entries per table:")
print("---------------------------\n")
print("Edges table:")
comic_appearances_df.select("hero").filter(f.col("hero").contains("SPIDER-MAN/PETER")).distinct().show(5, False)

print("Hero Network table:")
directed_hero_network_df.select("hero1").filter(f.col("hero1").contains("SPIDER-MAN/PETER")).distinct().show(5, False)

print("Nodes table (Screentime table, after mapping):")
screentime_df.select("Answer").filter(f.col("Answer").contains("SPIDER-MAN")).distinct().show(5, False)

Spiderman entries per table:
---------------------------

Edges table:
+-----------------------+
|hero                   |
+-----------------------+
|SPIDER-MAN/PETER PARKER|
+-----------------------+

Hero Network table:
+--------------------+
|hero1               |
+--------------------+
|SPIDER-MAN/PETER PAR|
+--------------------+

Nodes table (Screentime table, after mapping):
+--------------------+
|Answer              |
+--------------------+
|SPIDER-MAN/PETER PAR|
+--------------------+



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
`Map out the conceptual data model and explain why you chose that model`

The [schema diagram](./schema_diagram.png) illustrates the data model. An asterisk (*) indicates a primary key, and multiple asterisks on a single table indicate a composite primary key.
This data model uses a star schema with fact and dimension tables.
The "comic_appearances" table is the fact table, since the central fact of this data model is which heroes appeared in which comics. Each combination of hero and comic is unique in this table, so the two fields together form the composite primary key.
The "screentime" table is a dimension table. The hero name is its primary key, and the "hero" column of "comic_appearances" references it as a foreign key.
The "directed_hero_network" table is a dimension based on the hero-network.csv source. This table may have duplicate entries, and is treated more as a graph source than as a normalized database table. For that reason, it has no primary key.
The "undirected_hero_network" table is derived from the "directed_hero_network" table. It is kept as its own table to make the database easier to query for valuable information. For instance, in this project, we assume that a hero's "popularity" depends on their interactions with others, but not on the direction of those interactions. By transforming the hero-network.csv source into both a directed and an undirected table, we preserve the integrity of the source data in "directed_hero_network" while speeding up common queries against "undirected_hero_network". Going forward, new hero interactions would need to be inserted into both tables. This costs some write performance, but it is a tradeoff we're willing to make, on the assumption that we will run analytics on character relationships much more often than we will write new interactions to the database.


#### 3.2 Mapping Out Data Pipelines
`List the steps necessary to pipeline the data into the chosen data model`

First, the edges.csv should be read into the "comic_appearances" table.

Next, the screentime.csv should be read into a "screentime" table. I chose to rename the column labels on this table to remove spaces and standardize the language. I also chose to convert the time string from "mm:ss" format into a number of seconds, to facilitate numeric operations on the duration. This requires splitting the string on the ":", multiplying the "mm" portion by 60, and adding the result to the "ss" value.
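
As a worked example of the conversion described above, "10:30" becomes 10 * 60 + 30 = 630 seconds. The sketch below is a defensive variant of that conversion, offered as an illustration only: `safe_duration_to_seconds` is a hypothetical name, and returning `None` for missing or malformed values is an assumption about how bad input should be handled, not something the source data is known to require.

```python
def safe_duration_to_seconds(duration_string):
    """Convert "mm:ss" to total seconds; return None for missing or malformed input."""
    if duration_string is None:
        return None
    parts = duration_string.strip().split(":")
    # Expect exactly two numeric parts: minutes and seconds.
    if len(parts) != 2 or not all(p.isdigit() for p in parts):
        return None
    minutes, seconds = (int(p) for p in parts)
    return minutes * 60 + seconds


print(safe_duration_to_seconds("10:30"))   # 10 * 60 + 30
print(safe_duration_to_seconds("243:30"))  # minutes may exceed 59
print(safe_duration_to_seconds(None))      # missing value stays missing
```

Returning `None` keeps a missing screentime as NULL in the resulting column instead of raising an error inside the UDF.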

Then, the hero-network.csv should be read into the "directed_hero_network" table.

Finally, the "directed_hero_network" dataframe must be transformed to generate the "undirected_hero_network" table. This step includes:
1. Standardizing the order of the hero relationships, i.e. removing the "directed" nature of the network.
This step compares hero1 and hero2 alphabetically and puts the alphabetically first hero in the "first_hero" column and the other hero in the "second_hero" column. Entries such as "A -> B" and "B -> A" both become "A -> B".
2. Removing duplicates.
Once each relationship is stored in alphabetical order, we can group by each hero pair and add an "interactions_count" column that counts how many times each interaction occurred. This removes duplicates while preserving the magnitude of interactions in the "interactions_count" column. The table can then be queried more easily for network relationships.
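
The two steps above can be illustrated in plain Python, independent of Spark. This is a toy sketch on made-up edges, not the pipeline itself; `build_undirected_counts` is a hypothetical helper name.

```python
from collections import Counter


def build_undirected_counts(directed_edges):
    """Count interactions per unordered hero pair."""
    counts = Counter()
    for hero1, hero2 in directed_edges:
        # Put the alphabetically first name first, so "A -> B" and "B -> A" share one key.
        pair = (hero1, hero2) if hero1 <= hero2 else (hero2, hero1)
        counts[pair] += 1
    return counts


# Toy data, not taken from hero-network.csv.
edges = [("B", "A"), ("A", "B"), ("A", "C")]
undirected = build_undirected_counts(edges)
print(undirected)
```

The counts always sum to the number of directed rows, which is the property that a source/count check can rely on.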

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
`Build the data pipelines to create the data model.`

In [112]:
# Write code here

comic_appearances_df.createOrReplaceTempView("comic_appearances")

screentime_df = screentime_df.withColumnRenamed('Answer', 'hero')\
                    .withColumnRenamed('Screen Time', 'duration')\
                    .withColumnRenamed('MCU Debut', 'debut_movie')

def duration_to_seconds(duration_string):
    """
    Converts duration string from "mm:ss" string format to "ss" integer format 
    """
    
    parts = duration_string.split(":")
    seconds = (int(parts[0]) * 60) + int(parts[1])
    return seconds

duration_to_seconds_udf = f.udf(duration_to_seconds)
screentime_df = screentime_df.withColumn("duration", duration_to_seconds_udf("duration").cast("int"))

screentime_df.printSchema()
screentime_df.createOrReplaceTempView("screentime")

undirected_hero_network_df = directed_hero_network_df.withColumn("first_hero", f.when(f.col("hero1") < f.col("hero2"), f.col("hero1")).otherwise(f.col("hero2")))\
                                                    .withColumn("second_hero", f.when(f.col("hero1") > f.col("hero2"), f.col("hero1")).otherwise(f.col("hero2")))\
                                                    .drop("hero1").drop("hero2")
undirected_hero_network_df = undirected_hero_network_df.groupBy("first_hero", "second_hero").count().withColumnRenamed('count', 'interactions_count')

directed_hero_network_df.printSchema()
undirected_hero_network_df.printSchema()

directed_hero_network_df.createOrReplaceTempView("directed_hero_network")
undirected_hero_network_df.createOrReplaceTempView("undirected_hero_network")

root
 |-- duration: integer (nullable = true)
 |-- hero: string (nullable = true)
 |-- debut_movie: string (nullable = true)

root
 |-- hero1: string (nullable = true)
 |-- hero2: string (nullable = true)

root
 |-- first_hero: string (nullable = true)
 |-- second_hero: string (nullable = true)
 |-- interactions_count: long (nullable = false)



#### 4.2 Data Quality Checks
```
Explain and run the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
```

##### 4.2.1. Integrity constraints:
First, I check the screentime table for the datatype of the duration column. I do this because the "screentime" is initially stored as a string in the source data, so this check allows me to validate my "duration_to_seconds" udf transformer to some degree.


In [113]:
# Perform quality checks here

# Check that the screentime table has an "int" type duration column, since it begins as a string in the source data.

def check_field_data_type(table, field, expected):

    field_entry = spark.sql(f'''
        SELECT {field}
        FROM {table}
        WHERE {field} IS NOT NULL
        LIMIT 1
    ''').head()
    
    # Fail if the column has no non-NULL values to inspect.
    if field_entry is None:
        raise AssertionError(f"No non-NULL '{field}' values found in table '{table}'.")

    field_entry = field_entry[0]

    assert isinstance(field_entry, expected), f"Column '{field}' in table '{table}' is not {expected} type."
    return True
    
print(check_field_data_type('screentime', 'duration', int))

True
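
The check above inspects one sampled value. A complementary approach is to compare the column's declared type in the schema. The sketch below takes a list of (name, type) pairs, which is the shape of PySpark's `DataFrame.dtypes`, so it can be tried without a Spark session. `check_column_type` is a hypothetical helper, not part of the notebook.

```python
def check_column_type(dtypes, column, expected_type):
    """Check a column's declared type in a list of (name, type) pairs.

    `dtypes` has the shape of PySpark's `DataFrame.dtypes`,
    e.g. [("duration", "int"), ("hero", "string")].
    """
    declared = dict(dtypes)
    if column not in declared:
        raise KeyError(f"Column '{column}' not found.")
    actual = declared[column]
    assert actual == expected_type, (
        f"Column '{column}' is {actual}, expected {expected_type}."
    )
    return True


# Pairs copied from the screentime schema printed in section 4.1.
screentime_dtypes = [("duration", "int"), ("hero", "string"), ("debut_movie", "string")]
print(check_column_type(screentime_dtypes, "duration", "int"))
```

In the notebook, the same call would take `screentime_df.dtypes` as its first argument.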


Next, I join the screentime table with the undirected network table to validate that I get the expected NULL duration for a hero who has not yet appeared in MCU movies but who does have undirected hero network interactions. At the same time, I validate that I get a non-NULL duration for a hero who I know has appeared in MCU movies.

In [114]:
# Check the screentime duration value for a given hero name. Some may be NULL.

def check_hero_movie_duration(hero_name, expected_duration):

    field_entry = spark.sql(f'''
        SELECT un.first_hero, st.duration, SUM(un.interactions_count) interactions
        FROM undirected_hero_network un
        LEFT JOIN screentime st ON (un.first_hero = st.hero)
        WHERE un.first_hero = "{hero_name}"
        GROUP BY un.first_hero, st.duration
        ORDER BY st.duration DESC, interactions DESC
        LIMIT 1
    ''').head()
    
    if field_entry is None:
        raise AssertionError('No entry found.')

    # A NULL duration means the hero has no recorded MCU screentime; treat it as 0.
    duration = 0 if field_entry[1] is None else field_entry[1]

    assert duration == expected_duration, f"{hero_name} has {duration} screentime duration, not {expected_duration}."
    return True
    
print(check_hero_movie_duration('CAPTAIN AMERICA', 14610))
print(check_hero_movie_duration('BEAST/HENRY &HANK& P', 0))
print(check_hero_movie_duration('IRON MAN/TONY STARK ', 0))
# print(check_hero_movie_duration('IRON MAN/TONY STARK', 0))  # Expected to throw an assertion error. Name not found.

True
True
True


These results reveal something interesting. Iron Man has the most screentime according to the screentime table, but he does not appear at the top of the joined table, even when we sort by duration descending. The cause is a data error: Iron Man is stored under different names in the two tables. On closer inspection, his name has a trailing space (' ') in the undirected_hero_network table, so the join finds no match and his duration is NULL. This error in the source data would have to be corrected upstream, before loading the dataframes into the SQL tables.
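
A small check can surface this kind of mismatch before the join. The sketch below compares two lists of names and reports pairs that are equal only after surrounding whitespace is removed. It is an illustration in plain Python, and `find_whitespace_mismatches` is a hypothetical helper name.

```python
def find_whitespace_mismatches(left_names, right_names):
    """Return (left, right) pairs that are equal only after stripping whitespace."""
    right_by_stripped = {}
    for name in right_names:
        right_by_stripped.setdefault(name.strip(), []).append(name)

    mismatches = []
    for left in left_names:
        for right in right_by_stripped.get(left.strip(), []):
            if left != right:
                mismatches.append((left, right))
    return mismatches


# Names as they appear in the two tables discussed above.
network_names = ["IRON MAN/TONY STARK ", "CAPTAIN AMERICA"]
screentime_names = ["IRON MAN/TONY STARK", "CAPTAIN AMERICA"]
print(find_whitespace_mismatches(network_names, screentime_names))
```

Any pair reported here is a candidate for trimming upstream, before the tables are joined.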

##### 4.2.2. Unit test scripts:
I perform simple unit tests for the map_names and duration_to_seconds functions outside of a dataframe, using known values, to validate their behavior.
For completeness, I reuse the previous test function to look up a mapped hero name in the joined screentime data, to confirm that the map_names udf was applied to the table.

In [115]:
# Validate the map_names udf functionality
assert map_names("Spider-Man") == "SPIDER-MAN/PETER PAR", "Unexpected value received from map_names converter."
print(map_names("Spider-Man") == "SPIDER-MAN/PETER PAR")

# Validate the duration_to_seconds udf functionality
assert duration_to_seconds("10:30") == 630, "Unexpected value received from duration_to_seconds converter."
print(duration_to_seconds("10:30") == 630)

# Use a mapped hero name to check for the hero in the screentime table
print(check_hero_movie_duration('SPIDER-MAN/PETER PAR', 9750))

True
True
True


##### 4.2.3. Source/Count checks:
I perform a filter on the directed_hero_network for a hero with both initiating and receiving interactions in the hero-network source. I then perform the same filter in the undirected_hero_network table to ensure that the number of row results in the directed_hero_network table filter matches the "interactions_count" value in the undirected_hero_network table.

In [116]:
# Compare number of interaction rows in the directed interaction table to the counts in the undirected interaction table

def get_directed_hero_interaction_count(hero1, hero2):

    interactions = spark.sql(f'''
        SELECT hero1, hero2
        FROM directed_hero_network
        WHERE (hero1='{hero1}' OR hero2='{hero1}') AND (hero1='{hero2}' OR hero2='{hero2}')
        ORDER BY hero1 ASC
    ''').count()
    
    return interactions

def get_undirected_hero_interaction_count(hero1, hero2):

    interactions = spark.sql(f'''
        SELECT first_hero, second_hero, interactions_count
        FROM undirected_hero_network
        WHERE (first_hero='{hero1}' OR second_hero='{hero1}') AND (first_hero='{hero2}' OR second_hero='{hero2}')
        ORDER BY first_hero ASC
    ''').head()[2]
    
    return interactions

directed_count = get_directed_hero_interaction_count('DOLLAR BILL', 'CLEA')
undirected_count = get_undirected_hero_interaction_count('DOLLAR BILL', 'CLEA')
assert directed_count == undirected_count, "The directed and undirected interaction counts for the heroes provided do not align."
print(directed_count == undirected_count)

True
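
The check above covers one pair of heroes. A broader completeness check is to compare totals: since every directed row falls into exactly one undirected pair, the directed row count should equal the sum of `interactions_count`. The sketch below assumes the two temp views registered in section 4.1 and takes the Spark session as a parameter. `check_total_interactions` is a hypothetical helper name.

```python
def check_total_interactions(spark):
    """Compare the directed row count with the summed undirected counts."""
    directed_total = spark.sql(
        "SELECT COUNT(*) FROM directed_hero_network"
    ).head()[0]
    undirected_total = spark.sql(
        "SELECT SUM(interactions_count) FROM undirected_hero_network"
    ).head()[0]
    assert directed_total == undirected_total, (
        f"Directed rows ({directed_total}) != summed undirected counts ({undirected_total})."
    )
    return True
```

In the notebook, this would be called as `check_total_interactions(spark)` after both temp views have been created.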


#### 4.3 Data dictionary 
`Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.`

"comic_appearances" table

| Field       | Data Type                    | Description                      | Source      |
| ----------- | ---------------------------- | -------------------------------- | ----------- |
| hero        | string (all caps, truncated) | Hero Name                        | edges.csv   |
| comic       | string (all caps)            | Comic Title (Series and Edition) | edges.csv   |


"screentime" table

| Field       | Data Type | Description                                | Source           |
| ----------- | --------- | ------------------------------------------ | ---------------- |
| hero        | string    | Hero Name                                  | screentime.csv   |
| duration    | int       | Screen time in MCU movies (in seconds)     | screentime.csv   |
| debut_movie | string    | First movie that the character appeared in | screentime.csv   |


"directed_hero_network" table

| Field    | Data Type | Description                     | Source             |
| -------- | --------- | ------------------------------- | ------------------ |
| hero1    | string    | Hero initiating the interaction | hero-network.csv   |
| hero2    | string    | Hero receiving the interaction  | hero-network.csv   |


"undirected_hero_network" table

| Field              | Data Type | Description                                            | Source             |
| ------------------ | --------- | ------------------------------------------------------ | ------------------ |
| first_hero         | string    | One of the heroes in the interaction                   | hero-network.csv   |
| second_hero        | string    | One of the heroes in the interaction                   | hero-network.csv   |
| interactions_count | long      | Number of recorded interactions between the two heroes | hero-network.csv   |

### Step 5: Complete Project Write Up
```
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.
```

This project was chosen to explore my interest in Marvel movies in a context with social media applicability. I chose to use Apache Spark for its simplicity in extracting CSV data, transforming it in dataframes, and loading it into SQL tables.

Also, this project simulates social network functionality, a workload that is well suited to distributed processing, and Spark can be easily configured to run on a distributed cluster. I ultimately chose not to execute this pipeline on an AWS cluster, because I had no AWS credits remaining.

The initial data import brings the database up to date with the current state of hero relationships in the MCU movies and Marvel comics. Going forward, it would be sufficient to update the data once per day, during low-traffic hours. Relationship data in this model is not time-sensitive at a finer grain than one day, so a daily refresh is enough for making new relationship recommendations and for deriving new analytics.

If the data was increased by 100x, I would partition the data by hero. An individual hero's interactions change far less often than the data set as a whole. This partitioning would support hero-based queries, including relationship recommendations and analytics around hero inclusion in the MCU movies.
Partitioning the data helps with storage and organization, but we can also make improvements to processing the data at a larger scale. Spark can process the results in parallel across several cores. While this parallelization can occur locally, it is more beneficial to execute these queries, analytics, and processes on a remote, distributed cluster optimized for this purpose. I would likely execute analytics on an AWS Redshift cluster on the partitioned data. The cores, memory, and other parameters could be configured and scaled to grow or shrink easily with the data.

If a dashboard needed to be populated daily at 7am, I would use an Airflow pipeline. Airflow pipelines support scheduled tasks, and they could easily perform queries on the past day of new data entries, aggregate them into a larger repository of aggregated data, and report on the developments.

If the data needed to be accessed by 100+ people, I would implement an AWS cloud data warehouse. A distributed database network could be accessed more easily globally. And while this would result in data duplication, it would improve data redundancy for data integrity and backup purposes while improving database read times. I would also implement a caching solution in front of the database at various levels (i.e. server, CDN, etc.) to improve read times.

# Analysis

The initial purpose of this project was to analyze the relationships between heroes in the Marvel movies and comics. The data has several source inconsistencies, primarily with hero names, which make some analysis results a bit skewed, but we are still able to derive general takeaways from the project.

One of the main questions initially was "Who are some of the most prominent/active characters from the comics who have yet to debut in the Marvel movies?". This question can be answered with the query below.

Interestingly, it would seem that "Beast" and "Human Torch" are two of the top candidates for Marvel movie screentime, based on their interactions in the comics. However, it's important to remember that both have already appeared in other movie franchises. Beast has been prevalent in X-Men movies, and Human Torch has been in Fantastic Four movies. Thus, this dataset tells the story with regard to the MCU, but it may be missing some data if we want to paint a broader picture of general screentime.

In [117]:
# Most "popular/active" character who's not in the movies

spark.sql('''
    SELECT un.first_hero, st.duration, SUM(un.interactions_count) interactions
    FROM undirected_hero_network un
    LEFT JOIN screentime st ON (un.first_hero = st.hero)
    WHERE duration IS NULL
    GROUP BY un.first_hero, st.duration
    ORDER BY st.duration DESC, interactions DESC
''').show()

+--------------------+--------+------------+
|          first_hero|duration|interactions|
+--------------------+--------+------------+
|BEAST/HENRY &HANK& P|    null|        8589|
|HUMAN TORCH/JOHNNY S|    null|        7356|
|ANT-MAN/DR. HENRY J.|    null|        7273|
|CYCLOPS/SCOTT SUMMER|    null|        7069|
|ANGEL/WARREN KENNETH|    null|        6872|
|COLOSSUS II/PETER RA|    null|        6798|
|IRON MAN/TONY STARK |    null|        6606|
|INVISIBLE WOMAN/SUE |    null|        5937|
|                HAWK|    null|        5738|
|HULK/DR. ROBERT BRUC|    null|        4858|
|BLACK WIDOW/NATASHA |    null|        4438|
|DR. STRANGE/STEPHEN |    null|        4177|
|DAREDEVIL/MATT MURDO|    null|        4058|
|MR. FANTASTIC/REED R|    null|        4013|
|CANNONBALL II/SAM GU|    null|        3994|
|   JAMESON, J. JONAH|    null|        3962|
|HERCULES [GREEK GOD]|    null|        3634|
|ICEMAN/ROBERT BOBBY |    null|        3606|
|BLACK KNIGHT V/DANE |    null|        3461|
|   CRYSTA

In [118]:
# Strongest relationship (sort by most interactions)

spark.sql('''
    SELECT un.first_hero h1, un.second_hero h2, un.interactions_count interactions
    FROM undirected_hero_network un
    ORDER BY interactions DESC
''').show()

+--------------------+--------------------+------------+
|                  h1|                  h2|interactions|
+--------------------+--------------------+------------+
|MISS AMERICA/MADELIN|   PATRIOT/JEFF MACE|        1894|
|   PATRIOT/JEFF MACE|   PATRIOT/JEFF MACE|        1275|
|HUMAN TORCH/JOHNNY S|THING/BENJAMIN J. GR|         744|
|HUMAN TORCH/JOHNNY S|MR. FANTASTIC/REED R|         713|
|MR. FANTASTIC/REED R|THING/BENJAMIN J. GR|         708|
|INVISIBLE WOMAN/SUE |MR. FANTASTIC/REED R|         701|
|HUMAN TORCH/JOHNNY S|INVISIBLE WOMAN/SUE |         694|
|MISS AMERICA/MADELIN|MISS AMERICA/MADELIN|         672|
|INVISIBLE WOMAN/SUE |THING/BENJAMIN J. GR|         668|
|SPIDER-MAN/PETER PAR|WATSON-PARKER, MARY |         616|
|   JAMESON, J. JONAH|SPIDER-MAN/PETER PAR|         526|
|     CAPTAIN AMERICA|IRON MAN/TONY STARK |         446|
|SCARLET WITCH/WANDA |             VISION |         422|
|ANT-MAN/DR. HENRY J.|WASP/JANET VAN DYNE |         406|
|CYCLOPS/SCOTT SUMMER|MARVEL GI

In [119]:
# Most popular hero by most associations

spark.sql('''
    SELECT un.first_hero h1, COUNT(un.interactions_count) associations
    FROM undirected_hero_network un
    GROUP BY h1
    ORDER BY associations DESC
''').show()

+--------------------+------------+
|                  h1|associations|
+--------------------+------------+
|     CAPTAIN AMERICA|        1659|
|BEAST/HENRY &HANK& P|        1184|
|ANT-MAN/DR. HENRY J.|        1060|
|ANGEL/WARREN KENNETH|        1053|
|IRON MAN/TONY STARK |         896|
|CYCLOPS/SCOTT SUMMER|         882|
|HUMAN TORCH/JOHNNY S|         850|
|BLACK WIDOW/NATASHA |         848|
|COLOSSUS II/PETER RA|         840|
|DR. STRANGE/STEPHEN |         822|
|DAREDEVIL/MATT MURDO|         774|
|INVISIBLE WOMAN/SUE |         758|
|                HAWK|         746|
|BLACK KNIGHT V/DANE |         674|
|HULK/DR. ROBERT BRUC|         667|
|BLACK PANTHER/T'CHAL|         656|
|FURY, COL. NICHOLAS |         633|
|HERCULES [GREEK GOD]|         623|
|      JARVIS, EDWIN |         584|
|MR. FANTASTIC/REED R|         583|
+--------------------+------------+
only showing top 20 rows



In [120]:
# Most popular hero by most interactions

spark.sql('''
    SELECT un.first_hero h1, SUM(un.interactions_count) interactions
    FROM undirected_hero_network un
    GROUP BY h1
    ORDER BY interactions DESC
''').show()

+--------------------+------------+
|                  h1|interactions|
+--------------------+------------+
|     CAPTAIN AMERICA|       14188|
|BEAST/HENRY &HANK& P|        8589|
|HUMAN TORCH/JOHNNY S|        7356|
|ANT-MAN/DR. HENRY J.|        7273|
|CYCLOPS/SCOTT SUMMER|        7069|
|ANGEL/WARREN KENNETH|        6872|
|COLOSSUS II/PETER RA|        6798|
|IRON MAN/TONY STARK |        6606|
|INVISIBLE WOMAN/SUE |        5937|
|                HAWK|        5738|
|HULK/DR. ROBERT BRUC|        4858|
|BLACK WIDOW/NATASHA |        4438|
|DR. STRANGE/STEPHEN |        4177|
|DAREDEVIL/MATT MURDO|        4058|
|MR. FANTASTIC/REED R|        4013|
|CANNONBALL II/SAM GU|        3994|
|   JAMESON, J. JONAH|        3962|
|HERCULES [GREEK GOD]|        3634|
|ICEMAN/ROBERT BOBBY |        3606|
|BLACK KNIGHT V/DANE |        3461|
+--------------------+------------+
only showing top 20 rows



The previous two queries show the difference between a character having many back-and-forth interactions and a character interacting with many different characters. After the first few rows, the results vary widely.
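
The distinction between the two measures can be seen on a toy network. In the sketch below, "associations" is the number of distinct partners a hero has and "interactions" is the total count across those partners. Unlike the queries above, which group on `first_hero` only, this sketch credits both heroes in each pair and skips self-pairs. Both are choices made for the illustration, and `hero_stats` is a hypothetical helper name.

```python
from collections import defaultdict


def hero_stats(pair_counts):
    """Return {hero: (associations, interactions)} from undirected pair counts."""
    partners = defaultdict(set)
    interactions = defaultdict(int)
    for (first_hero, second_hero), count in pair_counts.items():
        if first_hero == second_hero:
            continue  # assumption: a hero paired with itself is ignored
        # Credit both sides of the undirected pair.
        partners[first_hero].add(second_hero)
        partners[second_hero].add(first_hero)
        interactions[first_hero] += count
        interactions[second_hero] += count
    return {hero: (len(partners[hero]), interactions[hero]) for hero in partners}


# Toy counts, not taken from the source data.
pair_counts = {("A", "B"): 10, ("A", "C"): 1, ("B", "C"): 1, ("C", "D"): 1}
stats = hero_stats(pair_counts)
for hero in sorted(stats):
    print(hero, stats[hero])
```

Here hero C has the most associations (three partners) but few interactions, while A and B have fewer partners and many more interactions.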

#### Acquaintance Referral Scenario

In a social network, a realistic query might suggest a new relationship to a user. This can be done using the undirected_hero_network table. Assuming we are looking for a recommendation for a certain hero (say, Spider-Man), we can inspect the table to see who they interact with most. For now, let's call that hero "Hero B". Then, we can query Hero B's interactions, searching for the first hero who does not appear to have any interactions with Spider-Man (call them Hero C). We could then recommend Hero C to Spider-Man. The logic here is that, if Spider-Man and Hero B have many interactions together, perhaps Spider-Man would be interested to know Hero C, who also has relatively many interactions with Hero B.
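
The referral logic described above can be sketched in plain Python on a toy network. This illustrates the idea only and is not the Spark implementation; `neighbors` and `recommend` are hypothetical helper names, and the hero names and counts are made up.

```python
def neighbors(pair_counts, hero):
    """Return {partner: interactions} for one hero, ignoring self-pairs."""
    result = {}
    for (first_hero, second_hero), count in pair_counts.items():
        if first_hero == second_hero:
            continue
        if first_hero == hero:
            result[second_hero] = count
        elif second_hero == hero:
            result[first_hero] = count
    return result


def recommend(pair_counts, main_hero, limit=5):
    """Suggest partners of main_hero's top partner that main_hero has not met."""
    known = neighbors(pair_counts, main_hero)
    if not known:
        return []
    hero_b = max(known, key=known.get)  # the top interaction partner ("Hero B")
    candidates = neighbors(pair_counts, hero_b)
    ranked = sorted(candidates.items(), key=lambda item: item[1], reverse=True)
    return [
        hero for hero, _ in ranked
        if hero != main_hero and hero not in known
    ][:limit]


# Toy undirected counts; names and numbers are illustrative only.
pair_counts = {
    ("HERO A", "HERO B"): 50,
    ("HERO A", "HERO D"): 5,
    ("HERO B", "HERO C"): 30,
    ("HERO B", "HERO D"): 20,
    ("HERO B", "HERO E"): 10,
}
print(recommend(pair_counts, "HERO A"))
```

In this toy network, HERO D is excluded because HERO A already interacts with them, so only HERO B's other partners remain as candidates.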

In [121]:
def get_top_interaction(hero):
    """
    Gets top interaction partner for a specified hero.
    """
    
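    # The hero may be stored in either column of the undirected table,
    # so return whichever side of the pair is not the hero.
    result = spark.sql(f'''
        SELECT
            CASE
                WHEN un.first_hero = "{hero}"
                THEN un.second_hero
                ELSE un.first_hero
                END as acquaintance
        FROM undirected_hero_network un
        WHERE un.first_hero = "{hero}" OR un.second_hero = "{hero}"
        GROUP BY un.first_hero, un.second_hero
        ORDER BY SUM(interactions_count) DESC
        LIMIT 1
    ''').head()[0]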
    return result

def get_recommendations(main_hero, min_threshold=0, limit=5, show_interactions=False):
    """
    Gets top recommendations for a hero's new acquaintance, based on the top interactions of that hero's top interaction partner.
    
    @param main_hero Name of the hero for whom recommendations are being retrieved.
    @param min_threshold Minimum number of interactions that must occur for a recommendation to be made.
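    @param limit Max number of recommendations to make.
    @param show_interactions If True, displays the recommendations table with interaction counts.
    
    @return list of recommended hero names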
    """
    
    print(f"Gathering recommendations for {main_hero}...")
    best_friend = get_top_interaction(main_hero)
    print(f"Top acquaintance of {main_hero} is {best_friend}.")
    
    recommendations_df = spark.sql(f'''
        SELECT best_friend_list.acquaintance, best_friend_list.interactions best_friend_interactions, main_hero_list.interactions main_hero_interactions FROM
        (SELECT un.first_hero, un.second_hero, 
            CASE
                WHEN un.first_hero = '{best_friend}'
                THEN un.second_hero
                ELSE un.first_hero
                END as acquaintance,
        SUM(interactions_count) interactions
        FROM undirected_hero_network un
        WHERE (un.first_hero = "{best_friend}" OR un.second_hero = "{best_friend}")
        AND (un.interactions_count >= {min_threshold})
        GROUP BY un.first_hero, un.second_hero
        ORDER BY interactions DESC) best_friend_list
        LEFT JOIN
        (SELECT un.first_hero, un.second_hero,
            CASE
                WHEN un.first_hero = '{main_hero}'
                THEN un.second_hero
                ELSE un.first_hero
                END as acquaintance,
            SUM(interactions_count) interactions
        FROM undirected_hero_network un
        WHERE un.first_hero = "{main_hero}" OR un.second_hero = "{main_hero}"
        GROUP BY un.first_hero, un.second_hero
        ORDER BY interactions DESC) main_hero_list
        ON best_friend_list.acquaintance = main_hero_list.acquaintance
        WHERE main_hero_list.interactions IS NULL AND best_friend_list.acquaintance <> "{main_hero}"
        ORDER BY best_friend_list.interactions DESC
        LIMIT {limit}
    ''')
    
    if show_interactions:
        recommendations_df.show()

    recommendations_list = [row['acquaintance'] for row in recommendations_df.collect()]
    
    return recommendations_list

def display_recommendations(hero, rec_list):
    print(f"Top recommendations for {hero} are:")
    print(*rec_list, sep='\n')

spiderman_recs = get_recommendations("SPIDER-MAN/PETER PAR", 0, 3)
display_recommendations("Spider-Man", spiderman_recs)

cap_recs = get_recommendations("CAPTAIN AMERICA", 5, 5, True)
display_recommendations("Captain America", cap_recs)

To break down this query: after querying the undirected_hero_network table for Spider-Man's top acquaintance, we find that it is "WATSON-PARKER, MARY". We use a subquery to get the list of acquaintances of "WATSON-PARKER, MARY", with the acquaintance name placed in a predetermined column. We run the same subquery for Spider-Man to get his list of acquaintances in the same shape. We then left-join the two lists on the acquaintance name and keep the rows where "WATSON-PARKER, MARY" has interactions with the acquaintance but Spider-Man has none (i.e. NULL). Finally, we filter out Spider-Man's own entry (since he can't be his own acquaintance) and sort in descending order by "WATSON-PARKER, MARY"'s interactions.

This leaves us with a short list of two characters with whom "WATSON-PARKER, MARY" interacted and Spider-Man did not. "WATSON-PARKER, MARY" has very few interactions with these characters (just one each), but if we were to recommend a new acquaintance to Spider-Man based on his strong relationship with "WATSON-PARKER, MARY", we would recommend them.

Because the interactions between "WATSON-PARKER, MARY" and the recommended characters are so few in this example, I added several optional parameters to the recommendation function.

First, a minimum threshold (min_threshold): if the top acquaintance's interaction count with a candidate falls below that level, the candidate is not recommended. Second, a limit on how many recommendations to return. Third, a show_interactions flag that displays the interaction counts alongside the recommendations.
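The recommendation logic and its parameters can also be sketched without Spark. The following is a simplified plain-Python illustration, not the notebook's implementation: the function names `build_partner_totals` and `recommend` and the toy edge list are hypothetical, and the threshold is applied to each pair's summed interactions, which matches the SQL only if each pair appears once in the table.

```python
from collections import defaultdict

def build_partner_totals(edges):
    """Map each hero to {partner: total interactions}, treating edges as undirected."""
    totals = defaultdict(lambda: defaultdict(int))
    for first, second, count in edges:
        totals[first][second] += count
        totals[second][first] += count
    return totals

def recommend(edges, main_hero, min_threshold=0, limit=5):
    """Recommend partners of main_hero's top partner whom main_hero has not met."""
    totals = build_partner_totals(edges)
    partners = totals[main_hero]
    if not partners:
        return []
    # Top acquaintance: the partner with the most interactions.
    best_friend = max(partners, key=lambda h: (partners[h], h))
    # Anti-join: keep the best friend's partners that main_hero has never met.
    candidates = [
        (hero, count)
        for hero, count in totals[best_friend].items()
        if hero != main_hero and hero not in partners and count >= min_threshold
    ]
    candidates.sort(key=lambda pair: (-pair[1], pair[0]))
    return [hero for hero, _ in candidates[:limit]]

# Hypothetical toy data: (first_hero, second_hero, interactions_count)
toy_edges = [
    ("MAIN", "FRIEND", 50),
    ("MAIN", "SHARED", 20),
    ("FRIEND", "SHARED", 5),
    ("FRIEND", "X", 8),
    ("FRIEND", "Y", 3),
    ("FRIEND", "Z", 1),
]

print(recommend(toy_edges, "MAIN"))                   # all unmet partners of FRIEND
print(recommend(toy_edges, "MAIN", min_threshold=2))  # drops weak ties such as Z
print(recommend(toy_edges, "MAIN", limit=1))          # only the strongest candidate
```

Raising `min_threshold` removes candidates whose tie to the top acquaintance is weak, which addresses the single-interaction recommendations seen in the Spider-Man example.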

### Summary

These examples demonstrate how the source dataset can provide valuable insights for a social networking application. Although a graph database is generally considered a better fit than a relational database for representing the nodes and edges of a social network, this example applies the tools we've learned in this course to a real-world scenario.
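As a rough illustration of the graph view, the sketch below stores the network as an adjacency mapping and ranks "People You May Know" candidates by the number of common connections. This is a different heuristic from the top-acquaintance query used above, and both the toy graph and the function name `people_you_may_know` are hypothetical.

```python
from collections import Counter

# Hypothetical adjacency mapping: hero -> set of heroes they have met
graph = {
    "A": {"B", "C"},
    "B": {"A", "C", "D"},
    "C": {"A", "B", "D", "E"},
    "D": {"B", "C"},
    "E": {"C"},
}

def people_you_may_know(graph, hero):
    """Rank heroes not yet connected to `hero` by number of common connections."""
    known = graph[hero]
    common = Counter()
    for friend in known:
        for candidate in graph[friend]:
            # Skip the hero themselves and anyone they already know.
            if candidate != hero and candidate not in known:
                common[candidate] += 1
    # Most common connections first; ties broken alphabetically.
    return sorted(common.items(), key=lambda item: (-item[1], item[0]))

print(people_you_may_know(graph, "A"))
```

With an adjacency structure, the traversal from a hero to their friends' friends is a pair of nested loops, whereas the relational version needs a self-join with CASE expressions to handle both column orders.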