### Инициализация

In [None]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Spark configuration for this practice session: 8 executors with 1 core and
# 2 GB of memory each. (SparkConf.set stringifies non-string values.)
sp_conf = SparkConf() \
    .setAppName('NewProLab Practice #1') \
    .set('spark.executor.memory', '2g') \
    .set('spark.executor.cores', 1) \
    .set('spark.executor.instances', 8)

# getOrCreate() returns an already running session if there is one, so
# re-running this cell does not start a second Spark application.
spark = SparkSession.builder \
    .config(conf=sp_conf) \
    .getOrCreate()

### Загружаем данные в память

In [None]:
# Read the news dump, spread it over 16 partitions and cache it: later cells
# run several actions over this DataFrame.
raw_news = spark.read.json('npl_news.json')
df = raw_news.repartition(16).cache()

df.printSchema()
news_count = df.count()
print('Загружено новостных записей: %d' % news_count)

### Чистка текста
Приводим текст к нижнему регистру и удаляем знаки препинания и цифры: остаются только буквы английского алфавита и пробелы между словами.

In [None]:
# Text column that is cleaned, tokenized and clustered below.
# Alternative: 'short_description'.
clustering_col = 'headline'

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
import re

def clean_text(text):
    """Keep only latin letters a-z separated by single spaces.

    Every character other than ``a``-``z`` and whitespace is deleted (so
    "don't" becomes "dont" and digits disappear). Runs of whitespace are
    collapsed into one space and the ends are stripped, so the tokenizer
    never sees consecutive separators. ``None`` is passed through unchanged.
    """
    if text is None:
        return None
    letters_only = re.sub(r'[^a-z\s]', '', text)
    return ' '.join(letters_only.split())


cleaner = F.udf(clean_text, StringType())

# Clean first, then filter: this drops null rows as well as rows that became
# empty after cleaning (e.g. a headline made only of digits/punctuation),
# which would otherwise be tokenized into a single empty token.
df = df \
    .withColumn(clustering_col, cleaner(F.lower(F.col(clustering_col)))) \
    .filter(F.length(clustering_col) > 0)

df.select(clustering_col) \
    .show(1, truncate=False)

### Токенизация

In [None]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer

# Two interchangeable tokenizers writing to the same output column.
# Tokenizer lowercases and splits on every single whitespace character, so a
# run of spaces yields empty '' tokens that would pollute the vocabulary.
tokenizer = Tokenizer(
    inputCol=clustering_col,
    outputCol=clustering_col + '_tokens'
)

# RegexTokenizer splits on whitespace *runs* and drops tokens shorter than
# minTokenLength, so it never emits '' — this is the one used below.
rtokenizer = RegexTokenizer(
    inputCol=clustering_col,
    outputCol=clustering_col + '_tokens',
    minTokenLength=1,  # minimum token length (>= 0); 1 discards empty tokens
    gaps=True,         # whether regex splits on gaps (True) or matches tokens (False)
    pattern=r'\s+',    # regex pattern (Java dialect) used for tokenizing
    toLowercase=True   # whether to convert all characters to lowercase before tokenizing
)

tokenized_df = rtokenizer.transform(df).cache()

tokenized_df.select(F.col(clustering_col + '_tokens')) \
    .show(1, truncate=False)

### Удаление стоп-слов

In [None]:
# Frequency of every token before stop-word removal; the 100 most frequent
# ones are shown to decide which words are noise.
token_counts = tokenized_df \
    .select(F.explode(clustering_col + '_tokens').alias('token')) \
    .groupBy('token') \
    .count()

token_counts \
    .orderBy(F.desc('count')) \
    .show(100)

In [None]:
# The NLTK stop-word lists are a separate data package; the corpus reader
# imported here only loads the data when it is first used, so downloading
# after the import is fine.
import nltk
from nltk.corpus import stopwords

_stopwords_ready = nltk.download('stopwords')

In [None]:
from pyspark.ml.feature import StopWordsRemover

# NLTK's English list spells contractions with an apostrophe ("don't",
# "isn't", ...), but the cleaning step deletes every non-letter character, so
# the tokens read "dont", "isnt", ... Add the letters-only spelling of every
# stop word so those tokens are removed too.
english_stop_words = stopwords.words('english')
stop_words = sorted(
    set(english_stop_words)
    | {re.sub(r'[^a-z]', '', word) for word in english_stop_words}
    # extra domain-specific word; presumably chosen from the token
    # frequency table — TODO confirm the rationale
    | {'trump'}
)

swr = StopWordsRemover(
    inputCol=clustering_col + '_tokens',
    outputCol=clustering_col + '_clean',
    stopWords=stop_words,
    caseSensitive=False
)

clean_df = swr.transform(tokenized_df)

clean_df.select(clustering_col + '_clean') \
    .show(1, truncate=False)

In [None]:
# Same frequency table after stop-word removal, to check what survived.
top_clean_tokens = clean_df \
    .select(F.explode(F.col(clustering_col + '_clean')).alias('token')) \
    .groupBy('token') \
    .count() \
    .sort(F.col('count').desc())

top_clean_tokens.show(100)

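### Векторизация
Три ячейки ниже (Bag of Words, TF-IDF, word2vec) — альтернативные способы векторизации: каждая записывает результат в столбец `features` датафрейма `vectorized_df`, поэтому в кластеризацию попадает вариант из последней выполненной ячейки.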

In [None]:
# Number of distinct tokens left after stop-word removal. Only the token
# column may take part in distinct(): deduplicating the whole exploded row
# would count (document, token) pairs instead of unique words.
vocab_size = clean_df \
    .select(F.explode(clustering_col + '_clean').alias('token')) \
    .distinct() \
    .count()

print('Размер словаря: %d' % vocab_size)

#### Bag of Words (CountVectorizer, частоты слов)

In [None]:
from pyspark.ml.feature import CountVectorizer

# Bag-of-words features: per-document term counts (binary=False). The
# vocabulary is capped at vocab_size and minDF=0, so no token is dropped.
cv = CountVectorizer(
    inputCol=clustering_col + '_clean',
    outputCol='features',
    vocabSize=vocab_size,
    minDF=0,
    binary=False,
)

cv_model = cv.fit(clean_df)
vectorized_df = cv_model.transform(clean_df)
vectorized_df = vectorized_df.cache()

#### TF-IDF

In [None]:
from pyspark.ml.feature import HashingTF, IDF

# TF-IDF: hash every token into one of vocab_size buckets to get raw term
# frequencies, then reweight them by inverse document frequency.
hashingTF = HashingTF(
    inputCol=clustering_col + '_clean',
    outputCol='rawFeatures',
    numFeatures=vocab_size,
    binary=False,
)
tf_features = hashingTF.transform(clean_df)

# minDocFreq=0: every term gets an IDF weight, however rare it is.
idf = IDF(inputCol='rawFeatures', outputCol='features', minDocFreq=0)
idfModel = idf.fit(tf_features)

vectorized_df = idfModel.transform(tf_features)
vectorized_df = vectorized_df.cache()

#### word2vec

In [None]:
# Print the documentation of all Word2Vec parameters (with their defaults)
# to help choose the values used in the next cell.
from pyspark.ml.feature import Word2Vec

w2v_params_help = Word2Vec().explainParams()
print(w2v_params_help)

In [None]:
from pyspark.ml.feature import Word2Vec

# Train 32-dimensional word embeddings on the cleaned token lists. The fitted
# model's transform() turns each document into one vector (Spark averages the
# vectors of the document's words).
w2v_settings = dict(
    inputCol=clustering_col + '_clean',
    outputCol='features',
    vectorSize=32,
    numPartitions=4,
    minCount=1,
    stepSize=0.025,
    maxIter=5,
    windowSize=4,
    maxSentenceLength=36,
)
w2v = Word2Vec(**w2v_settings)

w2v_model = w2v.fit(clean_df)
vectorized_df = w2v_model \
    .transform(clean_df) \
    .cache()

In [None]:
# The 10 vocabulary words whose vectors are most similar to 'president'
# (Spark fails here if the word is not in the model's vocabulary).
president_synonyms = w2v_model.findSynonyms('president', 10)
president_synonyms.show()

#### Смотрим результат

In [None]:
# Peek at one feature vector from whichever vectorization cell ran last.
first_vectors = vectorized_df.select('features')
first_vectors.show(1, truncate=False)

### Кластеризация

In [None]:
from pyspark.ml.clustering import KMeans

# K-means over the document vectors in the 'features' column.
# NOTE(review): k=31 is presumably meant to match the number of news
# categories — TODO confirm.
kmeans = KMeans(
    k=31,
    featuresCol='features',
    predictionCol='prediction',
    initMode='k-means||',
    initSteps=2,
    maxIter=20,
    tol=0.0001,
)
kmeans_model = kmeans.fit(vectorized_df)
summary = kmeans_model.summary

# Cached: the following cells run several actions over the clustered data.
result = kmeans_model \
    .transform(vectorized_df) \
    .cache()

In [None]:
# Cluster sizes. summary.clusterSizes holds one count per cluster, indexed by
# cluster id, so the index itself is the label. There is no need to collect
# the per-row predictions (summary.cluster) to the driver.
for cluster_id, cluster_size in enumerate(summary.clusterSizes):
    print('в кластер', cluster_id, 'попало', cluster_size, 'новостей')

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator

# Silhouette score of the clustering. ClusteringEvaluator reads the default
# 'features' column and uses squared Euclidean distance; values closer to 1
# mean better separated clusters.
evaluator = ClusteringEvaluator(predictionCol='prediction')
silhouette = evaluator.evaluate(result)
print('silhouette: %.4f' % silhouette)

In [None]:
# For every cluster, list its (count, category) pairs sorted by count, largest
# first. Clusters mixing the fewest distinct categories are shown first.
category_counts = result.groupBy('prediction', 'category').count()

cluster_composition = category_counts \
    .groupBy('prediction') \
    .agg(
        F.sort_array(
            F.collect_list(F.struct('count', 'category')),
            asc=False,
        ).alias('res')
    ) \
    .orderBy(F.size('res'))

cluster_composition.show(5, truncate=False)

In [None]:
# Shut down the Spark session started at the top of the notebook.
spark.stop()