In [1]:
#Bibliotecas para poder trabajar con Spark
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.2.1//spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
#Configuración de Spark con Python
!pip install -q findspark
!pip install pyspark

#Estableciendo variable de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

#Buscando e inicializando la instalación de Spark
import findspark
findspark.init()
findspark.find()

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
[33m0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (185.1[0m[33m0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (185.1[0m[33m0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Connecting to[0m                                                                               Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
[33m0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com] [Connecting to[0m                                                                               Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:6 http://security.ubuntu.com/ubuntu bionic-security InRele

'/content/spark-3.2.1-bin-hadoop3.2'

In [None]:
#Creación de la SparkSession para trabajar
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('PySpark_prueba1').getOrCreate()

##Validación cruzada (cross-validation)

In [7]:
#Cargamos las bibliotecas necesarias para llevar acabo la validación cruzada
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


#Preparamos el conjunto de datos de entrenamiento en este caso cadenas de texto etiquetadas
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
], ["id", "text", "label"])

training .show()


+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
|  4|     b spark who|  1.0|
|  5|         g d a y|  0.0|
|  6|       spark fly|  1.0|
|  7|   was mapreduce|  0.0|
|  8| e spark program|  1.0|
|  9|         a e c l|  0.0|
| 10|   spark compile|  1.0|
| 11| hadoop software|  0.0|
+---+----------------+-----+



## Creando un ML Pipeline

In [8]:
# Configuración de un ML pipeline, que consiste en 3 fases: tokenizer, hashingTF, y regresión logística.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)

#creación del Pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])



##Creación de la Grid de parámetros de configuración

In [9]:
#Se contruye un Grid de combinación de parámetros para cada esta del pipeline. 
#En este caso tres valores para hashingTF y 2 para la regresión logística.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

##Se configura el objeto de validación cruzada y se ejecuta

In [10]:
#Para crear la instancia de validación cruzada se toma el pipeline  y la Grid de parámetros.
#Dependiendo del problema que se tenga se escoge el evaluador para este caso en particular dado
#que es una regresión logística entonces es una clasificación binaria y aquí se coloca el número de
#folds en los que se dividirá los datos iniciales

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=4)  # use 3+ folds in practice

# Se ejecuta la validación cruzada para encontrar los mejores parámetros de configuración.
cvModel = crossval.fit(training)

Prueba del mejor modelo con documentos sin etiquetar

In [11]:
# Se prepara documentos de prueba los cuales estan sin etiquetar.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Se realizan las predicciones. cvModel usa el mejor modelo de regresión logistica encontrado dentro de la Grid de parámetros. (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)

Row(id=4, text='spark i j k', probability=DenseVector([0.2665, 0.7335]), prediction=1.0)
Row(id=5, text='l m n', probability=DenseVector([0.9204, 0.0796]), prediction=0.0)
Row(id=6, text='mapreduce spark', probability=DenseVector([0.4438, 0.5562]), prediction=1.0)
Row(id=7, text='apache hadoop', probability=DenseVector([0.8587, 0.1413]), prediction=0.0)


#Train-Validation

Además del CrossValidator, Spark también ofrece TrainValidationSplit para el ajuste de hiperparámetros. TrainValidationSplit solo evalúa cada combinación de parámetros una vez, a diferencia de $k$ veces como CrossValidator. Por lo tanto, es menos costoso, pero no producirá resultados tan confiables cuando el conjunto de datos de entrenamiento no sea lo suficientemente grande. A diferencia de CrossValidator, TrainValidationSplit crea un único par de conjuntos de datos (entrenamiento, prueba). Divide el conjunto de datos en estas dos partes utilizando el parámetro *trainRatio*. Por ejemplo, con trainRatio=0.75, TrainValidationSplit generará un par de conjuntos de datos de prueba y entrenamiento donde el 75 % de los datos se usa para entrenamiento y el 25 % para validación.

In [5]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

#Cargamos el conjunto de datos inicial dentro de la misma sesión
data = spark.read.format("libsvm")\
    .load("spark-3.2.1-bin-hadoop3.2/data/mllib/sample_linear_regression_data.txt")
train, test = data.randomSplit([0.9, 0.1], seed=12345)

#Creamos el objeto base de regresión lineal
lr = LinearRegression(maxIter=10)

#Construimos la grid de búsqueda con los parámetros de configuración de la regresión lineal
# regParam, fitIntercept y elasticNetParam
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

#Como en el ejemplo anterior configuramos el validador
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

# Iniciamos la búsqueda de la mejor configuración
model = tvs.fit(train)

# Hacemos predicciones con los datos de prueba que no se han visto por el modelo.
model.transform(test)\
    .select("features", "label", "prediction")\
    .show()

+--------------------+--------------------+--------------------+
|            features|               label|          prediction|
+--------------------+--------------------+--------------------+
|(10,[0,1,2,3,4,5,...| -17.026492264209548|  -1.780062242348691|
|(10,[0,1,2,3,4,5,...|  -16.71909683360509| -0.1893325701092588|
|(10,[0,1,2,3,4,5,...| -15.375857723312297|  0.7252323736487188|
|(10,[0,1,2,3,4,5,...| -13.772441561702871|  3.2696413241677718|
|(10,[0,1,2,3,4,5,...| -13.039928064104615| 0.18817684046065775|
|(10,[0,1,2,3,4,5,...|   -9.42898793151394|  -3.449987079269568|
|(10,[0,1,2,3,4,5,...|    -9.2679651250406|-0.33109075490696316|
|(10,[0,1,2,3,4,5,...|  -9.173693798406978|-0.42727135281551937|
|(10,[0,1,2,3,4,5,...| -7.1500991588127265|   2.936884251408867|
|(10,[0,1,2,3,4,5,...|  -6.930603551528371|-0.02839768193150...|
|(10,[0,1,2,3,4,5,...|  -6.456944198081549| -0.9224776887934015|
|(10,[0,1,2,3,4,5,...| -3.2843694575334834| -1.0821208483033875|
|(10,[0,1,2,3,4,5,...|   