<a href="https://colab.research.google.com/github/kukretinishtha/Pyspark/blob/main/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [19]:
# PySpark Introduction

In [None]:
# Mount Google Drive at /content/drive (only works inside Google Colab).
# NOTE(review): the cells visible below read from sample_data/, not from the
# Drive mount — confirm whether this cell is still needed.
from google.colab import drive
drive.mount('/content/drive')

In [1]:
# Install Pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=51e9517b499373f7027efc15a70d5e6b8bfcb53fed5c42aa56ccdfbdf48b51ae
  Stored in directory: /root/.cache/pip/wheels/9f/34/a4/159aa12d0a510d5ff7c8f0220abbea42e5d81ecf588c4fd884
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
import pyspark

In [38]:
# Move data.csv from the parent directory into the sample_data directory (shell command).
# NOTE(review): looks non-idempotent — once the file has been moved, re-running
# this cell will presumably fail because ../data.csv no longer exists; confirm.
!mv ../data.csv sample_data

In [39]:
# Import pandas and preview the CSV as a pandas DataFrame
import pandas as pd
pd.read_csv('sample_data/data.csv')

Unnamed: 0,Name,Age,Experience
0,Nishtha,26,1
1,Nidhi,33,2
2,Neha,32,3
3,,29,4
4,XYZ,98,0


In [40]:
# Import SparkSession (the session itself is created in the next cell)
from pyspark.sql import SparkSession

In [41]:
# Build the Spark session for this notebook, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName('Practise')
    .getOrCreate()
)

In [42]:
# Display the Spark session object
spark

In [43]:
# Read the CSV with PySpark
# ------ header=True      - use the first line of the file as the column names
# ------ inferSchema=True - infer each column's data type from the data
#                           (otherwise every column is read as a string)
df_pyspark = spark.read.csv('sample_data/data.csv', header=True, inferSchema=True)

In [44]:
# Print the DataFrame contents as a text table
df_pyspark.show()

+-------+----+----------+
|   Name| Age|Experience|
+-------+----+----------+
|Nishtha|26.0|         1|
|  Nidhi|33.0|         2|
|   Neha|32.0|         3|
|   null|29.0|         4|
|    XYZ|98.0|         0|
+-------+----+----------+



In [45]:
# Print the schema: each column's name, data type and nullability
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Experience: integer (nullable = true)



In [46]:
# Return the first 2 rows as a list of Row objects
df_pyspark.head(2)

[Row(Name='Nishtha', Age=26.0, Experience=1),
 Row(Name='Nidhi', Age=33.0, Experience=2)]

In [47]:
# Show only the Name column
df_pyspark.select('Name').show()

+-------+
|   Name|
+-------+
|Nishtha|
|  Nidhi|
|   Neha|
|   null|
|    XYZ|
+-------+



In [48]:
# select() returns a new DataFrame that contains only the Name column
df_pyspark.select('Name')

DataFrame[Name: string]

In [49]:
# Indexing by column name returns a Column object, not a DataFrame
df_pyspark['Name']

Column<'Name'>

In [50]:
# Summary statistics (count, mean, stddev, min, max) for every column
df_pyspark.describe().show()

+-------+----+------------------+------------------+
|summary|Name|               Age|        Experience|
+-------+----+------------------+------------------+
|  count|   4|                 5|                 5|
|   mean|null|              43.6|               2.0|
| stddev|null|30.533588062984016|1.5811388300841898|
|    min|Neha|              26.0|                 0|
|    max| XYZ|              98.0|                 4|
+-------+----+------------------+------------------+



In [51]:
# Add a derived column holding Age + 2, then display the result
df_pyspark = df_pyspark.withColumn(
    'Age after 2 years',
    df_pyspark['Age'] + 2,
)
df_pyspark.show()

+-------+----+----------+-----------------+
|   Name| Age|Experience|Age after 2 years|
+-------+----+----------+-----------------+
|Nishtha|26.0|         1|             28.0|
|  Nidhi|33.0|         2|             35.0|
|   Neha|32.0|         3|             34.0|
|   null|29.0|         4|             31.0|
|    XYZ|98.0|         0|            100.0|
+-------+----+----------+-----------------+



In [52]:
# Drop the derived column again, leaving the original three columns
df_pyspark = df_pyspark.drop('Age after 2 years')
df_pyspark.show()

+-------+----+----------+
|   Name| Age|Experience|
+-------+----+----------+
|Nishtha|26.0|         1|
|  Nidhi|33.0|         2|
|   Neha|32.0|         3|
|   null|29.0|         4|
|    XYZ|98.0|         0|
+-------+----+----------+



In [53]:
# Rename the column (the result is only displayed, not assigned, so df_pyspark keeps 'Age')
df_pyspark.withColumnRenamed('Age', 'New Age').show()

+-------+-------+----------+
|   Name|New Age|Experience|
+-------+-------+----------+
|Nishtha|   26.0|         1|
|  Nidhi|   33.0|         2|
|   Neha|   32.0|         3|
|   null|   29.0|         4|
|    XYZ|   98.0|         0|
+-------+-------+----------+



In [57]:
# Drop rows in which every column is null (there are none here, so all 5 rows remain)
df_pyspark.na.drop(how = 'all').show()

+-------+----+----------+
|   Name| Age|Experience|
+-------+----+----------+
|Nishtha|26.0|         1|
|  Nidhi|33.0|         2|
|   Neha|32.0|         3|
|   null|29.0|         4|
|    XYZ|98.0|         0|
+-------+----+----------+



In [58]:
# Drop rows in which any column is null (removes the row whose Name is null)
df_pyspark.na.drop(how = 'any').show()

+-------+----+----------+
|   Name| Age|Experience|
+-------+----+----------+
|Nishtha|26.0|         1|
|  Nidhi|33.0|         2|
|   Neha|32.0|         3|
|    XYZ|98.0|         0|
+-------+----+----------+



In [59]:
# Keep only rows that have at least 1 non-null value (thresh = 1).
# `how` is deliberately not passed: when `thresh` is given it overrides `how`,
# so `how = 'any'` had no effect here.
df_pyspark.na.drop(thresh = 1).show()

+-------+----+----------+
|   Name| Age|Experience|
+-------+----+----------+
|Nishtha|26.0|         1|
|  Nidhi|33.0|         2|
|   Neha|32.0|         3|
|   null|29.0|         4|
|    XYZ|98.0|         0|
+-------+----+----------+



In [61]:
# Drop rows that are null in any of the subset columns (Age has no nulls, so nothing is removed)
df_pyspark.na.drop(how = 'any', subset = ['Age']).show()

+-------+----+----------+
|   Name| Age|Experience|
+-------+----+----------+
|Nishtha|26.0|         1|
|  Nidhi|33.0|         2|
|   Neha|32.0|         3|
|   null|29.0|         4|
|    XYZ|98.0|         0|
+-------+----+----------+



In [64]:
# Configure a mean-strategy imputer that writes each input column
# to a new "<column>_imputed" output column
from pyspark.ml.feature import Imputer
imputer = Imputer(
    inputCols=['Age', 'Experience'],
    outputCols=[f"{name}_imputed" for name in ('Age', 'Experience')],
).setStrategy('mean')

In [65]:
# Fit the imputer on the DataFrame and show the added *_imputed columns.
# In the output below they equal the originals, i.e. no Age/Experience value
# needed filling.
imputer.fit(df_pyspark).transform(df_pyspark).show()

+-------+----+----------+-----------+------------------+
|   Name| Age|Experience|Age_imputed|Experience_imputed|
+-------+----+----------+-----------+------------------+
|Nishtha|26.0|         1|       26.0|                 1|
|  Nidhi|33.0|         2|       33.0|                 2|
|   Neha|32.0|         3|       32.0|                 3|
|   null|29.0|         4|       29.0|                 4|
|    XYZ|98.0|         0|       98.0|                 0|
+-------+----+----------+-----------+------------------+



In [66]:
# Filter rows with a SQL-style string condition: keep rows where Age < 30
df_pyspark.filter('Age<30').show()

+-------+----+----------+
|   Name| Age|Experience|
+-------+----+----------+
|Nishtha|26.0|         1|
|   null|29.0|         4|
+-------+----+----------+



In [68]:
# Filter rows (Age < 30), then keep only the Name and Age columns
df_pyspark.filter('Age<30').select(['Name','Age']).show()

+-------+----+
|   Name| Age|
+-------+----+
|Nishtha|26.0|
|   null|29.0|
+-------+----+



In [71]:
# Combine two column conditions with & and show only Name and Age.
# Each comparison is parenthesised because & binds tighter than < and >.
df_pyspark.filter(
    (df_pyspark['Age'] < 30) & (df_pyspark['Experience'] > 2)
).select(['Name', 'Age']).show()

+----+----+
|Name| Age|
+----+----+
|null|29.0|
+----+----+



In [72]:
# Negate a condition with ~ : keep rows where Age is NOT below 30
df_pyspark.filter(~(df_pyspark['Age'] < 30)).select(['Name','Age']).show()

+-----+----+
| Name| Age|
+-----+----+
|Nidhi|33.0|
| Neha|32.0|
|  XYZ|98.0|
+-----+----+

