In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
    .master("local[2]")\
    .appName("Lesson_4")\
    .config("spark.executor.instances",2)\
    .config("spark.executor.memory",'2g')\
    .config("spark.executor.cores",1)\
    .getOrCreate()
sc = spark.sparkContext

# Самостоятельная работа к уроку 4
На уроке мы попробовали оконные и пользовательские функции. Теперь закрепим полученные знания.

## Данные: [google drive: raw_sales.csv](https://drive.google.com/file/d/1G2N7Mnt4-Tqz4JdJxutGDMbJiOr32kZp/view?usp=sharing)

 Каждая строчка это продажа жилья, которая состоит из следующих полей:
*   ```date of sale``` - дата продажи
*   ```price``` - цена
*   ```property type``` - тип недвижимости
*   ```number of bedrooms``` - количество спален
*   ```4digit postcode``` - 4-хзначный почтовый индекс

In [3]:
from pyspark.sql import Window
from pyspark.sql import functions as F 
from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import IntegerType

import pandas as pd
import requests

In [4]:
# скачаем файл

train_response = requests.get('https://drive.google.com/uc?id=1uF1CStEeuFUMiXgQ2mhn42Cs9Ax8IUm3')
with open('raw_sales.csv', 'wb') as file:
    file.write(train_response.content)
data = spark.read.csv('raw_sales.csv', header=True, inferSchema=True)
data.show(5)
data.printSchema()

+-------------------+--------+------+------------+--------+
|           datesold|postcode| price|propertyType|bedrooms|
+-------------------+--------+------+------------+--------+
|2007-02-07 00:00:00|    2607|525000|       house|       4|
|2007-02-27 00:00:00|    2906|290000|       house|       3|
|2007-03-07 00:00:00|    2905|328000|       house|       3|
|2007-03-09 00:00:00|    2905|380000|       house|       4|
|2007-03-21 00:00:00|    2906|310000|       house|       3|
+-------------------+--------+------+------------+--------+
only showing top 5 rows

root
 |-- datesold: timestamp (nullable = true)
 |-- postcode: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- propertyType: string (nullable = true)
 |-- bedrooms: integer (nullable = true)



## Задание 1
Добавьте к таблице следующие поля:
*  Средняя стомость 10 проданных домов до текущего в том же районе (4digit postcode)
*  Средняя стомость 10 проданных домов после текущего в том же районе (4digit postcode)
*  Стоимость последнего проданного дома до текущего


In [5]:
# группируем данные по postcode и сортируем в порядке возрастания даты продажи
window = Window.partitionBy('postcode').orderBy('datesold')

# avg_price_before_sale - получим среднюю стоимость домов до текущей продажи
# диапазон задан с помощью window.rowsBetween(Window.currentRow-10, Window.currentRow-1)
# среднее найдено с помощью avg()
df = (
    data.withColumn('avg_price_before_sale', F.avg(F.col('price'))
    .over(window.rowsBetween(Window.currentRow-10, Window.currentRow-1)))
)
# avg_price_after_sale - аналогично - средняя стоимость домов после текущей продажи
df = (
    df.withColumn('avg_price_after_sale', F.avg(F.col('price'))
    .over(window.rowsBetween(Window.currentRow+1, Window.currentRow+10)))
)
# last_price_before_sale - получим стоимость последнего проданного дома до текущего
# предыдущее значение в рамках окна получено с помощью lag(колонка, на_сколько_назад)
# рамки окна определены с помощью over(window)
df = df.withColumn('last_price_before_sale', F.lag(F.col('price'), 1).over(window))

# фильтруем данные, чтобы оставить только проданные дома
df = df.filter(F.col('propertyType') == 'house')

# сделаем новые столбцы типом int (это пригодится позже для поиска уникальных значений в строке)
df = df.selectExpr('datesold', 'postcode', 'price', 'propertyType', 'bedrooms',
                   'cast(avg_price_before_sale as int) as avg_price_before_sale', 
                   'cast(avg_price_after_sale as int) as avg_price_after_sale',
                   'last_price_before_sale')

# и вот он результат
#df.select('*').orderBy('postcode', 'datesold').show(10)

# в итоговом выводе  с помощью filter() уберём строки с null
(
    df.filter(F.col('avg_price_before_sale').isNotNull())
    .select('*').orderBy('postcode', 'datesold').show(10)
)

+-------------------+--------+-------+------------+--------+---------------------+--------------------+----------------------+
|           datesold|postcode|  price|propertyType|bedrooms|avg_price_before_sale|avg_price_after_sale|last_price_before_sale|
+-------------------+--------+-------+------------+--------+---------------------+--------------------+----------------------+
|2007-08-16 00:00:00|    2600| 790000|       house|       4|               327000|              698350|                327000|
|2007-12-05 00:00:00|    2600| 825000|       house|       3|               558500|              679350|                790000|
|2008-04-24 00:00:00|    2600| 292500|       house|       1|               564250|              786600|                315000|
|2008-06-19 00:00:00|    2600| 765000|       house|       5|               479750|              868450|                329000|
|2008-07-29 00:00:00|    2600| 927000|       house|       4|               520500|              805750|        

## Задание 2
В итоге у вас таблица с колонками (или нечто похожее):
*   price
*   Среднегодовая цена
*  Средняя стомость 10 проданных домов до текущего в том же районе (4digit postcode) (1 балл)
*  Средняя стомость 10 проданных домов после текущего в том же районе (4digit postcode) (1 балл)
*  Стоимость последнего проданного дома до текущего ((1 балл)
*  и др.

Посчитайте кол-во уникальных значений в каждой строчке (unique(row)) (ипользуйте udf). Попробуйте сделать то же самое используя pandas udf.

In [6]:
# udf - в функцию с помощью array(*df) приходит список
@F.udf(returnType=IntegerType())
def count_unique_udf(row):
    return len(set(row))
(
    df.filter(F.col('avg_price_before_sale').isNotNull())
    .withColumn('unique_values', count_unique_udf(F.array(*df))).show(10)
)

+-------------------+--------+-------+------------+--------+---------------------+--------------------+----------------------+-------------+
|           datesold|postcode|  price|propertyType|bedrooms|avg_price_before_sale|avg_price_after_sale|last_price_before_sale|unique_values|
+-------------------+--------+-------+------------+--------+---------------------+--------------------+----------------------+-------------+
|2007-08-16 00:00:00|    2600| 790000|       house|       4|               327000|              698350|                327000|            7|
|2007-12-05 00:00:00|    2600| 825000|       house|       3|               558500|              679350|                790000|            8|
|2008-04-24 00:00:00|    2600| 292500|       house|       1|               564250|              786600|                315000|            8|
|2008-06-19 00:00:00|    2600| 765000|       house|       5|               479750|              868450|                329000|            8|
|2008-07-29 0

In [8]:
# pandas_udf - тип SCALAR возвращает одно значение для каждой строки
@pandas_udf(IntegerType(), PandasUDFType.SCALAR)
def count_unique_pandas_udf(series) -> int:
    return series.apply(lambda row: len(set(row)))
(
    df.filter(F.col('avg_price_before_sale').isNotNull())
    .withColumn('unique_values', count_unique_pandas_udf(F.array(*df))).show(10)
)

+-------------------+--------+-------+------------+--------+---------------------+--------------------+----------------------+-------------+
|           datesold|postcode|  price|propertyType|bedrooms|avg_price_before_sale|avg_price_after_sale|last_price_before_sale|unique_values|
+-------------------+--------+-------+------------+--------+---------------------+--------------------+----------------------+-------------+
|2007-08-16 00:00:00|    2600| 790000|       house|       4|               327000|              698350|                327000|            7|
|2007-12-05 00:00:00|    2600| 825000|       house|       3|               558500|              679350|                790000|            8|
|2008-04-24 00:00:00|    2600| 292500|       house|       1|               564250|              786600|                315000|            8|
|2008-06-19 00:00:00|    2600| 765000|       house|       5|               479750|              868450|                329000|            8|
|2008-07-29 0

# Задание 3
SQL like case when или if elif else

Создайте колонку, в которой в которой будет отображаться "+", "-" или "=", если "Средняя стомость 10 проданных домов до текущего в том же районе" больше, меньше или равно "Средняя стомость 10 проданных домов после текущего в том же районе (4digit postcode)", соотвественно.

Если одно из полей Null, запишите в эту колонку "Нет данных"

In [9]:
# сделаем задание через функцию when(если TRUE, тогда)
df = df.withColumn(
    'sign',
    F.when(
        F.col('avg_price_before_sale').isNull() | 
        F.col('avg_price_after_sale').isNull(),
        'Нет данных'
    ).when(
        F.col('avg_price_before_sale') > F.col('avg_price_after_sale'),
        '-'
    ).when(
        F.col('avg_price_before_sale') < F.col('avg_price_after_sale'),
        '+'
    ).otherwise(
        '='
    )
)
# здесь уже не фильтруем null, чтоб посмотреть на "Нет данных" :)
df.show()

+-------------------+--------+-------+------------+--------+---------------------+--------------------+----------------------+----------+
|           datesold|postcode|  price|propertyType|bedrooms|avg_price_before_sale|avg_price_after_sale|last_price_before_sale|      sign|
+-------------------+--------+-------+------------+--------+---------------------+--------------------+----------------------+----------+
|2007-07-08 00:00:00|    2600| 327000|       house|       1|                 null|              708350|                  null|Нет данных|
|2007-08-16 00:00:00|    2600| 790000|       house|       4|               327000|              698350|                327000|         +|
|2007-12-05 00:00:00|    2600| 825000|       house|       3|               558500|              679350|                790000|         +|
|2008-04-24 00:00:00|    2600| 292500|       house|       1|               564250|              786600|                315000|         +|
|2008-06-19 00:00:00|    2600| 765