In [156]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import re

In [157]:
from typing import Tuple

In [158]:
spark = SparkSession.builder.getOrCreate()

##### Let's start with some preparations. Code below is not perfect but it does its job :)
In case that is may be hard to read, i left here some actions that i did to prepare data:
- remove `/t`,
- remove url link, 
- replace accented symbols with non-accented one,
- remove all special symbols,
- remove stopwords (both russian and english),
- remove extra spaces,
- remove single letters,
- remove empty rows

In [160]:
!pip install nltk



In [161]:
df = spark.read.option("encoding", "utf-8").text("wiki.txt")
df.show()

+--------------------+
|               value|
+--------------------+
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
|http://ru.wikiped...|
+--------------------+
only showing top 20 rows



In [162]:
df = df.withColumn('value', regexp_replace('value', '\t', ' '))
df = df.withColumn('value', regexp_replace('value', r'http\S+', ''))
df = df.withColumn('value', regexp_replace('value', 'а́', 'а'))
df = df.withColumn('value', regexp_replace('value', 'я́', 'я'))
df = df.withColumn('value', regexp_replace('value', 'у́', 'у'))
df = df.withColumn('value', regexp_replace('value', 'ю́', 'ю'))
df = df.withColumn('value', regexp_replace('value', 'о́', 'о'))
df = df.withColumn('value', regexp_replace('value', 'е́', 'е'))
df = df.withColumn('value', regexp_replace('value', 'ё', 'е'))
df = df.withColumn('value', regexp_replace('value', 'э́', 'э'))
df = df.withColumn('value', regexp_replace('value', 'и́', 'и'))
df = df.withColumn('value', regexp_replace('value', 'ы́', 'ы'))
df = df.withColumn('value', regexp_replace('value', 'А́', 'А'))
df = df.withColumn('value', regexp_replace('value', 'Я́', 'Я'))
df = df.withColumn('value', regexp_replace('value', 'У́', 'У'))
df = df.withColumn('value', regexp_replace('value', 'Ю́', 'Ю'))
df = df.withColumn('value', regexp_replace('value', 'О́', 'О'))
df = df.withColumn('value', regexp_replace('value', 'Е́', 'Е'))
df = df.withColumn('value', regexp_replace('value', 'Ё', 'Е'))
df = df.withColumn('value', regexp_replace('value', 'Э́', 'Э'))
df = df.withColumn('value', regexp_replace('value', 'И́', 'И'))
df = df.withColumn('value', regexp_replace('value', r'[^A-Za-zА-яа-я]', ' '))

In [163]:
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
 
nltk.download('stopwords')
rus_stopwords = set(stopwords.words('russian'))
en_stopwords = set(stopwords.words('english'))
STOPWORDS = rus_stopwords.union(en_stopwords)

[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [164]:
def _remove_stopwords_and_spaces(text: str) -> str:
    clear_sentence = ' '.join([word for word in text.split() if word.lower() not in STOPWORDS])
    clear_sentence = re.sub('\\s+', ' ', clear_sentence)
    return clear_sentence

In [165]:
remove_stopwords_and_spaces = udf(lambda s: _remove_stopwords_and_spaces(s) )

In [166]:
df = df.withColumn('value', remove_stopwords_and_spaces("value"))

In [167]:
def _remove_single_letters(text: str) -> str:
    return re.sub('(\\b[A-Za-zА-яа-я] \\b|\\b [A-Za-zА-яа-я]\\b)', '', text)
    
convert_remove_single_letters_UDF = udf(lambda s: _remove_single_letters(s) )

In [168]:
df = df.withColumn('value', convert_remove_single_letters_UDF("value"))

In [169]:
df.count()

12601

In [170]:
df = df.filter(df.value != '')

In [171]:
df.count()

12600

In [172]:
import builtins

##### We are done with some preparations. I believe it could be done better, but...) Here comes task 4.1!

In [174]:
rdd = df.rdd

In [175]:
rdd_longest_per_row = rdd.map(lambda row: builtins.max(row[0].split(), key=len)).max(key=len)


In [176]:
print(f"The longiest word is '{rdd_longest_per_row}'")

The longiest word is 'dreihundertvierundsechzigtausendachthundertneunzehn'


##### Here comes task 4.2!

In [178]:
def count_words_sum_and_amount(text: str) -> Tuple[int, int]:
    only_symbols = len(re.sub('[\W_]+', '', text))
    amount = len(re.findall(r'\w+', text))
    return only_symbols, amount

def add_tuples(t1: Tuple[int, int], t2: Tuple[int, int]) -> Tuple[int, int]:
    return (t1[0] + t2[0], t1[1] + t2[1])

In [179]:
words_sum_and_amount = rdd.map(lambda row: count_words_sum_and_amount(row[0])).reduce(add_tuples)

In [180]:
print(f"Mean length of word is {(words_sum_and_amount[0]/words_sum_and_amount[1]):.2f}")

Mean length of word is 7.70


##### Here comes task 4.3!

In [182]:
def remove_except_en(text: str) -> str:
    text = re.sub(r'[^a-zA-Z ]+', '', text)
    text = re.sub('\\s+', ' ', text)
    return text

In [183]:
from operator import add
from collections import Counter

In [184]:
en_rdd = rdd.map(lambda row: remove_except_en(row[0])).map(lambda row: Counter(row.split())).reduce(add)

In [185]:
print(f"Most common latin letters word is '{en_rdd.most_common(1)[0][0]}', it was met {en_rdd.most_common(1)[0][1]} times.")

Most common latin letters word is 'formula', it was met 11605 times.


##### Here comes task 4.4!

In [187]:
words_rdd = rdd.flatMap(lambda row: row[0].split(" "))

In [188]:
capital_rdd = words_rdd.filter(lambda word: word[0].isupper() and word[1:].islower()).map(lambda word: word.lower()).map(lambda word: (word,1))

In [193]:
lower_rdd = words_rdd.filter(lambda word: word.islower()).map(lambda word: (word,1))

In [190]:
capital_rdd = capital_rdd.reduceByKey(lambda a,b:a+b) 

In [191]:
capital_rdd = capital_rdd.filter(lambda word: word[1] > 10)

In [194]:
lower_rdd = lower_rdd.reduceByKey(lambda a,b:a+b)

In [195]:
joined = capital_rdd.join(lower_rdd)

In [196]:
joined = joined.filter(lambda word: (word[1][0] / (word[1][0] + word[1][1])) > 0.5)

In [197]:
coll = joined.sortBy(lambda word: word[1][0], ascending=False).collect()

In [198]:
print(f"Let's see first the most popular capital case in original doc words:\n{coll[:10]}")

Let's see first the most popular capital case in original doc words:
[('однако', (5349, 4428)), ('см', (3711, 2451)), ('категория', (3505, 82)), ('российской', (2820, 471)), ('кроме', (2729, 1460)), ('согласно', (2315, 1770)), ('германии', (2214, 2)), ('федерации', (1965, 231)), ('франции', (1685, 1)), ('республики', (1631, 1149))]


##### Here comes task 4.5!
Lets come back to unprepared data and do some simplier preparations: just remove extra spaces, stopwords and rm accents

In [199]:
df = spark.read.option("encoding", "utf-8").text("wiki.txt")
df = df.withColumn('value', regexp_replace('value', '\t', ' '))
df = df.withColumn('value', regexp_replace('value', r'http\S+', ''))
df = df.withColumn('value', remove_stopwords_and_spaces("value"))

In [200]:
rdd = df.rdd

In [202]:
pattern = r'(?:\s[а-я][а-я]\.)'

In [203]:
def _find_cuts_by_regex(pattern: str, text: str) -> Counter:
    cuts = re.findall(pattern, text)
    return Counter(cuts)

In [204]:
rdd_cuts = rdd.filter(lambda word: re.search(pattern, word[0]) is not None)

In [205]:
rdd_cuts = rdd.map(lambda word: _find_cuts_by_regex(pattern, word[0])).reduce(add)

Let's assume that we count only words met 50 or more times

In [207]:
rdd_cuts = Counter({k: c for k, c in rdd_cuts.items() if c >= 50})

In [145]:
rdd_cuts

Counter({' др.': 2406,
         ' гг.': 2345,
         ' км.': 1253,
         ' им.': 916,
         ' вв.': 661,
         ' см.': 488,
         ' ст.': 417,
         ' мм.': 345,
         ' ул.': 326,
         ' св.': 316,
         ' кв.': 249,
         ' пр.': 225,
         ' ок.': 208,
         ' га.': 198,
         ' их.': 158,
         ' кг.': 150,
         ' мг.': 86,
         ' ед.': 83,
         ' ср.': 80,
         ' мл.': 56,
         ' юг.': 52,
         ' шт.': 51})

##### Here comes task 4.6!
similar to the task 4.5

In [148]:
pattern = r'(?:[а-я]\.[а-я]\.)'

In [150]:
df = spark.read.option("encoding", "utf-8").text("wiki.txt")
df = df.withColumn('value', regexp_replace('value', '\t', ' '))
df = df.withColumn('value', regexp_replace('value', r'http\S+', ''))
df = df.withColumn('value', convertUDF("value"))
rdd = df.rdd

In [151]:
rdd_cuts = rdd.filter(lambda word: re.search(pattern, word[0]) is not None)

In [152]:
rdd_cuts = rdd.map(lambda word: _find_cuts_by_regex(pattern, word[0])).reduce(add)

Let's assume that we count only words met 4 or more times

In [155]:
rdd_cuts = Counter({k: c for k, c in rdd_cuts.items() if c >= 4})
rdd_cuts

Counter({'т.е.': 88,
         'н.э.': 82,
         'т.д.': 80,
         'г.р.': 45,
         'ю.ш.': 41,
         'л.с.': 36,
         'т.п.': 35,
         'в.м.': 32,
         'с.ш.': 30,
         'т.н.': 30,
         'т.к.': 27,
         'г.г.': 20,
         'д.ч.': 17,
         'т.ч.': 17,
         'в.д.': 13,
         'н.ч.': 13,
         'э.д.': 11,
         'р.п.': 10,
         'в.в.': 9,
         'и.т.': 8,
         'с.т.': 7,
         'д.н.': 6,
         'д.и.': 5,
         'т.о.': 5,
         'а.е.': 5,
         'м.р.': 5,
         'и.о.': 5,
         'д.м.': 5,
         'с.п.': 5,
         'р.з.': 4,
         'ж.р.': 4,
         'ж.д.': 4,
         'б.м.': 4,
         'п.п.': 4})

As you may see, we get some outliers such as  `их.` in task 4.5 or `а.е` in task 4.6. 

There is two options: 
- we can change threshold to larger value and lose some valuable results such as `ж.р` or `кг.`
- we may exclude outliers manually
- or use look-up table for abbreviations instead of statistic :)