In [1]:
#Librerias
import sys
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from datetime import datetime

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: 9c29dd90-a814-4f5e-9cb9-3d149993f5cd
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session 9c29dd90-a814-4f5e-9cb9-3d149993f5cd to get into ready status...
Session 9c29dd90-a814-4f5e-9cb9-3d149993f5cd has been created.



In [2]:
# Session bootstrap: reuse (or create) the SparkContext, wrap it in a GlueContext,
# and take the Spark session and the logger from that GlueContext.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
logger = glueContext.get_logger()

# Output-committer settings, applied to the session in the order listed.
# NOTE(review): the committer factory is registered for the `s3a` scheme, while the
# paths visible in this notebook use `s3://` - confirm this setting takes effect.
committer_settings = {
    "mapreduce.fileoutputcommitter.algorithm.version": "1",
    "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a":
        "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory",
}
for setting_name, setting_value in committer_settings.items():
    spark.conf.set(setting_name, setting_value)




In [2]:
# Configuration
# S3 location of the cleaned dataset; it is read as Parquet in the load cell.
input_path = "s3://nequijmmarinq/processed/clean_data/"
# S3 destination for the trained model (only referenced by the commented-out save cell).
model_output_path = "s3://nequijmmarinq/models/msg-classifier/"
# Presumably the model version tag; not used by any other visible cell - TODO confirm.
version = "1.0"
# ISO-8601 timestamp taken when this cell runs; utcnow() returns a naive datetime,
# so the string carries no timezone suffix.
trained = datetime.utcnow().isoformat()




In [4]:
# Read the cleaned data (Parquet) from `input_path` into a Spark DataFrame.
logger.info("Leyendo datos limpios desde S3...")
df = spark.read.parquet(input_path)




In [5]:
# Inspect the column names, types and nullability of the loaded DataFrame.
df.schema

StructType([StructField('inbound', IntegerType(), True), StructField('created_at', StringType(), True), StructField('text', StringType(), True)])


In [6]:
# Row count; expected to match the number of rows produced by the previous job.
df.count()

2811776


In [7]:
# Preview a sample of rows from the loaded data.
df.show()

+-------+--------------------+--------------------+
|inbound|          created_at|                text|
+-------+--------------------+--------------------+
|      1|Thu Oct 12 03:25:...|promise promise h...|
|      0|Thu Oct 12 11:53:...|291022 absolutely...|
|      1|Thu Oct 12 11:50:...| 115911 i’m contract|
|      0|Thu Oct 12 11:57:...|308357 weve danci...|
|      1|Thu Oct 12 11:56:...|southwestair happ...|
|      0|Thu Oct 12 11:52:...|308358 pleasure s...|
|      1|Thu Oct 12 11:47:...|southwestair anot...|
|      0|Thu Oct 12 11:50:...|308359 cant wait ...|
|      1|Thu Oct 12 12:58:...|southwestair 3083...|
|      0|Thu Oct 12 12:59:...|308360 cant wait ...|
|      1|Thu Oct 12 11:47:...|pretty pumped wak...|
|      0|Thu Oct 12 11:50:...|308361 kerry yes ...|
|      1|Thu Oct 12 11:52:...|        southwestair|
|      1|Thu Oct 12 11:44:...|southwestair allo...|
|      0|Thu Oct 12 11:49:...|159315 yes—put pt...|
|      1|Thu Oct 12 11:40:...|southwestair anno...|
|      0|Thu

In [8]:
# Target: keep only rows that are usable for training.
# - `inbound` is the label column of the classifier, so it must be present.
# - `text` is nullable in the schema and is the Tokenizer's input column; the
#   Tokenizer cannot process null strings, so rows with null text are dropped too
#   instead of failing later when the tokenized data is materialized.
df = df.filter(F.col("inbound").isNotNull() & F.col("text").isNotNull())




In [9]:
# Tokenizer: turns the `text` column into a list of word tokens stored in `tokens`.
tokenizer = Tokenizer(
    inputCol="text",
    outputCol="tokens",
)




In [10]:
# Exploratory step: apply the tokenizer to the full DataFrame to inspect the `tokens` column.
wordsData = tokenizer.transform(df)




In [11]:
# Preview the tokenized rows.
wordsData.show()

+-------+--------------------+--------------------+--------------------+
|inbound|          created_at|                text|              tokens|
+-------+--------------------+--------------------+--------------------+
|      1|Thu Oct 12 03:25:...|promise promise h...|[promise, promise...|
|      0|Thu Oct 12 11:53:...|291022 absolutely...|[291022, absolute...|
|      1|Thu Oct 12 11:50:...| 115911 i’m contract|[115911, i’m, con...|
|      0|Thu Oct 12 11:57:...|308357 weve danci...|[308357, weve, da...|
|      1|Thu Oct 12 11:56:...|southwestair happ...|[southwestair, ha...|
|      0|Thu Oct 12 11:52:...|308358 pleasure s...|[308358, pleasure...|
|      1|Thu Oct 12 11:47:...|southwestair anot...|[southwestair, an...|
|      0|Thu Oct 12 11:50:...|308359 cant wait ...|[308359, cant, wa...|
|      1|Thu Oct 12 12:58:...|southwestair 3083...|[southwestair, 30...|
|      0|Thu Oct 12 12:59:...|308360 cant wait ...|[308360, cant, wa...|
|      1|Thu Oct 12 11:47:...|pretty pumped wak...|

In [12]:
# Hashing TF: maps each token list to a fixed-length (10000-dimensional)
# term-frequency vector stored in `rawFeatures`.
tf = HashingTF(
    inputCol="tokens",
    outputCol="rawFeatures",
    numFeatures=10000,
)




In [13]:
# Exploratory step: hash the tokenized rows. Despite its name, `hashingTF_model`
# is the transformed DataFrame (with a `rawFeatures` column), not a model object.
hashingTF_model = tf.transform(wordsData)




In [14]:
# Preview the hashed term-frequency vectors.
hashingTF_model.show()

+-------+--------------------+--------------------+--------------------+--------------------+
|inbound|          created_at|                text|              tokens|         rawFeatures|
+-------+--------------------+--------------------+--------------------+--------------------+
|      1|Thu Oct 12 03:25:...|promise promise h...|[promise, promise...|(10000,[1813,3855...|
|      0|Thu Oct 12 11:53:...|291022 absolutely...|[291022, absolute...|(10000,[157,1280,...|
|      1|Thu Oct 12 11:50:...| 115911 i’m contract|[115911, i’m, con...|(10000,[1125,2628...|
|      0|Thu Oct 12 11:57:...|308357 weve danci...|[308357, weve, da...|(10000,[1071,2662...|
|      1|Thu Oct 12 11:56:...|southwestair happ...|[southwestair, ha...|(10000,[1259,1690...|
|      0|Thu Oct 12 11:52:...|308358 pleasure s...|[308358, pleasure...|(10000,[3221,6117...|
|      1|Thu Oct 12 11:47:...|southwestair anot...|[southwestair, an...|(10000,[448,2144,...|
|      0|Thu Oct 12 11:50:...|308359 cant wait ...|[308359, 

In [15]:
# IDF: learn inverse-document-frequency weights from the hashed term frequencies.
# NOTE(review): `hashingTF_model` is derived from the full `df`, before any
# train/test split, so the fitted model stored in `idf` has seen every row.
idf_estimator = IDF(inputCol="rawFeatures", outputCol="features")
idf = idf_estimator.fit(hashingTF_model)




In [16]:
# Exploratory step: apply the fitted IDF weights to get the TF-IDF `features` column.
tfidf = idf.transform(hashingTF_model)




In [17]:
# Preview the rows with their TF-IDF features.
tfidf.show()

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|inbound|          created_at|                text|              tokens|         rawFeatures|            features|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      1|Thu Oct 12 03:25:...|promise promise h...|[promise, promise...|(10000,[1813,3855...|(10000,[1813,3855...|
|      0|Thu Oct 12 11:53:...|291022 absolutely...|[291022, absolute...|(10000,[157,1280,...|(10000,[157,1280,...|
|      1|Thu Oct 12 11:50:...| 115911 i’m contract|[115911, i’m, con...|(10000,[1125,2628...|(10000,[1125,2628...|
|      0|Thu Oct 12 11:57:...|308357 weve danci...|[308357, weve, da...|(10000,[1071,2662...|(10000,[1071,2662...|
|      1|Thu Oct 12 11:56:...|southwestair happ...|[southwestair, ha...|(10000,[1259,1690...|(10000,[1259,1690...|
|      0|Thu Oct 12 11:52:...|308358 pleasure s...|[308358, pleasure...|(10000,[

In [18]:
# Show the label next to the raw and IDF-weighted vectors, without truncating values.
(
    tfidf
    .select("inbound", "rawFeatures", "features")
    .show(truncate=False)
)

+-------+------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|inbound|rawFeatures                                                                                                                                     |features                                                                                                                                                                                                                                                                                                                                         

In [1]:
#These TF-IDF vectors (the "features" column) are what the model consumes inside the pipeline

In [19]:
# Proposed classifier: logistic regression over the TF-IDF `features` column,
# with `inbound` as the label, capped at 20 iterations.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="inbound",
    maxIter=20,
)




In [20]:
# Assemble the complete pipeline: tokenize -> hash term frequencies -> IDF -> classifier.
#
# The IDF stage is a fresh, unfitted estimator rather than the exploratory `idf`
# object. That object is an already-fitted model learned from the whole dataset
# (test rows included); as a pipeline stage it would be applied as-is and never
# re-fitted, leaking test-set document frequencies into training. With an unfitted
# estimator, `pipeline.fit(...)` learns the IDF weights from the data it is given.
pipeline = Pipeline(
    stages=[
        tokenizer,
        tf,
        IDF(inputCol="rawFeatures", outputCol="features"),
        lr,
    ]
)




In [21]:
# Split the dataset: 80% for training, 20% held out for evaluation (fixed seed).
split_weights = [0.8, 0.2]
train_df, test_df = df.randomSplit(split_weights, seed=42)




In [22]:
# Training: fit the pipeline on the training split.
logger.info("Entrenando modelo...")
model = pipeline.fit(train_df)




In [23]:
# Evaluation: score the held-out split and log the evaluator's metric (reported as AUC).
logger.info("Evaluando modelo...")
evaluator = BinaryClassificationEvaluator(
    labelCol="inbound",
    rawPredictionCol="rawPrediction",
)
predictions = model.transform(test_df)
auc = evaluator.evaluate(predictions)
logger.info(f"AUC en test set: {auc}")




In [1]:
# NOTE(review): model persistence is currently disabled. With the lines below
# commented out, no visible cell saves the trained model to `model_output_path`.
#logger.info("Guardando modelo entrenado en S3...")
#model.write().overwrite().save(model_output_path)
#logger.info("Entrenamiento completado y modelo guardado.")

In [25]:
# Write a one-row DataFrame to S3 (presumably a smoke test of write access to the bucket).
# It uses its own variable name so the held-out `test_df` split is not overwritten:
# re-running the evaluation cell after this one must still score the real test data.
write_probe_df = spark.createDataFrame([("test",)], ["col"])
write_probe_df.write.mode("overwrite").parquet("s3://nequijmmarinq/models/test-write/")


