<h1><center>Apache Spark GraphFrames</center></h1>

Let's run GraphFrames code on Google Colab and see whether it runs faster than on Databricks.


### Installing Spark

Install Dependencies:


1.   Java 8
2.   Apache Spark (with Hadoop), and
3.   Findspark (used to locate the Spark installation on the system)


In [None]:
# Remove the directory left over from installing Spark from the
# spark-3.1.1-bin-hadoop3.2 release tarball; this notebook now uses the
# pip-installed pyspark instead. `rm -rf` is silent if the directory is absent.
!rm -rf spark-3.1.1-bin-hadoop3.2

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!wget -q --show-progress http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
#!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark pyspark
#!pip -q install findspark pyspark graphframes

Set Environment Variables:

In [None]:
import os

# Tell the Spark launcher which JDK to use: the OpenJDK 8 package installed via
# apt (amd64 layout). SPARK_HOME is deliberately left unset so pyspark falls back
# to its own pip-installed distribution.
os.environ.update(JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64")

In [None]:
# Inspect the working directory, e.g. to confirm no extracted Spark tarball remains.
!ls

sample_data


In [None]:
# Show the installed PySpark package (version and install location). The version
# decides which GraphFrames jar build (e.g. "-spark3.5-s_2.12") is compatible, and
# the location is where the bundled `jars/` directory lives.
!pip show pyspark

Name: pyspark
Version: 3.5.5
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/lib/python3.11/dist-packages
Requires: py4j
Required-by: google-spark-connect


### Installing GraphFrames

In [None]:
!pip install graphframes

Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl.metadata (934 bytes)
Collecting nose (from graphframes)
  Downloading nose-1.3.7-py3-none-any.whl.metadata (1.7 kB)
Downloading graphframes-0.6-py2.py3-none-any.whl (18 kB)
Downloading nose-1.3.7-py3-none-any.whl (154 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/154.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.7/154.7 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: nose, graphframes
Successfully installed graphframes-0.6 nose-1.3.7


In [None]:
import platform

# Report the version of the interpreter running this kernel. `!python -V` would
# instead run whichever `python` is first on PATH, which need not be the kernel's.
# The major.minor part matches the `python3.X/dist-packages` directory used in the
# jar paths below.
kernel_python_version = platform.python_version()
kernel_python_version

Python 3.11.11


In [None]:
!curl -L -o "/usr/local/lib/python3.11/dist-packages/pyspark/jars/graphframes-0.8.2-spark3.3.2-s_2.11.jar" https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.1-s_2.12/graphframes-0.8.2-spark3.1-s_2.12.jar

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  242k  100  242k    0     0  1904k      0 --:--:-- --:--:-- --:--:-- 1906k


In [None]:
!ls /usr/local/lib/python3.11/dist-packages/pyspark/jars/

activation-1.1.1.jar
aircompressor-0.27.jar
algebra_2.12-2.0.1.jar
annotations-17.0.0.jar
antlr4-runtime-4.9.3.jar
antlr-runtime-3.5.2.jar
aopalliance-repackaged-2.6.1.jar
arpack-3.0.3.jar
arpack_combined_all-0.1.jar
arrow-format-12.0.1.jar
arrow-memory-core-12.0.1.jar
arrow-memory-netty-12.0.1.jar
arrow-vector-12.0.1.jar
audience-annotations-0.5.0.jar
avro-1.11.4.jar
avro-ipc-1.11.4.jar
avro-mapred-1.11.4.jar
blas-3.0.3.jar
bonecp-0.8.0.RELEASE.jar
breeze_2.12-2.1.0.jar
breeze-macros_2.12-2.1.0.jar
cats-kernel_2.12-2.1.1.jar
chill_2.12-0.10.0.jar
chill-java-0.10.0.jar
commons-cli-1.5.0.jar
commons-codec-1.16.1.jar
commons-collections-3.2.2.jar
commons-collections4-4.4.jar
commons-compiler-3.1.9.jar
commons-compress-1.23.0.jar
commons-crypto-1.1.0.jar
commons-dbcp-1.4.jar
commons-io-2.16.1.jar
commons-lang-2.6.jar
commons-lang3-3.12.0.jar
commons-logging-1.1.3.jar
commons-math3-3.6.1.jar
commons-pool-1.5.4.jar
commons-text-1.10.0.jar
compress-lzf-1.1.2.jar
curator-client-2.13.0.jar
cur

### Starting Spark with Libraries Loaded

In [None]:
from pyspark.sql import SparkSession

# GraphFrames JVM package built for the installed runtime (PySpark 3.5.x, Scala 2.12).
# Spark resolves this Maven-style coordinate at JVM startup; the spark-packages
# repository is among its default resolvers. This replaces pointing `spark.jars` at
# a manually downloaded jar built for Spark 3.1.
# findspark.init() is not needed: pyspark is pip-installed and directly importable.
GRAPHFRAMES_PACKAGE = "graphframes:graphframes:0.8.3-spark3.5-s_2.12"

# NOTE: spark.jars.packages and spark.driver.memory are only read when the JVM starts.
# If a SparkSession already exists in this kernel, getOrCreate() returns it and these
# settings do not take effect. Restart the runtime before re-running this cell.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", GRAPHFRAMES_PACKAGE)
    .config("spark.driver.memory", "12g")
    .getOrCreate()
)

# Render DataFrames as HTML tables when they are the last expression in a cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)


### Example Dataset

In [None]:
# Vertex DataFrame: GraphFrames identifies vertices by the "id" column; "name" and
# "age" are attributes that the queries below filter on.
v = spark.createDataFrame(
    [
        ("a", "Alice", 34),
        ("b", "Bob", 36),
        ("c", "Charlie", 30),
        ("d", "David", 29),
        ("e", "Esther", 32),
        ("f", "Fanny", 36),
        ("g", "Gabby", 60),
    ],
    ["id", "name", "age"],
)

# Edge DataFrame: directed edges from "src" to "dst" (vertex ids) with a
# "relationship" attribute. Vertex "g" (Gabby) appears in no edge.
e = spark.createDataFrame(
    [
        ("a", "b", "friend"),
        ("b", "c", "follow"),
        ("c", "b", "follow"),
        ("f", "c", "follow"),
        ("e", "f", "follow"),
        ("e", "d", "friend"),
        ("d", "a", "friend"),
        ("a", "e", "friend"),
    ],
    ["src", "dst", "relationship"],
)

In [None]:
# Only GraphFrame is used in this notebook; an explicit import avoids the
# namespace pollution of `from graphframes import *`.
from graphframes import GraphFrame

In [None]:
import pyspark

# GraphFrames jars are built per Spark line (e.g. "-spark3.5-"). This compares the
# Python package with the Spark runtime the session actually started. spark.version
# and spark.sparkContext.version report the same JVM value, so one is enough.
print("PySpark package version : " + pyspark.__version__)
print("Spark runtime version   : " + spark.version)


PySpark Version :3.5.5
PySpark Version :3.5.5


In [None]:
# Smoke test: constructing a GraphFrame makes the Python wrapper load the JVM class
# org.graphframes.GraphFramePythonAPI. A ClassNotFoundException here typically means
# no GraphFrames jar is on the running JVM's classpath.
GraphFrame(v, e)

Py4JJavaError: An error occurred while calling o158.loadClass.
: java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [None]:
# Build the example graph `g` from the vertex (`v`) and edge (`e`) DataFrames;
# the query cells that follow all run against it.
g = GraphFrame(v, e)

Py4JJavaError: An error occurred while calling o114.loadClass.
: java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [None]:

# Breadth-first search starting at the vertex named "Esther" and stopping at any
# vertex younger than 32. Each result row is one shortest path found.
paths = g.bfs(fromExpr="name = 'Esther'", toExpr="age < 32")
paths.show()



In [None]:
# BFS from "Esther" to anyone older than 33, traversing only non-"friend" edges
# and giving up beyond 3 hops. The resulting DataFrame is the cell's output.
g.bfs(
    fromExpr="name = 'Esther'",
    toExpr="age >33",
    edgeFilter="relationship != 'friend'",
    maxPathLength=3,
)

In [None]:
# Hop distances from every vertex to each landmark vertex ("a" and "d"),
# reported in a `distances` column.
results = g.shortestPaths(landmarks=["a", "d"])
(
    results
    .select("id", "distances")
    .show()
)

In [None]:
# Subgraph: keep vertices older than 30, then keep only "friend" edges among them,
# then drop any vertex that is left without an edge.
g1 = (
    g.filterVertices("age > 30")
    .filterEdges("relationship = 'friend'")
    .dropIsolatedVertices()
)

In [None]:
# Vertices that survived the age filter and still have a "friend" edge.
g1.vertices.show()

In [None]:
# "friend" edges remaining in the filtered subgraph.
g1.edges.show()

In [None]:
# GraphFrames' default connectedComponents() algorithm requires a Spark checkpoint
# directory. This also exposes `sc` as a notebook-level handle on the SparkContext.
sc = spark.sparkContext
sc.setCheckpointDir("/tmp")

# Label every vertex with the id of its component; sort so members sit together.
result = g.connectedComponents()
(
    result
    .select("id", "component")
    .orderBy("component")
    .show()
)

In [None]:


# PageRank, iterated until convergence within tolerance `tol` (smaller => more
# accurate, more iterations). resetProbability=0.15 corresponds to the usual
# damping factor of 0.85.
# No checkpoint directory is set here: PageRank does not need one, and dropping it
# keeps this cell from depending on `sc` defined in another cell.
results = g.pageRank(resetProbability=0.15, tol=0.01)

# Per-vertex scores and the per-edge weights PageRank assigned. truncate=False
# prints full values instead of cutting cell strings at 20 characters.
results.vertices.select("id", "pagerank").show(truncate=False)
results.edges.select("src", "dst", "weight").show(truncate=False)

In [None]:
# Repeats the landmark shortest-path query ("a" and "d") from earlier in the
# notebook and shows each vertex's distances.
results = g.shortestPaths(landmarks=["a", "d"])
results.select(["id", "distances"]).show()

In [None]:
# Number of triangles passing through each vertex (GraphFrames treats edges as
# undirected for this count).
results = g.triangleCount()
results.select(["id", "count"]).show()