In [98]:
from pyspark.sql import SparkSession

In [99]:
# One SparkSession shared by the notebook. The driver host is pinned to
# localhost (commonly needed for purely local runs — confirm for your setup).
spark = (
    SparkSession.builder
    .appName('practice')
    .config('spark.driver.host', 'localhost')
    .getOrCreate()
)
spark

In [100]:
# Load the small employee table: the first row supplies column names and
# inferSchema lets Spark detect the numeric columns (age, Experience, Salary).
df_spark = spark.read.options(header=True, inferSchema=True).csv('test2.csv')
df_spark.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [101]:
# Drop null values: with the default how='any', every row that has a null
# in at least one column is removed.
df_spark.dropna().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [102]:
# Drop with a threshold: keep only rows that hold at least `thresh` non-null
# values (when given, thresh takes precedence over `how`).
df_spark.dropna(thresh=3).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+



In [103]:
# Drop with a subset: only nulls in the listed columns cause a row to be
# removed; nulls in Name/Experience/Salary are ignored here.
df_spark.dropna(subset=['age']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
|     null| 36|      null|  null|
+---------+---+----------+------+



In [104]:
# Fill missing values. A numeric fill value is applied only to numeric
# columns and a string fill value only to string columns, so chaining the
# two covers every column:
#   df_spark.na.fill('missing')  -> fills the string column (Name) only
#   df_spark.na.fill(-1)         -> fills the numeric columns only
df_spark.fillna(-1).fillna('missing').show()


+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh| -1|        -1| 40000|
|  missing| 34|        10| 38000|
|  missing| 36|        -1|    -1|
+---------+---+----------+------+



In [105]:
# Imputer: replace nulls in numeric columns with a per-column statistic
# (strategy 'mean' or 'median', plus 'mode' on newer Spark versions); the
# filled values are written to new *_imputed columns.
# The inputs here are integer columns, and the output shows whole-number fills
# (e.g. age 28 although the mean of the non-null ages is 28.5).
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['age', 'Experience', 'Salary'],
    outputCols=['age_imputed', 'Experience_imputed', 'Salary_imputed'],
    strategy='mean',
)

imputer.fit(df_spark).transform(df_spark).show()



+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|null|      null| 40000|         28|                 5|         40000|
|     null|  34|        10| 38000|         34|                10|         38000|
|     null|  36|      null|  null|         36|                 5|         25750|
+---------+----+----------+-

In [106]:
# Filter with a SQL expression string, then project two columns.
df_spark.where('salary<=20000').select('name', 'age').show()


+-------+---+
|   name|age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha| 21|
|Shubham| 23|
+-------+---+



In [107]:
df_spark.filter(df_spark['salary'].between(15000, 20000)).show()  # inclusive on both ends: 15000 <= salary <= 20000

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [108]:
df_spark.where(~(df_spark['salary'] <= 20000)).show()  # rows with a null salary are excluded: the comparison and its negation are both null


+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
+---------+----+----------+------+



In [109]:
# Aggregate: sum every numeric column per name (avg/mean/count work the same way).
# All rows with a null name land in one group, and nulls are skipped inside
# each sum (a group whose values are all null sums to null).
df_spark.groupby('name').sum().show()

+---------+--------+---------------+-----------+
|     name|sum(age)|sum(Experience)|sum(Salary)|
+---------+--------+---------------+-----------+
|     null|      70|             10|      38000|
|Sudhanshu|      30|              8|      25000|
|    Sunny|      29|              4|      20000|
|    Krish|      31|             10|      30000|
|   Harsha|      21|              1|      15000|
|     Paul|      24|              3|      20000|
|  Shubham|      23|              2|      18000|
|   Mahesh|    null|           null|      40000|
+---------+--------+---------------+-----------+



In [110]:
df_spark.groupBy().sum('Salary').show()  # whole-table aggregate (no grouping keys) -> column 'sum(Salary)'

+-----------+
|sum(Salary)|
+-----------+
|     206000|
+-----------+



# PySpark ML

In [111]:
# VectorAssembler (used next) fails on null inputs by default, so handle nulls
# first: either fill placeholders, e.g. df_spark.na.fill(-1).na.fill('missing'),
# or, as done here, drop every row that contains any null.
df_spark = df_spark.dropna()

In [112]:
from pyspark.ml.feature import VectorAssembler

# Pack the predictor columns into a single vector column ('independent'),
# the feature format Spark ML estimators consume.
featureAssembler = (
    VectorAssembler()
    .setInputCols(['age', 'Experience'])
    .setOutputCol('independent')
)

output = featureAssembler.transform(df_spark)
output.show()

+---------+---+----------+------+-----------+
|     Name|age|Experience|Salary|independent|
+---------+---+----------+------+-----------+
|    Krish| 31|        10| 30000|[31.0,10.0]|
|Sudhanshu| 30|         8| 25000| [30.0,8.0]|
|    Sunny| 29|         4| 20000| [29.0,4.0]|
|     Paul| 24|         3| 20000| [24.0,3.0]|
|   Harsha| 21|         1| 15000| [21.0,1.0]|
|  Shubham| 23|         2| 18000| [23.0,2.0]|
+---------+---+----------+------+-----------+



In [115]:
# Keep just the feature vector and the label column for modelling.
finalized_data = output.select(['independent', 'Salary'])
finalized_data.show()

+-----------+------+
|independent|Salary|
+-----------+------+
|[31.0,10.0]| 30000|
| [30.0,8.0]| 25000|
| [29.0,4.0]| 20000|
| [24.0,3.0]| 20000|
| [21.0,1.0]| 15000|
| [23.0,2.0]| 18000|
+-----------+------+



In [120]:
from pyspark.ml.regression import LinearRegression

# Seed the split so Restart & Run All reproduces the same train/test rows,
# coefficients and metrics; without a seed every run draws a different split.
# NOTE(review): only six rows remain after dropping nulls, so a 75/25 split
# leaves very few test rows — treat the evaluation below as illustrative.
SPLIT_SEED = 42
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# Keep the estimator and the fitted model under separate names; later cells
# read `regressor` as the fitted model (coefficients, intercept, evaluate).
lin_reg = LinearRegression(featuresCol='independent', labelCol='Salary')
regressor = lin_reg.fit(train_data)


In [121]:
regressor.coefficients  # one fitted weight per feature, in VectorAssembler input order: [age, Experience]

DenseVector([109.3058, 1199.4092])

In [122]:
regressor.intercept  # fitted bias term: the model's Salary prediction when age = Experience = 0

12187.592319054227

In [123]:
# Score the held-out rows; `predictions` holds the test rows plus a 'prediction' column.
pred_results = regressor.evaluate(test_data)
pred_results.predictions.show(n=20)

+-----------+------+------------------+
|independent|Salary|        prediction|
+-----------+------+------------------+
| [24.0,3.0]| 20000| 18409.15805022155|
|[31.0,10.0]| 30000|27570.162481536143|
+-----------+------+------------------+



In [124]:
pred_results.r2  # R^2 on test_data only; NOTE(review): the split shown left just two test rows, so this is a very noisy estimate

0.8313022304938102

In [125]:
(pred_results.meanAbsoluteError, pred_results.meanSquaredError)  # (MAE, MSE) on the test rows

(2010.339734121153, 4217444.237654746)

# Indexing categorical columns with StringIndexer

In [126]:
# Reuse the session created at the top of the notebook. Calling
# SparkSession.builder...getOrCreate() again would return that same session
# anyway, and an appName passed to it (previously 'ok') would not take effect
# on the already-running context — so alias it explicitly instead.
sparks = spark
sparks

In [127]:
# Load the tips dataset: header row for column names, inferred column types.
spark_df = (
    sparks.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('tips.csv')
)
spark_df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [128]:
from pyspark.ml.feature import StringIndexer

# Map each distinct 'sex' label to a numeric index. By default the most frequent
# label gets 0.0 (in the output shown: Male -> 0.0, Female -> 1.0).
indexer = StringIndexer().setInputCol('sex').setOutputCol('sex_indexed')
df_r = indexer.fit(spark_df).transform(spark_df)
df_r.show()

+----------+----+------+------+---+------+----+-----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|
+----------+----+------+------+---+------+----+-----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|        0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|        0.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|        0.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|        0.0|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|        1.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|        0.0|
|     18.43| 3.0|  Male|    No|Sun|Dinne