## Colab preparation

In [None]:
# Mount Google Drive into the Colab VM so the homework dataset under
# "My Drive/bigData_hw3" can be read through the regular filesystem.
from google.colab import drive

drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# !rm -r spark*
!ls

gdrive	sample_data


In [None]:
# install pyspark in colab
# Dependencies:
    # Java 8
    # Apache Spark with hadoop and
    # Findspark (used to locate the spark in the system)

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.6.tgz
!tar xf spark-2.4.8-bin-hadoop2.6.tgz
!pip -q install findspark graphframes

[?25l[K     |██▏                             | 10kB 19.4MB/s eta 0:00:01[K     |████▎                           | 20kB 21.6MB/s eta 0:00:01[K     |██████▍                         | 30kB 17.6MB/s eta 0:00:01[K     |████████▌                       | 40kB 15.2MB/s eta 0:00:01[K     |██████████▋                     | 51kB 5.7MB/s eta 0:00:01[K     |████████████▊                   | 61kB 5.6MB/s eta 0:00:01[K     |██████████████▉                 | 71kB 6.1MB/s eta 0:00:01[K     |█████████████████               | 81kB 6.8MB/s eta 0:00:01[K     |███████████████████             | 92kB 7.1MB/s eta 0:00:01[K     |█████████████████████▏          | 102kB 7.7MB/s eta 0:00:01[K     |███████████████████████▎        | 112kB 7.7MB/s eta 0:00:01[K     |█████████████████████████▍      | 122kB 7.7MB/s eta 0:00:01[K     |███████████████████████████▌    | 133kB 7.7MB/s eta 0:00:01[K     |█████████████████████████████▋  | 143kB 7.7MB/s eta 0:00:01[K     |███████████████████████

In [None]:
# Environment consumed by findspark / PySpark when the Spark driver JVM is launched.
import os

SPARK_HOME_DIR = "/content/spark-2.4.8-bin-hadoop2.6"

os.environ.update({
    # JDK installed by apt-get in the install cell.
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    # Spark distribution extracted into /content.
    "SPARK_HOME": SPARK_HOME_DIR,
    "HADOOP_HOME": SPARK_HOME_DIR,
    # NOTE(review): the two driver settings below look like they only matter when Spark
    # is started via the `pyspark` launcher script; confirm before removing them.
    "PYSPARK_DRIVER_PYTHON": "jupyter",
    "PYSPARK_DRIVER_PYTHON_OPTS": "notebook",
    # Fetch the GraphFrames JVM package (Spark 2.4 / Scala 2.11 build) at startup.
    "PYSPARK_SUBMIT_ARGS": "--packages graphframes:graphframes:0.8.0-spark2.4-s_2.11 pyspark-shell",
})

In [None]:
import findspark

# Must run before importing pyspark: it makes the Spark install under SPARK_HOME importable.
findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")  # local mode, one worker thread per logical core
    .getOrCreate()
)
# Render DataFrames as formatted tables when they are the last expression of a cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
sc = spark.sparkContext
spark

## Question 4

`edges.txt` and `Vertex.txt` are attached. They contain, respectively, the edges and the vertices of a graph.

Each vertex is a publication and each edge represents a citation between two publications.

### Section 1

Build the graph from `edges.txt` and `Vertex.txt`.

In [None]:
from pyspark.sql.functions import split
from graphframes import GraphFrame

# Location of the homework dataset on the mounted Google Drive.
DATASET_DIR = '/content/gdrive/My Drive/bigData_hw3/Q4/dataset'


def _split_tab_separated(single_col_df, first_name, second_name):
    """Turn a one-column ``value`` DataFrame of ``a<TAB>b`` lines into two string columns.

    Each line is split once; its first two fields become ``first_name`` and
    ``second_name``. The raw ``value`` column is not carried over.
    """
    fields = split(single_col_df['value'], '\t')
    return single_col_df.select(
        fields.getItem(0).alias(first_name),
        fields.getItem(1).alias(second_name),
    )


# Vertex.txt: one vertex per line, "<id>\t<name>".
vertex_single_col_df = spark.read.text(f'{DATASET_DIR}/Vertex.txt')
vertex_df = _split_tab_separated(vertex_single_col_df, 'id', 'name')

# edges.txt: one directed edge per line, "<src id>\t<dst id>".
edges_single_col_df = spark.read.text(f'{DATASET_DIR}/edges.txt')
edges_df = _split_tab_separated(edges_single_col_df, 'src', 'dst')

# GraphFrame expects an `id` column on vertices and `src`/`dst` columns on edges.
g = GraphFrame(vertex_df, edges_df)
print('graph created.')
g

graph created.


GraphFrame(v:[id: string, name: string], e:[src: string, dst: string])

### Section 2

What is the largest in-degree in the graph? What is the largest out-degree?

In [None]:
# Largest in-degree: order vertices by in-degree, highest first, and show only the top row.
in_degrees = g.inDegrees
g_sorted_by_inDegree = in_degrees.orderBy(in_degrees['inDegree'].desc())
print('maximum in-degree:')
g_sorted_by_inDegree.show(1)
print('---' * 20, '\n')

# Largest out-degree: same ranking, applied to out-degrees.
out_degrees = g.outDegrees
g_sorted_by_outDegree = out_degrees.orderBy(out_degrees['outDegree'].desc())
print('maximum out-degree:')
g_sorted_by_outDegree.show(1)

maximum in-degree:
+------------------+--------+
|                id|inDegree|
+------------------+--------+
|946065507707541358|     327|
+------------------+--------+
only showing top 1 row

------------------------------------------------------------
maximum out-degree:
+-------------------+---------+
|                 id|outDegree|
+-------------------+---------+
|3841755165517709241|      264|
+-------------------+---------+
only showing top 1 row



### Section 3

Determine the size of each connected component.

In [None]:
from pyspark.sql.functions import col

# Smallest component size to report. 1 keeps every component (including isolated
# vertices); set it to 2 to hide singleton components.
MIN_COMPONENT_SIZE = 1

# GraphFrames' default connected-components implementation checkpoints intermediate
# DataFrames, so a checkpoint directory must be configured before calling it.
spark.sparkContext.setCheckpointDir('/tmp/graphframes_checkpoints')

# The question asks for connected components. For this directed graph that means
# weakly connected components (edge direction ignored), which connectedComponents()
# computes. stronglyConnectedComponents() answers a different question: it only groups
# vertices that can reach each other following edge direction.
result = g.connectedComponents()

# Every vertex of a component carries the same `component` label, so the row count per
# label is that component's size. Largest components first.
size_of_connected_component = (
    result.groupBy('component')
    .count()
    .where(col('count') >= MIN_COMPONENT_SIZE)
    .sort('count', ascending=False)
)

# Persist the table under bigData_hw3/Q4 on the mounted Google Drive.
size_of_connected_component.toPandas().to_csv('/content/gdrive/My Drive/bigData_hw3/Q4/result_Q4_part3.csv')
print('the result has been saved to Google Drive in a file named "result_Q4_part3.csv"')
size_of_connected_component

the result has been saved to coalab in a file named "result_Q4_part3.csv"


component,count
3,1678
8589934626,316
8589934637,135
42,79
43,71
68719476762,48
25769803798,32
137438953482,26
17179869207,23
42949672979,20


### Section 4

What are the top ten publications, i.e. the publications with the largest in-degree?

In [None]:
# Publications with the largest in-degree, ranked highest first.
g_sorted_by_inDegree = g.inDegrees.sort('inDegree', ascending=False)

# Attach each publication's name so the answer is readable. A left join keeps cited ids
# that have no row in Vertex.txt (their name shows as null). Joins do not preserve
# ordering, so re-sort afterwards; ties are broken by id to make the top ten deterministic.
top_ten_publications = (
    g_sorted_by_inDegree
    .join(g.vertices, on='id', how='left')
    .sort(['inDegree', 'id'], ascending=[False, True])
    .limit(10)
)
top_ten_publications.show(10, truncate=False)

maximum in-degree:
+-------------------+--------+
|                 id|inDegree|
+-------------------+--------+
| 946065507707541358|     327|
|3856212023725725593|     322|
|8978262722425160811|     316|
|6245498229508734555|     185|
|7264519433548233535|     180|
|5362090331808156011|     179|
| 277710621679830671|     149|
|1984578398767042266|     145|
|2395551540800395672|     134|
|5395033957924805072|     130|
+-------------------+--------+
only showing top 10 rows

------------------------------------------------------------
