
# **Uso, procesamiento y visualización de grandes volúmenes de datos**

### **Configuración de entorno de trabajo en Colab para utilizar PySpark**

In [None]:
!pip install pyspark py4j

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=7db22affb3fdfa17e89faad62aa49d2b6f08ffbb25a52acfa44c66b8ac8a7699
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


Primero se crea un contexto de Spark, el cual es necesario para interactuar con el clúster de Spark y ejecutar las operaciones de procesamiento distribuido.

In [None]:
from pyspark import SparkContext
from pyspark.mllib.util import MLUtils

sc = SparkContext(appName="BigData")

### **Modelo de regresión**
#### Decision Tree

In [None]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

En esta sección, se carga un archivo de datos y se almacena en un Resilient Distributed Dataset (RDD). Cada registro del RDD es un LabeledPoint, que contiene una etiqueta y un conjunto de características. Los datos se dividen en conjuntos de entrenamiento y prueba, después se entrena un modelo de regresión de árbol de decisión utilizando los datos de entrenamiento.

Se realizan predicciones utilizando el modelo entrenado en el conjunto de prueba, y se calcula el Error Cuadrático Medio (MSE) para evaluar el rendimiento del modelo.

In [None]:
if __name__ == "__main__":

    # Carga y analisis del archivo de datos en un RDD de LabeledPoint
    data = MLUtils.loadLibSVMFile(sc, 'sample_libsvm_data.txt')

    # Division de los datos en conjuntos de entrenamiento y prueba
    (trainingData, testData) = data.randomSplit([0.7, 0.3])

    # Entrenamiento del modelo
    model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo={},
                                        impurity='variance', maxDepth=5, maxBins=32)

    # Evaluacion del modelo en instancias de prueba y calculo del error de prueba
    predictions = model.predict(testData.map(lambda x: x.features))
    labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
    testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
                                      float(testData.count())

    print('Test Mean Squared Error = ' + str(testMSE))
    print('Learned regression tree model:')
    print(model.toDebugString())

    # Guarda y carga el modelo
    model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
    sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")

Test Mean Squared Error = 0.02857142857142857
Learned regression tree model:
DecisionTreeModel regressor of depth 1 with 3 nodes
  If (feature 434 <= 70.5)
   Predict: 0.0
  Else (feature 434 > 70.5)
   Predict: 1.0



### **Modelo de clasificación**
#### Random Forest

In [None]:
from pyspark.mllib.tree import RandomForest, RandomForestModel

Aquí se está entrenando un modelo de Random Forest para clasificación. De igual manera, se carga un conjunto de datos y se divide en conjuntos de entrenamiento y prueba. Se utiliza el algoritmo de Random Forest para crear un modelo de clasificación con ciertos parámetros, como el número de árboles, la estrategia de selección de características, la impureza, la profundidad máxima y el número máximo de divisiones de nodos.

Adicionalmente, se realizan predicciones en el conjunto de prueba utilizando el modelo de Random Forest y se calcula la tasa de error en el conjunto de prueba.

In [None]:
if __name__ == "__main__":

    # Carga y analisis del archivo de datos en un RDD de LabeledPoint
    data = MLUtils.loadLibSVMFile(sc, 'sample_libsvm_data.txt')

    # Division de los datos en conjuntos de entrenamiento y prueba
    (trainingData, testData) = data.randomSplit([0.7, 0.3])

    # Entrenamiento del modelo
    model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                         numTrees=3, featureSubsetStrategy="auto",
                                         impurity='gini', maxDepth=4, maxBins=32)

    # Evaluacion del modelo en instancias de prueba y calculo del error de prueba
    predictions = model.predict(testData.map(lambda x: x.features))
    labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
    testErr = labelsAndPredictions.filter(
        lambda lp: lp[0] != lp[1]).count() / float(testData.count())

    print('Test Error = ' + str(testErr))
    print('Learned classification forest model:')
    print(model.toDebugString())

    # Guarda y carga el modelo
    model.save(sc, "target/tmp/myRandomForestClassificationModel")
    sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestClassificationModel")

Test Error = 0.0
Learned classification forest model:
TreeEnsembleModel classifier with 3 trees

  Tree 0:
    If (feature 517 <= 56.5)
     If (feature 435 <= 39.0)
      Predict: 0.0
     Else (feature 435 > 39.0)
      Predict: 1.0
    Else (feature 517 > 56.5)
     Predict: 1.0
  Tree 1:
    If (feature 510 <= 20.5)
     If (feature 207 <= 2.0)
      Predict: 1.0
     Else (feature 207 > 2.0)
      If (feature 428 <= 69.0)
       Predict: 1.0
      Else (feature 428 > 69.0)
       Predict: 0.0
    Else (feature 510 > 20.5)
     Predict: 0.0
  Tree 2:
    If (feature 323 <= 104.5)
     If (feature 460 <= 46.5)
      Predict: 0.0
     Else (feature 460 > 46.5)
      If (feature 268 <= 9.5)
       Predict: 1.0
      Else (feature 268 > 9.5)
       Predict: 0.0
    Else (feature 323 > 104.5)
     If (feature 496 <= 1.0)
      Predict: 1.0
     Else (feature 496 > 1.0)
      Predict: 0.0



### **Agrupamiento**
#### K-means

In [None]:
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel

En esta última sección, se lleva a cabo el agrupamiento de datos utilizando el algoritmo K-means. Los datos se cargan desde un archivo de texto y se analizan. Cada línea del archivo se divide en una matriz de números que representan las características de cada punto de datos. Se construye el modelo K-means y posteriormente se evalúa el agrupamiento calculando la Suma de Errores Cuadrados Dentro del Conjunto (WSSSE), el cual mide la dispersión de los puntos dentro de los clústeres.

In [None]:
if __name__ == "__main__":

    # Carga y analisis de datos
    data = sc.textFile('kmeans_data.txt')
    parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

    # Construccion del modelo
    clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

    # Evaluacion del agrupamiento calculando la suma de errores cuadrados dentro del conjunto
    def error(point):
        center = clusters.centers[clusters.predict(point)]
        return sqrt(sum([x**2 for x in (point - center)]))

    WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    print("Within Set Sum of Squared Error = " + str(WSSSE))

    # Guarda y carga el modelo
    clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
    sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")

    # Detiene el contexto existente
    sc.stop()

Within Set Sum of Squared Error = 0.6928203230275529
