## Вспомним map-reduce

Давайте напишем простые маппер и редьюсер для подсчета числа слов в файле.

In [9]:
%%writefile mapper.py

import sys

def mapper():
    """Emit a ``word<TAB>1`` line on stdout for every word read from stdin.

    Words are the whitespace-separated tokens of each input line; blank
    lines produce no output.
    """
    emit = sys.stdout.write
    for raw_line in sys.stdin:
        for token in raw_line.strip().split():
            emit("{}\t{}\n".format(token, 1))
            
if __name__ == "__main__":
    mapper()

Overwriting mapper.py


In [10]:
%%writefile reducer.py

import sys

def reduce():
    """Count runs of identical keys in tab-separated lines read from stdin.

    Only the first tab-separated field of each line (the key) is inspected;
    every line counts as one occurrence.  A count is flushed as soon as the
    key changes, so the input must already be grouped by key (for example
    piped through ``sort -k1,1``).  One ``key<TAB>count`` line is written to
    stdout per run; nothing is written when stdin is empty.
    """
    prev_key = None
    counter = 0
    for line in sys.stdin:
        key = line.strip().split("\t")[0]
        if prev_key is not None and key != prev_key:
            # Key changed: the previous run is complete, so emit its total.
            sys.stdout.write("{}\t{}\n".format(prev_key, counter))
            counter = 0
        prev_key = key
        counter += 1
    # Flush the final run (skipped when no line was read at all).
    if prev_key is not None:
        sys.stdout.write("{}\t{}\n".format(prev_key, counter))


if __name__ == "__main__":
    # reducer.py defines reduce(), not mapper(); calling mapper() here
    # raised NameError as soon as the script was run.
    reduce()

Overwriting reducer.py


cat text.txt | python3 mapper.py | sort -k1,1 | python3 reducer.py

## Давайте посмотрим на разные форматы хранения данных

### Текстовые форматы

csv, tsv, txt

In [9]:
import pandas as pd

# Load the tab-separated export; sep="\t" replaces read_csv's default comma delimiter.
tsv_sample = pd.read_csv("reddit_vm.tsv", sep="\t")
# Last expression of the cell: the notebook renders the first rows as a preview.
tsv_sample.head()

Unnamed: 0,title,score,id,url,comms_num,created,body,timestamp
0,Health Canada approves AstraZeneca COVID-19 va...,7,lt74vw,https://www.canadaforums.ca/2021/02/health-can...,0,1614400000.0,,2021-02-27 06:33:45
1,COVID-19 in Canada: 'Vaccination passports' a ...,2,lsh0ij,https://www.canadaforums.ca/2021/02/covid-19-i...,1,1614316000.0,,2021-02-26 07:11:07
2,Coronavirus variants could fuel Canada's third...,6,lohlle,https://www.canadaforums.ca/2021/02/coronaviru...,0,1613887000.0,,2021-02-21 07:50:08
3,Canadian government to extend COVID-19 emergen...,1,lnptv8,https://www.canadaforums.ca/2021/02/canadian-g...,0,1613796000.0,,2021-02-20 06:35:13
4,Canada: Pfizer is 'extremely committed' to mee...,6,lkslm6,https://www.canadaforums.ca/2021/02/canada-pfi...,0,1613468000.0,,2021-02-16 11:36:28


In [10]:
# Load the comma-separated export of the dataset with read_csv's default settings.
csv_sample = pd.read_csv("reddit_vm.csv")
# Last expression of the cell: the notebook renders the first rows as a preview.
csv_sample.head()

Unnamed: 0,title,score,id,url,comms_num,created,body,timestamp
0,Health Canada approves AstraZeneca COVID-19 va...,7,lt74vw,https://www.canadaforums.ca/2021/02/health-can...,0,1614400000.0,,2021-02-27 06:33:45
1,COVID-19 in Canada: 'Vaccination passports' a ...,2,lsh0ij,https://www.canadaforums.ca/2021/02/covid-19-i...,1,1614316000.0,,2021-02-26 07:11:07
2,Coronavirus variants could fuel Canada's third...,6,lohlle,https://www.canadaforums.ca/2021/02/coronaviru...,0,1613887000.0,,2021-02-21 07:50:08
3,Canadian government to extend COVID-19 emergen...,1,lnptv8,https://www.canadaforums.ca/2021/02/canadian-g...,0,1613796000.0,,2021-02-20 06:35:13
4,Canada: Pfizer is 'extremely committed' to mee...,6,lkslm6,https://www.canadaforums.ca/2021/02/canada-pfi...,0,1613468000.0,,2021-02-16 11:36:28


In [13]:
# Print per-column non-null counts and dtypes plus the frame's memory usage.
csv_sample.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1424 entries, 0 to 1423
Data columns (total 8 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   title      1424 non-null   object 
 1   score      1424 non-null   int64  
 2   id         1424 non-null   object 
 3   url        444 non-null    object 
 4   comms_num  1424 non-null   int64  
 5   created    1424 non-null   float64
 6   body       1059 non-null   object 
 7   timestamp  1424 non-null   object 
dtypes: float64(1), int64(2), object(5)
memory usage: 89.1+ KB


In [None]:
#TODO
# Адаптируйте маппер и редьюсер для чтения tsv файла

### Бинарные форматы

#### Protobuf

https://developers.google.com/protocol-buffers/docs/pythontutorial

In [14]:
import reddit_pb2

In [24]:
# Write example: copy every DataFrame row into a protobuf record and
# serialize the whole dataset to a single file.

reddit_dataset = reddit_pb2.ReditData()

# itertuples walks the rows once instead of doing a .loc lookup per cell.
for row in csv_sample.itertuples(index=False):
    reddit_record = reddit_dataset.record.add()
    reddit_record.title = row.title
    reddit_record.score = row.score
    reddit_record.id = row.id

    # url is missing (NaN) for some rows; leave the field unset in that case.
    if pd.notna(row.url):
        reddit_record.url = row.url
    reddit_record.comms_num = row.comms_num
    reddit_record.created = row.created

    # body is missing (NaN) for some rows; leave the field unset in that case.
    if pd.notna(row.body):
        reddit_record.body = row.body
    reddit_record.ts = row.timestamp

serialized = reddit_dataset.SerializeToString()
with open("redit_result.pb", "wb") as f:
    f.write(serialized)

In [25]:
# Read example: load the serialized bytes back from disk.
with open("redit_result.pb", "rb") as f:
    serialized = f.read()

# Parse the bytes into a fresh message. The call's return value is the cell's
# last expression, so the notebook displays it (presumably the number of
# bytes parsed -- confirm against the protobuf Python API).
reddit_dataset = reddit_pb2.ReditData()
reddit_dataset.ParseFromString(serialized)

568572

In [None]:
# TODO Сделайте сохранение в сплиты размером по 500 записей
# Посчитайте число записей с определенным скором на map-reduce

#### Avro

In [29]:
import fastavro as avro

In [32]:
# Avro record schema mirroring the DataFrame columns ('timestamp' is stored
# as 'ts').  'url' and 'body' declare an empty-string default because those
# columns contain missing values.
schema = {
    'doc': 'Reddit dataset',
    'name': 'reddit',
    'namespace': 'teexamplest',
    'type': 'record',
    'fields': [
        {'name': 'title', 'type': 'string'},
        {'name': 'score', 'type': 'long'},
        {'name': 'id', 'type': 'string'},
        {'name': 'url', 'type': 'string', 'default': ""},
        {'name': 'comms_num', 'type': 'long'},
        # 'double' (64-bit), not 'float' (32-bit): the column is float64 and
        # holds epoch seconds (~1.6e9), which a 32-bit float would round.
        {'name': 'created', 'type': 'double'},
        {'name': 'body', 'type': 'string', 'default': ""},
        {'name': 'ts', 'type': 'string'},
    ],
}

In [33]:
# Build one Avro record (a plain dict) per DataFrame row, then write them all.
records = []

# itertuples walks the rows once instead of doing a .loc lookup per cell.
for row in csv_sample.itertuples(index=False):
    record = {}
    record["title"] = row.title
    record["score"] = row.score
    record["id"] = row.id

    # url is missing (NaN) for some rows; omit the key so the schema's
    # default for the field applies.
    if pd.notna(row.url):
        record["url"] = row.url
    record["comms_num"] = row.comms_num
    record["created"] = row.created

    # body is missing (NaN) for some rows; omit the key so the schema's
    # default for the field applies.
    if pd.notna(row.body):
        record["body"] = row.body
    record["ts"] = row.timestamp

    records.append(record)

with open('reddit.avro', 'wb') as out:
    avro.writer(out, schema, records)

In [35]:
# Reading: stream the Avro file back and tally the decoded records.
with open('reddit.avro', 'rb') as avro_file:
    reader = avro.reader(avro_file, reader_schema=schema)
    cnt = sum(1 for _ in reader)
print(cnt)

1424


#### Parquet

In [38]:
import pyarrow.parquet as pq
import pyarrow as pa

In [39]:
# Convert the DataFrame to an Arrow table, then persist it as a Parquet file.
arrow_table = pa.Table.from_pandas(csv_sample)
pq.write_table(arrow_table, 'reddit.parquet')

In [40]:
# Read the Parquet file back as an Arrow table.
reddit = pq.read_table('reddit.parquet')
# Last expression of the cell: convert to pandas so the notebook renders it.
reddit.to_pandas()

Unnamed: 0,title,score,id,url,comms_num,created,body,timestamp
0,Health Canada approves AstraZeneca COVID-19 va...,7,lt74vw,https://www.canadaforums.ca/2021/02/health-can...,0,1.614400e+09,,2021-02-27 06:33:45
1,COVID-19 in Canada: 'Vaccination passports' a ...,2,lsh0ij,https://www.canadaforums.ca/2021/02/covid-19-i...,1,1.614316e+09,,2021-02-26 07:11:07
2,Coronavirus variants could fuel Canada's third...,6,lohlle,https://www.canadaforums.ca/2021/02/coronaviru...,0,1.613887e+09,,2021-02-21 07:50:08
3,Canadian government to extend COVID-19 emergen...,1,lnptv8,https://www.canadaforums.ca/2021/02/canadian-g...,0,1.613796e+09,,2021-02-20 06:35:13
4,Canada: Pfizer is 'extremely committed' to mee...,6,lkslm6,https://www.canadaforums.ca/2021/02/canada-pfi...,0,1.613468e+09,,2021-02-16 11:36:28
...,...,...,...,...,...,...,...,...
1419,Comment,1,ejackaa,,0,1.553486e+09,I didn't say thimerosal is mercury. I said thi...,2019-03-25 05:50:41
1420,Comment,2,ejacj98,,0,1.553486e+09,"The ""myth"" you're debunking is in regards to t...",2019-03-25 05:50:20
1421,Comment,2,ejabpdx,,0,1.553485e+09,You'll have to read it again because I didn't ...,2019-03-25 05:40:03
1422,Comment,0,ej9xuaf,,0,1.553475e+09,"What do you mean by ""your OP"". I am fairly new...",2019-03-25 02:45:21
