In [1]:
from pyspark.sql import SparkSession


In [2]:
# Create (or reuse, if one is already running in this kernel) the SparkSession for this notebook
spark = (
    SparkSession.builder
    .appName("DFPractice")
    .getOrCreate()
)

In [3]:
# Bare expression: display the SparkSession object as this cell's output
spark

In [8]:
# Load test1.csv: first row holds the column names, column types are inferred from the data
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("test1.csv")
)

In [9]:
# Print up to the first 20 rows (long cell values truncated)
df.show(n=20, truncate=True)

+-------+---+----------+
|   name|age|experience|
+-------+---+----------+
|   John| 25|         3|
|  Sarah| 30|         5|
|Michael| 35|         7|
|  Emily| 28|         4|
+-------+---+----------+



In [10]:
# Print the inferred schema as a tree: column name, type, and nullability
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)



In [12]:
# Same load as above, but passing the reader options as keyword arguments to csv()
df = spark.read.csv(path="test1.csv", header=True, inferSchema=True)
df.show(n=20, truncate=True)

+-------+---+----------+
|   name|age|experience|
+-------+---+----------+
|   John| 25|         3|
|  Sarah| 30|         5|
|Michael| 35|         7|
|  Emily| 28|         4|
+-------+---+----------+



In [13]:
# The schema matches the option-chained load: the two reader forms are equivalent
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)



In [14]:
# Confirm that spark.read returns a PySpark DataFrame
type(df)

pyspark.sql.dataframe.DataFrame

In [15]:
# Fetch the first two rows to the driver as a list of Row objects (head(n) delegates to take(n))
df.take(2)

[Row(name='John', age=25, experience=3),
 Row(name='Sarah', age=30, experience=5)]

In [17]:
# select() returns another DataFrame (not a single column object), as the output shows
type(df.select("name"))

pyspark.sql.dataframe.DataFrame

In [18]:
# Project a single column, referenced as a Column object instead of a string
df.select(df["name"]).show(n=20, truncate=True)

+-------+
|   name|
+-------+
|   John|
|  Sarah|
|Michael|
|  Emily|
+-------+



In [19]:
# Project several columns; select() accepts them as separate arguments or as one list
df.select("name", "age").show(n=20, truncate=True)

+-------+---+
|   name|age|
+-------+---+
|   John| 25|
|  Sarah| 30|
|Michael| 35|
|  Emily| 28|
+-------+---+



In [20]:
# List of (column name, type name) pairs for the DataFrame
df.dtypes

[('name', 'string'), ('age', 'int'), ('experience', 'int')]

In [21]:
# Summary statistics (count/mean/stddev/min/max); the string column gets NULL mean/stddev
stats = df.describe()
stats.show()

+-------+-----+-----------------+-----------------+
|summary| name|              age|       experience|
+-------+-----+-----------------+-----------------+
|  count|    4|                4|                4|
|   mean| NULL|             29.5|             4.75|
| stddev| NULL|4.203173404306164|1.707825127659933|
|    min|Emily|               25|                3|
|    max|Sarah|               35|                7|
+-------+-----+-----------------+-----------------+



In [23]:
### Adding Columns
# withColumn returns a new DataFrame with a column derived from `experience`
df = df.withColumn('experience after 2 years', df.experience + 2)

In [24]:
df.show()

+-------+---+----------+------------------------+
|   name|age|experience|experience after 2 years|
+-------+---+----------+------------------------+
|   John| 25|         3|                       5|
|  Sarah| 30|         5|                       7|
|Michael| 35|         7|                       9|
|  Emily| 28|         4|                       6|
+-------+---+----------+------------------------+



In [26]:
### Dropping Columns
# drop() returns a new DataFrame; reassign so df no longer carries the derived column added above
df = df.drop('experience after 2 years')

In [28]:
# Back to the original three columns
df.show(n=20, truncate=True)

+-------+---+----------+
|   name|age|experience|
+-------+---+----------+
|   John| 25|         3|
|  Sarah| 30|         5|
|Michael| 35|         7|
|  Emily| 28|         4|
+-------+---+----------+



In [29]:
# Rename a column for display only: the result is not assigned, so df keeps `name`
df.withColumnRenamed(existing='name', new='first_name').show(n=20, truncate=True)

+----------+---+----------+
|first_name|age|experience|
+----------+---+----------+
|      John| 25|         3|
|     Sarah| 30|         5|
|   Michael| 35|         7|
|     Emily| 28|         4|
+----------+---+----------+



### Missing Values
##### Dropping Columns
##### Dropping Rows
##### Parameters of the drop functionality (`how`, `thresh`, `subset`)
##### Mean / Median / Mode Imputation


In [30]:
# Load test2.csv (header row + inferred types); this dataset contains null values
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("test2.csv")
)

In [31]:
# Nulls are rendered as NULL in the printed table
df.show(n=20, truncate=True)

+-------+----+----------+------+
|   name| age|experience|salary|
+-------+----+----------+------+
|   John|  25|         3| 50000|
|  Sarah|  30|         5| 60000|
|Michael|  35|         7| 80000|
|  Emily|  28|         4| 55000|
|  James|NULL|      NULL| 90000|
|   NULL|  40|        10|100000|
|   NULL|  45|        12|  NULL|
+-------+----+----------+------+



In [32]:
# Every column is nullable; salary is inferred as integer despite the missing value
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [33]:
# Drop the `name` column (returns a new DataFrame; df itself is unchanged).
# Use the exact column name from the schema: drop() is a silent no-op for a string
# name it cannot resolve, so "Name" would only work while case-insensitive resolution is on.
df.drop("name").show()

+----+----------+------+
| age|experience|salary|
+----+----------+------+
|  25|         3| 50000|
|  30|         5| 60000|
|  35|         7| 80000|
|  28|         4| 55000|
|NULL|      NULL| 90000|
|  40|        10|100000|
|  45|        12|  NULL|
+----+----------+------+



In [34]:
# Default how="any": drop every row that contains at least one null value
df.na.drop().show()

+-------+---+----------+------+
|   name|age|experience|salary|
+-------+---+----------+------+
|   John| 25|         3| 50000|
|  Sarah| 30|         5| 60000|
|Michael| 35|         7| 80000|
|  Emily| 28|         4| 55000|
+-------+---+----------+------+



In [35]:
#how keyword
# how="any": a row is dropped if ANY of its values is null (dropna is an alias of na.drop)
df.dropna(how="any").show()

+-------+---+----------+------+
|   name|age|experience|salary|
+-------+---+----------+------+
|   John| 25|         3| 50000|
|  Sarah| 30|         5| 60000|
|Michael| 35|         7| 80000|
|  Emily| 28|         4| 55000|
+-------+---+----------+------+



In [36]:
#how keyword
# how="all": a row is dropped only if ALL of its values are null; no row here qualifies
df.dropna(how="all").show()

+-------+----+----------+------+
|   name| age|experience|salary|
+-------+----+----------+------+
|   John|  25|         3| 50000|
|  Sarah|  30|         5| 60000|
|Michael|  35|         7| 80000|
|  Emily|  28|         4| 55000|
|  James|NULL|      NULL| 90000|
|   NULL|  40|        10|100000|
|   NULL|  45|        12|  NULL|
+-------+----+----------+------+



In [37]:
#thresh keyword
# thresh=3: keep only rows with at least 3 non-null values (thresh overrides `how`)
df.dropna(thresh=3).show()

+-------+---+----------+------+
|   name|age|experience|salary|
+-------+---+----------+------+
|   John| 25|         3| 50000|
|  Sarah| 30|         5| 60000|
|Michael| 35|         7| 80000|
|  Emily| 28|         4| 55000|
|   NULL| 40|        10|100000|
+-------+---+----------+------+



In [38]:
#subset keyword
# Only nulls in `experience` are considered; nulls in other columns (name, salary) are kept.
# Use the schema's exact lowercase name rather than relying on case-insensitive resolution.
df.na.drop(how="any", subset=["experience"]).show()

+-------+---+----------+------+
|   name|age|experience|salary|
+-------+---+----------+------+
|   John| 25|         3| 50000|
|  Sarah| 30|         5| 60000|
|Michael| 35|         7| 80000|
|  Emily| 28|         4| 55000|
|   NULL| 40|        10|100000|
|   NULL| 45|        12|  NULL|
+-------+---+----------+------+



In [42]:
# Replace nulls with "FILL VALUE": a string value only applies to string columns,
# so the NULLs in the integer columns (age, experience, salary) are left untouched
df.fillna("FILL VALUE").show()

+----------+----+----------+------+
|      name| age|experience|salary|
+----------+----+----------+------+
|      John|  25|         3| 50000|
|     Sarah|  30|         5| 60000|
|   Michael|  35|         7| 80000|
|     Emily|  28|         4| 55000|
|     James|NULL|      NULL| 90000|
|FILL VALUE|  40|        10|100000|
|FILL VALUE|  45|        12|  NULL|
+----------+----+----------+------+



In [43]:
# Replace nulls with 0: a numeric value only applies to numeric columns,
# so the NULLs in the string `name` column are left untouched
df.fillna(0).show()

+-------+---+----------+------+
|   name|age|experience|salary|
+-------+---+----------+------+
|   John| 25|         3| 50000|
|  Sarah| 30|         5| 60000|
|Michael| 35|         7| 80000|
|  Emily| 28|         4| 55000|
|  James|  0|         0| 90000|
|   NULL| 40|        10|100000|
|   NULL| 45|        12|     0|
+-------+---+----------+------+



In [45]:
from pyspark.ml.feature import Imputer

# Mean-impute each numeric column into a new `<col>_imputed` column.
# NOTE(review): these are integer columns, and the output shows the imputed means
# truncated back to integers (e.g. age mean 33.83 -> 33), not rounded.
numeric_cols = ["age", "experience", "salary"]
imputer = Imputer(
    inputCols=numeric_cols,
    outputCols=[f"{col_name}_imputed" for col_name in numeric_cols],
    strategy="mean",
)

In [46]:
# Learn the per-column means from df, then append the *_imputed columns to df
imputer_model = imputer.fit(df)
imputer_model.transform(df).show()

+-------+----+----------+------+-----------+------------------+--------------+
|   name| age|experience|salary|age_imputed|experience_imputed|salary_imputed|
+-------+----+----------+------+-----------+------------------+--------------+
|   John|  25|         3| 50000|         25|                 3|         50000|
|  Sarah|  30|         5| 60000|         30|                 5|         60000|
|Michael|  35|         7| 80000|         35|                 7|         80000|
|  Emily|  28|         4| 55000|         28|                 4|         55000|
|  James|NULL|      NULL| 90000|         33|                 6|         90000|
|   NULL|  40|        10|100000|         40|                10|        100000|
|   NULL|  45|        12|  NULL|         45|                12|         72500|
+-------+----+----------+------+-----------+------------------+--------------+



##### Filter Operations
##### Boolean operators on columns: `&` (and), `|` (or), `==` (equals), `~` (not)

In [49]:
# Load test3.csv (header row + inferred types) for the filtering examples
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("test3.csv")
)
df.show(n=20, truncate=True)

+-------+---+----------+------+
|   name|age|experience|salary|
+-------+---+----------+------+
|   John| 25|         3| 50000|
|  Sarah| 30|         5| 60000|
|Michael| 35|         7| 80000|
|  Emily| 28|         4| 55000|
+-------+---+----------+------+



In [53]:
# salary is inferred as integer, so the numeric comparisons below work directly
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [52]:
# SQL-expression string form of a filter; where() is an alias of filter()
df.where("salary<60000").show()

+-----+---+----------+------+
| name|age|experience|salary|
+-----+---+----------+------+
| John| 25|         3| 50000|
|Emily| 28|         4| 55000|
+-----+---+----------+------+



In [63]:
# Filter first, then project only the columns of interest
df.where("salary<60000").select("name", "age").show()

+-----+---+
| name|age|
+-----+---+
| John| 25|
|Emily| 28|
+-----+---+



In [59]:
# Combine Column conditions with & (and). Each comparison needs its own parentheses
# because Python's & binds more tightly than <
df.filter((df.salary < 60000) & (df.age < 27)).show()

+----+---+----------+------+
|name|age|experience|salary|
+----+---+----------+------+
|John| 25|         3| 50000|
+----+---+----------+------+



In [66]:
# ~ negates a Column condition: rows whose salary is NOT below 60000
df.filter(~(df.salary < 60000)).show()

+-------+---+----------+------+
|   name|age|experience|salary|
+-------+---+----------+------+
|  Sarah| 30|         5| 60000|
|Michael| 35|         7| 80000|
+-------+---+----------+------+



In [68]:
# Column-expression equivalent of the "salary<60000" string filter above
df.filter(df.salary < 60000).show()

+-----+---+----------+------+
| name|age|experience|salary|
+-----+---+----------+------+
| John| 25|         3| 50000|
|Emily| 28|         4| 55000|
+-----+---+----------+------+



### GroupBy and Aggregate Functions

In [104]:
# Load test4.csv (header row + inferred types) for the aggregation examples
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("test4.csv")
)
df.show(n=20, truncate=True)

+-------+-----------+------+
|   name| department|salary|
+-------+-----------+------+
|Krishna|         IT| 10000|
|   Ravi|         IT| 20000|
|    Raj|         IT| 30000|
|Krishna|         HR| 15000|
| Sudhan|         HR| 25000|
| Rajesh|         HR| 35000|
|    Ram|         HR| 45000|
| Sudhan|    BigData| 50000|
|Krishna|    BigData| 60000|
| Chaman|DataScience| 70000|
|   Ravi|DataScience| 80000|
| Sudhan|        IOS| 90000|
+-------+-----------+------+



In [105]:
# salary is the only numeric column, so it is the only one the aggregations below act on
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = true)



In [108]:
# Total per person: sum() aggregates every numeric column (here only salary).
# groupby is an alias of groupBy; output row order is not guaranteed.
df.groupby("name").sum().show()

+-------+-----------+
|   name|sum(salary)|
+-------+-----------+
|   Ravi|     100000|
|Krishna|      85000|
|    Ram|      45000|
| Chaman|      70000|
| Rajesh|      35000|
|    Raj|      30000|
| Sudhan|     165000|
+-------+-----------+



In [109]:
# Total per department: sum() aggregates every numeric column (not string columns)
df.groupby("department").sum().show()

+-----------+-----------+
| department|sum(salary)|
+-----------+-----------+
|         HR|     120000|
|    BigData|     110000|
|        IOS|      90000|
|         IT|      60000|
|DataScience|     150000|
+-----------+-----------+



In [110]:
# Average per department for every numeric column (avg and mean are aliases)
df.groupby("department").avg().show()

+-----------+-----------+
| department|avg(salary)|
+-----------+-----------+
|         HR|    30000.0|
|    BigData|    55000.0|
|        IOS|    90000.0|
|         IT|    20000.0|
|DataScience|    75000.0|
+-----------+-----------+



In [112]:
# Number of rows (employees) in each department
df.groupby("department").count().show()

+-----------+-----+
| department|count|
+-----------+-----+
|         HR|    4|
|    BigData|    2|
|        IOS|    1|
|         IT|    3|
|DataScience|    2|
+-----------+-----+



In [113]:
# Highest value per person for every numeric column (here only salary)
df.groupby("name").max().show()

+-------+-----------+
|   name|max(salary)|
+-------+-----------+
|   Ravi|      80000|
|Krishna|      60000|
|    Ram|      45000|
| Chaman|      70000|
| Rajesh|      35000|
|    Raj|      30000|
| Sudhan|      90000|
+-------+-----------+



### MLlib

In [117]:
# Load test5.csv (header row + inferred types) for the regression example
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("test5.csv")
)
df.show(n=20, truncate=True)

+---------+---+----------+------+
|     name|age|experience|salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [118]:
from pyspark.ml.feature import VectorAssembler

# Pack the predictor columns into a single vector column, in the order listed
featureassembler = VectorAssembler(
    inputCols=["age", "experience"],
    outputCol="independent_features",
)


In [119]:
# Append the independent_features vector column to every row of df
output = featureassembler.transform(dataset=df)

In [120]:
# Each row now carries [age, experience] as a dense vector
output.show(n=20, truncate=True)

+---------+---+----------+------+--------------------+
|     name|age|experience|salary|independent_features|
+---------+---+----------+------+--------------------+
|    Krish| 31|        10| 30000|         [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|          [30.0,8.0]|
|    Sunny| 29|         4| 20000|          [29.0,4.0]|
|     Paul| 24|         3| 20000|          [24.0,3.0]|
|   Harsha| 21|         1| 15000|          [21.0,1.0]|
|  Shubham| 23|         2| 18000|          [23.0,2.0]|
+---------+---+----------+------+--------------------+



In [121]:
# Keep only the feature vector and the label column for training
final_data = output.select(["independent_features", "salary"])

In [122]:
# (features, label) pairs that feed the regression
final_data.show(n=20, truncate=True)

+--------------------+------+
|independent_features|salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [126]:
from pyspark.ml.regression import LinearRegression

# Fixed seed so the train/test split, and therefore every coefficient and metric below,
# is reproducible across kernel restarts. NOTE(review): the recorded outputs below came
# from an unseeded run and will change; with only 6 rows the test split is tiny and
# could even come out empty — confirm after re-running.
train_data, test_data = final_data.randomSplit([0.75, 0.25], seed=42)

# Keep the unfitted estimator and the fitted model under separate names;
# downstream cells use `regressor` as the fitted model.
lr_estimator = LinearRegression(featuresCol="independent_features", labelCol="salary")
regressor = lr_estimator.fit(train_data)

In [127]:
# Fitted weights, one per feature in independent_features (age, experience)
regressor.coefficients

DenseVector([47.619, 1285.7143])

In [128]:
# Fitted intercept of the linear model
regressor.intercept

13619.047619047662

In [130]:
# Score the held-out rows; the summary object exposes predictions and error metrics
pred_results = regressor.evaluate(dataset=test_data)

In [131]:
# Test rows with their true salary next to the model's prediction
pred_results.predictions.show(n=20, truncate=True)

+--------------------+------+-----------------+
|independent_features|salary|       prediction|
+--------------------+------+-----------------+
|          [23.0,2.0]| 18000|17285.71428571428|
|         [31.0,10.0]| 30000|27952.38095238097|
+--------------------+------+-----------------+



In [132]:
# Mean absolute error of the predictions on test_data
pred_results.meanAbsoluteError

1380.9523809523762

In [133]:
# Mean squared error of the predictions on test_data
pred_results.meanSquaredError

2351473.922902466