In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName('tree').getOrCreate()
df = spark.read.csv('crop_recommendation.csv', inferSchema=True, header=True)
df.printSchema()

root
 |-- N: integer (nullable = true)
 |-- P: integer (nullable = true)
 |-- K: integer (nullable = true)
 |-- temperature: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- ph: double (nullable = true)
 |-- rainfall: double (nullable = true)
 |-- label: string (nullable = true)



In [2]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols = ['N', 'P', 'K', 'temperature', 'humidity', 'ph', 'rainfall'], outputCol = 'features')

In [3]:
import pandas as pd

pd.DataFrame(df.take(5), columns = df.columns).transpose()

Unnamed: 0,0,1,2,3,4
N,90,85,60,74,78
P,42,58,55,35,42
K,43,41,44,40,42
temperature,20.8797,21.7705,23.0045,26.4911,20.1302
humidity,82.0027,80.3196,82.3208,80.1584,81.6049
ph,6.50299,7.0381,7.84021,6.9804,7.62847
rainfall,202.936,226.656,263.964,242.864,262.717
label,rice,rice,rice,rice,rice


In [4]:
df.columns

['N', 'P', 'K', 'temperature', 'humidity', 'ph', 'rainfall', 'label']

In [5]:
output = assembler.transform(df)

In [6]:
from pyspark.ml.feature import StringIndexer
gender_indexer = StringIndexer(inputCol="label", outputCol="genderIndex")
#Fits a model to the input dataset with optional parameters.
df = gender_indexer.fit(output).transform(output)
df.show()

+---+---+---+-----------+-----------------+------------------+------------------+-----+--------------------+-----------+
|  N|  P|  K|temperature|         humidity|                ph|          rainfall|label|            features|genderIndex|
+---+---+---+-----------+-----------------+------------------+------------------+-----+--------------------+-----------+
| 90| 42| 43|20.87974371|      82.00274423| 6.502985292000001|       202.9355362| rice|[90.0,42.0,43.0,2...|       20.0|
| 85| 58| 41|21.77046169|      80.31964408|       7.038096361|       226.6555374| rice|[85.0,58.0,41.0,2...|       20.0|
| 60| 55| 44|23.00445915|       82.3207629|       7.840207144|       263.9642476| rice|[60.0,55.0,44.0,2...|       20.0|
| 74| 35| 40|26.49109635|      80.15836264|       6.980400905|       242.8640342| rice|[74.0,35.0,40.0,2...|       20.0|
| 78| 42| 42|20.13017482|      81.60487287|       7.628472891|       262.7173405| rice|[78.0,42.0,42.0,2...|       20.0|
| 69| 37| 42|23.05804872|      8

In [7]:
final_df = df.select('features', 'genderIndex')
final_df.show(3)

+--------------------+-----------+
|            features|genderIndex|
+--------------------+-----------+
|[90.0,42.0,43.0,2...|       20.0|
|[85.0,58.0,41.0,2...|       20.0|
|[60.0,55.0,44.0,2...|       20.0|
+--------------------+-----------+
only showing top 3 rows



In [8]:
train, test = final_df.randomSplit([0.7, 0.3])

In [9]:
from pyspark.ml.classification import (DecisionTreeClassifier, RandomForestClassifier)
from pyspark.ml import Pipeline

dt = DecisionTreeClassifier(labelCol = 'genderIndex', featuresCol = 'features')
rf = RandomForestClassifier(labelCol = 'genderIndex', featuresCol = 'features')


In [10]:
dt_model = dt.fit(train)
rf_model = rf.fit(train)


In [11]:
dt_predictions = dt_model.transform(test)
rf_predictions = rf_model.transform(test)

In [12]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

binary_evaluator = MulticlassClassificationEvaluator(labelCol = 'genderIndex')

print('Decision Tree:', binary_evaluator.evaluate(dt_predictions))

Decision Tree: 0.35711777358196445


In [13]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'genderIndex', metricName = 'accuracy')
print('Decision Tree Accu:', multi_evaluator.evaluate(dt_predictions))

Decision Tree Accu: 0.4119318181818182


In [14]:
print('Random Forest:' , binary_evaluator.evaluate(rf_predictions))

Random Forest: 0.9787207528591864


In [15]:
print('Random Forest Accu:', multi_evaluator.evaluate(rf_predictions))

Random Forest Accu: 0.9786931818181818


In [16]:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

In [17]:
iris = spark.read.csv('crop_recommendation.csv', header=True, inferSchema=True)

In [18]:
iris2 = iris.rdd.map(lambda x: Row(features=Vectors.dense(x[:-1]), species=x[-1])).toDF()
iris2.show(5)

+--------------------+-------+
|            features|species|
+--------------------+-------+
|[90.0,42.0,43.0,2...|   rice|
|[85.0,58.0,41.0,2...|   rice|
|[60.0,55.0,44.0,2...|   rice|
|[74.0,35.0,40.0,2...|   rice|
|[78.0,42.0,42.0,2...|   rice|
+--------------------+-------+
only showing top 5 rows



In [19]:
from pyspark.ml import Pipeline

In [20]:
stringindexer = StringIndexer(inputCol='species', outputCol='label')
stages = [stringindexer]
pipeline = Pipeline(stages=stages)

In [21]:
iris_df = pipeline.fit(iris2).transform(iris2)
iris_df.show(5)

+--------------------+-------+-----+
|            features|species|label|
+--------------------+-------+-----+
|[90.0,42.0,43.0,2...|   rice| 20.0|
|[85.0,58.0,41.0,2...|   rice| 20.0|
|[60.0,55.0,44.0,2...|   rice| 20.0|
|[74.0,35.0,40.0,2...|   rice| 20.0|
|[78.0,42.0,42.0,2...|   rice| 20.0|
+--------------------+-------+-----+
only showing top 5 rows



In [22]:
iris_df.describe().show(5)

+-------+----------+-----------------+
|summary|   species|            label|
+-------+----------+-----------------+
|  count|      2200|             2200|
|   mean|      null|             10.5|
| stddev|      null|6.345731145773738|
|    min|     apple|              0.0|
|    max|watermelon|             21.0|
+-------+----------+-----------------+



In [23]:
iris_df.dtypes

[('features', 'vector'), ('species', 'string'), ('label', 'double')]

In [24]:
train, test = iris_df.randomSplit([0.8, 0.2], seed=1234)

In [25]:
from pyspark.ml.classification import NaiveBayes
naivebayes = NaiveBayes(featuresCol="features", labelCol="label")

In [26]:
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder().\
    addGrid(naivebayes.smoothing, [0, 1, 2, 4, 8]).\
    build()

In [27]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator()

In [28]:
from pyspark.ml.tuning import CrossValidator
crossvalidator = CrossValidator(estimator=naivebayes, estimatorParamMaps=param_grid, evaluator=evaluator)

In [29]:
crossvalidation_mode = crossvalidator.fit(train)

In [30]:
pred_train = crossvalidation_mode.transform(train)
pred_train.show(5)

+--------------------+-----------+-----+--------------------+--------------------+----------+
|            features|    species|label|       rawPrediction|         probability|prediction|
+--------------------+-----------+-----+--------------------+--------------------+----------+
|[0.0,5.0,36.0,24....|pomegranate| 19.0|[-499.38742474583...|[6.61408756932272...|      19.0|
|[0.0,17.0,30.0,35...|      mango| 12.0|[-455.27347844735...|[5.14165896370608...|      12.0|
|[0.0,17.0,42.0,23...|pomegranate| 19.0|[-529.65008083143...|[1.45592672238025...|      19.0|
|[0.0,19.0,31.0,25...|    coconut|  4.0|[-646.35885666477...|[1.01337421998693...|       4.0|
|[0.0,19.0,33.0,27...|    coconut|  4.0|[-698.05975276499...|[3.09992240756038...|       4.0|
+--------------------+-----------+-----+--------------------+--------------------+----------+
only showing top 5 rows



In [31]:
pred_test = crossvalidation_mode.transform(test)
pred_test.show(5)

+--------------------+----------+-----+--------------------+--------------------+----------+
|            features|   species|label|       rawPrediction|         probability|prediction|
+--------------------+----------+-----+--------------------+--------------------+----------+
|[0.0,12.0,7.0,20....|    orange| 16.0|[-486.81372144990...|[2.75828665326150...|      16.0|
|[0.0,18.0,14.0,29...|    orange| 16.0|[-534.16811816168...|[5.32634657366113...|      16.0|
|[0.0,70.0,21.0,36...|pigeonpeas| 18.0|[-540.87173626541...|[8.00169243937490...|      18.0|
|[0.0,133.0,200.0,...|     apple|  0.0|[-859.64981906109...|[0.99998891838687...|       0.0|
|[0.0,137.0,195.0,...|    grapes|  7.0|[-775.85990522712...|[0.01694944491566...|       7.0|
+--------------------+----------+-----+--------------------+--------------------+----------+
only showing top 5 rows



In [32]:
print("The parameter smoothing has best value:",
      crossvalidation_mode.bestModel._java_obj.getSmoothing())

The parameter smoothing has best value: 8.0


In [33]:
print('training data (f1):', evaluator.setMetricName('f1').evaluate(pred_train), "\n",
     'training data (weightedPrecision): ', evaluator.setMetricName('weightedPrecision').evaluate(pred_train),"\n",
     'training data (weightedRecall): ', evaluator.setMetricName('weightedRecall').evaluate(pred_train),"\n",
     'training data (accuracy): ', evaluator.setMetricName('accuracy').evaluate(pred_train))

training data (f1): 0.8883428368172839 
 training data (weightedPrecision):  0.8916673502536663 
 training data (weightedRecall):  0.8905191873589166 
 training data (accuracy):  0.8905191873589164


In [34]:
print('test data (f1):', evaluator.setMetricName('f1').evaluate(pred_test), "\n",
     'test data (weightedPrecision): ', evaluator.setMetricName('weightedPrecision').evaluate(pred_test),"\n",
     'test data (weightedRecall): ', evaluator.setMetricName('weightedRecall').evaluate(pred_test),"\n",
     'test data (accuracy): ', evaluator.setMetricName('accuracy').evaluate(pred_test))

test data (f1): 0.8803898971666413 
 test data (weightedPrecision):  0.8859065599572711 
 test data (weightedRecall):  0.8808411214953269 
 test data (accuracy):  0.8808411214953271


In [35]:
#logistic regression
import findspark
findspark.init()

In [36]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import * 
import pyspark.sql.functions as F
from pyspark.sql.functions import col, asc,desc
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from pyspark.sql import SQLContext
from pyspark.mllib.stat import Statistics
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler,StandardScaler #onehotencoder
from pyspark.ml import Pipeline
from sklearn.metrics import confusion_matrix

spark=SparkSession.builder \
.master ("local[*]")\
.appName("part3")\
.getOrCreate()

In [37]:
sc=spark.sparkContext
sqlContext=SQLContext(sc)



In [38]:
df=spark.read \
 .option("header","True")\
 .option("inferSchema","True")\
 .option("sep",",")\
 .csv("crop_recommendation.csv")
print("There are",df.count(),"rows",len(df.columns),
      "columns" ,"in the data.") 

There are 2200 rows 8 columns in the data.


In [39]:
df.show(4)

+---+---+---+-----------+-----------+-----------------+-----------+-----+
|  N|  P|  K|temperature|   humidity|               ph|   rainfall|label|
+---+---+---+-----------+-----------+-----------------+-----------+-----+
| 90| 42| 43|20.87974371|82.00274423|6.502985292000001|202.9355362| rice|
| 85| 58| 41|21.77046169|80.31964408|      7.038096361|226.6555374| rice|
| 60| 55| 44|23.00445915| 82.3207629|      7.840207144|263.9642476| rice|
| 74| 35| 40|26.49109635|80.15836264|      6.980400905|242.8640342| rice|
+---+---+---+-----------+-----------+-----------------+-----------+-----+
only showing top 4 rows



In [40]:
df.printSchema()

root
 |-- N: integer (nullable = true)
 |-- P: integer (nullable = true)
 |-- K: integer (nullable = true)
 |-- temperature: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- ph: double (nullable = true)
 |-- rainfall: double (nullable = true)
 |-- label: string (nullable = true)



In [41]:
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']
df.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
N,2200,50.551818181818184,36.917333833756594,0,140
P,2200,53.36272727272727,32.98588273858713,5,145
K,2200,48.14909090909091,50.647930546660135,5,205


In [42]:
numeric_features = [t[0] for t in df.dtypes if t[1] != 'string']
numeric_features_df=df.select(numeric_features)
numeric_features_df.toPandas().head()

Unnamed: 0,N,P,K,temperature,humidity,ph,rainfall
0,90,42,43,20.879744,82.002744,6.502985,202.935536
1,85,58,41,21.770462,80.319644,7.038096,226.655537
2,60,55,44,23.004459,82.320763,7.840207,263.964248
3,74,35,40,26.491096,80.158363,6.980401,242.864034
4,78,42,42,20.130175,81.604873,7.628473,262.71734


In [43]:
col_names =numeric_features_df.columns
features = numeric_features_df.rdd.map(lambda row: row[0:])
corr_mat=Statistics.corr(features, method="pearson")
corr_df = pd.DataFrame(corr_mat)
corr_df.index, corr_df.columns = col_names, col_names

corr_df

Unnamed: 0,N,P,K,temperature,humidity,ph,rainfall
N,1.0,-0.23146,-0.140512,0.026504,0.190688,0.096683,0.05902
P,-0.23146,1.0,0.736232,-0.127541,-0.118734,-0.138019,-0.063839
K,-0.140512,0.736232,1.0,-0.160387,0.190859,-0.169503,-0.053461
temperature,0.026504,-0.127541,-0.160387,1.0,0.20532,-0.017795,-0.030084
humidity,0.190688,-0.118734,0.190859,0.20532,1.0,-0.008483,0.094423
ph,0.096683,-0.138019,-0.169503,-0.017795,-0.008483,1.0,-0.109069
rainfall,0.05902,-0.063839,-0.053461,-0.030084,0.094423,-0.109069,1.0


In [47]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols = ['N', 'P', 'K', 'temperature', 'humidity', 'ph', 'rainfall'], outputCol = 'features')

In [48]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=5)
lrModel = lr.fit(train)
predictions = lrModel.transform(test)
predictions.show()
#predictions_train = lrModel.transform(train)
# predictions.select('label', 'features',  'rawPrediction', 'prediction', 'probability').toPandas().head(5

+--------------------+----------+-----+--------------------+--------------------+----------+
|            features|   species|label|       rawPrediction|         probability|prediction|
+--------------------+----------+-----+--------------------+--------------------+----------+
|[0.0,12.0,7.0,20....|    orange| 16.0|[-1.7137232221721...|[3.04189469945704...|      16.0|
|[0.0,18.0,14.0,29...|    orange| 16.0|[-1.6160972464000...|[1.53232136807711...|      16.0|
|[0.0,70.0,21.0,36...|pigeonpeas| 18.0|[-0.6452509925280...|[6.21267646738206...|      18.0|
|[0.0,133.0,200.0,...|     apple|  0.0|[9.09563817393450...|[0.44443467772825...|       7.0|
|[0.0,137.0,195.0,...|    grapes|  7.0|[7.65098269234096...|[0.08498656021339...|       7.0|
|[0.0,145.0,205.0,...|     apple|  0.0|[9.95792200232084...|[0.54013068617806...|       0.0|
|[1.0,6.0,35.0,27....|   coconut|  4.0|[-0.0788228004175...|[3.04407513179140...|       4.0|
|[1.0,35.0,34.0,30...|     mango| 12.0|[-2.6372118600303...|[7.1145344

In [49]:
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())
print("Accuracy : ",accuracy)

Accuracy :  0.8341121495327103


# SVM

In [58]:
fileName = "crop_recommendation.csv"
dataDF = spark.read.format("csv").option("numFeatures", "784").load(fileName)
dataDF.show()

+---+---+---+-----------+-----------------+------------------+------------------+-----+
|_c0|_c1|_c2|        _c3|              _c4|               _c5|               _c6|  _c7|
+---+---+---+-----------+-----------------+------------------+------------------+-----+
|  N|  P|  K|temperature|         humidity|                ph|          rainfall|label|
| 90| 42| 43|20.87974371|      82.00274423| 6.502985292000001|       202.9355362| rice|
| 85| 58| 41|21.77046169|      80.31964408|       7.038096361|       226.6555374| rice|
| 60| 55| 44|23.00445915|       82.3207629|       7.840207144|       263.9642476| rice|
| 74| 35| 40|26.49109635|      80.15836264|       6.980400905|       242.8640342| rice|
| 78| 42| 42|20.13017482|      81.60487287|       7.628472891|       262.7173405| rice|
| 69| 37| 42|23.05804872|      83.37011772|       7.073453503|       251.0549998| rice|
| 69| 55| 38|22.70883798|      82.63941394|        5.70080568|       271.3248604| rice|
| 94| 53| 40|20.27774362|      8

In [62]:
#Complete the #FILL IN# gaps
from pyspark.ml.feature import StandardScaler

# Define the normalizer object: indicate that you only want each feature to have unit standard deviation. Use the nomenclature "normFeatures" for column with the output normalized features.
scaler = StandardScaler(inputCol="features", outputCol="normFeatures", withStd=True, withMean=False)

# Fit the StandardScaler: learn the statistics of the data
scalerModel = scaler.fit(dataDF)

# Normalize the data: apply the normalization transformation
scaledData = scalerModel.transform(dataDF)

IllegalArgumentException: features does not exist. Available: _c0, _c1, _c2, _c3, _c4, _c5, _c6, _c7