## 1. Сложность по времени и сложность по памяти. 

Существует великая дилемма! Память или время? 

Многие алгоритмы предлагают выбор между объёмом памяти и скоростью. Задачу можно решить быстро, использую большой объём памяти, или медленнее, занимая меньший объём.

Не так давно мы с вами написали код, который позволяет нам скачивать ЦИАН. Мы амбициозные ребята и хотели бы скачать по максимуму наблюдений. Однако, это может занять довольно много времени. Выясним сколько!

In [1]:
# Подгружаем уже знакомые нам пакеты.
import requests
from bs4 import BeautifulSoup
import pandas as pd

# Подгружаем пакет, который умеет считать время...
import time

# А также подгружаем модуль, необходимый нам для многопоточности
# Не забываем сначала установить его! 
from multiprocessing import Pool

Посмотрим сколько времени понадобится для работы той части кода, которая вытаскивает хрефы. Сделаем это для 10 страниц.

In [15]:
start_time = time.time() # время работы кода 

### Первая часть кода. Вытаскиваем список хрефов.

page_part_1 = "https://www.cian.ru/cat.php?deal_type=sale&engine_version=2&offer_type=flat&p="
page_part_2 = "&region=1&room1=1&room2=1&room3=1&room4=1&room5=1&room6=1&room7=1&room9=1"

all_hrefs = [ ]  # Тут мы будем хранить наши хрефы!

for i in range(1,11): # Тут регулируем количество квартир (на одной странице их 28)
    # Делаем ссылку!
    page = page_part_1 + str(i) + page_part_2
    
    # Загружаем страницу
    response = requests.get(page)
    html = response.content
    soup = BeautifulSoup(html,'html.parser')
    
    # Забираем себе хрефы и очищаем их
    hrefs = soup.findAll('div', attrs = {'class':"serp-item__content__bottom__left"})
    clean_hrefs = [item.a.attrs['href'] for item in hrefs]
    all_hrefs.extend(clean_hrefs)
    print('Качаю хреф номер ',i)

print("--- %s seconds ---" % (time.time() - start_time))  #время работы кода

Качаю хреф номер  1
Качаю хреф номер  2
Качаю хреф номер  3
Качаю хреф номер  4
Качаю хреф номер  5
Качаю хреф номер  6
Качаю хреф номер  7
Качаю хреф номер  8
Качаю хреф номер  9
Качаю хреф номер  10
--- 14.110546588897705 seconds ---


На $10$ страниц ($280$ наблюдений, по $28$ квартир на каждой странице) уходит от $12$ до $15$ секунд. Чтобы скачать $150 000$ наблюдений (около $5358$ страниц) нам понадобится $15 \cdot 535.8 = 8036$ секунд. То есть около двух с половиной часов. Это не критично и можно подождать. 

Посмотрим как обстоит дело с основной частью кода, которая занимается прогрузкой информации о квартирах по каждому хрефу.

In [16]:
start_time = time.time() # время работы кода 

### Вторая часть кода. Функция, которая вытаскивает наблюдения по одной квартирке.        
def One_Flat_Downloader(href):
    """
    Функция производит выкачку по одной ЦИАНовской ссылке 
    всей существующей информации о квартире.
    Ввод: ссылка на описание квартиры
    Вывод: словарь с информацией о квартире    
    """
    
    data = { }  # Задали пустой словарь, в который мы будем сохранять данные
    
    # Подгружаем страничку с информацией по квартире
    response = requests.get(href)
    html = response.content
    soup = BeautifulSoup(html,'html.parser')
    
    # Вытаскиваем цену на квартиру
    price = soup.findAll('div', attrs = {'class':"object_descr_price"})[0].text.strip()
    data['Цена'] = price
    
    # Вытаскиваем метро
    try:
        station = soup.findAll('p', attrs = {'class':"objects_item_metro_prg"})[0].a.text
        data['Метро'] = station
    except Exception:
        data['Метро'] = "NA"
    
    # Вытаскиваем расстояние до метро то ли пешком то ли на машине...
    try:
        Do_metro = soup.findAll('p', attrs = {'class':"objects_item_metro_prg"})[0]
        Do_metro = Do_metro.find('span',{'class':'object_item_metro_comment'}).text.strip()
        data['До метро'] = Do_metro
    except Exception:
        data['До метро'] = "NA"
    
    # Вытаскиваем координаты квартиры
    coordinates = soup.findAll('div',{'class':"object-descr__map-tabs__content js-object_descr__panorama"})[0]
    data['Координаты'] = coordinates.panorama.attrs['point']
    
        
    # Вытаскиваем количество комнат в квартире
    komnata = soup.findAll('div', attrs = {'class':"object_descr_title"})[0].text.strip()
    data['Комнаты'] = komnata
    
    # Вытаскиваем всё остальное
    table = soup.findAll('table',attrs = {'class':"object_descr_props flat sale"})[0]
    rows = table.find_all('tr')[1:]  # Вытащили все строки
    for row in rows:
        cols = [row.findAll('th')[0].text.strip(),row.findAll('td')[0].text.strip()] # очищаем каждую строку! 
        data[cols[0]] = cols[1] # запоминаем
    return(data) 


### Третья часть кода. Собираем данные по всем хрефам и объединяем их в огромную таблицу.

# В эту табличку будем собирать данные
df = pd.DataFrame( )
k = 0 # Это номера наблюдений
n = len(all_hrefs)
for item in all_hrefs:
    k = k + 1
    # грузим новое наблюдение
    df1 = pd.DataFrame.from_dict(One_Flat_Downloader(item),orient='index')
    # присваиваем этому наблюдению номер
    df1.columns =[k]
    # закидываем его в итоговую таблицу
    df = df.join(df1, how='outer')
    # Выдавать информацию о том, что сделана каждая десятая итерация! 
    if k%10 == 0:
        print('скачал ', k, ' квартир', n)

        
df = df.T  # Для удобства транспонируем таблицу        

# Добавим к итоговой таблице колонку из хрефов. Когда мы будем смотреть на данные,
# будет возникать довольно большое количество аномалий. Хотелось бы получше изучить
# их природу...

df['Хрефы'] = all_hrefs

print("--- %s seconds ---" % (time.time() - start_time))  #время работы кода

скачал  10  квартир 280
скачал  20  квартир 280
скачал  30  квартир 280
скачал  40  квартир 280
скачал  50  квартир 280
скачал  60  квартир 280
скачал  70  квартир 280
скачал  80  квартир 280
скачал  90  квартир 280
скачал  100  квартир 280
скачал  110  квартир 280
скачал  120  квартир 280
скачал  130  квартир 280
скачал  140  квартир 280
скачал  150  квартир 280
скачал  160  квартир 280
скачал  170  квартир 280
скачал  180  квартир 280
скачал  190  квартир 280
скачал  200  квартир 280
скачал  210  квартир 280
скачал  220  квартир 280
скачал  230  квартир 280
скачал  240  квартир 280
скачал  250  квартир 280
скачал  260  квартир 280
скачал  270  квартир 280
скачал  280  квартир 280
--- 194.09195709228516 seconds ---


На выкачку $280$ наблюдений уходит около $200$ секунд. Значит для выкачки $150000$ наблюдений нам понадобится примерно $107142$ секунд ($30$ часов). Это довольна большая цифра. Такое количество времени ждать нам не очень хочется. Хотелось бы получить свои заслуженные наблюдения побыстрее. 

Вспомним, что у нашего компьютера есть несколько ядер и все вычисления, которые идут на нём можно распаралелить. Если бы у нас получилось качать данные в два потока, то код проработал бы $15$ часов. Если бы потоков было бы $10$, то код проработал бы $3$ часа. Если потоков было бы $30$, то код проработал бы всего один час. 

Обратите внимание, что мы при расчётах руководствуемся предпосылкой, от том что время работы нашего кода растёт линейно по объёму необходимой нам информации. Это довольно логично. Все циклы, прописанные нами выше, кроме самого главного никак не зависят от объема поступающей информации. Сложность нашего алгоритма по времени $O(n)$! 

Круто! Дело осталось за малым. Выбрать сколько потоков потянет наш компьютер и распаралелить код. 

## 2. Что такое Map - Reduce и кто такой Hadoop. 

![Картинка со слоником]( ) 


Эксперименты Google

https://cloud.google.com/blog/big-data/2016/02/history-of-massive-scale-sorting-experiments-at-google

In [9]:
import numpy as np

In [15]:
from multiprocessing import Pool, cpu_count

Выясним сколько ядер на нашем компьютере.

In [16]:
cpu_count()

4

В общем случае, распаралеливание кода делается по числу ядер. И обычно большее количество ядер означает большую скорость (если этот код вообще стоило паралелить).

Мы будем использовать немного иную штуку, а именно многопоточную обработку. Разобьём имеющуюся у нас информацию на несколько частей и будем обрабатывать её в несколько потоков. Есть смысл поэкспериментировать с количеством потоков. Если их будет очень много, то код будет тратить лишнее время на переключение между ними. 

Итак, создаём 30 воркеров и пробуем прогнать наш алгоритм.  

In [None]:
pool = ThreadPool(13)
results = pool.map(np.mean, parts)

In [None]:
tart_time = time.time() # время работы кода 


print("--- %s seconds ---" % (time.time() - start_time))  #время работы кода

## 3. Применяем концепцию Map - Reduce к нашему случаю. 

Итак, наш код состоит из двух частей. Первая - тащит хрефы и работает около 2-х часов. Её особо трогать не хочется. Распаралеливание может принести нам много головной боли. Нужно выбрать с какой страницы стартует какая часть кода и на какой странице какая часть кода свою работу заканчивает. Забьём на этот кусок и просто подождём пару часов. Два часа это не двое суток! 

In [20]:
start_time = time.time() # время работы кода 

### Первая часть кода. Вытаскиваем список хрефов.

page_part_1 = "https://www.cian.ru/cat.php?deal_type=sale&engine_version=2&offer_type=flat&p="
page_part_2 = "&region=1&room1=1&room2=1&room3=1&room4=1&room5=1&room6=1&room7=1&room9=1"

all_hrefs = [ ]  # Тут мы будем хранить наши хрефы!

for i in range(1,5361): # Тут регулируем количество квартир (на одной странице их 28)
    # Делаем ссылку!
    page = page_part_1 + str(i) + page_part_2
    
    # Загружаем страницу
    response = requests.get(page)
    html = response.content
    soup = BeautifulSoup(html,'html.parser')
    
    # Забираем себе хрефы и очищаем их
    hrefs = soup.findAll('div', attrs = {'class':"serp-item__content__bottom__left"})
    clean_hrefs = [item.a.attrs['href'] for item in hrefs]
    all_hrefs.extend(clean_hrefs)
    print('Качаю хреф номер ',i)

print("--- %s seconds ---" % (time.time() - start_time))  #время работы кода

Качаю хреф номер  1
Качаю хреф номер  2
Качаю хреф номер  3
Качаю хреф номер  4
Качаю хреф номер  5
Качаю хреф номер  6
Качаю хреф номер  7
Качаю хреф номер  8
Качаю хреф номер  9
Качаю хреф номер  10
Качаю хреф номер  11
Качаю хреф номер  12
Качаю хреф номер  13
Качаю хреф номер  14
Качаю хреф номер  15
Качаю хреф номер  16
Качаю хреф номер  17
Качаю хреф номер  18
Качаю хреф номер  19
Качаю хреф номер  20
Качаю хреф номер  21
Качаю хреф номер  22
Качаю хреф номер  23
Качаю хреф номер  24
Качаю хреф номер  25
Качаю хреф номер  26
Качаю хреф номер  27
Качаю хреф номер  28
Качаю хреф номер  29
Качаю хреф номер  30
Качаю хреф номер  31
Качаю хреф номер  32
Качаю хреф номер  33
Качаю хреф номер  34
Качаю хреф номер  35
Качаю хреф номер  36
Качаю хреф номер  37
Качаю хреф номер  38
Качаю хреф номер  39
Качаю хреф номер  40
Качаю хреф номер  41
Качаю хреф номер  42
Качаю хреф номер  43
Качаю хреф номер  44
Качаю хреф номер  45
Качаю хреф номер  46
Качаю хреф номер  47
Качаю хреф номер  48
К

На всякий случай сохраним вектор из ссылок. Чтобы не потерять структуру данных (вектор), запишем их в формате `pickle`.

In [22]:
len(all_hrefs)

150080

In [24]:
import pickle

with open('all_refs.pickle', 'wb') as handle:
    pickle.dump(all_hrefs, handle)

In [3]:
import pickle

with open('all_refs.pickle', 'rb') as f:
     all_hrefs = pickle.load(f)

Как мы изначально и планировали, код проработал около 2,5 часов. Круто! Займёмся распаралеливанием второго куска кода. Функция `One_Flat_downloader` была написана нами довольно удачно. Она выкачивает только одно наблюдение и никак не повязана на циклы. Вообще не будем трогать её. 

In [4]:
### Вторая часть кода. Функция, которая вытаскивает наблюдения по одной квартирке.    
    
def One_Flat_Downloader(href):
    """
    Функция производит выкачку по одной ЦИАНовской ссылке 
    всей существующей информации о квартире.
    Ввод: ссылка на описание квартиры
    Вывод: словарь с информацией о квартире    
    """
    
    data = { }  # Задали пустой словарь, в который мы будем сохранять данные
    
    # Подгружаем страничку с информацией по квартире
    response = requests.get(href)
    html = response.content
    soup = BeautifulSoup(html,'html.parser')
    
    # Вытаскиваем цену на квартиру
    price = soup.findAll('div', attrs = {'class':"object_descr_price"})[0].text.strip()
    data['Цена'] = price
    
    # Вытаскиваем метро
    try:
        station = soup.findAll('p', attrs = {'class':"objects_item_metro_prg"})[0].a.text
        data['Метро'] = station
    except Exception:
        data['Метро'] = "NA"
    
    # Вытаскиваем расстояние до метро то ли пешком то ли на машине...
    try:
        Do_metro = soup.findAll('p', attrs = {'class':"objects_item_metro_prg"})[0]
        Do_metro = Do_metro.find('span',{'class':'object_item_metro_comment'}).text.strip()
        data['До метро'] = Do_metro
    except Exception:
        data['До метро'] = "NA"
    
    # Вытаскиваем количество комнат в квартире
    komnata = soup.findAll('div', attrs = {'class':"object_descr_title"})[0].text.strip()
    data['Комнаты'] = komnata
    
    # Вытаскиваем координаты квартиры
    coordinates = soup.findAll('div',{'class':"object-descr__map-tabs__content js-object_descr__panorama"})[0]
    data['Координаты'] = coordinates.panorama.attrs['point']
    
    # Вытаскиваем всё остальное
    table = soup.findAll('table',attrs = {'class':"object_descr_props flat sale"})[0]
    rows = table.find_all('tr')[1:]  # Вытащили все строки
    for row in rows:
        cols = [row.findAll('th')[0].text.strip(),row.findAll('td')[0].text.strip()] # очищаем каждую строку! 
        data[cols[0]] = cols[1] # запоминаем
    return(data) 

Третья часть нашего кода - главный цикл, который и выкачивает информацию. Как раз в ней и происходит скачёк времени при росте объёма входа. Превратим этот цикл в функции `Map` и `Reduce`. 

Функция `Map` будет обращаться к вектору из ссылок `part_hrefs` и вытаскивать по ним всю информацию. 

In [5]:
def Map(part_hrefs):
    # В эту табличку будем собирать данные
    df = pd.DataFrame( )
    k = 0 # Это номера наблюдений
    n = len(part_hrefs)
    for item in part_hrefs:
        k = k + 1
        # грузим новое наблюдение/
        df1 = pd.DataFrame.from_dict(One_Flat_Downloader(item),orient='index')
        # присваиваем этому наблюдению номер
        df1.columns =[k]
        # закидываем его в итоговую таблицу
        df = df.join(df1, how='outer')
        # Выдавать информацию о том, что сделана каждая десятая итерация! 
        if k%100 == 0:
            print('сделано ', k, ' итераций из', n)
    return(df)

### Создаём потоки

Раздробим весь список из ссылок на какое-то количество частей. Например, на 30. 

In [12]:
current_hrefs = all_hrefs[:60000]
len(current_hrefs)

60000

In [13]:
# Делим весь список из имён на 30 частей
parts = [round(len(current_hrefs)/30)*i for i in range(30)]
parts.append(len(current_hrefs))
names = [current_hrefs[parts[i]:parts[i+1]] for i in range(30)]

Проверим, работает ли функция `Map`.

In [7]:
Map(names[0][:10])

Unnamed: 0,1,2,3,4,5,6,7,8,9,10
Балкон:,нет,нет,2 лодж.,–,–,–,–,–,–,–
Вид из окна:,улица,улица,двор и улица,–,–,двор,–,двор,–,улица
Высота потолков:,,"2,70 м",,,,"2,70 м",,"2,64 м",,
Год постройки:,,1965,,2014,,2015,,1972,1982,
До метро,7\n мин.\n \n ...,11\n мин.\n \n ...,15\n мин.\n \n ...,15\n мин.\n \n ...,25\n мин.\n \n ...,,30\n мин.\n \n ...,16\n мин.\n \n ...,8\n мин.\n \n ...,35\n мин.\n \n ...
Жилая площадь:,"30,8 м2",35 м2,60 м2,–,26 м2,10 м2,18 м2,7 м2,–,"18,1 м2"
Комнаты,2-комн. кв.,2-комн. кв.,3-комн. кв.,студия,студия,студия \n \n\n \n ...,1-комн. кв. \n \n\n \n ...,студия,1-комн. кв.,1-комн. кв. \n \n\n \n ...
Координаты,"[37.544467,55.860792]","[37.638216,55.786151]","[37.479888,55.785422]","[37.552543,55.491804]","[37.322593,55.648382]","[37.554439,55.495098]","[36.917628,55.497660]","[37.617986,55.606690]","[37.580310,55.563661]","[36.917628,55.497660]"
Лифт:,2 пасс. + 1 груз.,1 пасс. + 1 груз.,2 пасс. + 2 груз.,–,–,1 пасс. + 1 груз.,–,–,–,–
Метро,"Петровско-Разумовская,","Проспект Мира,","Октябрьское поле,","Бунинская аллея,","Киевская,","Бунинская аллея,","Юго-Западная,","Пражская,","Бульвар Дмитрия Донского,","Саларьево,"


Собираем по каждому из 30 векторов со ссылками информацию. Все потоки работаю независимо друг от друга и записывают информацию в свои таблички. После все таблички закидываются в вектор `l`. 

In [14]:
start_time = time.time() # время работы кода 
    
if __name__ == '__main__':
    with Pool(30) as p:
        l = p.map(Map, names)

print("--- %s seconds ---" % (time.time() - start_time))  #время работы кода

сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  100  итераций из 2000
сделано  200  итераций из 2000
сделано  200  итераций из 2000
сделано 

Первые 60 000 наблюдений скачались за 8544 секунд (2 с лишним часа). Возможно, мы  переборщили с количеством потоков. Ожидалось, что алгоритм проработает около часа. При прямой работе алгоритм бы работал около 12 часов. Можно провести парочку экспериментов, но в любом случае получилось быстрее, чем 12 часов! Ну и господь с этой шайтан-машиной...

Остался последний шаг - `Reduce` - шаг. Функция `Reduce` будет брать таблички из списка `l` и объединять их все в один большой `dataframe`.

In [44]:
def Reduce(l):
    df = pd.DataFrame( )  # новый датафрейм, куда мы всё закинем! 
    k = 0
    for item in l:        # Этот цикл уже вам знаком...
        n = item.shape[1] + k
        item.columns = range(k,n)
        df = df.join(item, how = 'outer')
        k = n 
    return(df)   

In [45]:
start_time = time.time() # время работы кода 

itog_data = Reduce(l) 
itog_data = itog_data.T             # Для удобства транспонируем таблицу        
itog_data['Хрефы'] = current_hrefs  # Добавим к итоговой таблице колонку из хрефов.

print("--- %s seconds ---" % (time.time() - start_time))  #время работы кода

--- 109.7799117565155 seconds ---


Смотрим на итоговые характеристики нашей таблицы и сохраняем результат на свой компьютер.

In [46]:
itog_data.shape

(60000, 27)

In [47]:
pd.DataFrame.to_csv(itog_data,'CIAN_data1.csv')