In [None]:
#Install Java Development kit for Spark
!apt-get install openjdk-8-jdk

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java libatk-wrapper-java-jni libfontenc1
  libgail-common libgail18 libgtk2.0-0 libgtk2.0-bin libgtk2.0-common libice-dev librsvg2-common
  libsm-dev libxkbfile1 libxt-dev libxtst6 libxxf86dga1 openjdk-8-jdk-headless openjdk-8-jre
  openjdk-8-jre-headless x11-utils
Suggested packages:
  gvfs libice-doc libsm-doc libxt-doc openjdk-8-demo openjdk-8-source visualvm libnss-mdns
  fonts-nanum fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei
  fonts-indic mesa-utils
The following NEW packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java libatk-wrapper-java-jni libfontenc1
  libgail-common libgail18 libgtk2.0-0 libgtk2.0-bin libgtk2.0-common libice-dev librsvg2-common
  libsm-dev libxkbfile1 libxt-dev libxtst6 libxxf86dga1 openjdk-

In [None]:
import os

# PySpark locates the JVM through JAVA_HOME: point it at the OpenJDK 8 installed by
# apt-get above. NOTE(review): the path is the Debian/Ubuntu amd64 location.
os.environ.update(JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64")

In [None]:
# Print the current working directory; relative paths used by later cells resolve against it.
!pwd

/content


In [None]:
# Sanity check: the shell started by `!` inherits os.environ, so this should print the JAVA_HOME set earlier.
!echo $JAVA_HOME

/usr/lib/jvm/java-8-openjdk-amd64


In [None]:
#Install PySpark with latest version
!pip install pyspark==3.0.0

Collecting pyspark==3.0.0
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.7/204.7 MB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9 (from pyspark==3.0.0)
  Downloading py4j-0.10.9-py2.py3-none-any.whl.metadata (1.3 kB)
Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m12.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044158 sha256=4c72c3b3ac6cd85d5b1f119f0d1ac2277f9e05665bbf25b6246da97d6c4f7f5d
  Stored in directory: /root/.cache/pip/wheels/b1/bb/8b/ca24d3f756f2ed967225b0871898869db676eb5846df5adc56
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attem

In [None]:
# Make Google Drive files reachable under /content/drive (Colab only).
# NOTE(review): no visible cell reads from Drive — drop this cell if later cells don't need it.
from google.colab import drive

drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer,OneHotEncoder
from pyspark.sql import SparkSession

## Foundational Code Snippets

### StringIndexer

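StringIndexer assigns a numeric index to each category according to how often it appears in the input column. With the default ordering (`stringOrderType="frequencyDesc"`), the most frequent category gets index 0.0, the next most frequent gets 1.0, and so on. Categories with equal frequency are ordered alphabetically. In the example below, dog (3 rows) → 0.0, cat (2 rows) → 1.0 and rabbit (1 row) → 2.0.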

In [None]:


# Start a Spark session; getOrCreate() returns the running session if one already exists.
spark = (
    SparkSession.builder
    .appName("StringIndexerExample")
    .getOrCreate()
)

# Toy data: ids 0-5 paired with an animal. 'dog' appears 3x, 'cat' 2x and 'rabbit' 1x,
# which makes the frequency-based indexing in the next cell easy to verify by eye.
animals = ["cat", "dog", "dog", "cat", "rabbit", "dog"]
data = spark.createDataFrame(list(enumerate(animals)), ["id", "animal"])

data.show()


+---+------+
| id|animal|
+---+------+
|  0|   cat|
|  1|   dog|
|  2|   dog|
|  3|   cat|
|  4|rabbit|
|  5|   dog|
+---+------+



In [None]:

# StringIndexer is an Estimator: fit() learns the label -> index mapping from `data`,
# and the returned model's transform() adds the numeric `animalIndex` column.
indexer = StringIndexer(inputCol="animal", outputCol="animalIndex")
indexer_model = indexer.fit(data)

indexed_data = indexer_model.transform(data)
indexed_data.show()

+---+------+-----------+
| id|animal|animalIndex|
+---+------+-----------+
|  0|   cat|        1.0|
|  1|   dog|        0.0|
|  2|   dog|        0.0|
|  3|   cat|        1.0|
|  4|rabbit|        2.0|
|  5|   dog|        0.0|
+---+------+-----------+



## One-Hot Encoding

In PySpark, OneHotEncoder converts categorical values into a binary (one-hot) encoding, which many machine learning algorithms require. Its input must already be a numeric category index, such as the output of StringIndexer, not raw strings. Its output is a single vector column: each position in the vector stands for one category. A row has 1.0 at the position of its own category and 0.0 everywhere else.


In PySpark, OHE returns a sparse vector which is generally represented as:


(size, [indices], [values])


Where:

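size is the length of the vector: the number of categories, minus one when `dropLast=True` (the default). The dropped last category is encoded as an all-zero vector. In the example below, the three animals produce vectors of size 2, and rabbit becomes `(2,[],[])`.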

indices is a list of positions where the vector has non-zero elements.

values is a list of the actual non-zero values.

In [None]:
# OneHotEncoder - convert the numeric animalIndex column into a one-hot sparse vector.
# In Spark 3.x OneHotEncoder is an Estimator: fit() records how many categories
# animalIndex has, and the fitted model's transform() adds the `animalVec` column.
#
# dropLast=True is the default and is spelled out on purpose: the category with the
# highest index (rabbit = 2.0) gets no slot of its own, so it is encoded as the
# all-zero vector (2,[],[]). Pass dropLast=False to get size-3 vectors with an
# explicit slot for every animal.
encoder = OneHotEncoder(inputCol="animalIndex", outputCol="animalVec", dropLast=True)
encoded_data = encoder.fit(indexed_data).transform(indexed_data)

encoded_data.show()

+---+------+-----------+-------------+
| id|animal|animalIndex|    animalVec|
+---+------+-----------+-------------+
|  0|   cat|        1.0|(2,[1],[1.0])|
|  1|   dog|        0.0|(2,[0],[1.0])|
|  2|   dog|        0.0|(2,[0],[1.0])|
|  3|   cat|        1.0|(2,[1],[1.0])|
|  4|rabbit|        2.0|    (2,[],[])|
|  5|   dog|        0.0|(2,[0],[1.0])|
+---+------+-----------+-------------+



## VectorAssembler
VectorAssembler in PySpark is a feature transformer used to combine multiple columns into a single vector column. It is particularly useful in machine learning pipelines where models expect input features to be in the form of vectors. The output of VectorAssembler can then be fed into machine learning algorithms.

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

# getOrCreate() hands back the session already started earlier in this notebook,
# so this app name most likely has no effect on the running SparkContext.
spark = SparkSession.builder.appName("VectorAssemblerExample").getOrCreate()

# Four people described by three numeric columns (gender is already encoded as 0.0/1.0).
feature_cols = ["age", "gender", "experience"]
people_rows = [
    (0, 18.0, 1.0, 5.0),
    (1, 20.0, 0.0, 3.0),
    (2, 22.0, 1.0, 8.0),
    (3, 25.0, 0.0, 2.0),
]
data = spark.createDataFrame(people_rows, ["id"] + feature_cols)

# VectorAssembler is a plain Transformer (no fit step): it concatenates the listed
# columns, in order, into a single vector column named "features".
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
output = assembler.transform(data)

# Show each id next to its combined feature vector, without truncating the vector text.
output.select("id", "features").show(truncate=False)


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession

# End-to-end example: string category -> index -> one-hot -> feature vector -> linear
# regression, all chained in a single Pipeline so one fit() trains every stage in order.
spark = SparkSession.builder.appName("PipelineWithOHEExample").getOrCreate()

salary_rows = [
    (0, 18.0, "male", 5.0, 35000),
    (1, 20.0, "female", 3.0, 45000),
    (2, 22.0, "male", 8.0, 58000),
    (3, 25.0, "female", 2.0, 62000),
]
data = spark.createDataFrame(salary_rows, ["id", "age", "gender", "experience", "salary"])

# Stage 1: 'gender' string -> numeric 'genderIndex'. Both labels occur twice; in
# Spark 3.x frequency ties are broken alphabetically, so female -> 0.0, male -> 1.0.
indexer = StringIndexer(inputCol="gender", outputCol="genderIndex")

# Stage 2: index -> one-hot vector. With the default dropLast=True the two genders
# collapse to a size-1 vector (the last index is encoded as all zeros).
encoder = OneHotEncoder(inputCol="genderIndex", outputCol="genderOHE")

# Stage 3: concatenate the numeric columns and the one-hot vector into 'features'.
assembler = VectorAssembler(inputCols=["age", "genderOHE", "experience"], outputCol="features")

# Stage 4: regress 'salary' on 'features'.
lr = LinearRegression(featuresCol="features", labelCol="salary")

pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])
model = pipeline.fit(data)

# NOTE(review): these are the same 4 rows the model was trained on, and 3 features plus
# an intercept can fit 4 points (near-)exactly, so expect prediction ≈ salary here.
# Use a held-out split (e.g. data.randomSplit) for any real evaluation.
predictions = model.transform(data)
predictions.select("id", "features", "salary", "prediction").show(truncate=False)
