In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark.sql.functions as fun

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

### Building a SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

### Read the json file:

In [3]:
data = spark.read.json('DataFrames_sample.json')

### Display the schema:


In [4]:
data.printSchema()

root
 |-- D: double (nullable = true)
 |-- H: double (nullable = true)
 |-- HDD: string (nullable = true)
 |-- Id: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- RAM: string (nullable = true)
 |-- ScreenSize: string (nullable = true)
 |-- W: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Year: long (nullable = true)



In [5]:
data.show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
|7.74|0.52|256GB SSD|  2|    MacBook| 8GB|       12"|11.04|  2.03|2016|
|8.94|0.68|128GB SSD|  3|MacBook Air| 8GB|     13.3"| 12.8|  2.96|2016|
| 8.0|20.3|  1TB SSD|  4|       iMac|64GB|       27"| 25.6|  20.8|2017|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Getting all the data when "Model" equal "MacBook Pro"

In [6]:
data.select('*').where("Model = 'MacBook Pro'").show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Creating a TempView

In [7]:
data.createOrReplaceTempView('FirstView')

### Displaying "RAM"column and count "RAM" column

In [8]:
spark.sql("SELECT RAM, COUNT(RAM) from FirstView group by RAM").show()

+----+----------+
| RAM|count(RAM)|
+----+----------+
|64GB|         1|
|16GB|         1|
| 8GB|         2|
+----+----------+



### Getting all columns when "Year" column equal "2015"

In [9]:
spark.sql("SELECT * from FirstView where Year = 2015").show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Getting all when "Model" start with "M"

In [10]:
spark.sql("SELECT * from FirstView where Model LIKE 'M%'").show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
|7.74|0.52|256GB SSD|  2|    MacBook| 8GB|       12"|11.04|  2.03|2016|
|8.94|0.68|128GB SSD|  3|MacBook Air| 8GB|     13.3"| 12.8|  2.96|2016|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Getting all data when "Model" column equal "MacBook Pro"

In [11]:
spark.sql("SELECT * from FirstView where Model = 'MacBook Pro'").show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Getting all data with Multiple Conditions when "RAM" column equal "8GB" and "Model" column is "Macbook"

In [12]:
spark.sql("SELECT * from FirstView where RAM = '8GB' and Model = 'MacBook'").show()

+----+----+---------+---+-------+---+----------+-----+------+----+
|   D|   H|      HDD| Id|  Model|RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-------+---+----------+-----+------+----+
|7.74|0.52|256GB SSD|  2|MacBook|8GB|       12"|11.04|  2.03|2016|
+----+----+---------+---+-------+---+----------+-----+------+----+



### Getting all data with Multiple Conditions when "D" greater than or equal "8" and "Model" column is "iMac"

In [13]:
spark.sql("SELECT * from FirstView where D >= 8 and Model = 'iMac'").show()

+---+----+-------+---+-----+----+----------+----+------+----+
|  D|   H|    HDD| Id|Model| RAM|ScreenSize|   W|Weight|Year|
+---+----+-------+---+-----+----+----------+----+------+----+
|8.0|20.3|1TB SSD|  4| iMac|64GB|       27"|25.6|  20.8|2017|
+---+----+-------+---+-----+----+----------+----+------+----+



### Working on the second dataset

### Reading "test1" dataset

In [14]:
data_2 = spark.read.csv('test1.csv',header=True)

In [15]:
data_2.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



### Displaying Salary of the people less than or equal to 20000

In [16]:
data_2.select('*').where('Salary <= 20000').show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



### Displaying Salary of the people less than or equal to 20000 and greater than or equal 15000

In [17]:
data_2.select('*').where('15000 <= Salary and Salary <= 20000').show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



### Working on the third dataset 

### Reading "test3" dataset

In [18]:
data_3 = spark.read.csv('test3.csv',header=True)

### Displaying the dataset

In [19]:
data_3.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



### Displaying schema

In [20]:
data_3.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: string (nullable = true)



In [21]:
schema = "Name STRING, Departments STRING, salary INT"
data_3 = (spark.read.format('csv')
          .schema(schema)
          .option('header','true')
          .load('test3.csv'))

In [22]:
data_3.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



### Grouping by "Name" column and using sum function on Numerical columns

In [23]:
data_3.groupby("Name").sum().show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



### Grouping by "Name" column and using avg function on Numerical columns

In [24]:
data_3.groupby("Name").mean().show()

+---------+------------------+
|     Name|       avg(salary)|
+---------+------------------+
|Sudhanshu|11666.666666666666|
|    Sunny|            6000.0|
|    Krish| 6333.333333333333|
|   Mahesh|            3500.0|
+---------+------------------+



### Grouping by "Departments" column and using sum function on Numerical columns

In [25]:
data_3.groupby("Departments").sum().show()

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



### Grouping by "Departments" column and using mean function on Numerical columns

In [26]:
data_3.groupby("Departments").mean().show()

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



### Grouping by "Departments" column and using count function on Numerical columns

In [27]:
data_3.select('Departments').groupBy('Departments').agg(fun.count('Departments')).show()

+------------+------------------+
| Departments|count(Departments)|
+------------+------------------+
|         IOT|                 2|
|    Big Data|                 4|
|Data Science|                 4|
+------------+------------------+



### Applying agg to using sum function get the total of salaries

In [28]:
data_3.agg(fun.sum('salary')).show()

+-----------+
|sum(salary)|
+-----------+
|      73000|
+-----------+



## Using ML and creating a regression model on the data

### Reading Data

In [29]:
final_data = spark.read.csv('data.csv',header=True)

In [30]:
final_data.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Elation|   Carnival| 15|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Fantasy|   Carnival| 23| 

In [31]:
final_data.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Tonnage: string (nullable = true)
 |-- passengers: string (nullable = true)
 |-- length: string (nullable = true)
 |-- cabins: string (nullable = true)
 |-- passenger_density: string (nullable = true)
 |-- crew: string (nullable = true)



In [32]:
final_schema = 'Ship_name STRING, Cruise_line STRING, Age INT, Tonnage FLOAT, passengers FLOAT, length FLOAT, '\
               'cabins FLOAT, passenger_density FLOAT, crew FLOAT'

In [33]:
final_data = (spark.read.format('csv')
             .schema(final_schema)
             .option('header','true')
             .load('data.csv'))

### OneHotEncoder 


In [34]:
categoricalCols = [field for (field, dataType) in final_data.dtypes
                   if dataType == "string"]
categoricalCols

['Ship_name', 'Cruise_line']

In [35]:
indexOutputCols = [x + "_Index" for x in categoricalCols]
indexOutputCols

['Ship_name_Index', 'Cruise_line_Index']

In [36]:
oheOutputCols = [x + "_OHE" for x in categoricalCols]
oheOutputCols

['Ship_name_OHE', 'Cruise_line_OHE']

In [37]:
stringIndexer = StringIndexer(inputCols=categoricalCols,
                              outputCols=indexOutputCols,
                              handleInvalid='skip')
onehotEncoder = OneHotEncoder(inputCols=indexOutputCols,
                              outputCols=oheOutputCols)

In [38]:
numericCols = [field for (field, dataType) in final_data.dtypes
              if (((dataType=='float') or (dataType=='int')) and (field!='crew'))]
numericCols

['Age', 'Tonnage', 'passengers', 'length', 'cabins', 'passenger_density']

### Using VectorAssembler to merge all columns into one column:

In [39]:
assemblerInputs = oheOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs,outputCol='features')

### Creating a Linear Regression Model 

In [40]:
lr = LinearRegression(labelCol='crew',featuresCol='features')

### Creating a Pipeline

In [41]:
pipeline = Pipeline(stages = [stringIndexer,onehotEncoder,vecAssembler,lr])

In [42]:
model = pipeline.fit(final_data)

In [43]:
model_predict = model.transform(final_data)

### Model Evaluation

In [44]:
regressionEvaluator = RegressionEvaluator(predictionCol='prediction',
                                         labelCol='crew',
                                         metricName='rmse')

In [45]:
rmse = regressionEvaluator.evaluate(model_predict)
rmse

0.15958779502019368