In [1]:
from collections import Counter
import socket
import time
import ray
import requests 
import pandas as pd 
from bs4 import BeautifulSoup 

In [2]:
#Place 3.84.123.210 with your head VM public IP address.
#If your head VM public IP address is 11.11.11.11, addr = "ray://11.11.11.11:10001"
addr = "ray://54.234.69.49:10001"
ray.init(address=addr)

SIGTERM handler is not set because current thread is not the main thread.


0,1
Python version:,3.10.12
Ray version:,2.9.2
Dashboard:,http://127.0.0.1:8265


In [75]:
ray.shutdown()

In [3]:
print('''This cluster consists of
    {} nodes in total
    {} CPU resources in total
    {} Memory resources in total
    {} Object store memory resources in total
'''.format(len(ray.nodes()), ray.cluster_resources()['CPU'], 
           ray.cluster_resources()['memory'] / (1024*1024*1024), 
           ray.cluster_resources()['object_store_memory'] / (1024*1024*1024)))

This cluster consists of
    3 nodes in total
    6.0 CPU resources in total
    14.35987892281264 Memory resources in total
    6.457735060714185 Object store memory resources in total



In [5]:
def getdata(url): 
    r = requests.get(url) 
    return r.text 


In [9]:
htmldata = getdata("https://sherlock-holm.es/stories/html/scan.html")
soup = BeautifulSoup(htmldata, 'html.parser') 
corpus = '' 
for data in soup.find_all("p"): 
    corpus += data.get_text()

In [13]:
txtf = open('merged_ebooks.txt','r')
corpus = txtf.read()

In [14]:
corpus = corpus.split()

In [15]:
corpus[:10]

['\ufeffThe',
 'Project',
 'Gutenberg',
 'eBook',
 'of',
 'The',
 'Complete',
 'Works',
 'of',
 'William']

In [16]:
num_partitions = 3
chunk = len(corpus) // num_partitions
partitions = [
    corpus[i * chunk: (i + 1) * chunk] for i in range(num_partitions)
]

In [17]:
def map_function(document):
    """
    Converts a document into a list of key-value pairs, where each word is a key, and the value is set to 1.
    Steps:
        1. Create an empty list named kvs to store key-value pairs.
        2. Iterate through each word in the document and create a tuple (word, 1) and append it to the kvs list.
        3. After processing all words, return the kvs list as the result.

    Parameters:
    - document (list): List of words representing a document.

    Returns:
    - list: A list of key-value pairs generated from the input document.
    """

    kvs = list()
    for word in document:
        kvs.append((word.lower(), 1))

    return kvs

@ray.remote(scheduling_strategy="SPREAD")
def do_map(corpus, num_partitions=3):
    """
    Applies a mapping operation on the given 'corpus' to partition it into 'num_partitions'.
    Steps:
    1. initialize an empty list named map_results with a length equal to the num_partitions.
    2. Call the map_function on the corpus to generate a list of key-value pairs.
    3. iterate through each entry in key-value pair list and calculate the partition 
        index using the hash(key) the modulo operation with num_partitions to decide which reducer this key-value pair should go to.
    4. return the list of partitions. 

    Parameters:
    - corpus (list): The input data to be mapped.
    - num_partitions (int): The number of partitions to divide the 'corpus' into (default is 3).

    Returns:
    - list: A list containing the mapped partitions of the 'corpus'.
    """
    # Function implementation goes here

    # Initialize an empty list for each partition
    map_results = [list() for _ in range(num_partitions)]

    # Generate key-value pairs using the map_function
    kv_pairs = map_function(corpus)


    # Distribute key-value pairs to partitions based on hash of the key
    for entry in kv_pairs:
        paritition_index = hash(entry[0]) % num_partitions
        map_results[paritition_index].append(entry)
        
    return map_results

In [18]:
map_results = [
    do_map.options(num_returns=num_partitions)
    .remote(data, num_partitions)
    for data in partitions
]



In [26]:
def reduce_function(key, kvs):
    return len(kvs[key])

@ray.remote(scheduling_strategy="SPREAD")
def do_reduce(*my_partition_list):
    """
    Performs a reduce operation on a list of key-value pair partitions.

    Parameters:
    - *my_partition_list: Variable-length list of partitions containing key-value pairs.

    Returns:
    - Any: The result of the reduce operation.
    """
    
    reduce_results = dict()
    res = dict()
    
    for partition in my_partition_list:
        for key,value in partition:
            if key not in reduce_results:
                vals = list()
                vals.append(value)
                reduce_results[key] =  vals
            else:
                reduce_results[key].append(value)

    for key, val in reduce_results.items():
        res[key] = reduce_function(key, reduce_results)
                
    return res

In [27]:
merged_res = []
for i in range(num_partitions):
    merged_res.append(
        do_reduce.remote(*[partition[i] for partition in map_results])
    )

2024-02-20 15:03:13,867	INFO worker.py:1715 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


ValueError: An application is trying to access a Ray object whose owner is unknown(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000). Please make sure that all Ray objects you are trying to access are part of the current Ray session. Note that object IDs generated randomly (ObjectID.from_random()) or out-of-band (ObjectID.from_binary(...)) cannot be passed as a task argument because Ray does not know which task created them. If this was not how your object ID was generated, please file an issue at https://github.com/ray-project/ray/issues/

In [22]:
counts = dict()
for res in ray.get(merged_res):
    for key, val in res.items():
        counts[key] =val
        
counts_descending = sorted(counts.items(), key=lambda item: item[1], reverse=True)

In [24]:
print(counts_descending[:100])

[('the', 1357475), ('of', 522828), ('a', 464353), ('in', 384916), ('and', 269139), ('i', 248046), ('to', 197945), ('is', 156626), ('was', 148937), ('you', 141206), ('his', 132176), ('it', 120792), ('had', 90948), ('he', 78697), ('on', 76795), ('that', 74999), ('were', 70400), ('this', 66789), ('with', 63469), ('for', 61257), ('their', 59962), ('or', 57736), ('no', 53887), ('there', 52363), ('would', 51696), ('as', 51101), ('are', 45130), ('so', 45116), ('but', 41867), ('him', 41102), ('at', 39366), ('not', 38989), ('be', 38534), ('could', 36799), ('said', 33755), ('by', 32485), ('they', 32266), ('from', 31793), ('she', 31705), ('her', 30232), ('my', 29186), ('who', 28805), ('do', 28489), ('have', 28133), ('our', 28059), ('will', 27937), ('now', 26945), ('which', 26909), ('can', 26700), ('thou', 24710), ('shall', 23561), ('upon', 23361), ('two', 23160), ('all', 23133), ('about', 22559), ('if', 22324), ('time', 21644), ('come', 21439), ('some', 21227), ('an', 21141), ('what', 19753), ('g

In [25]:
ray.shutdown()