In [1]:
import os
import json
import boto3
import sklearn
import socket
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
#from pyspark.sql import types as T

print('user:', os.environ['JUPYTERHUB_SERVICE_PREFIX'])

def uiWebUrl(self):
    from urllib.parse import urlparse
    web_url = self._jsc.sc().uiWebUrl().get()
    port = urlparse(web_url).port
    return "{}proxy/{}/jobs/".format(os.environ['JUPYTERHUB_SERVICE_PREFIX'], port)

# small fix to enable UI views
SparkContext.uiWebUrl = property(uiWebUrl)

# spark configurtion in local regime 
conf = SparkConf().set('spark.master', 'local[*]').set('spark.driver.memory', '8g')

#some needed objects
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
spark

user: /user/st054328/


# Homework 1

Transform text file "The Project Gutenberg eBook of Frankenstein, by Mary Wollstonecraft (Godwin) Shelle" into TF-IDF. Take sentence as "document".

### Part 1: 
- read text file as dataframe 
- filter out non-letters and empty strings 
- transform into dataframe doc_id -> tf_idf vector 


### Part 2:
- read text file as RDD
- filter out non-letters and empty strings 
- transform into rdd in format doc_id -> tf_idf vector


### Org part: 
I'm waiting your HW's as self-sufficient jupyter notebooks in github repository. 

Please, fill this table in specified comment with:

your name / github link / telegram (optionally, if u want to discuss some) / 

Fill the comment please and i will add your data in a few days

https://docs.google.com/spreadsheets/d/1p3yLsXqX2dp_TrPwNcikcS5FP_PTM0_gnSOzGn5Gn1E/edit#gid=0

Feel free to text me if u have some questions 

### Deadline: 01.05.2021 included

Dear students, dead in "deadline" means *dead*. This deadline is not for you - it's for me. Deadlines informs me from which point i should start to score your HWs.  You can commit anything after deadline but it's not guaranteed that I'll take it into account. It's possible to move deadline only for the whole group not "just for me plz cause I was ill / detentioned / skipped this message". 

### NB(!) 

It's not allowed to use TF-IDF code from Spark internal libraries. 
It's not allowed to cast DF/RDD into pandas and use scikit-learn. Please, keep it spark. 


## Part 1

Reading text file as dataframe

In [2]:
result_prefix = "malyutin_demo_hw1"

filepath = "file:///home/jovyan/shared/lectures_folder/84-0.txt"
from pyspark.sql.functions import monotonically_increasing_id

dataframe = sc.textFile(f"{filepath}")\
    .map(lambda x: (x,))\
    .toDF()\
    .select(F.col("_1").alias("text"))\
    .withColumn("id", monotonically_increasing_id())

dataframe.show()

+--------------------+---+
|                text| id|
+--------------------+---+
|The Project Guten...|  0|
|                    |  1|
|This eBook is for...|  2|
|most other parts ...|  3|
|whatsoever. You m...|  4|
|of the Project Gu...|  5|
|www.gutenberg.org...|  6|
|will have to chec...|  7|
|   using this eBook.|  8|
|                    |  9|
| Title: Frankenstein| 10|
|       or, The Mo...| 11|
|                    | 12|
|Author: Mary Woll...| 13|
|                    | 14|
|Release Date: 31,...| 15|
|[Most recently up...| 16|
|                    | 17|
|   Language: English| 18|
|                    | 19|
+--------------------+---+
only showing top 20 rows



In [3]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, MapType
from pyspark.mllib.linalg import Vectors, DenseVector

import string
import re
import math
import numpy as np

def process_string(data):
    """
    basic preprocessing function:
    - removes punctuation
    - lower
    - split by space
    """
    punct_removed = re.sub(r'[^\w\s]','',data)
    words = punct_removed.lower().split(" ")
    
    
    return list(filter(lambda x: len(x) > 0, words))

# spark udf -- user defined function (~ mapper)

process_string_udf = udf(lambda z: process_string(z), ArrayType(StringType()))

In [4]:
"""
process words

filter out empty and small sentences
"""

by_words = dataframe\
    .select(process_string_udf(F.col("text")).alias("text"), "id")\
    .where(F.size(F.col("text")) > 1)#\
    #.withColumn("id", monotonically_increasing_id())


by_words.show()

+--------------------+---+
|                text| id|
+--------------------+---+
|[the, project, gu...|  0|
|[this, ebook, is,...|  2|
|[most, other, par...|  3|
|[whatsoever, you,...|  4|
|[of, the, project...|  5|
|[wwwgutenbergorg,...|  6|
|[will, have, to, ...|  7|
|[using, this, ebook]|  8|
|[title, frankenst...| 10|
|[or, the, modern,...| 11|
|[author, mary, wo...| 13|
|[release, date, 3...| 15|
|[most, recently, ...| 16|
| [language, english]| 18|
|[character, set, ...| 20|
|[produced, by, ju...| 22|
|[further, correct...| 23|
|[start, of, the, ...| 25|
|[or, the, modern,...| 32|
|[by, mary, wollst...| 34|
+--------------------+---+
only showing top 20 rows



Let's explode the words in each text.

In [5]:
unfolded = by_words\
.select(by_words.text.alias('document'), by_words.id.alias('doc_id'), F.explode(by_words.text).alias('token'))\
.withColumn("id", monotonically_increasing_id())

unfolded.show()

+--------------------+------+--------------+---+
|            document|doc_id|         token| id|
+--------------------+------+--------------+---+
|[the, project, gu...|     0|           the|  0|
|[the, project, gu...|     0|       project|  1|
|[the, project, gu...|     0|     gutenberg|  2|
|[the, project, gu...|     0|         ebook|  3|
|[the, project, gu...|     0|            of|  4|
|[the, project, gu...|     0|  frankenstein|  5|
|[the, project, gu...|     0|            by|  6|
|[the, project, gu...|     0|          mary|  7|
|[the, project, gu...|     0|wollstonecraft|  8|
|[the, project, gu...|     0|        godwin|  9|
|[the, project, gu...|     0|       shelley| 10|
|[this, ebook, is,...|     2|          this| 11|
|[this, ebook, is,...|     2|         ebook| 12|
|[this, ebook, is,...|     2|            is| 13|
|[this, ebook, is,...|     2|           for| 14|
|[this, ebook, is,...|     2|           the| 15|
|[this, ebook, is,...|     2|           use| 16|
|[this, ebook, is,..

Let's count the number of each certain word `t` in the text and the total number of words `n` in each text. Then let's calculate `TF` as $t/n$.

In [6]:
tf0 = unfolded.groupBy("doc_id", "token").agg(F.count("document").alias("t"))

tf = tf0\
.join(tf0.groupBy("doc_id").agg(F.count("doc_id").alias("n")), on="doc_id", how = "left")\
.orderBy("doc_id")\
.withColumn("TF", F.col("t")/F.col("n"))

tf.show()

+------+--------------+---+---+-------------------+
|doc_id|         token|  t|  n|                 TF|
+------+--------------+---+---+-------------------+
|     0|wollstonecraft|  1| 11|0.09090909090909091|
|     0|  frankenstein|  1| 11|0.09090909090909091|
|     0|       project|  1| 11|0.09090909090909091|
|     0|          mary|  1| 11|0.09090909090909091|
|     0|           the|  1| 11|0.09090909090909091|
|     0|            by|  1| 11|0.09090909090909091|
|     0|       shelley|  1| 11|0.09090909090909091|
|     0|        godwin|  1| 11|0.09090909090909091|
|     0|     gutenberg|  1| 11|0.09090909090909091|
|     0|            of|  1| 11|0.09090909090909091|
|     0|         ebook|  1| 11|0.09090909090909091|
|     2|          this|  1| 13|0.07692307692307693|
|     2|        states|  1| 13|0.07692307692307693|
|     2|            of|  1| 13|0.07692307692307693|
|     2|           and|  1| 13|0.07692307692307693|
|     2|            is|  1| 13|0.07692307692307693|
|     2|    

Now let's calculate the numbers of documents `df` containing each word:

In [7]:
df = unfolded.groupBy("token").agg(F.countDistinct("doc_id").alias("df")).orderBy("df", ascending=False)
df.show()

+-----+----+
|token|  df|
+-----+----+
|  the|3282|
|  and|2702|
|   of|2435|
|    i|2354|
|   to|1896|
|   my|1534|
|    a|1310|
|   in|1126|
| that| 971|
|  was| 948|
|   me| 792|
| with| 694|
|  but| 681|
|  had| 649|
|which| 554|
|  you| 549|
|   he| 545|
|   it| 533|
|  not| 519|
|  for| 505|
+-----+----+
only showing top 20 rows



Let's devide the total number of documents `D` by the number of documents containing each word `df`:

In [8]:
d_df = df.withColumn('D/df', unfolded.select(F.countDistinct("doc_id").alias('len')).collect()[0]['len']/df.df)
d_df.show()

+-----+----+------------------+
|token|  df|              D/df|
+-----+----+------------------+
|  the|3282|2.0262035344302256|
|  and|2702| 2.461139896373057|
|   of|2435| 2.731006160164271|
|    i|2354|2.8249787595581988|
|   to|1896| 3.507383966244726|
|   my|1534| 4.335071707953064|
|    a|1310| 5.076335877862595|
|   in|1126| 5.905861456483126|
| that| 971| 6.848609680741504|
|  was| 948| 7.014767932489452|
|   me| 792| 8.396464646464647|
| with| 694|   9.5821325648415|
|  but| 681| 9.765051395007342|
|  had| 649| 10.24653312788906|
|which| 554| 12.00361010830325|
|  you| 549|12.112932604735883|
|   he| 545| 12.20183486238532|
|   it| 533|12.476547842401501|
|  not| 519|  12.8131021194605|
|  for| 505|13.168316831683168|
+-----+----+------------------+
only showing top 20 rows



And let's take 10-based logarithm of `D/df` to get `IDF`:

In [9]:
idf = d_df.withColumn('IDF', F.log(10.0, F.col('D/df')))
idf.show()

+-----+----+------------------+-------------------+
|token|  df|              D/df|                IDF|
+-----+----+------------------+-------------------+
|  the|3282|2.0262035344302256|0.30668306858603017|
|  and|2702| 2.461139896373057|0.39113630061709276|
|   of|2435| 2.731006160164271|0.43632267975245137|
|    i|2354|2.8249787595581988| 0.4510151867956887|
|   to|1896| 3.507383966244726| 0.5449833123010571|
|   my|1534| 4.335071707953064| 0.6369962856901424|
|    a|1310| 5.076335877862595| 0.7055503496473403|
|   in|1126| 5.905861456483126| 0.7712832547877772|
| that| 971| 6.848609680741504| 0.8356024153950996|
|  was| 948| 7.014767932489452| 0.8460133079650383|
|   me| 792| 8.396464646464647| 0.9240964637136111|
| with| 694|   9.5821325648415| 0.9814621748482496|
|  but| 681| 9.765051395007342| 0.9896745333903194|
|  had| 649| 10.24653312788906| 1.0105769485027352|
|which| 554| 12.00361010830325| 1.0793118805746749|
|  you| 549|12.112932604735883| 1.0832493008530126|
|   he| 545|

To calculate `TF_IDF`, let's multiply `TF` and `IDF`:

In [10]:
tf_idf = tf\
.join(idf, on='token', how='left')\
.orderBy("doc_id")\
.withColumn('TF_IDF', F.col("tf") * F.col("idf"))\
.select("token", "doc_id", "n", "TF_IDF")

tf_idf.show()

+--------------+------+---+--------------------+
|         token|doc_id|  n|              TF_IDF|
+--------------+------+---+--------------------+
|         ebook|     0| 11|  0.2462616629996607|
|       shelley|     0| 11|  0.3041545809621311|
|           the|     0| 11| 0.02788027896236638|
|       project|     0| 11| 0.17075808846844873|
|        godwin|     0| 11|  0.3041545809621311|
|            of|     0| 11| 0.03966569815931376|
|          mary|     0| 11|  0.3041545809621311|
|wollstonecraft|     0| 11|  0.3041545809621311|
|     gutenberg|     0| 11| 0.21195090467898473|
|            by|     0| 11| 0.10378003708431974|
|  frankenstein|     0| 11| 0.21596941945098955|
|        anyone|     2| 13| 0.22065993352798305|
|      anywhere|     2| 13| 0.27090704997224024|
|           for|     2| 13| 0.08611771286034178|
|           the|     2| 13|0.047182010551696955|
|            of|     2| 13| 0.03356328305788087|
|        united|     2| 13| 0.19398397304916334|
|            in|    

Let's recover the order of words in text...

In [11]:
tf_idf2 = unfolded.select("id", "doc_id", "token").join(tf_idf, on=["doc_id", "token"], how="left").orderBy("id")
tf_idf2.show()

+------+--------------+---+---+--------------------+
|doc_id|         token| id|  n|              TF_IDF|
+------+--------------+---+---+--------------------+
|     0|           the|  0| 11| 0.02788027896236638|
|     0|       project|  1| 11| 0.17075808846844873|
|     0|     gutenberg|  2| 11| 0.21195090467898473|
|     0|         ebook|  3| 11|  0.2462616629996607|
|     0|            of|  4| 11| 0.03966569815931376|
|     0|  frankenstein|  5| 11| 0.21596941945098955|
|     0|            by|  6| 11| 0.10378003708431974|
|     0|          mary|  7| 11|  0.3041545809621311|
|     0|wollstonecraft|  8| 11|  0.3041545809621311|
|     0|        godwin|  9| 11|  0.3041545809621311|
|     0|       shelley| 10| 11|  0.3041545809621311|
|     2|          this| 11| 13| 0.09056918276569328|
|     2|         ebook| 12| 13| 0.20837525330740522|
|     2|            is| 13| 13| 0.10296321584279375|
|     2|           for| 14| 13| 0.08611771286034178|
|     2|           the| 15| 13|0.0471820105516

I tried to transform this dataframe into the format doc_id -> tf_idf vector, but didn't succeed.

In [12]:
#from tqdm import tqdm

#tfidfs = []
#for i in tqdm(indicies):
    #df = tf_idf2.where(F.col('doc_id')==i).select("TF_IDF")
    #tfidfs.append([(row.TF_IDF) for row in df.collect()])

So i tried a little bit different approach.

Let's save the IDFs, we've already calculated into the dictionary `dict_dict`.

In [13]:
dict_list = [{row.token : row.IDF} for row in idf.collect()]

dict_dict = {}
for element in dict_list:
    dict_dict.update(element)

Now let's define a function returning a sparse vector with the values of TF-IDFs.

In [14]:
numFeatures = len(dict_dict) #the number of features equals the total number of words in the text

In [15]:
def transform(document):
    freq = {}
    tfidfs = {}
    for term in document:
        i = hash(term) % numFeatures #creating "index" for the word
        freq[i] = freq.get(i, 0) + 1.0 #calculate the number of occurances of each word in a text
        tfidfs[i] = freq[i]/len(document) * dict_dict[term] #calculate TF and multiply it by previously calculated IDF
    return [document, Vectors.sparse(numFeatures, tfidfs.items())] 

In [16]:
#Extracting the texts from previously created dataframe

documents2 = by_words.select('text').rdd.map(list)

def unpack(document):
    texts = []
    for term in document[0]:
        #text = term
        texts.append(term)
    return texts

documents3 = documents2.map(unpack)

Let's apply the function to get TF_IDF sparse vector.

In [17]:
columns = ['text', 'TFIDF_vector']
frequencyVectors = documents3.map(transform)
final = frequencyVectors.toDF(columns).join(by_words, on='text', how='left').orderBy('id').select('id', 'TFIDF_vector')
final.show()

+---+--------------------+
| id|        TFIDF_vector|
+---+--------------------+
|  0|(7525,[106,701,11...|
|  2|(7525,[1192,1779,...|
|  2|(7525,[1192,1779,...|
|  3|(7525,[720,1192,1...|
|  4|(7525,[1192,1504,...|
|  5|(7525,[102,106,72...|
|  6|(7525,[374,470,11...|
|  7|(7525,[374,1192,2...|
|  8|(7525,[5119,6092,...|
| 10|(7525,[701,1697],...|
| 11|(7525,[677,849,11...|
| 11|(7525,[677,849,11...|
| 13|(7525,[4765,6870,...|
| 15|(7525,[719,2210,4...|
| 16|(7525,[3101,4706,...|
| 18|(7525,[3621,5770]...|
| 20|(7525,[407,1876,3...|
| 22|(7525,[64,582,638...|
| 23|(7525,[1909,2069,...|
| 25|(7525,[106,701,11...|
+---+--------------------+
only showing top 20 rows



## Part 2

Reading text file into RDD

In [18]:
#def clear_line(line):
#    punct_removed = re.sub(r'[^\w\s]','',line)
#    return punct_removed.lower().split(" ")

Let's read texts and add index to each document:

In [19]:
rddText = sc.textFile(f"{filepath}").map(lambda line: process_string(line))#.repartition(1).zipWithIndex().repartition(5)

rdd_with_indicies = rddText.zipWithIndex()
rdd_with_indicies.take(3)

[(['the',
   'project',
   'gutenberg',
   'ebook',
   'of',
   'frankenstein',
   'by',
   'mary',
   'wollstonecraft',
   'godwin',
   'shelley'],
  0),
 ([], 1),
 (['this',
   'ebook',
   'is',
   'for',
   'the',
   'use',
   'of',
   'anyone',
   'anywhere',
   'in',
   'the',
   'united',
   'states',
   'and'],
  2)]

And separatly let's make rdd without empty elements (and with no indexing).

In [20]:
rdd_cleared = rddText.filter(bool)
rdd_cleared.take(3)

[['the',
  'project',
  'gutenberg',
  'ebook',
  'of',
  'frankenstein',
  'by',
  'mary',
  'wollstonecraft',
  'godwin',
  'shelley'],
 ['this',
  'ebook',
  'is',
  'for',
  'the',
  'use',
  'of',
  'anyone',
  'anywhere',
  'in',
  'the',
  'united',
  'states',
  'and'],
 ['most',
  'other',
  'parts',
  'of',
  'the',
  'world',
  'at',
  'no',
  'cost',
  'and',
  'with',
  'almost',
  'no',
  'restrictions']]

To calculate IDFs, let's save the total number of texts into `text_num`:

In [21]:
texts_num = rdd_cleared.count()
texts_num

6737

Now we need to count the number of texts, containing each word. Let's initialize counting by setting 1 to each word.

In [22]:
def counter(document):
    lst = []
    for word in document:
        lst.append((word, 1))
    return lst

In [23]:
count_initializer = rdd_cleared.map(lambda word: counter(word))
count_initializer.take(20)

[[('the', 1),
  ('project', 1),
  ('gutenberg', 1),
  ('ebook', 1),
  ('of', 1),
  ('frankenstein', 1),
  ('by', 1),
  ('mary', 1),
  ('wollstonecraft', 1),
  ('godwin', 1),
  ('shelley', 1)],
 [('this', 1),
  ('ebook', 1),
  ('is', 1),
  ('for', 1),
  ('the', 1),
  ('use', 1),
  ('of', 1),
  ('anyone', 1),
  ('anywhere', 1),
  ('in', 1),
  ('the', 1),
  ('united', 1),
  ('states', 1),
  ('and', 1)],
 [('most', 1),
  ('other', 1),
  ('parts', 1),
  ('of', 1),
  ('the', 1),
  ('world', 1),
  ('at', 1),
  ('no', 1),
  ('cost', 1),
  ('and', 1),
  ('with', 1),
  ('almost', 1),
  ('no', 1),
  ('restrictions', 1)],
 [('whatsoever', 1),
  ('you', 1),
  ('may', 1),
  ('copy', 1),
  ('it', 1),
  ('give', 1),
  ('it', 1),
  ('away', 1),
  ('or', 1),
  ('reuse', 1),
  ('it', 1),
  ('under', 1),
  ('the', 1),
  ('terms', 1)],
 [('of', 1),
  ('the', 1),
  ('project', 1),
  ('gutenberg', 1),
  ('license', 1),
  ('included', 1),
  ('with', 1),
  ('this', 1),
  ('ebook', 1),
  ('or', 1),
  ('online',

Now if we remove duplicates in each text and then sum up values with the same keys, we will get exactly the number of documents, containing each word.

In [24]:
def delete_duplicates(document):
    return list(dict.fromkeys(document))

In [25]:
counter_dropped_duplicates = count_initializer.map(lambda word: delete_duplicates(word))
counter_dropped_duplicates.take(20)

[[('the', 1),
  ('project', 1),
  ('gutenberg', 1),
  ('ebook', 1),
  ('of', 1),
  ('frankenstein', 1),
  ('by', 1),
  ('mary', 1),
  ('wollstonecraft', 1),
  ('godwin', 1),
  ('shelley', 1)],
 [('this', 1),
  ('ebook', 1),
  ('is', 1),
  ('for', 1),
  ('the', 1),
  ('use', 1),
  ('of', 1),
  ('anyone', 1),
  ('anywhere', 1),
  ('in', 1),
  ('united', 1),
  ('states', 1),
  ('and', 1)],
 [('most', 1),
  ('other', 1),
  ('parts', 1),
  ('of', 1),
  ('the', 1),
  ('world', 1),
  ('at', 1),
  ('no', 1),
  ('cost', 1),
  ('and', 1),
  ('with', 1),
  ('almost', 1),
  ('restrictions', 1)],
 [('whatsoever', 1),
  ('you', 1),
  ('may', 1),
  ('copy', 1),
  ('it', 1),
  ('give', 1),
  ('away', 1),
  ('or', 1),
  ('reuse', 1),
  ('under', 1),
  ('the', 1),
  ('terms', 1)],
 [('of', 1),
  ('the', 1),
  ('project', 1),
  ('gutenberg', 1),
  ('license', 1),
  ('included', 1),
  ('with', 1),
  ('this', 1),
  ('ebook', 1),
  ('or', 1),
  ('online', 1),
  ('at', 1)],
 [('wwwgutenbergorg', 1),
  ('if',

In [26]:
texts_with_word = counter_dropped_duplicates.flatMap(lambda value: value).reduceByKey(lambda a, b: a + b)
texts_with_word.take(20)

[('project', 88),
 ('gutenberg', 31),
 ('ebook', 13),
 ('of', 2435),
 ('mary', 3),
 ('shelley', 3),
 ('this', 442),
 ('is', 305),
 ('use', 21),
 ('anyone', 9),
 ('anywhere', 2),
 ('in', 1126),
 ('united', 20),
 ('other', 91),
 ('world', 47),
 ('at', 318),
 ('no', 171),
 ('restrictions', 2),
 ('whatsoever', 3),
 ('may', 108)]

So we got the number of documents `df`, containing each word. Comparing the results with the previously obtained in Part 1, we make sure that the calculations are correct. 

Now to calculate IDF, we need to devide the total number of texts `D` by the number of documents with each word `df` and take 10-based logarithm.

In [27]:
import numpy as np

In [28]:
idfs_rdd = texts_with_word.map(lambda x: {x[0] : np.log10(texts_num/x[1])})
pre_dict = idfs_rdd.collect()

dict_rdd = {}
for element in pre_dict:
    dict_rdd.update(element)
    
dict_rdd

{'project': 1.8839838752025098,
 'gutenberg': 2.337104853518406,
 'ebook': 2.7145231950458415,
 'of': 0.4419675818020252,
 'mary': 3.351345292633016,
 'shelley': 3.351345292633016,
 'this': 1.1830442780035864,
 'is': 1.3441667080058926,
 'use': 2.506247252618759,
 'anyone': 2.8742240379133532,
 'anywhere': 3.527436551688697,
 'in': 0.776928156837351,
 'united': 2.527436551688697,
 'other': 1.8694251550315848,
 'world': 2.156368689416961,
 'at': 1.3260394273682456,
 'no': 1.5954704369605244,
 'restrictions': 3.527436551688697,
 'whatsoever': 3.351345292633016,
 'may': 1.7950427918657286,
 'give': 2.448255305641072,
 'away': 2.1124632037178794,
 'reuse': 3.527436551688697,
 'online': 3.226406556024716,
 'are': 1.558953603134762,
 'have': 1.2843985030024028,
 'check': 3.1294965430166597,
 'country': 2.072591691680187,
 'where': 1.9146526949689617,
 'before': 1.6552802789404044,
 'using': 2.9833685073384215,
 'title': 3.527436551688697,
 'modern': 2.7492853013050533,
 'prometheus': 3.52743

Then let's apply the same function as was used in the Part 1 (only with a different dictionary of IDF) to get the sparse TF-IDF vector.

In [29]:
def transform2(document):
    freq = {}
    tfidfs = {}
    for term in document:
        i = hash(term) % numFeatures
        freq[i] = freq.get(i, 0) + 1.0
        tfidfs[i] = freq[i]/len(document) * dict_rdd[term]
    return [document, Vectors.sparse(numFeatures, tfidfs.items())]

In [30]:
frequencyVectors2 = rdd_cleared.map(transform2)
final2 = frequencyVectors2.toDF(columns)
final2.show()

+--------------------+--------------------+
|                text|        TFIDF_vector|
+--------------------+--------------------+
|[the, project, gu...|(7525,[106,701,11...|
|[this, ebook, is,...|(7525,[1192,1779,...|
|[most, other, par...|(7525,[720,1192,1...|
|[whatsoever, you,...|(7525,[1192,1504,...|
|[of, the, project...|(7525,[102,106,72...|
|[wwwgutenbergorg,...|(7525,[374,470,11...|
|[will, have, to, ...|(7525,[374,1192,2...|
|[using, this, ebook]|(7525,[5119,6092,...|
|[title, frankenst...|(7525,[701,1697],...|
|[or, the, modern,...|(7525,[677,849,11...|
|[author, mary, wo...|(7525,[4765,6870,...|
|[release, date, 3...|(7525,[719,2210,4...|
|[most, recently, ...|(7525,[3101,4706,...|
| [language, english]|(7525,[3621,5770]...|
|[character, set, ...|(7525,[407,1876,3...|
|[produced, by, ju...|(7525,[64,582,638...|
|[further, correct...|(7525,[1909,2069,...|
|[start, of, the, ...|(7525,[106,701,11...|
|      [frankenstein]|(7525,[701],[2.36...|
|[or, the, modern,...|(7525,[677

And finally, let's bring the text indices back.

In [31]:
final2.join(rdd_with_indicies.toDF(['text', 'id']), on='text', how='left').orderBy('id').select('id', 'TFIDF_vector').show()

+---+--------------------+
| id|        TFIDF_vector|
+---+--------------------+
|  0|(7525,[106,701,11...|
|  2|(7525,[1192,1779,...|
|  2|(7525,[1192,1779,...|
|  3|(7525,[720,1192,1...|
|  4|(7525,[1192,1504,...|
|  5|(7525,[102,106,72...|
|  6|(7525,[374,470,11...|
|  7|(7525,[374,1192,2...|
|  8|(7525,[5119,6092,...|
| 10|(7525,[701,1697],...|
| 11|(7525,[677,849,11...|
| 11|(7525,[677,849,11...|
| 13|(7525,[4765,6870,...|
| 15|(7525,[719,2210,4...|
| 16|(7525,[3101,4706,...|
| 18|(7525,[3621,5770]...|
| 20|(7525,[407,1876,3...|
| 22|(7525,[64,582,638...|
| 23|(7525,[1909,2069,...|
| 25|(7525,[106,701,11...|
+---+--------------------+
only showing top 20 rows

