Setting up PySpark in Colab

Spark is written in the Scala programming language and requires the Java Virtual Machine (JVM) to run. Therefore, our first task is to download Java

In [46]:
!sudo apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:8 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:11 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:14 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Get:15 http://archive.ubuntu.c

Next, we will install Apache Spark 3.0.1 with Hadoop 2.7

In [2]:
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz

Now, we just need to unzip that folder.

In [3]:
!tar xf spark-3.0.1-bin-hadoop2.7.tgz

There is one last thing that we need to install and that is the findspark library. It will locate Spark on the system and import it as a regular library.

In [4]:
!pip install -q findspark

Now that we have installed all the necessary dependencies in Colab, it is time to set the environment path. This will enable us to run Pyspark in the Colab environment.

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

We need to locate Spark in the system. For that, we import findspark and use the findspark.init() method.

In [6]:
import findspark
findspark.init()

In [7]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


If you want to know the location where Spark is installed, use findspark.find()

In [9]:
findspark.find()

'/content/spark-3.0.1-bin-hadoop2.7'

Now, we can import SparkSession from pyspark.sql and create a SparkSession, which is the entry point to Spark.

You can give a name to the session using appName() and add some configurations with config() if you wish.

In [11]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

Finally, print the SparkSession variable.

In [31]:
spark

In [47]:
#we need to load the dataset. We will use the read.csv module. 
#The inferSchema parameter provided will enable Spark to automatically determine the data type for each column but it has to go over the data once.
# If you don’t want that to happen, then you can instead provide the schema explicitly in the schema parameter.

df = spark.read.csv("/content/data.csv", header=True, inferSchema= True)
df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [49]:
df.show(5, truncate=False)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+-------------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines   |InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract      |PaperlessBilling|PaymentMethod            |MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+-------------------------+--------------+------------+-----+
|7590-VHVEG|Female|0            |Yes    |No        |1     |No          |No phone service|DSL            |No            |Yes         |No              |N

In [50]:
#If you didn't set inderShema to True, here is what is happening to the type. There are all in string.
df_string = spark.read.csv("/content/data.csv", header=True, inferSchema=  False)
df_string.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: string (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: string (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: string (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [51]:
#You can select and show the rows with select and the names of the features. Below, gender and churn are selected.
df.select('gender','churn').show(6)

+------+-----+
|gender|churn|
+------+-----+
|Female|   No|
|  Male|   No|
|  Male|  Yes|
|  Male|   No|
|Female|  Yes|
|Female|  Yes|
+------+-----+
only showing top 6 rows



In [52]:
#To get a summary statistics, of the data, you can use describe(). It will compute the :count, mean, standarddeviation, min, max
df.describe().show()

+-------+----------+------+------------------+-------+----------+------------------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+------------------+------------------+-----+
|summary|customerID|gender|     SeniorCitizen|Partner|Dependents|            tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|    MonthlyCharges|      TotalCharges|Churn|
+-------+----------+------+------------------+-------+----------+------------------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+------------------+------------------+-----+
|  count|      7043|  7043|              7043|   7043|      7043|     

In [53]:
#Distinct values for Categorical columns
#The distinct() will come in handy when you want to determine the unique values in the categorical columns in the dataframe.

df.select("PaymentMethod").distinct().show()

+--------------------+
|       PaymentMethod|
+--------------------+
|Credit card (auto...|
|        Mailed check|
|Bank transfer (au...|
|    Electronic check|
+--------------------+



Map reduce example:


In [41]:
# For map reduce we need to create SparkContext so that we can read a textfile and perform mapping function on it. 
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
text_file = sc.textFile("/content/icp4.txt")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 2)) \
             .reduceByKey(lambda a, b: a + b)
for x in counts.collect():
    print (x)

('People', 2)
('who', 2)
('have', 2)
('been', 2)
('fully', 2)
('vaccinated', 2)
('against', 2)
('coronavirus', 2)
('--', 4)
('right', 2)
('now', 2)
('that', 2)
('means', 2)
('with', 4)
('two', 2)
('doses', 2)
('of', 2)
('either', 2)
('the', 8)
('Pfizer/BioNTech', 2)
('or', 2)
('Moderna', 2)
('vaccine', 2)
('can', 2)
('skip', 2)
('quarantine', 2)
('if', 2)
('they', 4)
('are', 2)
('exposed', 2)
('to', 4)
('someone', 2)
('infected', 2)
('virus,', 2)
('US', 2)
('Centers', 2)
('for', 4)
('Disease', 2)
('Control', 2)
('and', 2)
('Prevention', 2)
('said', 2)
('Wednesday.That', 2)
("doesn't", 2)
('mean', 2)
('should', 2)
('stop', 2)
('taking', 2)
('precautions,', 2)
('CDC', 2)
('noted', 2)
('in', 2)
('updated', 2)
('guidance.', 2)
("It's", 2)
('just', 2)
('not', 2)
('necessary', 2)
('them', 2)
('quarantine.', 2)


In [42]:
df = spark.read.csv("/content/data.csv",inferSchema=True, header =True)
df.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|  

In [43]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import UserDefinedFunction

binary_map = {'Yes':2.0, 'No':1.0, 'True':2.0, 'False':1.0}
toNum = UserDefinedFunction(lambda k: binary_map[k], DoubleType())

df = df.drop('MultipleLines').withColumn('Churn', toNum(df['Churn'])).withColumn('Partner', toNum(df['Partner'])).withColumn('Dependents', toNum(df['Dependents'])).cache()

df.show()


+----------+------+-------------+-------+----------+------+------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    2.0|       1.0|     1|          No|            DSL|                 No|    

In [28]:
import pandas as pd
pd.DataFrame(df.take(8), columns=df.columns)

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn
0,7590-VHVEG,Female,0,1.0,0.0,1,No,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,0.0
1,5575-GNVDE,Male,0,0.0,0.0,34,Yes,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,0.0
2,3668-QPYBK,Male,0,0.0,0.0,2,Yes,DSL,Yes,Yes,No,No,No,No,Month-to-month,Yes,Mailed check,53.85,108.15,1.0
3,7795-CFOCW,Male,0,0.0,0.0,45,No,DSL,Yes,No,Yes,Yes,No,No,One year,No,Bank transfer (automatic),42.3,1840.75,0.0
4,9237-HQITU,Female,0,0.0,0.0,2,Yes,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,1.0
5,9305-CDSKC,Female,0,0.0,0.0,8,Yes,Fiber optic,No,No,Yes,No,Yes,Yes,Month-to-month,Yes,Electronic check,99.65,820.5,1.0
6,1452-KIOVK,Male,0,0.0,1.0,22,Yes,Fiber optic,No,Yes,No,No,Yes,No,Month-to-month,Yes,Credit card (automatic),89.1,1949.4,0.0
7,6713-OKOMC,Female,0,0.0,0.0,10,No,DSL,Yes,No,No,No,No,No,Month-to-month,No,Mailed check,29.75,301.9,0.0


In [21]:
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
gender,7043,,,Female,Male
SeniorCitizen,7043,0.1621468124378816,0.3686116056100135,0,1
Partner,7043,0.4830327985233565,0.49974751071998735,0.0,1.0
Dependents,7043,0.2995882436461735,0.4581101675100144,0.0,1.0
tenure,7043,32.37114865824223,24.559481023094442,0,72
PhoneService,7043,,,No,Yes
MultipleLines,7043,,,No,Yes
InternetService,7043,,,DSL,No
OnlineSecurity,7043,,,No,Yes


In [58]:
df.select('customerID').take(6)

[Row(customerID='7590-VHVEG'),
 Row(customerID='5575-GNVDE'),
 Row(customerID='3668-QPYBK'),
 Row(customerID='7795-CFOCW'),
 Row(customerID='9237-HQITU'),
 Row(customerID='9305-CDSKC')]

In [44]:
df.select('InternetService').first()

Row(InternetService='DSL')