<a href="https://colab.research.google.com/github/zackives/upenn-cis-2450/blob/main/9_Module_2_Notebook_V_Big_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Big Data and Graph Data

In this module, we'll take what we learned about indices and generalize!

Apache Spark is a big data engine that runs on compute clusters, including on the cloud.  This notebook is set up assuming that (1) Spark is running on an AWS server that is public [this may **not** be true at the time you look at this!] and (2) we need to run the actual Python commands on that server, requiring us to put `%%spark` "magic" commands at the start of each cell.

You may need to look at this notebook without directly running it, until we give you specific instructions on launching your own Spark cluster.


## Cluster Setup

1. Read the [AWS Getting Started Guide](https://docs.google.com/document/d/e/2PACX-1vTXKkEI9im2BQFxCqVeMfwRl42xIhaIqIYEpqsh1zGDqSvUg_kV8B0yHobb8a1tK47WpcJNE48orFVt/pub)
2. Launch a CloudFormation Stack, which may take 5-15 minutes. You want to use this template: https://penn-cis545-files.s3.amazonaws.com/emr-course.yml

# Setup

In [None]:
!pip install sparkmagic



In [None]:
%load_ext sparkmagic.magics

The sparkmagic.magics extension is already loaded. To reload it, use:
  %reload_ext sparkmagic.magics


The following line connects to a remote Spark instance (note that you'll need to start an Amazon AWS Elastic MapReduce cluster first). You will likely need to change the URL after the `-u` to connect to an active server.

In [None]:
%spark add -s my_session -l python -u http://ec2-54-226-179-42.compute-1.amazonaws.com/ -a cis545-livy -p passme -t Basic_Access
# The above can connect to an EMR node running Spark + Livy, assuming the firewall is set to let anyone in

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|---|---|---|---|---|---|---|---|
| 1 | application_1727623924636_0002 | pyspark | idle | Link | Link | | ✔ |



SparkSession available as 'spark'.


In [None]:
# Use this only if you want to reset your session.
#%spark delete -s my_session

## Autograder setup

In [None]:
# PLEASE ENSURE YOUR PENN-ID IS ENTERED CORRECTLY. IF NOT, THE AUTOGRADER WON'T KNOW
# WHOM TO ASSIGN POINTS TO IN OUR BACKEND.
STUDENT_ID = 99999999 # YOUR PENN-ID GOES HERE AS AN INTEGER

In [None]:
%%writefile notebook-config.yaml

grader_api_url: 'https://23whrwph9h.execute-api.us-east-1.amazonaws.com/default/Grader23'
grader_api_key: 'flfkE736fA6Z8GxMDJe2q8Kfk8UDqjsG3GVqOFOa'

Writing notebook-config.yaml


In [None]:
%set_env HW_ID=cis2450_fall24_HW9

env: HW_ID=cis2450_fall24_HW9


In [None]:
!pip3 install penngrader-client

Collecting penngrader-client
  Downloading penngrader_client-0.5.2-py3-none-any.whl.metadata (15 kB)
Collecting dill (from penngrader-client)
  Downloading dill-0.3.9-py3-none-any.whl.metadata (10 kB)
Downloading penngrader_client-0.5.2-py3-none-any.whl (10 kB)
Downloading dill-0.3.9-py3-none-any.whl (119 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 119.4/119.4 kB 9.5 MB/s eta 0:00:00
Installing collected packages: dill, penngrader-client
Successfully installed dill-0.3.9 penngrader-client-0.5.2


In [None]:
import os
from penngrader.grader import *

grader = PennGrader('notebook-config.yaml', os.environ['HW_ID'], STUDENT_ID, STUDENT_ID)

PennGrader initialized with Student ID: 99999999

Make sure this correct or we will not be able to store your grade


## Example of Loading Sharded Data

First let's do our preliminaries.  **Every** cell in this notebook will need `%%spark` at the start so it runs on the remote machine with Spark on it, instead of on the machine with Jupyter.

## Load into Spark

Spark needs to know the structure of the data in its dataframes, i.e., their schemas.  Over the years it has gotten better at inferring schemas, but sometimes you'll want to set the schema yourself.

There are some basic types:
  * The table is a `StructType` with a list of fields; every row has these same fields.
  * Most fields, in our case, are `StringType`.
  * We also have a nested dictionary for the name, which is a `MapType` from `StringType` keys to `StringType` values.
  * `skills` is an `ArrayType` since it's a list, and it contains `StringType`s.
  * `also_view` is an array of structs.

See the PySpark documentation on `StructType` and examples such as https://www.programcreek.com/python/example/104715/pyspark.sql.types.StructType.

See below for a partial sketch:

In [None]:
%%spark

# Spark uses schemas to define the format for DataFrames. By default it will
# try to infer, which has varying luck. Here is an example of part of a schema
# for LinkedIn.
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, MapType
schema = StructType([
        StructField("_id", StringType(), True),
        StructField("name", MapType(StringType(), StringType()), True),
        StructField("locality", StringType(), True),
        StructField("skills", ArrayType(StringType()), True),
        StructField("industry", StringType(), True),
        StructField("summary", StringType(), True),
        StructField("url", StringType(), True),
        StructField("also_view", ArrayType(
                    StructType([
                      StructField("url", StringType(), True),
                      StructField("id", StringType(), True)])
                    ), True)
         ])


Let's now load a remote file.  To do this, we add the URL to the sparkContext, and then (in the next Cell) we will use `spark.read.json` to open and load the file.

In [None]:
%%spark

from pyspark import SparkFiles

# URL of the JSON Lines file (one JSON record per line)
data_url = "https://storage.googleapis.com/penn-cis5450/linkedin_anon.jsonl"

# Register the file with Spark so it is downloaded for this job
spark.sparkContext.addFile(data_url)



In [None]:
%%spark

# Read JSON Lines file
linked_df = spark.read\
  .json("file://" + SparkFiles.get("linkedin_anon.jsonl"))\
  .repartition('_id')

linked_df.show(5)


+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                _id|           education|              events|          experience|               group|              honors|            industry|           interests|interval|            locality|                name|              skills|         specilities|             summary|                 url|
+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|exhaustive-diatonic|[{null, , 1990, n...|[{23873, taller t...|[{Empresa dedicad...|{[AT

We can see the full, inferred schema here:

In [None]:
%%spark
linked_df.printSchema()


root
 |-- _id: string (nullable = true)
 |-- education: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- degree: string (nullable = true)
 |    |    |-- desc: string (nullable = true)
 |    |    |-- end: string (nullable = true)
 |    |    |-- major: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- start: string (nullable = true)
 |-- events: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- from: string (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- title1: string (nullable = true)
 |    |    |-- title2: string (nullable = true)
 |    |    |-- to: string (nullable = true)
 |-- experience: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- desc: string (nullable = true)
 |    |    |-- end: string (nullable = true)
 |    |    |-- org: string (nullable = true)
 |    |  

And we can also see that there are more shards / partitions than the 3 worker nodes in the EMR cluster:

In [None]:
%%spark
linked_df.rdd.getNumPartitions()


26
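
The number of partitions is a configuration choice rather than a property of the cluster size: `repartition('_id')` assigns each row to a bucket based on a hash of its key. Here is a minimal local sketch of that idea, not Spark's actual implementation. It assumes `zlib.crc32` as a stand-in hash function (Spark uses its own), reuses the count of 26 from the output above, and `partition_for` is a hypothetical helper name.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 26  # taken from the getNumPartitions() output above

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a key to a partition index with a stable hash (illustrative only)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# A few _id values copied from the earlier show() output
ids = ["exhaustive-diatonic", "primordial-flight", "absolute-bazaar",
       "proud-trainer", "flat-beef"]

# Group the keys by the partition they hash to
shards = defaultdict(list)
for _id in ids:
    shards[partition_for(_id)].append(_id)

for index in sorted(shards):
    print(index, shards[index])
```

Because the assignment depends only on the key, all rows with the same `_id` land in the same partition, which is what lets later joins and group-bys on that key run without moving the data again.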

Let's try a simple select/project query!

In [None]:
%%spark
linked_df.filter(linked_df.locality == 'United States')[['_id', 'name', 'locality']].show(5)


+--------------------+--------------------+-------------+
|                 _id|                name|     locality|
+--------------------+--------------------+-------------+
|     quadratic-model|   {Rose, Bertuccio}|United States|
|         briny-level|{Macdonald, Simon...|United States|
|counting-permutation| {Murray, Bullimore}|United States|
|     covering-shrike|     {Post, Cadbury}|United States|
|          breezy-tin|     {Ross, Brunton}|United States|
+--------------------+--------------------+-------------+
only showing top 5 rows

The DataFrame API also has SQL-like methods such as `select`:

In [None]:
%%spark
linked_df.select("_id", 'name', "locality").show(5)


+-------------------+--------------------+--------------------+
|                _id|                name|            locality|
+-------------------+--------------------+--------------------+
|exhaustive-diatonic|   {Douglas, Alfred}|Barcelona y alred...|
|  primordial-flight|  {Russell, Barkley}|San Leandro, Cali...|
|    absolute-bazaar| {Ogilvy, Leporello}|Vancouver, Canada...|
|      proud-trainer|  {Lennox, Merriman}|Lisbon Area, Port...|
|          flat-beef|{Charteris, Barry...|New South Wales, ...|
+-------------------+--------------------+--------------------+
only showing top 5 rows

And real SQL:

In [None]:
%%spark
linked_df.createOrReplaceTempView('linked_in')
sqlContext.sql('select * from linked_in').show(5)


+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                _id|           education|              events|          experience|               group|              honors|            industry|           interests|interval|            locality|                name|              skills|         specilities|             summary|                 url|
+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|exhaustive-diatonic|[{null, , 1990, n...|[{23873, taller t...|[{Empresa dedicad...|{[AT

In [None]:
%%spark
sqlContext.sql("select _id, name.given_name, name.family_name from linked_in").show(5)


+-------------------+----------+-----------+
|                _id|given_name|family_name|
+-------------------+----------+-----------+
|exhaustive-diatonic|    Alfred|    Douglas|
|  primordial-flight|   Barkley|    Russell|
|    absolute-bazaar| Leporello|     Ogilvy|
|      proud-trainer|  Merriman|     Lennox|
|          flat-beef| Barrymore|  Charteris|
+-------------------+----------+-----------+
only showing top 5 rows

In [None]:
%%spark
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

acro = udf(lambda x: ''.join([n[0] for n in x.split()]), StringType())

linked_df.select("_id", acro("locality").alias("acronym")).show(5)


+-------------------+-------+
|                _id|acronym|
+-------------------+-------+
|exhaustive-diatonic|   ByaE|
|  primordial-flight|    SLC|
|    absolute-bazaar|    VCA|
|      proud-trainer|    LAP|
|          flat-beef|   NSWA|
+-------------------+-------+
only showing top 5 rows
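
The lambda in the UDF above calls `x.split()`, which would raise an error if it ever received `None` (a null `locality`). A null-safe variant could look like the sketch below; it is plain Python so it can be tried locally, `acronym` is a hypothetical name, and the sample string is made up.

```python
from typing import Optional

def acronym(text: Optional[str]) -> Optional[str]:
    """Return the first character of each whitespace-separated word, or None for null input."""
    if text is None:
        return None
    return "".join(word[0] for word in text.split())

print(acronym("San Francisco Bay Area"))  # SFBA
print(acronym(None))                      # None
```

Inside a `%%spark` cell, a function like this can be wrapped the same way as the lambda, with `udf(acronym, StringType())`.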

In [None]:
%%spark
# Which industries are most popular?
sqlContext.sql('select count(_id), industry '+\
               'from linked_in '+\
               'group by industry '+\
               'order by count(_id) desc').\
    show(5)


+----------+--------------------+
|count(_id)|            industry|
+----------+--------------------+
|      5437|Information Techn...|
|      3396|   Computer Software|
|      2571|Marketing and Adv...|
|      2089|            Internet|
|      1485|  Financial Services|
+----------+--------------------+
only showing top 5 rows

## Graphs

For the next set of examples, we will look at graph-structured data.  It turns out our LinkedIn dataset has a list of nodes (by int ID, but associated with the user ID we used in the linked_in table) and a list of edges.

In [None]:
%%spark

# Let's consider edges to be bidirectional
# between people and the organizations they work for
temp_df = sqlContext.sql("""
  CREATE TEMPORARY VIEW edges_nested AS
  SELECT _id AS from, explode(experience) AS to
  FROM linked_in
""")

# Create graph with edges in each direction
edges_df = sqlContext.sql('''
  select from, to.org as to from edges_nested
  union
  select to.org as from, from as to from edges_nested
  ''')

edges_df.show(5)


+---------------+--------------------+
|           from|                  to|
+---------------+--------------------+
|  greasy-westie|   Corgan Associates|
| lean-commander|BCRE - Brack Capi...|
|absolute-prison|University of Mic...|
| interior-board|Enterprise Inform...|
|advanced-market|National Entrepre...|
+---------------+--------------------+
only showing top 5 rows
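
To see what `explode` and `union` are doing in the cell above, here is a small local sketch of the same steps on made-up data. The `profiles` records and the names in them are hypothetical; only the field names `_id`, `experience`, and `org` come from the dataset.

```python
# Two hypothetical profiles shaped like rows of the linked_in table
profiles = [
    {"_id": "alice", "experience": [{"org": "Acme"}, {"org": "Globex"}]},
    {"_id": "bob", "experience": [{"org": "Acme"}]},
]

# "explode": one (person, job) row per element of the experience list
edges_nested = [(p["_id"], job) for p in profiles for job in p["experience"]]

# person -> org plus org -> person; a set drops duplicates, like SQL UNION
edges = (
    {(person, job["org"]) for person, job in edges_nested}
    | {(job["org"], person) for person, job in edges_nested}
)

for edge in sorted(edges):
    print(edge)
```

Each person-organization relationship appears twice, once in each direction, so a traversal can move from a person to a company and back out to another person.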

In [None]:
%%spark

edges_df.printSchema()


root
 |-- from: string (nullable = true)
 |-- to: string (nullable = true)

In [None]:
%%spark
edges_df.createOrReplaceTempView('edges')
sqlContext.sql('select from as id, count(to) as degree from edges group by from').show(5)


+--------------------+------+
|                  id|degree|
+--------------------+------+
|       doughy-format|     6|
|brute-force-instance|    11|
|       ash-euphemism|     4|
|         glass-grove|     5|
|Kjøbenhavns Boldklub|     1|
+--------------------+------+
only showing top 5 rows
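
The degree query above is a group-by-and-count over the `from` column. As a rough local analogue on a hypothetical edge list, the same computation can be sketched with `collections.Counter`:

```python
from collections import Counter

# Hypothetical bidirectional edges between people and organizations
edges = [("alice", "Acme"), ("alice", "Globex"), ("bob", "Acme"),
         ("Acme", "alice"), ("Acme", "bob"), ("Globex", "alice")]

# Count outgoing edges per source node, like GROUP BY from with count(to)
degree = Counter(src for src, _dst in edges)
print(degree.most_common())
```

Note that organizations get a degree too, since they appear in the `from` column of the reversed edges.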

## Traversing the Graph

In [None]:
%%spark

from pyspark.sql.functions import col

# Start with a subset of nodes, looking at everything
# that could be considered a number under 1000
start_nodes_df = edges_df[['from']].filter(edges_df['from'] < 1000).\
  select(col('from').alias('id')).drop_duplicates()

print('{} start nodes'.format(start_nodes_df.count()))
start_nodes_df.show(9)

# The neighbors require us to join
# and we'll use Spark DataFrames syntax here
neighbor_nodes_df = start_nodes_df.\
  join(edges_df, start_nodes_df.id == edges_df['from']).\
  select(col('to').alias('id'))

neighbor_nodes_df.show(5)


9 start nodes
+---+
| id|
+---+
|  .|
| 23|
| 26|
|  3|
|  1|
|212|
|.99|
|636|
|513|
+---+

+-------------------+
|                 id|
+-------------------+
|        median-helo|
|       offline-mint|
|    sunny-interface|
|aquamarine-fortress|
|       clear-period|
+-------------------+
only showing top 5 rows

In [None]:
%%spark
edges_df[['from']].orderBy('from').drop_duplicates().show()

edges_df.filter(edges_df['from'] == '1').show()


+--------------------+
|                from|
+--------------------+
|       doughy-format|
|brute-force-instance|
|       ash-euphemism|
|         glass-grove|
|Kjøbenhavns Boldklub|
|MARICO INDUSTRIES...|
|      Further Afield|
|      ArtIstanbul PR|
|        daring-layer|
|       boolean-triad|
|             icy-tin|
|       humane-script|
|      careful-height|
|       either-symbol|
|         finite-tint|
|      rancid-cobbler|
|    critical-meander|
|  exothermic-outpost|
|Fidelis Resourcin...|
|Orchestre symphon...|
+--------------------+
only showing top 20 rows

+----+------------+
|from|          to|
+----+------------+
|   1|nippy-folder|
+----+------------+

In [None]:
%%spark
neighbor_neighbor_nodes_df = neighbor_nodes_df.\
  join(edges_df, neighbor_nodes_df.id == edges_df['from']).\
  select(col('to').alias('id'))

neighbor_neighbor_nodes_df.show(5)


+--------------------+
|                  id|
+--------------------+
|                   .|
|             Proceda|
|   Advus Corporation|
|Tata Consultancy ...|
|Grupo Santander B...|
+--------------------+
only showing top 5 rows

Let's find a small subset of our graph that actually connects somewhere.

In [None]:
%%spark

start_nodes_df.createOrReplaceTempView('start_nodes')
edges_df.createOrReplaceTempView('edges')

sqlContext.sql("""
  select e1.from as from, e1.to as med, e2.to as to
  from start_nodes s join edges e1 on s.id=e1.from join edges e2 on e1.to = e2.from
""").show(5)

# This will be the starting set of edges for our traversal
start_df = sqlContext.sql("""
  select e1.from as from, e1.to as to
  from start_nodes s join edges e1 on s.id=e1.from
""")

start_df.show(5)


+----+-------------------+--------------------+
|from|                med|                  to|
+----+-------------------+--------------------+
|   .|matching-commission|                   .|
|   .|       greasy-chain|             Proceda|
|   .|       greasy-chain|   Advus Corporation|
|   .|      light-entropy|Tata Consultancy ...|
|   .|       greasy-chain|Grupo Santander B...|
+----+-------------------+--------------------+
only showing top 5 rows

+----+-------------------+
|from|                 to|
+----+-------------------+
|   .|        median-helo|
|   .|       offline-mint|
|   .|    sunny-interface|
|   .|aquamarine-fortress|
|   .|       clear-period|
+----+-------------------+
only showing top 5 rows

In [None]:
%%spark
def iterate(df, edges, depth):
  df.createOrReplaceTempView('base')
  edges.createOrReplaceTempView('iter')

  # Base case: direct connection
  result = sqlContext.sql('select from, to, 1 as depth from base')

  for i in range(1, depth):
    result.createOrReplaceTempView('result')
    result = sqlContext.sql("""select r1.from as from, r2.to as to, r1.depth+1 as depth
                            from result r1 join iter r2
                            on r1.to=r2.from
                            where r1.from <> r2.to
                            """)
  return result


In [None]:
%%spark
iterate(start_df, edges_df, 1).orderBy('from','to').show()


+----+-------------------+-----+
|from|                 to|depth|
+----+-------------------+-----+
|   .|aquamarine-fortress|    1|
|   .|     avocado-umpire|    1|
|   .|    blaring-trainer|    1|
|   .|         canary-bow|    1|
|   .|     chestnut-liner|    1|
|   .|        citric-byte|    1|
|   .|       clear-period|    1|
|   .|      crunchy-claim|    1|
|   .|        devout-harp|    1|
|   .|       direct-force|    1|
|   .|          dry-fixed|    1|
|   .|       greasy-chain|    1|
|   .|        humid-lodge|    1|
|   .|     isochoric-drum|    1|
|   .|      light-entropy|    1|
|   .|matching-commission|    1|
|   .|        median-helo|    1|
|   .|     mild-animation|    1|
|   .|      noisy-diction|    1|
|   .|       offline-mint|    1|
+----+-------------------+-----+
only showing top 20 rows

In [None]:
%%spark
iterate(start_df, edges_df, 2).orderBy('from','to').show()


+----+--------------------+-----+
|from|                  to|depth|
+----+--------------------+-----+
|   .|ABRA Enterprises,...|    2|
|   .|       AT IT Limited|    2|
|   .|Advanced Systems ...|    2|
|   .|   Advus Corporation|    2|
|   .|Airtours Internat...|    2|
|   .|Airtours Internat...|    2|
|   .|Airtours plc seco...|    2|
|   .|         Alinma Bank|    2|
|   .|     Aspen Insurance|    2|
|   .| BAE Defense Systems|    2|
|   .|    Baan Development|    2|
|   .|          Baan/Xebic|    2|
|   .|    Bayer Healthcare|    2|
|   .|     Belastingdienst|    2|
|   .|             Betfair|    2|
|   .|CASE Communicatio...|    2|
|   .|CableCom Networki...|    2|
|   .|             Camelot|    2|
|   .|Canadian Leisure ...|    2|
|   .|   Canon New Zealand|    2|
+----+--------------------+-----+
only showing top 20 rows

In [None]:
%%spark
iterate(start_df, edges_df, 3).orderBy('from','to').show()


+----+-------------------+-----+
|from|                 to|depth|
+----+-------------------+-----+
|   .|  absolute-cabernet|    3|
|   .|      absolute-fort|    3|
|   .| accepting-ancestor|    3|
|   .|  achromatic-panini|    3|
|   .|  achromatic-pepato|    3|
|   .|  achromatic-pepato|    3|
|   .|achromatic-revolver|    3|
|   .|  achromatic-upload|    3|
|   .|       acidic-asset|    3|
|   .|       acidic-delta|    3|
|   .|acoustic-rottweiler|    3|
|   .|      active-energy|    3|
|   .|       active-event|    3|
|   .|       active-liner|    3|
|   .|        active-roof|    3|
|   .|    acute-amplifier|    3|
|   .|      acute-halibut|    3|
|   .|       acyclic-halo|    3|
|   .|  adaptive-bisector|    3|
|   .|adaptive-gorgonzola|    3|
+----+-------------------+-----+
only showing top 20 rows
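
The `iterate` function above repeats one join per hop. A minimal local sketch of the same logic, on a made-up graph, may help in tracing what each depth returns. `iterate_local` and the sample edges are hypothetical, and a dictionary lookup stands in for the SQL join.

```python
def iterate_local(start_edges, edges, depth):
    """Extend (from, to) pairs hop by hop, mirroring the SQL join in iterate()."""
    # Index edges by source node so each hop is a lookup
    outgoing = {}
    for src, dst in edges:
        outgoing.setdefault(src, []).append(dst)

    # Base case: direct connections at depth 1
    result = [(src, dst, 1) for src, dst in start_edges]

    for _ in range(1, depth):
        result = [
            (src, nxt, d + 1)
            for src, mid, d in result
            for nxt in outgoing.get(mid, [])
            if src != nxt  # same filter as "where r1.from <> r2.to"
        ]
    return result

# Hypothetical bidirectional person/organization edges
edges = [("alice", "Acme"), ("Acme", "alice"),
         ("bob", "Acme"), ("Acme", "bob"),
         ("bob", "Globex"), ("Globex", "bob")]
start = [("alice", "Acme")]

print(iterate_local(start, edges, 1))  # [('alice', 'Acme', 1)]
print(iterate_local(start, edges, 2))  # [('alice', 'bob', 2)]
print(iterate_local(start, edges, 3))  # [('alice', 'Acme', 3), ('alice', 'Globex', 3)]
```

As in the SQL version, only the rows at the final depth are returned, duplicate pairs are not removed, and the only cycle check is that a path may not end at its own starting node.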

## Joins in Spark, Beyond Graph Traversals


What if we want to look at relationships between people -- say, co-working?  This involves looking at people and going *every 2 hops* because there are organizations in between.

## Finding Coworkers, by ID

Let's get our people first!

In [None]:
%%spark

nodes_df = sqlContext.sql("""
    select _id as nid, concat(name.given_name, ' ', name.family_name) as user, industry
    from linked_in
  """)


In [None]:
%%spark

nodes_df.createOrReplaceTempView('nodes')

# Let's limit coworkers to edges that start
# from existing workers, and are 2 hops away (through a company)
coworked_df = sqlContext.sql("""
  select e1.from, e2.to as to
  from edges e1 join edges e2 on e1.to = e2.from
  where e1.from in (select nid from nodes)
""")


In [None]:
%%spark

nodes_df.createOrReplaceTempView('nodes')
coworked_df.createOrReplaceTempView('edges')

coworkers_df = sqlContext.sql("""SELECT n1.user, n2.user as coworker
               FROM (nodes n1 join edges e on n1.nid = e.from) join nodes n2 on e.to = n2.nid
               WHERE n1.user <> n2.user
               """)

coworkers_df.show(5)



+-------------------+----------------+
|               user|        coworker|
+-------------------+----------------+
|      Jenkins Dewar|Simonides Gordon|
| Simonides Spalding|Simonides Gordon|
|    Alfred Hamilton|Simonides Gordon|
|  Barkley Forrester|Simonides Gordon|
|Merriman Cunningham|Simonides Gordon|
+-------------------+----------------+
only showing top 5 rows
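
The two-hop idea behind the coworker query can be tried locally on a tiny made-up example: two people are coworkers if some organization connects them. The `works_at` edges below are hypothetical, and this sketch compares person IDs, whereas the SQL above compares user names.

```python
from collections import defaultdict

# Hypothetical person -> organization edges
works_at = [("alice", "Acme"), ("bob", "Acme"),
            ("carol", "Globex"), ("bob", "Globex")]

# First hop: collect the people attached to each organization
members = defaultdict(set)
for person, org in works_at:
    members[org].add(person)

# Second hop: pair up distinct people who share an organization
coworkers = {
    (a, b)
    for people in members.values()
    for a in people
    for b in people
    if a != b
}
print(sorted(coworkers))
```

Pairs appear in both orders, just as the bidirectional edges produce both `(user, coworker)` and `(coworker, user)` rows in Spark.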

## Exercise

Can you find the *company* with the most common coworker pairs?

As a starting point, let's pull back the original edges table...

In [None]:
%%spark

edges_df.createOrReplaceTempView("edges")



Be sure to find the top-1 organization by count (you should return the `org` and the `count` in the schema).

Recall that SQL has `ORDER BY` and `LIMIT` clauses.

In [None]:
%%spark
coworkers_company_df = sqlContext.sql("""TODO
               """)

# show() is a DataFrame method, so call it on the DataFrame itself
# rather than on the list that collect() returns
coworkers_company_df.show(5)

The next cell copies the dataframe from Spark to Colab's Python kernel, converting it to a Pandas dataframe in the process!

In [None]:
%spark -o coworkers_company_df


In [None]:
coworkers_company_df

In [None]:
import pandas as pd

if not isinstance(coworkers_company_df, pd.DataFrame) or 'org' not in coworkers_company_df.columns:
  raise TypeError("Data should be in a DataFrame and organization should be a column")

grader.grade('top_coworkers', coworkers_company_df)