Program written in Python 3.11.1 using Jupyter Notebook.

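To run the program, execute the cell below. The input files must keep the same directory structure: they are read from `data/clicks/` and `data/users/`, relative to the working directory.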

In [1]:
# Mapper for task 1: turns every click row into a key-value pair keyed by its date.
def mapper_task_1(data):
    """Map each click row to an intermediate pair keyed by its ``'date'`` field.

    Args:
        data: iterable of click dicts; every dict must contain ``'date'``.

    Returns:
        A list of ``{'key': date, 'value': click}`` dicts in input order.
        The click dicts are passed through as-is (not copied).
    """
    return [{'key': row['date'], 'value': row} for row in data]

# Reducer for task 1: counts how many clicks were recorded on each day.
def reducer_task_1(key, values):
    """Count the clicks grouped under one date.

    Args:
        key: the date shared by every record in ``values``.
        values: list of click records for that date.

    Returns:
        A one-element list ``[{'date': key, 'count': len(values)}]``; a list is
        returned so the caller can ``extend`` its output with it.
    """
    click_count = len(values)
    return [{'date': key, 'count': click_count}]

# CUSTOM mapReduce IMPLEMENTATION
# The two tasks use a different number of datasets, so map_reduce takes a list of
# datasets and a parallel list of mappers (a single mapper must still be wrapped in a list).
def map_reduce(datasets, mappers, reducer):
    """Run an in-memory map -> group-by-key -> reduce pipeline.

    Args:
        datasets: list of datasets; ``datasets[i]`` is fed to ``mappers[i]``.
        mappers: list of callables, each returning an iterable of
            ``{'key': ..., 'value': ...}`` dicts. Mappers beyond
            ``len(datasets)`` are never called.
        reducer: callable ``reducer(key, values)`` returning an iterable of
            output records for one key group.

    Returns:
        A flat list of every reducer output, with key groups in first-seen order.
    """
    # Map phase: pair each dataset with the mapper at the same position.
    pairs = []
    for idx, dataset in enumerate(datasets):
        pairs.extend(mappers[idx](dataset))

    # Shuffle phase: collect values per key, preserving first-appearance order.
    groups = {}
    for pair in pairs:
        groups.setdefault(pair['key'], []).append(pair['value'])

    # Reduce phase: concatenate the reducer's output for every group.
    return [record for group_key, group_values in groups.items()
            for record in reducer(group_key, group_values)]

# TWO MAP FUNCTIONS FOR THE SECOND TASK
# Both emit the same {'key': ..., 'value': ...} pairs as mapper_task_1, but tag each
# value with the dataset it came from:
#   'table': 'users'  <-- users dataset
#   'table': 'clicks' <-- clicks dataset
def mapper_task_2_users(users):
    """Emit Lithuanian users (``country == 'LT'``) keyed by their ``'id'``.

    Args:
        users: iterable of user dicts with at least ``'country'``; rows that pass
            the filter must also have ``'id'``.

    Returns:
        A list of ``{'key': id, 'value': tagged_user}`` dicts in input order, where
        ``tagged_user`` is a shallow copy of the row with ``'table': 'users'`` set.
        The input rows are not modified. Non-LT users are dropped.
    """
    tagged = []
    for row in users:
        if row['country'] != 'LT':
            continue
        join_key = row['id']
        entry = row.copy()
        entry['table'] = 'users'
        tagged.append({'key': join_key, 'value': entry})
    return tagged

def mapper_task_2_clicks(clicks):
    """Tag each click row and key it by ``'user_id'`` for the task-2 join.

    Args:
        clicks: iterable of click dicts; each must contain ``'user_id'``.

    Returns:
        A list of ``{'key': user_id, 'value': tagged_click}`` dicts in input
        order, where ``tagged_click`` is a shallow copy of the click with
        ``'table': 'clicks'`` set. The input rows are not modified.
    """
    results = []
    for click in clicks:
        # Copy before tagging (as mapper_task_2_users does) so the caller's rows,
        # which may be reused for other tasks, do not gain a 'table' field.
        value = click.copy()
        # Marks this record as coming from the clicks dataset.
        value['table'] = 'clicks'
        results.append({'key': click['user_id'], 'value': value})
    return results

# Reducer for task 2: inner-joins one LT user with all of their clicks.
def reducer_task_2(key, values):
    """Merge the group's LT user record into each of the group's click records.

    Args:
        key: the join key (user id / click ``user_id``); not used directly.
        values: tagged records for this key: ``'users'`` records (only the first
            one found is used) and any number of ``'clicks'`` records.

    Returns:
        One merged dict per click, in the order the clicks appear in ``values``.
        Each is built as ``{**click, **user}``, so user fields win on name
        clashes. This includes ``'table'``, which therefore ends up as ``'users'``.
        NOTE(review): confirm that the ``'users'`` value in the output
        ``table`` column is intended.
        Returns ``[]`` when the group has no user record. mapper_task_2_users
        only emits LT users, so clicks by non-LT or unknown users are dropped.
    """
    user = next((value for value in values if value['table'] == 'users'), None)
    # Without a matching LT user, none of these clicks belong in the output.
    # Checking this explicitly avoids relying on whether click rows happen to
    # carry their own 'country' field.
    if user is None:
        return []
    return [{**record, **user} for record in values if record['table'] == 'clicks']


import os
import csv


# Input locations: file_path_* is relative to the working directory and is used
# to open files; dir_path_* is the same directory made absolute, used for listing.
file_path_clicks = 'data/clicks'
dir_path_clicks = f'{os.getcwd()}/{file_path_clicks}'

file_path_users = 'data/users'
dir_path_users = f'{os.getcwd()}/{file_path_users}'

# File reader: loads every CSV file in a directory into a list of row dicts.
def file_reader(dir_path, file_path):
    """Read all CSV files in a directory into one flat list of dicts.

    Args:
        dir_path: directory whose entries are listed.
        file_path: path prefix used to open each listed entry
            (``f'{file_path}/{name}'``); expected to refer to the same
            directory as ``dir_path``.

    Returns:
        One dict per non-blank data row, mapping the header cells of that row's
        file to the row's cells (all values are strings, as produced by ``csv``).
        Files are read in sorted filename order. Non-file entries, empty files,
        and blank lines are skipped. A row shorter than its header yields a dict
        containing only the leading columns.

    Raises:
        ValueError: a row has more cells than its file's header.
    """
    data_set = []
    # Sorted so the resulting row order does not depend on os.listdir's arbitrary order.
    for file_name in sorted(os.listdir(dir_path)):
        path = f'{file_path}/{file_name}'
        if not os.path.isfile(path):
            continue
        # The with-block closes every file. newline='' lets the csv module
        # handle line endings itself, as its documentation requires.
        with open(path, encoding='utf-8', newline='') as csv_file:
            rows = csv.reader(csv_file)
            headers = next(rows, None)
            if headers is None:
                continue
            for row in rows:
                if not row:
                    continue
                if len(row) > len(headers):
                    raise ValueError(
                        f'{path}:{rows.line_num}: row has {len(row)} cells '
                        f'but header has {len(headers)}'
                    )
                # Key every cell by its column header so both datasets share one shape.
                data_set.append(dict(zip(headers, row)))
    return data_set


data_set_clicks = file_reader(dir_path_clicks, file_path_clicks)
data_set_users = file_reader(dir_path_users, file_path_users)

# Task 1: number of clicks per date.
final_data_1 = map_reduce(datasets=[data_set_clicks], mappers=[mapper_task_1], reducer=reducer_task_1)
# Task 2: clicks joined with the LT users who made them.
final_data_2 = map_reduce(
    datasets=[data_set_users, data_set_clicks],
    mappers=[mapper_task_2_users, mapper_task_2_clicks],
    reducer=reducer_task_2,
)


# newline='' is required for files handed to csv.writer. Without it, the
# writer's '\r\n' row terminator turns into '\r\r\n' on Windows (blank lines).
with open('data/filtered_clicks', 'w', encoding='UTF8', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['date', 'user_id', 'screen', 'click_target', 'table', 'id', 'country'])
    for data in final_data_2:
        # A record without a 'screen' field is written with the literal string 'null'.
        writer.writerow([
            data['date'],
            data['user_id'],
            data.get('screen', 'null'),
            data['click_target'],
            data['table'],
            data['id'],
            data['country'],
        ])

with open('data/total_clicks', 'w', encoding='UTF8', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['date', 'count'])
    for data in final_data_1:
        writer.writerow([data['date'], data['count']])