#  **Big Data Processing in Spark**

In my previous three notebooks, I covered installing Spark quickly, uploading data to Colab, and data cleansing.

This notebook focuses on Data Processing as a way to get useful information from data.



## PART 1. Configure PySpark environment

Copy and paste the code below.

Read more: https://github.com/kyramichel/Spark/blob/master/PySpark_GoogleColab.ipynb

In [117]:
#update the packages existing on the machine
!apt-get update

#install java 
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


#install spark: get the file
!wget -q https://archive.apache.org/dist/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
    
#unzip the file
!tar xf spark-2.4.1-bin-hadoop2.7.tgz

#set up the environment variables (SPARK_HOME must match the version unpacked above)
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.1-bin-hadoop2.7"

#install findspark
!pip install -q findspark

#importing findspark adds pyspark to the system path, so that next time you can import pyspark like any other python library
import findspark
findspark.init("/content/spark-2.4.1-bin-hadoop2.7")

import pyspark

#SparkContext is the entry point to Spark functionality; it represents the connection to a Spark cluster
from pyspark import SparkContext, SparkConf


#import a spark session
from pyspark.sql import SparkSession
#create a session
spark = SparkSession.builder.getOrCreate()
spark

#test the installation
df0 = spark.sql("select 'PySpark' as Hello")
df0.show()

Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:2 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:6 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Ign:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:9 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:12 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:13 http://ppa.launchpad
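
The setup cell names the Spark release in four places (the download URL, the archive name, `SPARK_HOME`, and `findspark.init`), and all four have to agree. A minimal sketch, assuming the same 2.4.1 / Hadoop 2.7 build and the `/content` working directory used in Colab, derives every string from one pair of variables. The variable names are illustrative and not part of any Spark API.

```python
# Assumed values: match these to the release you actually download.
SPARK_VERSION = "2.4.1"
HADOOP_VERSION = "2.7"

# Every string below is derived from the two values above,
# so the download, the unpacked folder, and SPARK_HOME cannot drift apart.
spark_dir = f"spark-{SPARK_VERSION}-bin-hadoop{HADOOP_VERSION}"
spark_tgz = f"{spark_dir}.tgz"
spark_url = f"https://archive.apache.org/dist/spark/spark-{SPARK_VERSION}/{spark_tgz}"
spark_home = f"/content/{spark_dir}"  # assumes the archive is unpacked in /content

print(spark_url)
print(spark_home)
```

`spark_home` can then be used for both `os.environ["SPARK_HOME"]` and `findspark.init(...)`.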

## PART 2. Data Processing

Exploratory data analysis (EDA) involves handling nulls, changing data types, checking the shape of the data, computing summary statistics, transforming data, and so on.

Let's go deeper into data processing and query data in Spark using both Python and SQL.

First, open the Files panel, then click upload and select your data file to load it into Colab. More on how to get data into Colab: https://github.com/kyramichel/Spark/blob/master/DataPysparkCloudColab.ipynb


In [55]:
#Loading the data; param inferSchema=True enables automatic detection of underlying schema
data = spark.read.csv('data2.csv', header=True, inferSchema=True)
data.show()

+---+--------+-----+-----------------+--------------+-----+-------------+--------+----------+------+
| id| Product|Price|             Name|          City|State|      Country|Latitude| Longitude|US Zip|
+---+--------+-----+-----------------+--------------+-----+-------------+--------+----------+------+
|  1|Product1| 1200|           Betina|     Parkville|   MO|United States|  39.195| -94.68194| 64152|
|  2|Product1| 1200|Federica e Andrea|       Astoria|   OR|United States|46.18806|   -123.83| 97103|
|  3|Product2| 3600|           Gerd W|Cahaba Heights|   AL|United States|33.52056|  -86.8025| 35243|
|  4|Product1| 1200|         LAURENCE|     Mickleton|   NJ|United States|   39.79| -75.23806|  8056|
|  5|Product1| 1200|            Fleur|        Peoria|   IL|United States|40.69361| -89.58889| 61601|
|  6|Product1| 1200|             adam|        Martin|   TN|United States|36.34333| -88.85028| 38237|
|  7|Product1| 1200|            Stacy|      New York|   NY|United States|40.71417| -74.0063

In [56]:
data.columns

['id',
 'Product',
 'Price',
 'Name',
 'City',
 'State',
 'Country',
 'Latitude',
 'Longitude',
 'US Zip']

### **Omitting columns**

Omitting columns can be useful when analysing a dataset.

As in plain Python, this is easy to do by slicing the list of column names.

In [82]:
coli = data.columns[0:3]
print(coli)

['id', 'Product', 'Price']


In [84]:
data.select(coli).show()


+---+--------+-----+
| id| Product|Price|
+---+--------+-----+
|  1|Product1| 1200|
|  2|Product1| 1200|
|  3|Product2| 3600|
|  4|Product1| 1200|
|  5|Product1| 1200|
|  6|Product1| 1200|
|  7|Product1| 1200|
|  8|Product1| 1200|
|  9|Product1| 1200|
| 10|Product1| 1200|
| 11|Product1| 1200|
| 12|Product1| 1200|
| 13|Product1| 1200|
| 14|Product1| 1200|
| 15|Product1| 1200|
| 16|Product1| 1200|
| 17|Product1| 1200|
| 18|Product1| 1200|
| 19|Product1| 1200|
| 20|Product1| 1200|
+---+--------+-----+
only showing top 20 rows
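
Because `data.columns` is an ordinary Python list, any list operation can build the argument to `select`. A small sketch, using the column names printed earlier and a hypothetical `unwanted` set, shows selecting by position and by name without touching Spark:

```python
# Column names as returned by data.columns.
columns = ['id', 'Product', 'Price', 'Name', 'City',
           'State', 'Country', 'Latitude', 'Longitude', 'US Zip']

first_three = columns[0:3]     # by position, counted from the start
all_but_last = columns[:-1]    # by position, counted from the end

# By name: keep everything that is not in the (illustrative) unwanted set.
unwanted = {'Name', 'City', 'US Zip'}
kept = [c for c in columns if c not in unwanted]

print(first_three)
print(kept)
```

Any of these lists can be passed on to Spark, for example `data.select(kept)`.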



In [79]:
#select all columns but last one
except_last = data.columns[:-1] 
print(except_last)


['id', 'Product', 'Price', 'Name', 'City', 'State', 'Country', 'Latitude', 'Longitude']


In [80]:
#Keep only the last column by dropping all the others
lastcol = data.drop(*except_last)
lastcol.show()


+------+
|US Zip|
+------+
| 64152|
| 97103|
| 35243|
|  8056|
| 61601|
| 38237|
| 10002|
| 78230|
| 83616|
|  8075|
| 84111|
| 91910|
| 77478|
| 10007|
| 61550|
| 95032|
| 10007|
| 33130|
| 11226|
| 20190|
+------+
only showing top 20 rows



In [61]:
#alternatively, use select to keep only the last column
data.select("US Zip").show()


+------+
|US Zip|
+------+
| 64152|
| 97103|
| 35243|
|  8056|
| 61601|
| 38237|
| 10002|
| 78230|
| 83616|
|  8075|
| 84111|
| 91910|
| 77478|
| 10007|
| 61550|
| 95032|
| 10007|
| 33130|
| 11226|
| 20190|
+------+
only showing top 20 rows



In [62]:
#Drop selected columns
col_list = ['Name', 'City', "US Zip"]
df= data.drop(*col_list)
df.show()


+---+--------+-----+-----+-------------+--------+----------+
| id| Product|Price|State|      Country|Latitude| Longitude|
+---+--------+-----+-----+-------------+--------+----------+
|  1|Product1| 1200|   MO|United States|  39.195| -94.68194|
|  2|Product1| 1200|   OR|United States|46.18806|   -123.83|
|  3|Product2| 3600|   AL|United States|33.52056|  -86.8025|
|  4|Product1| 1200|   NJ|United States|   39.79| -75.23806|
|  5|Product1| 1200|   IL|United States|40.69361| -89.58889|
|  6|Product1| 1200|   TN|United States|36.34333| -88.85028|
|  7|Product1| 1200|   NY|United States|40.71417| -74.00639|
|  8|Product1| 1200|   TX|United States|29.42389| -98.49333|
|  9|Product1| 1200|   ID|United States|43.69556|-116.35306|
| 10|Product1| 1200|   NJ|United States|40.03222| -74.95778|
| 11|Product1| 1200|   UT|United States|40.76083|-111.89028|
| 12|Product1| 1200|   CA|United States|   32.64|-117.08333|
| 13|Product1| 1200|   TX|United States|29.61944| -95.63472|
| 14|Product1| 1200|   N

In [78]:
#Alternatively, use select to filter out unneeded columns:
data2 = data.select(['id', 'Product', 'Price', 'City', 'State', 'Country', 'Latitude', 'Longitude'])
data2.show()

+---+--------+-----+--------------+-----+-------------+--------+----------+
| id| Product|Price|          City|State|      Country|Latitude| Longitude|
+---+--------+-----+--------------+-----+-------------+--------+----------+
|  1|Product1| 1200|     Parkville|   MO|United States|  39.195| -94.68194|
|  2|Product1| 1200|       Astoria|   OR|United States|46.18806|   -123.83|
|  3|Product2| 3600|Cahaba Heights|   AL|United States|33.52056|  -86.8025|
|  4|Product1| 1200|     Mickleton|   NJ|United States|   39.79| -75.23806|
|  5|Product1| 1200|        Peoria|   IL|United States|40.69361| -89.58889|
|  6|Product1| 1200|        Martin|   TN|United States|36.34333| -88.85028|
|  7|Product1| 1200|      New York|   NY|United States|40.71417| -74.00639|
|  8|Product1| 1200|  Shavano Park|   TX|United States|29.42389| -98.49333|
|  9|Product1| 1200|         Eagle|   ID|United States|43.69556|-116.35306|
| 10|Product1| 1200|     Riverside|   NJ|United States|40.03222| -74.95778|
| 11|Product

In [63]:

df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Price: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



### **Changing the field data types of a DataFrame**

 - This is easily done when creating the DataFrame, by passing an explicit schema:

In [67]:
#Define a new schema in order to change the types of fields
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
newschema = StructType([
    StructField('id', IntegerType()),
    StructField('Product', StringType()),
    StructField('Price', IntegerType()),
    StructField('State', StringType()),
    StructField('Country', StringType()),
    StructField('Latitude', FloatType()),
    StructField('Longitude', FloatType()),
])
  

In [68]:
df1 = spark.read.csv('datacleaned.csv', header=True, schema=newschema)
df1.show()

+---+--------+-----+-----+-------------+--------+----------+
| id| Product|Price|State|      Country|Latitude| Longitude|
+---+--------+-----+-----+-------------+--------+----------+
|  1|Product1| 1200|   MO|United States|  39.195| -94.68194|
|  2|Product1| 1200|   OR|United States|46.18806|   -123.83|
|  3|Product2| 3600|   AL|United States|33.52056|  -86.8025|
|  4|Product1| 1200|   NJ|United States|   39.79| -75.23806|
|  5|Product1| 1200|   IL|United States|40.69361| -89.58889|
|  6|Product1| 1200|   TN|United States|36.34333| -88.85028|
|  7|Product1| 1200|   NY|United States|40.71417| -74.00639|
|  8|Product1| 1200|   TX|United States|29.42389| -98.49333|
|  9|Product1| 1200|   ID|United States|43.69556|-116.35306|
| 10|Product1| 1200|   NJ|United States|40.03222| -74.95778|
| 11|Product1| 1200|   UT|United States|40.76083|-111.89028|
| 12|Product1| 1200|   CA|United States|   32.64|-117.08333|
| 13|Product1| 1200|   TX|United States|29.61944| -95.63472|
| 14|Product1| 1200|   N

In [69]:
#check data types again
df1.dtypes

[('id', 'int'),
 ('Product', 'string'),
 ('Price', 'int'),
 ('State', 'string'),
 ('Country', 'string'),
 ('Latitude', 'float'),
 ('Longitude', 'float')]

### **Alternatively**

The type can be changed one column at a time with `cast`, like this:

df1 = df.withColumn("ColNew", df["Col"].cast("date")).drop("Col")

### **Summary statistics** 

This is similar to the `describe` method in pandas.

In [77]:

df.describe().show()

+-------+-----------------+--------+------------------+------+-------------+------------------+------------------+
|summary|               id| Product|             Price| State|      Country|          Latitude|         Longitude|
+-------+-----------------+--------+------------------+------+-------------+------------------+------------------+
|  count|              998|     998|               998|   989|          998|               998|               998|
|   mean|            499.5|    null|1633.7675350701402|  null|         null|39.015704860721506|-41.33782005571143|
| stddev|288.2420857543187|    null|1156.0347239235023|  null|         null|19.508572435045227| 67.38947865548309|
|    min|                1|Product1|               250|    AK|    Argentina|           -41.465|        -159.48528|
|    max|              998|Product3|             13000|Zurich|United States|          64.83778|       174.7666667|
+-------+-----------------+--------+------------------+------+-------------+----

#### **Dropna in PySpark** 

`dropna` works much like its pandas counterpart.

It removes the records (rows) that contain missing values, which is the easiest way to deal with missing data.

This data is already fairly clean, so this is only for demonstration.


In [76]:
#drop rows that have a null in any of the listed fields
df.dropna(subset=('Price','Latitude','Longitude')).show()

+---+--------+-----+-----+-------------+--------+----------+
| id| Product|Price|State|      Country|Latitude| Longitude|
+---+--------+-----+-----+-------------+--------+----------+
|  1|Product1| 1200|   MO|United States|  39.195| -94.68194|
|  2|Product1| 1200|   OR|United States|46.18806|   -123.83|
|  3|Product2| 3600|   AL|United States|33.52056|  -86.8025|
|  4|Product1| 1200|   NJ|United States|   39.79| -75.23806|
|  5|Product1| 1200|   IL|United States|40.69361| -89.58889|
|  6|Product1| 1200|   TN|United States|36.34333| -88.85028|
|  7|Product1| 1200|   NY|United States|40.71417| -74.00639|
|  8|Product1| 1200|   TX|United States|29.42389| -98.49333|
|  9|Product1| 1200|   ID|United States|43.69556|-116.35306|
| 10|Product1| 1200|   NJ|United States|40.03222| -74.95778|
| 11|Product1| 1200|   UT|United States|40.76083|-111.89028|
| 12|Product1| 1200|   CA|United States|   32.64|-117.08333|
| 13|Product1| 1200|   TX|United States|29.61944| -95.63472|
| 14|Product1| 1200|   N

#### **The dropna method supports additional parameters** 

Parameters such as `thresh`, `how`, and `subset` can be useful when dealing with missing values.

In [75]:
# drop rows that have less than thresh non-Null values
df.dropna(thresh=3, subset=('Price','Latitude','Longitude')).show()


+---+--------+-----+-----+-------------+--------+----------+
| id| Product|Price|State|      Country|Latitude| Longitude|
+---+--------+-----+-----+-------------+--------+----------+
|  1|Product1| 1200|   MO|United States|  39.195| -94.68194|
|  2|Product1| 1200|   OR|United States|46.18806|   -123.83|
|  3|Product2| 3600|   AL|United States|33.52056|  -86.8025|
|  4|Product1| 1200|   NJ|United States|   39.79| -75.23806|
|  5|Product1| 1200|   IL|United States|40.69361| -89.58889|
|  6|Product1| 1200|   TN|United States|36.34333| -88.85028|
|  7|Product1| 1200|   NY|United States|40.71417| -74.00639|
|  8|Product1| 1200|   TX|United States|29.42389| -98.49333|
|  9|Product1| 1200|   ID|United States|43.69556|-116.35306|
| 10|Product1| 1200|   NJ|United States|40.03222| -74.95778|
| 11|Product1| 1200|   UT|United States|40.76083|-111.89028|
| 12|Product1| 1200|   CA|United States|   32.64|-117.08333|
| 13|Product1| 1200|   TX|United States|29.61944| -95.63472|
| 14|Product1| 1200|   N

In [19]:
#drop rows that have a null in any field (use how='all' to drop only rows where every field is null)
df.dropna(how='any').show()

+---+--------+-----+-----+-------------+--------+----------+
| id| Product|Price|State|      Country|Latitude| Longitude|
+---+--------+-----+-----+-------------+--------+----------+
|  1|Product1| 1200|   MO|United States|  39.195| -94.68194|
|  2|Product1| 1200|   OR|United States|46.18806|   -123.83|
|  3|Product2| 3600|   AL|United States|33.52056|  -86.8025|
|  4|Product1| 1200|   NJ|United States|   39.79| -75.23806|
|  5|Product1| 1200|   IL|United States|40.69361| -89.58889|
|  6|Product1| 1200|   TN|United States|36.34333| -88.85028|
|  7|Product1| 1200|   NY|United States|40.71417| -74.00639|
|  8|Product1| 1200|   TX|United States|29.42389| -98.49333|
|  9|Product1| 1200|   ID|United States|43.69556|-116.35306|
| 10|Product1| 1200|   NJ|United States|40.03222| -74.95778|
| 11|Product1| 1200|   UT|United States|40.76083|-111.89028|
| 12|Product1| 1200|   CA|United States|   32.64|-117.08333|
| 13|Product1| 1200|   TX|United States|29.61944| -95.63472|
| 14|Product1| 1200|   N

In [20]:
#display first 5 rows
df.head(5)

[Row(id=1, Product='Product1', Price='1200', State='MO', Country='United States', Latitude=39.19499969482422, Longitude=-94.68193817138672),
 Row(id=2, Product='Product1', Price='1200', State='OR', Country='United States', Latitude=46.18806076049805, Longitude=-123.83000183105469),
 Row(id=3, Product='Product2', Price='3600', State='AL', Country='United States', Latitude=33.52056121826172, Longitude=-86.80249786376953),
 Row(id=4, Product='Product1', Price='1200', State='NJ', Country='United States', Latitude=39.790000915527344, Longitude=-75.2380599975586),
 Row(id=5, Product='Product1', Price='1200', State='IL', Country='United States', Latitude=40.69361114501953, Longitude=-89.5888900756836)]

In [22]:
#df.shape 
print((df.count(), len(df.columns)))

(998, 7)


In [26]:
df.count()

998

In [27]:
#there are no duplicate records
df.distinct().count()

998

### **Create a data frame**


In [89]:
# Create DF 
newDF = spark.createDataFrame([
                               (1, "a", "4", 0), 
                                (2, "b", "10", 3), 
                                (7, "b", "4", 1), 
                                (7, "d", "4", 9)],
                                ("id", "x1", "x2", "y"))
newDF.show()


+---+---+---+---+
| id| x1| x2|  y|
+---+---+---+---+
|  1|  a|  4|  0|
|  2|  b| 10|  3|
|  7|  b|  4|  1|
|  7|  d|  4|  9|
+---+---+---+---+



### **Slicing a DataFrame's columns in PySpark by index**

See: https://stackoverflow.com/a/66610720/4375912


In machine learning, data processing often involves filtering out unneeded columns. This step comes after sourcing and wrangling the data.

In [94]:
#avoid naming the variable "slice", which would shadow the Python built-in
col_slice = newDF.columns[1:3]
newDF.select(col_slice).show()


+---+---+
| x1| x2|
+---+---+
|  a|  4|
|  b| 10|
|  b|  4|
|  d|  4|
+---+---+



In [95]:
features = newDF.columns[:-1]
newDF.select(features).show()

+---+---+---+
| id| x1| x2|
+---+---+---+
|  1|  a|  4|
|  2|  b| 10|
|  7|  b|  4|
|  7|  d|  4|
+---+---+---+



In [96]:
last_col= newDF.drop(*features)
last_col.show()

+---+
|  y|
+---+
|  0|
|  3|
|  1|
|  9|
+---+



### **Imputing missing values in Spark using Spark ML**

As with the imputers found in Python ML libraries, Spark ML's **Imputer** class makes the cleansing job easier.

(details next)

In [None]:
from pyspark.ml.feature import Imputer
#'Col' is a placeholder for a numeric column; 'ColImputed' receives the filled values
imputer = Imputer(strategy='mean', inputCols=['Col'], outputCols=['ColImputed'])
imputer_model = imputer.fit(data)
data = imputer_model.transform(data)