Spark -> Colab

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 52.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=9a2abaa49ce8ee03bf957aca6ef711ee05f51290cf20b7c9109ae4a8441dcb67
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [3]:
!wget -q https://www-us.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [4]:
from pyspark.sql import SparkSession


In [5]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [6]:
spark

Pt.1

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
from pyspark.sql.types import *

#reading csv file and creating df in spark 
df = spark.read.csv("/content/weatherAUS.csv", header=True, inferSchema=True)
df.show(5)


In [10]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# categorical columns
categorical_columns = ["Location", "Sunshine", "WindGustDir", "WindDir9am", "WindDir3pm", "RainToday"]


In [11]:
#pipeline & stages

#stringindexer 

stringindexer_stages = [StringIndexer(inputCol=c, outputCol='strindexed_' + c) for c in categorical_columns]
stringindexer_stages += [StringIndexer(inputCol='RainTomorrow', outputCol='label')]

#onehotencoder 
onehotencoder_stages = [OneHotEncoder(inputCol='strindexed_' + c, outputCol='onehot_' + c) for c in categorical_columns]

#vector assembler 
feature_columns = ['onehot_' + c for c in categorical_columns]
vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features')

#pipeline with all stages
all_stages = stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]
pipeline = Pipeline(stages=all_stages)

In [12]:
#fitting pipeline 

pipeline_model = pipeline.fit(df)


In [None]:
#transforming the data

final_columns = feature_columns + ['features', 'label']
check_df = pipeline_model.transform(df).\
            select(final_columns)
            
check_df.show(5)

In [14]:
#test/train split

training, test = check_df.randomSplit([0.8, 0.2], seed=1234)


In [15]:
#estimator

from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')

In [16]:
#paramteter grid

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(dt.impurity, ['gini', 'entropy'])
             .addGrid(dt.maxDepth, [3,5,7])
             .addGrid(dt.maxBins, [5, 10, 15])
             .addGrid(dt.minInfoGain, [0.0, 0.2, 0.4])
             .addGrid(dt.maxDepth, [3,5,7])
             .build())

In [17]:
#evaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")

In [18]:
#cv model 

from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=4)



In [19]:
#Fit cross-validation mode

cv_model = cv.fit(training)


In [20]:
print("best params = ", cv_model.bestModel.params)
# print("numNodes = ", cv_model.bestModel.numNodes)
# print("depth = ", cv_model.bestModel.depth)

best params =  [Param(parent='DecisionTreeClassifier_f11e68cc38da', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'), Param(parent='DecisionTreeClassifier_f11e68cc38da', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'), Param(parent='DecisionTreeClassifier_f11e68cc38da', name='featuresCol', doc='features column name.'), Param(parent='DecisionTreeClassifier_f11e68cc38da', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini'), Param(parent='D

In [21]:
predictions = cv_model.transform(test)


In [22]:

dtModel = DecisionTreeClassifier().fit(test)
 
# ROC for training data
display(dtModel, test, "ROC")


DecisionTreeClassificationModel: uid=DecisionTreeClassifier_476ef232e682, depth=5, numNodes=47, numClasses=3, numFeatures=243

DataFrame[onehot_Location: vector, onehot_Sunshine: vector, onehot_WindGustDir: vector, onehot_WindDir9am: vector, onehot_WindDir3pm: vector, onehot_RainToday: vector, features: vector, label: double]

'ROC'

In [23]:
evaluator.evaluate(predictions)


0.7801047120418848

In [24]:
#fitting the model

from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder


dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')
dtModel = dt.fit(training)

    

In [25]:
predictions = dtModel.transform(test)
predictions.printSchema()


root
 |-- onehot_Location: vector (nullable = true)
 |-- onehot_Sunshine: vector (nullable = true)
 |-- onehot_WindGustDir: vector (nullable = true)
 |-- onehot_WindDir9am: vector (nullable = true)
 |-- onehot_WindDir3pm: vector (nullable = true)
 |-- onehot_RainToday: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [26]:
selected = predictions.select("onehot_Location", "onehot_RainToday", "onehot_Sunshine", "onehot_WindDir3pm", "onehot_WindDir9am")
display(selected)

DataFrame[onehot_Location: vector, onehot_RainToday: vector, onehot_Sunshine: vector, onehot_WindDir3pm: vector, onehot_WindDir9am: vector]