### **PySpark**
- PySpark is the Python API for Apache Spark. It enables you to perform real-time, large-scale data processing in a distributed environment using Python. It also provides a PySpark shell for interactively analyzing your data.
- PySpark combines Python’s learnability and ease of use with the power of Apache Spark to enable processing and analysis of data at any size for everyone familiar with Python.
- PySpark supports all of Spark’s features such as Spark SQL, DataFrames, Structured Streaming, Machine Learning (MLlib) and Spark Core.

In [3]:
# Import the SparkSession class from PySpark
from pyspark.sql import SparkSession

# Create a SparkSession
# A SparkSession is the entry point to using Spark functionality.
# It configures Spark and sets up an environment for using Spark.
# The `builder` object is used to configure various settings for Spark.
# `appName` is used to set a name for your Spark application.
# `getOrCreate()` tries to get an existing SparkSession or creates a new one if it doesn't exist.
spark = SparkSession.builder.appName("Tutorial_!").getOrCreate()

# Now, you have a SparkSession named 'spark' that you can use for various Spark operations.


In [4]:
spark

**Read the dataset**

In [48]:
df_pyspark = spark.read.csv("./Data/test2.csv")
df_pyspark.show()

+---------+----+----------+------+
|      _c0| _c1|       _c2|   _c3|
+---------+----+----------+------+
|     Name| age|Experience|Salary|
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [49]:
df_pyspark = spark.read.option('header','true').csv("./Data/test1.csv")
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [50]:
df_pyspark = spark.read.option('header','true').csv("./Data/test1.csv", inferSchema = True)
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



**Check the schema**

In [51]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [52]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [53]:
df_pyspark.head(3)

[Row(Name='Krish', age=31, Experience=10, Salary=30000),
 Row(Name='Sudhanshu', age=30, Experience=8, Salary=25000),
 Row(Name='Sunny', age=29, Experience=4, Salary=20000)]

In [54]:
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [55]:
df_pyspark.select(['Name','Experience']).show()

+---------+----------+
|     Name|Experience|
+---------+----------+
|    Krish|        10|
|Sudhanshu|         8|
|    Sunny|         4|
|     Paul|         3|
|   Harsha|         1|
|  Shubham|         2|
+---------+----------+



In [56]:
df_pyspark['Name']

Column<'Name'>

In [57]:
df_pyspark.columns

['Name', 'age', 'Experience', 'Salary']

In [58]:
df_pyspark.describe().show()

+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  null| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+
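
The summary rows can be cross-checked without Spark. The sketch below is plain Python using the standard `statistics` module, with the six rows copied by hand from the table above. It assumes `stddev` is the sample standard deviation (n - 1 denominator), which is what the printed values match.

```python
import statistics

# Columns copied by hand from the df_pyspark.show() output above.
names = ["Krish", "Sudhanshu", "Sunny", "Paul", "Harsha", "Shubham"]
columns = {
    "age": [31, 30, 29, 24, 21, 23],
    "Experience": [10, 8, 4, 3, 1, 2],
    "Salary": [30000, 25000, 20000, 20000, 15000, 18000],
}

summary = {}
for col, values in columns.items():
    summary[col] = {
        "count": len(values),
        "mean": statistics.mean(values),
        "stddev": statistics.stdev(values),  # sample standard deviation (n - 1)
        "min": min(values),
        "max": max(values),
    }

# String columns are compared lexicographically, hence Harsha and Sunny.
name_min, name_max = min(names), max(names)

for col, stats in summary.items():
    print(col, stats)
print("Name", name_min, name_max)
```

The `mean` of `Name` is null in the Spark output because an average is not defined for strings, while `min` and `max` still work through string ordering.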



In [59]:
df_pyspark.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

**Adding columns to the DataFrame**

In [60]:

df_pyspark=df_pyspark.withColumn('Experience After 2 year',df_pyspark['Experience']+2)

In [61]:
df_pyspark.show()

+---------+---+----------+------+-----------------------+
|     Name|age|Experience|Salary|Experience After 2 year|
+---------+---+----------+------+-----------------------+
|    Krish| 31|        10| 30000|                     12|
|Sudhanshu| 30|         8| 25000|                     10|
|    Sunny| 29|         4| 20000|                      6|
|     Paul| 24|         3| 20000|                      5|
|   Harsha| 21|         1| 15000|                      3|
|  Shubham| 23|         2| 18000|                      4|
+---------+---+----------+------+-----------------------+



**Drop the columns**

In [62]:
df_pyspark=df_pyspark.drop('Experience After 2 year')

In [63]:
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



**Rename the columns**

In [64]:
df_pyspark.withColumnRenamed('Name','New Name').show()

+---------+---+----------+------+
| New Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



**PySpark: Handling Missing Values**
- Dropping columns
- Dropping rows
- Parameters of the drop functions
- Handling missing values with the mean, median, and mode

In [65]:
df_pyspark.na.drop().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [66]:
# how="any" (the default) drops a row if any of its values is null
df_pyspark.na.drop(how="any").show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [67]:
## thresh=3 keeps rows with at least 3 non-null values; when set, it overrides how
df_pyspark.na.drop(how="any",thresh=3).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [68]:
## subset: only the listed columns are checked for nulls
df_pyspark.na.drop(how="any",subset=['age']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+
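
The three outputs above are identical because the DataFrame loaded from `test1.csv` contains no nulls, so nothing is dropped. To see how `how`, `thresh`, and `subset` differ, the sketch below models the dropping rule in plain Python on the rows of `test2.csv` (copied by hand from the first output in this notebook). `drop_nulls` is a hypothetical helper written for illustration, not a Spark API.

```python
# Rows copied by hand from the test2.csv output shown earlier; None stands for null.
COLUMNS = ["Name", "age", "Experience", "Salary"]
rows = [
    ("Krish", 31, 10, 30000),
    ("Sudhanshu", 30, 8, 25000),
    ("Sunny", 29, 4, 20000),
    ("Paul", 24, 3, 20000),
    ("Harsha", 21, 1, 15000),
    ("Shubham", 23, 2, 18000),
    ("Mahesh", None, None, 40000),
    (None, 34, 10, 38000),
    (None, 36, None, None),
]


def drop_nulls(rows, how="any", thresh=None, subset=None):
    """Plain-Python model of the row-dropping rule (hypothetical helper)."""
    indices = [COLUMNS.index(c) for c in (subset or COLUMNS)]
    kept = []
    for row in rows:
        non_null = sum(row[i] is not None for i in indices)
        if thresh is not None:
            keep = non_null >= thresh        # thresh takes precedence over how
        elif how == "any":
            keep = non_null == len(indices)  # drop if any checked value is null
        else:  # how == "all"
            keep = non_null > 0              # drop only if every checked value is null
        if keep:
            kept.append(row)
    return kept


kept_any = drop_nulls(rows)                            # how="any"
kept_thresh = drop_nulls(rows, how="any", thresh=3)    # at least 3 non-null values
kept_subset = drop_nulls(rows, subset=["age"])         # only the age column is checked
print(len(kept_any), len(kept_thresh), len(kept_subset))
```

With `thresh=3`, the row `(null, 34, 10, 38000)` survives even though it contains a null, which shows that `how="any"` is ignored once `thresh` is given.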



**Filling missing values**

In [69]:
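# A string fill value only applies to string columns; 'Experience' and 'age' are integers here, so they are left unchanged
df_pyspark.na.fill('Missing Values',['Experience','age']).show()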

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [70]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['age', 'Experience', 'Salary'], 
    outputCols=[f"{c}_imputed" for c in ['age', 'Experience', 'Salary']]
    ).setStrategy("median")

In [71]:
# Add imputation cols to df
imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+---+----------+------+-----------+------------------+--------------+
|     Name|age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+---+----------+------+-----------+------------------+--------------+
|    Krish| 31|        10| 30000|         31|                10|         30000|
|Sudhanshu| 30|         8| 25000|         30|                 8|         25000|
|    Sunny| 29|         4| 20000|         29|                 4|         20000|
|     Paul| 24|         3| 20000|         24|                 3|         20000|
|   Harsha| 21|         1| 15000|         21|                 1|         15000|
|  Shubham| 23|         2| 18000|         23|                 2|         18000|
+---------+---+----------+------+-----------+------------------+--------------+
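
In the output above, every `_imputed` column equals its source column because `test1.csv` has no nulls to replace. The idea behind the `"median"` strategy can be sketched in plain Python on the `Experience` column of `test2.csv` (values copied by hand from the first output in this notebook). This is only a model of the technique: it uses the exact median, whereas Spark's `Imputer` computes the median approximately, so results on larger data may differ.

```python
import statistics

# Experience column copied by hand from the test2.csv output; None stands for null.
experience = [10, 8, 4, 3, 1, 2, None, 10, None]

# The surrogate is computed from the non-null values only.
observed = [v for v in experience if v is not None]
surrogate = statistics.median(observed)

# Replace each null with the surrogate and leave the other values untouched.
experience_imputed = [surrogate if v is None else v for v in experience]

print(surrogate)
print(experience_imputed)
```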



**PySpark DataFrames**
- Filter operations
- `&`, `|`, `==`
- `~`

In [72]:
### People whose salary is less than or equal to 20000
df_pyspark.filter("Salary<=20000").show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [73]:
df_pyspark.filter("Salary<=20000").select(['Name','age']).show()

+-------+---+
|   Name|age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha| 21|
|Shubham| 23|
+-------+---+



In [74]:
df_pyspark.filter(df_pyspark['Salary']<=20000).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [75]:
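# `|` is a logical OR: every salary is <= 20000 or >= 15000, so no row is filtered out; use `&` to select the 15000 to 20000 range
df_pyspark.filter((df_pyspark['Salary']<=20000) | 
                  (df_pyspark['Salary']>=15000)).show()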

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [76]:
df_pyspark.filter(~(df_pyspark['Salary']<=20000)).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
+---------+---+----------+------+
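
The effect of combining conditions can be checked in plain Python. The sketch below uses the salaries copied by hand from the table above and Python's `or`, `and`, and `not`, which play the role of `|`, `&`, and `~` on PySpark column expressions. In PySpark itself, each comparison must be wrapped in parentheses because `|`, `&`, and `~` bind more tightly than `<=` and `>=`.

```python
# Salaries copied by hand from the df_pyspark.show() output above.
salaries = {
    "Krish": 30000,
    "Sudhanshu": 25000,
    "Sunny": 20000,
    "Paul": 20000,
    "Harsha": 15000,
    "Shubham": 18000,
}

# OR: true for every salary, because any number is <= 20000 or >= 15000.
low_or_high = [n for n, s in salaries.items() if (s <= 20000) or (s >= 15000)]

# AND: keeps only salaries inside the 15000 to 20000 range.
in_range = [n for n, s in salaries.items() if (s <= 20000) and (s >= 15000)]

# NOT: the complement of the "<= 20000" filter.
not_low = [n for n, s in salaries.items() if not (s <= 20000)]

print(low_or_high)
print(in_range)
print(not_low)
```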



**PySpark GroupBy and Aggregate Functions**

In [80]:
## Groupby
### Grouped by Name to sum every numeric column
df_pyspark.groupBy('Name').sum().show()

+---------+--------+---------------+-----------+
|     Name|sum(age)|sum(Experience)|sum(Salary)|
+---------+--------+---------------+-----------+
|Sudhanshu|      30|              8|      25000|
|    Sunny|      29|              4|      20000|
|    Krish|      31|             10|      30000|
|   Harsha|      21|              1|      15000|
|     Paul|      24|              3|      20000|
|  Shubham|      23|              2|      18000|
+---------+--------+---------------+-----------+



In [78]:
## Groupby
### Grouped by Name to sum the Salary column
df_pyspark.groupBy('Name').sum('Salary').show()

+---------+-----------+
|     Name|sum(Salary)|
+---------+-----------+
|Sudhanshu|      25000|
|    Sunny|      20000|
|    Krish|      30000|
|   Harsha|      15000|
|     Paul|      20000|
|  Shubham|      18000|
+---------+-----------+



In [81]:
df_pyspark.groupBy('Name').avg().show()

+---------+--------+---------------+-----------+
|     Name|avg(age)|avg(Experience)|avg(Salary)|
+---------+--------+---------------+-----------+
|Sudhanshu|    30.0|            8.0|    25000.0|
|    Sunny|    29.0|            4.0|    20000.0|
|    Krish|    31.0|           10.0|    30000.0|
|   Harsha|    21.0|            1.0|    15000.0|
|     Paul|    24.0|            3.0|    20000.0|
|  Shubham|    23.0|            2.0|    18000.0|
+---------+--------+---------------+-----------+



In [82]:
df_pyspark.groupBy('Name').avg('Salary').show()

+---------+-----------+
|     Name|avg(Salary)|
+---------+-----------+
|Sudhanshu|    25000.0|
|    Sunny|    20000.0|
|    Krish|    30000.0|
|   Harsha|    15000.0|
|     Paul|    20000.0|
|  Shubham|    18000.0|
+---------+-----------+



In [85]:
df_pyspark.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|     128000|
+-----------+
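
Because every name in this dataset is unique, each group holds exactly one row, so the per-name sums and averages above simply repeat the original values. The plain-Python sketch below (rows copied by hand from the table above) models grouping by `Name` and the overall sum to make that visible.

```python
from collections import defaultdict

# (Name, Salary) pairs copied by hand from the df_pyspark.show() output above.
rows = [
    ("Krish", 30000),
    ("Sudhanshu", 25000),
    ("Sunny", 20000),
    ("Paul", 20000),
    ("Harsha", 15000),
    ("Shubham", 18000),
]

# Group: collect the salaries that share the same name.
salaries_by_name = defaultdict(list)
for name, salary in rows:
    salaries_by_name[name].append(salary)

# Aggregate each group, as groupBy('Name').sum('Salary') and .avg('Salary') do.
sum_by_name = {n: sum(v) for n, v in salaries_by_name.items()}
avg_by_name = {n: sum(v) / len(v) for n, v in salaries_by_name.items()}

# Aggregate without grouping, as agg({'Salary': 'sum'}) does.
total_salary = sum(salary for _, salary in rows)

print(sum_by_name)
print(avg_by_name)
print(total_salary)
```

Grouping becomes informative once a key repeats, for example a department column shared by several people.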



**Examples of PySpark ML**

In [86]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=["age","Experience"],outputCol="Independent Features")

In [88]:
output=featureassembler.transform(df_pyspark)

In [89]:
output.show()

+---------+---+----------+------+--------------------+
|     Name|age|Experience|Salary|Independent Features|
+---------+---+----------+------+--------------------+
|    Krish| 31|        10| 30000|         [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|          [30.0,8.0]|
|    Sunny| 29|         4| 20000|          [29.0,4.0]|
|     Paul| 24|         3| 20000|          [24.0,3.0]|
|   Harsha| 21|         1| 15000|          [21.0,1.0]|
|  Shubham| 23|         2| 18000|          [23.0,2.0]|
+---------+---+----------+------+--------------------+



In [90]:
output.columns

['Name', 'age', 'Experience', 'Salary', 'Independent Features']

In [91]:
finalized_data=output.select("Independent Features","Salary")

In [92]:
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [93]:
from pyspark.ml.regression import LinearRegression
## Train/test split (random and unseeded, so the rows in each split vary between runs)
train_data,test_data=finalized_data.randomSplit([0.75,0.25])
regressor=LinearRegression(featuresCol='Independent Features', labelCol='Salary')
regressor=regressor.fit(train_data)

In [94]:
### Coefficients
regressor.coefficients

DenseVector([-102.53, 1688.6818])

In [95]:
### Intercept
regressor.intercept

16470.03994673731

In [96]:
### Evaluate the fitted model on the test data
pred_results=regressor.evaluate(test_data)

In [97]:
pred_results.predictions.show()

+--------------------+------+-----------------+
|Independent Features|Salary|       prediction|
+--------------------+------+-----------------+
|          [30.0,8.0]| 25000|26903.59520639148|
+--------------------+------+-----------------+
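
The prediction is the intercept plus the dot product of the coefficients and the feature vector. The sketch below reproduces it in plain Python from the values printed in the cells above. The coefficients are rounded as displayed, so the result agrees with Spark's value only to within a small rounding error.

```python
# Values copied from the cells above; the coefficients are rounded as displayed.
coefficients = [-102.53, 1688.6818]
intercept = 16470.03994673731
features = [30.0, 8.0]  # age and Experience of the single test row

# Linear model: prediction = intercept + sum(coefficient_i * feature_i)
prediction = intercept + sum(w * x for w, x in zip(coefficients, features))

print(prediction)
```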



In [98]:
pred_results.meanAbsoluteError,pred_results.meanSquaredError

(1903.595206391481, 3623674.709796625)
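
Both metrics follow directly from the prediction table. With a single test row, the mean absolute error is just the absolute difference between prediction and label, and the mean squared error is its square, as this plain-Python sketch shows (values copied from the output above).

```python
# Label and prediction copied from the single test row shown above.
labels = [25000]
predictions = [26903.59520639148]

errors = [p - y for p, y in zip(predictions, labels)]

# Mean absolute error: average of |prediction - label|.
mae = sum(abs(e) for e in errors) / len(errors)

# Mean squared error: average of (prediction - label) squared.
mse = sum(e * e for e in errors) / len(errors)

print(mae, mse)
```

A test set of one row gives a very noisy estimate of model quality, and since the split is random, these numbers change from run to run.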