In [0]:
# Setting up Spark environment

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [6]:
# Mounting on Drive

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [7]:
pip install pyspark



In [0]:
# Importing libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

**1. Explore configuration options on SparkSession**

In [0]:
# Building Spark session with config details

spark = SparkSession.builder \
    .master("local") \
    .appName("Assignment_6") \
    .config("spark.some.config.option", "some-value") \
    .config("driver.cores", "1") \
    .config("broadcast.compress", "true") \
    .config("checkpoint.compress", "false") \
    .config("streaming.backpressure.enabled", "false") \
    .getOrCreate()

 **2. READING DATA**

READ DATA AS DATAFRAME

Read a csv file into spark dataframe, considering first row as headers. A csv file is a comma seperated file

In [0]:
# Read data into dataframe

df = spark.read.format("csv").option("header", "true").load("/content/sample_data/california_housing_test.csv")

In [14]:
# Look at the dataframe

df.show()

+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|-122.050000|37.370000|         27.000000|3885.000000|    661.000000|1537.000000| 606.000000|     6.608500|     344700.000000|
|-118.300000|34.260000|         43.000000|1510.000000|    310.000000| 809.000000| 277.000000|     3.599000|     176500.000000|
|-117.810000|33.780000|         27.000000|3589.000000|    507.000000|1484.000000| 495.000000|     5.793400|     270500.000000|
|-118.360000|33.820000|         28.000000|  67.000000|     15.000000|  49.000000|  11.000000|     6.135900|     330000.000000|
|-119.670000|36.330000|         19.000000|1241.000000|    244.000000| 850.000000| 237.000000|     2.937500|    

READING DATA AS RDD

Resilient Distributed Datasets (RDD) is a fundamental data structure of Spark. It is an immutable distributed collection of objects

In [0]:
# Read csv to RDD

rdd_1 = spark.sparkContext.textFile("/content/sample_data/california_housing_test.csv")

In [17]:
# View RDD top 5 records

rdd_1.take(5)

['"longitude","latitude","housing_median_age","total_rooms","total_bedrooms","population","households","median_income","median_house_value"',
 '-122.050000,37.370000,27.000000,3885.000000,661.000000,1537.000000,606.000000,6.608500,344700.000000',
 '-118.300000,34.260000,43.000000,1510.000000,310.000000,809.000000,277.000000,3.599000,176500.000000',
 '-117.810000,33.780000,27.000000,3589.000000,507.000000,1484.000000,495.000000,5.793400,270500.000000',
 '-118.360000,33.820000,28.000000,67.000000,15.000000,49.000000,11.000000,6.135900,330000.000000']

CONVERT RDD TO SPARK DATAFRAME

In [0]:
# Importing libraries

from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql.types import *

In [19]:
#Converting to dataframe

get_data = rdd_1.map(lambda x : x.split(","))
dataframe = get_data.map(lambda p: Row(longitude=p[0],latitude=p[1],housing_median_age=p[2],total_rooms=p[3],total_bedrooms=p[4],population=p[5],households=p[6],median_income=p[7],median_house_value=p[8]))
rdd_df = spark.createDataFrame(dataframe)
rdd_df.show(10)

+------------+--------------------+----------+-----------+--------------------+---------------+------------+----------------+-------------+
|  households|  housing_median_age|  latitude|  longitude|  median_house_value|  median_income|  population|  total_bedrooms|  total_rooms|
+------------+--------------------+----------+-----------+--------------------+---------------+------------+----------------+-------------+
|"households"|"housing_median_age"|"latitude"|"longitude"|"median_house_value"|"median_income"|"population"|"total_bedrooms"|"total_rooms"|
|  606.000000|           27.000000| 37.370000|-122.050000|       344700.000000|       6.608500| 1537.000000|      661.000000|  3885.000000|
|  277.000000|           43.000000| 34.260000|-118.300000|       176500.000000|       3.599000|  809.000000|      310.000000|  1510.000000|
|  495.000000|           27.000000| 33.780000|-117.810000|       270500.000000|       5.793400| 1484.000000|      507.000000|  3589.000000|
|   11.000000|      

CONVERT SPARK DATAFRAME TO RDD 

In [20]:
# Creating dataframe from the previously generated RDD

df_rdd1 = df.rdd
df_rdd1

MapPartitionsRDD[38] at javaToPython at NativeMethodAccessorImpl.java:0

CONVERT SPARK DATAFRAME TO PANDAS DATAFRAME

In [21]:
# Converting the spark dataframe into pandas dataframe

pandas_df = df.toPandas()
pandas_df

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
0,-122.050000,37.370000,27.000000,3885.000000,661.000000,1537.000000,606.000000,6.608500,344700.000000
1,-118.300000,34.260000,43.000000,1510.000000,310.000000,809.000000,277.000000,3.599000,176500.000000
2,-117.810000,33.780000,27.000000,3589.000000,507.000000,1484.000000,495.000000,5.793400,270500.000000
3,-118.360000,33.820000,28.000000,67.000000,15.000000,49.000000,11.000000,6.135900,330000.000000
4,-119.670000,36.330000,19.000000,1241.000000,244.000000,850.000000,237.000000,2.937500,81700.000000
...,...,...,...,...,...,...,...,...,...
2995,-119.860000,34.420000,23.000000,1450.000000,642.000000,1258.000000,607.000000,1.179000,225000.000000
2996,-118.140000,34.060000,27.000000,5257.000000,1082.000000,3496.000000,1036.000000,3.390600,237200.000000
2997,-119.700000,36.300000,10.000000,956.000000,201.000000,693.000000,220.000000,2.289500,62000.000000
2998,-117.120000,34.100000,40.000000,96.000000,14.000000,46.000000,14.000000,3.270800,162500.000000


 **3. TASKS**

Here, I performed 8 taskes on my dataset using pyspark.sql.dataframe functions and sql queries. In order to implement SQL queries, we either need to connect to our SQL server or use a temporary view. Here, I created a temporary view for SQL purposes.

1. Select first 10 rows of dataset - pyspark.sql.dataframe

In [22]:
# Looking at the dataframe's first 10 rows

df.show(10)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|-122.050000|37.370000|         27.000000|3885.000000|    661.000000|1537.000000|606.000000|     6.608500|     344700.000000|
|-118.300000|34.260000|         43.000000|1510.000000|    310.000000| 809.000000|277.000000|     3.599000|     176500.000000|
|-117.810000|33.780000|         27.000000|3589.000000|    507.000000|1484.000000|495.000000|     5.793400|     270500.000000|
|-118.360000|33.820000|         28.000000|  67.000000|     15.000000|  49.000000| 11.000000|     6.135900|     330000.000000|
|-119.670000|36.330000|         19.000000|1241.000000|    244.000000| 850.000000|237.000000|     2.937500|      81700.

1. Select first 10 rows of dataset - SQL

In [0]:
# Creating a temperary view to implement SQL queries in Spark

df.createOrReplaceTempView("df_sql")

In [24]:
# First 10 rows using SQL

task_1 = 'select * from df_sql LIMIT 10'
spark.sql(task_1).show()

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|-122.050000|37.370000|         27.000000|3885.000000|    661.000000|1537.000000|606.000000|     6.608500|     344700.000000|
|-118.300000|34.260000|         43.000000|1510.000000|    310.000000| 809.000000|277.000000|     3.599000|     176500.000000|
|-117.810000|33.780000|         27.000000|3589.000000|    507.000000|1484.000000|495.000000|     5.793400|     270500.000000|
|-118.360000|33.820000|         28.000000|  67.000000|     15.000000|  49.000000| 11.000000|     6.135900|     330000.000000|
|-119.670000|36.330000|         19.000000|1241.000000|    244.000000| 850.000000|237.000000|     2.937500|      81700.

2. Show the schema of of the dataset - pyspark.sql.dataframe

Looking at the datatypes and nullable constraints for each column

In [25]:
# Schema of our data using pyspark

df.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housing_median_age: string (nullable = true)
 |-- total_rooms: string (nullable = true)
 |-- total_bedrooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- median_income: string (nullable = true)
 |-- median_house_value: string (nullable = true)



2. Show the schema of of the dataset - SQL

In [26]:
# Schema using Sql's describe method using SQL
 
task_2 = "DESCRIBE FORMATTED df_sql"
spark.sql(task_2).show()

+------------------+---------+-------+
|          col_name|data_type|comment|
+------------------+---------+-------+
|         longitude|   string|   null|
|          latitude|   string|   null|
|housing_median_age|   string|   null|
|       total_rooms|   string|   null|
|    total_bedrooms|   string|   null|
|        population|   string|   null|
|        households|   string|   null|
|     median_income|   string|   null|
|median_house_value|   string|   null|
+------------------+---------+-------+



3. Group by and get max, min, count of a column in the dataset - pyspark.sql.dataframe

The GROUP BY statement groups rows that have the same values into summary rows

In [27]:
# count of each income class and it's min and max values using pyspark

df.groupBy('median_income').count().show()
df.select(max('median_income'),min('median_income')).show()

+-------------+-----+
|median_income|count|
+-------------+-----+
|     4.638900|    1|
|     4.693000|    1|
|     5.898000|    1|
|    10.598100|    1|
|     0.740300|    1|
|     3.272700|    1|
|     4.931800|    1|
|     2.306800|    1|
|     2.401000|    1|
|     4.163000|    1|
|     3.209100|    1|
|     1.442700|    1|
|     2.021700|    1|
|     4.736100|    2|
|     4.786100|    1|
|     2.853300|    1|
|     5.273000|    1|
|     1.000000|    1|
|     4.033300|    1|
|     3.956600|    1|
+-------------+-----+
only showing top 20 rows

+------------------+------------------+
|max(median_income)|min(median_income)|
+------------------+------------------+
|          9.870800|          0.499900|
+------------------+------------------+



3. Group by and get max, min, count of a column in the dataset - SQL

In [28]:
# Using groupby for median_income and max and min functions of SQL

task_3 = "select median_income, count(median_income) from df_sql group by median_income"
spark.sql(task_3).show()

task_3_1 = "select MAX(median_income) as max_median_income from df_sql"
spark.sql(task_3_1).show()

task_3_2 = "select MIN(median_income) as min_median_income from df_sql"
spark.sql(task_3_2).show()

+-------------+--------------------+
|median_income|count(median_income)|
+-------------+--------------------+
|     4.638900|                   1|
|     4.693000|                   1|
|     5.898000|                   1|
|    10.598100|                   1|
|     0.740300|                   1|
|     3.272700|                   1|
|     4.931800|                   1|
|     2.306800|                   1|
|     2.401000|                   1|
|     4.163000|                   1|
|     3.209100|                   1|
|     1.442700|                   1|
|     2.021700|                   1|
|     4.736100|                   2|
|     4.786100|                   1|
|     2.853300|                   1|
|     5.273000|                   1|
|     1.000000|                   1|
|     4.033300|                   1|
|     3.956600|                   1|
+-------------+--------------------+
only showing top 20 rows

+-----------------+
|max_median_income|
+-----------------+
|         9.870800|
+-----

4. Filter your dataset by some conditions based on your column - pyspark.sql.dataframe

Analysis on a dataset based on condition is very helpful to extract specific information from the data. Let's have a look at one such conditional extraction.

In [29]:
# As seen above, the lowest is 0.49 and highest is 9.8. Let's look at incomes > 3 using pyspark

df.filter(df['median_income'] > 3).show()

+-----------+---------+------------------+------------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age| total_rooms|total_bedrooms| population| households|median_income|median_house_value|
+-----------+---------+------------------+------------+--------------+-----------+-----------+-------------+------------------+
|-122.050000|37.370000|         27.000000| 3885.000000|    661.000000|1537.000000| 606.000000|     6.608500|     344700.000000|
|-117.810000|33.780000|         27.000000| 3589.000000|    507.000000|1484.000000| 495.000000|     5.793400|     270500.000000|
|-118.360000|33.820000|         28.000000|   67.000000|     15.000000|  49.000000|  11.000000|     6.135900|     330000.000000|
|-121.930000|37.250000|         36.000000| 1089.000000|    182.000000| 535.000000| 170.000000|     4.690000|     252600.000000|
|-117.030000|32.970000|         16.000000| 3936.000000|    694.000000|1935.000000| 659.000000|     4.562

4. Filter your dataset by some conditions based on your column - SQL

In [30]:
# Looking at the data with income > 3.0 using SQL

task_4 = "select * from df_sql where median_income > 3"
spark.sql(task_4).show()

+-----------+---------+------------------+------------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age| total_rooms|total_bedrooms| population| households|median_income|median_house_value|
+-----------+---------+------------------+------------+--------------+-----------+-----------+-------------+------------------+
|-122.050000|37.370000|         27.000000| 3885.000000|    661.000000|1537.000000| 606.000000|     6.608500|     344700.000000|
|-117.810000|33.780000|         27.000000| 3589.000000|    507.000000|1484.000000| 495.000000|     5.793400|     270500.000000|
|-118.360000|33.820000|         28.000000|   67.000000|     15.000000|  49.000000|  11.000000|     6.135900|     330000.000000|
|-121.930000|37.250000|         36.000000| 1089.000000|    182.000000| 535.000000| 170.000000|     4.690000|     252600.000000|
|-117.030000|32.970000|         16.000000| 3936.000000|    694.000000|1935.000000| 659.000000|     4.562

5. Apply order by - pyspark.sql.dataframe

Ordering data based on a column values

In [31]:
# Ordering the data based on the house values using pyspark

df.select('total_rooms', 'total_bedrooms', 'median_house_value').orderBy('median_house_value').show()

+-----------+--------------+------------------+
|total_rooms|total_bedrooms|median_house_value|
+-----------+--------------+------------------+
|3854.000000|   1046.000000|     100000.000000|
|2235.000000|    545.000000|     100000.000000|
|1439.000000|    327.000000|     100000.000000|
| 432.000000|     87.000000|     100000.000000|
|1200.000000|    468.000000|     100000.000000|
|2722.000000|    479.000000|     100000.000000|
| 831.000000|    149.000000|     100000.000000|
| 954.000000|    233.000000|     100000.000000|
|1138.000000|    304.000000|     100000.000000|
|2668.000000|    510.000000|     100000.000000|
| 764.000000|    200.000000|     100000.000000|
| 980.000000|    193.000000|     100000.000000|
|1886.000000|    586.000000|     100000.000000|
|3585.000000|    548.000000|     100100.000000|
|4776.000000|   1082.000000|     100500.000000|
|2581.000000|    499.000000|     100500.000000|
|1724.000000|    432.000000|     100600.000000|
|1477.000000|    264.000000|     100600.

5. Apply order by - SQL

In [32]:
# Ordering the data by mdeian house values using SQL's order by

task_5 = "select total_rooms, total_bedrooms, median_house_value from df_sql order by median_house_value"
spark.sql(task_5).show()

+-----------+--------------+------------------+
|total_rooms|total_bedrooms|median_house_value|
+-----------+--------------+------------------+
|3854.000000|   1046.000000|     100000.000000|
|2235.000000|    545.000000|     100000.000000|
|1439.000000|    327.000000|     100000.000000|
| 432.000000|     87.000000|     100000.000000|
|1200.000000|    468.000000|     100000.000000|
|2722.000000|    479.000000|     100000.000000|
| 831.000000|    149.000000|     100000.000000|
| 954.000000|    233.000000|     100000.000000|
|1138.000000|    304.000000|     100000.000000|
|2668.000000|    510.000000|     100000.000000|
| 764.000000|    200.000000|     100000.000000|
| 980.000000|    193.000000|     100000.000000|
|1886.000000|    586.000000|     100000.000000|
|3585.000000|    548.000000|     100100.000000|
|4776.000000|   1082.000000|     100500.000000|
|2581.000000|    499.000000|     100500.000000|
|1724.000000|    432.000000|     100600.000000|
|1477.000000|    264.000000|     100600.

6. Select distinct records by a column - pyspark.sql.dataframe

Getting unique values for a column can be useful to find the non-repeating values. We will explore distinct values of the median incomes for our data.

In [33]:
# All distinct incomes using pyspark

df.select('median_income').distinct().show()

+-------------+
|median_income|
+-------------+
|     4.638900|
|     4.693000|
|     5.898000|
|    10.598100|
|     0.740300|
|     3.272700|
|     4.931800|
|     2.306800|
|     2.401000|
|     4.163000|
|     3.209100|
|     1.442700|
|     2.021700|
|     4.736100|
|     4.786100|
|     2.853300|
|     5.273000|
|     1.000000|
|     4.033300|
|     3.956600|
+-------------+
only showing top 20 rows



6. Select distinct records by a column - SQL

In [34]:
# All distinct incomes using SQL

task_6 = "select distinct median_income from df_sql"
spark.sql(task_6).show()

+-------------+
|median_income|
+-------------+
|     4.638900|
|     4.693000|
|     5.898000|
|    10.598100|
|     0.740300|
|     3.272700|
|     4.931800|
|     2.306800|
|     2.401000|
|     4.163000|
|     3.209100|
|     1.442700|
|     2.021700|
|     4.736100|
|     4.786100|
|     2.853300|
|     5.273000|
|     1.000000|
|     4.033300|
|     3.956600|
+-------------+
only showing top 20 rows



7. Transform the data type of column from string to float - pyspark.sql.dataframe

In this data, all columns are of string datatype, but as seen in the description, we have float values in various columns as well, so we can cast the type of that column from string to integer or float.

In [35]:
# Casting of column datatype from string to float using pyspark

df_n = df.withColumn('median_income', col('median_income').cast((FloatType())))
df_n.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housing_median_age: string (nullable = true)
 |-- total_rooms: string (nullable = true)
 |-- total_bedrooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- median_income: float (nullable = true)
 |-- median_house_value: string (nullable = true)



7. Transform the data type of column from string to float - SQL

In [36]:
# Type casting in SQL

task_7 = "select CAST(median_income as FLOAT) from df_sql"
task_7_column = spark.sql(task_7)

task_7_column.dtypes

[('median_income', 'float')]

8. Apply group by with having clause - pyspark.sql.dataframe

Grouping a column value (categorical) and sum of population based on those values

In [37]:
# Groupby with having using pyspark

df.groupBy('median_income').agg(sum('population')).alias('pop_income').show()

+-------------+---------------+
|median_income|sum(population)|
+-------------+---------------+
|     4.638900|          499.0|
|     4.693000|         2760.0|
|     5.898000|         1055.0|
|    10.598100|         1605.0|
|     0.740300|         1046.0|
|     3.272700|          709.0|
|     4.931800|         1196.0|
|     2.306800|          664.0|
|     2.401000|         1121.0|
|     4.163000|          660.0|
|     3.209100|         2332.0|
|     1.442700|         1623.0|
|     2.021700|          956.0|
|     4.736100|         3320.0|
|     4.786100|         1309.0|
|     2.853300|          439.0|
|     5.273000|          829.0|
|     1.000000|          133.0|
|     4.033300|         2003.0|
|     3.956600|         2272.0|
+-------------+---------------+
only showing top 20 rows



8. Apply group by with having clause - SQL

In [38]:
# Groupby with having using SQL

task_8 = 'SELECT median_income, sum(population) from df_sql GROUP BY median_income HAVING sum(population) > 660'
spark.sql(task_8).show()

+-------------+-------------------------------+
|median_income|sum(CAST(population AS DOUBLE))|
+-------------+-------------------------------+
|     4.693000|                         2760.0|
|     5.898000|                         1055.0|
|    10.598100|                         1605.0|
|     0.740300|                         1046.0|
|     3.272700|                          709.0|
|     4.931800|                         1196.0|
|     2.306800|                          664.0|
|     2.401000|                         1121.0|
|     3.209100|                         2332.0|
|     1.442700|                         1623.0|
|     2.021700|                          956.0|
|     4.736100|                         3320.0|
|     4.786100|                         1309.0|
|     5.273000|                          829.0|
|     4.033300|                         2003.0|
|     3.956600|                         2272.0|
|     2.932400|                          850.0|
|     5.288100|                         