<a href="https://colab.research.google.com/github/wyctorfogos/Introduction-to-PySpark/blob/main/Introduction_to_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [198]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [199]:
import numpy as np
import pyspark as sp
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml.regression import RegressionModel


In [200]:
spark = SparkSession.builder.appName('PySpark Read CSV').getOrCreate()

df = spark.read.csv('./breast-cancer.data', sep=',',
                       inferSchema=True, header=False)

In [201]:
df.show()
#df = df.toPandas()
#df.head()

+--------------------+-----+-------+-----+---+---+---+-----+---------+---+
|                 _c0|  _c1|    _c2|  _c3|_c4|_c5|_c6|  _c7|      _c8|_c9|
+--------------------+-----+-------+-----+---+---+---+-----+---------+---+
|no-recurrence-events|30-39|premeno|30-34|0-2| no|  3| left| left_low| no|
|no-recurrence-events|40-49|premeno|20-24|0-2| no|  2|right| right_up| no|
|no-recurrence-events|40-49|premeno|20-24|0-2| no|  2| left| left_low| no|
|no-recurrence-events|60-69|   ge40|15-19|0-2| no|  2|right|  left_up| no|
|no-recurrence-events|40-49|premeno|  0-4|0-2| no|  2|right|right_low| no|
|no-recurrence-events|60-69|   ge40|15-19|0-2| no|  2| left| left_low| no|
|no-recurrence-events|50-59|premeno|25-29|0-2| no|  2| left| left_low| no|
|no-recurrence-events|60-69|   ge40|20-24|0-2| no|  1| left| left_low| no|
|no-recurrence-events|40-49|premeno|50-54|0-2| no|  2| left| left_low| no|
|no-recurrence-events|40-49|premeno|20-24|0-2| no|  2|right|  left_up| no|
|no-recurrence-events|40-

In [202]:
print(df.columns)

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9']


In [203]:
#df= df.select('30-39', 'premeno', '30-34', '0-2', 'no5', '3', 'left', 'left_low', 'no9')
df =  df.select('_c1','_c2','_c3','_c4','_c5','_c6','_c7','_c8','_c9')

In [204]:
df.show(100)

+-----+-------+-----+---+---+---+-----+---------+---+
|  _c1|    _c2|  _c3|_c4|_c5|_c6|  _c7|      _c8|_c9|
+-----+-------+-----+---+---+---+-----+---------+---+
|30-39|premeno|30-34|0-2| no|  3| left| left_low| no|
|40-49|premeno|20-24|0-2| no|  2|right| right_up| no|
|40-49|premeno|20-24|0-2| no|  2| left| left_low| no|
|60-69|   ge40|15-19|0-2| no|  2|right|  left_up| no|
|40-49|premeno|  0-4|0-2| no|  2|right|right_low| no|
|60-69|   ge40|15-19|0-2| no|  2| left| left_low| no|
|50-59|premeno|25-29|0-2| no|  2| left| left_low| no|
|60-69|   ge40|20-24|0-2| no|  1| left| left_low| no|
|40-49|premeno|50-54|0-2| no|  2| left| left_low| no|
|40-49|premeno|20-24|0-2| no|  2|right|  left_up| no|
|40-49|premeno|  0-4|0-2| no|  3| left|  central| no|
|50-59|   ge40|25-29|0-2| no|  2| left| left_low| no|
|60-69|   lt40|10-14|0-2| no|  1| left| right_up| no|
|50-59|   ge40|25-29|0-2| no|  3| left| right_up| no|
|40-49|premeno|30-34|0-2| no|  3| left|  left_up| no|
|60-69|   lt40|30-34|0-2| no

In [205]:
df.printSchema()

root
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)



In [206]:
df.groupBy('_c1').count().show()

+-----+-----+
|  _c1|count|
+-----+-----+
|30-39|   36|
|20-29|    1|
|60-69|   57|
|40-49|   90|
|70-79|    6|
|50-59|   96|
+-----+-----+



In [207]:
## One way to separate the features and classes availables into the jobs

In [208]:
X=df.select(["_c0","_c1","_c2","_c3","_c4","_c5","_c6","_c7","_c8"]).show(10)

AnalysisException: ignored

In [None]:
Y=df.select(["_c9"]).show(10)

In [209]:
## Second way to get the features - Use an Encoder

In [210]:
from pyspark.ml.feature import VectorAssembler, StringIndexer

In [211]:
df.select('_c1').distinct().count()

6

In [212]:
encoder=StringIndexer(inputCols=['_c1','_c2','_c3','_c4','_c5','_c6','_c7','_c8','_c9'], outputCols=['feature_1','feature_2','feature_3','feature_4','feature_5','feature_6','feature_7','feature_8','classes']).fit(df)

In [213]:
df_transform=encoder.transform(df)

In [214]:
df_transform.show(10)

+-----+-------+-----+---+---+---+-----+---------+---+---------+---------+---------+---------+---------+---------+---------+---------+-------+
|  _c1|    _c2|  _c3|_c4|_c5|_c6|  _c7|      _c8|_c9|feature_1|feature_2|feature_3|feature_4|feature_5|feature_6|feature_7|feature_8|classes|
+-----+-------+-----+---+---+---+-----+---------+---+---------+---------+---------+---------+---------+---------+---------+---------+-------+
|30-39|premeno|30-34|0-2| no|  3| left| left_low| no|      3.0|      0.0|      0.0|      0.0|      0.0|      1.0|      0.0|      0.0|    0.0|
|40-49|premeno|20-24|0-2| no|  2|right| right_up| no|      1.0|      0.0|      2.0|      0.0|      0.0|      0.0|      1.0|      2.0|    0.0|
|40-49|premeno|20-24|0-2| no|  2| left| left_low| no|      1.0|      0.0|      2.0|      0.0|      0.0|      0.0|      0.0|      0.0|    0.0|
|60-69|   ge40|15-19|0-2| no|  2|right|  left_up| no|      2.0|      1.0|      3.0|      0.0|      0.0|      0.0|      1.0|      1.0|    0.0|
|40-49

In [215]:
print(df_transform.columns)

['_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', 'feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5', 'feature_6', 'feature_7', 'feature_8', 'classes']


In [216]:
input_features=['feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5', 'feature_6', 'feature_7', 'feature_8', 'classes']


In [217]:
df2= df_transform.select(input_features)

In [218]:
print(df2.count())

286


In [219]:
df2= df2.toPandas().replace('NA',0).astype(float)

In [220]:
df2

Unnamed: 0,feature_1,feature_2,feature_3,feature_4,feature_5,feature_6,feature_7,feature_8,classes
0,3.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0
1,1.0,0.0,2.0,0.0,0.0,0.0,1.0,2.0,0.0
2,1.0,0.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0
3,2.0,1.0,3.0,0.0,0.0,0.0,1.0,1.0,0.0
4,1.0,0.0,7.0,0.0,0.0,0.0,1.0,3.0,0.0
...,...,...,...,...,...,...,...,...,...
281,3.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
282,3.0,0.0,2.0,0.0,0.0,1.0,0.0,1.0,1.0
283,2.0,1.0,2.0,0.0,0.0,2.0,1.0,1.0,0.0
284,1.0,1.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0


In [221]:
new_df2= spark.createDataFrame(df2)

In [222]:
new_df2.show()

+---------+---------+---------+---------+---------+---------+---------+---------+-------+
|feature_1|feature_2|feature_3|feature_4|feature_5|feature_6|feature_7|feature_8|classes|
+---------+---------+---------+---------+---------+---------+---------+---------+-------+
|      3.0|      0.0|      0.0|      0.0|      0.0|      1.0|      0.0|      0.0|    0.0|
|      1.0|      0.0|      2.0|      0.0|      0.0|      0.0|      1.0|      2.0|    0.0|
|      1.0|      0.0|      2.0|      0.0|      0.0|      0.0|      0.0|      0.0|    0.0|
|      2.0|      1.0|      3.0|      0.0|      0.0|      0.0|      1.0|      1.0|    0.0|
|      1.0|      0.0|      7.0|      0.0|      0.0|      0.0|      1.0|      3.0|    0.0|
|      2.0|      1.0|      3.0|      0.0|      0.0|      0.0|      0.0|      0.0|    0.0|
|      0.0|      0.0|      1.0|      0.0|      0.0|      0.0|      0.0|      0.0|    0.0|
|      2.0|      1.0|      2.0|      0.0|      0.0|      2.0|      0.0|      0.0|    0.0|
|      1.0

In [197]:
vec_assembler=VectorAssembler(inputCols=input_features, outputCol='features')


In [223]:
vec_df=vec_assembler.transform(new_df2)

In [224]:
vec_df.show()

+---------+---------+---------+---------+---------+---------+---------+---------+-------+--------------------+
|feature_1|feature_2|feature_3|feature_4|feature_5|feature_6|feature_7|feature_8|classes|            features|
+---------+---------+---------+---------+---------+---------+---------+---------+-------+--------------------+
|      3.0|      0.0|      0.0|      0.0|      0.0|      1.0|      0.0|      0.0|    0.0| (9,[0,5],[3.0,1.0])|
|      1.0|      0.0|      2.0|      0.0|      0.0|      0.0|      1.0|      2.0|    0.0|(9,[0,2,6,7],[1.0...|
|      1.0|      0.0|      2.0|      0.0|      0.0|      0.0|      0.0|      0.0|    0.0| (9,[0,2],[1.0,2.0])|
|      2.0|      1.0|      3.0|      0.0|      0.0|      0.0|      1.0|      1.0|    0.0|[2.0,1.0,3.0,0.0,...|
|      1.0|      0.0|      7.0|      0.0|      0.0|      0.0|      1.0|      3.0|    0.0|(9,[0,2,6,7],[1.0...|
|      2.0|      1.0|      3.0|      0.0|      0.0|      0.0|      0.0|      0.0|    0.0|(9,[0,1,2],[2.0,1...|
|

In [226]:
### Split vector into Train and Test

In [241]:
train_df, test_df = vec_df.randomSplit([0.7,0.3], seed=48)

In [242]:
train_df.show()

+---------+---------+---------+---------+---------+---------+---------+---------+-------+--------------------+
|feature_1|feature_2|feature_3|feature_4|feature_5|feature_6|feature_7|feature_8|classes|            features|
+---------+---------+---------+---------+---------+---------+---------+---------+-------+--------------------+
|      0.0|      0.0|      0.0|      0.0|      0.0|      1.0|      0.0|      0.0|    0.0|       (9,[5],[1.0])|
|      0.0|      0.0|      1.0|      0.0|      0.0|      0.0|      1.0|      3.0|    0.0|(9,[2,6,7],[1.0,1...|
|      0.0|      0.0|      1.0|      0.0|      1.0|      0.0|      0.0|      1.0|    0.0|(9,[2,4,7],[1.0,1...|
|      0.0|      0.0|      2.0|      0.0|      0.0|      2.0|      0.0|      0.0|    0.0| (9,[2,5],[2.0,2.0])|
|      0.0|      0.0|      3.0|      0.0|      0.0|      2.0|      0.0|      0.0|    0.0| (9,[2,5],[3.0,2.0])|
|      0.0|      0.0|      4.0|      0.0|      0.0|      1.0|      0.0|      0.0|    0.0| (9,[2,5],[4.0,1.0])|
|

In [254]:
from pyspark.sql import SQLContext
from pyspark.ml.classification import DecisionTreeClassifier

# Every record of this DataFrame contains the label and
# features represented by a vector.
##df = SQLContext.createDataFrame(df, ["label", "features"])

# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = DecisionTreeClassifier(maxDepth=5, labelCol="classes", featuresCol="features")

# Fit the model to the data.
model = lr.fit(train_df)

# Given a dataset, predict each point's label, and show the results.
y_pred=model.transform(test_df)


In [260]:
y_pred.show()

+---------+---------+---------+---------+---------+---------+---------+---------+-------+--------------------+-------------+-----------+----------+
|feature_1|feature_2|feature_3|feature_4|feature_5|feature_6|feature_7|feature_8|classes|            features|rawPrediction|probability|prediction|
+---------+---------+---------+---------+---------+---------+---------+---------+-------+--------------------+-------------+-----------+----------+
|      0.0|      0.0|      1.0|      0.0|      0.0|      0.0|      0.0|      0.0|    0.0|       (9,[2],[1.0])|  [156.0,0.0]|  [1.0,0.0]|       0.0|
|      0.0|      0.0|      1.0|      0.0|      0.0|      0.0|      0.0|      0.0|    0.0|       (9,[2],[1.0])|  [156.0,0.0]|  [1.0,0.0]|       0.0|
|      0.0|      0.0|      1.0|      0.0|      0.0|      2.0|      1.0|      1.0|    0.0|(9,[2,5,6,7],[1.0...|  [156.0,0.0]|  [1.0,0.0]|       0.0|
|      0.0|      0.0|      2.0|      1.0|      1.0|      0.0|      0.0|      0.0|    0.0|(9,[2,3,4],[2.0,1...|  

In [261]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


evaluator =MulticlassClassificationEvaluator(labelCol='classes', metricName='accuracy')

In [262]:
evaluator.evaluate(y_pred)

1.0

In [266]:
from pyspark.mllib.evaluation import MulticlassMetrics

In [272]:
evaluator = MulticlassMetrics(y_pred['classes','prediction'].rdd)



In [284]:
print(evaluator.precision(1.0))

1.0


In [290]:
print(evaluator.confusionMatrix())

DenseMatrix([[62.,  0.],
             [ 0., 24.]])


In [285]:
##Save model

model.save("Decision tree to analyze breast-cancer dataset")