<a href="https://colab.research.google.com/github/nagavatar/sparkoncolab/blob/main/Spark_on_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#mounting drive on colab
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#unzipping the zipped csv file
!unzip "/content/drive/My Drive/Spark_AnalyticsVidhya/train_oSwQCTC.zip"

Archive:  /content/drive/My Drive/Spark_AnalyticsVidhya/train_oSwQCTC.zip
  inflating: train.csv               


# **Why use Spark?**
Spark is one of the most widely used data processing frameworks in enterprises today. It can be costly to run, since in-memory computation needs a lot of RAM, but it is still a hot favorite among data scientists and big data engineers.

* Organisations used to rely mainly on Hadoop's MapReduce architecture, but many of them are now shifting to the Apache Spark framework.
* Spark performs in-memory computing, and the project reports speeds of up to 100 times faster than Hadoop MapReduce for workloads that fit in memory.
* Spark is a big hit among data scientists because it distributes and caches data in memory, which helps them optimize machine learning algorithms on big data.

In [None]:
#let's set up Spark
#Spark is written in Scala and runs on the JVM, so let's install Java first
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
#let's download Apache Spark prebuilt for Hadoop 2.7
!wget -q https://www-us.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz

In [None]:
#let's extract the downloaded archive
!tar xf spark-3.1.1-bin-hadoop2.7.tgz

In [None]:
#let's install the findspark library. It locates Spark on the system and makes pyspark importable as a regular library
!pip install -q findspark

In [None]:
#we have downloaded all necessary dependencies in colab
#let's set the environment variables that point to Java and Spark
#pyspark needs them to run
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"
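If a later step cannot locate Spark or Java, one thing worth checking is whether these two variables point at directories that actually exist. A minimal sketch of such a check using only the standard library; `missing_spark_paths` is a hypothetical helper, not part of findspark or Colab:

```python
import os


def missing_spark_paths(env=os.environ):
    """Return the names of required variables that are unset or not directories."""
    required = ("JAVA_HOME", "SPARK_HOME")
    # os.path.isdir("") is False, so unset variables are reported as missing too.
    return [name for name in required if not os.path.isdir(env.get(name, ""))]


# An empty list means both directories were found.
print(missing_spark_paths())
```

Running it right after setting the variables gives an early hint when the extracted Spark folder name and `SPARK_HOME` do not match.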

In [None]:
#all required setup done
#now let's import findspark which will help in finding spark
import findspark
findspark.init()

In [None]:
#let's find where spark is located
#it's not necessary
#i'm using it just to showcase where spark is located
findspark.find()

'/content/spark-3.1.1-bin-hadoop2.7'

In [None]:
#we'll import SparkSession from pyspark.sql and create a SparkSession, which is the entry point to Spark
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [None]:
#now let's print SparkSession variable which we just created
spark

In [None]:
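#let's expose the Spark UI (port 4050) through an ngrok tunnel so we can view it
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
#ngrok serves its local API on port 4040; this prints the tunnel details including the public url
!curl -s http://localhost:4040/api/tunnels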

--2021-03-31 11:11:52--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 34.235.106.23, 34.193.233.154, 34.194.108.77, ...
Connecting to bin.equinox.io (bin.equinox.io)|34.235.106.23|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 14746350 (14M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.1’


2021-03-31 11:11:53 (38.1 MB/s) - ‘ngrok-stable-linux-amd64.zip.1’ saved [14746350/14746350]

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: A
  inflating: ngrok                   
{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://83a2c6765c17.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p
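The response above is JSON, so the public URL can be pulled out programmatically instead of being read by eye. A minimal sketch, assuming the response keeps the shape shown in the output above; `public_urls` and the trimmed `sample` string are illustrative and not part of ngrok:

```python
import json


def public_urls(tunnels_json):
    """Extract the public URLs from an ngrok /api/tunnels response body."""
    payload = json.loads(tunnels_json)
    return [tunnel["public_url"] for tunnel in payload.get("tunnels", [])]


# Trimmed sample with the same fields as the response printed above.
sample = (
    '{"tunnels":[{"name":"command_line",'
    '"public_url":"https://83a2c6765c17.ngrok.io","proto":"https",'
    '"config":{"addr":"http://localhost:4050","inspect":true}}]}'
)
print(public_urls(sample))
```

In the notebook, the string passed to `public_urls` would be the text returned by the `curl` call rather than a hard-coded sample.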

In [None]:
#we see that public url is working and we can see spark running


#now let's see spark working on the dataset
df = spark.read.csv("train.csv", header=True, inferSchema=True)

In [None]:
df.head()

Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370)

# Exploratory Data Analysis aka **EDA**

In [None]:
#now let's get rolling and start EDA of the data
df.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [None]:
#let's print top 20 rows
df.show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|                12|              null|              null|    1422

In [None]:
df.count()

550068

In [None]:
#let's display specific columns
df.select("User_ID","Gender","Age","Occupation").show(5)

+-------+------+----+----------+
|User_ID|Gender| Age|Occupation|
+-------+------+----+----------+
|1000001|     F|0-17|        10|
|1000001|     F|0-17|        10|
|1000001|     F|0-17|        10|
|1000001|     F|0-17|        10|
|1000002|     M| 55+|        16|
+-------+------+----+----------+
only showing top 5 rows



In [None]:
#let's get a summary of all columns of the data
#here it shows a summary of numeric as well as string columns
df.describe().show()

+-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|        Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            550068|    550068|550068|550068|            550068|       550068|                    550068|             550068|            550068|            376430|            166821|           550068|
|   mean|1003028.8424013031|      null|  null|  null| 8.076706879876669|         null|         1.468494139793958|0.40965298835780306| 5.404270017525106| 9.842329251

In [None]:
#let's check distinct values for categorical columns
df.select("City_Category").distinct().show()

+-------------+
|City_Category|
+-------------+
|            B|
|            C|
|            A|
+-------------+



In [None]:
#now let's see sales of each city category.
#we will groupby city_category to see sales
from pyspark.sql import functions as F
df.groupBy("City_Category").agg(F.sum("Purchase")).show()

+-------------+-------------+
|City_Category|sum(Purchase)|
+-------------+-------------+
|            B|   2115533605|
|            C|   1663807476|
|            A|   1316471661|
+-------------+-------------+



In [None]:
#let's count the null values in each column
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|      0|         0|     0|  0|         0|            0|                         0|             0|                 0|            173638|            383247|       0|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



# Handling NULL values

In [None]:
#we saw that product_category_2 and product_category_3 have lots of null values; possibly a null appears where the buyer didn't buy a product in that category
#so let's replace null with 0
#since spark dataframes are immutable, fillna returns a new dataframe, which we assign back to df
df = df.fillna({'Product_Category_2':0, 'Product_Category_3':0})


In [None]:
#let's confirm the result
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|      0|         0|     0|  0|         0|            0|                         0|             0|                 0|                 0|                 0|       0|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



# Save to the file

In [None]:
#There won't be just a single CSV saved but several, depending on the number of partitions of the dataframe. So if there are 2 partitions, two CSV files will be saved, one for each partition.
df.write.csv("/content/drive/My Drive/Spark_AnalyticsVidhya/preprocessed_data")

AnalysisException: ignored

In [None]:
#so let's check the number of partitions, since that is how many files we will get
df.rdd.getNumPartitions()

1
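Spark treats the path given to `df.write.csv` as a directory and writes one `part-*` file per partition into it, next to a `_SUCCESS` marker. A minimal sketch for locating those files afterwards with the standard library only; `list_part_files` is a hypothetical helper, and the temporary directory below merely stands in for real Spark output:

```python
import glob
import os
import tempfile


def list_part_files(output_dir):
    """Return the sorted part-file paths inside a Spark CSV output directory."""
    return sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))


# Stand-in for a Spark output directory so the helper can be tried without Spark.
demo_dir = tempfile.mkdtemp()
for name in ("part-00000-demo-c000.csv", "_SUCCESS"):
    open(os.path.join(demo_dir, name), "w").close()

found = [os.path.basename(path) for path in list_part_files(demo_dir)]
print(found)
```

With one partition, as reported above, a successful write would be expected to leave a single part file in the output directory.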

# What is RDD?
An RDD (Resilient Distributed Dataset) is the fundamental data structure of Spark. It is a collection of objects that is partitioned across the nodes of a cluster, which allows the data to be processed in parallel.

It is fault-tolerant: if you perform multiple transformations on an RDD and a node then fails for any reason, the RDD can recover automatically by recomputing the lost partitions from the transformations that produced them.

### How to create RDD?
There are 3 ways of creating an RDD:

* Parallelizing an existing collection of data
* Referencing an external data file
* Creating RDD from an already existing RDD
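
The three options above can be sketched as follows. This is an illustration rather than part of the original notebook: `rdd_creation_demo` and `to_pair` are hypothetical names, the function assumes an active SparkSession such as `spark`, and `text_path` is assumed to point at a text file with one `key,value` pair per line.

```python
def to_pair(line):
    """Split a 'key,value' text line into a (key, int(value)) tuple."""
    key, value = line.split(",")
    return key, int(value)


def rdd_creation_demo(spark, text_path):
    """Create RDDs in three ways; `spark` is assumed to be an active SparkSession."""
    sc = spark.sparkContext

    # 1. Parallelize an existing Python collection.
    numbers = sc.parallelize([1, 2, 3, 4, 5])

    # 2. Reference an external file: each line becomes one string element.
    lines = sc.textFile(text_path)

    # 3. Derive a new RDD from an existing one with a transformation.
    pairs = lines.map(to_pair)

    return numbers, lines, pairs
```

Nothing is computed until an action such as `collect()` or `count()` is called on one of the returned RDDs, because transformations like `map` are lazy.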


### When to use RDD?
* When we want to do low-level transformations on the dataset. 
* While building ML pipeline


*NOTE* - An RDD does not automatically infer the schema of the ingested data; we need to specify the schema of each dataset ourselves.

In [None]:
#writing from Spark gives one file per partition, which won't be convenient while reading the data again
#so let's convert the Spark df to a Pandas df and save a single csv
#note that toPandas() collects the whole dataset into the driver's memory
df_pd = df.toPandas()

# Store result
df_pd.to_csv("/content/drive/My Drive/Spark_AnalyticsVidhya/pandas_preprocessed_data.csv")

In [None]:
#so yeah this is how we can use Apache Spark on Colab.
#i'll get working on it to build ML model once I get some free time from my college studies