### Opening the data to check it before continuing forward

##### Checking the files in `data/clicks` and `data/users` folders

In [840]:
import os

# looking at a list of all files in data/clicks folder
clicks_list = os.listdir(clicks_path)
clicks_list

['part-001.csv', 'part-002.csv', 'part-003.csv', 'part-004.csv']

In [841]:
# looking at a list of all files in data/users folder
users_list = os.listdir(users_path)
users_list

['part-001.csv', 'part-002.csv']

In [842]:
from csv import reader

clicks_path = 'D:/Vinted data engineering task/data/clicks/'
users_path = 'D:/Vinted data engineering task/data/users/'
path_save = 'D:/Vinted data engineering task/'

def lookup(path, file_name):
    with open(path + file_name, 'r', encoding='utf8') as f:
        read_file = reader(f)
        dataset = list(read_file)
        header = dataset[0]
#         dataset = dataset[1:]
        
        print('Number of rows:', len(dataset))
        print('Number of columns:', len(dataset[0]))
    
    return dataset

##### Checking the contents of the dataset

In [843]:
def explore_data(dataset, start, end):
    dataset_slice = dataset[start:end]    
    for row in dataset_slice:
        print(row, '\n')

##### To check any file from `/data` folder, pass the necessary path variable with the name of the file

In [844]:
clicks1_dt = lookup(clicks_path, 'part-001.csv')
clicks1_explore = explore_data(clicks1_dt, 0, 5)

Number of rows: 12
Number of columns: 3
['date', 'user_id', 'click_target'] 

['2017-12-15', '4', 'item'] 

['2017-12-15', '4', 'ad'] 

['2017-12-12', '4', 'profile'] 

['2017-12-13', '4', 'profile'] 



## Task #1: use implemented map-reduce framework for aggregation.

Having in mind `data/clicks` dataset with "date" column, count how many clicks there were for each date and write the results to `data/total_clicks` dataset with "date" and "count" columns.

##### To prepare for aggregation of data from `data/clicks`, we first take a look at how many files there are to help us choose any file if needed easier later:

In [845]:
# looking at a list of all files in data/clicks folder
clicks_list = os.listdir(clicks_path)
clicks_list

['part-001.csv', 'part-002.csv', 'part-003.csv', 'part-004.csv']

### Counting clicks for each date in any file in `data/clicks`

##### This function works as a mapper:

In [826]:
def key_pair(dataset):    
    freq_table = {}
    for row in dataset[1:]:
        date = row[0]
        if date in freq_table:
            freq_table[date] += 1
        else:
            freq_table[date] = 1
            
    return freq_table

##### To count clicks of any file from `data/clicks` folder, pass the necessary path variable with the name of the file

In [846]:
clicks1_dt = lookup(clicks_path, 'part-001.csv')
clicks1_explore = explore_data(clicks1_dt, 0, 5)

keypair = key_pair(clicks1_dt)
print(keypair)

Number of rows: 12
Number of columns: 3
['date', 'user_id', 'click_target'] 

['2017-12-15', '4', 'item'] 

['2017-12-15', '4', 'ad'] 

['2017-12-12', '4', 'profile'] 

['2017-12-13', '4', 'profile'] 

{'2017-12-15': 2, '2017-12-12': 2, '2017-12-13': 2, '2017-12-18': 1, '2017-12-21': 1, '2017-12-17': 1, '2017-12-14': 1, '2017-12-16': 1}


##### Writing the results to `data/total_clicks` dataset with "date" and "count" columns

In [847]:
import pandas as pd

totalclicks_df = pd.DataFrame(list(keypair.items()), columns=['Date', 'Count'])

totalclicks_df.to_csv('D:/Vinted data engineering task/data/total_clicks.csv')

## Task #2: join two datasets using implemented map-reduce framework

There are two datasets:

* `data/users` dataset with columns "id" and "country"
* `data/clicks` dataset with columns "date", "user_id" and "click_target"

Produce a new dataset called `data/filtered_clicks` that includes only those clicks that belong to users from Lithuania (`country=LT`).

##### To make it easier to filter "clicks" by "country", we combine files from `data/users` folder into one file and `data/clicks` folder into one file:

In [848]:
# Combining all files in data/clicks folder into a single file

df_concat_clicks = pd.concat([pd.read_csv(f'{clicks_path}{clicks_list}') 
                              for clicks_list in clicks_list], ignore_index=True)
df_concat_clicks

Unnamed: 0,date,user_id,click_target,screen
0,2017-12-15,4,item,
1,2017-12-15,4,ad,
2,2017-12-12,4,profile,
3,2017-12-13,4,profile,
4,2017-12-13,4,ad,
...,...,...,...,...
95,2017-12-20,7,ad,feed
96,2017-12-10,7,ad,feed
97,2017-12-20,7,profile,feed
98,2017-12-17,7,item,catalog


In [830]:
# df_concat_clicks.to_csv('D:/Vinted data engineering task/concat_clicks.csv')

In [849]:
# Combining all files in data/users folder into a single file

df_concat_users = pd.concat([pd.read_csv(f'{users_path}{users_list}') 
                              for users_list in users_list], ignore_index=True)
df_concat_users

Unnamed: 0,id,country,city
0,1,LT,
1,2,DE,
2,3,LT,
3,100,LT,
4,200,DE,
5,300,LT,
6,4,LT,Vilnius
7,5,DE,Berlin
8,6,DE,Munich
9,101,DE,Hamburg


##### Combining "users" and "clicks" datasets to one:

In [851]:
data1 = df_concat_clicks
data2 = df_concat_users

df1 = pd.DataFrame(data1)
df2 = pd.DataFrame(data2)

# Merging two DataFrames
df3 = pd.merge(df1, df2, left_on='user_id', right_on='id')

# Display result
print("Result:\n",df3)
# print('Length of dataframe:', len(df3))

df3.to_csv('D:/Vinted data engineering task/users_clicks.csv')

Result:
           date  user_id click_target   screen  id country     city
0   2017-12-15        4         item      NaN   4      LT  Vilnius
1   2017-12-15        4           ad      NaN   4      LT  Vilnius
2   2017-12-12        4      profile      NaN   4      LT  Vilnius
3   2017-12-13        4      profile      NaN   4      LT  Vilnius
4   2017-12-13        4           ad      NaN   4      LT  Vilnius
..         ...      ...          ...      ...  ..     ...      ...
71  2017-12-11        3           ad     feed   3      LT      NaN
72  2017-12-15        3         item  catalog   3      LT      NaN
73  2017-12-20        3      profile  catalog   3      LT      NaN
74  2017-12-16        3           ad  catalog   3      LT      NaN
75  2017-12-12        3           ad     feed   3      LT      NaN

[76 rows x 7 columns]


##### Inspecting the new dataset:

In [834]:
df3_dt = lookup(path_save, 'users_clicks.csv')
df3_explore = explore_data(df3_dt, 0, 5)

Number of rows: 77
Number of columns: 8
['', 'date', 'user_id', 'click_target', 'screen', 'id', 'country', 'city'] 

['0', '2017-12-15', '4', 'item', '', '4', 'LT', 'Vilnius'] 

['1', '2017-12-15', '4', 'ad', '', '4', 'LT', 'Vilnius'] 

['2', '2017-12-12', '4', 'profile', '', '4', 'LT', 'Vilnius'] 

['3', '2017-12-13', '4', 'profile', '', '4', 'LT', 'Vilnius'] 



### Getting a number of clicks based on a country

##### This function works as a reducer:

In [836]:
def counting_clicks(dataset, country):
    
    click_freq_table = {}
    for row in dataset[1:]:
        date = row[1]
        country_name = row[6]
        if date in click_freq_table and country_name == country:
            click_freq_table[date] += 1
        else:
            click_freq_table[date] = 1
#         print(click_freq_table)
    
    return click_freq_table

##### Here we pass a combined file of "users" and "clicks" and filter the results by "country":

In [852]:
freq_count = counting_clicks(df3_dt, 'LT')
print('Clicks per country:')
print(freq_count)

Clicks per country:
{'2017-12-15': 2, '2017-12-12': 3, '2017-12-13': 1, '2017-12-18': 1, '2017-12-21': 2, '2017-12-17': 3, '2017-12-14': 1, '2017-12-16': 3, '2017-12-11': 2, '2017-12-20': 3, '2017-12-19': 3, '2017-12-10': 1}


In [853]:
filteredclicks_df = pd.DataFrame(list(freq_count.items()), columns=['Date', 'Count'])

filteredclicks_df.to_csv('D:/Vinted data engineering task/data/filtered_clicks.csv')

##### To check if the reducer worked correctly:

In [None]:
# counted_df = pd.crosstab(df3['date'], df3['country'])
# print(counted_df)
# counted_df.to_csv('D:/Vinted data engineering task/data/filtered_clicks.csv')