In [1]:
import pandas as pd
import numpy as np
import os
import sys  
import glob
import csv
import matplotlib

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import StopWordsRemover

In [3]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [4]:
header = ['text']

with open('english2.txt', 'r') as f_data, open('output_english2.csv', 'w') as f_output:
    csv_data = csv.reader(f_data)
    csv_output = csv.writer(f_output)
    csv_output.writerow(header)

    for row in csv_data:
        row = [cell.replace(r + ',', '') for cell, r in zip(row, header)]

        csv_output.writerow(row)

In [6]:
spark = SparkSession.builder.appName('characters_count').getOrCreate()

In [7]:
df = spark.read.csv("output_english2.csv", sep=",", header=True)

In [8]:
#remove empty rows
newDF = df.filter("text != ''")

In [9]:
#remove punctuation
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
regexTokenized = regexTokenizer.transform(newDF)
regexTokenized.select("words").show(truncate=False)

+-------------------------------------------------------------------------------------+
|words                                                                                |
+-------------------------------------------------------------------------------------+
|[part, one]                                                                          |
|[chapter, 1]                                                                         |
|[happy, families, are, all, alike, every, unhappy, family, is, unhappy, in, its]     |
|[own, way]                                                                           |
|[everything, was, in, confusion, in, the, oblonskys, house, the, wife, had]          |
|[discovered, that, the, husband, was, carrying, on, an, intrigue, with, a, french]   |
|[girl]                                                                               |
|[to, her, husband, that, she, could, not, go, on, living, in, the, same, house, with]|
|[him, this, position, of, affai

In [10]:
#transform lists of words into rows of words 
from pyspark.sql.functions import col, explode
counted=regexTokenized.withColumn('words', explode(col('words')).alias('words'))
counted.select('words').show(10, truncate=False)

+--------+
|words   |
+--------+
|part    |
|one     |
|chapter |
|1       |
|happy   |
|families|
|are     |
|all     |
|alike   |
|every   |
+--------+
only showing top 10 rows



In [11]:
#creating windows of 1000 words and count names
from pyspark.sql import Window
from pyspark.sql import functions as F
name_window = Window.orderBy('id').rangeBetween(-1000,0)
namef = counted.select('words').withColumn('id', F.monotonically_increasing_id())
# anna
namef = namef.withColumn('anna', F.when((col('words') == 'anna') | (col('words') == 'karenina'), 1).otherwise(0))
namef = namef.withColumn('anna_f', F.sum(namef.anna).over(name_window))
# oblonsky
namef = namef.withColumn('oblonsky', F.when((col('words') == 'oblonsky') | (col('words') == 'stiva') | (col('words') == 'stepan arkadyevitch'), 1).otherwise(0))
namef = namef.withColumn('oblonsky_f', F.sum(namef.oblonsky).over(name_window))
# vronsky
namef = namef.withColumn('vronsky', F.when((col('words') == 'vronsky') | (col('words') == 'alexey kirillovitch'), 1).otherwise(0))
namef = namef.withColumn('vronsky_f', F.sum(namef.vronsky).over(name_window))
# levin
namef = namef.withColumn('levin', F.when((col('words') == 'levin') | (col('words') == 'konstantin') | (col('words') == 'kostya'), 1).otherwise(0))
namef = namef.withColumn('levin_f', F.sum(namef.levin).over(name_window))
# dolly
namef = namef.withColumn('dolly', F.when((col('words') == 'dolly') | (col('words') == 'darya'), 1).otherwise(0))
namef = namef.withColumn('dolly_f', F.sum(namef.dolly).over(name_window))
# kitty
namef = namef.withColumn('kitty', F.when((col('words') == 'kitty') | (col('words') == 'katerina') | (col('words') == 'ekaterina'), 1).otherwise(0))
namef = namef.withColumn('kitty_f', F.sum(namef.kitty).over(name_window))
# karenin
namef = namef.withColumn('karenin', F.when((col('words') == 'karenin') | (col('words') == 'alexey alexandrovitch'), 1).otherwise(0))
namef = namef.withColumn('karenin_f', F.sum(namef.karenin).over(name_window))
# namef.show(2000)

In [12]:
# Compress namef dataframe. For each 1000 words we will take the maximum observed frequency in *_f columns

namef_compr = namef
namef_compr = namef_compr.withColumn('anna_mf', F.max(namef.anna_f).over(name_window))
namef_compr = namef_compr.withColumn('oblonsky_mf', F.max(namef.oblonsky_f).over(name_window))
namef_compr = namef_compr.withColumn('vronsky_mf', F.max(namef.vronsky_f).over(name_window))
namef_compr = namef_compr.withColumn('levin_mf', F.max(namef.levin_f).over(name_window))
namef_compr = namef_compr.withColumn('dolly_mf', F.max(namef.dolly_f).over(name_window))
namef_compr = namef_compr.withColumn('kitty_mf', F.max(namef.kitty_f).over(name_window))
namef_compr = namef_compr.withColumn('karenin_mf', F.max(namef.karenin_f).over(name_window))

namef_compr = namef_compr.filter(col('id') % 1000 == 0)
namef_compr = namef_compr.select('id', 'anna_mf', 'oblonsky_mf', 'vronsky_mf', 'levin_mf', 'dolly_mf', 'kitty_mf', 'karenin_mf')
namef_compr.show(50)

+-----+-------+-----------+----------+--------+--------+--------+----------+
|   id|anna_mf|oblonsky_mf|vronsky_mf|levin_mf|dolly_mf|kitty_mf|karenin_mf|
+-----+-------+-----------+----------+--------+--------+--------+----------+
|    0|      0|          0|         0|       0|       0|       0|         0|
| 1000|      0|          2|         0|       0|       0|       0|         0|
| 2000|      0|          2|         0|       0|       3|       0|         0|
| 3000|      0|          1|         0|       0|       8|       0|         0|
| 4000|      2|          6|         0|       0|      10|       0|         2|
| 5000|      2|         10|         0|      16|       3|       0|         2|
| 6000|      0|         13|         0|      21|       0|       5|         0|
| 7000|      0|          5|         0|      18|       0|       5|         0|
| 8000|      0|          1|         0|      18|       0|       4|         0|
| 9000|      0|          4|         0|      12|       1|       7|         0|

In [13]:
#write a csv file. NOTE: every time it overite existing csv with a new file name
namef_compr.coalesce(1).write.csv('characters', mode='overwrite', header = 'true')

In [14]:
#read this csv. NOTE: every time it will be a different file name
df = pd.read_csv("characters/part-00000-1d8d2de0-304e-49bc-9233-b7f4086eb8e4-c000.csv")
df.head(10)

Unnamed: 0,id,anna_mf,oblonsky_mf,vronsky_mf,levin_mf,dolly_mf,kitty_mf,karenin_mf
0,0,0,0,0,0,0,0,0
1,1000,0,2,0,0,0,0,0
2,2000,0,2,0,0,3,0,0
3,3000,0,1,0,0,8,0,0
4,4000,2,6,0,0,10,0,2
5,5000,2,10,0,16,3,0,2
6,6000,0,13,0,21,0,5,0
7,7000,0,5,0,18,0,5,0
8,8000,0,1,0,18,0,4,0
9,9000,0,4,0,12,1,7,0


In [15]:
#rename columns
df = df.rename(columns={'id':'1K Words', 'anna_mf':'Anna', 'oblonsky_mf':'Oblonsky', 'vronsky_mf':'Vronsky', 
                        'levin_mf':'Levin', 'dolly_mf':'Dolly', 'kitty_mf':'Kitty', 'karenin_mf':'Karenin'})
df.to_csv('words_by_character.csv')
df.head()

Unnamed: 0,1K Words,Anna,Oblonsky,Vronsky,Levin,Dolly,Kitty,Karenin
0,0,0,0,0,0,0,0,0
1,1000,0,2,0,0,0,0,0
2,2000,0,2,0,0,3,0,0
3,3000,0,1,0,0,8,0,0
4,4000,2,6,0,0,10,0,2


In [16]:
#create a dataframe of total words count by character
df1 = df.sum()
df_sum = df1.to_frame()
df_sum.columns = ['count']
df_sum = df_sum.drop(['1K Words'])
df_sum.to_csv('total_words_by_character.csv')
df_sum

Unnamed: 0,count
Anna,829
Oblonsky,214
Vronsky,867
Levin,1434
Dolly,436
Kitty,672
Karenin,61


In [17]:
# instantiate remover with stopwords
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
new_df2=remover.transform(regexTokenized)

In [18]:
#remove additional stopwords and names
stopwordList = ['ing', 'm', 've', 're', 'll', 'oh', 'upon','chapter', 'ah', 'whether']

name_remover = StopWordsRemover(inputCol="words", outputCol="filtered_without_names", stopWords=stopwordList)

new_df_without_names2=name_remover.transform(new_df2)
# instantiate remover with stopwords
remover_after_names = StopWordsRemover(inputCol="filtered_without_names", outputCol="filtered_wo_names")
clean_df2=remover_after_names.transform(new_df_without_names2)

In [19]:
# Count unique words
counted_words_2 = clean_df2.withColumn('filtered_wo_names', explode(col('filtered_wo_names')).alias('filtered_wo_names')).\
groupBy('filtered_wo_names').count()\
.orderBy('count',ascending=False)

counted_words_2.show(10, truncate=False)

+-----------------+-----+
|filtered_wo_names|count|
+-----------------+-----+
|levin            |927  |
|said             |841  |
|one              |703  |
|vronsky          |535  |
|well             |502  |
|anna             |494  |
|come             |449  |
|know             |415  |
|alexandrovitch   |415  |
|yes              |410  |
+-----------------+-----+
only showing top 10 rows



In [20]:
#write the daraframe into csv and read that csv. Note: every time different file name
counted_words_2.coalesce(1).write.csv('words2', mode='overwrite', header = 'true')

In [None]:
df = pd.read_csv('words2/part-00000-d2b26535-8acc-470d-93ea-b36a14539643-c000.csv')
# create a reduced dataframe and write it into csv
words_reduced = df.iloc[:250]
words_reduced.to_csv('words_reduced.csv')

In [None]:
from sqlalchemy import create_engine
import sqlalchemy
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
import pymysql
pymysql.install_as_MySQLdb()

In [None]:
connection_string = "root:password@localhost/anna_karenina"
engine = create_engine(f'mysql://{connection_string}')

In [None]:
words_reduced.to_sql(name='words_reduced', con=engine, if_exists='append', index=False)

In [None]:
df11=df_sum.reset_index()
df11.head()

In [None]:
df3 = df11.rename(columns={'index':'characters'})

In [None]:
df3.to_sql(name='total_words_by_character', con=engine, if_exists='append', index=False)

In [None]:
df.to_sql(name='words_by_character', con=engine, if_exists='append', index=False)

In [None]:
engine.table_names()

In [None]:
pd.read_sql_query('select * from total_words_by_character', con=engine).head()

In [None]:
pd.read_sql_query('select * from words_by_character', con=engine).head()

In [None]:
pd.read_sql_query('select * from words_reduced', con=engine).head()