In [None]:
%pip install pyspark



In [None]:
import pandas as pd

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
Sc = SparkContext()

In [None]:
sqlContext = SQLContext(Sc)



In [None]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('healthcare-dataset-stroke-data.csv')
df.take(1)

[Row(id=9046, gender='Male', age=67.0, hypertension=0, heart_disease=1, ever_married='Yes', work_type='Private', Residence_type='Urban', avg_glucose_level=228.69, bmi='36.6', smoking_status='formerly smoked', stroke=1)]

In [None]:
pd_df = df.toPandas()
print(pd_df)

         id  gender   age  hypertension  heart_disease ever_married  \
0      9046    Male  67.0             0              1          Yes   
1     51676  Female  61.0             0              0          Yes   
2     31112    Male  80.0             0              1          Yes   
3     60182  Female  49.0             0              0          Yes   
4      1665  Female  79.0             1              0          Yes   
...     ...     ...   ...           ...            ...          ...   
5105  18234  Female  80.0             1              0          Yes   
5106  44873  Female  81.0             0              0          Yes   
5107  19723  Female  35.0             0              0          Yes   
5108  37544    Male  51.0             0              0          Yes   
5109  44679  Female  44.0             0              0          Yes   

          work_type Residence_type  avg_glucose_level   bmi   smoking_status  \
0           Private          Urban             228.69  36.6  former

In [None]:
pd_df.dtypes

id                     int32
gender                object
age                  float64
hypertension           int32
heart_disease          int32
ever_married          object
work_type             object
Residence_type        object
avg_glucose_level    float64
bmi                   object
smoking_status        object
stroke                 int32
dtype: object

In [None]:
df.cache()
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [None]:
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
id,5110,36517.82935420744,21161.72162482715,67,72940
gender,5110,,,Female,Other
age,5110,43.226614481409015,22.61264672311348,0.08,82.0
hypertension,5110,0.0974559686888454,0.296606674233791,0,1
heart_disease,5110,0.05401174168297456,0.22606298750336554,0,1
ever_married,5110,,,No,Yes
work_type,5110,,,Govt_job,children
Residence_type,5110,,,Rural,Urban
avg_glucose_level,5110,106.14767710371804,45.28356015058193,55.12,271.74


In [None]:
df.head()

Row(id=9046, gender='Male', age=67.0, hypertension=0, heart_disease=1, ever_married='Yes', work_type='Private', Residence_type='Urban', avg_glucose_level=228.69, bmi='36.6', smoking_status='formerly smoked', stroke=1)

In [None]:
df.dtypes

[('id', 'int'),
 ('gender', 'string'),
 ('age', 'double'),
 ('hypertension', 'int'),
 ('heart_disease', 'int'),
 ('ever_married', 'string'),
 ('work_type', 'string'),
 ('Residence_type', 'string'),
 ('avg_glucose_level', 'double'),
 ('bmi', 'string'),
 ('smoking_status', 'string'),
 ('stroke', 'int')]

In [None]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['age', 'avg_glucose_level', 'hypertension', 'heart_disease', 'stroke'], outputCol = 'features')
tdf = vectorAssembler.transform(df)
print(tdf)
tdf = tdf.select(['features', 'stroke'])
tdf.show(3)

DataFrame[id: int, gender: string, age: double, hypertension: int, heart_disease: int, ever_married: string, work_type: string, Residence_type: string, avg_glucose_level: double, bmi: string, smoking_status: string, stroke: int, features: vector]
+--------------------+------+
|            features|stroke|
+--------------------+------+
|[67.0,228.69,0.0,...|     1|
|[61.0,202.21,0.0,...|     1|
|[80.0,105.92,0.0,...|     1|
+--------------------+------+
only showing top 3 rows



In [None]:
splits = tdf.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [None]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol='stroke', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.0,0.0,0.0,0.0,0.0]
Intercept: 0.04802021903959562


In [None]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 0.213809
r2: -0.000000


Classification - https://colab.research.google.com/drive/1VKUg_-4zlgJpy1AhhDStx-q9oKkANPcC?usp=sharing


In [None]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_iris
import pandas as pd


iris = load_iris()
df_iris = pd.DataFrame(iris.data, columns=iris.feature_names)
df_iris['label'] = pd.Series(iris.target)
 
print(df_iris.head())

data = sqlContext.createDataFrame(df_iris)
print(data.printSchema())

features = iris.feature_names

va = VectorAssembler(inputCols = features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'label'])
va_df.show(3)

(train, test) = va_df.randomSplit([0.8, 0.2])

dtc = DecisionTreeClassifier(featuresCol="features", labelCol="label")
dtc = dtc.fit(train)

pred = dtc.transform(test)
pred.show(3)

evaluator=MulticlassClassificationEvaluator(predictionCol="prediction")
acc = evaluator.evaluate(pred)
print("Prediction Accuracy: ", acc)

y_pred=pred.select("prediction").collect()
y_orig=pred.select("label").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

   sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)  \
0                5.1               3.5                1.4               0.2   
1                4.9               3.0                1.4               0.2   
2                4.7               3.2                1.3               0.2   
3                4.6               3.1                1.5               0.2   
4                5.0               3.6                1.4               0.2   

   label  
0      0  
1      0  
2      0  
3      0  
4      0  
root
 |-- sepal length (cm): double (nullable = true)
 |-- sepal width (cm): double (nullable = true)
 |-- petal length (cm): double (nullable = true)
 |-- petal width (cm): double (nullable = true)
 |-- label: long (nullable = true)

None
+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|    0|
|[4.9,3.0,1.4,0.2]|    0|
|[4.7,3.2,1.3,0.2]|    0|
+-----------------+-----+
only showing top 3 rows

+-----------