<a href="https://colab.research.google.com/github/zackives/upenn-cis5450-hw/blob/main/9_Module_2_Notebook_V_Big_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Big Data and Graph Data

In this module, we'll take what we learned about indices and generalize!

Apache Spark is a big data engine that runs on compute clusters, including on the cloud.  This notebook is set up assuming that Spark is running on a publicly accessible AWS server [this may **not** be true at the time you look at this!].

You may need to look at this notebook without directly running it, until we give you specific instructions on launching your own Spark cluster.


# Setup

In [None]:
# TODO: fill this one in based on the host posted on Ed
%set_env EMR_HOST=
%set_env HW_ID=cis5450_25f_HW9

In [None]:
from pyspark.sql import SparkSession
import os

# Start Spark Session by Specifying the Spark Cluster Address.
spark = SparkSession.builder \
  .appName("CIS-5450") \
  .remote("sc://{host}:15002".format(host=os.getenv('EMR_HOST'))).getOrCreate()
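
If `EMR_HOST` is empty or unset, the `format` call above still produces a string (`sc://:15002` or `sc://None:15002`), so the problem only shows up later as a connection error. A minimal sketch of an early check, assuming a hypothetical helper named `spark_connect_url` that is not part of this notebook or of PySpark; the port 15002 is copied from the cell above:

```python
import os

def spark_connect_url(host, port=15002):
    """Build a Spark Connect URL, failing early if the host is missing.

    Hypothetical helper for illustration; not part of PySpark.
    """
    if host is None or not host.strip():
        raise ValueError("EMR_HOST is not set; fill it in from the host posted on Ed")
    return "sc://{host}:{port}".format(host=host.strip(), port=port)

# Invented host name, for illustration only
print(spark_connect_url("ec2-example.compute-1.amazonaws.com"))
```

The returned string could then be passed to `.remote(...)` in place of the inline `format` call.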

In [None]:
!python --version

The Spark session cell above connects to Spark running remotely (note that you'll need to start an Amazon Elastic MapReduce (EMR) instance).  You will likely need to change `EMR_HOST` to point to an active server.

## Autograder setup

In [None]:
# PLEASE ENSURE YOUR PENN-ID IS ENTERED CORRECTLY. IF NOT, THE AUTOGRADER WON'T KNOW
# WHO TO ASSIGN POINTS TO IN OUR BACKEND.
STUDENT_ID = 99999999 # YOUR PENN-ID GOES HERE AS AN INTEGER

In [None]:
%%writefile notebook-config.yaml

grader_api_url: 'https://23whrwph9h.execute-api.us-east-1.amazonaws.com/default/Grader23'
grader_api_key: 'flfkE736fA6Z8GxMDJe2q8Kfk8UDqjsG3GVqOFOa'

In [None]:
!pip3 install penngrader-client

In [None]:
import os
from penngrader.grader import *

grader = PennGrader('notebook-config.yaml', os.environ['HW_ID'], STUDENT_ID, STUDENT_ID)

## Example of Loading Sharded Data

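First let's do our preliminaries.  The cells in this notebook work through the `spark` session created above, so their DataFrame operations run on the remote machine with Spark on it, instead of on the machine with Jupyter.  With this setup, cells do not need a `%%spark` magic at the start.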

## Load into Spark

Spark needs to know the structure of the data in its dataframes, i.e., their schemas.  Over the years it has gotten better at inferring schemas, but sometimes you'll want to set the schema yourself.

There are some basic types:
  * The table is a `StructType` with a list of fields (the columns of each row).
  * Most fields, in our case, are `StringType`.
  * We also have a nested dictionary for the name, which is a `MapType` from `StringType` keys to `StringType` values.
  * `skills` is an `ArrayType` since it's a list, and it contains `StringType`s.
  * `also_view` is an array of structs.

See the PySpark documentation on `StructType`, and examples such as https://www.programcreek.com/python/example/104715/pyspark.sql.types.StructType.

See below for a partial sketch:

In [None]:
# Spark uses schemas to define the format for DataFrames. By default it will
# try to infer the schema, with varying success. Here is an example of part of
# a schema for LinkedIn.
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, MapType
schema = StructType([
    StructField("_id", StringType(), True),
    StructField("name", MapType(StringType(), StringType()), True),
    StructField("locality", StringType(), True),
    StructField("skills", ArrayType(StringType()), True),
    StructField("industry", StringType(), True),
    StructField("summary", StringType(), True),
    StructField("url", StringType(), True),
    # also_view is an array of structs, each with a url and an id
    StructField("also_view", ArrayType(
        StructType([
            StructField("url", StringType(), True),
            StructField("id", StringType(), True)])
    ), True)
])
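
To make the type mapping concrete, here is a hypothetical record shaped like the schema above, parsed with Python's standard `json` module. All field values are invented for illustration and are not taken from the dataset; the point is only which Python structure each Spark type corresponds to.

```python
import json

# One hypothetical JSON Lines record (values invented for illustration)
line = ('{"_id": "in-00001", '
        '"name": {"given_name": "Ada", "family_name": "Lovelace"}, '
        '"locality": "United States", '
        '"skills": ["SQL", "Spark"], '
        '"also_view": [{"url": "http://example.com/p/2", "id": "in-00002"}]}')

record = json.loads(line)

# StringType corresponds to a Python str
print(type(record["locality"]).__name__)
# MapType(StringType, StringType) corresponds to a dict of str to str
print(record["name"]["given_name"])
# ArrayType(StringType) corresponds to a list of str
print(record["skills"])
# ArrayType(StructType) corresponds to a list of dicts with the same fields
print([view["id"] for view in record["also_view"]])
```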

Let's now load a remote file.  We pass its S3 URL directly to `spark.read.json`, which opens and loads the file.

In [None]:
# Read JSON Lines file
linked_df = spark.read\
  .json("s3://penn-cis545-files/linkedin_anon.jsonl")\
  .repartition('_id')

linked_df.show(5)

We can see the full, inferred schema here:

In [None]:
linked_df.printSchema()

Let's try a simple select/project query!

In [None]:
linked_df.filter(linked_df.locality == 'United States')[['_id', 'name', 'locality']].show(5)

Also in SQL-like syntax:

In [None]:
linked_df.select('_id', 'name', 'locality').show(5)

And real SQL:

In [None]:
linked_df.createOrReplaceTempView('linked_in')
spark.sql('select * from linked_in').show(5)

In [None]:
spark.sql("select _id, name.given_name, name.family_name from linked_in").show(5)

The user-defined function (UDF) below currently (Fall 2025) does not work between Colab and Apache Spark on Amazon Elastic MapReduce, simply because Amazon EMR is three minor versions behind in Python (3.9 vs. 3.12). When an updated EMR is available, it will work.

In [37]:
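from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Build an acronym from the first letter of each whitespace-separated word
@udf(returnType=StringType(), useArrow=True)
def acro(x: str) -> str:
    return ''.join([n[0] for n in x.split()])

# Left commented out until EMR's Python version catches up (see the note above)
# linked_df.select("_id", acro("locality").alias("acronym")).show(5)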
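
The UDF body is ordinary Python, so its logic can be checked locally without a cluster. A small sketch, using a hypothetical plain function `acronym` (the same expression as `acro`, without the decorator) plus an added guard for missing input that the original does not have:

```python
def acronym(text):
    """Join the first character of each whitespace-separated word."""
    if not text:  # None or empty string: nothing to abbreviate
        return ''
    return ''.join(word[0] for word in text.split())

print(acronym("United States"))           # US
print(acronym("San Francisco Bay Area"))  # SFBA
```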

In [None]:
# Which industries are most popular?
spark.sql("""
  select count(_id), industry
  from linked_in
  group by industry
  order by count(_id) desc
""").show(5)
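
For intuition, the query groups rows by `industry`, counts each group, and lists the largest counts first. The same computation on a small in-memory list can be sketched with `collections.Counter`; the industry values below are invented for illustration.

```python
from collections import Counter

# Hypothetical industry values, one per profile
industries = ["Software", "Banking", "Software", "Retail", "Software", "Banking"]

# Counter groups equal values and counts them (GROUP BY + COUNT);
# most_common sorts by count descending and keeps the first n (ORDER BY ... DESC + show(n))
top = Counter(industries).most_common(2)
print(top)  # [('Software', 3), ('Banking', 2)]
```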

## Graphs

For the next set of examples, we will look at graph-structured data.  It turns out our LinkedIn dataset has a list of nodes (by int ID, but associated with the user ID we used in the linked_in table) and a list of edges.

In [None]:
# Let's consider edges to be bidirectional,
# between people and the organizations they work for
temp_df = spark.sql("""
  CREATE OR REPLACE TEMPORARY VIEW edges_nested AS
  SELECT _id AS from, explode(experience) AS to
  FROM linked_in
""")

# Create graph with edges in each direction
edges_df = spark.sql('''
  select from, to.org as to from edges_nested
  union
  select to.org as from, from as to from edges_nested
  ''')

edges_df.show(5)

In [None]:
edges_df.printSchema()

In [None]:
edges_df.createOrReplaceTempView('edges')
spark.sql('select from as id, count(to) as degree from edges group by from').show(5)
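
As a sketch of what the two queries above compute, here is the same idea on a toy edge list in plain Python. The person and organization names are invented; the set union plays the role of SQL `union` (which removes duplicates), and the counter plays the role of `group by from`.

```python
from collections import Counter

# Hypothetical (person, organization) pairs, standing in for the exploded experience lists
worked_at = [("p1", "Acme"), ("p2", "Acme"), ("p2", "Globex"), ("p1", "Acme")]

# Add each edge in both directions; a set removes duplicates, like SQL UNION
edges = {(p, o) for p, o in worked_at} | {(o, p) for p, o in worked_at}

# Degree = number of outgoing edges per node (GROUP BY from, COUNT(to))
degree = Counter(src for src, _ in edges)
print(sorted(degree.items()))
# [('Acme', 2), ('Globex', 1), ('p1', 1), ('p2', 2)]
```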

## Traversing the Graph

In [None]:
from pyspark.sql.functions import col

# Start with a subset of nodes, looking at everything
# that could be considered a number under 1000
start_nodes_df = edges_df[['from']].filter(edges_df['from'] < 1000).\
  select(col('from').alias('id')).drop_duplicates()

print('{} start nodes'.format(start_nodes_df.count()))
start_nodes_df.show(9)

# The neighbors require us to join
# and we'll use Spark DataFrames syntax here
neighbor_nodes_df = start_nodes_df.\
  join(edges_df.alias('e'), start_nodes_df.id == col('e.from')).\
  select(col('to').alias('id'))

neighbor_nodes_df.show(5)

In [None]:
edges_df[['from']].orderBy('from').drop_duplicates().show()

edges_df.filter(edges_df['from'] == '1').show()

In [None]:
neighbor_neighbor_nodes_df = neighbor_nodes_df.\
  join(edges_df.alias('e'), neighbor_nodes_df.id == col('e.from')).\
  select(col('to').alias('id'))

neighbor_neighbor_nodes_df.show(5)

Let's find a small subset of our graph that actually connects somewhere.

In [None]:
start_nodes_df.createOrReplaceTempView('start_nodes')
edges_df.createOrReplaceTempView('edges')

spark.sql("""
  select e1.from as from, e1.to as med, e2.to as to
  from start_nodes s join edges e1 on s.id=e1.from join edges e2 on e1.to = e2.from
""").show(5)

# This will be the starting set of edges for the traversal
start_df = spark.sql("""
  select e1.from as from, e1.to as to
  from start_nodes s join edges e1 on s.id=e1.from
""")

start_df.show(5)

In [None]:
def iterate(df, edges, depth):
  """Extend the (from, to) pairs in df along edges until paths have `depth` hops."""
  df.createOrReplaceTempView('base')
  edges.createOrReplaceTempView('iter')

  # Base case: direct connection
  result = spark.sql('select from, to, 1 as depth from base')

  # Each pass joins the current endpoints with the edges to add one more hop,
  # dropping paths that end back at their starting node
  for i in range(1, depth):
    result.createOrReplaceTempView('result')
    result = spark.sql("""select r1.from as from, r2.to as to, r1.depth+1 as depth
                            from result r1 join iter r2
                            on r1.to=r2.from
                            where r1.from <> r2.to
                            """)
  return result
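
To see what `iterate` returns at each depth, the same join can be run on a toy graph with Python's built-in `sqlite3`. This is a sketch under assumptions, not the notebook's code: the columns are renamed `src` and `dst` because `from` and `to` are reserved words in SQLite, the graph is invented, and `iterate_sqlite` is a hypothetical name.

```python
import sqlite3

def iterate_sqlite(conn, depth):
    """Return sorted (src, dst, depth) rows for paths of exactly `depth` edges."""
    cur = conn.cursor()
    # Base case: direct connections from the start edges
    rows = cur.execute("SELECT src, dst, 1 FROM base").fetchall()
    for _ in range(1, depth):
        # Store the previous result, then extend each path by one edge
        cur.execute("DROP TABLE IF EXISTS result")
        cur.execute("CREATE TABLE result (src TEXT, dst TEXT, depth INTEGER)")
        cur.executemany("INSERT INTO result VALUES (?, ?, ?)", rows)
        rows = cur.execute("""
            SELECT r1.src, r2.dst, r1.depth + 1
            FROM result r1 JOIN edges r2 ON r1.dst = r2.src
            WHERE r1.src <> r2.dst
        """).fetchall()
    return sorted(rows)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE edges (src TEXT, dst TEXT)")
# Toy undirected path a - b - c, stored as edges in both directions
conn.executemany("INSERT INTO edges VALUES (?, ?)",
                 [("a", "b"), ("b", "a"), ("b", "c"), ("c", "b")])
# Start from node a only
conn.execute("CREATE TABLE base AS SELECT src, dst FROM edges WHERE src = 'a'")

print(iterate_sqlite(conn, 1))  # [('a', 'b', 1)]
print(iterate_sqlite(conn, 2))  # [('a', 'c', 2)]
print(iterate_sqlite(conn, 3))  # [('a', 'b', 3)]
```

Note that each call returns only the pairs reached at exactly that depth, and that the `where` clause only rules out ending at the start node: at depth 3 the path a, b, c, b revisits `b`.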

In [None]:
iterate(start_df, edges_df, 1).orderBy('from','to').show()

In [None]:
iterate(start_df, edges_df, 2).orderBy('from','to').show()

In [None]:
iterate(start_df, edges_df, 3).orderBy('from','to').show()

## Joins in Spark, Beyond Graph Traversals


What if we want to look at relationships between people -- say, co-working?  This involves looking at people and going *every 2 hops* because there are organizations in between.

## Finding Coworkers, by ID

Let's get our people first!

In [None]:
nodes_df = spark.sql("""
    select _id as nid, concat(name.given_name, ' ', name.family_name) as user, industry
    from linked_in
  """)

In [None]:
nodes_df.createOrReplaceTempView('nodes')

# Let's limit coworkers to edges that start
# from existing workers, and are 2 hops away (through a company)
coworked_df = spark.sql("""
  select e1.from, e2.to as to
  from edges e1 join edges e2 on e1.to = e2.from
  where e1.from in (select nid from nodes)
""")

In [None]:
nodes_df.createOrReplaceTempView('nodes')
coworked_df.createOrReplaceTempView('edges')

coworkers_df = spark.sql("""SELECT n1.user, n2.user as coworker
               FROM (nodes n1 join edges e on n1.nid = e.from) join nodes n2 on e.to = n2.nid
               WHERE n1.user <> n2.user
               """)

coworkers_df.show(5)
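
The two-hop pattern (person, then organization, then person) can be tried on a toy graph with Python's built-in `sqlite3`. This is only a sketch: the names are invented, the columns are renamed `src` and `dst` because `from` and `to` are reserved words in SQLite, and the self-pair filter is applied on IDs rather than on user names.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE edges (src TEXT, dst TEXT)")

# Hypothetical (person, organization) pairs
worked_at = [("ann", "Acme"), ("bob", "Acme"), ("bob", "Globex"), ("cy", "Globex")]
# Store each person-organization edge in both directions
conn.executemany("INSERT INTO edges VALUES (?, ?)",
                 worked_at + [(org, person) for person, org in worked_at])

# The people (as opposed to organizations) in the graph
conn.execute("CREATE TABLE nodes (nid TEXT)")
conn.executemany("INSERT INTO nodes VALUES (?)", [("ann",), ("bob",), ("cy",)])

# Hop 1 goes from a person to an organization, hop 2 from there to a person
coworkers = conn.execute("""
    SELECT DISTINCT e1.src, e2.dst
    FROM edges e1 JOIN edges e2 ON e1.dst = e2.src
    WHERE e1.src IN (SELECT nid FROM nodes) AND e1.src <> e2.dst
    ORDER BY e1.src, e2.dst
""").fetchall()
print(coworkers)
# [('ann', 'bob'), ('bob', 'ann'), ('bob', 'cy'), ('cy', 'bob')]
```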


## Exercise

Can you find the *company* with the most common coworker pairs?

As a starting point, let's pull back the original edges table...

In [None]:
edges_df.createOrReplaceTempView("edges")


Be sure to find the top-1 organization by count (you should return the `org` and the `count` in the schema).

Recall that SQL has `ORDER BY` and `LIMIT` clauses.
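
As a reminder of how those clauses combine with `GROUP BY`, here is a small sketch on an unrelated toy table, run with Python's built-in `sqlite3` rather than Spark. The table `visits` and its contents are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE visits (page TEXT)")
conn.executemany("INSERT INTO visits VALUES (?)",
                 [("home",), ("docs",), ("home",), ("blog",), ("home",), ("docs",)])

# Count rows per group, sort by the count descending, keep only the first row
top = conn.execute("""
    SELECT page, COUNT(*) AS n
    FROM visits
    GROUP BY page
    ORDER BY n DESC
    LIMIT 1
""").fetchall()
print(top)  # [('home', 3)]
```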

In [None]:
# TODO: create coworkers_company_sdf (Spark DataFrame).
# You don't need to convert to Pandas

# Make sure you only get one result from this!
coworkers_company_sdf.show(2)

In [None]:
import pandas as pd

coworkers_company_df = pd.DataFrame(coworkers_company_sdf.collect(),columns=coworkers_company_sdf.columns)

if not isinstance(coworkers_company_df, pd.DataFrame) or 'org' not in coworkers_company_df.columns:
  raise TypeError("Data should be in a DataFrame and organization should be a column")

grader.grade('top_coworkers', coworkers_company_df)