In [88]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.sql('''select 'spark' as hello ''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [10]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Missing').getOrCreate()

In [89]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
import pandas as pd 
import numpy as np 

In [90]:
training=spark.read.csv('car.csv',header=True,inferSchema=True)

In [126]:
training.show()

+-----------+---------+----------+------------+-----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+----+------+-----------------+----------+--------+--------+-----------+-----+
|       make|fuel-type|aspiration|num-of-doors| body-style|drive-wheels|engine-location|wheel-base|length|width|height|curb-weight|engine-type|num-of-cylinders|engine-size|fuel-system|bore|stroke|compression-ratio|horsepower|peak-rpm|city-mpg|highway-mpg|price|
+-----------+---------+----------+------------+-----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+----+------+-----------------+----------+--------+--------+-----------+-----+
|alfa-romero|      gas|       std|         two|convertible|         rwd|          front|      88.6| 168.8| 64.1|  48.8|       2548|       dohc|            four|        130|       mpfi|3.47|  2.68|              9.0|

In [92]:
training.printSchema()

root
 |-- make: string (nullable = true)
 |-- fuel-type: string (nullable = true)
 |-- aspiration: string (nullable = true)
 |-- num-of-doors: string (nullable = true)
 |-- body-style: string (nullable = true)
 |-- drive-wheels: string (nullable = true)
 |-- engine-location: string (nullable = true)
 |-- wheel-base: double (nullable = true)
 |-- length: double (nullable = true)
 |-- width: double (nullable = true)
 |-- height: double (nullable = true)
 |-- curb-weight: integer (nullable = true)
 |-- engine-type: string (nullable = true)
 |-- num-of-cylinders: string (nullable = true)
 |-- engine-size: integer (nullable = true)
 |-- fuel-system: string (nullable = true)
 |-- bore: string (nullable = true)
 |-- stroke: string (nullable = true)
 |-- compression-ratio: double (nullable = true)
 |-- horsepower: string (nullable = true)
 |-- peak-rpm: string (nullable = true)
 |-- city-mpg: integer (nullable = true)
 |-- highway-mpg: integer (nullable = true)
 |-- price: string (nullable = t

In [93]:
training.columns

['make',
 'fuel-type',
 'aspiration',
 'num-of-doors',
 'body-style',
 'drive-wheels',
 'engine-location',
 'wheel-base',
 'length',
 'width',
 'height',
 'curb-weight',
 'engine-type',
 'num-of-cylinders',
 'engine-size',
 'fuel-system',
 'bore',
 'stroke',
 'compression-ratio',
 'horsepower',
 'peak-rpm',
 'city-mpg',
 'highway-mpg',
 'price']

In [94]:
##converting tring into integer.
from pyspark.sql.types import IntegerType
training = training.withColumn("horsepower", training["horsepower"].cast(IntegerType()))
training = training.withColumn("price", training["price"].cast(IntegerType()))

In [95]:
training.printSchema()

root
 |-- make: string (nullable = true)
 |-- fuel-type: string (nullable = true)
 |-- aspiration: string (nullable = true)
 |-- num-of-doors: string (nullable = true)
 |-- body-style: string (nullable = true)
 |-- drive-wheels: string (nullable = true)
 |-- engine-location: string (nullable = true)
 |-- wheel-base: double (nullable = true)
 |-- length: double (nullable = true)
 |-- width: double (nullable = true)
 |-- height: double (nullable = true)
 |-- curb-weight: integer (nullable = true)
 |-- engine-type: string (nullable = true)
 |-- num-of-cylinders: string (nullable = true)
 |-- engine-size: integer (nullable = true)
 |-- fuel-system: string (nullable = true)
 |-- bore: string (nullable = true)
 |-- stroke: string (nullable = true)
 |-- compression-ratio: double (nullable = true)
 |-- horsepower: integer (nullable = true)
 |-- peak-rpm: string (nullable = true)
 |-- city-mpg: integer (nullable = true)
 |-- highway-mpg: integer (nullable = true)
 |-- price: integer (nullable =

In [96]:
##Looking for null value if any
training=training.na.fill(0)


In [97]:
training.na.drop(how="any").show()

+-----------+---------+----------+------------+-----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+----+------+-----------------+----------+--------+--------+-----------+-----+
|       make|fuel-type|aspiration|num-of-doors| body-style|drive-wheels|engine-location|wheel-base|length|width|height|curb-weight|engine-type|num-of-cylinders|engine-size|fuel-system|bore|stroke|compression-ratio|horsepower|peak-rpm|city-mpg|highway-mpg|price|
+-----------+---------+----------+------------+-----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+----+------+-----------------+----------+--------+--------+-----------+-----+
|alfa-romero|      gas|       std|         two|convertible|         rwd|          front|      88.6| 168.8| 64.1|  48.8|       2548|       dohc|            four|        130|       mpfi|3.47|  2.68|              9.0|

In [98]:
### Handling Categorical Features
from pyspark.ml.feature import StringIndexer

In [99]:
indexer=StringIndexer(inputCols=["make","fuel-type"],outputCols=["make_indexed","fuel-type_indexed"])
df=indexer.fit(training).transform(training)
df.show()

+-----------+---------+----------+------------+-----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+----+------+-----------------+----------+--------+--------+-----------+-----+------------+-----------------+
|       make|fuel-type|aspiration|num-of-doors| body-style|drive-wheels|engine-location|wheel-base|length|width|height|curb-weight|engine-type|num-of-cylinders|engine-size|fuel-system|bore|stroke|compression-ratio|horsepower|peak-rpm|city-mpg|highway-mpg|price|make_indexed|fuel-type_indexed|
+-----------+---------+----------+------------+-----------+------------+---------------+----------+------+-----+------+-----------+-----------+----------------+-----------+-----------+----+------+-----------------+----------+--------+--------+-----------+-----+------------+-----------------+
|alfa-romero|      gas|       std|         two|convertible|         rwd|          front|      88.6| 168.8| 64.1|  48.8|  

In [100]:
#Here e can ee to extra coloumn make_indexed and fuel_indexed a computer onl undertand numeric value e have to convert categorical to numeric
df.columns

['make',
 'fuel-type',
 'aspiration',
 'num-of-doors',
 'body-style',
 'drive-wheels',
 'engine-location',
 'wheel-base',
 'length',
 'width',
 'height',
 'curb-weight',
 'engine-type',
 'num-of-cylinders',
 'engine-size',
 'fuel-system',
 'bore',
 'stroke',
 'compression-ratio',
 'horsepower',
 'peak-rpm',
 'city-mpg',
 'highway-mpg',
 'price',
 'make_indexed',
 'fuel-type_indexed']

In [101]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=['make_indexed','fuel-type_indexed','horsepower','city-mpg','highway-mpg']
                                 ,outputCol="Independent Features")
output=featureassembler.transform(df)

In [102]:
output.select('Independent Features').show()

+--------------------+
|Independent Features|
+--------------------+
|[17.0,0.0,111.0,2...|
|[17.0,0.0,111.0,2...|
|[17.0,0.0,154.0,1...|
|[12.0,0.0,102.0,2...|
|[12.0,0.0,115.0,1...|
|[12.0,0.0,110.0,1...|
|[12.0,0.0,110.0,1...|
|[12.0,0.0,110.0,1...|
|[12.0,0.0,140.0,1...|
|[12.0,0.0,160.0,1...|
|[10.0,0.0,101.0,2...|
|[10.0,0.0,101.0,2...|
|[10.0,0.0,121.0,2...|
|[10.0,0.0,121.0,2...|
|[10.0,0.0,121.0,2...|
|[10.0,0.0,182.0,1...|
|[10.0,0.0,182.0,1...|
|[10.0,0.0,182.0,1...|
|[18.0,0.0,48.0,47...|
|[18.0,0.0,70.0,38...|
+--------------------+
only showing top 20 rows



In [103]:
finalized_data1=output.select("Independent Features","price")

In [104]:
finalized_data1.show()

+--------------------+-----+
|Independent Features|price|
+--------------------+-----+
|[17.0,0.0,111.0,2...|13495|
|[17.0,0.0,111.0,2...|16500|
|[17.0,0.0,154.0,1...|16500|
|[12.0,0.0,102.0,2...|13950|
|[12.0,0.0,115.0,1...|17450|
|[12.0,0.0,110.0,1...|15250|
|[12.0,0.0,110.0,1...|17710|
|[12.0,0.0,110.0,1...|18920|
|[12.0,0.0,140.0,1...|23875|
|[12.0,0.0,160.0,1...|    0|
|[10.0,0.0,101.0,2...|16430|
|[10.0,0.0,101.0,2...|16925|
|[10.0,0.0,121.0,2...|20970|
|[10.0,0.0,121.0,2...|21105|
|[10.0,0.0,121.0,2...|24565|
|[10.0,0.0,182.0,1...|30760|
|[10.0,0.0,182.0,1...|41315|
|[10.0,0.0,182.0,1...|36880|
|[18.0,0.0,48.0,47...| 5151|
|[18.0,0.0,70.0,38...| 6295|
+--------------------+-----+
only showing top 20 rows



In [105]:
##train test split
train_data,test_data=finalized_data1.randomSplit([0.90,0.10],seed=44)

In [106]:
from pyspark.ml.regression import LinearRegression


In [107]:
#LinearRegression
regressor1=LinearRegression(featuresCol='Independent Features', labelCol='price')
regressor1=regressor1.fit(train_data)

In [108]:
### Coefficients
regressor1.coefficients

DenseVector([250.5539, 6919.4862, 68.779, 45.2359, -502.9331])

In [109]:
### Intercepts
regressor1.intercept

17763.34396701478

In [110]:
### Prediction
pred_results1=regressor1.evaluate(test_data)

In [111]:
pred_results1.predictions.show()

+--------------------+-----+------------------+
|Independent Features|price|        prediction|
+--------------------+-----+------------------+
|[0.0,0.0,70.0,29....| 8238| 6789.988225588417|
|[0.0,0.0,70.0,30....| 6938|  5326.42480325661|
|[0.0,0.0,70.0,30....| 7198|  5326.42480325661|
|[0.0,0.0,161.0,20...|16558|17671.082228469648|
|[1.0,0.0,69.0,31....| 6649|  5553.43566903644|
|[1.0,0.0,69.0,31....| 6849|  5553.43566903644|
|[1.0,0.0,152.0,17...|14399|18172.783779459285|
|[3.0,0.0,86.0,27....| 8845|  9054.57463636191|
|[3.0,0.0,86.0,27....| 9095|  9054.57463636191|
|[4.0,0.0,68.0,31....| 6189| 5733.385233737166|
|[4.0,0.0,116.0,23...| 9279|12696.353209061786|
|[5.0,0.0,82.0,28....| 7775|  9325.80247719748|
|[5.0,0.0,82.0,32....| 7126| 7495.013780294919|
|[5.0,0.0,94.0,25....|10198|11021.308521201194|
|[6.0,0.0,85.0,27....| 7975| 9234.524201062635|
|[10.0,0.0,121.0,2...|24565|16922.529150490744|
|[11.0,0.0,184.0,1...|40960|25761.140592648888|
|[11.0,0.0,184.0,1...|45400|25761.140592

In [112]:
### PErformance Metrics
pred_results1.r2,pred_results1.meanAbsoluteError,pred_results1.meanSquaredError

(0.6736656159424528, 3654.122359392787, 38850475.82480145)

In [113]:
train_data,test_data=finalized_data1.randomSplit([0.90,0.10],seed=44)

In [114]:
from pyspark.ml.classification import LogisticRegression

In [115]:
##logitic reg.

lr=LogisticRegression(featuresCol='Independent Features', labelCol='price')
lr=lr.fit(train_data)

In [116]:
lr_summary=lr.summary

In [117]:
lr.summary.predictions.show()

+--------------------+-------+--------------------+--------------------+----------+
|Independent Features|  price|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,0.0,62.0,27....| 7898.0|[768.340249384914...|[6.07021641951916...|    7898.0|
|[0.0,0.0,62.0,27....| 8778.0|[768.340249384914...|[6.07021641951916...|    7898.0|
|[0.0,0.0,62.0,31....| 6918.0|[883.403860540484...|[1.81393809666119...|    6918.0|
|[0.0,0.0,62.0,31....| 6338.0|[893.012608912779...|[5.16531481492423...|    6488.0|
|[0.0,0.0,62.0,31....| 6488.0|[893.012608912779...|[5.16531481492423...|    6488.0|
|[0.0,0.0,62.0,35....| 5348.0|[969.641226579173...|[4.51864822045330...|    5348.0|
|[0.0,0.0,70.0,28....| 8358.0|[824.100651613463...|[4.27101675877146...|    9258.0|
|[0.0,0.0,70.0,28....| 9258.0|[824.100651613463...|[4.27101675877146...|    9258.0|
|[0.0,0.0,70.0,29....| 8058.0|[840.855618936988...|[1.94111452793293...|    

In [118]:
lr.summary.predictions.describe().show()

+-------+------------------+------------------+
|summary|             price|        prediction|
+-------+------------------+------------------+
|  count|               185|               185|
|   mean|12932.778378378378|12768.632432432432|
| stddev| 7708.593306905551| 7436.956793098954|
|    min|               0.0|               0.0|
|    max|           41315.0|           36880.0|
+-------+------------------+------------------+

