<a href="https://colab.research.google.com/github/sasansharifipour/Spark_Class/blob/main/Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Older Spark releases are served from archive.apache.org rather than downloads.apache.org.
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [3]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

In [4]:
# Each row is (label, feature vector).
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

In [5]:
# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)

In [6]:
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

LogisticRegression parameters:
aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The bou
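
The `elasticNetParam` description above says that alpha = 0 gives an L2 penalty and alpha = 1 gives an L1 penalty. A minimal sketch of that mixing, assuming the penalty form `regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)` given in the Spark MLlib guide; the helper name and the coefficient values are hypothetical, and this is not how Spark computes the term internally:

```python
def elastic_net_penalty(coefficients, reg_param, elastic_net_param):
    """Return reg_param * (alpha * L1 + (1 - alpha) / 2 * L2^2) for the given weights."""
    alpha = elastic_net_param
    l1 = sum(abs(w) for w in coefficients)
    l2_squared = sum(w * w for w in coefficients)
    return reg_param * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_squared)

# Hypothetical coefficients, not taken from the fitted model.
weights = [0.5, -1.0, 2.0]

# alpha = 0.0 (the default) keeps only the L2 term; alpha = 1.0 keeps only the L1 term.
ridge_penalty = elastic_net_penalty(weights, reg_param=0.01, elastic_net_param=0.0)
lasso_penalty = elastic_net_penalty(weights, reg_param=0.01, elastic_net_param=1.0)
print(ridge_penalty, lasso_penalty)
```

With `regParam=0.01` and the default `elasticNetParam=0.0`, as in cell 5, only the L2 term is active.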

In [7]:
model = lr.fit(training)

In [8]:
model

LogisticRegressionModel: uid=LogisticRegression_f08299e33604, numClasses=2, numFeatures=3
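
The fitted model is a Transformer that holds one coefficient per feature plus an intercept. As a rough illustration of how a binary logistic regression turns a feature vector into the two-element probability column, here is a plain-Python sketch; the coefficient and intercept values are hypothetical placeholders, not the values learned above:

```python
import math

def predict_probability(features, coefficients, intercept):
    """Return [P(label=0), P(label=1)] for a binary logistic regression."""
    margin = sum(w * x for w, x in zip(coefficients, features)) + intercept
    p1 = 1.0 / (1.0 + math.exp(-margin))  # logistic (sigmoid) function
    return [1.0 - p1, p1]

# Hypothetical parameters for a 3-feature model (numFeatures=3 as reported above).
coefficients = [-2.0, 1.0, 0.5]
intercept = 0.1

probability = predict_probability([0.0, 1.1, 0.1], coefficients, intercept)
print(probability)
```

The learned values can be read from the model's `coefficients` and `intercept` attributes.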

In [9]:
print("Model 1 was fit using parameters : ")
print(model.extractParamMap())

Model 1 was fit using parameters : 
{Param(parent='LogisticRegression_f08299e33604', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LogisticRegression_f08299e33604', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_f08299e33604', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto', Param(parent='LogisticRegression_f08299e33604', name='featuresCol', doc='features column name.'): 'features', Param(parent='LogisticRegression_f08299e33604', name='fitIntercept', doc='whether to fit an intercept term.'): True, Param(parent='LogisticRegression_f08299e33604', name='labelCol', doc='label column name.'): 'label', Param(parent='LogisticRegression_f08299e33604', name='maxIter', doc='max n

In [11]:
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30  # Specify one Param, overwriting the value set on the line above.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # Specify multiple Params at once.

In [12]:
paramMap2 = {lr.probabilityCol: "myProbability"} # change output column name
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)
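
A param map is an ordinary Python dictionary keyed by `Param` objects, so the usual dictionary rules apply: a later assignment overwrites an earlier one, and `copy()` leaves the original map untouched. The sketch below mirrors cells 11 and 12 without Spark, using plain strings as stand-in keys (an assumption made only so the example runs on its own):

```python
# Strings stand in for lr.maxIter, lr.regParam, lr.threshold, and lr.probabilityCol.
param_map = {"maxIter": 20}
param_map["maxIter"] = 30                               # overwrites 20
param_map.update({"regParam": 0.1, "threshold": 0.55})  # adds two more entries

param_map_2 = {"probabilityCol": "myProbability"}

combined = param_map.copy()   # shallow copy, so param_map itself is not modified
combined.update(param_map_2)

print(combined)
print("probabilityCol" in param_map)
```

Values passed to `fit` through a param map take precedence over values set on the estimator, which is why the second model uses `maxIter=30` even though `lr` was created with `maxIter=10`.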

In [13]:
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters : ")
print(model2.extractParamMap())

Model 2 was fit using parameters : 
{Param(parent='LogisticRegression_f08299e33604', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LogisticRegression_f08299e33604', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_f08299e33604', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto', Param(parent='LogisticRegression_f08299e33604', name='featuresCol', doc='features column name.'): 'features', Param(parent='LogisticRegression_f08299e33604', name='fitIntercept', doc='whether to fit an intercept term.'): True, Param(parent='LogisticRegression_f08299e33604', name='labelCol', doc='label column name.'): 'label', Param(parent='LogisticRegression_f08299e33604', name='maxIter', doc='max n

In [14]:
# Held-out rows in the same (label, feature vector) layout as the training data.
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

In [15]:
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction").collect()

In [16]:
for row in result:
    print("features = %s, label = %s -> prob=%s, prediction=%s"
          % (row.features, row.label, row.myProbability, row.prediction))

features = [-1.0,1.5,1.3], label = 1.0 -> prob=[0.05707304171033977,0.9429269582896603], prediction=1.0
features = [3.0,2.0,-0.1], label = 0.0 -> prob=[0.9238522311704088,0.07614776882959128], prediction=0.0
features = [0.0,2.2,-1.5], label = 1.0 -> prob=[0.10972776114779119,0.8902722388522087], prediction=1.0
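
The second model was fit with `threshold=0.55`, so a row is predicted as class 1 only when its class-1 probability exceeds 0.55. A small sketch of that rule, applied to the class-1 probabilities printed above (the variable names are illustrative):

```python
threshold = 0.55  # value set through the param map in cell 11

# Second element of each myProbability vector printed above.
class1_probabilities = [0.9429269582896603, 0.07614776882959128, 0.8902722388522087]

predictions = [1.0 if p > threshold else 0.0 for p in class1_probabilities]
print(predictions)  # [1.0, 0.0, 1.0]
```

These match the `prediction` column in the output above.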


In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

In [18]:
training = spark.createDataFrame([(0, "a b c d e spark", 1.0),
                                  (1, "b d", 0.0),
                                  (2, "spark f g h", 1.0),
                                  (3, "hadoop mapreduce", 0.0)], ["id", "text", "label"])

In [19]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

In [21]:
model = pipeline.fit(training)

In [22]:
# Unlabeled test documents: (id, text).
test = spark.createDataFrame([(4, "spark i j k"),
                              (5, "l m n"),
                              (6, "spark hadoop spark"),
                              (7, "apache hadoop")], ["id", "text"])

In [23]:
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction" )

for row in selected.collect():
    # Use a separate name for the per-row value so the `prediction` DataFrame is not overwritten.
    rid, text, prob, pred = row
    print(" (%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), pred))

 (4, spark i j k) --> prob=[0.15964077387874118,0.8403592261212589], prediction=1.000000
 (5, l m n) --> prob=[0.8378325685476612,0.16216743145233875], prediction=0.000000
 (6, spark hadoop spark) --> prob=[0.06926633132976273,0.9307336686702373], prediction=1.000000
 (7, apache hadoop) --> prob=[0.9821575333444208,0.01784246665557917], prediction=0.000000
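
The first two pipeline stages turn raw text into a numeric feature vector before the logistic regression sees it. The sketch below imitates them in plain Python to show the idea: lowercase and split the text, then map each word to a bucket index by hashing and count occurrences. It is an approximation under stated assumptions: Spark's `HashingTF` uses MurmurHash3 with 2^18 buckets by default, while this sketch uses `zlib.crc32` and a hypothetical `NUM_FEATURES = 16`, so the indices will not match Spark's.

```python
import zlib
from collections import Counter

NUM_FEATURES = 16  # hypothetical; far smaller than Spark's default of 2**18

def tokenize(text):
    """Lowercase the text and split it on whitespace, similar to Tokenizer."""
    return text.lower().split()

def hashing_tf(words, num_features=NUM_FEATURES):
    """Map each word to a bucket index and count how often each bucket is hit."""
    buckets = (zlib.crc32(w.encode("utf-8")) % num_features for w in words)
    return dict(Counter(buckets))

words = tokenize("spark hadoop spark")
vector = hashing_tf(words)
print(words)
print(vector)  # sparse representation: {bucket index: term count}
```

Because different words can hash to the same bucket, a small bucket count raises the chance of collisions; a large default keeps that risk low.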


In [24]:
spark.stop()