# Spark и RDD

Инициализируйте объект SparkContext. Укажите параметр appName равный 'appName'. Создайте переменную weather_entry (англ. «запись о погоде»), в которой сохраните RDD с такими элементами:
* дата заказа — 2009-01-01;
* самая низкая температура воздуха в этот день (°C) — 15.1;
* самая высокая температура воздуха в этот день (°C) — 26.1.

Выведите на экран содержимое RDD. Для этого вызовите метод take() (англ. «взять»). Посмотрите в документации, как он работает.

```python 
from pyspark import SparkContext

sc = SparkContext(appName='appName')
weather_entry = sc.parallelize(['2009-01-01', 15.1, 26.1])
print(weather_entry.take(3))
```

```
[Stage 0:>        (0 + 1) / 1]
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        
['2009-01-01', 15.1, 26.1]
```

## Создание датафреймов

1.Загрузите датафрейм из файла /datasets/pickups_terminal_5.csv. Посмотрите в документации, как работает функция show(). Напечайте на экране пять строк из датафрейма.

```python
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

APP_NAME = "DataFrames"
SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \
        .config('spark.ui.showConsoleProgress', 'false') \
        .getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv', format='csv', header='true', inferSchema='true')
taxi.show(5)
```

```
+-------------------+----+------+-------+
|               date|hour|minute|pickups|
+-------------------+----+------+-------+
|2009-01-01 00:00:00|   0|     0|   24.0|
|2009-01-01 00:00:00|   0|    30|   35.0|
|2009-01-01 00:00:00|   1|     0|   25.0|
|2009-01-01 00:00:00|   1|    30|   25.0|
|2009-01-01 00:00:00|   2|     0|   16.0|
+-------------------+----+------+-------+
only showing top 5 rows
```

2. Методом show() размер датасета не получить. Найдите в документации функцию, которая посчитает количество строк. Напечайте результат на экране.

```python
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

APP_NAME = "DataFrames"
SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \
        .config('spark.ui.showConsoleProgress', 'false') \
        .getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv', 
                       format='csv', header='true', inferSchema='true')

print(taxi.count())
```

```
128974
```

3. Выберите из датафрейма только столбцы с датами, часами и минутами в указанном порядке. Выбор подмножества столбцов выполняется так же, как в Pandas. 

Напечатайте на экране пять строк получившейся таблицы.

```python
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

APP_NAME = "DataFrames"
SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \
        .config('spark.ui.showConsoleProgress', 'false') \
        .getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv', 
                       format='csv', header='true', inferSchema='true')

print(taxi[['date', 'hour', 'minute']].show(5))
```

```
+-------------------+----+------+
|               date|hour|minute|
+-------------------+----+------+
|2009-01-01 00:00:00|   0|     0|
|2009-01-01 00:00:00|   0|    30|
|2009-01-01 00:00:00|   1|     0|
|2009-01-01 00:00:00|   1|    30|
|2009-01-01 00:00:00|   2|     0|
+-------------------+----+------+
only showing top 5 rows

None
```

## Обработка пропущенных значений

1. Удалите из датафрейма пропущенные значения. Затем напечатайте на экране количество строк в датафрейме.

```python
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

APP_NAME = "DataFrames"
SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \
        .config('spark.ui.showConsoleProgress', 'false') \
        .getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv', 
                       format='csv', header='true', inferSchema='true')

taxi = taxi.dropna()
print(taxi.count())
```

```
128969
```

2. Заполните пропущенные значения в датафрейме нулями. Функцией describe() выведите на экран результаты, чтобы убедиться в корректности заполнения значений.

```python
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

APP_NAME = "DataFrames"
SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \
        .config('spark.ui.showConsoleProgress', 'false') \
        .getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv', 
                       format='csv', header='true', inferSchema='true')
taxi = taxi.fillna(0)
print(taxi.describe().show())
```

```
+-------+------------------+------------------+------------------+
|summary|              hour|            minute|           pickups|
+-------+------------------+------------------+------------------+
|  count|            128974|            128974|            128974|
|   mean|11.566509529052366|15.004419495402175| 29.00832725975778|
| stddev| 6.908556452594711|15.000057500526209|22.449669931429067|
|    min|                 0|                 0|               0.0|
|    max|                23|                30|             310.0|
+-------+------------------+------------------+------------------+

None
```

## SQL-запросы в датафреймах

1. Изучите статистические выбросы. В переменной result сохраните результат запроса, который выберет даты с числом заказов такси у терминала №5, расположив их от большего к меньшему. Выведите на экран первые пять строк, используя функцию show.

```python
from pyspark.sql import SparkSession

APP_NAME = "DataFrames"
SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \
        .config('spark.ui.showConsoleProgress', 'false') \
        .getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv', 
                       format='csv', header='true', inferSchema='true')

taxi = taxi.fillna(0)

taxi.registerTempTable("taxi")

result = spark.sql("SELECT * FROM taxi ORDER BY pickups DESC")
print(result.show(5))
```

```
+-------------------+----+------+-------+
|               date|hour|minute|pickups|
+-------------------+----+------+-------+
|2015-11-01 00:00:00|   1|    30|  310.0|
|2010-09-23 00:00:00|  22|    30|  288.0|
|2012-03-07 00:00:00|  21|     0|  268.0|
|2011-03-02 00:00:00|  20|    30|  264.0|
|2011-03-02 00:00:00|  18|    30|  263.0|
+-------------------+----+------+-------+
only showing top 5 rows

None
```

2. Найдите все даты, на которые пришлось более 200 заказов такси за любой период в 30 минут в этот день. Напечатайте на экране количество таких дней, сохранив результат в переменную result.

```python
from pyspark.sql import SparkSession

APP_NAME = "DataFrames"
SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \
        .config('spark.ui.showConsoleProgress', 'false') \
        .getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv', 
                       format='csv', header='true', inferSchema='true')

taxi = taxi.fillna(0)

taxi.registerTempTable("taxi")

result = spark.sql("SELECT COUNT(DISTINCT date) FROM taxi WHERE pickups > 200")
print(result.show()) 
```

```
+--------------------+
|count(DISTINCT date)|
+--------------------+
|                  21|
+--------------------+

None
```

## GroupBy в PySpark

1. Сгруппируйте записи по месяцам. По каждому месяцу рассчитайте среднее количество заказов. 

Напечатайте на экране таблицу с месяцами и средним количеством заказов по убыванию.

```python
from pyspark.sql import SparkSession

APP_NAME = "DataFrames"
SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \
        .config('spark.ui.showConsoleProgress', 'false') \
        .getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv', 
                       format='csv', header='true', inferSchema='true')

taxi = taxi.fillna(0)

taxi.registerTempTable("taxi")

result = spark.sql('SELECT EXTRACT(MONTH FROM date), AVG(pickups) '
                   'FROM taxi '
                   'GROUP BY EXTRACT(MONTH FROM date) '
                   'ORDER BY AVG(pickups) DESC')
print(result.show()) 
```

```
+-------------------------+------------------+
|month(CAST(date AS DATE))|      avg(pickups)|
+-------------------------+------------------+
|                        3| 34.61413319776309|
|                       10|31.492839171666343|
|                        2|29.856671982987773|
|                        5| 29.81593638978176|
|                        4|29.313725490196077|
|                        9|29.158446485623003|
|                       11|28.860367558929283|
|                        1|  28.5473244004438|
|                        6| 27.03835736129314|
|                        7| 26.45983005021244|
|                       12| 26.45916884626562|
|                        8| 25.88592750533049|
+-------------------------+------------------+

None
```

2. Вычислите среднее количество заказов за каждый час. Затем отсортируйте данные по убыванию. 

Выведите самые загруженные 10 часов и среднее количество заказов такси в эти часы.

```python
from pyspark.sql import SparkSession

APP_NAME = "DataFrames"
SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME) \
        .config('spark.ui.showConsoleProgress', 'false') \
        .getOrCreate()

taxi = spark.read.load('/datasets/pickups_terminal_5.csv', 
                       format='csv', header='true', inferSchema='true')

taxi = taxi.fillna(0)

taxi.registerTempTable("taxi")

result = spark.sql('SELECT hour, AVG(pickups) '
                   'FROM taxi '
                   'GROUP BY hour '
                   'ORDER BY AVG(pickups) DESC')
print(result.show(10)) 
```

```
+----+------------------+
|hour|      avg(pickups)|
+----+------------------+
|   8| 48.98208348725527|
|   9| 45.74220335855324|
|  18|45.131967515688444|
|  19| 40.18456995201181|
|  17| 37.68493909191584|
|  12| 36.91678966789668|
|  10|36.391031555637575|
|  14|35.965867158671585|
|   7| 35.93711855002774|
|  13| 35.34939091915836|
+----+------------------+
only showing top 10 rows

None
```