In [1]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import *

# Ingesta de datos de hdsf en dataframes

In [2]:
csv = '/user/jlondo97/datasets/articles1.csv'
df1 = spark.read.csv(csv,inferSchema=True,header=True)
csv = '/user/jlondo97/datasets/articles2.csv'
df2 = spark.read.csv(csv,inferSchema=True,header=True)
csv = '/user/jlondo97/datasets/articles3.csv'
df3 = spark.read.csv(csv,inferSchema=True,header=True)
# df1.show()
# df2.show()
# df3.show()

In [3]:
join_1_df_2 = df1.join(df2, on=['_c0', 'id', 'title', 'publication', 'author', 'date', 'year', 'month', 'url', 'content'], how='left_outer')
full_df = join_1_df_2.join(df3, on=['_c0', 'id', 'title', 'publication', 'author', 'date', 'year', 'month', 'url', 'content'], how='left_outer')
full_df = full_df.limit(100)
full_df.show()

+-----+-----+--------------------+--------------+--------------------+----------+------+-----+----+--------------------+
|  _c0|   id|               title|   publication|              author|      date|  year|month| url|             content|
+-----+-----+--------------------+--------------+--------------------+----------+------+-----+----+--------------------+
|10092|28828|Watch: Amazon Bos...|     Breitbart|         Nate Church|2017-03-21|2017.0|  3.0|null|At the MARS 2017 ...|
|10101|28837|Patriots Owner Ro...|     Breitbart|         Trent Baker|2017-02-03|2017.0|  2.0|null|”I remember who t...|
|10236|28972|Report: George So...|     Breitbart|         Aaron Klein|2017-01-23|2017.0|  1.0|null|Billionaire Georg...|
|10513|29249|Peter Schweizer: ...|     Breitbart|        John Hayward|2017-05-26|2017.0|  5.0|null|On Friday’s Breit...|
|10608|29344|Mexican Border St...|     Breitbart|   Cartel Chronicles|2017-02-19|2017.0|  2.0|null|PIEDRAS NEGRAS, C...|
|10646|29382|Maxine Waters: ’D..

# Limpieza del DataFrame
Creando un dataframe que contenga los contedidos de las publicaciones hechas y limpiando el contenido de caracteres especiales.

In [4]:
reg = '[^a-zA-Z ]'
reg1 = '[\s*]{1,}'

In [None]:
full_copy = full_df.withColumn("ltrimmed_word",ltrim(col("Content")))
full_df = full_copy.withColumn("clean", regexp_replace('ltrimmed_word', reg ,""))
full_df.select('ltrimmed_word','clean').show(100)

In [None]:
full_df = full_df.withColumn("clean1", regexp_replace('clean', reg1 ," "))
full_df.select('clean','clean1').show(100)

## Tokenización de los contenidos de las publicaciones
Creacion de un dataframe con el contenido de la publicacion tokenizado 

In [None]:
tokenization=Tokenizer(inputCol='clean1',outputCol='tokens')

In [None]:
tokenized_df = tokenization.transform(full_df)

In [None]:
tokenized_df.select('clean1','tokens').show()

## Eliminar stopWords
Eliminación de stopWord en la columna de contenido de las publicaciones, token tales como "I, and .or"

In [None]:
stopword_removal=StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')

In [None]:
refined_df=stopword_removal.transform(tokenized_df)

In [None]:
refined_df.select('clean1','tokens','refined_tokens').show(100)

## Vectorización del DataFrame

In [None]:
refined_df

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.types import IntegerType

In [None]:
len_udf = udf(lambda s: len(s), IntegerType())
refined_df = refined_df.withColumn("token_count", len_udf(col('refined_tokens')))

In [None]:
refined_df.select('clean1','tokens','refined_tokens','token_count').show(100)

## Agrupación de textos

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF
from pyspark.ml.clustering import LDA, KMeans

In [None]:
aux_df = refined_df
aux_df = aux_df.drop('publication', 'author', 'publication', 'content', 'date', 'year', 'month', 'url', 'clean', 'clean1', 'tokens')
aux_df.show(100)

In [None]:
fill = array().cast("array<string>")
tokens_a = when(col("refined_tokens").isNull(), fill).otherwise(col("refined_tokens"))
aux_df = aux_df.withColumn("refined_tokens", tokens_a)

In [None]:
hashingTF = HashingTF(numFeatures=2000, inputCol="refined_tokens", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
kmeans = KMeans(k=25)
# lda = LDA(k=25, seed=123, optimizer="em", featuresCol="features")
pipeline = Pipeline(stages=[hashingTF, idf, kmeans])
# pipeline = Pipeline(stages=[hashingTF, idf, kmeans, lda])
model = pipeline.fit(aux_df)
results = model.transform(aux_df)
results.cache()

In [None]:
results.select('refined_tokens', 'features', 'prediction').show(100)


In [None]:
results.groupBy('prediction').count().show(100)
# results.withColumn("aux", print_columns("refined_tokens")) # .select("aux").show()
# results.filter("prediction = 13").select('refined_tokens', 'features', 'prediction').show(100)

In [None]:
# cv = CountVectorizer(inputCol="refined_tokens", outputCol="rawFeatures", vocabSize = 900)
# cvmodel = cv.fit(aux_df)
# featurizedData = cvmodel.transform(aux_df)
# vocab = cvmodel.vocabulary
# vocab_broadcast = sc.broadcast(vocab)
# idf = IDF(inputCol="rawFeatures", outputCol="features")
# idfModel = idf.fit(featurizedData)
# rescaledData = idfModel.transform(featurizedData)
# rescaledData.select("refined_tokens", "features").show(100)

In [None]:
# lda = LDA(k=25, seed=100, optimizer="em", featuresCol="features")
# ldamodel = lda.fit(rescaledData)

In [None]:
# ldamodel.isDistributed()
# ldamodel.vocabSize()

In [None]:
# ldatopics = ldamodel.describeTopics()
# ldatopics.show(25)

In [None]:
# results.select('refined_tokens', 'features', 'prediction').show(100, truncate = True) #.count()