<a href="https://colab.research.google.com/github/namakutiwik/PySpark-in-Colab/blob/main/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setting up PySpark

In [None]:
# Mount Google Drive under /content/drive so the archives in MyDrive are reachable.
import google.colab.drive as drive

drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [61]:
# Setting up PySpark in Colab

# Install spark-related dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

!pip install -q findspark
!pip install pyspark

# Set up required environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"



In [None]:
# Put the Spark installation's Python libraries on sys.path so `pyspark` can be imported.
# With no argument, findspark locates Spark through the SPARK_HOME variable set in the setup cell.
import findspark
findspark.init()

In [None]:
# Sanity check: show which Spark installation findspark resolved (should equal SPARK_HOME).
findspark.find()

'/content/spark-3.1.2-bin-hadoop3.2'

In [63]:
from pyspark.sql import SparkSession

# One local SparkSession for the whole notebook.
# - "local[*]" runs one worker thread per available core; plain "local" would use
#   a single thread, which is needlessly slow for the ~34M-row dataset below.
# - The Spark UI is moved to port 4050 so the ngrok cell can tunnel to it.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Colab")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

In [65]:
# Display the active SparkSession (Jupyter renders its summary: version, master, app name, UI link).
spark

In [66]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels

--2021-07-13 21:54:23--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 3.223.73.198, 3.211.204.50, 34.228.108.156, ...
Connecting to bin.equinox.io (bin.equinox.io)|3.223.73.198|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2021-07-13 21:54:23 (55.6 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://577f6e78b746.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}}],"uri":"/api/tunnels"}


# Data Exploration

In [None]:
# Reading data from drive
!unrar x "/content/drive/MyDrive/Kemenkes - Data Analyst/data2015.rar" 
!unrar x "/content/drive/MyDrive/Kemenkes - Data Analyst/data2016.rar" 
!unrar x "/content/drive/MyDrive/Kemenkes - Data Analyst/data2017.rar" 


UNRAR 5.50 freeware      Copyright (c) 1993-2017 Alexander Roshal


Extracting from /content/drive/MyDrive/Kemenkes - Data Analyst/data2015.rar

Extracting  data2015.csv                                                   0%  1%  2%  3%  4%  5%  6%  7%  8%  9% 10% 11% 12% 13% 14% 15% 16% 17% 18% 19% 20% 21% 22% 23% 24% 25% 26% 27% 28% 29% 30% 31% 32% 33% 34% 35% 36% 37% 38% 39% 40% 41% 42% 43% 44% 45% 46% 47% 48% 49% 50% 51% 52% 53% 54% 55% 56% 57% 58% 59% 60% 61% 62% 63% 64% 65% 66% 67% 68% 69% 70% 71% 72% 73% 74% 75% 76% 77% 78% 79% 80% 81% 82% 83% 84% 85% 86% 87% 88% 89% 90% 91% 92% 93% 94% 95% 96% 

In [None]:
# Load each year's extracted CSV. The header row supplies column names and
# inferSchema makes Spark scan the data to choose column types.
csv_options = {"header": True, "inferSchema": True}
df_2015 = spark.read.csv("/content/data2015.csv", **csv_options)
df_2016 = spark.read.csv("/content/data2016.csv", **csv_options)
df_2017 = spark.read.csv("/content/data2017.csv", **csv_options)

In [None]:
# Stack the three yearly frames into one. unionByName matches columns by name rather
# than by position (as unionAll does), so the yearly CSVs may list columns in any
# order, and a column-name mismatch raises an error instead of silently mixing data.
df = df_2015.unionByName(df_2016).unionByName(df_2017)

In [None]:
#Show column details: name, inferred type and nullability of every column
df.printSchema()

In [None]:
# Preview the first five rows of the combined data
df.show(n=5)

+------------+-------------------+-------------------+----------------+--------------------+----------------------+-----------------------+--------------+------------------+--------------------+---------------------+-------+----------+----------+------+-------------+
|tripduration|          starttime|           stoptime|start_station_id|  start_station_name|start_station_latitude|start_station_longitude|end_station_id|  end_station_name|end_station_latitude|end_station_longitude| bikeid|  usertype|birth_year|gender|customer_plan|
+------------+-------------------+-------------------+----------------+--------------------+----------------------+-----------------------+--------------+------------------+--------------------+---------------------+-------+----------+----------+------+-------------+
|       547.0|2015-03-15 10:24:00|2015-03-15 10:33:00|           434.0|     9 Ave & W 18 St|           40.74317449|           -74.00366443|         523.0|   W 38 St & 8 Ave|         40.75466591|  

In [None]:
#Number of Rows across all three yearly files (an action: scans the full dataset)
df.count()

33977186

In [None]:
# Preview only the trip start/stop timestamps
df.select(["starttime", "stoptime"]).show(n=5)

In [None]:
# Summary statistics (count, mean, stddev, min, max) per column
summary_stats = df.describe()
summary_stats.show()

In [None]:
# Which gender codes appear in the data
df.select("gender").dropDuplicates().show()

In [None]:
# Average trip duration for each gender code.
# `F` is also used by later cells, so the module alias must stay.
from pyspark.sql import functions as F

avg_duration = F.avg("tripduration")
df.groupBy("gender").agg(avg_duration).show()

In [None]:
# Count null values per column: each output column holds the number of rows in
# which that input column is null. (Counting only -- nothing is removed here.)
null_counts = [
    F.count(F.when(F.col(name).isNull(), F.lit(1))).alias(name)
    for name in df.columns
]
df.select(null_counts).show()

In [None]:
# Fill missing birth_year with 0 (only that column) so those rows survive the
# any-null drop in the next cell.
# NOTE(review): 0 is a sentinel, not a real year -- exclude it from any age analysis.
df = df.na.fill({"birth_year": 0})

In [None]:
# Discard every row that still has a null in any column
df = df.dropna(how="any")