<a href="https://colab.research.google.com/github/sundarramamurthy/DS/blob/master/Pyspark_in_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Connecting Drive to Colab

In [None]:
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


# Setting up PySpark in Colab

## Install PySpark

In [None]:
# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [None]:
# point JAVA_HOME and SPARK_HOME at the Java and Spark installations from the previous cell
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"
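If the Spark version in the download cell is changed, the `SPARK_HOME` path has to change with it, and a mismatch only surfaces later when the session fails to start. A minimal sketch of an early check follows; the helper name `missing_dirs` is hypothetical and not part of Spark or findspark.

```python
import os

def missing_dirs(var_names):
    """Return the variables that are unset or do not point to an existing directory."""
    problems = []
    for name in var_names:
        path = os.environ.get(name)
        if not path or not os.path.isdir(path):
            problems.append(name)
    return problems

# Hypothetical usage: report any variable that needs attention before starting Spark.
problems = missing_dirs(["JAVA_HOME", "SPARK_HOME"])
if problems:
    print("Check these variables:", ", ".join(problems))
else:
    print("JAVA_HOME and SPARK_HOME both point to existing directories.")
```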




In [None]:
# install findspark using pip
!pip install -q findspark



In [None]:
# install PySpark for Python
!pip install -q pyspark



# Local Spark session to test our installation

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
# spark = SparkSession.builder.master("local[*]").getOrCreate()

spark = SparkSession.builder\
        .master("local")\
        .appName("LRusingspark")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# Test the environment

In [None]:
spark

# Loading data into PySpark

In [None]:
df = spark.read.csv("/content/drive/MyDrive/Data/boston.csv", header=True, inferSchema=True)

Understanding the Data

Variables

Each record in the dataset has 14 attributes (the CSV columns use the lowercase forms of these names):

CRIM - per capita crime rate by town

ZN - proportion of residential land zoned for lots over 25,000 sq.ft.

INDUS - proportion of non-retail business acres per town.

CHAS - Charles River dummy variable (1 if tract bounds river; 0 otherwise)

NOX - nitric oxides concentration (parts per 10 million)

RM - average number of rooms per dwelling

AGE - proportion of owner-occupied units built prior to 1940

DIS - weighted distances to five Boston employment centres

RAD - index of accessibility to radial highways

TAX - full-value property-tax rate per $10,000

PTRATIO - pupil-teacher ratio by town

B - 1000(Bk - 0.63)^2 where Bk is the proportion of blacks by town

LSTAT - % lower status of the population

MEDV - Median value of owner-occupied homes in $1000's
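Most of the attributes are raw measurements, but `B` is a derived quantity, so a worked calculation may help when reading its values. The sketch below simply evaluates the formula from the list above; the function name `b_value` and the sample inputs are illustrative assumptions, not part of the dataset.

```python
def b_value(bk):
    """Evaluate B = 1000 * (Bk - 0.63)^2 for a proportion Bk between 0 and 1."""
    return 1000 * (bk - 0.63) ** 2

# Sample inputs chosen only to show the shape of the transformation.
for bk in (0.0, 0.63, 1.0):
    print(bk, round(b_value(bk), 2))
```

Because the term is squared, two different proportions on either side of 0.63 can give the same `B`, so the original proportion cannot always be recovered from this column.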

Data Exploration

In [None]:
df.printSchema()

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



Display Rows

In [None]:
df.show(10)

+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|   crim|  zn|indus|chas|  nox|   rm|  age|   dis|rad|tax|ptratio|     b|lstat|medv|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|0.00632|18.0| 2.31|   0|0.538|6.575| 65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|
|0.02731| 0.0| 7.07|   0|0.469|6.421| 78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|0.02729| 0.0| 7.07|   0|0.469|7.185| 61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
|0.03237| 0.0| 2.18|   0|0.458|6.998| 45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|
|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|
|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|
|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|311|   15.2| 395.6|12.43|22.9|
|0.14455|12.5| 7.87|   0|0.524|6.172| 96.1|5.9505|  5|311|   15.2| 396.9|19.15|27.1|
|0.21124|12.5| 7.87|   0|0.524|5.631|100.0|6.0821|  5|311|   15.2

Number of rows in Data Frame

In [None]:
df.count()

506

Display specific columns

In [None]:
df.select("crim","indus","age","medv").show(5)

+-------+-----+----+----+
|   crim|indus| age|medv|
+-------+-----+----+----+
|0.00632| 2.31|65.2|24.0|
|0.02731| 7.07|78.9|21.6|
|0.02729| 7.07|61.1|34.7|
|0.03237| 2.18|45.8|33.4|
|0.06905| 2.18|54.2|36.2|
+-------+-----+----+----+
only showing top 5 rows



Pandas DF format

In [None]:
df.select("crim","indus","age","medv").limit(5).toPandas()

      crim  indus   age  medv
0  0.00632   2.31  65.2  24.0
1  0.02731   7.07  78.9  21.6
2  0.02729   7.07  61.1  34.7
3  0.03237   2.18  45.8  33.4
4  0.06905   2.18  54.2  36.2


Describe the Data

In [None]:
df.describe().show()

+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|summary|              crim|                zn|             indus|              chas|                nox|                rm|               age|              dis|              rad|               tax|           ptratio|                 b|             lstat|              medv|
+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|  count|               506|               506|               506|               506|                506|               506|               506|              506|              

Distinct values for a column

In [None]:
df.select("age").distinct().show()

+----+
| age|
+----+
|74.5|
|56.8|
|98.3|
|78.9|
|15.7|
|67.0|
|96.8|
|56.5|
|32.3|
|49.9|
|54.2|
|98.1|
|86.9|
|83.5|
|94.9|
|90.3|
|97.1|
|65.4|
|59.5|
|76.5|
+----+
only showing top 20 rows



Aggregation

In [None]:
from pyspark.sql import functions as F
df.groupBy("age","indus").agg(F.avg("medv")).show()

+-----+-----+------------------+
|  age|indus|         avg(medv)|
+-----+-----+------------------+
| 95.7|19.58|              14.6|
| 68.7| 4.05|              22.6|
| 76.7|  9.9|              20.3|
| 85.1|13.89|              28.7|
| 64.5| 3.33|              45.4|
| 77.8| 18.1|               6.3|
| 45.8|12.83|              20.8|
|100.0|10.59|              20.0|
| 96.2| 18.1|             16.95|
| 94.1| 8.14|14.149999999999999|
| 36.6|12.83|              20.3|
| 86.9| 3.97|              50.0|
| 36.9| 2.89|              43.8|
| 84.2|10.01|              18.5|
| 72.9| 9.69|              19.7|
| 98.2|19.58|              50.0|
| 29.7| 2.01|              24.5|
| 18.4|13.92|              23.9|
| 54.0| 9.69|              21.8|
| 33.2| 2.68|              48.5|
+-----+-----+------------------+
only showing top 20 rows



Counting and Removing Null values

In [None]:
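# for each column, count the rows where it is null: when() is non-null only on those rows, and count() skips nulls
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()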

+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+
|crim| zn|indus|chas|nox| rm|age|dis|rad|tax|ptratio|  b|lstat|medv|
+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+
|   0|  0|    0|   0|  0|  0|  0|  0|  0|  0|      0|  0|    0|   0|
+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+



In [None]:
# no nulls were found above, so nothing needs filling; to replace nulls in a column with 0:
# df = df.fillna({'age': 0})

Save the file

In [None]:
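# Spark writes a directory of part files at this path; header=True keeps the column names
df.write.csv("/content/drive/MyDrive/Data/preprocessed_data2", header=True)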

In [None]:
df.rdd.getNumPartitions()

1
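Spark's CSV writer creates a directory containing one `part-*.csv` file per partition rather than a single file, so with one partition the directory written above should hold one part file. A minimal sketch of reading such a directory back with the standard library follows; the helper `read_spark_csv_dir` and its `has_header` parameter are hypothetical names introduced for illustration.

```python
import csv
import glob
import os

def read_spark_csv_dir(directory, has_header=False):
    """Collect the rows of every part-*.csv file in a Spark output directory."""
    rows = []
    for part in sorted(glob.glob(os.path.join(directory, "part-*.csv"))):
        with open(part, newline="") as fh:
            reader = csv.reader(fh)
            if has_header:
                next(reader, None)  # skip the header line that each part file repeats
            rows.extend(reader)
    return rows

# Path taken from the write cell above; the result is an empty list if it does not exist.
rows = read_spark_csv_dir("/content/drive/MyDrive/Data/preprocessed_data2")
print(len(rows))
```

Set `has_header=True` if the directory was written with a header row. All values come back as strings, so numeric columns need converting before use.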

In [None]:
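# Spark df to Pandas df
df_pd = df.toPandas()

# Store result; index=False avoids writing the row index as an extra "Unnamed: 0" column.
# The preprocessed_data folder must already exist.
df_pd.to_csv("/content/drive/MyDrive/Data/preprocessed_data/boston_processed_data.csv", index=False)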