In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
print ("Scope 3 Emission Analysis")
print ("--")
print ("Analysis initialization:")
print ("  1. Building the Spark Session...")

Scope 3 Emission Analysis
--
Analysis initialization:
  1. Building the Spark Session...


In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "graphframes:graphframes:0.8.2-spark3.2-s_2.12" --jars /opt/hive3/lib/hive-hcatalog-core-3.1.2.jar pyspark-shell'

In [3]:
# 1. Building the Spark Session
sc = SparkSession.builder\
    .master('local')\
    .appName('Scope3')\
    .getOrCreate()
spark = SparkSession(sc)



:: loading settings :: url = jar:file:/opt/spark3/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/osbdet/.ivy2/cache
The jars for the packages stored in: /home/osbdet/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-12ffc9ff-0379-4ba7-a882-ba5af1e0e3c1;1.0
	confs: [default]
	found graphframes#graphframes;0.8.2-spark3.2-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 533ms :: artifacts dl 40ms
	:: modules in use:
	graphframes#graphframes;0.8.2-spark3.2-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	--------------------------------

In [5]:
# 2. Dataframes creation
print ("  2. Creating the vertices (Sites) and edges (trips) DataFrames from HDFS...")
stations = (spark.read
        .option("header","true")
        .option("inferSchema","true")
        .csv("hdfs://localhost:9000/datalake/raw/scope3",sep=";")
        .limit(100)
        .distinct())
# trips = (spark.read
#               .option("header","true")
#               .option("inferSchema","true") 
#               .csv("hdfs://localhost:9000/datalake/raw/san-francisco-bay-bike-sharing/trips/"))

  2. Creating the vertices (Sites) and edges (trips) DataFrames from HDFS...


                                                                                

In [6]:
print ("  2a. Creating the vertices")
origin_keys=stations.select("Origin_Key").distinct()
target_keys=stations.select("Target_Key").distinct()
nodes=origin_keys.union(target_keys).distinct()
print ("  2b. Creating the edges")
# edges=stations.select("Origin_Key",'Target_Key').distinct()

  2a. Creating the vertices
  2b. Creating the edges


In [13]:
# 3. Time to build our graph with the GraphFrames library
from pyspark.sql.functions import count,avg,desc,asc,col
from graphframes import GraphFrame

print ("  3. Building the graph with GraphFrames...")
# GraphFrames requires the vertices DataFrame to have a column named id.
vertices = nodes.withColumnRenamed("Origin_Key","id")

# GraphFrames requires the edges DataFrame to have columns named src and dst
trips = (stations.withColumnRenamed("Origin_Key", "src")
              .withColumnRenamed("Target_Key", "dst"))
#               .withColumnRenamed("Origin_Key", "src_name")
#               .withColumnRenamed("Target_Key", "dst_name")              
edges = (trips.groupBy("src", "dst")
               .agg(count("*").alias("trip_count"),
                    avg("Distance").alias("Distance")))

# Build the graph
graph = GraphFrame(vertices, edges)

# graph processing usually requires multiple iterations, 
# therefore caching it will bring better performance
graph.cache()   

  3. Building the graph with GraphFrames...


GraphFrame(v:[id: string], e:[src: string, dst: string ... 2 more fields])

In [14]:
# 4. Let's use the PageRank algorithm to identify the top 5 most popular bikestations.
print ("  4. Running the PageRank algorithm to get the top 5 most popular bike stations (be patient)...")
from pyspark.sql.functions import desc
ranks = graph.pageRank(resetProbability=0.15, maxIter=10)
ranks.vertices.orderBy(desc("pagerank")).select("id", "pagerank").show(5)


  4. Running the PageRank algorithm to get the top 5 most popular bike stations (be patient)...




+---------+------------------+
|       id|          pagerank|
+---------+------------------+
|010 01-SK|1.1001751871317103|
|624 00-CZ|1.1001751871317103|
|010 04-SK|1.1001751871317103|
|974 01-SK|1.1001751871317103|
|901 01-SK|1.1001751871317103|
+---------+------------------+
only showing top 5 rows



                                                                                

In [None]:
# 5. Finalize the execution (release resources if needed)
print ("The job is done! Have a great day my human friend :)")