In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id, col, expr, when, concat, lit, isnan
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml import Pipeline
import pyspark

In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# Reuse the session the environment already provides, or start one, so this
# cell also works on a fresh kernel where `spark` has not been pre-defined.
spark = SparkSession.builder.getOrCreate()
spark

In [5]:
# Load news.csv into a Spark DataFrame: the first row supplies the column
# names (header=True) and Spark infers each column's type (inferSchema=True).
df=spark.read.csv("news.csv", inferSchema=True, header=True)

In [6]:
# Preview the first 10 rows of the CSV-backed DataFrame.
df.show(10)

+---+-------+--------------------+--------------+--------------------+----------+------+-----+----+--------------------+
| id|id_news|               title|   publication|              author|      date|  year|month| url|             content|
+---+-------+--------------------+--------------+--------------------+----------+------+-----+----+--------------------+
|  0|  17283|House Republicans...|New York Times|          Carl Hulse|2016-12-31|2016.0| 12.0|null|WASHINGTON  —   C...|
|  1|  17284|Rift Between Offi...|New York Times|Benjamin Mueller ...|2017-06-19|2017.0|  6.0|null|After the bullet ...|
|  2|  17285|Tyrus Wong, ‘Bamb...|New York Times|        Margalit Fox|2017-01-06|2017.0|  1.0|null|When Walt Disney’...|
|  3|  17286|Among Deaths in 2...|New York Times|    William McDonald|2017-04-10|2017.0|  4.0|null|Death may be the ...|
|  4|  17287|Kim Jong-un Says ...|New York Times|       Choe Sang-Hun|2017-01-02|2017.0|  1.0|null|SEOUL, South Kore...|
|  5|  17288|Sick With a Cold,..

In [7]:
# Register df as the temporary view "data" so it can be queried through spark.sql.
df.createOrReplaceTempView("data")

In [8]:
# Select every row and column of the "data" view.
# NOTE(review): sqlDF is not referenced by any cell visible here - confirm it is still needed.
sqlDF = spark.sql("SELECT * FROM data")

In [9]:
import nltk
import pandas as pd
import numpy as np
import re
import codecs

In [10]:
from nltk.corpus import stopwords
 
# English stop words from the NLTK stopwords corpus, held as a set.
# NOTE(review): no cell visible here reads stop_words_nltk (cleanup_text uses its
# own hard-coded list) - confirm it is still needed.
stop_words_nltk = set(stopwords.words('english'))

In [11]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA, BisectingKMeans
from pyspark.sql.functions import monotonically_increasing_id
import re

In [13]:
# One-off conversion of the CSV DataFrame to Parquet, kept commented out; the
# read below requires that the "csv_to_paraquet_topics" directory already exists.
#df.write.parquet("csv_to_paraquet_topics")
# Parquet files carry their own schema, so the CSV-only "header" option that was
# previously set on this reader had no effect and has been dropped.
# NOTE(review): the schema printed for df_1 is all-string, unlike the inferred
# CSV schema - presumably the files were written from a different read; confirm.
df_1 = spark.read.parquet("csv_to_paraquet_topics")

In [14]:
# Print the column names and types read from the Parquet files.
df_1.printSchema()

root
 |-- id: string (nullable = true)
 |-- id_news: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publication: string (nullable = true)
 |-- author: string (nullable = true)
 |-- date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- url: string (nullable = true)
 |-- content: string (nullable = true)



In [15]:
# Replace null values in "content" with '' so cleanup_text can call .split()
# on every row's text.
df_1= df_1.fillna({'content': ''})

In [16]:
# Add a "uid" column holding a unique id per row. The ids produced by
# monotonically_increasing_id() are unique and increasing but are not
# guaranteed to be consecutive.
df_1 = df_1.withColumn("uid", monotonically_increasing_id())     # Create Unique ID


In [17]:
# Preview the first 10 rows, now including the "uid" column.
df_1.show(10)

+-----+-------+--------------------+-----------+-------------+----------+------+-----+----+--------------------+---+
|   id|id_news|               title|publication|       author|      date|  year|month| url|             content|uid|
+-----+-------+--------------------+-----------+-------------+----------+------+-----+----+--------------------+---+
|41272|  60149|Dashcam video app...|        CNN| Kevin Conlon|2016-12-30|2016.0| 12.0|null| (CNN) A newly re...|  0|
|41273|  60150|Syria civil war: ...|        CNN| Angela Dewan|2016-12-29|2016.0| 12.0|null|Istanbul (CNN)  S...|  1|
|41274|  60151|Ex-cop’s retrial ...|        CNN|         null|2016-12-30|2016.0| 12.0|null| (CNN) A former p...|  2|
|41275|  60152|Israel risks slid...|        CNN|         null|2016-12-30|2016.0| 12.0|null| (CNN) Israel, an...|  3|
|41276|  60153|Does Melania Trum...|        CNN|         null|2016-12-29|2016.0| 12.0|null| (CNN) During the...|  4|
|41277|  60154|Conway wonders if...|        CNN|Daniella Diaz|20

In [18]:
# Print each (column name, dtype) pair. The loop variable deliberately avoids
# the name `type`, which would rebind the builtin for every later cell.
for column_dtype in df_1.dtypes:
    print(column_dtype)

('id', 'string')
('id_news', 'string')
('title', 'string')
('publication', 'string')
('author', 'string')
('date', 'string')
('year', 'string')
('month', 'string')
('url', 'string')
('content', 'string')
('uid', 'bigint')


In [19]:
# Default list of stop words. Entries are compared after punctuation has been
# stripped from each token, which is why contractions appear without an
# apostrophe ("dont", "isnt", ...).
# NOTE(review): the list has 'didn' but not 'didnt', unlike 'doesnt'/'dont' -
# confirm whether 'didnt' should be added.
_STOPWORDS_CORE = [
    'a', 'about', 'above', 'after', 'again', 'against', 'all', 'am', 'an', 'and', 'any', 'are', 'arent', 'as', 'at',
    'be', 'because', 'been', 'before', 'being', 'below', 'between', 'both', 'but', 'by',
    'can', 'cant', 'come', 'could', 'couldnt',
    'd', 'did', 'didn', 'do', 'does', 'doesnt', 'doing', 'dont', 'down', 'during',
    'each',
    'few', 'finally', 'for', 'from', 'further',
    'had', 'hadnt', 'has', 'hasnt', 'have', 'havent', 'having', 'he', 'her', 'here', 'hers', 'herself', 'him', 'himself', 'his', 'how',
    'i', 'if', 'in', 'into', 'is', 'isnt', 'it', 'its', 'itself',
    'just',
    'll',
    'm', 'me', 'might', 'more', 'most', 'must', 'my', 'myself',
    'no', 'nor', 'not', 'now',
    'o', 'of', 'off', 'on', 'once', 'only', 'or', 'other', 'our', 'ours', 'ourselves', 'out', 'over', 'own',
    'r', 're',
    's', 'said', 'same', 'she', 'should', 'shouldnt', 'so', 'some', 'such',
    't', 'than', 'that', 'thats', 'the', 'their', 'theirs', 'them', 'themselves', 'then', 'there', 'these', 'they', 'this', 'those', 'through', 'to', 'too',
    'under', 'until', 'up',
    'very',
    'was', 'wasnt', 'we', 'were', 'werent', 'what', 'when', 'where', 'which', 'while', 'who', 'whom', 'why', 'will', 'with', 'wont', 'would',
    'y', 'you', 'your', 'yours', 'yourself', 'yourselves',
]

# Custom list of stop words - add your own here.
_STOPWORDS_CUSTOM = []

# Built once rather than once per record; a frozenset gives constant-time
# membership tests instead of a linear scan of the list for every token.
_STOPWORDS = frozenset(word.lower() for word in _STOPWORDS_CORE + _STOPWORDS_CUSTOM)

# Matches every character that is not an ASCII letter or digit.
_NON_ALPHANUMERIC = re.compile('[^a-zA-Z0-9]')

# Tokens shorter than this many characters are discarded.
_MIN_TOKEN_LENGTH = 3


def cleanup_text(record):
    """Tokenise and normalise the article text held in a row.

    The text is split on whitespace; every character other than an ASCII
    letter or digit is removed from each token; tokens are lower-cased; and
    tokens shorter than three characters or present in the stop-word set are
    dropped.

    Args:
        record: Indexable row whose element at position 9 is the text to
            clean (the ``content`` column in this notebook's column order).

    Returns:
        list[str]: The cleaned tokens in their original order. An empty list
        is returned when the text is ``None`` or contains no usable tokens.
    """
    content = record[9]
    if content is None:
        # A null text field yields no tokens instead of failing on .split().
        return []

    normalised = (
        _NON_ALPHANUMERIC.sub('', raw_word).lower()   # Remove special characters
        for raw_word in content.split()
    )
    # Remove stop words and tokens under the minimum length.
    return [
        token for token in normalised
        if len(token) >= _MIN_TOKEN_LENGTH and token not in _STOPWORDS
    ]

In [20]:
# Wrap cleanup_text as a Spark UDF that returns an array of strings.
udf_cleantext = udf(cleanup_text , ArrayType(StringType()))
# Pack every column of df_1, in column order, into a single struct and pass it
# to the UDF. cleanup_text reads the text by position (index 9, "content"), so
# the column order of df_1 must not change before this call.
clean_text = df_1.withColumn("words", udf_cleantext(struct([df_1[x] for x in df_1.columns])))

In [34]:
# Configure LDA for 10 topics with the EM optimizer and a fixed seed, reading
# the input vectors from the "features" column.
lda = LDA(k=10, seed=123, optimizer="em", featuresCol="features")

# Fit the model. rescaledData is not defined in the cells visible here.
ldamodel = lda.fit(rescaledData)

# One row per topic; termIndices holds the vocabulary indices of its terms.
ldatopics = ldamodel.describeTopics()

def map_termID_to_Word(termIndices):
    """Translate a sequence of term ids into their vocabulary words.

    Each id is looked up in ``vocab_broadcast.value`` (defined outside this
    block); the words are returned in the same order as the ids.
    """
    return [vocab_broadcast.value[term_id] for term_id in termIndices]

# Wrap the id-to-word lookup as a Spark UDF that returns an array of strings.
udf_map_termID_to_Word = udf(map_termID_to_Word , ArrayType(StringType()))
# Add "topic_desc": the words corresponding to each topic's termIndices.
ldatopics_mapped = ldatopics.withColumn("topic_desc", udf_map_termID_to_Word(ldatopics.termIndices))
# Show up to 50 topics with their words, without truncating the cell text.
ldatopics_mapped.select(ldatopics_mapped.topic, ldatopics_mapped.topic_desc).show(50,False)

VBox()

VBox()

+-----+------------------------------------------------------------------------------------------+
|topic|topic_desc                                                                                |
+-----+------------------------------------------------------------------------------------------+
|0    |[trump, percent, people, one, new, says, clinton, like, health, president]                |
|1    |[trump, isis, president, clinton, state, people, police, states, government, syria]       |
|2    |[percent, trump, billion, company, new, tax, apple, people, china, companies]             |
|3    |[trump, people, police, students, says, one, new, school, like, president]                |
|4    |[trump, north, people, says, one, korea, like, president, new, clinton]                   |
|5    |[trump, people, says, one, like, president, new, time, think, also]                       |
|6    |[trump, says, people, one, new, police, like, percent, gun, clinton]                      |
|7    |[tr