## ***Module 1 - Use, processing, and visualization of large volumes of data.***

### Mariana Castro Payns - A01706038

Dataset obtained from: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews?select=Books_rating.csv

In [1]:
# Import os and set the environment variables Spark needs
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/home/mariana/spark/spark-3.2.2-bin-hadoop3.2"

In [2]:
# Locate and initialize the Spark installation
import findspark
findspark.init()
findspark.find()

'/home/mariana/spark/spark-3.2.2-bin-hadoop3.2'

In [3]:
# Start the Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.driver.memory", "5g").appName('Books').getOrCreate()

22/11/02 22:55:20 WARN Utils: Your hostname, mariana-Victus resolves to a loopback address: 127.0.1.1; using 192.168.1.74 instead (on interface wlo1)
22/11/02 22:55:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/02 22:55:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Print the app name and master to verify the session
print("APP Name :"+spark.sparkContext.appName)
print("Master :"+spark.sparkContext.master)

APP Name :Books
Master :local[*]


In [5]:
# Read the CSV file, using its first row as the header
df = spark.read.option('header', 'true').csv('file:///home/mariana/IA/Books_rating.csv', inferSchema=True)

                                                                                

In [6]:
# Show the loaded DataFrame
df.show()

+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|        Id|               Title|Price|       User_id|         profileName|review/helpfulness|review/score|review/time|      review/summary|         review/text|
+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|1882931173|Its Only Art If I...| null| AVCGYZL8FQQTD|"Jim of Oz ""jim-...|               7/7|         4.0|  940636800|Nice collection o...|This is only for ...|
|0826414346|Dr. Seuss: Americ...| null|A30TK6U7DNS82R|       Kevin Killian|             10/10|         5.0| 1095724800|   Really Enjoyed It|I don't care much...|
|0826414346|Dr. Seuss: Americ...| null|A3UH4UZ4RSVO82|        John Granger|             10/11|         5.0| 1078790400|Essential for eve...|"If people become...|
|0826414346|Dr. Seuss: Ameri

In [7]:
# Print the schema of the columns
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- User_id: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- review/helpfulness: string (nullable = true)
 |-- review/score: string (nullable = true)
 |-- review/time: string (nullable = true)
 |-- review/summary: string (nullable = true)
 |-- review/text: string (nullable = true)



In [8]:
# Cast the columns we need: the review score becomes the integer label
from pyspark.sql.functions import column
df = df.withColumn("label", column("review/score").cast('int'))
# Note: casting Price to int discards any decimal part
df = df.withColumn("Price", column("Price").cast('int'))

In [9]:
# Verify the data types
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Price: integer (nullable = true)
 |-- User_id: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- review/helpfulness: string (nullable = true)
 |-- review/score: string (nullable = true)
 |-- review/time: string (nullable = true)
 |-- review/summary: string (nullable = true)
 |-- review/text: string (nullable = true)
 |-- label: integer (nullable = true)



In [10]:
# Count null or NaN values in each column
from pyspark.sql.functions import when,lit,count,isnan,col
df.select([count(when(isnan(c)|col(c).isNull(),c)).alias(c) for c in df.columns]).show()



+---+-----+-------+-------+-----------+------------------+------------+-----------+--------------+-----------+-----+
| Id|Title|  Price|User_id|profileName|review/helpfulness|review/score|review/time|review/summary|review/text|label|
+---+-----+-------+-------+-----------+------------------+------------+-----------+--------------+-----------+-----+
|  0|  208|2519269| 562250|     562250|               367|         130|         27|            65|         43|18064|
+---+-----+-------+-------+-----------+------------------+------------+-----------+--------------+-----------+-----+



                                                                                

In [11]:
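# Drop rows containing any null value
# (this runs before the column selection, so nulls in Price or User_id also remove rows)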
df = df.na.drop()

In [12]:
# Select the relevant columns
df = df.select("id", "review/summary", "label")
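The drop in cell 11 runs before the column selection in cell 12, so a row is discarded when any column is null, including columns such as Price that the model never uses. The sketch below illustrates the effect of the order in plain Python on three hypothetical rows; `drop_nulls` and `select` are illustrative helpers, not Spark functions, and `None` stands in for a Spark null.

```python
# Toy rows with hypothetical values; None stands in for a Spark null.
rows = [
    {"Id": "a1", "Price": None, "review/summary": "Great", "label": 5},
    {"Id": "a2", "Price": 12, "review/summary": "Bad", "label": 1},
    {"Id": "a3", "Price": None, "review/summary": None, "label": 4},
]
keep = ["Id", "review/summary", "label"]


def drop_nulls(records):
    """Keep only records with no None values, mimicking df.na.drop()."""
    return [r for r in records if all(v is not None for v in r.values())]


def select(records, columns):
    """Keep only the requested columns, mimicking df.select(...)."""
    return [{c: r[c] for c in columns} for r in records]


drop_then_select = select(drop_nulls(rows), keep)  # order used in the notebook
select_then_drop = drop_nulls(select(rows, keep))  # reversed order

print(len(drop_then_select))  # rows surviving the notebook's order
print(len(select_then_drop))  # rows surviving the reversed order
```

If the goal is to keep as many reviews as possible, selecting the columns first (or passing a `subset` of columns to `na.drop`) may be worth considering.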

In [13]:
# Show the selection
df.show()

+----------+--------------------+-----+
|        id|      review/summary|label|
+----------+--------------------+-----+
|0829814000|Outstanding Resou...|    5|
|0829814000|Small Churches CA...|    5|
|0829814000|Not Just for Past...|    5|
|0829814000|Small church past...|    5|
|0595344550|            not good|    1|
|0595344550|  Here is my opinion|    4|
|0595344550|        Buyer beware|    1|
|0595344550| Fall on your knee's|    5|
|0595344550|      Bravo Veronica|    5|
|0595344550|           Wonderful|    5|
|0595344550|           Awesome !|    5|
|0595344550|      Glorious story|    5|
|0595344550|         Loved it !!|    5|
|0595344550|Five stars it not...|    5|
|0595344550|    A FIVE STAR BOOK|    5|
|0595344550|Whispers of the W...|    5|
|0595344550|Errors, but great...|    4|
|0595344550|          The Worst!|    1|
|0595344550|The truth about W...|    5|
|0595344550|How could someone...|    5|
+----------+--------------------+-----+
only showing top 20 rows



In [14]:
# Import the required libraries
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Configure the Pipeline stages: Tokenizer, HashingTF, Linear Regression
tokenizer = Tokenizer(inputCol="review/summary", outputCol="words")
hashing = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
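As a rough illustration of what these two stages do, the sketch below lowercases and splits a summary into tokens and then counts tokens per hash bucket. It is plain Python, not Spark code: `tokenize` and `hashing_tf` are hypothetical helpers, and CRC32 is an assumed stand-in for the hash function (Spark's `HashingTF` uses MurmurHash3), so the bucket indices will not match Spark's.

```python
import zlib
from collections import Counter


def tokenize(text):
    """Lowercase and split on whitespace, roughly what Spark's Tokenizer does."""
    return text.lower().split()


def hashing_tf(tokens, num_features):
    """Map each token to a bucket index and count tokens per bucket.

    Assumption: CRC32 stands in for Spark's MurmurHash3.
    """
    counts = Counter(zlib.crc32(t.encode("utf-8")) % num_features for t in tokens)
    return dict(sorted(counts.items()))


tokens = tokenize("A FIVE STAR BOOK")
vector = hashing_tf(tokens, num_features=10)

print(tokens)  # the lowercased words
print(vector)  # {bucket index: term count}
```

With few buckets, unrelated words can land in the same index, which is one reason the parameter grid tries several values of `numFeatures`.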

In [15]:
# Linear regression reads the default "features" and "label" columns
lr = LinearRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashing, lr])

In [16]:
# Import the required libraries
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator

# Build the parameter grid
paramGrid = ParamGridBuilder()\
    .addGrid(hashing.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()
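To get a feel for the cost of this grid, the sketch below enumerates the same values with `itertools.product` and counts how many pipeline fits each tuning strategy implies. It is plain Python and only mirrors the grid; the dictionary keys are labels chosen for illustration, and `num_folds = 4` is taken from the CrossValidator cell of this notebook.

```python
from itertools import product

# Values copied from the parameter grid in the notebook.
grid = {
    "numFeatures": [10, 100, 1000],
    "regParam": [0.1, 0.01],
    "fitIntercept": [False, True],
    "elasticNetParam": [0.0, 0.5, 1.0],
}

# One dict per parameter combination, analogous to the built param maps.
combos = [dict(zip(grid, values)) for values in product(*grid.values())]
num_folds = 4

cv_fits = len(combos) * num_folds  # one fit per combination per fold
tvs_fits = len(combos)             # one fit per combination on a single split

print(len(combos), cv_fits, tvs_fits)  # → 36 144 36
```

Both strategies then refit the best combination once more on the full training data, so cross-validation here costs roughly four times as many fits as the train-validation split.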

In [17]:
# Import the required libraries
from pyspark.ml.evaluation import RegressionEvaluator

# Configure the train-validation split
tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 75% of the data is used for training, the rest for validation.
                           trainRatio=0.75)

In [18]:
# Configure the cross-validator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=4)  # each parameter combination is fitted once per fold
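The idea behind k-fold cross-validation can be sketched in plain Python: shuffle the row indices, cut them into `num_folds` groups, and let each group serve once as the validation set while the rest is used for training. `k_fold_indices` is a hypothetical helper written for this illustration; Spark's `CrossValidator` assigns folds internally and its exact mechanism may differ.

```python
import random


def k_fold_indices(n_rows, num_folds, seed=0):
    """Yield (train, validation) index lists, one pair per fold."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    # Striding gives folds of (nearly) equal size.
    folds = [indices[i::num_folds] for i in range(num_folds)]
    for i, validation in enumerate(folds):
        train = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield train, validation


splits = list(k_fold_indices(n_rows=12, num_folds=4))
for train, validation in splits:
    print(sorted(validation), len(train))
```

Every row is used for validation exactly once, which is why the metric averaged over folds tends to be less dependent on one lucky or unlucky split.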

In [19]:
# Split the dataset into training and test sets
train, test = df.randomSplit([0.75,0.25])
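`randomSplit` is called here without a seed, so each run can produce a different partition; the method accepts an optional `seed` argument for reproducibility. The sketch below shows the general idea in plain Python with a hypothetical `random_split` helper: each row is assigned by an independent random draw, so the split sizes are only approximately 75/25, and fixing the seed fixes the result.

```python
import random


def random_split(rows, train_fraction, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(1000))
train_a, test_a = random_split(rows, 0.75, seed=42)
train_b, test_b = random_split(rows, 0.75, seed=42)

print(len(train_a), len(test_a))  # close to 750/250, but not exact
print(train_a == train_b)         # whether the same seed reproduces the split
```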

In [20]:
# Run cross-validation to find the best parameter configuration
cvModel = crossval.fit(train)

22/11/02 22:55:55 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
22/11/02 22:56:22 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/11/02 22:56:22 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/11/02 22:56:45 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/11/02 22:56:45 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [21]:
# Get predictions from the cross-validated model
predictions = cvModel.transform(test)
sel = predictions.select("id", "label", "prediction")
# Note: collect() pulls every row to the driver; take(n) or show() is safer on large data
for row in sel.collect():
    print(row)

                                                                                

Row(id='0027613909', label=5, prediction=4.611361222082167)
Row(id='0028642627', label=1, prediction=3.2553707919637875)
Row(id='0028642627', label=4, prediction=4.393261736646758)
Row(id='002864266X', label=5, prediction=3.757931715679891)
Row(id='002864266X', label=4, prediction=4.120680562930761)
Row(id='002864266X', label=5, prediction=4.897046952758258)
Row(id='002864266X', label=4, prediction=3.5382525899443347)
Row(id='002864266X', label=5, prediction=4.035337431693732)
Row(id='0030651786', label=5, prediction=4.174640856465918)
Row(id='0060226064', label=3, prediction=3.529083629237598)
Row(id='0060226064', label=5, prediction=4.5307841674561296)
Row(id='0060226064', label=5, prediction=4.140786258147981)
Row(id='0060513640', label=2, prediction=4.045083479974337)
Row(id='0060513640', label=3, prediction=4.808518751963748)
Row(id='0060513640', label=5, prediction=4.227341872078179)
Row(id='0060513640', label=5, prediction=4.52610984127078)
Row(id='0060513640', label=3, predicti

In [22]:
# Find the best configuration with the train-validation split
tvsModel = tvs.fit(train)

                                                                                

In [23]:
# Make predictions on the test data
tvsModel.transform(test).select("id", "label", "prediction").show()

+----------+-----+------------------+
|        id|label|        prediction|
+----------+-----+------------------+
|0027613909|    5| 4.611361222082167|
|0028642627|    1|3.2553707919637875|
|0028642627|    4| 4.393261736646758|
|002864266X|    5| 3.757931715679891|
|002864266X|    4| 4.120680562930761|
|002864266X|    5| 4.897046952758258|
|002864266X|    4|3.5382525899443347|
|002864266X|    5| 4.035337431693732|
|0030651786|    5| 4.174640856465918|
|0060226064|    3| 3.529083629237598|
|0060226064|    5|4.5307841674561296|
|0060226064|    5| 4.140786258147981|
|0060513640|    2| 4.045083479974337|
|0060513640|    3| 4.808518751963748|
|0060513640|    5| 4.227341872078179|
|0060513640|    5|  4.52610984127078|
|0060513640|    3| 3.063250126591321|
|0060513640|    5|4.6520661413907165|
|0060513640|    5| 4.146479032294986|
|0060513640|    5| 3.395511714829313|
+----------+-----+------------------+
only showing top 20 rows
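The notebook shows predictions but does not print an error metric. `RegressionEvaluator` uses RMSE by default, and the sketch below computes RMSE and MAE by hand in plain Python for the 20 rows displayed above. This covers only the displayed sample, so it is an illustration of the formulas rather than the test-set score.

```python
import math

# (label, prediction) pairs copied from the 20 rows displayed above.
shown = [
    (5, 4.611361222082167), (1, 3.2553707919637875), (4, 4.393261736646758),
    (5, 3.757931715679891), (4, 4.120680562930761), (5, 4.897046952758258),
    (4, 3.5382525899443347), (5, 4.035337431693732), (5, 4.174640856465918),
    (3, 3.529083629237598), (5, 4.5307841674561296), (5, 4.140786258147981),
    (2, 4.045083479974337), (3, 4.808518751963748), (5, 4.227341872078179),
    (5, 4.52610984127078), (3, 3.063250126591321), (5, 4.6520661413907165),
    (5, 4.146479032294986), (5, 3.395511714829313),
]

# Root mean squared error: penalizes large misses more heavily.
rmse = math.sqrt(sum((y - p) ** 2 for y, p in shown) / len(shown))
# Mean absolute error: average size of the miss, in stars.
mae = sum(abs(y - p) for y, p in shown) / len(shown)

print(f"RMSE on the displayed rows: {rmse:.3f}")
print(f"MAE on the displayed rows:  {mae:.3f}")
```

On the full test set, the same metric could be obtained by passing the prediction DataFrame to a `RegressionEvaluator`, which would be the more representative number.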



                                                                                