Spark takes the columns you pass to `repartition()`, hashes their values and takes the result modulo the number of partitions. Spark SQL's hash partitioning uses Murmur3, which produces a 32-bit integer. This makes row placement deterministic: for a given number of partitions, rows with the same key always land in the same partition. Spark works this way because both sides of a join must have the same number of partitions and use the same hash function. That way, matching keys end up in partitions with the same index.
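A minimal pure-Python sketch of hash partitioning and why it enables co-partitioned joins. It assumes CRC32 (`zlib.crc32`) as a stand-in hash. Spark uses Murmur3, so the partition numbers here will not match Spark's. The helpers `partition_id`, `hash_partition` and `copartitioned_join` and the `regions` rows are hypothetical. Because both sides are hashed with the same function into the same number of partitions, partition *i* on the left only needs to be compared with partition *i* on the right.

```python
import zlib

def partition_id(key: str, num_partitions: int) -> int:
    # Stand-in hash (CRC32, not Spark's Murmur3). It is stable across runs,
    # unlike Python's built-in hash() for strings, which is salted per process.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def hash_partition(rows, key, num_partitions):
    # One list per partition; each row goes to partition hash(key) mod num_partitions.
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[partition_id(row[key], num_partitions)].append(row)
    return parts

def copartitioned_join(left, right, key, num_partitions):
    # Both sides use the same hash and the same partition count, so equal keys
    # share a partition index and only partitions with the same index are compared.
    joined = []
    left_parts = hash_partition(left, key, num_partitions)
    right_parts = hash_partition(right, key, num_partitions)
    for left_part, right_part in zip(left_parts, right_parts):
        for l_row in left_part:
            for r_row in right_part:
                if l_row[key] == r_row[key]:
                    joined.append({**l_row, **r_row})
    return joined

# Hypothetical sample data.
bikes = [
    {"city": "Delhi", "price": 119900.0},
    {"city": "Bangalore", "price": 65000.0},
    {"city": "Pune", "price": 11100.0},
]
regions = [
    {"city": "Delhi", "region": "North"},
    {"city": "Bangalore", "region": "South"},
    {"city": "Pune", "region": "West"},
]

print(partition_id("Delhi", 4) == partition_id("Delhi", 4))  # True: same key, same partition
print(copartitioned_join(bikes, regions, "city", 4))
```

If the two sides used different partition counts, the same key could get different indices on each side. Matching partitions by index would then silently miss rows. Spark avoids this by requiring the same count and hash on both sides.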

`repartition()` partitions data in memory: it redistributes the rows of a DataFrame across partitions through a shuffle. `partitionBy()` (`DataFrameWriter.partitionBy`) partitions data on disk, writing one subdirectory per distinct value of the partition column(s). Both can partition data "based on a DataFrame column", and they are often used together. A `repartition()` on the same column before the write typically leaves each output directory with fewer, larger files.
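To make the on-disk side concrete, here is a pure-Python sketch (not Spark code) of the Hive-style layout that `DataFrameWriter.partitionBy` produces. There is one `column=value` subdirectory per distinct value, and the partition column is stored in the path rather than in the files. The function `write_partitioned`, the file name `part-00000.csv` and the CSV format are assumptions for illustration. Spark's real file names differ, and Spark also escapes some special characters in directory names. The three sample rows come from the notebook output below.

```python
import csv
import os
import tempfile
from collections import defaultdict

def write_partitioned(rows, partition_col, base_dir):
    """Write rows under base_dir/<partition_col>=<value>/ and return the subdirectory names."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_col]].append(row)
    for value, group in groups.items():
        part_dir = os.path.join(base_dir, f"{partition_col}={value}")
        os.makedirs(part_dir, exist_ok=True)
        # The partition value lives in the directory name, so it is left out of the file.
        fields = [k for k in group[0] if k != partition_col]
        with open(os.path.join(part_dir, "part-00000.csv"), "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
            writer.writeheader()
            writer.writerows(group)
    return sorted(os.listdir(base_dir))

rows = [
    {"bike_name": "Triumph Daytona 675R", "owner": "First Owner", "price": 600000.0},
    {"bike_name": "Bajaj Pulsar 150cc", "owner": "Third Owner", "price": 11100.0},
    {"bike_name": "KTM RC 200cc", "owner": "Third Owner", "price": 113000.0},
]

with tempfile.TemporaryDirectory() as tmp:
    print(write_partitioned(rows, "owner", tmp))  # ['owner=First Owner', 'owner=Third Owner']
```

In PySpark the two are commonly chained as `df.repartition("owner").write.partitionBy("owner").parquet(path)`, where `path` is an output location of your choosing.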

Partition Tuning: https://luminousmen.com/post/spark-tips-partition-tuning
https://devblogs.microsoft.com/azure-sql/partitioning-on-spark-fast-loading-clustered-columnstore-index/
https://towardsdatascience.com/apache-spark-performance-boosting-e072a3ec1179

In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.types import StringType

In [14]:
from pyspark.sql import functions as F

In [3]:
spark = SparkSession.builder.master("local[*]").appName("Particao 2").getOrCreate()

In [5]:
url = "/Users/macca/Documents/spark/pyspark_com_grimaldo/arquivo_de_dados/Bikes.csv"

In [8]:
df = (spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")  # the CSV option is "delimiter" (alias "sep"); "delimiters" is ignored
      .load(url))


In [9]:
df.show(10)

+--------------------+--------+---------+----------+------------+---+-----+-------------+
|           bike_name|   price|     city|kms_driven|       owner|age|power|        brand|
+--------------------+--------+---------+----------+------------+---+-----+-------------+
|TVS Star City Plu...| 35000.0|Ahmedabad|   17654.0| First Owner|3.0|110.0|          TVS|
|Royal Enfield Cla...|119900.0|    Delhi|   11000.0| First Owner|4.0|350.0|Royal Enfield|
|Triumph Daytona 675R|600000.0|    Delhi|     110.0| First Owner|8.0|675.0|      Triumph|
|TVS Apache RTR 180cc| 65000.0|Bangalore|   16329.0| First Owner|4.0|180.0|          TVS|
|Yamaha FZ S V 2.0...| 80000.0|Bangalore|   10000.0| First Owner|3.0|150.0|       Yamaha|
|    Yamaha FZs 150cc| 53499.0|    Delhi|   25000.0| First Owner|6.0|150.0|       Yamaha|
|Honda CB Hornet 1...| 85000.0|    Delhi|    8200.0| First Owner|3.0|160.0|        Honda|
|Hero Splendor Plu...| 45000.0|    Delhi|   12645.0| First Owner|3.0|100.0|         Hero|
|Royal Enf

In [52]:
# Keep the DataFrame in the variable; .show() only prints and returns None
contagem = df.groupBy("city").count()
contagem.show()

+------------+-----+
|        city|count|
+------------+-----+
|   Bangalore| 2723|
|    Zirakpur|    1|
|      Kathua|    1|
|      Sanand|    1|
|     Udaipur|   12|
|     Khandwa|    2|
|       Kochi|    2|
|   Faridabad|  609|
|  Aurangabad|   18|
|     Chenani|    3|
|Vizianagaram|    1|
|       Hosur|    5|
|Swaimadhopur|    1|
|     Sangrur|    1|
|    Sholapur|    3|
| Bhubaneswar|    1|
|      Mysore|    8|
|    Uluberia|    1|
|    Rudrapur|    3|
|   Ferozepur|    2|
+------------+-----+
only showing top 20 rows



In [65]:
# Keep the DataFrame in the variable; .show() only prints and returns None
total_Bangalore = df.filter(df.city == 'Bangalore').groupBy("owner").count()
total_Bangalore.show()

+--------------------+-----+
|               owner|count|
+--------------------+-----+
|         Third Owner|   17|
|        Second Owner|   78|
|         First Owner| 2625|
|Fourth Owner Or More|    3|
+--------------------+-----+



In [66]:
df.rdd.getNumPartitions()

1

In [27]:
df.select("owner").describe().show()

+-------+-----------+
|summary|      owner|
+-------+-----------+
|  count|      32648|
|   mean|       null|
| stddev|       null|
|    min|First Owner|
|    max|Third Owner|
+-------+-----------+



In [31]:
df.createOrReplaceTempView("tab_bikes")

In [38]:
delhi = spark.sql("select * from tab_bikes where city='Delhi'")

In [39]:
delhi.show(10)

+--------------------+--------+-----+----------+------------+---+-----+-------------+
|           bike_name|   price| city|kms_driven|       owner|age|power|        brand|
+--------------------+--------+-----+----------+------------+---+-----+-------------+
|Royal Enfield Cla...|119900.0|Delhi|   11000.0| First Owner|4.0|350.0|Royal Enfield|
|Triumph Daytona 675R|600000.0|Delhi|     110.0| First Owner|8.0|675.0|      Triumph|
|    Yamaha FZs 150cc| 53499.0|Delhi|   25000.0| First Owner|6.0|150.0|       Yamaha|
|Honda CB Hornet 1...| 85000.0|Delhi|    8200.0| First Owner|3.0|160.0|        Honda|
|Hero Splendor Plu...| 45000.0|Delhi|   12645.0| First Owner|3.0|100.0|         Hero|
|Royal Enfield Cla...| 88000.0|Delhi|   19000.0|Second Owner|7.0|500.0|Royal Enfield|
| Bajaj Discover 100M| 29499.0|Delhi|   20000.0| First Owner|8.0|100.0|        Bajaj|
| Bajaj Discover 125M| 29900.0|Delhi|   20000.0| First Owner|7.0|125.0|        Bajaj|
| Bajaj Discover 125M| 29900.0|Delhi|   20000.0| First

<b> Repartition by column </b>

In [91]:
df_repartition = df.repartition("owner")

In [92]:
df_repartition.rdd.glom().collect()

[[Row(bike_name='Bajaj Pulsar 150cc', price=11100.0, city='Pune', kms_driven=12000.0, owner='Third Owner', age=12.0, power=150.0, brand='Bajaj'),
  Row(bike_name='KTM RC 200cc', price=113000.0, city='Kottayam', kms_driven=28000.0, owner='Third Owner', age=6.0, power=200.0, brand='KTM'),
  Row(bike_name='Triumph Street Triple ABS 675cc', price=599999.0, city='Mumbai', kms_driven=7800.0, owner='Third Owner', age=5.0, power=675.0, brand='Triumph'),
  Row(bike_name='Kawasaki Ninja 250cc', price=125000.0, city='Coimbatore', kms_driven=21000.0, owner='Third Owner', age=11.0, power=250.0, brand='Kawasaki'),
  Row(bike_name='Mahindra Centuro Rockstar 110cc', price=18999.0, city='Pune', kms_driven=38669.0, owner='Third Owner', age=6.0, power=110.0, brand='Mahindra'),
  Row(bike_name='Royal Enfield Bullet Electra Twinspark 350cc', price=85000.0, city='Palwal', kms_driven=23399.0, owner='Third Owner', age=6.0, power=350.0, brand='Royal Enfield'),
  Row(bike_name='Royal Enfield Bullet Electra Twin

In [93]:
df_repartition.rdd.getNumPartitions()

1

In [95]:
spark.conf.get("spark.sql.shuffle.partitions")

'200'
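There is a likely explanation for seeing 1 partition instead of the 200 set by `spark.sql.shuffle.partitions`. `owner` has only 4 distinct values, so hash partitioning can fill at most 4 of the 200 shuffle partitions. In addition, Adaptive Query Execution (enabled by default since Spark 3.2) coalesces small shuffle partitions after the shuffle. For a dataset this small, that can leave a single partition. The pure-Python sketch below assumes CRC32 as a stand-in for Spark's hash and a simplified greedy coalescing rule: contiguous partitions are packed up to a target size, counted here in rows rather than bytes. The helper names and the 10,000-row target are hypothetical. The per-owner counts are the Bangalore counts shown earlier.

```python
import zlib
from collections import Counter

def shuffle_sizes(keys, num_partitions):
    # Rows per shuffle partition when each key goes to hash(key) mod num_partitions.
    counts = Counter(zlib.crc32(k.encode("utf-8")) % num_partitions for k in keys)
    return [counts.get(i, 0) for i in range(num_partitions)]

def coalesce_contiguous(sizes, target):
    # Simplified AQE-style coalescing: merge neighbouring partitions while the
    # running total stays within `target`.
    groups, current, current_size = [], [], 0
    for i, size in enumerate(sizes):
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(i)
        current_size += size
    if current:
        groups.append(current)
    return groups

owner_counts = {"Third Owner": 17, "Second Owner": 78, "First Owner": 2625, "Fourth Owner Or More": 3}
keys = [owner for owner, n in owner_counts.items() for _ in range(n)]

sizes = shuffle_sizes(keys, 200)
print(sum(1 for s in sizes if s > 0))                  # at most 4 non-empty partitions
print(len(coalesce_contiguous(sizes, target=10_000)))  # 1
```

To get a fixed partition count in Spark, pass it explicitly, for example `df.repartition(4, "owner")`. A repartition with an explicit number of partitions should not be coalesced by AQE. Alternatively, check `spark.conf.get("spark.sql.adaptive.enabled")` and set it to `"false"` while experimenting.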

In [98]:
print(df_repartition.rdd.partitioner)

None


In [104]:
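# RDD.partitionBy works on (key, value) pair RDDs and takes a number of partitions,
# not a column name. `records` was never defined, so the pairs are built from `df`.
# The partition count 4 is an arbitrary choice for illustration.
pair_rdd = df.rdd.map(lambda row: (row['owner'], row)).partitionBy(4)
print(pair_rdd.getNumPartitions(), pair_rdd.partitioner is not None)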