<a href="https://colab.research.google.com/github/protmaks/PJ_Colab_Learning_pySpark/blob/main/learning_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#1. Установка pySpark

In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m21.1 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=3323ec447878fdf58ba7c9161c23a0ed13ae467a6327ca255a8125d7c5d73f83
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

#2. Создаём сессию Spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, round, col, count, asc, desc
from pyspark.sql.types import *
spark = SparkSession.builder.getOrCreate()

df_path = 'drive/MyDrive/stocks_price_final.csv'
df_path2 = 'drive/MyDrive/bank.csv'
df_path3 = 'drive/MyDrive/data.csv'

#3. Читаем CSV файл

In [None]:
data_schema = [
               StructField('_c0', IntegerType(), nullable=True),
               StructField('symbol', StringType(), True),
               StructField('date', DateType(), True),
               StructField('open', DoubleType(), True),
               StructField('high', DoubleType(), True),
               StructField('low', DoubleType(), True),
               StructField('close', DoubleType(), True),
               StructField('volume', IntegerType(), True),
               StructField('adjusted', DoubleType(), True),
               StructField('market.cap', StringType(), True),
               StructField('sector', StringType(), True),
               StructField('industry', StringType(), True),
               StructField('exchange', StringType(), True),
            ]

final_struc = StructType(fields = data_schema)

data = spark.read.csv(
    df_path,
    sep=',',
    header=True,
    schema=final_struc
)

#4. Различные методы инспекции данных

data.schema - тот метод возвращает схему данных (фрейма данных). Ниже показан пример с ценами на акции.

data.dtypes - возвращает список кортежей с именами столбцов и типами данных.

data.show() - по умолчанию отображает первые 20 строк, а также принимает число в качестве параметра для выбора их количества.

data.head(n) - возвращает n строк в виде списка.

data.first() - возвращает первую строку данных.

data.take(n) - возвращает первые n строк.

data.describe() - вычисляет некоторые статистические значения для столбцов с числовым типом данных.

data.columns - возвращает список, содержащий названия столбцов.

data.count() - возвращает общее число строк в датасете.

data.distinct() — количество различных строк в используемом наборе данных.

data.printSchema() - отображает схему данных.

In [None]:
data.printSchema()

#5. Манипуляции со столбцами

5.1. Добавление столбца: используйте withColumn, чтобы добавить новый столбец к существующим. Метод принимает два параметра: имя столбца и данные.

`data = data.withColumn('new_date_column', data.date)`

5.2. Обновление столбца: используйте withColumnRenamed, чтобы переименовать существующий столбец. Метод принимает два параметра: название существующего столбца и его новое имя.

`data = data.withColumnRenamed('date', 'date_changed')`

5.3. Удаление столбца: используйте метод drop, который принимает имя столбца и возвращает данные.

`data = data.drop('date_changed')`

**Задача**

1. Прочитать уже знакомый нам датасет stocks_price_final;

2. Создать в конце новый столбец new_volume с данными из столбца volume;

3. Избавиться от следующих столбцов: symbol, close, volume , adjusted, market.cap, exchange;

4. Поменять местами названия столбцов high и low;

5. Столбец date переместить в конец;

6. Вывести первые 3 строки, скопировать вывод и вставить в ответ (в ответ включить "only showing top 3 rows");

In [None]:
data = data.withColumn('new_volume', data.volume)
data = data.drop('symbol', 'close', 'volume', 'adjusted', 'market.cap', 'exchange')

In [None]:
data = data.withColumnRenamed('high', 'low_new')
data = data.withColumnRenamed('low', 'high_new')
data = data.withColumnRenamed('low_new', 'low')
data = data.withColumnRenamed('high_new', 'high')

In [None]:
data = data.withColumn('new_date', data.date)
data = data.drop('date')
data = data.withColumnRenamed('new_date', 'date')
data.show(3)

#6. Работа с недостающими значениями

Мы часто сталкиваемся с отсутствующими значениями при работе с данными реального времени. Эти пропущенные значения обозначаются как NaN, пробелы или другие заполнители. Существуют различные методы работы с пропущенными значениями, некоторые из самых популярных:

1. Удаление: удалить строки с пропущенными значениями в любом из столбцов.

2. Замена средним/медианным значением: замените отсутствующие значения, используя среднее или медиану соответствующего столбца. Это просто, быстро и хорошо работает с небольшими наборами числовых данных.

3. Замена на наиболее частые значения: как следует из названия, используйте наиболее часто встречающееся значение в столбце, чтобы заменить отсутствующие. Это хорошо работает с категориальными признаками, но также может вносить смещение (bias) в данные.

4. Замена с использованием KNN: метод K-ближайших соседей — это алгоритм классификации, который рассчитывает сходство признаков новых точек данных с уже существующими, используя различные метрики расстояния, такие как Евклидова, Махаланобиса, Манхэттена, Минковского, Хэмминга и другие. Такой подход более точен по сравнению с вышеупомянутыми методами, но он требует больших вычислительных ресурсов и довольно чувствителен к выбросам.

Давайте посмотрим, как мы можем использовать PySpark для решения проблемы отсутствующих значений:


`import pyspark.sql.functions as f`

6.1. Удаление строк с пропущенными значениями

`data.na.drop()`

6.2. Замена отсутствующих значений средним

`data.na.fill(data.select(f.mean(data['open'])).collect()[0][0])`

6.3. Замена отсутствующих значений новыми

`data.na.replace(old_value, new_value)`

**Задача**

1. Прочитать уже знакомый нам датасет stocks_price_final;

2. Вычислить разницу между исходным количеством строк в датасете и количеством строк после удаления пропущенных значений.

In [None]:
data_schema = [
               StructField('_c0', IntegerType(), nullable=True),
               StructField('symbol', StringType(), True),
               StructField('date', DateType(), True),
               StructField('open', DoubleType(), True),
               StructField('high', DoubleType(), True),
               StructField('low', DoubleType(), True),
               StructField('close', DoubleType(), True),
               StructField('volume', IntegerType(), True),
               StructField('adjusted', DoubleType(), True),
               StructField('market.cap', StringType(), True),
               StructField('sector', StringType(), True),
               StructField('industry', StringType(), True),
               StructField('exchange', StringType(), True),
            ]

final_struc = StructType(fields = data_schema)

data = spark.read.csv(df_path, sep=',', header=True, schema=final_struc).withColumnRenamed('market.cap','market_cap')

In [None]:
data_pr = data.count()
data_dr = data.na.drop().count()
data_pr - data_dr

3827

#7. Получение данных

7.1. **Select** - используется для выбора одного или нескольких столбцов, используя их имена.

7.1.1. Выбор одного столбца

`data.select('sector').show(5)`

7.1.2. Выбор нескольких столбцов

`data.select(['open', 'close', 'adjusted']).show(5)`


7.2. **Filter** - Данный метод фильтрует данные на основе заданного условия. Вы также можете указать несколько условий, используя операторы AND (&), OR (|) и NOT (~). Вот пример получения данных о ценах на акции за январь 2020 года.

`data.filter( (col('data') >= lit('2020-01-01')) & (col('data') <= lit('2020-01-31')) ).show(5)`


7.3. **Between** - Этот метод возвращает True, если проверяемое значение принадлежит указанному отрезку, иначе — False. Давайте посмотрим на пример отбора данных, в которых значения adjusted находятся в диапазоне от 100 до 500.

`data.filter(data.adjusted.between(100.0, 500.0)).show()`


7.4. **When** - Он возвращает 0 или 1 в зависимости от заданного условия. В приведенном ниже примере показано, как выбрать такие цены на момент открытия и закрытия торгов, при которых скорректированная цена была больше или равна 200.
```
data.select('open', 'close', when(data.adjusted >= 200.0, 1).otherwise(0)).show(5)
```


7.5. **Like** - Этот метод похож на оператор Like в SQL. Приведенный ниже код демонстрирует использование rlike() для извлечения имен секторов, которые начинаются с букв M или C.

```
data.select('sector', data.sector.rlike('^[B,C]').alias('Колонка sector начинается с B или C')).distinct().show()
```


7.6. **GroupBy** - Само название подсказывает, что данная функция группирует данные по выбранному столбцу и выполняет различные операции, такие как вычисление суммы, среднего, минимального, максимального значения и т. д. В приведенном ниже примере объясняется, как получить среднюю цену открытия, закрытия и скорректированную цену акций по отраслям.

```
(data.select(['industry', 'open', 'close', 'adjusted'])
    .groupBy('industry')
    .mean()
    .show()
)
```

7.7. **Агрегирование** - PySpark предоставляет встроенные стандартные функции агрегации, определенные в API DataFrame, они могут пригодится, когда нам нужно выполнить агрегирование значений ваших столбцов. Другими словами, такие функции работают с группами строк и вычисляют единственное возвращаемое значение для каждой группы.

В приведенном ниже примере показано, как отобразить минимальные, максимальные и средние значения цен открытия, закрытия и скорректированных цен акций в промежутке с января 2019 года по январь 2020 года для каждого сектора.

```
(data.filter((col("data") >= lit("2019-01-02")) & (col("data") <= lit("2020-01-31")))
    .groupBy("sector")
    .agg(min("data").alias("С"), 
         max("data").alias("По"), 
         
         min("open").alias("Минимум при открытии"),
         max("open").alias("Максимум при открытии"), 
         avg("open").alias("Среднее в open"), 

         min("close").alias("Минимум при закрытии"), 
         max("close").alias("Максимум при закрытии"), 
         avg("close").alias("Среднее в close"), 

         min("adjusted").alias("Скорректированный минимум"), 
         max("adjusted").alias("Скорректированный максимум"), 
         avg("adjusted").alias("Среднее в adjusted"), 

      ).show(truncate=False)
)
```



**Задача**

Сгруппировать значения по возрасту и найти количество элементов в группе.

In [None]:
data_bank = spark.read.csv(df_path2, sep=';', header=True)

In [None]:
(data_bank
  .groupBy('age')
  .agg(count('age').alias('count'))
  .show(5)
)

**Задача**

Найти, сотрудники с каким возрастом работают в банке чаще всего.

In [None]:
(data_bank
  .groupBy('age')
  .agg(count('age').alias('count'))
  .sort(desc('count'))
  .show(5)
)

+---+-----+
|age|count|
+---+-----+
| 34|  231|
| 32|  224|
| 31|  199|
| 36|  188|
| 33|  186|
+---+-----+
only showing top 5 rows



**Задача**

Необходимо найти возраст и количество самых молодых сотрудников банка

In [None]:
(data_bank
  .groupBy('age')
  .agg(count('age').alias('count'))
  .sort(asc('age'))
  .show(5)
)

**Задача**

Необходимо вывести возраст и количество сотрудников, которым более 30 лет. Произвести сортировку полученной таблицы по столбцу age по возрастанию.

In [None]:
from pyspark.sql.types import IntegerType
data_bank = data_bank.withColumn("age", data_bank["age"].cast(IntegerType()))

(data_bank.filter(col('age') > 30)
  .groupBy('age')
  .agg(count('age').alias('count'))
  .sort(asc('age'))
  .show(5)
)

+---+-----+
|age|count|
+---+-----+
| 31|  199|
| 32|  224|
| 33|  186|
| 34|  231|
| 35|  180|
+---+-----+
only showing top 5 rows



#8. Визуализация данных

Для визуализации данных мы воспользуемся библиотеками matplotlib и pandas. Метод toPandas() позволяет нам осуществить преобразование данных в dataframe pandas, который мы используем при вызове метода визуализации plot(). В приведенном ниже коде показано, как отобразить гистограмму, отображающую средние значения цен открытия, закрытия и скорректированных цен акций для каждого сектора.

```
from matplotlib import pyplot as plt

sec_df =  data.select(['sector', 
                       'open', 
                       'close', 
                       'adjusted']
                     )\
                     .groupBy('sector')\
                     .mean()\
                     .toPandas()

ind = list(range(12))
ind.pop(6)

sec_df.iloc[ind,:].plot(kind='bar', x='sector', y=sec_df.columns.tolist()[1:], 
                         figsize=(12, 6), ylabel='Stock Price', xlabel='Sector')
plt.show()
```
Теперь давайте визуализируем те же средние показатели, но уже по отраслям.


```
industries_x = data.select(['industry', 'open', 'close', 'adjusted']).groupBy('industry').mean().toPandas()

q  = industries_x[(industries_x.industry != 'Major Chemicals') & (industries_x.industry != 'Building Products')]
q.plot(kind='barh', x='industry', y=q.columns.tolist()[1:], figsize=(10, 50), xlabel='Stock Price', ylabel='Industry')

plt.show()
```

Также построим временные ряды для средних цен открытия, закрытия и скорректированных цен акций технологического сектора.

**Задача**

Отобразить изменение средних значений аудио характеристик от года к году.

Такими характеристиками являются acousticness, danceability, energy, speechiness, liveness и valence . Произвести сортировку полученной таблицы по столбцу year по возрастанию. Средние значения округлить до 2ух знаков после запятой.

In [None]:
data_e = (
    spark.read.option("delimiter", ",")
    .option("header", True)
    .option("escape", '"')
    .csv(df_path3)
)

In [None]:
(data_e
  .groupBy('year')
  .agg(
      round(avg('acousticness'),2).alias('acousticness'),
      round(avg('danceability'),2).alias('danceability'),
      round(avg('energy'),2).alias('energy'),
      round(avg('liveness'),2).alias('liveness'),
      round(avg('speechiness'),2).alias('speechiness'),
      round(avg('valence'),2).alias('valence')
  )
  .sort(asc('year'))
  .show(5)
)

+----+------------+------------+------+--------+-----------+-------+
|year|acousticness|danceability|energy|liveness|speechiness|valence|
+----+------------+------------+------+--------+-----------+-------+
|1921|         0.9|        0.43|  0.24|    0.22|       0.08|   0.43|
|1922|        0.94|        0.48|  0.24|    0.24|       0.12|   0.53|
|1923|        0.98|        0.57|  0.25|    0.24|        0.1|   0.62|
|1924|        0.94|        0.55|  0.35|    0.24|       0.09|   0.67|
|1925|        0.97|        0.57|  0.26|    0.24|       0.12|   0.62|
+----+------------+------------+------+--------+-----------+-------+
only showing top 5 rows



**Задача**

Найти количество произведений, выпущенных с 1951 года, в авторах которых присутствует "Sergei Rachmaninoff".

In [None]:
schema = StructType(
    [
        StructField("id", StringType(), nullable=True),
        StructField("name", StringType(), True),
        StructField("artists", StringType(), True),
        StructField("duration_ms", DoubleType(), True),
        StructField("release_date", DateType(), True),
        StructField("year", IntegerType(), True),
        StructField("acousticness", StringType(), True),
        StructField("danceability", StringType(), True),
        StructField("energy", StringType(), True),
        StructField("instrumentalness", StringType(), True),
        StructField("liveness", StringType(), True),
        StructField("loudness", StringType(), True),
        StructField("speechiness", StringType(), True),
        StructField("tempo", StringType(), True),
        StructField("valence", StringType(), True),
        StructField("mode", StringType(), True),
        StructField("key", StringType(), True),
        StructField("popularity", DoubleType(), True),
        StructField("explicit", StringType(), True),
    ]
)

data_e2 = (
    spark.read.option("delimiter", ",")
    .option("header", True)
    .option("escape", '"')
    .schema(schema)
    .csv(df_path3)
)

In [None]:
count_sr = data_e2.filter( (data_e2.artists.contains('Sergei Rachmaninoff')) & (data_e2.year > 1951))
count_sr.count()

46

**Задача**

Найти наиболее популярных артистов (средняя популярность всех произведений, в которых упомянут артист).

Выборку производить из тех артистов, у которых общее количество упоминаний в произведениях не менее 200.

In [None]:
schema = StructType(
    [
        StructField("id", StringType(), nullable=True),
        StructField("name", StringType(), True),
        StructField("artists", StringType(), True),
        StructField("duration_ms", DoubleType(), True),
        StructField("release_date", DateType(), True),
        StructField("year", IntegerType(), True),
        StructField("acousticness", StringType(), True),
        StructField("danceability", StringType(), True),
        StructField("energy", StringType(), True),
        StructField("instrumentalness", StringType(), True),
        StructField("liveness", StringType(), True),
        StructField("loudness", StringType(), True),
        StructField("speechiness", StringType(), True),
        StructField("tempo", StringType(), True),
        StructField("valence", StringType(), True),
        StructField("mode", StringType(), True),
        StructField("key", StringType(), True),
        StructField("popularity", DoubleType(), True),
        StructField("explicit", StringType(), True),
    ]
)

data_fn = (
    spark.read.option("delimiter", ",")
    .option("header", True)
    .option("escape", '"')
    .schema(schema)
    .csv(df_path3)
)

In [None]:
# мой вариант решения
from pyspark.sql.functions import split, explode
import pyspark.sql.functions as f
from pyspark.sql.functions import trim


# отбор столбцов 'artists', 'popularity'
data_f = data_fn.select(['artists', 'popularity'])


# замена лишних символов в колонке 'artists'
art_rep = f.regexp_replace(f.col("artists"), "[\[\]\'\"]", "")
data_f = data_f.withColumn('artists_one', art_rep)
#data_f.show(5)


# разбите записей по одному исполнителю
data_f = data_f.select(
    explode(split(col("artists_one"), ","))
    .alias("artists"),
    'popularity'
    ).sort(desc('popularity'))
#data_f.show(5)


# убираем лишние пробелы по краям
data_f = data_f.withColumn("artist", trim(data_f.artists))
#data_f.show(5)


data_f = (
    data_f.groupBy('artist')
    .agg(
        avg('popularity').alias('popularity'),
        count('popularity').alias('count')
    )
    .sort(desc('popularity'))
    .filter(col('count') > 200)
)

data_f.show(5)
data_f.select('artist').show(5)


+------------+------------------+-----+
|      artist|        popularity|count|
+------------+------------------+-----+
|       Drake| 61.60567823343849|  317|
|  Kanye West| 58.10762331838565|  223|
|Taylor Swift|57.367149758454104|  207|
|      Eminem|56.382838283828384|  303|
|   Lil Wayne|  54.1501976284585|  253|
+------------+------------------+-----+
only showing top 5 rows

+------------+
|      artist|
+------------+
|       Drake|
|  Kanye West|
|Taylor Swift|
|      Eminem|
|   Lil Wayne|
+------------+
only showing top 5 rows



In [None]:
# второй вариант решения от stepik

data_f = data_f.withColumn(
    "artists", f.split(f.regexp_replace(f.col("artists"), "[\]\[']", ""), ", ")
)

data_f = (
    data_f.withColumn("artists_exp", f.explode("artists"))
    .withColumn(
        "artists_exp",
        f.explode(f.split(f.regexp_replace("artists_exp", "[\\[\\]]", ""), ",")),
    )
    .groupBy("artists_exp")
    .agg(f.avg("popularity"), f.count("*").alias("count"))
)

data_f = (
    data_f.filter(f.col("count") >= 200)
    .orderBy(f.col("avg(popularity)").desc())
    .withColumnRenamed("artists_exp", "artist")
)

data_f.select("artist").show(5)

+------------+
|      artist|
+------------+
|       Drake|
|  Kanye West|
|Taylor Swift|
|      Eminem|
|   Lil Wayne|
+------------+
only showing top 5 rows

