### Repartitions in Spark

In [115]:
from pyspark.sql import SparkSession
import datetime
from pyspark.sql.functions import to_date, col, spark_partition_id, monotonically_increasing_id, rand
import random

In [5]:
# Obtain the session used by every cell below: reuse the active one if it
# exists, otherwise start a new application named "Repartition".
spark = (
    SparkSession.builder
    .appName("Repartition")
    .getOrCreate()
)

In [45]:
# Seed the generator so the sample data -- and with it every partition size
# and file count shown further down -- is the same on each Restart & Run All.
random.seed(42)

# 1000 rows with a single column "foo"; randint is inclusive on both ends,
# so foo takes one of the 11 values 0..10.
t = spark.createDataFrame([{"foo": random.randint(0, 10)} for _ in range(1000)])
t.count()

1000

### Default partitioning 

In [39]:
# Peek at three rows of the freshly created DataFrame.
t.limit(3).show()

+---+
|foo|
+---+
|  4|
| 10|
|  9|
+---+



In [20]:
# Number of partitions Spark chose on its own, before any repartitioning.
t.rdd.getNumPartitions()

8

In [47]:
# glom() turns every partition into one list of rows; taking len of each list
# yields the row count per partition.
t.rdd.glom().map(len).collect()

[125, 125, 125, 125, 125, 125, 125, 125]

In [51]:
# Write with the default partitioning. mode("overwrite") keeps the cell
# re-runnable: without it the writer refuses to write to a path that already
# exists, so a second run of the notebook would stop here.
t.write.format('parquet').mode('overwrite').save("./partition_data/default_partition")

In [56]:
# Count the directory entries whose name contains "part", i.e. the data files
# written above.
!ls ./partition_data/default_partition | grep part | wc -l 

       8


### Repartition with a column name

In [21]:
# Repartition by the value of foo so that equal values end up in the same
# partition. No partition count is given, so Spark falls back to its
# configured shuffle partition count (200 in the run recorded here).
t_rep = t.repartition("foo")
t_rep.limit(3).show()

+---+
|foo|
+---+
|  0|
|  0|
|  0|
+---+



In [32]:
# Partition count after repartitioning by column only.
t_rep.rdd.getNumPartitions()

200

In [38]:
# Row counts of the first ten partitions. Most are empty because foo has far
# fewer distinct values than there are partitions.
t_rep.rdd.glom().map(len).collect()[0:10]

[0, 0, 0, 0, 0, 90, 0, 0, 0, 0]

In [57]:
# Persist the column-repartitioned data. mode('overwrite') makes the cell
# re-runnable; with the default save mode it fails once the directory exists.
t_rep.write.format('parquet').mode('overwrite').save('./partition_data/with_col_name')

In [58]:
# Count the data files written for the column-repartitioned DataFrame.
!ls ./partition_data/with_col_name | grep part | wc -l 

      12


### Repartition with number

In [90]:
# Draw a fresh 1000-row sample (foo in 0..10 inclusive) for the sections
# that follow; note that this rebinds t.
t = spark.createDataFrame(
    [{"foo": random.randint(0, 10)} for _ in range(1000)]
)

In [91]:
# Repartition by count only: rows are redistributed into two partitions
# without looking at any column value.
t_num = t.repartition(2)

In [102]:
# Peek at three rows of the two-partition DataFrame.
t_num.limit(3).show()

+---+
|foo|
+---+
|  3|
|  3|
|  3|
+---+



In [95]:
# Confirm the partition count matches the number passed to repartition().
t_num.rdd.getNumPartitions()

2

In [96]:
# Rows per partition; with a count-only repartition the split is close to even.
t_num.rdd.glom().map(len).collect()

[504, 496]

In [65]:
# Persist the two-partition data. mode('overwrite') makes the cell
# re-runnable; with the default save mode it fails once the directory exists.
t_num.write.format('parquet').mode('overwrite').save('./partition_data/with_size')

In [66]:
# Count the data files written for the count-only repartition.
!ls ./partition_data/with_size | grep part | wc -l 

       2


### Repartition with number and column name

In [23]:
# Repartition by foo into exactly two partitions. Rows with the same foo stay
# together, so the two partitions are not guaranteed to be the same size.
t_rep_size = t.repartition(2, "foo")
t_rep_size.limit(3).show()

+---+
|foo|
+---+
|  4|
|  2|
|  4|
+---+



In [24]:
# Partition count when both a number and a column are given.
t_rep_size.rdd.getNumPartitions()

2

In [30]:
# Rows per partition; the split is skewed because whole foo values are
# assigned to one partition or the other.
t_rep_size.rdd.glom().map(len).collect()

[341, 659]

In [59]:
# Persist the (2, "foo") repartitioned data. mode('overwrite') makes the cell
# re-runnable; with the default save mode it fails once the directory exists.
t_rep_size.write.format('parquet').mode('overwrite').save('./partition_data/with_col_name_and_size')

In [60]:
# Count the data files written for the (2, "foo") repartition.
!ls ./partition_data/with_col_name_and_size | grep part | wc -l 

       2


### Repartition with number, column name and random

In [117]:
# Adding rand() as a second partitioning expression means rows with the same
# foo no longer have to share a partition, which evens out the partition
# sizes compared with repartition(2, "foo").
t_rep_rand = t.repartition(2, "foo", rand())
t_rep_rand.limit(3).show()

+---+
|foo|
+---+
|  8|
| 10|
|  4|
+---+



In [118]:
# Rows per partition after mixing rand() into the partitioning expressions.
t_rep_rand.rdd.glom().map(len).collect()

[480, 520]

In [121]:
# Persist the (2, "foo", rand()) repartitioned data. mode('overwrite') makes
# the cell re-runnable; with the default save mode it fails once the
# directory exists.
t_rep_rand.write.format('parquet').mode('overwrite').save('./partition_data/with_col_name_and_size_and_rand')

In [122]:
!ls -R ./partition_data/with_col_name_and_size_and_rand | grep part | wc -l

       2


### partitionBy on the DataFrameWriter

In [73]:
# Let the writer lay the data out on disk as one foo=<value> directory per
# distinct value. An id column is added alongside foo -- presumably so the
# data files still hold a column once foo has moved into the directory names.
# mode("overwrite") keeps the cell re-runnable; with the default save mode
# the write fails when the target directory already exists.
(
    t.select("foo", monotonically_increasing_id().alias('id'))
    .write.format("parquet")
    .mode("overwrite")
    .partitionBy("foo")
    .save('./partition_data/with_writer')
)

In [75]:
# List the output directory: one foo=<value> sub-directory per distinct value
# plus the _SUCCESS marker file.
!ls ./partition_data/with_writer

_SUCCESS [1m[36mfoo=1[m[m    [1m[36mfoo=2[m[m    [1m[36mfoo=4[m[m    [1m[36mfoo=6[m[m    [1m[36mfoo=8[m[m
[1m[36mfoo=0[m[m    [1m[36mfoo=10[m[m   [1m[36mfoo=3[m[m    [1m[36mfoo=5[m[m    [1m[36mfoo=7[m[m    [1m[36mfoo=9[m[m


In [76]:
!ls -R ./partition_data/with_writer/ | grep part | wc -l

      99


### Loading data

In [81]:
# Read back the dataset that was written with partitionBy("foo").
r = spark.read.parquet('./partition_data/with_writer')

In [108]:
# foo is a directory-level partition column in this dataset, so the predicate
# is expected to show up under PartitionFilters in the physical plan, with
# only the matching partition being scanned.
r.filter("foo = 1").explain(extended=True)

== Parsed Logical Plan ==
'Filter ('foo = 1)
+- Relation[id#132L,foo#133] parquet

== Analyzed Logical Plan ==
id: bigint, foo: int
Filter (foo#133 = 1)
+- Relation[id#132L,foo#133] parquet

== Optimized Logical Plan ==
Filter (isnotnull(foo#133) && (foo#133 = 1))
+- Relation[id#132L,foo#133] parquet

== Physical Plan ==
*(1) FileScan parquet [id#132L,foo#133] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/mdatberg/Codez/Springer-Media/media_impact_bucket_testing/jupyter/p..., PartitionCount: 1, PartitionFilters: [isnotnull(foo#133), (foo#133 = 1)], PushedFilters: [], ReadSchema: struct<id:bigint>


In [85]:
# Read back the dataset that was only repartitioned in memory by foo
# (written without partitionBy).
r2 = spark.read.parquet('./partition_data/with_col_name')

In [114]:
# Here foo is an ordinary column stored inside the files, so the predicate is
# expected under PushedFilters (handed to the parquet reader) rather than
# PartitionFilters -- no directory pruning takes place.
r2.filter("foo = 1").explain(extended=True)

== Parsed Logical Plan ==
'Filter ('foo = 1)
+- Relation[foo#136L] parquet

== Analyzed Logical Plan ==
foo: bigint
Filter (foo#136L = cast(1 as bigint))
+- Relation[foo#136L] parquet

== Optimized Logical Plan ==
Filter (isnotnull(foo#136L) && (foo#136L = 1))
+- Relation[foo#136L] parquet

== Physical Plan ==
*(1) Project [foo#136L]
+- *(1) Filter (isnotnull(foo#136L) && (foo#136L = 1))
   +- *(1) FileScan parquet [foo#136L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/mdatberg/Codez/Springer-Media/media_impact_bucket_testing/jupyter/p..., PartitionFilters: [], PushedFilters: [IsNotNull(foo), EqualTo(foo,1)], ReadSchema: struct<foo:bigint>
