In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
# Create the Spark session (or reuse the one already running in this kernel)
# and keep a handle on its SparkContext, which the RDD API below needs.
ss = (
    SparkSession.builder
    .appName('mysession')
    .getOrCreate()
)
sc = ss.sparkContext

In [2]:
# Sample data: fifteen integers, some below and some at/above the threshold of 50
# used by the filters and groupings that follow.
rdd = sc.parallelize(
    [12, 23, 34, 45, 56, 67, 78, 89,
     32, 43, 54, 65, 76, 87, 98]
)

In [3]:
# Keep only the values strictly below 50 (a lazy transformation);
# collect() then brings the surviving elements back to the driver.
rdd1 = rdd.filter(lambda value: value < 50)
rdd1.collect()

[12, 23, 34, 45, 32, 43]

In [4]:
# Keep only the elements equal to 34.
rdd2 = rdd.filter(lambda value: value == 34)
rdd2.collect()

[34]

In [7]:
# Bucket every value as 'HI' (>= 50) or 'LO' (< 50). groupBy yields an
# iterable per key, so mapValues(list) materialises it for display.
rdd3 = (
    rdd.groupBy(lambda n: 'HI' if n >= 50 else 'LO')
       .mapValues(list)
)
rdd3.collect()

[('LO', [12, 23, 34, 45, 32, 43]),
 ('HI', [56, 67, 78, 89, 54, 65, 76, 87, 98])]

In [8]:
# Largest value per 'LO' (< 50) / 'HI' (>= 50) bucket.
# reduceByKey folds the values pairwise with max(a, b), merging within each
# partition before the shuffle. groupBy + mapValues(max) instead gathers
# every member of a group into one iterable before reducing it. Spark's
# programming guide recommends reduceByKey for per-key aggregations like this.
rdd3 = rdd.keyBy(lambda n: 'LO' if n < 50 else 'HI').reduceByKey(max)
rdd3.collect()

[('LO', 45), ('HI', 98)]

In [10]:
# For each 'HI' (>= 50) / 'LO' (< 50) bucket, report its
# (smallest, largest) member as a tuple.
rdd3 = (
    rdd.groupBy(lambda n: 'HI' if n >= 50 else 'LO')
       .mapValues(lambda members: (min(members), max(members)))
)
rdd3.collect()

[('LO', (12, 45)), ('HI', (54, 98))]

## partizioni

In [11]:
# The integers 1..49 (range's upper bound is exclusive). numSlices=None means
# "use the context's default partition count", which the cell then reports.
rdd4 = sc.parallelize(range(1, 50), numSlices=None)
rdd4.getNumPartitions()

1

In [12]:
# Same integers 1..49, but explicitly split into 4 partitions.
rdd5 = sc.parallelize(range(1, 50), numSlices=4)
rdd5.getNumPartitions()

4

In [14]:
# glom() turns each partition into a single list, so collecting it shows how
# the values of rdd5 were split across its 4 partitions.
rdd5.glom().collect()

[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
 [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24],
 [25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36],
 [37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]]

In [17]:
# Key each number by its remainder mod 7, then hash-partition into 7 partitions.
# As the output shows, every remainder ends up in its own partition.
rdd6 = rdd4.keyBy(lambda x: x % 7).partitionBy(7)
rdd6.glom().collect()

[[(0, 7), (0, 14), (0, 21), (0, 28), (0, 35), (0, 42), (0, 49)],
 [(1, 1), (1, 8), (1, 15), (1, 22), (1, 29), (1, 36), (1, 43)],
 [(2, 2), (2, 9), (2, 16), (2, 23), (2, 30), (2, 37), (2, 44)],
 [(3, 3), (3, 10), (3, 17), (3, 24), (3, 31), (3, 38), (3, 45)],
 [(4, 4), (4, 11), (4, 18), (4, 25), (4, 32), (4, 39), (4, 46)],
 [(5, 5), (5, 12), (5, 19), (5, 26), (5, 33), (5, 40), (5, 47)],
 [(6, 6), (6, 13), (6, 20), (6, 27), (6, 34), (6, 41), (6, 48)]]

In [18]:
# Reduce the 7 partitions to 3 without a shuffle (shuffle=False is the
# default, spelled out here). Existing partitions are merged whole rather
# than redistributed element by element.
rdd7 = rdd6.coalesce(3, shuffle=False)
rdd7.glom().collect()

[[(0, 7),
  (0, 14),
  (0, 21),
  (0, 28),
  (0, 35),
  (0, 42),
  (0, 49),
  (3, 3),
  (3, 10),
  (3, 17),
  (3, 24),
  (3, 31),
  (3, 38),
  (3, 45)],
 [(1, 1),
  (1, 8),
  (1, 15),
  (1, 22),
  (1, 29),
  (1, 36),
  (1, 43),
  (4, 4),
  (4, 11),
  (4, 18),
  (4, 25),
  (4, 32),
  (4, 39),
  (4, 46),
  (6, 6),
  (6, 13),
  (6, 20),
  (6, 27),
  (6, 34),
  (6, 41),
  (6, 48)],
 [(2, 2),
  (2, 9),
  (2, 16),
  (2, 23),
  (2, 30),
  (2, 37),
  (2, 44),
  (5, 5),
  (5, 12),
  (5, 19),
  (5, 26),
  (5, 33),
  (5, 40),
  (5, 47)]]

In [21]:
# Write rdd7 as text files, one part-file per partition, under OUTPUT_DIR.
# saveAsTextFile refuses to write into a directory that already exists, so a
# second Restart & Run All would fail here. Remove any previous output first.
# NOTE(review): shutil only touches the driver's local filesystem. This
# assumes Spark's default filesystem is local and that the relative path
# resolves the same way for both; confirm before pointing this at HDFS/S3.
import shutil

OUTPUT_DIR = 'mydata'
shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
rdd7.saveAsTextFile(OUTPUT_DIR)