<a href="https://colab.research.google.com/github/vaddanki001/Spark-Optimization-Mini-Project/blob/master/Spark_Optimization_Mini_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### **Hadoop and Spark Installation Part**

Hadoop is a Java-based framework for storing and processing very large datasets on a cluster of inexpensive machines. It was the first major open-source project in the big data space and is sponsored by the Apache Software Foundation.

### **Pre-Installation Steps**
1. Clean up old files
2. Clone the Git repository

In [1]:
!rm -R Spark-Optimization-Mini-Project
!rm spark-3.0.2-bin-hadoop2.7-hive1.2.tgz

rm: cannot remove 'Spark-Mini-Project': No such file or directory
rm: cannot remove 'spark-3.0.2-bin-hadoop2.7-hive1.2.tgz': No such file or directory
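
The `rm` messages above only mean there was nothing to delete on a fresh runtime. A minimal sketch of a cleanup that stays silent in that case, assuming the same directory and archive names as the cell above (the helper name `clean_previous_run` is hypothetical):

```python
import shutil
from pathlib import Path


def clean_previous_run(workdir,
                       repo_dir="Spark-Optimization-Mini-Project",
                       archive="spark-3.0.2-bin-hadoop2.7-hive1.2.tgz"):
    """Remove a stale clone and Spark archive; do nothing if they are absent."""
    base = Path(workdir)
    # ignore_errors=True keeps a missing directory from raising.
    shutil.rmtree(base / repo_dir, ignore_errors=True)
    # missing_ok=True (Python 3.8+) keeps a missing file from raising.
    (base / archive).unlink(missing_ok=True)
```

Calling `clean_previous_run(".")` before the clone makes the notebook safe to re-run from the top.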


In [2]:
!git clone https://github.com/vaddanki001/Spark-Optimization-Mini-Project

Cloning into 'Spark-Optimization-Mini-Project'...
remote: Enumerating objects: 18, done.
remote: Counting objects: 100% (18/18), done.
remote: Compressing objects: 100% (16/16), done.
remote: Total 18 (delta 0), reused 15 (delta 0), pack-reused 0
Unpacking objects: 100% (18/18), done.


### **Step 1: Installing Hadoop and Spark**

In [3]:
# Older Spark releases are served from the Apache archive.
!wget -q https://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7-hive1.2.tgz

In [4]:
!tar xf spark-3.0.2-bin-hadoop2.7-hive1.2.tgz

!pip install -q findspark

!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/27/67/5158f846202d7f012d1c9ca21c3549a58fd3c6707ae8ee823adcaca6473c/pyspark-3.0.2.tar.gz (204.8MB)
     |████████████████████████████████| 204.8MB 75kB/s 
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 17.8MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.2-py2.py3-none-any.whl size=205186687 sha256=2593996efcd463d43b36db4145fb3fc462e1fdf24b156ecb8676659fff4d1bb2
  Stored in directory: /root/.cache/pip/wheels/8b/09/da/c1f2859bcc86375dc972c5b6af4881b3603269bcc4c9be5d16
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.2


### **Step 2: Install JDK**
Hadoop and Spark need to know where Java is installed. Set the path either through the `JAVA_HOME` environment variable or in the Hadoop configuration file.

In [5]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Resolve the symlink behind /usr/bin/java to find the default Java home
!readlink -f /usr/bin/java | sed "s:bin/java::"

/usr/lib/jvm/java-11-openjdk-amd64/
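
The `readlink`/`sed` pipeline above resolves the `java` symlink and strips the trailing `bin/java`. A rough Python equivalent is sketched below; the function names are hypothetical, and it assumes the usual `<java home>/bin/java` layout:

```python
import os
import shutil
from typing import Optional


def java_home_from_binary(java_binary: str) -> str:
    """Return the Java home directory for a path to a java executable."""
    # realpath follows symlinks, like `readlink -f`.
    resolved = os.path.realpath(java_binary)
    # Drop the trailing bin/java: <home>/bin/java -> <home>
    return os.path.dirname(os.path.dirname(resolved))


def detect_java_home() -> Optional[str]:
    """Return the home of the first `java` on PATH, or None if there is none."""
    java = shutil.which("java")
    return java_home_from_binary(java) if java else None
```

`detect_java_home()` reports whichever Java is first on the `PATH`, which is not necessarily the JDK that was just installed.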


### **Step 3: Set Java and Spark Home**

In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7-hive1.2"
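
Both variables must point at directories that actually exist, otherwise Spark fails later with a less obvious error. A small sketch of a sanity check, with a hypothetical helper name `missing_homes`:

```python
import os


def missing_homes(names=("JAVA_HOME", "SPARK_HOME"), environ=None):
    """Return the variables that are unset or do not point at a directory."""
    env = os.environ if environ is None else environ
    # An unset variable falls back to "", which is never a directory.
    return [name for name in names if not os.path.isdir(env.get(name, ""))]
```

Running `missing_homes()` right after setting the variables should return an empty list. Any name it returns needs a corrected path before the Spark session is started.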

### **Step 4: Initiate Spark Session**

In [7]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

### **Step 5: Check That the Data Files Exist**

In [8]:
!ls -ltr /content/Spark-Optimization-Mini-Project/data/

total 8
drwxr-xr-x 2 root root 4096 Feb 27 01:31 answers
drwxr-xr-x 2 root root 4096 Feb 27 01:31 questions


In [9]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, month

# getOrCreate() returns the session started in Step 4 if it is still running.
spark = SparkSession.builder.appName('Optimize I').getOrCreate()

# The repository is cloned into /content on Colab (see Step 5).
project_path = '/content/Spark-Optimization-Mini-Project'

answers_input_path = os.path.join(project_path, 'data/answers')
questions_input_path = os.path.join(project_path, 'data/questions')

# With no explicit format, load() uses Spark's default data source, Parquet.
answersDF = spark.read.option('path', answers_input_path).load()
questionsDF = spark.read.option('path', questions_input_path).load()
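
One detail worth knowing when building the input paths with `os.path.join`: an absolute component discards everything before it, so a prefix only takes effect when the data path is relative. The sketch below uses `posixpath` to pin the Linux rules that apply on Colab; the prefix value is hypothetical:

```python
import posixpath  # POSIX path rules, matching the Linux paths used on Colab

prefix = "/some/prefix"  # hypothetical value, for illustration only

# An absolute second component replaces everything before it.
absolute = posixpath.join(prefix, "/content/Spark-Optimization-Mini-Project/data/answers")

# A relative second component is appended to the prefix.
relative = posixpath.join(prefix, "data/answers")

print(absolute)  # /content/Spark-Optimization-Mini-Project/data/answers
print(relative)  # /some/prefix/data/answers
```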

In [12]:
answersDF.show(5)

+-----------+---------+--------------------+--------+-------+-----+
|question_id|answer_id|       creation_date|comments|user_id|score|
+-----------+---------+--------------------+--------+-------+-----+
|     226592|   226595|2015-12-29 23:46:...|       3|  82798|    2|
|     388057|   388062|2018-02-22 18:52:...|       8|    520|   21|
|     293286|   293305|2016-11-17 21:35:...|       0|  47472|    2|
|     442499|   442503|2018-11-22 06:34:...|       0| 137289|    0|
|     293009|   293031|2016-11-16 13:36:...|       0|  83721|    0|
+-----------+---------+--------------------+--------+-------+-----+
only showing top 5 rows



In [11]:
questionsDF.show(5)

+-----------+--------------------+--------------------+--------------------+------------------+--------+-------+-----+
|question_id|                tags|       creation_date|               title|accepted_answer_id|comments|user_id|views|
+-----------+--------------------+--------------------+--------------------+------------------+--------+-------+-----+
|     382738|[optics, waves, f...|2018-01-28 07:22:...|What is the pseud...|            382772|       0|  76347|   32|
|     370717|[field-theory, de...|2017-11-25 09:09:...|What is the defin...|              null|       1|  75085|   82|
|     339944|[general-relativi...|2017-06-17 20:32:...|Could gravitation...|              null|      13| 116137|  333|
|     233852|[homework-and-exe...|2016-02-04 21:19:...|When does travell...|              null|       9|  95831|  185|
|     294165|[quantum-mechanic...|2016-11-22 11:39:...|Time-dependent qu...|              null|       1| 118807|   56|
+-----------+--------------------+--------------

In [15]:
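# Count answers per question and per month of the answer's creation date.
answers_month = answersDF.withColumn('month', month('creation_date')).groupBy('question_id', 'month').agg(count('*').alias('cnt'))

# Attach the question's creation date and title to each (question_id, month) count.
resultDF = questionsDF.join(answers_month, 'question_id').select('question_id', 'creation_date', 'title', 'month', 'cnt')

# Display the result in a stable order.
resultDF.orderBy('question_id', 'month').show()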

+-----------+--------------------+--------------------+-----+---+
|question_id|       creation_date|               title|month|cnt|
+-----------+--------------------+--------------------+-----+---+
|     155989|2015-01-01 01:59:...|Frost bubble form...|    1|  1|
|     155989|2015-01-01 01:59:...|Frost bubble form...|    2|  1|
|     155990|2015-01-01 02:51:...|The abstract spac...|    1|  2|
|     155992|2015-01-01 03:44:...|centrifugal force...|    1|  1|
|     155993|2015-01-01 03:56:...|How can I estimat...|    1|  1|
|     155995|2015-01-01 05:16:...|Why should a solu...|    1|  3|
|     155996|2015-01-01 06:06:...|Why do we assume ...|    1|  2|
|     155996|2015-01-01 06:06:...|Why do we assume ...|    2|  1|
|     155996|2015-01-01 06:06:...|Why do we assume ...|   11|  1|
|     155997|2015-01-01 06:26:...|Why do square sha...|    1|  3|
|     155999|2015-01-01 07:01:...|Diagonalizability...|    1|  1|
|     156008|2015-01-01 08:48:...|Capturing a light...|    1|  2|
|     1560
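
To make the logic of the query concrete, here is a plain-Python sketch of the same steps: count answers per `(question_id, month)`, then inner-join the counts to the questions. The rows are made up for illustration and do not come from the dataset.

```python
from collections import Counter
from datetime import datetime

# Made-up sample rows (not from the dataset): (question_id, creation_date)
answers = [
    (1, datetime(2015, 1, 3)),
    (1, datetime(2015, 1, 20)),
    (1, datetime(2015, 2, 5)),
    (2, datetime(2015, 1, 9)),
]
# Made-up questions: question_id -> title
questions = {1: "Question one", 2: "Question two", 3: "No answers yet"}

# Same idea as withColumn('month', ...).groupBy('question_id', 'month').agg(count('*'))
answers_month = Counter((qid, created.month) for qid, created in answers)

# Same idea as the inner join on question_id followed by orderBy
result = sorted(
    (qid, questions[qid], month, cnt)
    for (qid, month), cnt in answers_month.items()
    if qid in questions
)

for row in result:
    print(row)
```

Question 3 has no answers, so the inner join drops it. `month()` returns only the month number, so answers from the same calendar month in different years land in the same bucket.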

In [17]:
resultDF.explain()

== Physical Plan ==
*(3) Project [question_id#12L, creation_date#14, title#15, month#154, cnt#170L]
+- *(3) BroadcastHashJoin [question_id#12L], [question_id#0L], Inner, BuildRight
   :- *(3) Project [question_id#12L, creation_date#14, title#15]
   :  +- *(3) Filter isnotnull(question_id#12L)
   :     +- *(3) ColumnarToRow
   :        +- FileScan parquet [question_id#12L,creation_date#14,title#15] Batched: true, DataFilters: [isnotnull(question_id#12L)], Format: Parquet, Location: InMemoryFileIndex[file:/content/Spark-Optimization-Mini-Project/data/questions], PartitionFilters: [], PushedFilters: [IsNotNull(question_id)], ReadSchema: struct<question_id:bigint,creation_date:timestamp,title:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#213]
      +- *(2) HashAggregate(keys=[question_id#0L, month#154], functions=[count(1)])
         +- Exchange hashpartitioning(question_id#0L, month#154, 200), true, [id=#209]
            +- *(1) HashAggreg
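
The plan above shows a `BroadcastHashJoin` with `BuildRight`: the aggregated answers side is broadcast and turned into a hash table, and the questions side is streamed against it. The sketch below illustrates that idea in plain Python. It is a conceptual model rather than Spark's actual implementation, and the function name and sample rows are hypothetical.

```python
from collections import defaultdict


def broadcast_hash_join(probe_rows, build_rows, key):
    """Inner-join two lists of dicts by hashing the build side."""
    # Build phase: index the (small) build side by join key, skipping nulls.
    table = defaultdict(list)
    for row in build_rows:
        if row[key] is not None:
            table[row[key]].append(row)
    # Probe phase: stream the other side and look up matches.
    joined = []
    for row in probe_rows:
        for match in table.get(row[key], []):
            joined.append({**row, **match})
    return joined


# Made-up rows for illustration only.
questions = [
    {"question_id": 1, "title": "A"},
    {"question_id": 2, "title": "B"},
]
answers_month = [
    {"question_id": 1, "month": 1, "cnt": 2},
    {"question_id": 1, "month": 2, "cnt": 1},
]

joined = broadcast_hash_join(questions, answers_month, "question_id")
for row in joined:
    print(row)
```

The side being hashed has to fit in memory on every executor, which is why this strategy is used when one input is small.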

In [19]:
questionsDF.count()

86936