### Dataset
The most basic form of a dataset we have is a collection of Objects (done as a `list` here). This class will support a few operations (we will add more below):
1. `map`: this takes in a function as input, and applies that function to each object in the collection, and returns the resulting collection.
1. `filter`: this takes in a Boolean function as input, and returns the subset of objects that satisfy the condition.
1. `cartesianproduct`: this takes in another collection as input and returns 2-tuples where every object from the first collection is combined with every object from the other collection.

In [1]:
class Dataset:
    """A minimal in-memory collection of objects backed by a `list`.

    None of the operations below modify the receiver; each one builds and
    returns a new Dataset.
    """

    def __init__(self, data):
        """Wrap `data`, which must be a `list`."""
        assert isinstance(data, list), "Data must be a list"
        self.data = data

    def map(self, function):
        """Return a Dataset holding `function(x)` for every object x, in order."""
        return Dataset(list(map(function, self.data)))

    def filter(self, function):
        """Return a Dataset of the objects for which `function` is truthy."""
        kept = []
        for element in self.data:
            if function(element):
                kept.append(element)
        return Dataset(kept)

    def cartesianproduct(self, other):
        """Return a Dataset of 2-tuples pairing each object here with each in `other`.

        Pairs are ordered by this dataset first, then by `other`.
        """
        pairs = []
        for left in self.data:
            pairs.extend((left, right) for right in other.data)
        return Dataset(pairs)

In [2]:
dataset = Dataset([1, 2, 3, 4])
def f1(x):
    """Return twice the input."""
    return x*2
# map accepts any one-argument callable, here a named function.
mapped_dataset = dataset.map(f1)
print(mapped_dataset.data)

[2, 4, 6, 8]


In [3]:
# The same doubling, written inline as a lambda.
mapped_dataset = dataset.map(lambda x: x * 2)
print(mapped_dataset.data)

[2, 4, 6, 8]


In [6]:
sentences = Dataset(["Hello world", "This is a test"])
# Pair every sentence with its length in characters.
res = sentences.map(lambda x: (x, len(x)))
print(res.data)

[('Hello world', 11), ('This is a test', 14)]


In [7]:
# Keep only the objects divisible by 2.
filtered_dataset = dataset.filter(lambda x: x % 2 == 0)
print(filtered_dataset.data)

[2, 4]


In [9]:
# dataset1 holds a single object (the list [1, 2, 3]), so the product
# contains one pair per object of dataset2.
dataset1 = Dataset([[1, 2, 3]])
dataset2 = Dataset(['a', 'b', 'c'])
product = dataset1.cartesianproduct(dataset2)
print(product.data)

[([1, 2, 3], 'a'), ([1, 2, 3], 'b'), ([1, 2, 3], 'c')]


In [10]:
# Load the README with one object per line; readlines() keeps the trailing
# newline on each line. The encoding is given explicitly so the result does
# not depend on the platform's default.
with open('README.md', encoding='utf-8') as f:
    file_lines = f.readlines()
lines = Dataset(file_lines)

# Pair each line with its length (the newline character is counted).
ret = lines.map(lambda x: (x, len(x)))

# Show the first three results; slicing also copes with files that have
# fewer than three lines instead of raising IndexError.
for entry in ret.data[:3]:
    print("-----------")
    print(entry)

-----------
('# Assignment 0: Computing Environment\n', 38)
-----------
('\n', 1)
-----------
('Over the course of the semester, you will work with a variety of software packages, including PostgreSQL, Apache Spark, MongoDB, and others. Installing those\n', 158)


Next we will define flatMap and join. 

1. `flatMap`: similar to map in that the function is applied to every object in the collection. However, the function must return a `list` and the final output is a flattened union of all those lists.
1. `join`: similar to `cartesianproduct`, but also takes in a function as input that can check an arbitrary condition on the two objects.

Next we will define a `reduce` method, which takes in a function as input and repeatedly applies it to pairs of items until there is a single value left.

In [11]:
def flatMap(self, function):
    """Apply `function` to every object and concatenate the results.

    `function` must return something iterable (a list in this notebook);
    the items of each returned value are placed, in order, into one flat
    Dataset.
    """
    # The inner loop walks each returned collection, removing one level of nesting.
    flattened = [item for obj in self.data for item in function(obj)]
    return Dataset(flattened)

def join(self, other, function):
    """Pair objects of this dataset with objects of `other` that satisfy `function`.

    `function(self_item, other_item)` is called for every combination; the
    combinations for which it is truthy are returned as 2-tuples in a new
    Dataset, ordered by this dataset first and `other` second.
    """
    matches = []
    for left in self.data:
        for right in other.data:
            if function(left, right):
                matches.append((left, right))
    return Dataset(matches)

# Attach the two functions to Dataset so they can be called as methods.
Dataset.flatMap = flatMap
Dataset.join = join

In [12]:
sentences = Dataset(["Hello world", "This is a test"])
# Each sentence becomes a list of words; flatMap merges those lists into one.
words = sentences.flatMap(lambda x: x.split(" "))
print(words.data)

['Hello', 'world', 'This', 'is', 'a', 'test']


In [13]:
# Each entry has the form "<timestamp> - <level> - <message>".
log_entries = [
    "2023-03-13 10:00:00 - INFO - Server started.",
    "2023-03-13 10:15:00 - ERROR - Failed to connect to database.",
    "2023-03-13 10:30:00 - INFO - User 'admin' logged in.",
    "2023-03-13 10:45:00 - ERROR - Timeout occurred processing request."
]
dataset = Dataset(log_entries)

# Drop the first field (the timestamp) and keep the remaining fields as a list.
formatted_logs = dataset.map(lambda x: x.split(" - ")[1:])
print(formatted_logs.data)

[['INFO', 'Server started.'], ['ERROR', 'Failed to connect to database.'], ['INFO', "User 'admin' logged in."], ['ERROR', 'Timeout occurred processing request.']]


In [14]:
def extract_keywords(log_entry):
    """Return the non-stop-words of an ERROR log entry's message.

    `log_entry` is expected to look like "<timestamp> - <level> - <message>".
    Entries whose level is not "ERROR" yield an empty list, which makes this
    function suitable for use with flatMap.

    Raises:
        ValueError: if the entry has fewer than three " - "-separated fields.
    """
    # maxsplit=2 keeps any " - " inside the message as part of the message
    # instead of making the three-way unpacking fail.
    _, level, message = log_entry.split(" - ", 2)
    if level != "ERROR":
        return []
    # Simple example, in reality, you'd filter out common stop words more effectively
    stop_words = {"to", "the", "a", "in"}
    return [word for word in message.split() if word.lower() not in stop_words]

# Non-ERROR entries contribute an empty list, so they vanish from the flattened result.
error_keywords = dataset.flatMap(extract_keywords)
print(error_keywords.data)

['Failed', 'connect', 'database.', 'Timeout', 'occurred', 'processing', 'request.']


In [15]:
# Each object is a JSON document stored as a string; it is parsed on demand.
json_books = [
    '{"id": 1, "title": "The Great Gatsby", "author": "F. Scott Fitzgerald", "genres": ["Fiction", "Classic"]}',
    '{"id": 2, "title": "Moby Dick", "author": "Herman Melville", "genres": ["Fiction", "Adventure", "Classic"]}',
    '{"id": 3, "title": "1984", "author": "George Orwell", "genres": ["Fiction", "Dystopian", "Political"]}'
]
dataset = Dataset(json_books)

import json

# Project every book onto a (title, author) tuple.
# Note: the lambda parses each JSON string twice, once per field.
title_author_pairs = dataset.map(lambda x: (json.loads(x)['title'], json.loads(x)['author']))
print(title_author_pairs.data)

[('The Great Gatsby', 'F. Scott Fitzgerald'), ('Moby Dick', 'Herman Melville'), ('1984', 'George Orwell')]


In [16]:
# Collect the genres of all books into one flat dataset (duplicates are kept).
genres = dataset.flatMap(lambda x: json.loads(x)['genres'])
print(genres.data)

['Fiction', 'Classic', 'Fiction', 'Adventure', 'Classic', 'Fiction', 'Dystopian', 'Political']


In [17]:
# Turn each genre into a (genre, 1) pair.
res1 = genres.map(lambda x: (x, 1))
print(res1.data)

[('Fiction', 1), ('Classic', 1), ('Fiction', 1), ('Adventure', 1), ('Classic', 1), ('Fiction', 1), ('Dystopian', 1), ('Political', 1)]


#### reduce
Next we will define `reduce` which is like an aggregate but takes in a function that merges two items as input.

In [18]:
def reduce(self, function):
    """Fold the dataset into a single value by repeatedly applying `function`.

    The first object seeds the accumulator; every later object is merged in
    with `function(accumulated, item)`. The final accumulated value is
    returned as-is (not wrapped in a Dataset).

    An empty dataset has no first object, so indexing it raises IndexError.
    """
    items = self.data
    accumulated = items[0]
    for nxt in items[1:]:
        accumulated = function(accumulated, nxt)
    return accumulated

# Attach reduce to Dataset so it can be called as a method.
Dataset.reduce = reduce

In [19]:
dataset = Dataset([1, 2, 3, 4])
# Sum all objects by merging them pairwise.
print(dataset.reduce(lambda x, y: x + y))

10


In [24]:
# Merge the genre strings pairwise into one comma-separated string.
res2 = genres.reduce(lambda x, y: ",".join([x, y]))
print(res2)

Fiction,Classic,Fiction,Adventure,Classic,Fiction,Dystopian,Political


The below is a slightly better implementation that takes in a starting value.

In [None]:
def reduce(self, function, initial=None):
    """Fold the dataset into a single value by repeatedly applying `function`.

    Args:
        function: two-argument callable, invoked as
            `function(accumulated, item)` for each item in order.
        initial: optional starting value. When it is None, the first object
            of the dataset seeds the fold instead.

    Returns:
        The final accumulated value (a plain value, not a Dataset).

    Raises:
        ValueError: if the dataset is empty and no initial value is given.
    """
    if initial is None:
        if not self.data:
            raise ValueError("reduce() of empty dataset with no initial value")
        result, remaining = self.data[0], self.data[1:]
    else:
        result, remaining = initial, self.data
    for item in remaining:
        result = function(result, item)
    # A fold yields one value, so it is returned directly rather than being
    # wrapped in a Dataset.
    return result

# Replace the earlier Dataset.reduce with the version that accepts `initial`.
Dataset.reduce = reduce

### DatasetKV

Let's move on to DatasetKV. This will be a subclass of the above, with the requirement that the objects in the dataset are 2-tuples, which will be interpreted as (key, value) pairs. We typically require keys to be hashable -- we won't enforce it in the code below, but some functions may not work.

The first two operations we will define on this will be: 
1. "equiJoin" -- here we will join on the "keys", i.e., we will pair up objects that have the same keys across the two collections
1. "groupByKey", which simply groups the objects by the key.

In [25]:
class DatasetKV(Dataset):
    """A Dataset whose objects are 2-tuples, read as (key, value) pairs."""

    def __init__(self, data):
        """Check that every item is a 2-tuple, then defer to Dataset."""
        assert all(isinstance(pair, tuple) and len(pair) == 2 for pair in data), "Data must be a list of key-value pairs"
        super().__init__(data)

    def groupByKey(self):
        """Return one (key, [values...]) pair per distinct key.

        Keys appear in order of first occurrence and each value list keeps
        the order in which the values were encountered. Keys are used as
        dictionary keys here, so they must be hashable.
        """
        buckets = {}
        for key, value in self.data:
            buckets.setdefault(key, []).append(value)
        return DatasetKV(list(buckets.items()))

    ## This is a naive implementation that wouldn't scale to large values
    ## At the least, a hash table should be built on other.data
    def equiJoin(self, other):
        """Join with `other` on equal keys.

        Every pair here is compared with every pair of `other`; each match
        yields (key, (value_from_self, value_from_other)).
        """
        matches = []
        for left in self.data:
            for right in other.data:
                if left[0] == right[0]:
                    matches.append((left[0], (left[1], right[1])))
        return DatasetKV(matches)

In [26]:
pairs = DatasetKV([('fruit', 'apple'), ('fruit', 'banana'), ('vegetable', 'carrot')])
# Gather the values that share a key into one list per key.
grouped = pairs.groupByKey()
print(grouped.data)

[('fruit', ['apple', 'banana']), ('vegetable', ['carrot'])]


In [27]:
# res1 is a plain Dataset of (genre, 1) pairs; re-wrap its data as a DatasetKV to group by genre.
genres_grouped = DatasetKV(res1.data).groupByKey()
print(genres_grouped.data)

[('Fiction', [1, 1, 1]), ('Classic', [1, 1]), ('Adventure', [1]), ('Dystopian', [1]), ('Political', [1])]


In [28]:
dataset1 = DatasetKV([('a', 1), ('a', 3), ('b', 2)])
dataset2 = DatasetKV([('a', 3), ('a', 4), ('c', 4), ('b', 3)])
# Key 'c' exists only in dataset2, so it produces no output.
joined_dataset = dataset1.equiJoin(dataset2)
print(joined_dataset.data)

[('a', (1, 3)), ('a', (1, 4)), ('a', (3, 3)), ('a', (3, 4)), ('b', (2, 3))]


Note: There are a few choices for the way the join result is represented. 
1. As shown above, which mimics how SQL does it.
1. However, we may also do: (('a', 1), ('a', 3)) for the first one -- this keeps both the original tuples and makes more sense for non-equijoins
1. Another option is do grouping instead, where the result could look like: 
[('a', ( [1, 3], [3, 4] )), ('b', ( [2], [3] ))]

In the last one, we are essentially grouping both the relations by the key and then joining them while keeping groups as they are. This is called a "cogroup" in Apache Spark.

#### ReduceByKey and SemiJoin
Next, let's add reduceByKey and a semijoin operation to DatasetKV.

1. reduceByKey: This allows us to "merge" or "aggregate" or "reduce" all the entries in a group. In most implementations, you would have to pass in a `reduce` function that takes in two arguments and "reduces/merges" them into a single value. Both the arguments and the return value must be of the same type.

2. semiJoin: A variation of join, which is really more like a `filter`, where we only keep those entries in `self.data` that have a match in `other.data`.

In [30]:
def reduceByKey(self, function):
    """Merge all values that share a key into one value per key.

    The first value seen for a key seeds that key's accumulator; each later
    value is merged in with `function(accumulated, value)`. The result holds
    one (key, merged_value) pair per key, in order of first occurrence.
    """
    merged = {}
    for key, value in self.data:
        merged[key] = function(merged[key], value) if key in merged else value
    return DatasetKV(list(merged.items()))

def semijoin(self, other):
    """Keep only the pairs of this dataset whose key also occurs in `other`.

    Values of `other` are ignored; only its keys matter.
    """
    # Collect the keys of the other dataset once, for constant-time membership tests.
    keys_in_other = {key for key, _ in other.data}
    survivors = []
    for key, value in self.data:
        if key in keys_in_other:
            survivors.append((key, value))
    return DatasetKV(survivors)

# Attach the two functions to DatasetKV so they can be called as methods.
DatasetKV.reduceByKey = reduceByKey
DatasetKV.semijoin = semijoin

In [31]:
sales = DatasetKV([('January', 100), ('February', 150), ('January', 200), ('February', 100)])

def reduce_function(x, y):
    """Merge two sales figures of the same month by adding them."""
    return x+y

# Use the named reducer defined above instead of duplicating it as a lambda.
total_sales = sales.reduceByKey(reduce_function)
print(total_sales.data)

[('January', 300), ('February', 250)]


In [33]:
dataset1 = DatasetKV([('a', 1), ('a', 3), ('b', 2), ('d', 1)])
dataset2 = DatasetKV([('a', 3), ('a', 4), ('c', 4), ('b', 3)])
# Key 'd' has no match in dataset2, so its pair is dropped.
semijoined_dataset = dataset1.semijoin(dataset2)
print(semijoined_dataset.data)

[('a', 1), ('a', 3), ('b', 2)]


#### Lookup
Finally, let's add a lookup function, which behaves like a "left outer join". For each entry in `self.data`, if there are matches in `other.data`, it will return all of them in a list. If there are no matches, an empty list will be returned. This is a common operation in MongoDB (only way to really do joins) and also in Excel.

In [34]:
def lookup(self, other):
    """Attach to every pair here the list of matching values from `other`.

    For each (key, value) of this dataset the result contains
    (key, (value, matches)), where `matches` lists the values of all pairs
    in `other` with an equal key. The list is empty when nothing matches,
    so no pair of this dataset is ever dropped.
    """
    def values_for(key):
        # Scan the other dataset for pairs carrying this key.
        return [pair[1] for pair in other.data if pair[0] == key]

    return DatasetKV([(k, (v, values_for(k))) for k, v in self.data])

# Attach lookup to DatasetKV so it can be called as a method.
DatasetKV.lookup = lookup

In [35]:
dataset1 = DatasetKV([('a', 1), ('a', 3), ('b', 2), ('d', 3)])
dataset2 = DatasetKV([('a', 3), ('a', 4), ('c', 4), ('b', 3)])
# Key 'd' has no match in dataset2; it is kept with an empty list of matches.
lookup_result = dataset1.lookup(dataset2)
print(lookup_result.data)

[('a', (1, [3, 4])), ('a', (3, [3, 4])), ('b', (2, [3])), ('d', (3, []))]


### DatasetTuple
Next, we will define a Dataset of tuples for completeness. The functionality is largely the same as above, but because of the availability of "schema", we can do more structured stuff (e.g., joining on attributes).

In [None]:
class DatasetTuple(Dataset):
    """A Dataset of tuples accompanied by a schema naming the tuple positions."""

    def __init__(self, data, schema):
        """Check that every item is a tuple, then store the data and its schema."""
        assert all(isinstance(row, tuple) for row in data), "Data must be a list of tuples"
        super().__init__(data)
        self.schema = schema  # List of attributes

    def join(self, other, attribute):
        """Join with `other` on equal values of the attribute named `attribute`.

        The attribute's position is looked up in each schema separately, so
        it may sit at different positions in the two datasets.

        Returns a DatasetTuple whose objects are (self_tuple, other_tuple)
        pairs and whose schema is this schema followed by `other`'s schema.
        Note that the pairs are not flattened, so the positions of the
        combined schema do not index directly into a result object.

        Raises:
            ValueError: if `attribute` is missing from either schema.
        """
        if not (attribute in self.schema and attribute in other.schema):
            raise ValueError("Attribute not found in schema(s)")

        left_pos = self.schema.index(attribute)
        right_pos = other.schema.index(attribute)

        matched = [
            (left, right)
            for left in self.data
            for right in other.data
            if left[left_pos] == right[right_pos]
        ]
        return DatasetTuple(matched, self.schema + other.schema)

In [None]:
# Both datasets carry an 'id' attribute, which is used as the join attribute.
dataset1 = DatasetTuple([(1, 'apple'), (2, 'banana')], ['id', 'name'])
dataset2 = DatasetTuple([(2, 'yellow'), (1, 'red')], ['id', 'color'])
joined_dataset = dataset1.join(dataset2, 'id')
print(joined_dataset.data)  # Output: [((1, 'apple'), (1, 'red')), ((2, 'banana'), (2, 'yellow'))]
print(joined_dataset.schema)  # Output: ['id', 'name', 'id', 'color']

### Questions

In [36]:
#1
# Square every object; squared.data is [1, 4, 9].
dataset = Dataset([1, 2, 3])
squared = dataset.map(lambda x: x**2)

In [None]:
#2
# Keep the objects divisible by 2; evens.data is [2, 4].
dataset = Dataset([1, 2, 3, 4])
evens = dataset.filter(lambda x: x % 2 == 0)

In [None]:
#3
# 0 is passed as the starting value, so this needs the reduce variant that accepts `initial`.
dataset = Dataset([1, 2, 3, 4])
sum_result = dataset.reduce(lambda acc, x: acc + x, 0)  # Assuming a reduce method exists

In [37]:
#4
# An unbound method such as str.upper works as the mapping function.
dataset = Dataset(["hello", "world"])
uppercase = dataset.map(str.upper)
print(uppercase.data)

['HELLO', 'WORLD']


In [41]:
#5
# Each object is already a list, so list(x) merely copies it; flatMap then
# removes one level of nesting. The strings are not split into characters.
dataset = Dataset([ ["hi"], ["bye"]])
characters = dataset.flatMap(lambda x: list(x))
print(characters.data)

['hi', 'bye']


In [None]:
#6
# Keep strings longer than one character; long_strings.data is ['hello', 'world'].
dataset = Dataset(["hello", "a", "world"])
long_strings = dataset.filter(lambda x: len(x) > 1)

In [43]:
#7
# Parse each JSON string and keep only its 'title' field.
json_dataset = Dataset(['{"title": "Book A", "author": "Author X"}', '{"title": "Book B", "author": "Author Y"}'])
titles = json_dataset.map(lambda x: json.loads(x)['title'])
print(titles.data)

['Book A', 'Book B']


In [44]:
#8
# Each document contributes the list of its field values; flatMap merges those lists.
json_dataset = Dataset(['{"title": "Book A", "author": "Author X"}', '{"title": "Book B", "author": "Author Y"}'])
all_info = json_dataset.flatMap(lambda x: list(json.loads(x).values()))
print(all_info.data)

['Book A', 'Author X', 'Book B', 'Author Y']


In [None]:
# Group values by key; grouped.data is [('a', [1, 3]), ('b', [2])].
datasetkv = DatasetKV([("a", 1), ("b", 2), ("a", 3)])
grouped = datasetkv.groupByKey()

In [None]:
# Sum the values of each key; reduced_sum.data is [('a', 4), ('b', 2)].
datasetkv = DatasetKV([("a", 1), ("b", 2), ("a", 3)])
reduced_sum = datasetkv.reduceByKey(lambda x, y: x + y)

In [None]:
# Only key "a" occurs in both datasets; semijoin_result.data is [('a', 1)].
dataset1 = DatasetKV([("a", 1), ("b", 2)])
dataset2 = DatasetKV([("a", 3), ("c", 4)])
semijoin_result = dataset1.semijoin(dataset2)

In [None]:
# Join on the shared "id" attribute; each result object is a pair of the two matching tuples.
dataset1 = DatasetTuple([(1, "apple"), (2, "banana")], ["id", "name"])
dataset2 = DatasetTuple([(2, "yellow"), (1, "red")], ["id", "color"])
joined_on_id = dataset1.join(dataset2, "id")

In [None]:
json_dataset = Dataset(['{"id": 1, "active": true}', '{"id": 2, "active": false}'])
# Extract each document's id, then keep the ids greater than 1 (result: [2]).
# Note that the "active" field is not consulted by this pipeline.
# The lambda parameter is named record_id so it does not shadow the builtin id().
active_ids = json_dataset.map(lambda x: json.loads(x)['id']).filter(lambda record_id: record_id > 1)

In [None]:
# Collect the tags of all documents into one flat dataset.
json_dataset = Dataset(['{"name": "John", "tags": ["friend", "school"]}', '{"name": "Doe", "tags": ["work", "gym"]}'])
tags = json_dataset.flatMap(lambda x: json.loads(x)['tags'])

In [None]:
# Parse each document and flatten its "tags" lists into a single dataset.
json_dataset = Dataset(['{"name": "John", "tags": ["friend", "school"]}', '{"name": "Doe", "tags": ["work", "gym"]}'])
tags = json_dataset.flatMap(lambda x: json.loads(x)['tags'])

In [None]:
dataset1 = Dataset(["user1", "user2"])
dataset2 = Dataset(["data1", "data2"])
# Dataset.join requires a predicate as its second argument; an always-true
# predicate pairs every user with every data item before formatting.
joined_data = dataset1.join(dataset2, lambda user, data: True).map(lambda x: f"{x[0]}_info: {x[1]}")