In [5]:
# cоздаём SparkSession

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AppName").getOrCreate()

test

In [6]:
spark

In [7]:
# Вероятно для кластеров...

# conf = SparkConf().setAppName(appName).setMaster(master)
# sc = SparkContext(conf=conf)
# RDD
# rdd = sc.textFile('python/test_support/sql/ages.csv')
# df2 = spark.read.csv(rdd)

In [12]:
# Чтение датасета 

df = spark.read\
    .format("csv")\
    .option("header","true")\
    .load("data/cars.csv")
    # Можно еще явно передать схему

In [13]:
df.show(1, vertical=True)

-RECORD 0-------------------------
 manufacturer_name | Subaru       
 model_name        | Outback      
 transmission      | automatic    
 color             | silver       
 odometer_value    | 190000       
 year_produced     | 2010         
 engine_fuel       | gasoline     
 engine_has_gas    | False        
 engine_type       | gasoline     
 engine_capacity   | 2.5          
 body_type         | universal    
 has_warranty      | False        
 state             | owned        
 drivetrain        | all          
 price_usd         | 10900.0      
 is_exchangeable   | False        
 location_region   | Минская обл. 
 number_of_photos  | 9            
 up_counter        | 13           
 feature_0         | False        
 feature_1         | True         
 feature_2         | True         
 feature_3         | True         
 feature_4         | False        
 feature_5         | True         
 feature_6         | False        
 feature_7         | True         
 feature_8         |

In [11]:
# Итак у нас есть DataFrame, 
# теперь начнем над ним издеваться 

In [41]:
df.select('manufacturer_name', 'model_name', 'transmission')\
    .limit(2)\
    .show()

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|           Subaru|   Outback|   automatic|
|           Subaru|   Outback|   automatic|
+-----------------+----------+------------+



In [40]:
import pyspark.sql.functions as F

df.select(F.col("manufacturer_name"), F.col("model_name"), F.col("transmission"))\
    .show(2)

+-----------------+----------+------------+
|manufacturer_name|model_name|transmission|
+-----------------+----------+------------+
|           Subaru|   Outback|   automatic|
|           Subaru|   Outback|   automatic|
+-----------------+----------+------------+
only showing top 2 rows



In [44]:
# Выбор колонки по регулярному выражению
df.select(df.colRegex("`(^.*name*)`"))\
    .show(2)

+-----------------+----------+
|manufacturer_name|model_name|
+-----------------+----------+
|           Subaru|   Outback|
|           Subaru|   Outback|
+-----------------+----------+
only showing top 2 rows



In [53]:
df.select('manufacturer_name','price_usd')\
    .filter(F.col("manufacturer_name") == 'Audi')\
    .describe()\
    .select('summary', 'price_usd')\
    .show()

+-------+-----------------+
|summary|        price_usd|
+-------+-----------------+
|  count|             2468|
|   mean|7154.944923014588|
| stddev|6428.924114478054|
|    min|           1000.0|
|    max|           9999.0|
+-------+-----------------+



In [48]:
df.select("manufacturer_name").distinct().count()

55

In [71]:
# сгрупировать по manufacturer_name и посчитать кол-во каждого

from ast import alias
import pyspark.sql.types as t
(
df.groupBy("manufacturer_name")
    .agg(
        F.round(F.avg("price_usd")).cast(t.IntegerType()).alias("average_price"),
        F.count("manufacturer_name").alias("count")  
    )
).show(2)

+-----------------+-------------+-----+
|manufacturer_name|average_price|count|
+-----------------+-------------+-----+
|       Volkswagen|         6429| 4243|
|            Lexus|        17131|  213|
+-----------------+-------------+-----+
only showing top 2 rows



In [None]:
df.groupBy("manufacturer_name")\
    .count()\
    .orderBy(F.col("count").desc())\
    .show(5)

In [72]:
df.printSchema()

root
 |-- manufacturer_name: string (nullable = true)
 |-- model_name: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- color: string (nullable = true)
 |-- odometer_value: string (nullable = true)
 |-- year_produced: string (nullable = true)
 |-- engine_fuel: string (nullable = true)
 |-- engine_has_gas: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- engine_capacity: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- has_warranty: string (nullable = true)
 |-- state: string (nullable = true)
 |-- drivetrain: string (nullable = true)
 |-- price_usd: string (nullable = true)
 |-- is_exchangeable: string (nullable = true)
 |-- location_region: string (nullable = true)
 |-- number_of_photos: string (nullable = true)
 |-- up_counter: string (nullable = true)
 |-- feature_0: string (nullable = true)
 |-- feature_1: string (nullable = true)
 |-- feature_2: string (nullable = true)
 |-- feature_3: string (nullable = true)


In [73]:
df.withColumnRenamed("manufacturer_name", "manufacturer").select("manufacturer").show(5)

+------------+
|manufacturer|
+------------+
|      Subaru|
|      Subaru|
|      Subaru|
|      Subaru|
|      Subaru|
+------------+
only showing top 5 rows



In [74]:
df.withColumn("next_year", F.col("year_produced") + 1).select("year_produced", "next_year").show(5)

+-------------+---------+
|year_produced|next_year|
+-------------+---------+
|         2010|   2011.0|
|         2002|   2003.0|
|         2001|   2002.0|
|         1999|   2000.0|
|         2001|   2002.0|
+-------------+---------+
only showing top 5 rows



# Теперь реально SQL


In [75]:
df.createOrReplaceTempView("cars")

In [96]:
spark.sql("""
        SELECT manufacturer_name , Round(AVG(price_usd), 1) as `average_price`
        FROM cars
        GROUP BY manufacturer_name
            """).show()

+-----------------+-------------+
|manufacturer_name|average_price|
+-----------------+-------------+
|       Volkswagen|       6428.9|
|            Lexus|      17130.6|
|           Jaguar|      17813.0|
|            Rover|       1734.2|
|           Lancia|       2901.5|
|             Jeep|      10912.5|
|       Mitsubishi|       5416.2|
|              Kia|       8156.1|
|             Mini|      13133.7|
|            Lifan|       8280.5|
|             LADA|       7598.7|
|        SsangYong|       7719.3|
|             Audi|       7154.9|
|             Seat|       3868.0|
|         Cadillac|      11093.1|
|          Москвич|        978.9|
|       Alfa Romeo|       2688.6|
|            Geely|       7769.2|
|          Renault|       4557.4|
|           Daewoo|       1576.6|
+-----------------+-------------+
only showing top 20 rows



In [89]:
spark.sql("""
        SELECT DISTINCT model_name
        FROM cars
        WHERE manufacturer_name = "Subaru"
        """).show()

+----------+
|model_name|
+----------+
|   Outback|
|  Forester|
|   Tribeca|
|   Impreza|
|     Justy|
|    Libero|
|    Legacy|
|        XV|
|     Leone|
|       WRX|
+----------+

