# Мини задание 2

В этой домашней работе будет два подзадания.

Мы продолжает работать с датасетом из Авито - https://www.kaggle.com/c/avito-context-ad-clicks .

**1. [+5 баллов]** В `VisitsStream.tsv` лежит информация про пользователей, которые открывают сайт. Используя классический Hadoop MapReduce необходимо посчитать топ 10 пользователей с самыми длинными по времени сессиями и время этой самой длинной сессии (в секундах).

Сессия определяется следующим образом - это окно времени, внутри которого временное расстояние от двух соседних посещений не более **15 минут**. 

Иными словами - если пользователь зашел на сайт в момент X и последнее предыдущее посещение сайта в момент Y было не позднее чем 15 минут назад, то сессия "продлевается" до текущего момента. Если же временное расстояние от X до Y более 15 минут, то считается, что предыдущая сессия закончилась в момент Y, а новая сессия началась в момент X.

Сессия может длится 0 секунд, если пользователь сделал всего 1 запрос в течение 30 минутного окна (в середине этого окна). Считается, что в начале у пользователя нет открытой сессии и что сессия автоматически заканчивается, когда записей больше не осталось.

Выводить нужно только уникальных пользователей и для каждого такого пользователя находить время самой длинной его сессии. 

При решении можно использовать произвольное количество MapReduce задач, но чем меньше, тем лучше. За излишне неоптимальное решение можно потерять балл. 

Полученный файл с топ 10 нужно будет выложить в облако, обеспечить публичный доступ до него и приложить к решению.

В ноутбуке должны присутствовать ячейки с 

1) Всеми необходимыми скриптами для работы ваших MapReduce задач

2) Командами запуска самих MapReduce задач

3) Ссылкой на итоговый результат работы в вашем облаке. Ссылки должны быть рабочими до того момента, как вашу домашку не проверят.

Пример итогового файла

```bash
1000094	6852
1000030	4237
1000003	1932
1000058	1885
100010	1132
1000012	1086
1000067	657
1000111	244
1000085	197
1000049	131
```

In [93]:
! head -n5 /home/ubuntu/mhw1/VisitsStream.tsv

UserID	IPID	AdID	ViewDate
59703	1259356	469877	2015-04-25 00:00:00.0
154389	1846749	27252551	2015-04-25 00:00:00.0
218628	2108380	31685325	2015-04-25 00:00:00.0
231535	837110	18827716	2015-04-25 00:00:00.0


Убираем голову

In [94]:
! sed -i -e '1'd /home/ubuntu/mhw1/VisitsStream.tsv

Кладем в hdfs

In [95]:
! hdfs dfs -rm -r /user/visits/data
! hdfs dfs -mkdir -p /user/visits/data
! hdfs dfs -put /home/ubuntu/mhw1/VisitsStream.tsv /user/visits/data/

Deleted /user/visits/data


In [96]:
! head -n5 /home/ubuntu/mhw1/VisitsStream.tsv

59703	1259356	469877	2015-04-25 00:00:00.0
154389	1846749	27252551	2015-04-25 00:00:00.0
218628	2108380	31685325	2015-04-25 00:00:00.0
231535	837110	18827716	2015-04-25 00:00:00.0
282306	1654210	29363673	2015-04-25 00:00:00.0


In [None]:
# ! sudo hdfs balancer

In [1]:
! sudo chmod 0777 /usr/lib/hadoop/logs
! sudo -u hdfs hdfs balancer 

2022-02-21 11:09:38,819 INFO balancer.Balancer: namenodes  = [hdfs://rc1a-dataproc-m-5ohwrulcy2ee6jxr.mdb.yandexcloud.net:8020]
2022-02-21 11:09:38,823 INFO balancer.Balancer: parameters = Balancer.BalancerParameters [BalancingPolicy.Node, threshold = 10.0, max idle iteration = 5, #excluded nodes = 0, #included nodes = 0, #source nodes = 0, #blockpools = 0, run during upgrade = false]
2022-02-21 11:09:38,824 INFO balancer.Balancer: included nodes = []
2022-02-21 11:09:38,824 INFO balancer.Balancer: excluded nodes = []
2022-02-21 11:09:38,824 INFO balancer.Balancer: source nodes = []
Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved  NameNode
2022-02-21 11:09:38,827 INFO balancer.NameNodeConnector: getBlocks calls for hdfs://rc1a-dataproc-m-5ohwrulcy2ee6jxr.mdb.yandexcloud.net:8020 will be rate-limited to 20 per second
2022-02-21 11:09:40,051 INFO balancer.Balancer: dfs.namenode.get-blocks.max-qps = 20 (default=20)
2022-02-21 11:09:40,051 IN

In [98]:
! hdfs dfs -ls /user/visits/data/

Found 1 items
-rw-r--r--   1 ubuntu hadoop 13180996366 2022-02-20 14:12 /user/visits/data/VisitsStream.tsv


### 1st MapReduce

#### Calculate Longest session

In [108]:
%%writefile longest-session.py
import sys
import csv
import re
from datetime import datetime

start_date = datetime(1970,1,1)
delta_session = 900.0

# конвертируем дату в секунды
def date2seconds(date_iso_str):
    cur_date = datetime.strptime(date_iso_str, "%Y-%m-%d %H:%M:%S.%f")
    delta = cur_date - start_date
    return delta.total_seconds()


def mapper():
    for row in csv.reader(iter(sys.stdin.readline, ''),  delimiter="\t"):
        user, date = row[0], row[3]
        seconds = date2seconds(date)
        print("{}\t{}".format(user, seconds))

def reducer():
    user, enter = next(sys.stdin).split('\t')
    enter = float(enter)
    
    max_duration = cur_duration = 0.0
    last_enter = enter
    
    for line in sys.stdin:
        current_user, current_enter = line.split('\t')
        current_enter =  float(current_enter)
        
        if current_user != user:
            max_duration = max(max_duration, cur_duration)
            print("{}\t{}".format(user, max_duration))
            
            user = current_user
            enter = current_enter
            
            max_duration = cur_duration = 0.0
        else:
            if current_enter - last_enter <= delta_session:
                cur_duration += current_enter - last_enter 
            else:
                max_duration = max(max_duration, cur_duration)
                cur_duration = 0.0
        last_enter = current_enter
                
                
    max_duration = max(max_duration, cur_duration)      
    print("{}\t{}".format(user, max_duration))


if __name__ == '__main__':
    mr_command = sys.argv[1]
    {
        'map': mapper,
        'reduce': reducer
    }[mr_command]()

Overwriting longest-session.py


In [124]:
%%time

! hdfs dfs -rm -r /user/visits/result || true
! yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.name="longest-session" \
-D mapreduce.job.reduces=10 \
-files ~/mhw2/longest-session.py \
-mapper "python3 longest-session.py map" \
-reducer "python3 longest-session.py reduce" \
-input /user/visits/data/ \
-output /user/visits/result/

Deleted /user/visits/result
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob3761870572196447539.jar tmpDir=null
2022-02-20 15:10:42,072 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-5ohwrulcy2ee6jxr.mdb.yandexcloud.net/10.128.0.31:8032
2022-02-20 15:10:42,307 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-5ohwrulcy2ee6jxr.mdb.yandexcloud.net/10.128.0.31:10200
2022-02-20 15:10:42,350 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-5ohwrulcy2ee6jxr.mdb.yandexcloud.net/10.128.0.31:8032
2022-02-20 15:10:42,351 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-5ohwrulcy2ee6jxr.mdb.yandexcloud.net/10.128.0.31:10200
2022-02-20 15:10:42,590 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1645357742880_0002
2022-02-20 15:10:43,297 INFO mapred.FileInputFormat: Total input files t

### 2nd MapReduce

#### Calculate top10

In [138]:
%%writefile top10.py
import sys

def _rewind_stream(stream):
    for _ in stream:
        pass


def mapper():
    for row in sys.stdin:
        key, value = row.split('\t')
        values = value.strip()
        value, _ = value.split('.')
        print("{}+{}\t".format(key, value))


def reducer():
    for _ in range(10):
        key, _ = next(sys.stdin).split('\t')
        user, duration = key.split("+")
        print("{}\t{}".format(user, duration))
    _rewind_stream(sys.stdin)

if __name__ == '__main__':
    mr_command = sys.argv[1]
    {
        'map': mapper,
        'reduce': reducer
    }[mr_command]()

Overwriting top10.py


In [139]:
%%time

! hdfs dfs -rm -r /user/visits/top10/
! yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.job.name="top-10" \
-D mapreduce.job.reduces=1 \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options="-k2,2nr -k1,1" \
-D mapreduce.map.output.key.field.separator='+' \
-files top10.py \
-mapper "python top10.py map" \
-reducer "python top10.py reduce" \
-input /user/visits/result/ \
-output /user/visits/top10/

Deleted /user/visits/top10
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-3.2.2.jar] /tmp/streamjob7945251626992603150.jar tmpDir=null
2022-02-20 15:51:18,581 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-5ohwrulcy2ee6jxr.mdb.yandexcloud.net/10.128.0.31:8032
2022-02-20 15:51:18,799 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-5ohwrulcy2ee6jxr.mdb.yandexcloud.net/10.128.0.31:10200
2022-02-20 15:51:18,837 INFO client.RMProxy: Connecting to ResourceManager at rc1a-dataproc-m-5ohwrulcy2ee6jxr.mdb.yandexcloud.net/10.128.0.31:8032
2022-02-20 15:51:18,838 INFO client.AHSProxy: Connecting to Application History server at rc1a-dataproc-m-5ohwrulcy2ee6jxr.mdb.yandexcloud.net/10.128.0.31:10200
2022-02-20 15:51:19,061 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/ubuntu/.staging/job_1645357742880_0006
2022-02-20 15:51:19,383 INFO mapred.FileInputFormat: Total input files to

In [140]:
! hdfs dfs -ls /user/visits/top10

Found 2 items
-rw-r--r--   1 ubuntu hadoop          0 2022-02-20 15:52 /user/visits/top10/_SUCCESS
-rw-r--r--   1 ubuntu hadoop        128 2022-02-20 15:52 /user/visits/top10/part-00000


In [2]:
! hdfs dfs -cp /user/visits/top10/part-* s3a://bucket4lsml/mhw2_1.txt

2022-02-21 17:19:42,838 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2022-02-21 17:19:42,929 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2022-02-21 17:19:42,929 INFO impl.MetricsSystemImpl: s3a-file-system metrics system started
2022-02-21 17:19:46,548 INFO impl.MetricsSystemImpl: Stopping s3a-file-system metrics system...
2022-02-21 17:19:46,548 INFO impl.MetricsSystemImpl: s3a-file-system metrics system stopped.
2022-02-21 17:19:46,548 INFO impl.MetricsSystemImpl: s3a-file-system metrics system shutdown complete.


**ANS:**

https://storage.yandexcloud.net/bucket4lsml/mhw2_1.txt

In [3]:
! hdfs dfs -cat /user/visits/top10/part-*

3492618	5015
1305201	4521
1978363	4429
1813207	3973
3719053	3722
164411	3302
576329	3302
2574666	3301
1649546	3218
3320567	3156


**2. [+5 баллов]** В этой секции будет работать с большим количеством таблиц, которые есть в датасете. Подробное описание данных в этих таблицах и их взаимосвязей есть на странице Kaggle - https://www.kaggle.com/c/avito-context-ad-clicks/data

Схема данных следующая 

<img src="https://storage.googleapis.com/kaggle-competitions/kaggle/4438/media/DB_schema.png">

Для данных нужно подсчитать некоторые статистики.


**Важно!** Результаты каждого из 5 пунктов сохраните в виде файла в облачное хранилище. Ссылки на все 5 файлов должны быть указаны в работе. 

Итого, в ноутбуке должны присутствовать 

1) Ячейки с кодом на Spark

2) Ссылки на все файлы в облаке

_Подгрузим все файлы:_

In [5]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession, Row

sc = pyspark.SparkContext(appName="hw2")
se = SparkSession(sc)

In [None]:
# sudo -u hdfs hdfs dfsadmin -safemode leave

In [19]:
! hdfs dfs -rm -r /user/avito
! hdfs dfs -mkdir -p /user/avito

! hdfs dfs -put /home/ubuntu/avito-dataset/SearchInfo.tsv /user/avito/
! hdfs dfs -put /home/ubuntu/avito-dataset/testSearchStream.tsv /user/avito/
! hdfs dfs -put /home/ubuntu/avito-dataset/trainSearchStream.tsv /user/avito/
! hdfs dfs -put /home/ubuntu/avito-dataset/AdsInfo.tsv /user/avito/
! hdfs dfs -put /home/ubuntu/avito-dataset/VisitsStream.tsv /user/avito/

Deleted /user/avito


In [169]:
! hdfs dfs -rm -r /user/avito/parquet
! hdfs dfs -mkdir -p /user/avito/parquet

rm: `/user/avito/parquet': No such file or directory


In [None]:
search_info_df = se.read.option("mode","DROPMALFORMED") \
                .option('sep', "\t") \
                .csv("/user/avito/SearchInfo.tsv", header=True, inferSchema=True)
search_info_df.write.parquet("/user/avito/parquet/SearchInfo.parquet")

stream_train_df = se.read.option("mode","DROPMALFORMED") \
                .option('sep', "\t") \
                .csv("/user/avito/trainSearchStream.tsv", header=True, inferSchema=True)
stream_train_df.write.parquet("/user/avito/parquet/trainSearchStream.parquet")


ads_info_df = se.read.option("mode","DROPMALFORMED") \
                .option('sep', "\t") \
                .csv("/user/avito/AdsInfo.tsv", header=True, inferSchema=True)

ads_info_df.write.parquet("/user/avito/parquet/AdsInfo.parquet")

visits_df = se.read.option("mode","DROPMALFORMED") \
                .option('sep', "\t") \
                .csv("/user/avito/VisitsStream.tsv", header=True, inferSchema=True)

visits_df.write.parquet("/user/avito/parquet/VisitsStream.parquet")

In [6]:
search_info = se.read.parquet("/user/avito/parquet/SearchInfo.parquet")
search_info.registerTempTable('search_info')

stream_df = se.read.parquet("/user/avito/parquet/trainSearchStream.parquet")
stream_df.registerTempTable('stream_train')

ads_df = se.read.parquet("/user/avito/parquet/AdsInfo.parquet")
ads_df.registerTempTable('ads')

visits_df = se.read.parquet("/user/avito/parquet/VisitsStream.parquet")
visits_df.registerTempTable('visits')

**1. [1 балл]** Найти топ 10 самых популярных фильтров. Фильтры кодируются ключами в словаре SearchParams (именно ключи (числа), а не значения). Задача внезапно творческая и будут приниматься любые разумные подходы (aka костыли), которые решат задачу. Удачи :)

In [175]:
params_not_null = se.sql("""
    SELECT
        SearchParams
    FROM search_info
    WHERE SearchParams IS NOT NULL
""")

In [176]:
params_not_null.show()

+--------------------+
|        SearchParams|
+--------------------+
|{83:'Обувь', 175:...|
|  {175:'Аксессуары'}|
|      {156:'Горные'}|
|{45:'Кровати, див...|
|{45:'Кровати, див...|
|{5:'Шины, диски и...|
|{110:'Верхняя оде...|
|{45:'Подставки и ...|
|{127:'Детские кол...|
|{45:'Кухонные гар...|
|{178:'Для мальчик...|
|{127:'Детская меб...|
|{124:'32', 110:'О...|
|   {223:'Объективы'}|
|{83:'Обувь', 175:...|
|{486:'Микроволнов...|
|{44:'Сантехника и...|
|  {44:'Инструменты'}|
|{83:'Верхняя одеж...|
|{487:'Швейные маш...|
+--------------------+
only showing top 20 rows



In [177]:
def extract_filters(text):
    text = text.SearchParams[1:-1]
    pattern = re.compile(r"[\d]+:\'(.*?)\'")
    result = []
    
    for match in pattern.finditer(text.lower()):
        key_val = match.group(0)
        result.append(key_val)
    return result

In [178]:
params_splitted = params_not_null.rdd.flatMap(extract_filters).cache()

In [179]:
params_splitted.take(5)

["83:'обувь'",
 "175:'женская одежда'",
 "88:'38'",
 "175:'аксессуары'",
 "156:'горные'"]

In [126]:
! hdfs dfs -rm -r /user/avito/results/first || true

rm: `/user/avito/results/first': No such file or directory


In [127]:
(
    params_splitted
    .map(lambda x: (x, 1))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda x: (x[1], x[0]))
    .sortByKey(ascending=False)
    .zipWithIndex()
    .filter(lambda x: x[1] < 10)
    .map(lambda x: x[0][1].split(':')[1])
    .saveAsTextFile('/user/avito/results/first')
)

In [128]:
! hdfs dfs -cp /user/avito/results/first/part-00000 s3a://bucket4lsml/mhw2_2_1.txt

2022-02-21 20:01:04,761 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2022-02-21 20:01:04,851 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2022-02-21 20:01:04,852 INFO impl.MetricsSystemImpl: s3a-file-system metrics system started
2022-02-21 20:01:08,301 INFO impl.MetricsSystemImpl: Stopping s3a-file-system metrics system...
2022-02-21 20:01:08,302 INFO impl.MetricsSystemImpl: s3a-file-system metrics system stopped.
2022-02-21 20:01:08,302 INFO impl.MetricsSystemImpl: s3a-file-system metrics system shutdown complete.


**ANS:**
https://storage.yandexcloud.net/bucket4lsml/mhw2_2_1.txt

In [8]:
! hdfs dfs -cat /user/avito/results/first/*

'женская одежда'
'шины, диски и колёса'
'для девочек'
'обувь'
'для мальчиков'
'детские коляски'
'запчасти'
'кровати, диваны и кресла'
'мужская одежда'
'для автомобилей'


**2. [1 балл]** Найдите топ 10 самых встречаемых слов в запросах, в которых пользователь кликнул по рекламе. Слова должны быть приведены к нижнеу регистру. 

Для примера

```bash
"Купить стол" -> Кликнул
"Ноутбук" -> Кликнул
"Купить машину" -> Кликнул
"Красивый стол" -> Не кликнул
"Большой стол" -> Кликнул
"Купить маску" -> Кликнул
```

Топ слов (с указанием того, сколько оно встретилось)

```bash
купить - 3
стол - 2
ноутбук - 1
большой - 1
маску - 1
машину - 1
```

In [12]:
queries_clicked = se.sql("""
    SELECT 
        r.SearchQuery
    FROM
        stream_train l
        JOIN search_info r ON l.SearchID = r.SearchID
    WHERE
        l.IsClick = 1 AND r.SearchQuery IS NOT NULL
""").registerTempTable('clicked')

In [203]:
# queries_clicked.show()

+--------------------+
|         SearchQuery|
+--------------------+
|             монопод|
|сварочный аппарат бу|
|     iphone 5s новый|
|            ваз 2110|
|               туфли|
|             счетчик|
|  микроволновая печь|
|            мокасины|
|  продажа велосипеда|
|бампер ниссан при...|
|           huawei m1|
|         мультиварка|
|             кровать|
|             коляска|
|    видеорегистратор|
|    видеорегистратор|
|                шкаф|
|toyota land cruis...|
|        babyliss pro|
|     тумба для обуви|
+--------------------+
only showing top 20 rows



_из интереса в этой задаче попробовала двумя способами_

##### 1

In [13]:
from pyspark.sql import functions as F
import re
import string

def wordify(text):
    pattern = re.compile(r"[\w]+")
    result = []

    for match in pattern.finditer(text.lower()):
        word = match.group(0)
        result.append(word)
    return '\t'.join(result)

f_wordify = se.udf.register("wordify", wordify, "string")

In [14]:
top_words_clicked = se.sql("""
    SELECT
        word,
        COUNT(word) as frequency
    FROM (
        SELECT
            explode(split(wordify(SearchQuery), '\t')) as word
        FROM
            clicked
    )
    GROUP BY word
    ORDER BY frequency DESC
    LIMIT 10
""")
top_words_clicked.show()

+----------+---------+
|      word|frequency|
+----------+---------+
| велосипед|    11417|
|        бу|    10418|
|    iphone|     6891|
|       для|     5508|
|     диван|     5254|
|        на|     4590|
|    платье|     4021|
|велосипеды|     3443|
|   коляска|     3421|
|   самокат|     3208|
+----------+---------+



##### 2

In [206]:
def extract_words(text):
    pattern = re.compile(r"[\w]+")
    result = []

    for match in pattern.finditer(text.lower()):
        word = match.group(0)
        result.append(word)
    return result

words = queries_clicked.rdd.flatMap(lambda x: extract_words(x['SearchQuery'])).cache()

In [207]:
# words.take(5)

['монопод', 'сварочный', 'аппарат', 'бу', 'iphone']

In [211]:
! hdfs dfs -rm -r /user/avito/results/second || true

Deleted /user/avito/results/second


In [212]:
(
    words
    .map(lambda x: (x, 1))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda x: (x[1], x[0]))
    .sortByKey(ascending=False)
    .zipWithIndex()
    .filter(lambda x: x[1] < 10)
    .map(lambda x: f'{x[0][1]} - {x[0][0]}')
    .saveAsTextFile('/user/avito/results/second')
)

In [131]:
! hdfs dfs -cp /user/avito/results/second/part-00000 s3a://bucket4lsml/mhw2_2_2.txt

2022-02-21 20:02:26,331 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2022-02-21 20:02:26,425 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2022-02-21 20:02:26,425 INFO impl.MetricsSystemImpl: s3a-file-system metrics system started
2022-02-21 20:02:29,835 INFO impl.MetricsSystemImpl: Stopping s3a-file-system metrics system...
2022-02-21 20:02:29,835 INFO impl.MetricsSystemImpl: s3a-file-system metrics system stopped.
2022-02-21 20:02:29,835 INFO impl.MetricsSystemImpl: s3a-file-system metrics system shutdown complete.


**ANS:**
https://storage.yandexcloud.net/bucket4lsml/mhw2_2_2.txt

In [15]:
! hdfs dfs -cat /user/avito/results/second/*

велосипед - 11417
бу - 10418
iphone - 6891
для - 5508
диван - 5254
на - 4590
платье - 4021
велосипеды - 3443
коляска - 3421
самокат - 3208


**3. [1 балл]** Для каждого слова из заголовка объявления подсчитать его среднюю стоимость. Средняя стоимость слова - это среднее стоимости всех объявлений, где оно встретилось. Так например если слово появилось в заголовке рекламы с ценой A и в заголовке рекламы с ценой B, то его средняя стоимость - (A+B)/2 . Слова должны быть приведены к нижнему регистру.

Учитывать необходимо только записи, где есть указаная стоимость и заголовок.

В качестве ответа - топ 10 самых дорогих слов с указанием их средней стоимости.


_Воспользуемся udf-м из предыдущей задачи:_

In [19]:
titles = se.sql("""
    WITH titles_by_words AS (
        SELECT
            explode(split(wordify(Title), '\t')) as word,
            Price,
            AdID
        FROM
            ads
        WHERE
            Title IS NOT NULL AND Price IS NOT NULL
    )
    
    SELECT
        word,
        mean(Price) as mean_price
    FROM titles_by_words
    WHERE word IS NOT NULL
    GROUP BY
        word, AdID
    ORDER BY mean_price DESC
    LIMIT 10
""")

In [None]:
titles.registerTempTable('titles_with_clicks')

In [20]:
titles.toPandas()

Unnamed: 0,word,mean_price
0,куплю,1000000000000.0
1,аптеку,1000000000000.0
2,промназначения,1000000000000.0
3,5,1000000000000.0
4,с,1000000000000.0
5,маникюра,1000000000000.0
6,сниму,1000000000000.0
7,производственное,1000000000000.0
8,м²,1000000000000.0
9,трон,1000000000000.0


In [124]:
# title_words = se.sql("""
#     SELECT
#         explode(split(wordify(Title), '\t')) as word,
#         Price,
#         AdID
#     FROM
#         ads
#     WHERE
#         Title IS NOT NULL AND Price IS NOT NULL
# """).rdd.cache()

# %%time
# (
#     title_words
#     .map(lambda x: (x[0], (x[1], 1)))
#     .reduceByKey(lambda a, b: a + b)
#     .map(lambda x: (x[1][0] / x[1][1], x[0]))
#     .sortByKey(ascending=False)
#     .map(lambda x: f'{x[1]} - {x[0]}')
#     .saveAsTextFile('/user/avito/results/third_2.txt') 
# )


KeyboardInterrupt: 

In [24]:
titles_rdd = titles.rdd.map(lambda x: '{}-{}'.format(x[0], x[1])).cache()

In [28]:
titles_rdd.collect()

['1960-999999999999.0',
 'помещение-999999999999.0',
 'м²-999999999999.0',
 '5-999999999999.0',
 'помещение-999999999999.0',
 'м²-999999999999.0',
 'складское-999999999999.0',
 'квартиру-999999999999.0',
 'на-999999999999.0',
 'участок-999999999999.0']

In [34]:
titles_rdd.saveAsTextFile("/user/avito/results/third.txt")

In [38]:
! hdfs dfs -cp /user/avito/results/third.txt/part-00000 s3a://bucket4lsml/mhw2_2_3.txt

2022-02-21 17:57:32,152 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2022-02-21 17:57:32,249 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2022-02-21 17:57:32,249 INFO impl.MetricsSystemImpl: s3a-file-system metrics system started
2022-02-21 17:57:36,158 INFO impl.MetricsSystemImpl: Stopping s3a-file-system metrics system...
2022-02-21 17:57:36,159 INFO impl.MetricsSystemImpl: s3a-file-system metrics system stopped.
2022-02-21 17:57:36,159 INFO impl.MetricsSystemImpl: s3a-file-system metrics system shutdown complete.


**ANS:**
https://storage.yandexcloud.net/bucket4lsml/mhw2_2_3.txt

In [35]:
! hdfs dfs -cat /user/avito/results/third.txt/*

1960-999999999999.0
помещение-999999999999.0
м²-999999999999.0
5-999999999999.0
помещение-999999999999.0
м²-999999999999.0
складское-999999999999.0
квартиру-999999999999.0
на-999999999999.0
участок-999999999999.0


In [None]:
# отфильтровывать стоп-слова?

**4 [1 балл]** Найдите всех пользователей, которые заходили каждый день на протяжении всего времени измерений. В качестве ответа запишите одно число - количество этих пользвателей.



_Уберем время из даты и оставим только дни: YYYY-MM-DD_

_И уберем дупликаты, чтобы в дальнейшем считать уникальные значения_

In [39]:
def take_day(text):
    return text.split(' ')[0]

daytify = se.udf.register("daytify", take_day, "string")

In [43]:
visits_days = se.sql("""
    SELECT DISTINCT
        UserID,
        daytify(ViewDate) as Day
    FROM visits
""")
visits_days.registerTempTable("days")

In [97]:
visits_rdd = visits_days.rdd.cache()

_Найдем количество уникальных дней_

In [44]:
unique_days_num = visits_days.select('Day').distinct().rdd.map(lambda x: x.Day).count()
unique_days_num

26

_Теперь сгруппируем по юзерам, посчитаем число дней и сравним с unique_days_num_


In [110]:
%%time
active_users_list = (
    visits_rdd
    .map(lambda x: (x[0], 1))
    .reduceByKey(lambda a, b: a + b)
    .filter(lambda x: x[1] == unique_days_num)
    .map(lambda x: x[0])
    .collect()
)

CPU times: user 44.5 ms, sys: 35.1 ms, total: 79.7 ms
Wall time: 47.2 s


In [111]:
len(active_users_list)

807

In [45]:
# active_users = se.sql("""
#     SELECT
#         UserID
#     FROM (
#         SELECT
#             UserID,
#             count(Day) as num_days
#         FROM days
#         GROUP BY
#             UserID
#     )
#     WHERE num_days = {}
# """.format(unique_days_num))

In [51]:
res24 = sc.parallelize([len(active_users_list)])

In [52]:
res24.saveAsTextFile("/user/avito/results/fourth.txt")

In [71]:
! hdfs dfs -cp /user/avito/results/fourth.txt/part-00003 s3a://bucket4lsml/mhw2_2_4.txt

2022-02-21 18:26:42,730 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2022-02-21 18:26:42,832 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2022-02-21 18:26:42,832 INFO impl.MetricsSystemImpl: s3a-file-system metrics system started
2022-02-21 18:26:46,429 INFO impl.MetricsSystemImpl: Stopping s3a-file-system metrics system...
2022-02-21 18:26:46,429 INFO impl.MetricsSystemImpl: s3a-file-system metrics system stopped.
2022-02-21 18:26:46,430 INFO impl.MetricsSystemImpl: s3a-file-system metrics system shutdown complete.


**ANS:**

https://storage.yandexcloud.net/bucket4lsml/mhw2_2_4.txt

In [69]:
! hdfs dfs -cat /user/avito/results/fourth.txt/part-*

807


**5 [1 балл]** Для каждого дня найдите количество уникальных пользователей, которые заходили на сайт в этот день. Выкиньте из рассмотрения всех пользователей, которые вы нашли в пункте 4 (то есть тех, которые заходили каждый день какого-либо месяца). 

В ответе укажите пары день-число уникальных пользователей в порядке убывания количества.

In [72]:
active_users_set = set(active_users_list)

def remove_active(user_id):
    return user_id not in active_users_set

nonactive = se.udf.register("nonactive", remove_active, "boolean")

In [73]:
from pyspark.sql.functions import countDistinct as countDistinct

_Воспользуемся табличкой из предыдущего пункта и уберем слишком активных юзеров_

In [75]:
basic_users = visits_days.select('Day', 'UserID').filter(nonactive(F.col('UserID'))).cache()

In [123]:
basic_users.show()

+----------+-------+
|       Day| UserID|
+----------+-------+
|2015-05-19|3339459|
|2015-05-19|3472056|
|2015-05-19|3498500|
|2015-05-19|3581440|
|2015-05-19|3756375|
|2015-05-19|3891751|
|2015-05-19|3971739|
|2015-05-19|4006974|
|2015-05-19|4102599|
|2015-05-19|4106979|
|2015-05-19|4203357|
|2015-05-19|4271481|
|2015-05-19|  29832|
|2015-05-19| 106382|
|2015-05-19| 232031|
|2015-05-19| 263551|
|2015-05-19| 296159|
|2015-05-19| 323952|
|2015-05-19| 384101|
|2015-05-19| 399210|
+----------+-------+
only showing top 20 rows



In [93]:
%%time
collect_users_sql = (
    basic_users.rdd
    .map(lambda x: (x[0], 1))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda x: (x[1], x[0]))
    .sortByKey(ascending=False)
    .map(lambda x: f'{x[1]} - {x[0]}')
    .saveAsTextFile('/user/avito/results/fifth_2.txt')
)

CPU times: user 106 ms, sys: 53.4 ms, total: 159 ms
Wall time: 31 s


In [77]:
# users_by_day = basic_users.select('Day', 'UserID').groupBy('Day').agg(countDistinct(basic_users.UserID)).cache()
# count_users = users_by_day.sort("count(UserID)", ascending=False).cache()
# count_users.toPandas()
# count_users.rdd.saveAsTextFile("/user/avito/results/fifth.txt")

In [135]:
! hdfs dfs -cat /user/avito/results/fifth_2.txt/* > tmp.txt 

In [141]:
! hdfs dfs -put tmp.txt s3a://bucket4lsml/mhw2_2_5.txt

2022-02-21 20:07:44,027 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2022-02-21 20:07:44,113 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2022-02-21 20:07:44,113 INFO impl.MetricsSystemImpl: s3a-file-system metrics system started
2022-02-21 20:07:46,834 INFO impl.MetricsSystemImpl: Stopping s3a-file-system metrics system...
2022-02-21 20:07:46,834 INFO impl.MetricsSystemImpl: s3a-file-system metrics system stopped.
2022-02-21 20:07:46,835 INFO impl.MetricsSystemImpl: s3a-file-system metrics system shutdown complete.


**ANS:**

https://storage.yandexcloud.net/bucket4lsml/mhw2_2_5.txt

In [118]:
! hdfs dfs -cat /user/avito/results/fifth_2.txt/*

2015-05-12 - 973248
2015-05-13 - 847446
2015-05-14 - 745904
2015-05-06 - 733027
2015-05-05 - 731624
2015-05-07 - 729708
2015-05-11 - 706240
2015-04-27 - 676433
2015-05-08 - 664982
2015-04-28 - 655791
2015-04-29 - 641148
2015-05-15 - 633132
2015-05-04 - 607205
2015-04-30 - 596224
2015-05-10 - 562151
2015-04-26 - 540051
2015-05-03 - 511687
2015-04-25 - 498069
2015-05-16 - 493577
2015-05-18 - 488250
2015-05-17 - 480968
2015-05-02 - 468010
2015-05-01 - 452800
2015-05-09 - 450135
2015-05-19 - 367777
2015-05-20 - 184972


**Важно!** Следите за объемом потребляемой памяти! За решения, которые работают не оптимально по памяти, можно терять баллы. Понятное дело, что на текущих объемах скорее всего сработает примерно любое решение, но это не повод плохо писать алгоритм.

Если в вашем алгоритме есть спорный момент в отношении использования памяти, но вы сделали это намерено - напишите явно в комментарии, что это осознаное решение, которое вы приняли по такой-то причине. Например вы могли запустить отдельную MR\Spark задачу, которая бы показала, что во всем датасете определенного типа данных не более чем `M`, а значит мы не упремся в ограничения по памяти и вполне уместно использовать для его обработки именно такой подход.

Если результат работы вашего алгоритма "размазался" по нескольким партам, то нужно дополнительно склеить их в один файл.