# Практика PySpark и SparkSQL (Dataframe API)

In [1]:
# cоздаём SparkSession

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Practice 2").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/08 02:59:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
spark

## Чтение текстовых данных

Данные [отсюда](https://www.kaggle.com/lepchenkov/usedcarscatalog) (Kaggle)

In [2]:
!head -n5 data/cars.csv

manufacturer_name,model_name,transmission,color,odometer_value,year_produced,engine_fuel,engine_has_gas,engine_type,engine_capacity,body_type,has_warranty,state,drivetrain,price_usd,is_exchangeable,location_region,number_of_photos,up_counter,feature_0,feature_1,feature_2,feature_3,feature_4,feature_5,feature_6,feature_7,feature_8,feature_9,duration_listed
Subaru,Outback,automatic,silver,190000,2010,gasoline,False,gasoline,2.5,universal,False,owned,all,10900.0,False,Минская обл.,9,13,False,True,True,True,False,True,False,True,True,True,16
Subaru,Outback,automatic,blue,290000,2002,gasoline,False,gasoline,3.0,universal,False,owned,all,5000.0,True,Минская обл.,12,54,False,True,False,False,True,True,False,False,False,True,83
Subaru,Forester,automatic,red,402000,2001,gasoline,False,gasoline,2.5,suv,False,owned,all,2800.0,True,Минская обл.,4,72,False,True,False,False,False,False,False,False,True,True,151
Subaru,Impreza,mechanical,blue,10000,1999,gasoline,False,gasoline,3.0,sedan,False,own

In [3]:
# читаем данные в DataFrame c автоматическим определение схемы:

df = spark.read.format("csv").option("header", "true").load("data/cars.csv")

## Смотрим на данные

In [4]:
# вывести в консоль 5 строк из датафрейма:

df.show(5)

+-----------------+----------+------------+------+--------------+-------------+-----------+--------------+-----------+---------------+---------+------------+-----+----------+---------+---------------+---------------+----------------+----------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------------+
|manufacturer_name|model_name|transmission| color|odometer_value|year_produced|engine_fuel|engine_has_gas|engine_type|engine_capacity|body_type|has_warranty|state|drivetrain|price_usd|is_exchangeable|location_region|number_of_photos|up_counter|feature_0|feature_1|feature_2|feature_3|feature_4|feature_5|feature_6|feature_7|feature_8|feature_9|duration_listed|
+-----------------+----------+------------+------+--------------+-------------+-----------+--------------+-----------+---------------+---------+------------+-----+----------+---------+---------------+---------------+----------------+----------+---------+---------+---------+----

Дополнительный параметр `vertical=True` выведет каждую строку данных построчно в виде `колонка | значение`

In [5]:
df.show(1, vertical=True)

-RECORD 0-------------------------
 manufacturer_name | Subaru       
 model_name        | Outback      
 transmission      | automatic    
 color             | silver       
 odometer_value    | 190000       
 year_produced     | 2010         
 engine_fuel       | gasoline     
 engine_has_gas    | False        
 engine_type       | gasoline     
 engine_capacity   | 2.5          
 body_type         | universal    
 has_warranty      | False        
 state             | owned        
 drivetrain        | all          
 price_usd         | 10900.0      
 is_exchangeable   | False        
 location_region   | Минская обл. 
 number_of_photos  | 9            
 up_counter        | 13           
 feature_0         | False        
 feature_1         | True         
 feature_2         | True         
 feature_3         | True         
 feature_4         | False        
 feature_5         | True         
 feature_6         | False        
 feature_7         | True         
 feature_8         |

# Смотрим на DataFrame API

## .select()

.select() выберет только нужные колонки (по аналогии с выражением `SELECT` в SQL)

In [6]:
# выбираем несколько колонок для отображения

df.select("manufacturer_name", "model_name", "transmission").show(5)

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|           Subaru|   Outback|   automatic|
|           Subaru|   Outback|   automatic|
|           Subaru|  Forester|   automatic|
|           Subaru|   Impreza|  mechanical|
|           Subaru|    Legacy|   automatic|
+-----------------+----------+------------+
only showing top 5 rows



Можно обратиться к колонке через указание датафрейма:

In [7]:
df.select(df["manufacturer_name"], df["model_name"], df["transmission"]).show(5)

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|           Subaru|   Outback|   automatic|
|           Subaru|   Outback|   automatic|
|           Subaru|  Forester|   automatic|
|           Subaru|   Impreza|  mechanical|
|           Subaru|    Legacy|   automatic|
+-----------------+----------+------------+
only showing top 5 rows



Или используя функцию колонки `.col()` из пакета `functions`

In [8]:
import pyspark.sql.functions as F

df.select(F.col("manufacturer_name"), F.col("model_name"), F.col("transmission")).show(5)

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|           Subaru|   Outback|   automatic|
|           Subaru|   Outback|   automatic|
|           Subaru|  Forester|   automatic|
|           Subaru|   Impreza|  mechanical|
|           Subaru|    Legacy|   automatic|
+-----------------+----------+------------+
only showing top 5 rows



# .filter()

Метод .filter() принимает условия для фильтрации:

In [9]:
# выберем только марки Audi

df.select("manufacturer_name", "model_name", "transmission").filter("manufacturer_name = 'Audi'").show(5)

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|             Audi|        Q7|   automatic|
|             Audi|        TT|   automatic|
|             Audi|       100|  mechanical|
|             Audi|        A6|   automatic|
|             Audi|        Q3|   automatic|
+-----------------+----------+------------+
only showing top 5 rows



Цепочка условий:

In [10]:
# выберем только марки Audi и ручной коробкой

(
    df
    .select("manufacturer_name", "model_name", "transmission")
    .filter("manufacturer_name = 'Audi'")
    .filter("transmission = 'mechanical'")
    .show(5)
)

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|             Audi|       100|  mechanical|
|             Audi|A6 Allroad|  mechanical|
|             Audi|       100|  mechanical|
|             Audi|        A4|  mechanical|
|             Audi|        80|  mechanical|
+-----------------+----------+------------+
only showing top 5 rows



Цепочка условий в виде одного SQL выражения:

In [11]:
(
    df
    .select("manufacturer_name", "model_name", "transmission")
    .filter("manufacturer_name = 'Audi' and transmission = 'mechanical'")
    .show(5)
)

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|             Audi|       100|  mechanical|
|             Audi|A6 Allroad|  mechanical|
|             Audi|       100|  mechanical|
|             Audi|        A4|  mechanical|
|             Audi|        80|  mechanical|
+-----------------+----------+------------+
only showing top 5 rows



Удобнее и логичнее использовать `col()` для составления условий фильтрации:

In [12]:
(
    df
    .select("manufacturer_name", "model_name", "transmission")
    .filter(F.col("manufacturer_name") == "Audi")
    .filter(F.col("transmission") != "mechanical")
    .show(5)
)

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|             Audi|        Q7|   automatic|
|             Audi|        TT|   automatic|
|             Audi|        A6|   automatic|
|             Audi|        Q3|   automatic|
|             Audi|        Q5|   automatic|
+-----------------+----------+------------+
only showing top 5 rows



## .count()

Подсчет кол-ва строк:

In [13]:
df.count()

38531

Либо уникальных строк:

In [14]:
df.select("manufacturer_name").distinct().count()

55

## .groupBy() and .orderBy()

Группировка и агрегация (аналог `GROUP BY` в SQL):

In [15]:
# сгрупировать по manufacturer_name и посчитать кол-во каждого

df.groupBy("manufacturer_name").count().show(5)

+-----------------+-----+
|manufacturer_name|count|
+-----------------+-----+
|       Volkswagen| 4243|
|         Infiniti|  162|
|          Peugeot| 1909|
|            Lexus|  213|
|           Jaguar|   53|
+-----------------+-----+
only showing top 5 rows



Сортировка по колонке по возрастанию:

In [16]:
df.groupBy("manufacturer_name").count().orderBy("count").show(5)

+-----------------+-----+
|manufacturer_name|count|
+-----------------+-----+
|          Lincoln|   36|
|       Great Wall|   36|
|              ЗАЗ|   42|
|          Pontiac|   42|
|         Cadillac|   43|
+-----------------+-----+
only showing top 5 rows



Сортировка по колонке по убыванию. Тут надо явно задать колонку через `col()`:

In [17]:
df.groupBy("manufacturer_name").count().orderBy(F.col("count").desc()).show(5)

+-----------------+-----+
|manufacturer_name|count|
+-----------------+-----+
|       Volkswagen| 4243|
|             Opel| 2759|
|              BMW| 2610|
|             Ford| 2566|
|          Renault| 2493|
+-----------------+-----+
only showing top 5 rows



## .withColumnRenamed() and .withColumn()

Переименовать существующую колонку:

In [18]:
df.withColumnRenamed("manufacturer_name", "manufacturer").select("manufacturer").show(5)

+------------+
|manufacturer|
+------------+
|      Subaru|
|      Subaru|
|      Subaru|
|      Subaru|
|      Subaru|
+------------+
only showing top 5 rows



Создать новую колонку. Первый аргумент это название новой колонки, второй агрумент это выражение (обязательно использовать `col()` если ссылаемся на другую колонку):

In [19]:
df.withColumn("next_year", F.col("year_produced") + 1).select("year_produced", "next_year").show(5)

+-------------+---------+
|year_produced|next_year|
+-------------+---------+
|         2010|   2011.0|
|         2002|   2003.0|
|         2001|   2002.0|
|         1999|   2000.0|
|         2001|   2002.0|
+-------------+---------+
only showing top 5 rows



## .printSchema() and .describe()

Вывести схему датафрейма (типы колонок):

In [20]:
df.printSchema()

root
 |-- manufacturer_name: string (nullable = true)
 |-- model_name: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- color: string (nullable = true)
 |-- odometer_value: string (nullable = true)
 |-- year_produced: string (nullable = true)
 |-- engine_fuel: string (nullable = true)
 |-- engine_has_gas: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- engine_capacity: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- has_warranty: string (nullable = true)
 |-- state: string (nullable = true)
 |-- drivetrain: string (nullable = true)
 |-- price_usd: string (nullable = true)
 |-- is_exchangeable: string (nullable = true)
 |-- location_region: string (nullable = true)
 |-- number_of_photos: string (nullable = true)
 |-- up_counter: string (nullable = true)
 |-- feature_0: string (nullable = true)
 |-- feature_1: string (nullable = true)
 |-- feature_2: string (nullable = true)
 |-- feature_3: string (nullable = true)


In [21]:
df.schema

StructType(List(StructField(manufacturer_name,StringType,true),StructField(model_name,StringType,true),StructField(transmission,StringType,true),StructField(color,StringType,true),StructField(odometer_value,StringType,true),StructField(year_produced,StringType,true),StructField(engine_fuel,StringType,true),StructField(engine_has_gas,StringType,true),StructField(engine_type,StringType,true),StructField(engine_capacity,StringType,true),StructField(body_type,StringType,true),StructField(has_warranty,StringType,true),StructField(state,StringType,true),StructField(drivetrain,StringType,true),StructField(price_usd,StringType,true),StructField(is_exchangeable,StringType,true),StructField(location_region,StringType,true),StructField(number_of_photos,StringType,true),StructField(up_counter,StringType,true),StructField(feature_0,StringType,true),StructField(feature_1,StringType,true),StructField(feature_2,StringType,true),StructField(feature_3,StringType,true),StructField(feature_4,StringType,tr

Вывести сводную статистику по датафрейму:

In [22]:
df.select("manufacturer_name", "model_name", "year_produced", "price_usd").describe().show()

+-------+-----------------+------------------+------------------+-----------------+
|summary|manufacturer_name|        model_name|     year_produced|        price_usd|
+-------+-----------------+------------------+------------------+-----------------+
|  count|            38531|             38531|             38531|            38531|
|   mean|             null|1168.2918056562726|2002.9437336170874|6639.971021255605|
| stddev|             null| 9820.119520829581| 8.065730511309935|6428.152018202911|
|    min|            Acura|               100|              1942|              1.0|
|    max|              УАЗ|            Таврия|              2019|           9999.0|
+-------+-----------------+------------------+------------------+-----------------+



In [23]:
spark.stop()