## Carga de Librerías

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col, expr, when, concat, lit, isnan,struct
import pyspark
from pyspark.sql import SQLContext
import pyspark.sql.functions as F

In [2]:
# Starting Spark takes a little while, so be patient when running this cell.
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# getOrCreate() reuses a session that is already running, so executing this cell
# a second time no longer fails the way constructing a second SparkContext does.
spark = SparkSession.builder.master('local').appName("Articles_topic_model").getOrCreate()
sc = spark.sparkContext
# Kept because a later cell issues its query through sqlContext.sql(...)
sqlContext = SQLContext(sc)

## Carga de Datos de Artículos

In [3]:
# Read the articles CSV: the first row is the header and column types are inferred.
dfArticles = spark.read.csv(
    "file:///home/ubuntu/3ra_entrega_PI/datasets/articles.csv",
    header=True,
    inferSchema=True,
    encoding="UTF-8",
)
dfArticles.show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|          identifier|               title|         description|             subject|             creator|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|http://arxiv.org/...|Smooth R\'enyi En...|  We prove that t...|Quantum Physics ;...|Schoenmakers, Ber...|
|http://arxiv.org/...|Analyzing Design ...|  In the field of...|Computer Science ...|Brust, Matthias R...|
|http://arxiv.org/...|Colour image segm...|  We propose a ne...|Computer Science ...|Kay, David A ; To...|
|http://arxiv.org/...|Unequal Error Pro...|  An information ...|Computer Science ...|Borade, Shashi ; ...|
|http://arxiv.org/...|On the hitting ti...|  In this paper w...|Quantum Physics ;...|Magniez, Frederic...|
|http://arxiv.org/...|Coding Theory and...|  This chapter in...|Mathematics - Com...|   Huber, Michael ; |
|http://arxiv.org/...|Generating Rand

In [4]:
# Count of the raw words found in the title and description of each article

def countWords(record):
    """Count the whitespace-separated words in an article's title and description.

    Args:
        record: indexable row whose position 1 holds the title and position 2
            the description. Either value may be None (a null CSV field).

    Returns:
        int: number of words in the title and description combined.
    """
    # A null field arrives as None; treat it as empty text instead of letting
    # the string concatenation below raise TypeError.
    textTitle = record[1] or ""
    textDescription = record[2] or ""
    textCombined = textTitle + " " + textDescription
    return len(textCombined.split())

# Wrap countWords as an integer-typed UDF and apply it to the whole row,
# passed in as a struct of every column in dfArticles.
udf_countWords = udf(countWords, IntegerType())
dfcountWords = dfArticles.withColumn(
    "countWords",
    udf_countWords(struct([dfArticles[name] for name in dfArticles.columns])),
)

In [5]:
# Total number of words across the titles and descriptions of all articles
totalWords = (
    dfcountWords
    .agg(F.sum("countWords"))
    .collect()
)
totalWords

[Row(sum(countWords)=146184)]

In [6]:
# Keep only the relevant columns. Their order matters: the row-based UDFs
# applied to dfMainCols read the title at position 1 and the description at position 2.
dfMainCols = dfArticles.select(['identifier', 'title', 'description'])
dfMainCols.show(10)

+--------------------+--------------------+--------------------+
|          identifier|               title|         description|
+--------------------+--------------------+--------------------+
|http://arxiv.org/...|Smooth R\'enyi En...|  We prove that t...|
|http://arxiv.org/...|Analyzing Design ...|  In the field of...|
|http://arxiv.org/...|Colour image segm...|  We propose a ne...|
|http://arxiv.org/...|Unequal Error Pro...|  An information ...|
|http://arxiv.org/...|On the hitting ti...|  In this paper w...|
|http://arxiv.org/...|Coding Theory and...|  This chapter in...|
|http://arxiv.org/...|Generating Random...|  Random graph ge...|
|http://arxiv.org/...|Variations on a t...|  Schalkwijk and ...|
|http://arxiv.org/...|Rotation Distance...|  Rotation distan...|
|http://arxiv.org/...|A Linear-Time App...|  Rotation distan...|
+--------------------+--------------------+--------------------+
only showing top 10 rows



In [7]:
#def cleanup_filename(record):
#    textURL  = record[0]
#    longURL =len(textURL)
#    textFile =textURL[longURL-9:longURL]
#    return textFile
#udf_cleanfilename = udf(cleanup_filename, ArrayType(StringType()))
#dfCleanFile = dfMainCols.withColumn("File", udf_cleanfilename(struct([dfMainCols[x] for x in dfMainCols.columns])))
#dfCleanFile.show(10)

In [8]:
#textfile="http://arxiv.org/abs/0704.3504"
#longURL=len(textfile)
#tfile=textfile[longURL-9:longURL]
#print(tfile)

In [9]:
from pyspark.ml.feature import IDF, Tokenizer, CountVectorizer
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA, BisectingKMeans
#from pyspark.sql.functions import monotonically_increasing_id
import re

In [54]:
import nltk
import pandas as pd
import numpy as np
#import codecs
#import matplotlib.pyplot as plt
from nltk.stem.porter import PorterStemmer

nltk.download(['stopwords'])
from nltk.corpus import stopwords

[nltk_data] Downloading package stopwords to /home/ubuntu/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [55]:
# English stop words from NLTK
stopWords1 = stopwords.words('english')

# Lower-cased and stored as a frozenset: the only use of stopwords2 is the
# `not in stopwords2` check run for every token, which is a hash lookup on a
# set but a linear scan on a list.
stopwords2 = frozenset(word.lower() for word in stopWords1)

# Porter stemmer instance used when tokenizing
ps = PorterStemmer()

def cleanup_text(record):
    """Turn an article's title and description into a list of stemmed tokens.

    Steps: join title and description, strip dashes and every non-letter
    character, split on whitespace, lower-case, drop stop words and words of
    three characters or fewer, then stem what is left with the Porter stemmer.

    Args:
        record: indexable row whose position 1 holds the title and position 2
            the description. Either value may be None (a null CSV field).

    Returns:
        list of str: the stemmed tokens, in their original order.
    """
    # A null field arrives as None; treat it as empty text instead of letting
    # the string concatenation below raise TypeError.
    textTitle = record[1] or ""
    textDescription = record[2] or ""
    textCombined = textTitle + " " + textDescription
    # A dash immediately followed by a newline is removed outright, which
    # re-joins a word that was hyphenated across a line break.
    textCombined = re.sub('(-|-|\u2212|\u2012|\u2013|\u2014|\u2015)\n','',textCombined)
    # Any remaining dash becomes a word separator.
    textCombined = re.sub('(-|-|\u2212|\u2012|\u2013|\u2014|\u2015)',' ',textCombined)
    # Everything that is not an ASCII letter (digits, punctuation, accents) becomes a space.
    textCombined = re.sub('[^a-zA-Z]',' ',textCombined)
    words = textCombined.split()
    # Keep words longer than three characters that are not stop words.
    tokens3 = [word.lower() for word in words if len(word)>3 and word.lower() not in stopwords2]
    return [ps.stem(w) for w in tokens3]

In [56]:
# Wrap cleanup_text as an array-of-strings UDF and apply it to the whole row,
# passed in as a struct of every column in dfMainCols.
udf_tokensCombined = udf(cleanup_text, ArrayType(StringType()))
dfCombined = dfMainCols.withColumn(
    "tokensCombined",
    udf_tokensCombined(struct([dfMainCols[name] for name in dfMainCols.columns])),
)

In [57]:
dfCombined.show(10)

+--------------------+--------------------+--------------------+--------------------+
|          identifier|               title|         description|      tokensCombined|
+--------------------+--------------------+--------------------+--------------------+
|http://arxiv.org/...|Smooth R\'enyi En...|  We prove that t...|[smooth, enyi, en...|
|http://arxiv.org/...|Analyzing Design ...|  In the field of...|[analyz, design, ...|
|http://arxiv.org/...|Colour image segm...|  We propose a ne...|[colour, imag, se...|
|http://arxiv.org/...|Unequal Error Pro...|  An information ...|[unequ, error, pr...|
|http://arxiv.org/...|On the hitting ti...|  In this paper w...|[hit, time, quant...|
|http://arxiv.org/...|Coding Theory and...|  This chapter in...|[code, theori, al...|
|http://arxiv.org/...|Generating Random...|  Random graph ge...|[gener, random, n...|
|http://arxiv.org/...|Variations on a t...|  Schalkwijk and ...|[variat, theme, s...|
|http://arxiv.org/...|Rotation Distance...|  Rotation 

In [58]:
def countTokens(record):
    """Return how many tokens the row holds at position 3 (its token list)."""
    return len(record[3])

# Wrap countTokens as an integer-typed UDF and apply it to the whole row,
# passed in as a struct of every column in dfCombined.
udf_countTokens = udf(countTokens, IntegerType())
dfcountTokens = dfCombined.withColumn(
    "countTokens",
    udf_countTokens(struct([dfCombined[name] for name in dfCombined.columns])),
)

In [59]:
dfcountTokens.show()

+--------------------+--------------------+--------------------+--------------------+-----------+
|          identifier|               title|         description|      tokensCombined|countTokens|
+--------------------+--------------------+--------------------+--------------------+-----------+
|http://arxiv.org/...|Smooth R\'enyi En...|  We prove that t...|[smooth, enyi, en...|         33|
|http://arxiv.org/...|Analyzing Design ...|  In the field of...|[analyz, design, ...|         95|
|http://arxiv.org/...|Colour image segm...|  We propose a ne...|[colour, imag, se...|         66|
|http://arxiv.org/...|Unequal Error Pro...|  An information ...|[unequ, error, pr...|         69|
|http://arxiv.org/...|On the hitting ti...|  In this paper w...|[hit, time, quant...|        113|
|http://arxiv.org/...|Coding Theory and...|  This chapter in...|[code, theori, al...|         61|
|http://arxiv.org/...|Generating Random...|  Random graph ge...|[gener, random, n...|        104|
|http://arxiv.org/..

In [60]:
# Total number of tokens left after cleaning, summed over all articles
totalTokens = (
    dfcountTokens
    .agg(F.sum("countTokens"))
    .collect()
)
totalTokens

[Row(sum(countTokens)=84576)]

In [61]:
dfvocab=dfcountTokens.select("tokensCombined").toPandas()

In [93]:
# Distinct tokens across all articles.
# The row count keeps its original name `fil`. The column count is no longer
# unpacked into `col`, because that rebinding replaced the `col` function
# imported from pyspark.sql.functions with an integer for the rest of the session.
fil = dfvocab.shape[0]
# Union every row's token list in one call instead of rebuilding the set row by row.
vocabArticles = set().union(*dfvocab.iloc[:, 0])
len(vocabArticles)

5483

In [47]:
# Pendiente Hacer el conteo de palabras que quedaron.
# Implementar SVD para reducción de la dimensionalidad.

In [48]:
#def setVocab (record):
#    listTokens = record[3]
#    #words = textTitle.split(",")
#    numTokens=len(listTokens)
#    return numTokens

#udf_setVocab = udf(setVocab, IntegerType())
#dfsetVocab = dfcountTokens.withColumn("setVocab", udf_setVocab(struct([dfcountTokens[x] for x in dfcountTokens.columns])))

In [94]:
# Term-frequency vectorization (bag of words) of the token lists
cvTokensCombined = CountVectorizer(
    inputCol="tokensCombined",
    outputCol="rawFeatures",
    vocabSize=4000,
    minDF=3,
)
cvmodel = cvTokensCombined.fit(dfCombined)
featurizedData = cvmodel.transform(dfCombined)

# Vocabulary learned by the model, broadcast so UDFs can look terms up by index
vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)

# Size of the bag-of-words vocabulary
len(vocab)


2252

In [95]:
#featurizedData.select("rawFeatures").toPandas().to_csv("featurizedData.csv", header=True)
dfFeatures=featurizedData.toPandas()

In [96]:
dfFeatures.iloc[0,4]

SparseVector(2252, {23: 4.0, 27: 1.0, 48: 1.0, 88: 4.0, 114: 2.0, 143: 1.0, 199: 1.0, 268: 4.0, 373: 5.0, 429: 1.0, 644: 2.0, 1046: 2.0, 1128: 1.0, 1279: 1.0, 1602: 1.0, 1693: 1.0})

In [97]:
# Fit inverse-document-frequency weights on the term counts ("rawFeatures")
# and write the rescaled vectors to "features".
idf = IDF().setInputCol("rawFeatures").setOutputCol("features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [98]:
rescaledData.select('identifier','title','description','features').show()

+--------------------+--------------------+--------------------+--------------------+
|          identifier|               title|         description|            features|
+--------------------+--------------------+--------------------+--------------------+
|http://arxiv.org/...|Smooth R\'enyi En...|  We prove that t...|(2252,[23,27,48,8...|
|http://arxiv.org/...|Analyzing Design ...|  In the field of...|(2252,[1,9,11,12,...|
|http://arxiv.org/...|Colour image segm...|  We propose a ne...|(2252,[1,2,3,9,12...|
|http://arxiv.org/...|Unequal Error Pro...|  An information ...|(2252,[9,19,20,21...|
|http://arxiv.org/...|On the hitting ti...|  In this paper w...|(2252,[0,1,3,4,8,...|
|http://arxiv.org/...|Coding Theory and...|  This chapter in...|(2252,[1,9,16,29,...|
|http://arxiv.org/...|Generating Random...|  Random graph ge...|(2252,[0,1,2,3,4,...|
|http://arxiv.org/...|Variations on a t...|  Schalkwijk and ...|(2252,[3,9,20,28,...|
|http://arxiv.org/...|Rotation Distance...|  Rotation 

In [99]:
# Fit a 20-topic LDA model (EM optimizer, fixed seed) on the "features" column.
# NOTE(review): "features" holds the IDF-rescaled vectors here; confirm whether
# the raw term counts in "rawFeatures" were the intended LDA input.
lda = LDA(
    featuresCol="features",
    optimizer="em",
    k=20,
    seed=123,
)

ldamodel = lda.fit(rescaledData)

#model.isDistributed()
#model.vocabSize()

# One row per topic; the termIndices column is mapped back to words further down
ldatopics = ldamodel.describeTopics()

In [100]:
ldaResults = ldamodel.transform(rescaledData)

ldaResults.select('identifier','title','tokensCombined','features','topicDistribution').show(40)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|          identifier|               title|      tokensCombined|            features|   topicDistribution|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|http://arxiv.org/...|Smooth R\'enyi En...|[smooth, enyi, en...|(2252,[23,27,48,8...|[0.02562707864610...|
|http://arxiv.org/...|Analyzing Design ...|[analyz, design, ...|(2252,[1,9,11,12,...|[0.01993572074744...|
|http://arxiv.org/...|Colour image segm...|[colour, imag, se...|(2252,[1,2,3,9,12...|[0.03759360230722...|
|http://arxiv.org/...|Unequal Error Pro...|[unequ, error, pr...|(2252,[9,19,20,21...|[0.04054565049418...|
|http://arxiv.org/...|On the hitting ti...|[hit, time, quant...|(2252,[0,1,3,4,8,...|[0.01990665797540...|
|http://arxiv.org/...|Coding Theory and...|[code, theori, al...|(2252,[1,9,16,29,...|[0.02439916890143...|
|http://arxiv.org/...|Generating Rand

In [101]:
#featurizedData.take(10)

In [102]:
def map_termID_to_Word(termIndices):
    """Translate term indices into the vocabulary words they stand for.

    Each index is looked up in the broadcast vocabulary
    (``vocab_broadcast.value``); the words come back as a list in the same
    order as ``termIndices``.
    """
    vocabulary = vocab_broadcast.value
    return [vocabulary[termID] for termID in termIndices]

# Attach each topic's words as "topic_desc" and show up to 20 topics untruncated
udf_map_termID_to_Word = udf(map_termID_to_Word, ArrayType(StringType()))
ldatopics_mapped = ldatopics.withColumn(
    "topic_desc",
    udf_map_termID_to_Word(ldatopics["termIndices"]),
)
ldatopics_mapped.select("topic", "topic_desc").show(20, False)

+-----+--------------------------------------------------------------------------------------+
|topic|topic_desc                                                                            |
+-----+--------------------------------------------------------------------------------------+
|0    |[mathbb, finit, size, induc, length, subgraph, sequenc, theorem, width, fix]          |
|1    |[learn, task, classif, train, method, statist, fit, data, decis, model]               |
|2    |[network, node, measur, small, world, navig, encod, estim, transform, grid]           |
|3    |[game, strategi, player, action, equilibrium, seed, stabil, resourc, period, nash]    |
|4    |[nois, align, imag, track, filter, particl, risk, block, admm, counter]               |
|5    |[pattern, model, social, infer, rank, spread, predict, chang, influenc, network]      |
|6    |[graph, tree, edg, planar, spectral, vertic, match, bound, problem, cluster]          |
|7    |[color, delta, graph, frac, point, equat, b

In [103]:
ldatopics_mapped.select('topic','topic_desc').toPandas().to_csv("lda_topics_articles.csv", header=True)

In [104]:
from pyspark.sql.types import IntegerType

def asignTopic(topicDistribution):
    """Return the index of the largest entry in ``topicDistribution``.

    Ties go to the lowest index, because a later entry only takes over when
    it is strictly greater than the best value seen so far.
    """
    bestIndex = 0
    bestWeight = topicDistribution[0]
    # Position 0 is already the running best, so the scan starts at 1.
    for position in range(1, len(topicDistribution)):
        weight = topicDistribution[position]
        if weight > bestWeight:
            bestWeight = weight
            bestIndex = position
    return bestIndex

# Add each article's dominant topic index as the integer column "mainTopic"
udf_asignTopic = udf(asignTopic, IntegerType())
dfMainTopic = ldaResults.withColumn(
    "mainTopic",
    udf_asignTopic(ldaResults["topicDistribution"]),
)

In [109]:
dfMainTopic.columns

['identifier',
 'title',
 'description',
 'tokensCombined',
 'rawFeatures',
 'features',
 'topicDistribution',
 'mainTopic']

In [106]:
dfMainTopic.select('identifier','title','tokensCombined','mainTopic').toPandas().to_csv("ldaresults_articles.csv", header=True)

## Agrupamiento

In [None]:
from numpy import array
from math import sqrt
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans

In [None]:
rescaledData.show(10)

In [None]:
# Normalize the TF-IDF vectors to unit length (Normalizer's default is the L2 norm)
normalizer = Normalizer(inputCol="features", outputCol="normFeatures")
l2NormData = normalizer.transform(rescaledData)

# Train the k-means model.
# featuresCol must be set explicitly: KMeans defaults to the "features" column,
# which would silently ignore the normalized vectors computed just above.
# The seed makes cluster assignments reproducible across runs (same value as the LDA cell).
kClusters=20
kmparmeter = KMeans().setFeaturesCol("normFeatures").setK(kClusters).setMaxIter(20).setSeed(123)
km_model = kmparmeter.fit(l2NormData)

In [None]:
# Imported under an alias so the pyspark.ml.linalg `Vectors` name imported
# earlier in the notebook is not silently replaced by the mllib one.
from pyspark.mllib.linalg import Vectors as MLlibVectors
from pyspark.mllib.linalg.distributed import RowMatrix

# RowMatrix belongs to the RDD-based pyspark.mllib API and works on mllib
# vectors, while the "features" column holds pyspark.ml vectors. Each row is
# therefore converted with Vectors.fromML before the matrix is built.
featureRows = rescaledData.select("features").rdd.map(
    lambda row: MLlibVectors.fromML(row["features"])
)
mat = RowMatrix(featureRows)

# Compute the top 5 singular values and corresponding singular vectors.
svd = mat.computeSVD(5, computeU=True)
U = svd.U # The U factor is a RowMatrix.
s = svd.s # The singular values are stored in a local dense vector.
V = svd.V # The V factor is a local dense matrix.

collected = U.rows.collect()
print("U factor is:")
for vector in collected:
    print(vector)
# s and V are single local objects, so they are printed once after the loop
# rather than once per row of U.
print("Singular values are: %s" % s)
print("V factor is:\n%s" % V)
# The SparkContext is deliberately NOT stopped here: the cells that follow
# (k-means transform, CSV export, SQL query) still need it.

In [None]:
clustersTable = km_model.transform(l2NormData)

In [None]:
clustersTable.toPandas().to_csv("clustertable_articles.csv", header=True)

In [None]:
clustersTable.columns

In [None]:
clustersTable.select('identifier','title','tokensCombined','normFeatures','prediction').show(40)

In [None]:
# Expose the clustered articles to SQL and list the distinct cluster ids.
# createOrReplaceTempView replaces the deprecated registerTempTable and is
# safe to re-run because it overwrites an existing view of the same name.
clustersTable.createOrReplaceTempView("Clusters")
df2 = sqlContext.sql("select prediction from Clusters group by prediction")
df2.show()