In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MoreExamples").getOrCreate()

In [4]:
words_string = "Spark is an awesome language for bigdata. Spark is used for bigdata"
print(words_string)

Spark is an awesome language for bigdata. Spark is used for bigdata


In [5]:
words_list = words_string.split(" ")
print(words_list)

['Spark', 'is', 'an', 'awesome', 'language', 'for', 'bigdata.', 'Spark', 'is', 'used', 'for', 'bigdata']


In [11]:
words_rdd = spark.sparkContext.parallelize(words_list)
words_rdd_list = words_rdd.collect()
print(words_rdd)

ParallelCollectionRDD[2] at readRDDFromFile at PythonRDD.scala:274


In [12]:
for word in words_rdd_list:
    print (word)

Spark
is
an
awesome
language
for
bigdata.
Spark
is
used
for
bigdata


In [15]:
words_rdd_distinct = words_rdd.distinct()
print(words_rdd_distinct.collect())

['bigdata.', 'awesome', 'bigdata', 'an', 'Spark', 'language', 'for', 'used', 'is']


In [16]:
### Display words that starts wih letter s
def matchingLetter(word, letter):
    return word.startswith(letter)

In [18]:
words_rdd.filter(lambda word: matchingLetter(word,'S')).collect()

['Spark', 'Spark']

In [20]:
range(1,21)

range(1, 21)

In [24]:
num_list = [*range(1,30)]
print(num_list)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]


In [26]:
nums_rdd = spark.sparkContext.parallelize(num_list)
print(nums_rdd)

ParallelCollectionRDD[19] at readRDDFromFile at PythonRDD.scala:274


In [27]:
### SQUARE OF THOSE NUMBERS 
nums_sq_rdd = nums_rdd.map(lambda val: val*val).collect()
print(nums_sq_rdd)

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841]


In [28]:
### FROM WORDS_RDD CAPTUES (SPARK,S,TRUE) DETAILS USING LAMBDA

words_transformed_rdd = words_rdd.map(lambda word : (word,word[0],matchingLetter(word,'s')))

In [30]:
print(words_transformed_rdd.collect())

[('Spark', 'S', False), ('is', 'i', False), ('an', 'a', False), ('awesome', 'a', False), ('language', 'l', False), ('for', 'f', False), ('bigdata.', 'b', False), ('Spark', 'S', False), ('is', 'i', False), ('used', 'u', False), ('for', 'f', False), ('bigdata', 'b', False)]


In [32]:
for word in words_transformed_rdd.collect():
    print(word)

('Spark', 'S', False)
('is', 'i', False)
('an', 'a', False)
('awesome', 'a', False)
('language', 'l', False)
('for', 'f', False)
('bigdata.', 'b', False)
('Spark', 'S', False)
('is', 'i', False)
('used', 'u', False)
('for', 'f', False)
('bigdata', 'b', False)


In [33]:
### FLAT_MAP IS A SIMPLE EXTENSION OF MAP 
words_flatmap_rdd = words_rdd.flatMap(lambda word: list(word)).take(20)
print(words_flatmap_rdd)

['S', 'p', 'a', 'r', 'k', 'i', 's', 'a', 'n', 'a', 'w', 'e', 's', 'o', 'm', 'e', 'l', 'a', 'n', 'g']


In [38]:
for word in words_rdd.collect():
    print(list(word))

['S', 'p', 'a', 'r', 'k']
['i', 's']
['a', 'n']
['a', 'w', 'e', 's', 'o', 'm', 'e']
['l', 'a', 'n', 'g', 'u', 'a', 'g', 'e']
['f', 'o', 'r']
['b', 'i', 'g', 'd', 'a', 't', 'a', '.']
['S', 'p', 'a', 'r', 'k']
['i', 's']
['u', 's', 'e', 'd']
['f', 'o', 'r']
['b', 'i', 'g', 'd', 'a', 't', 'a']


In [51]:
### using SORTBY 

country_ranking = [("India",20),("UK",50),("USA",1),("Russia",200)]
country_ranking_rdd = spark.sparkContext.parallelize(country_ranking)
country_ranking_rdd.collect()

[('India', 20), ('UK', 50), ('USA', 1), ('Russia', 200)]

In [49]:
country_ranking_rdd.collect()[0]

('India', 20)

In [52]:
country_ranking_rdd.sortByKey().collect()
#### Sorting by the Alpabetical order

[('India', 20), ('Russia', 200), ('UK', 50), ('USA', 1)]

In [56]:
country_ranking_rdd.map(lambda val: (val[1],val[0])).sortByKey(False).collect()
#### SORTING BY THE RANKING BY IT IS ACHIEVED MY 
#### REPLACING THE / SWITCHING THE NAME AND RAKING VALUES VIA MAP AND 
### DID THE SORTING ON TOP OF IT 

[(200, 'Russia'), (50, 'UK'), (20, 'India'), (1, 'USA')]

In [58]:
country_ranking_rdd.sortBy(lambda val: val[1]).collect()

[('USA', 1), ('India', 20), ('UK', 50), ('Russia', 200)]

In [59]:
country_ranking_rdd.sortBy(lambda val: val[0]).collect()

[('India', 20), ('Russia', 200), ('UK', 50), ('USA', 1)]

In [65]:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
tmp[1]

('b', 2)

In [63]:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
spark.sparkContext.parallelize(tmp).sortBy(lambda x: x[0]).collect()
### [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]


[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

In [62]:
spark.sparkContext.parallelize(tmp).sortBy(lambda x: x[1]).collect()
### [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]