In [22]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
conf = SparkConf()
conf.setMaster("local").setAppName('My app')
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
print('Запущен Spark версии', spark.version)

Запущен Spark версии 3.3.1


In [23]:
# читаем данные в DataFrame c автоматическим определение схемы:

df = spark.read.format("csv").option("header", "true").load("cars.csv")


In [28]:
# вывести в консоль 5 строк из датафрейма:
# ( Дополнительный параметр vertical=True выведет каждую строку данных построчно в виде колонка | значение)
print(df)

<bound method DataFrame.head of DataFrame[manufacturer_name: string, model_name: string, transmission: string, color: string, odometer_value: string, year_produced: string, engine_fuel: string, engine_has_gas: string, engine_type: string, engine_capacity: string, body_type: string, has_warranty: string, state: string, drivetrain: string, price_usd: string, is_exchangeable: string, location_region: string, number_of_photos: string, up_counter: string, feature_0: string, feature_1: string, feature_2: string, feature_3: string, feature_4: string, feature_5: string, feature_6: string, feature_7: string, feature_8: string, feature_9: string, duration_listed: string]>

In [29]:
df.show(1, vertical=True)

-RECORD 0-------------------------
 manufacturer_name | Subaru       
 model_name        | Outback      
 transmission      | automatic    
 color             | silver       
 odometer_value    | 190000       
 year_produced     | 2010         
 engine_fuel       | gasoline     
 engine_has_gas    | False        
 engine_type       | gasoline     
 engine_capacity   | 2.5          
 body_type         | universal    
 has_warranty      | False        
 state             | owned        
 drivetrain        | all          
 price_usd         | 10900.0      
 is_exchangeable   | False        
 location_region   | Минская обл. 
 number_of_photos  | 9            
 up_counter        | 13           
 feature_0         | False        
 feature_1         | True         
 feature_2         | True         
 feature_3         | True         
 feature_4         | False        
 feature_5         | True         
 feature_6         | False        
 feature_7         | True         
 feature_8         |

# **.select()**
.select() выберет только нужные колонки (по аналогии с выражением SELECT в SQL)

In [33]:
# выбираем несколько колонок для отображения

df.select("manufacturer_name", "model_name", "transmission").show(5)

df.select(df["manufacturer_name"], df["model_name"], df["transmission"]).show(5)

import pyspark.sql.functions as F
df.select(F.col("manufacturer_name"), F.col("model_name"), F.col("transmission")).show(5)

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|           Subaru|   Outback|   automatic|
|           Subaru|   Outback|   automatic|
|           Subaru|  Forester|   automatic|
|           Subaru|   Impreza|  mechanical|
|           Subaru|    Legacy|   automatic|
+-----------------+----------+------------+
only showing top 5 rows

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|           Subaru|   Outback|   automatic|
|           Subaru|   Outback|   automatic|
|           Subaru|  Forester|   automatic|
|           Subaru|   Impreza|  mechanical|
|           Subaru|    Legacy|   automatic|
+-----------------+----------+------------+
only showing top 5 rows

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|           Subaru|   Outb

In [None]:
# выберем только марки Audi
df.select("manufacturer_name", "model_name", "transmission").filter("manufacturer_name = 'Audi'").show(5)

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|             Audi|        Q7|   automatic|
|             Audi|        TT|   automatic|
|             Audi|       100|  mechanical|
|             Audi|        A6|   automatic|
|             Audi|        Q3|   automatic|
+-----------------+----------+------------+
only showing top 5 rows



In [None]:
# Цепочка условий:

# выберем только марки Audi и ручной коробкой

(
    df
    .select("manufacturer_name", "model_name", "transmission")
    .filter("manufacturer_name = 'Audi'")
    .filter("transmission = 'mechanical'")
    .show(5)
)

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|             Audi|       100|  mechanical|
|             Audi|A6 Allroad|  mechanical|
|             Audi|       100|  mechanical|
|             Audi|        A4|  mechanical|
|             Audi|        80|  mechanical|
+-----------------+----------+------------+
only showing top 5 rows



In [None]:
# Цепочка условий в виде одного SQL выражения:

(
    df
    .select("manufacturer_name", "model_name", "transmission")
    .filter("manufacturer_name = 'Audi' and transmission = 'mechanical'")
    .show(5)
)

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|             Audi|       100|  mechanical|
|             Audi|A6 Allroad|  mechanical|
|             Audi|       100|  mechanical|
|             Audi|        A4|  mechanical|
|             Audi|        80|  mechanical|
+-----------------+----------+------------+
only showing top 5 rows



In [None]:
# Удобнее и логичнее использовать col() для составления условий фильтрации:

(
    df
    .select("manufacturer_name", "model_name", "transmission")
    .filter(F.col("manufacturer_name") == "Audi")
    .filter(F.col("transmission") != "mechanical")
    .show(5)
)

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|             Audi|        Q7|   automatic|
|             Audi|        TT|   automatic|
|             Audi|        A6|   automatic|
|             Audi|        Q3|   automatic|
|             Audi|        Q5|   automatic|
+-----------------+----------+------------+
only showing top 5 rows



In [None]:
# уникальные строки:

df.select("manufacturer_name").distinct().count()

55

In [39]:
df.groupBy("manufacturer_name").count().orderBy(F.col("count").desc()).show()

+-----------------+-----+
|manufacturer_name|count|
+-----------------+-----+
|       Volkswagen| 4243|
|             Opel| 2759|
|              BMW| 2610|
|             Ford| 2566|
|          Renault| 2493|
|             Audi| 2468|
|    Mercedes-Benz| 2237|
|          Peugeot| 1909|
|          Citroen| 1562|
|           Nissan| 1361|
|            Mazda| 1328|
|           Toyota| 1246|
|          Hyundai| 1116|
|            Skoda| 1089|
|              Kia|  912|
|       Mitsubishi|  887|
|             Fiat|  824|
|            Honda|  797|
|            Volvo|  721|
|              ВАЗ|  481|
+-----------------+-----+
only showing top 20 rows



In [None]:
df.withColumnRenamed("manufacturer_name", "manufacturer").select("manufacturer").show(5)

+------------+
|manufacturer|
+------------+
|      Subaru|
|      Subaru|
|      Subaru|
|      Subaru|
|      Subaru|
+------------+
only showing top 5 rows



In [None]:
df.withColumn("next_year", F.col("year_produced") + 1).select("year_produced", "next_year").show(5)

+-------------+---------+
|year_produced|next_year|
+-------------+---------+
|         2010|   2011.0|
|         2002|   2003.0|
|         2001|   2002.0|
|         1999|   2000.0|
|         2001|   2002.0|
+-------------+---------+
only showing top 5 rows



In [None]:
#Выведем схему датафрейма (типы колонок)
df.printSchema()

root
 |-- manufacturer_name: string (nullable = true)
 |-- model_name: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- color: string (nullable = true)
 |-- odometer_value: string (nullable = true)
 |-- year_produced: string (nullable = true)
 |-- engine_fuel: string (nullable = true)
 |-- engine_has_gas: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- engine_capacity: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- has_warranty: string (nullable = true)
 |-- state: string (nullable = true)
 |-- drivetrain: string (nullable = true)
 |-- price_usd: string (nullable = true)
 |-- is_exchangeable: string (nullable = true)
 |-- location_region: string (nullable = true)
 |-- number_of_photos: string (nullable = true)
 |-- up_counter: string (nullable = true)
 |-- feature_0: string (nullable = true)
 |-- feature_1: string (nullable = true)
 |-- feature_2: string (nullable = true)
 |-- feature_3: string (nullable = true)


In [44]:
# Вывести сводную статистику по датафрейму:

df.select("manufacturer_name", "model_name", "year_produced", "price_usd").describe().show()

+-------+-----------------+------------------+------------------+-----------------+
|summary|manufacturer_name|        model_name|     year_produced|        price_usd|
+-------+-----------------+------------------+------------------+-----------------+
|  count|            38531|             38531|             38531|            38531|
|   mean|             null|1168.2918056562726|2002.9437336170874|6639.971021255604|
| stddev|             null| 9820.119520829561| 8.065730511309887|6428.152018202918|
|    min|            Acura|               100|              1942|              1.0|
|    max|              УАЗ|            Таврия|              2019|           9999.0|
+-------+-----------------+------------------+------------------+-----------------+



Задача. Сделать пайплайн обработки файла cars.csv.

Необходимо посчитать по каждому производителю (поле manufacturer_name):

кол-во объявлений
средний год выпуска автомобилей
минимальную цену
максимальную цену
Выгрузить результат в output.csv.

In [None]:
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as t


def extract_data(spark: SparkSession) -> DataFrame:
    path = "/content/cars.csv"
    return spark.read.option("header", "true").csv(path)


def transform_data(df: DataFrame) -> DataFrame:
    output = (
        df
        .groupBy("manufacturer_name")
        .agg(
            F.count("manufacturer_name").alias("count_ads"),
            F.round(F.avg("year_produced")).cast(t.IntegerType()).alias("avg_year_produced"),
            F.min("price_usd").alias("min_price"),
            F.max("price_usd").alias("max_price"),
        )
        .orderBy(F.col("count_ads").desc())
    )
    return output


def save_data(df: DataFrame) -> None:
    df.coalesce(4).write.mode("overwrite").format("json").save("/content/output.json")


def main():
    spark = SparkSession.builder.appName("Practice").getOrCreate()
    df = extract_data(spark)
    output = transform_data(df)
    save_data(output)
    #spark.stop()

main()

In [None]:
spark.stop()
