In [1]:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import *

In [2]:
sc = SparkContext('local', 'pyspark')
sql = SQLContext(sc)

In [3]:
##############################     Inspect Pyspark       ###########################################

In [4]:
sc.version

'2.3.1'

In [5]:
sc.pythonVer

'3.6'

In [6]:
sc.master  

'local'

In [7]:
str(sc.sparkHome)

'None'

In [8]:
str(sc.sparkUser())

'vaibv'

In [9]:
sc.appName

'pyspark'

In [10]:
sc.applicationId 

'local-1547096948542'

In [11]:
sc.defaultParallelism

1

In [12]:
sc.defaultMinPartitions

1

In [13]:
##############################           configuration           #################################

In [14]:
from pyspark import SparkConf, SparkContext
conf = (SparkConf().setMaster("local").setAppName("My app").set("spark.executor.memory", "1g"))

In [15]:
#Loading Data

In [16]:
#Parallelized Collections

rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])
rdd3 = sc.parallelize(range(100))
rdd4 = sc.parallelize([("a",["x","y","z"]), ("b",["p", "r"])])

In [17]:
#External Data

#textFile = sc.textFile("/my/directory/*.txt")
#textFile2 = sc.wholeTextFiles("/my/directory/")

In [18]:
##################      Retrieving RDD Information          ######################## 

In [19]:
#Basic Information

In [20]:
rdd.getNumPartitions()

1

In [21]:
rdd.count()

3

In [22]:
rdd.countByKey()

defaultdict(int, {'a': 2, 'b': 1})

In [85]:
print(rdd2.countByValue()) #outputs distinct tuples
sc.parallelize([1,2,3,2]).countByValue()

defaultdict(<class 'int'>, {('a', 2): 1, ('d', 1): 1, ('b', 1): 1})


defaultdict(int, {1: 1, 2: 2, 3: 1})

In [24]:
rdd.collectAsMap() #doesn't give multiple values for a key in order to reduce strain on driver

{'a': 2, 'b': 2}

In [25]:
rdd3.sum() 

4950

In [26]:
sc.parallelize([]).isEmpty()

True

In [27]:
#Summary

In [28]:
rdd3.max() 

99

In [29]:
rdd3.min() 

0

In [30]:
rdd3.mean() 

49.5

In [31]:
rdd3.stdev() 

28.86607004772212

In [32]:
rdd3.variance() 

833.25

In [33]:
rdd3.histogram(3) 

([0, 33, 66, 99], [33, 33, 34])

In [34]:
rdd3.stats() 

(count: 100, mean: 49.5, stdev: 28.86607004772212, max: 99.0, min: 0.0)

In [35]:
#Applying Functions

In [36]:
rdd.map(lambda x: x+(x[1],x[0]))

PythonRDD[18] at RDD at PythonRDD.scala:49

In [37]:
rdd.map(lambda x: x+(x[1],x[0])).collect() #After using collect

[('a', 7, 7, 'a'), ('a', 2, 2, 'a'), ('b', 2, 2, 'b')]

In [38]:
rdd5 = rdd.flatMap(lambda x: x+(x[1],x[0]))

In [39]:
rdd5.collect()

['a', 7, 7, 'a', 'a', 2, 2, 'a', 'b', 2, 2, 'b']

In [40]:
rdd4.mapValues(lambda x: x+x).collect()

[('a', ['x', 'y', 'z', 'x', 'y', 'z']), ('b', ['p', 'r', 'p', 'r'])]

In [41]:
rdd4.flatMapValues(lambda x: x+x).collect() #key remains same and value changes

[('a', 'x'),
 ('a', 'y'),
 ('a', 'z'),
 ('a', 'x'),
 ('a', 'y'),
 ('a', 'z'),
 ('b', 'p'),
 ('b', 'r'),
 ('b', 'p'),
 ('b', 'r')]

In [42]:
#######################                Selecting Data                 #################################

In [43]:
#Getting

In [44]:
rdd.collect() 

[('a', 7), ('a', 2), ('b', 2)]

In [45]:
rdd.take(2) 

[('a', 7), ('a', 2)]

In [46]:
rdd.first()

('a', 7)

In [47]:
rdd.top(2)

[('b', 2), ('a', 7)]

In [48]:
#Filtering

In [49]:
rdd.filter(lambda x: "a" in x).collect() 

[('a', 7), ('a', 2)]

In [50]:
rdd5.distinct().collect()

['a', 7, 2, 'b']

In [51]:
rdd.keys().collect()

['a', 'a', 'b']

In [52]:
def g(x): print(x)  

In [53]:
rdd.foreach(g)  #prints on jupyter command line

In [54]:
############################               Reshaping Data          ###########################

In [55]:
#Reducing

In [56]:
rdd.reduceByKey(lambda acc,val : acc+val).collect() #acc= 0 initially

[('a', 9), ('b', 2)]

In [57]:
rdd.reduce(lambda a, b: a + b)

('a', 7, 'a', 2, 'b', 2)

In [58]:
rdd3.groupBy(lambda x: x % 2).collect()

[(0, <pyspark.resultiterable.ResultIterable at 0x1f50fc036a0>),
 (1, <pyspark.resultiterable.ResultIterable at 0x1f50fc036d8>)]

In [59]:
rdd3.groupBy(lambda x: x % 2).mapValues(list).collect()

[(0,
  [0,
   2,
   4,
   6,
   8,
   10,
   12,
   14,
   16,
   18,
   20,
   22,
   24,
   26,
   28,
   30,
   32,
   34,
   36,
   38,
   40,
   42,
   44,
   46,
   48,
   50,
   52,
   54,
   56,
   58,
   60,
   62,
   64,
   66,
   68,
   70,
   72,
   74,
   76,
   78,
   80,
   82,
   84,
   86,
   88,
   90,
   92,
   94,
   96,
   98]),
 (1,
  [1,
   3,
   5,
   7,
   9,
   11,
   13,
   15,
   17,
   19,
   21,
   23,
   25,
   27,
   29,
   31,
   33,
   35,
   37,
   39,
   41,
   43,
   45,
   47,
   49,
   51,
   53,
   55,
   57,
   59,
   61,
   63,
   65,
   67,
   69,
   71,
   73,
   75,
   77,
   79,
   81,
   83,
   85,
   87,
   89,
   91,
   93,
   95,
   97,
   99])]

In [60]:
rdd.groupByKey().mapValues(list).collect()

[('a', [7, 2]), ('b', [2])]

In [61]:
# Aggregating

In [62]:
seqOp = (lambda x,y: (x[0]+y,x[1]+1)) #Here x will be (0,0) and y will be first num (0) 
combOp = (lambda x,y:(x[0]+y[0],x[1]+y[1])) 
rdd3.aggregate((0,0),seqOp,combOp)

(4950, 100)

In [63]:
rdd.aggregateByKey((0,0),seqOp,combOp).collect()

[('a', (9, 2)), ('b', (2, 1))]

In [87]:
print(rdd3.fold(0,lambda x,y:x+y)) 
print(rdd3.reduce(lambda x,y:x+y))

4950
4950


In [65]:
rdd.foldByKey(0, lambda x,y:x+y).collect()

[('a', 9), ('b', 2)]

In [66]:
rdd3.keyBy(lambda x: x+x).collect()  

[(0, 0),
 (2, 1),
 (4, 2),
 (6, 3),
 (8, 4),
 (10, 5),
 (12, 6),
 (14, 7),
 (16, 8),
 (18, 9),
 (20, 10),
 (22, 11),
 (24, 12),
 (26, 13),
 (28, 14),
 (30, 15),
 (32, 16),
 (34, 17),
 (36, 18),
 (38, 19),
 (40, 20),
 (42, 21),
 (44, 22),
 (46, 23),
 (48, 24),
 (50, 25),
 (52, 26),
 (54, 27),
 (56, 28),
 (58, 29),
 (60, 30),
 (62, 31),
 (64, 32),
 (66, 33),
 (68, 34),
 (70, 35),
 (72, 36),
 (74, 37),
 (76, 38),
 (78, 39),
 (80, 40),
 (82, 41),
 (84, 42),
 (86, 43),
 (88, 44),
 (90, 45),
 (92, 46),
 (94, 47),
 (96, 48),
 (98, 49),
 (100, 50),
 (102, 51),
 (104, 52),
 (106, 53),
 (108, 54),
 (110, 55),
 (112, 56),
 (114, 57),
 (116, 58),
 (118, 59),
 (120, 60),
 (122, 61),
 (124, 62),
 (126, 63),
 (128, 64),
 (130, 65),
 (132, 66),
 (134, 67),
 (136, 68),
 (138, 69),
 (140, 70),
 (142, 71),
 (144, 72),
 (146, 73),
 (148, 74),
 (150, 75),
 (152, 76),
 (154, 77),
 (156, 78),
 (158, 79),
 (160, 80),
 (162, 81),
 (164, 82),
 (166, 83),
 (168, 84),
 (170, 85),
 (172, 86),
 (174, 87),
 (176, 88

In [67]:
rdd.subtract(rdd2).collect()

[('a', 7), ('b', 2)]

In [68]:
#Return each (key,value) pair of rdd2 with no matching key in rdd
rdd2.subtractByKey(rdd).collect()

[('d', 1)]

In [69]:
rdd.cartesian(rdd2).collect() 

[(('a', 7), ('a', 2)),
 (('a', 7), ('d', 1)),
 (('a', 7), ('b', 1)),
 (('a', 2), ('a', 2)),
 (('a', 2), ('d', 1)),
 (('a', 2), ('b', 1)),
 (('b', 2), ('a', 2)),
 (('b', 2), ('d', 1)),
 (('b', 2), ('b', 1))]

In [70]:
#Sort

In [71]:
rdd2.sortBy(lambda x: x[1]).collect()

[('d', 1), ('b', 1), ('a', 2)]

In [72]:
rdd2.sortByKey().collect()

[('a', 2), ('b', 1), ('d', 1)]

In [73]:
rdd.toDebugString

<bound method RDD.toDebugString of ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:184>

In [74]:
#Repartitioning 

rdd.repartition(4)
rdd.coalesce(1)
#sc.stop()

CoalescedRDD[92] at coalesce at NativeMethodAccessorImpl.java:0