# Intro

In [None]:
!pip install pyspark



In [None]:
import pyspark

# upload file -> present in downloads
# test1.csv
# test2.csv
# test3.csv
# tips.csv

In [None]:
import pandas as pd
pd.read_csv('test1.csv')

Unnamed: 0,Name,age,Experience,Salary
0,Krish,31,10,30000
1,Sudhanshu,30,8,25000
2,Sunny,29,4,20000
3,Paul,24,3,20000
4,Harsha,21,1,15000
5,Shubham,23,2,18000


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Practice').getOrCreate()

In [None]:
spark

In [None]:
df_pyspark = spark.read.csv('test1.csv')
df_pyspark.show()

+---------+---+----------+------+
|      _c0|_c1|       _c2|   _c3|
+---------+---+----------+------+
|     Name|age|Experience|Salary|
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [None]:
df_pyspark = spark.read.option('header', 'true').csv('test1.csv')

In [None]:
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [None]:
type(df_pyspark)

In [None]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



# Part 1

- pyspark dataframe
- reading the dataset
- checking the datatypes of the column (schema)
- selecting columns and indexing
- check describe option similar to pandas
- adding columns
- dropping columns
- renaming columns

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Practice').getOrCreate()

In [None]:
spark

In [None]:
# read the dataset
df_pyspark = spark.read.option('header', 'true').csv('test1.csv', inferSchema = True)

In [None]:
df_pyspark.printSchema()

# it takes all the entries to be "STRING" until and unless we specify it.
# inferSchema = True (it will give us the original datatype it is in the raw data)
# nullable = true (it can have null values)

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [None]:
df_pyspark = spark.read.csv('test1.csv', header=True,inferSchema=True)

# another way of reading a .csv file

In [None]:
type(df_pyspark)

# DataFrame is one kind of datastructure.

In [None]:
df_pyspark.columns

['Name', 'age', 'Experience', 'Salary']

In [None]:
df_pyspark.head(3)

# we get in LIST format

[Row(Name='Krish', age=31, Experience=10, Salary=30000),
 Row(Name='Sudhanshu', age=30, Experience=8, Salary=25000),
 Row(Name='Sunny', age=29, Experience=4, Salary=20000)]

In [None]:
type(df_pyspark.select('Name'))

# if you want to select sincle column/field

In [None]:
df_pyspark.select(['Name', 'Experience']).show()

# if you want to select multiple columns/fields

+---------+----------+
|     Name|Experience|
+---------+----------+
|    Krish|        10|
|Sudhanshu|         8|
|    Sunny|         4|
|     Paul|         3|
|   Harsha|         1|
|  Shubham|         2|
+---------+----------+



In [None]:
df_pyspark['Name']

# here the .show() function wont work

Column<'Name'>

In [None]:
df_pyspark.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [None]:
df_pyspark.describe().show()

+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  NULL|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  NULL| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+



In [None]:
### adding columns in data frame

df_pyspark = df_pyspark.withColumn('Experience after 2 years', df_pyspark['Experience'] + 2)

In [None]:
df_pyspark.show()

+---------+---+----------+------+------------------------+
|     Name|age|Experience|Salary|Experience after 2 years|
+---------+---+----------+------+------------------------+
|    Krish| 31|        10| 30000|                      12|
|Sudhanshu| 30|         8| 25000|                      10|
|    Sunny| 29|         4| 20000|                       6|
|     Paul| 24|         3| 20000|                       5|
|   Harsha| 21|         1| 15000|                       3|
|  Shubham| 23|         2| 18000|                       4|
+---------+---+----------+------+------------------------+



In [None]:
## drop the columns

df_pyspark = df_pyspark.drop('Experience After 2 year')

In [None]:
df_pyspark.show()

+---------+---+----------+------+------------------------+
|     Name|age|Experience|Salary|Experience after 2 years|
+---------+---+----------+------+------------------------+
|    Krish| 31|        10| 30000|                      12|
|Sudhanshu| 30|         8| 25000|                      10|
|    Sunny| 29|         4| 20000|                       6|
|     Paul| 24|         3| 20000|                       5|
|   Harsha| 21|         1| 15000|                       3|
|  Shubham| 23|         2| 18000|                       4|
+---------+---+----------+------+------------------------+



In [None]:
### rename the column

df_pyspark.withColumnRenamed('Name', 'New Name').show()

+---------+---+----------+------+------------------------+
|  New Nme|age|Experience|Salary|Experience after 2 years|
+---------+---+----------+------+------------------------+
|    Krish| 31|        10| 30000|                      12|
|Sudhanshu| 30|         8| 25000|                      10|
|    Sunny| 29|         4| 20000|                       6|
|     Paul| 24|         3| 20000|                       5|
|   Harsha| 21|         1| 15000|                       3|
|  Shubham| 23|         2| 18000|                       4|
+---------+---+----------+------+------------------------+



# Part 2

- handling missing values
- dropping columns
- dropping rows
- various parameter in dropping functionalities
- handling missing values by mean, median, mode

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Practice').getOrCreate()

In [None]:
df_2 = spark.read.csv('test2.csv', header=True, inferSchema=True)

In [None]:
df_2.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [None]:
# dropping the columns
df_2.drop('Name').show()

+----+----------+------+
| age|Experience|Salary|
+----+----------+------+
|  31|        10| 30000|
|  30|         8| 25000|
|  29|         4| 20000|
|  24|         3| 20000|
|  21|         1| 15000|
|  23|         2| 18000|
|NULL|      NULL| 40000|
|  34|        10| 38000|
|  36|      NULL|  NULL|
+----+----------+------+



In [None]:
# drop rows (which have null values)
df_2.na.drop().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [None]:
# df_spark.na.drop(how='any', thresh=None, subset=None)

### how
# default any
# == any -> drop a row if it contains any nulls
# == all -> drop a row only if all its values are null

### thresh
# default None
# if specified, drop rows that have less than 'thresh' non-null values
# this overwrites the 'how' parameter
# eg :- 2, atleast 2 non-null values should be present

### subset
# str, tuple or list, optional
# optional list of column names to consider

In [None]:
df_2.na.drop(how="any", thresh=2).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
+---------+----+----------+------+



In [None]:
df_2.na.drop(how="any", subset=['Experience']).show()
# so in that specific column the null values rows will be eliminated

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     NULL| 34|        10| 38000|
+---------+---+----------+------+



In [None]:
# fil the missing values
df_2.na.fill('Missing Values', ['Name', 'age']).show()

# it should change of all but idk why it isnt

+--------------+----+----------+------+
|          Name| age|Experience|Salary|
+--------------+----+----------+------+
|         Krish|  31|        10| 30000|
|     Sudhanshu|  30|         8| 25000|
|         Sunny|  29|         4| 20000|
|          Paul|  24|         3| 20000|
|        Harsha|  21|         1| 15000|
|       Shubham|  23|         2| 18000|
|        Mahesh|NULL|      NULL| 40000|
|Missing Values|  34|        10| 38000|
|Missing Values|  36|      NULL|  NULL|
+--------------+----+----------+------+



In [None]:
df_2.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



`from pyspark.ml.feature import Imputer`

- Imports Spark ML’s Imputer, used to fill missing values (nulls) in numeric columns.

- Works on DataFrames, not pandas.

- Supports strategies:
"mean"
"median"
"mode"

`inputCols` -> columns that contain missing (null) values

- must be numeric column

`outputCols`
- Creates new column names dynamically
- Spark does not overwrite original columns
- Original + imputed columns will both exist in the DataFrame

`.setStrategy("median")`
- Tells Spark how to compute the replacement value

`imputer.fit(df_pyspark)`
- Computes the median for each input column from the data
- This step “learns” the statistics (like training in sklearn)

In [None]:
### Imputer function (sklearn)

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['age', 'Experience', 'Salary'],
    outputCols=["{}_imputed".format(c) for c in ['age', 'Experience', 'Salary']]
    ).setStrategy("median")

# can change stratergy to mean, median or mode.

In [None]:
# Add imputation cols to df
imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+---+----------+------+------------------------+-----------+------------------+--------------+
|     Name|age|Experience|Salary|Experience after 2 years|age_imputed|Experience_imputed|Salary_imputed|
+---------+---+----------+------+------------------------+-----------+------------------+--------------+
|    Krish| 31|        10| 30000|                      12|         31|                10|         30000|
|Sudhanshu| 30|         8| 25000|                      10|         30|                 8|         25000|
|    Sunny| 29|         4| 20000|                       6|         29|                 4|         20000|
|     Paul| 24|         3| 20000|                       5|         24|                 3|         20000|
|   Harsha| 21|         1| 15000|                       3|         21|                 1|         15000|
|  Shubham| 23|         2| 18000|                       4|         23|                 2|         18000|
+---------+---+----------+------+----------------------

# Part 3
- filter operation
- &, |, ==
- ~

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Practice').getOrCreate()

In [3]:
df_3 = spark.read.csv('test1.csv', header=True,inferSchema=True)
df_3.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



### filter operation

In [4]:
# salary of people less than or equal to 20k
df_3.filter("Salary<=20000").show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [5]:
df_3.filter("Salary<=20000").select(['Name', 'age']).show()

+-------+---+
|   Name|age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha| 21|
|Shubham| 23|
+-------+---+



In [6]:
df_3.filter(df_3['Salary']<=20000).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [8]:
df_3.filter((df_3['Salary']<=20000) &
            (df_3['age']>=23)).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [9]:
df_3.filter(~(df_3['Salary']<=20000)).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
+---------+---+----------+------+



# Part 4

- pyspark groupby nd aggregate functions

In [10]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Practice').getOrCreate()

In [11]:
df_4 = spark.read.csv('test3.csv',header=True, inferSchema=True)

In [12]:
df_4.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [13]:
df_4.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



In [16]:
### groupby operation

# groupby and aggregte functions work hand in hand.
# first we should call groupby then we should call aggregate.

# grouped to find the maximum salary
df_4.groupBy('Name').sum().show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [17]:
# groupby departments which gives maximum salary
df_4.groupBy('Departments').sum().show()

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [19]:
df_4.groupBy('Departments').mean().show()

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [21]:
df_4.groupBy('Name').count().show()

+---------+-----+
|     Name|count|
+---------+-----+
|Sudhanshu|    3|
|    Sunny|    2|
|    Krish|    3|
|   Mahesh|    2|
+---------+-----+



In [23]:
df_4.groupBy('Departments').count().show()

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



In [25]:
df_4.agg({'Salary':'sum'}).show()
# total salary

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



In [28]:
df_4.groupBy('Name').max().show()

# so see here even if we specific an column with string datatype but the max() function is applied on the numeric only
# similarly for the count(), sum(), mean() defined above.

+---------+-----------+
|     Name|max(salary)|
+---------+-----------+
|Sudhanshu|      20000|
|    Sunny|      10000|
|    Krish|      10000|
|   Mahesh|       4000|
+---------+-----------+



In [29]:
df_4.groupBy('Name').min().show()

+---------+-----------+
|     Name|min(salary)|
+---------+-----------+
|Sudhanshu|       5000|
|    Sunny|       2000|
|    Krish|       4000|
|   Mahesh|       3000|
+---------+-----------+



# Part 5

MLib documentation
- RDD techniques
- DataFrame API (recent)

In [30]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Practice').getOrCreate()

In [32]:
# read the dataset
training = spark.read.csv('test1.csv', header=True, inferSchema=True)

In [33]:
training.show()

# aim is to here understand 2 columns and predict the third one.

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [34]:
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [35]:
training.columns

['Name', 'age', 'Experience', 'Salary']

In [None]:
# we have to create a way where i we can group all individual features.
# vector assembler
# group the independent features like this [Age, Experience]
# then we treat the group as a new feature
# [Age, Experience] --> new feature --> independent feature

In [37]:
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols=["age", "Experience"], outputCol="Independent Features")

In [38]:
output = featureassembler.transform(training)

In [39]:
output.show()

+---------+---+----------+------+--------------------+
|     Name|age|Experience|Salary|Independent Features|
+---------+---+----------+------+--------------------+
|    Krish| 31|        10| 30000|         [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|          [30.0,8.0]|
|    Sunny| 29|         4| 20000|          [29.0,4.0]|
|     Paul| 24|         3| 20000|          [24.0,3.0]|
|   Harsha| 21|         1| 15000|          [21.0,1.0]|
|  Shubham| 23|         2| 18000|          [23.0,2.0]|
+---------+---+----------+------+--------------------+



In [40]:
output.columns

['Name', 'age', 'Experience', 'Salary', 'Independent Features']

In [41]:
finalized_data = output.select("Independent Features", "Salary")

In [42]:
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [45]:
from pyspark.ml.regression import LinearRegression

# train test split
train_data, test_data = finalized_data.randomSplit([0.75, 0.25])
regressor = LinearRegression(featuresCol = 'Independent Features', labelCol='Salary')
regressor = regressor.fit(train_data)

In [46]:
### Coefficients
regressor.coefficients

DenseVector([-90.5483, 1608.7819])

In [47]:
### Intercepts
regressor.intercept

16079.13669064716

In [48]:
### Prediction
pred_results=regressor.evaluate(test_data)

In [49]:
pred_results.predictions.show()

+--------------------+------+------------------+
|Independent Features|Salary|        prediction|
+--------------------+------+------------------+
|          [23.0,2.0]| 18000|17214.090796328448|
+--------------------+------+------------------+



In [51]:
pred_results.meanAbsoluteError, pred_results.meanSquaredError

(785.909203671552, 617653.276415653)

# Part 6

# MLib - PySpark
https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html