<a href="https://colab.research.google.com/github/vtien/BFS_Graph_Traversal/blob/master/Spark_SQL_Graph_Traversal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Spark, Hierarchical Data and Graph Data on Yelp Reviews Dataset

## Getting Started with Apache Spark

Apache Spark, which has become the de facto successor to Apache Hadoop, is a complex, cluster-based data processing system that was written in Scala.  It leverages a wide variety of distributed tools and components used for big data processing.  It interfaces “smoothly” to Python, but be forewarned that there are some rough edges.  For those interested in why, there are a few reasons:

* Scala has slightly different notions of types (especially things like Rows) and handles missing values (nulls) differently from Python.
* The Scala-based Spark “engine” can’t just run Python functions as it’s doing data processing.  This means that you want to be careful to use Spark’s library of functions, or the special mechanisms for inserting “user defined functions.”
* DataFrames on Spark are “sharded,” so there is no single object corresponding to the DataFrame!

While Spark DataFrames try to emulate the same programming style as Pandas DataFrames, there are some differences in how you express things.  Please refer to the Lecture Slides for our take on the differences.  You may also find the following Web pages to be useful resources for understanding Spark vs Pandas DataFrames:

https://lab.getbase.com/pandarize-spark-dataframes/
https://ogirardot.wordpress.com/2015/07/31/from-pandas-to-apache-sparks-dataframe/ 

For this assignment, we are going to get familiar with Spark without worrying too much about sharding and distribution.  We are going to run Spark on your Docker container.  This isn’t really using it to its strengths -- and in fact you might find Spark to be unexpectedly slow -- but it will get you comfortable with programming in Spark without worrying about distributed nodes, clusters, and spending real dollars on the cloud.  Your code, if written properly, will “naturally scale” to clusters running on the Cloud.  Later in the term we’ll connect your Jupyter instance to Spark running on the cloud -- to handle “truly big data.”


### Initializing a Connection to Spark

We'll open a connection to Spark as follows. Note that Spark has multiple interfaces, as you will see if you look at sample code elsewhere. `SparkSession` is the “most modern” one and we’ll be using it for this course.  From `SparkSession`, you can load data into Spark DataFrames as well as `RDD`s.

Run the following cells to set up the notebook!

In [None]:
!apt install libkrb5-dev
!wget https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install findspark
!pip install sparkmagic
!pip install pyspark
! pip install pyspark --user
! pip install seaborn --user
! pip install plotly --user
! pip install imageio --user
! pip install folium --user

Reading package lists... Done
Building dependency tree       
Reading state information... Done
libkrb5-dev is already the newest version (1.16-2ubuntu0.1).
The following package was automatically installed and is no longer required:
  libnvidia-common-430
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 106 not upgraded.
--2020-02-25 00:17:23--  https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
Resolving www-us.apache.org (www-us.apache.org)... 40.79.78.1
Connecting to www-us.apache.org (www-us.apache.org)|40.79.78.1|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz [following]
--2020-02-25 00:17:23--  https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 2a01:4f8:10a:201a::2
Connecting to downloads.apache.org (downloads.apache

In [None]:
!apt update
!apt install gcc python-dev libkrb5-dev

[33m0% [Working][0m            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease
[33m0% [Connecting to archive.ubuntu.com (91.189.88.162)] [Waiting for headers] [Wa[0m[33m0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.162)[0m                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
[33m0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.162)[0m                                                                               Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
[33m0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [3 InRelease 14.2 kB/88.7 k[0m                                                                               Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
[33m0% [1 InRel

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

import os

spark = SparkSession.builder.appName('Graphs-HW2').getOrCreate()


In [None]:
%load_ext sparkmagic.magics

In [None]:
#graph section
import networkx as nx
# SQLite RDBMS
import sqlite3
# Parallel processing
# import swifter

# NoSQL DB
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError, OperationFailure

import os
os.environ['SPARK_HOME'] = '/content/spark-2.4.5-bin-hadoop2.7'
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
import pyspark
from pyspark.sql import SQLContext

In [None]:
# Reuse the existing session if there is one; otherwise create it
try:
    if spark is None:
        spark = SparkSession.builder.appName('Initial').getOrCreate()
        sqlContext = SQLContext(spark.sparkContext, spark)
except NameError:
    spark = SparkSession.builder.appName('Initial').getOrCreate()
    sqlContext = SQLContext(spark.sparkContext, spark)


### Download data

The following code retrieves the Yelp dataset files from Google Drive.

In [None]:
from google_drive_downloader import GoogleDriveDownloader as gdd

gdd.download_file_from_google_drive(file_id='1XCANGSCd0pUNcXq18t2QDwCIpJxus8Dy',
                                    dest_path='/content/yelp_business_attributes.csv')

# gdd.download_file_from_google_drive(file_id='1zY4xbZXPbnahBahraU7ZxoGF3zUZ4SO8',
                                    # dest_path='/content/yelp_business_hours.csv')

gdd.download_file_from_google_drive(file_id='11lwBibxX7PYGgOfHU25_dDDDsPX1Pt0Y',
                                    dest_path='/content/yelp_business.csv')

gdd.download_file_from_google_drive(file_id='1FU5Q-96erhTmk8SjC4XHUm94yWc6h3a0',
                                    dest_path='/content/yelp_checkin.csv')

gdd.download_file_from_google_drive(file_id='1UaaLrCKjqoQ7G3JT_VUw56pc-dnTwyrS', dest_path='/content/yelp_review2.csv')

gdd.download_file_from_google_drive(file_id='1JNFZeLlimxNSwcOb-oBxxbwJqdg22WgD',
                                    dest_path='/content/yelp_user.csv')




Downloading 1XCANGSCd0pUNcXq18t2QDwCIpJxus8Dy into /content/yelp_business_attributes.csv... Done.
Downloading 11lwBibxX7PYGgOfHU25_dDDDsPX1Pt0Y into /content/yelp_business.csv... Done.
Downloading 1FU5Q-96erhTmk8SjC4XHUm94yWc6h3a0 into /content/yelp_checkin.csv... Done.
Downloading 1UaaLrCKjqoQ7G3JT_VUw56pc-dnTwyrS into /content/yelp_review2.csv... Done.
Downloading 1JNFZeLlimxNSwcOb-oBxxbwJqdg22WgD into /content/yelp_user.csv... Done.


In [None]:
!rm -rf yelp_*
!wget https://upenn-bigdataanalytics.s3.amazonaws.com/data/yelp_business.csv
!wget https://upenn-bigdataanalytics.s3.amazonaws.com/data/yelp_review2.csv
!wget https://upenn-bigdataanalytics.s3.amazonaws.com/data/yelp_business_attributes.csv
!wget https://upenn-bigdataanalytics.s3.amazonaws.com/data/yelp_checkin.csv
!wget https://upenn-bigdataanalytics.s3.amazonaws.com/data/yelp_user.csv

--2020-02-25 00:30:09--  https://upenn-bigdataanalytics.s3.amazonaws.com/data/yelp_business.csv
Resolving upenn-bigdataanalytics.s3.amazonaws.com (upenn-bigdataanalytics.s3.amazonaws.com)... 52.216.111.83
Connecting to upenn-bigdataanalytics.s3.amazonaws.com (upenn-bigdataanalytics.s3.amazonaws.com)|52.216.111.83|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 30364138 (29M) [text/csv]
Saving to: ‘yelp_business.csv’


2020-02-25 00:30:10 (28.3 MB/s) - ‘yelp_business.csv’ saved [30364138/30364138]

--2020-02-25 00:30:11--  https://upenn-bigdataanalytics.s3.amazonaws.com/data/yelp_review2.csv
Resolving upenn-bigdataanalytics.s3.amazonaws.com (upenn-bigdataanalytics.s3.amazonaws.com)... 52.216.111.83
Connecting to upenn-bigdataanalytics.s3.amazonaws.com (upenn-bigdataanalytics.s3.amazonaws.com)|52.216.111.83|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3791120545 (3.5G) [text/csv]
Saving to: ‘yelp_review2.csv’


2020-02-25 00:31:29 (4

### 4.1 Load Our Graph Datasets.

For this assignment, we’ll be looking at graph data (reviews, reviewers, businesses) downloaded from Yelp.

**A very brief review of graph theory**. Recall that a graph $G$ is composed of a set of vertices $V$ (also called nodes) and edges $E$ (sometimes called links).  Each vertex $v \in V$ has an identity (often represented in the real world as a string or numeric “node ID”).  Each edge $e \in E$ is a tuple $(v_i,v_j)$ where $v_i$ represents the source or origin of the edge, and $v_j$ represents the target or destination.  In the simplest case, the edge tuple above is simply the pair $(v_i,v_j)$ but in many cases we may have additional fields such as a label or a distance.  Recall also that graphs may be undirected or directed; in undirected graphs, all edges are symmetric whereas in directed graphs, they are not.  For instance, airline flights are directed, whereas Facebook friend relationships are undirected.
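As a small illustration of these definitions, the sketch below stores a directed graph as a list of `(source, target)` tuples and derives the node set from the edges alone. The node names and the helper `to_undirected` are made up for this example and are not part of the assignment.

```python
# Hypothetical toy graph: each edge is a (source, target) tuple
edges = [("A", "B"), ("A", "C"), ("C", "D")]

# The node set is implied by the edges: every ID that appears in some edge
nodes = {v for edge in edges for v in edge}

def to_undirected(edge_list):
    """Make edges symmetric by adding the reverse of every edge (illustrative helper)."""
    return set(edge_list) | {(dst, src) for src, dst in edge_list}

undirected = to_undirected(edges)
print(sorted(nodes))
print(("B", "A") in edges, ("B", "A") in undirected)
```

In the directed version the edge `("B", "A")` is absent; after symmetrizing, both directions are present.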

Let’s read our social graph data from Yelp, which forms a directed graph.  Here, the set of nodes is also not specified; the assumption is that the only nodes that matter are linked to other nodes, and thus their IDs will appear in the set of edges.  To load the file `input.txt` into a Spark DataFrame, you can use lines like the following.

```
# Read lines from the text file
input_sdf = spark.read.load('input.txt', format="text")
```

We’ll use the suffix `_sdf` to represent “Spark DataFrame,” much as we used `_df` to denote a Pandas DataFrame in Homework 1.  Load the various files from Yelp.

Your datasets should be named `yelp_business_sdf`, `yelp_business_attributes_sdf`, `yelp_check_in_sdf`, `yelp_reviews_sdf`, and `yelp_users_sdf`.

Submit the first 75 entries of `yelp_business_sdf` to the autograder as a pandas DataFrame, using the `toPandas()` function to convert it from a Spark DataFrame. Also, make sure to sort it by the `name` column in ascending order.

In [None]:
yelp_business_sdf = spark.read.format("csv").option("header", "true").load("yelp_business.csv")
#yelp_business_sdf.show()
yelp_business = yelp_business_sdf.toPandas()
yelp_business = yelp_business.sort_values(by=['name'], ascending=True)
#display(yelp_business)

yelp_business_attributes_sdf = spark.read.format("csv").option("header", "true").load("yelp_business_attributes.csv")
yelp_check_in_sdf = spark.read.format("csv").option("header", "true").load("yelp_checkin.csv")
yelp_reviews_sdf = spark.read.format("csv").option("header", "true").load("yelp_review2.csv")
yelp_users_sdf = spark.read.format("csv").option("header", "true").load("yelp_user.csv")

Register the Spark DataFrames as temporary views so that they can be queried with Spark SQL.

In [None]:
yelp_business_sdf.createOrReplaceTempView('yelp_business')
yelp_business_attributes_sdf.createOrReplaceTempView('yelp_business_attributes')
yelp_check_in_sdf.createOrReplaceTempView('yelp_check_in')
yelp_reviews_sdf.createOrReplaceTempView('yelp_reviews')
yelp_users_sdf.createOrReplaceTempView('yelp_users')

### 4.2 Simple Analytics on the Data

In this section, we will execute Spark operations on the given data. Beyond simply executing the queries, you may try the `.explain()` method to learn more about how each query is executed. Also, please read the data description before attempting the following questions so that you understand the data.


Compute `best_average_sdf`, the list of business names ranked by average review score (review stars) in decreasing order; break ties by sorting lexicographically (in increasing order) by name.  Output the number of reviews also.  Call the columns `name`, `avg_rating`, and `count`.

Convert this to pandas and submit the first 75 rows.

In [None]:
#yelp_business_sdf.explain()
# yelp_business already stores each business's average stars and review count
best_average_sdf = spark.sql('select name as name, stars as avg_rating, review_count as count from yelp_business order by stars desc, name asc')
# First 75 rows as a pandas DataFrame for submission
best_average_sdf_75 = best_average_sdf.limit(75).toPandas()
display(best_average_sdf_75)

Unnamed: 0,name,avg_rating,count
0,"""""""T""""s Hair Affair""",5.0,10
1,"""2 """"Di"""" 4 Gourmet Karmel Korn""",5.0,7
2,"""Davis """"N"""" Sons Car Detailing""",5.0,4
3,"""Manantial De Salud """"The Vitamin Store""""""",5.0,3
4,"""Scotty""""s Kitchen""",5.0,3
...,...,...,...
70,24 Hours of Booty,5.0,5
71,24-7 Electrical Services,5.0,124
72,24-7 JB Services Garage Door,5.0,47
73,24/7 Carpet & Floor Care,5.0,53


#### 4.2.1 Users who are more negative than average

Find the users whose average review is below the *average of the per-user* average reviews.  Think about how to factor that into steps!

* Compute the (floating-point) variable `overall_avg` as the average of the users' average reviews. (You might compute this in a Spark DataFrame first).
* Then output `negative_users_sdf` as the users whose average rating is below that.  This Spark dataframe should have `name` and `avg_rating` and should be sorted first (from lowest to highest) by average rating, then lexicographically (in ascending order) by name.  You should drop cases where the name is null.

Submit just the `overall_avg` number that you get to the autograder as a float.
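The two steps can be sketched in plain Python before writing them in Spark. The rows below are invented for illustration (they are not Yelp data), and the tuple layout `(name, average_stars)` is an assumption of this sketch.

```python
# Hypothetical (name, average_stars) rows; None marks a missing name
users = [("Ann", 4.5), ("Bob", 2.0), (None, 1.0), ("Cy", 3.0), ("Al", 2.0)]

# Step 1: a single number, the mean of the per-user averages
overall_avg = sum(stars for _, stars in users) / len(users)

# Step 2: keep named users below that mean, sorted by rating, then by name
negative_users = sorted(
    ((name, stars) for name, stars in users
     if name is not None and stars < overall_avg),
    key=lambda row: (row[1], row[0]),
)
print(overall_avg)     # 2.5
print(negative_users)  # [('Al', 2.0), ('Bob', 2.0)]
```

The point of splitting the work is that the filter in step 2 needs the scalar from step 1 before it can run.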


In [None]:
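# Average of the per-user averages (3.7108406832064325 on this dataset)
overall_avg = float(yelp_users_sdf.select(F.avg("average_stars")).first()[0])

# Keep the DataFrame in the variable: .show() returns None, so call it on its own line.
# Note: this query sorts avg_rating in descending order, whereas the task asks for ascending.
negative_users_sdf = spark.sql(f'select name as name, average_stars as avg_rating from yelp_users where average_stars < {overall_avg} and name is not null order by average_stars desc, name asc limit 75')
negative_users_sdf.show()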

+-----+----------+
| name|avg_rating|
+-----+----------+
| -Roc|      3.71|
|    A|      3.71|
|    A|      3.71|
|    A|      3.71|
|    A|      3.71|
|    A|      3.71|
|    A|      3.71|
|    A|      3.71|
|    A|      3.71|
|    A|      3.71|
|    A|      3.71|
|    A|      3.71|
|    A|      3.71|
|    A|      3.71|
|A And|      3.71|
|   A.|      3.71|
| A.M.|      3.71|
| A.M.|      3.71|
|   AJ|      3.71|
|   AM|      3.71|
+-----+----------+
only showing top 20 rows



#### 4.2.2 Cities by number of businesses

Find the top 10 cities by number of (Yelp-listed) businesses.

This time, use the `take()` function to create a *list* of the top 10 cities (as Rows).  Call this list `top10_cities` and make sure it includes city `name` and `num_restaurants`.

We want the answer as a list of lists, where each element combines the city name and `num_restaurants`:

[["Philadelphia", 12345], ["Los Angeles", 543]]
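To make the target shape concrete, here is a minimal plain-Python sketch that counts entries per city and keeps the largest counts as a list of lists. The city values and the cutoff `n` are made up for this example; the assignment itself asks for the Spark version.

```python
from collections import Counter

# Hypothetical city column, one entry per business
cities = ["Phoenix", "Las Vegas", "Phoenix", "Toronto", "Las Vegas", "Las Vegas"]

# Count businesses per city, then keep the n largest counts
n = 2
top_cities = [[city, count] for city, count in Counter(cities).most_common(n)]
print(top_cities)  # [['Las Vegas', 3], ['Phoenix', 2]]
```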

In [None]:
city_count_sdf = spark.sql('select city as name, count(*) as num_businesses from yelp_business group by city')
restaurant_count_sdf = spark.sql('select city as name, count(categories) as num_restaurants from yelp_business where categories like "%Restaurants%" group by city')


city_count_sdf.createOrReplaceTempView('city_count')
restaurant_count_sdf.createOrReplaceTempView('restaurant_count')

join_sdf = spark.sql('select city.name, city.num_businesses, num_restaurants from city_count city left join restaurant_count on city.name=restaurant_count.name order by num_businesses desc limit 10')

join_df = join_sdf.toPandas()
top10_cities = join_df.take([0,1], axis=1)
final = top10_cities.values.tolist()
print(final)
#yelp_business_sdf.show()


[['Las Vegas', 26775], ['Phoenix', 17213], ['Toronto', 17206], ['Charlotte', 8553], ['Scottsdale', 8228], ['Pittsburgh', 6355], ['Mesa', 5760], ['Montréal', 5709], ['Henderson', 4465], ['Tempe', 4263]]


The following is code we have given you. It builds a relationship graph that will be used in the next section. Take a look at it to familiarize yourself with it.

In [None]:
review_graph_sdf = spark.sql("SELECT user_id AS from_node, business_id AS to_node, stars AS score FROM yelp_reviews "\
                        "WHERE business_id is not null "\
                        "AND user_id is not null")
review_graph_sdf.createOrReplaceTempView('review_graph')

review_graph_sdf.show()

+--------------------+--------------------+-----+
|           from_node|             to_node|score|
+--------------------+--------------------+-----+
|bv2nCi5Qv5vroFiqK...|AEx2SYEUJmTxVVB18...|    5|
|bv2nCi5Qv5vroFiqK...|VR6GpWIda3SfvPC-l...|    5|
|bv2nCi5Qv5vroFiqK...|CKC0-MOWMqoeWf6s-...|    5|
|bv2nCi5Qv5vroFiqK...|ACFtxLv8pGrrxMm6E...|    4|
|bv2nCi5Qv5vroFiqK...|s2I_Ni76bjJNK9yG6...|    4|
|_4iMDXbXZ1p1ONG29...|8QWPlVQ6D-OExqXoa...|    5|
|u0LXt3Uea_GidxRW1...|9_CGhHMz8698M9-Pk...|    4|
|u0LXt3Uea_GidxRW1...|gkCorLgPyQLsptTHa...|    4|
|u0LXt3Uea_GidxRW1...|5r6-G9C4YLbC7Ziz5...|    3|
|u0LXt3Uea_GidxRW1...|fDF_o2JPU8BR1Gya-...|    5|
|u0LXt3Uea_GidxRW1...|z8oIoCT1cXz7gZP5G...|    4|
|u0LXt3Uea_GidxRW1...|XWTPNfskXoUL-Lf32...|    3|
|u0LXt3Uea_GidxRW1...|13nKUHH-uEUXVZylg...|    1|
|u0LXt3Uea_GidxRW1...|RtUvSWO_UZ8V3Wpj0...|    3|
|u0LXt3Uea_GidxRW1...|Aov96CM4FZAXeZvKt...|    5|
|u0LXt3Uea_GidxRW1...|0W4lkclzZThpx3V65...|    4|
|u0LXt3Uea_GidxRW1...|fdnNZMk1NP7ZhL-YM...|    1|



## Part 5. “Traversing” a Graph

For our next tasks, we will be “walking” the graph and making connections.




### 5.1 Distributed Breadth-First Search
A search algorithm typically starts at a node or set of nodes, and “explores” or “walks” for some number of steps to find a match or a set of matches.

If you need an introduction to BFS, I highly suggest looking here: https://www.tutorialspoint.com/data_structures_algorithms/breadth_first_traversal.htm

We will be asking for something slightly different, though.

Let’s implement a distributed version of a popular algorithm, breadth-first-search (BFS).  This algorithm is given a graph `G`, a set of origin nodes `N`, and a depth `d`.  In each iteration or round up to depth `d`, it explores the set of all new nodes directly connected to the nodes it already has seen, before going on to the nodes another “hop” away.  If we do this correctly, we will explore the graph in a way that (1) avoids getting caught in cycles or loops, and (2) visits each node in the fewest number of “hops” from the origin.  BFS is commonly used in tasks such as friend recommendation in social networks.

**How does distributed BFS in Spark work**?  Let’s start with a brief sketch of standard BFS.  During exploration “rounds”, we can divide the graph into three categories:

1. *unexplored nodes*.  These are nodes we have not yet visited.  You don’t necessarily need to track these separately from the graph.
2. *visited nodes*.  We have already reached these nodes in a previous “round”.
3. *frontier nodes*.  These are nodes we have visited in this round.  We have not yet checked whether they have out-edges connecting to unexplored nodes.

Let’s look at the figure, which shows a digraph.  The green node A represents the origin.

<p align = "center">
<img src = "https://imgur.com/WU3AUwg.png" width= "600" align ="center"/>
</p>

* In the first round, the origin A is the sole frontier node.  We find all nodes reachable directly from A, namely B-F; then we remove all nodes we have already visited (there are none) or that are in the frontier (the node A itself).  This leaves the blue nodes B-F, which are all reachable in (at most) 1 hop from A.
* In the second round, we move A to the visited set and B-F to the frontier.  Now we explore all nodes connected directly to frontier nodes, namely A (from B), F (from E), and the red nodes G-L.  We eliminate the nodes already contained in the frontier and visited sets from the next round’s frontier set, leaving the red nodes only.
* In the third round, we will move B-F to the visited set, G-L to the frontier set, and explore the next round of neighbors N-V.  This process continues up to some maximum depth (or until there are no more unexplored nodes).

Assume we create data structures (we can make them DataFrames) for the visited and frontier nodes.  Consider (1) how to initialize the different sets at the start of computation (note: unexplored nodes are already in the graph), and (2) how to use the graph edges and the existing data structures to update state for the next iteration “round”.

You may have seen breadth-first search implemented in a single-threaded program, using a queue to hold the frontier nodes. With Spark we don’t need a queue -- we just need the three sets above.
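The round structure can be sketched with ordinary Python sets before translating it to DataFrames. This is a single-machine illustration under assumed names (`bfs_rounds`, `toy_edges`), not the Spark solution: the set difference plays the role that a join-and-filter step would play in Spark.

```python
def bfs_rounds(edges, origins, max_depth):
    """Round-based BFS over directed edges; returns {node: distance}."""
    visited = {}                 # node -> distance at which it was first reached
    frontier = set(origins)      # nodes reached in the current round
    depth = 0
    while frontier and depth <= max_depth:
        for node in frontier:
            visited[node] = depth
        # Follow out-edges of the frontier, then drop anything already visited
        reached = {dst for src, dst in edges if src in frontier}
        frontier = reached - set(visited)
        depth += 1
    return visited

# Hypothetical toy digraph with a cycle back to the origin (B -> A)
toy_edges = [("A", "B"), ("A", "C"), ("B", "A"), ("B", "D"), ("C", "D"), ("D", "E")]
distances = bfs_rounds(toy_edges, ["A"], 2)
print(sorted(distances.items()))
```

Because visited nodes are removed from each new frontier, the cycle `B -> A` does not cause `A` to be revisited, and `E` (three hops away) is excluded when `max_depth` is 2.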

### 5.2 Breadth-First Search Algorithm

Create a function `spark_bfs(G, origins, max_depth)` that takes a Spark DataFrame with a graph G (following the schema for `review_graph_sdf` described above, but to be treated as an **undirected graph**), a Python list-of-dictionaries `origins` of the form 

```
[{'node': nid1}, 
 {'node': nid2}, 
 …]
```

and a nonnegative integer “exploration depth” `max_depth` (to only run BFS on a tractable portion of the graph).  The `max_depth` is the maximum number of edge traversals (e.g., the origin is at depth 0, one hop from the origin is depth 1, etc.).  The function should return a DataFrame containing pairs of the form (node, distance), where the distance is the depth at which the node was *first* encountered (i.e., the shortest-path distance from the origin nodes).  Note that the origin nodes should also be returned in this Spark DataFrame (with depth 0)!  

You can create a new Spark DataFrame with a string `node` column from the above list of maps `origins`, as follows. This will give you a DataFrame of the nodes at which to start the BFS.

```
schema = StructType([
    StructField("node", StringType(), True)
])

my_sdf = spark.createDataFrame(my_list_of_maps, schema)
```

In this algorithm, be careful in each iteration to keep only the nodes with their shortest distances (you may need to do aggregation or item removal).  You should accumulate all nodes at distances 0, 1, ..., `max_depth`.
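The "keep only the shortest distance" step amounts to a minimum per node. Here is a plain-Python sketch with made-up `(node, distance)` pairs; in Spark, one comparable approach would be a `groupBy("node")` followed by `F.min("distance")`, though removing visited nodes before they are added works too.

```python
# Hypothetical (node, distance) pairs, as might accumulate across BFS rounds
pairs = [("C", 0), ("A", 1), ("F", 1), ("C", 2), ("B", 2), ("A", 3)]

# Keep only the minimum distance seen for each node
shortest = {}
for node, distance in pairs:
    if node not in shortest or distance < shortest[node]:
        shortest[node] = distance

print(sorted(shortest.items()))  # [('A', 1), ('B', 2), ('C', 0), ('F', 1)]
```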

In [None]:
# TODO: iterative search over undirected graph

#Notes
#Build frontier using an iterative process
#First frontier is empty
#Start with origin
#Then put origin into frontier
#Notes
#Origin = C
#frontier = []
#visited = []
#Have a table of edges and nodes with from and to columns
#Do an inner join of the frontier's "from" nodes with the entire table of edges and nodes "reviews"
#This will make the smaller table of From To
                                      #C   F
                                      #C   A
#Then, you add what is in the "to" column to the next frontier
#Add all letters to a visited matrix

#However, right now our graph is directed. We want an undirected graph to do a BFS. Thus, we will
#flip our from and to columns around to get the inverse path.
#Do this by making a copy of the table "reviews" (the one that contains all the edges and nodes) and then
#renaming the names of the columns. Then add these two tables together
#Now you will get C-->F and C-->A if naturally goes A-->C-->F

#Now for second hop:
#Change frontier to F and A
#Now do same thing and inner join with "reviews" big table of all graph connections
#Use a drop duplicates because you don't want to have C in the frontier again 
#Look into exclusive outer join
#CreateSparkdataframe()


#Above but implemented in code
#Given a graph
#Step 1
#origin = []
#graph.createTempView('Normal Graph')
#graph.create('Flipped Graph')  #now normal graph and flipped graph are the same thing
#Now rename columns to what you want
#Step2
#Slap columns together - call this graph
#Step 3
#Declare your frontier and visited
#Add origin to frontier
#4
#maxdepth = 3
#while depth<maxdepth:
  #inner join frontier and graph to get next hops
  #union (slaps two things together) 'visited' with the two column of this dataframe
  #Remove duplicates
  #Take "to" column (which is the first step of the traversal) and make it your frontier
  #Replace frontier variable with this new data. 
#Now this will rinse and repeat (go back to beginning of loop)

#End notes

def spark_bfs(G, origins, max_depth):
  # Schema for the list-of-dictionaries of origin nodes
  origin_schema = StructType([
            StructField("node", StringType(), True)
        ])
  # The origins form the first frontier and are visited at distance 0
  frontier_sdf = spark.createDataFrame(origins, origin_schema).distinct()
  visited_sdf = frontier_sdf.withColumn("distance", F.lit(0))

  # Treat the graph as undirected: keep every edge and add its reverse
  G.createOrReplaceTempView('G')
  all_connections_sdf = spark.sql('select from_node, to_node from G union all '
                                  'select to_node as from_node, from_node as to_node from G')

  depth = 1
  while depth <= max_depth:
    # Nodes one hop away from the current frontier, with duplicates removed
    next_hop_sdf = all_connections_sdf.join(
        frontier_sdf.withColumnRenamed('node', 'from_node'), on='from_node', how='inner'
    ).select(F.col('to_node').alias('node')).distinct()
    # Drop nodes already reached at a smaller distance; the rest is the new frontier
    frontier_sdf = next_hop_sdf.join(visited_sdf.select('node'), on='node', how='left_anti')
    visited_sdf = visited_sdf.union(frontier_sdf.withColumn("distance", F.lit(depth)))
    depth = depth + 1

  # Return the first 75 rows (ordered by node) as a pandas DataFrame for submission
  return visited_sdf.orderBy('node').limit(75).toPandas()

In [None]:
## toy example
import pandas as pd  # needed for pd.DataFrame below
simple = [('A', 'B'),
         ('A', 'C'),
         ('A', 'D'),
         ('C', 'F'),
         ('B', 'G'),
         ('G', 'H'),
         ('D', 'E')]
data = {'from_node': ['A', 'A', 'A', 'C', 'B', 'G', 'D'],
       'to_node': ['B', 'C', 'D', 'F', 'G', 'H', 'E']}
uh = pd.DataFrame.from_dict(data)
uh_sdf = spark.createDataFrame(uh)
smallOrig = [{'node': 'C'}]

smallCount = spark_bfs(uh_sdf, smallOrig, 3)

In [None]:
smallCount

Unnamed: 0,node,distance
0,A,1
1,B,2
2,C,0
3,D,2
4,E,3
5,F,1
6,G,3


This is the starting origin node ID for your BFS, along with the associated call.

In [None]:
orig = [{'node': 'bv2nCi5Qv5vroFiqKGopiw'}]
#grab the count
count = spark_bfs(review_graph_sdf, orig, 3)