In [14]:
# install pyspark
!pip install pyspark --quiet

In [15]:
#jdk
import os
os.environ["JAVA_HOME"] = '/lib/jvm/java-11-openjdk-amd64'

In [16]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().set('spark.ui.port','4050').setAppName('films').setMaster('local[2]')
sc = SparkContext.getOrCreate(conf=conf)

In [17]:
movie_list = ['RRR','Pushpa','Linga','Brahmayugam','love AajKal']
type(movie_list)

list

In [18]:
movie_rdd = sc.parallelize(movie_list)
type(movie_rdd)

In [19]:
movie_rdd

ParallelCollectionRDD[8] at readRDDFromFile at PythonRDD.scala:289

In [20]:
movie_rdd.collect()

['RRR', 'Pushpa', 'Linga', 'Brahmayugam', 'love AajKal']

In [21]:
type(movie_rdd.collect())

list

In [22]:
movie_rdd.glom().collect()   #glom() transforms each partition of your RDD into a list.
#It groups the elements within each partition into sublists, so you can inspect how data is distributed.

[['RRR', 'Pushpa'], ['Linga', 'Brahmayugam', 'love AajKal']]

In [44]:
conf = SparkConf().set('spark.ui.port','4051').setAppName('test1').setMaster('local[8]')
sc = SparkContext.getOrCreate(conf=conf)
#rdd1 = sc.parallelize([2,3,4,5,6,7,11])
rdd1 = sc.parallelize([2,3,4,5,6,7,11],8)
rdd1.glom().collect()

[[], [2], [3], [4], [5], [6], [7], [11]]

In [45]:
rdd1.stats()

(count: 7, mean: 5.428571428571429, stdev: 2.770102775666474, max: 11.0, min: 2.0)

In [46]:
rdd1.getNumPartitions()

8

In [48]:
rdd1.coalesce(2).glom().collect()    #coalesce(n) is used to reduce the number of partitions in an RDD or DataFrame.
#No shuffling involved
#Considered a narrow transformation. Each input partition contributes to only one output partition. Quick, efficient

[[2, 3, 4], [5, 6, 7, 11]]

In [49]:
#Repartition is used when partitions need to be increased or decreased. Here shuffling is involved.
#Wide transformation. Each input partition contributes to multiple output partitions.
rdd1.repartition(12).glom().collect()

[[4], [], [5, 6], [7], [2], [3, 11], [], [], [], [], [], []]

In [24]:
movie_rdd.take(2)

['RRR', 'Pushpa']

In [25]:
movie_list[0].title()

'Rrr'

In [26]:
movie_rdd.collect()[0].title()

'Rrr'

In [27]:
movie_rdd.collect()[-3:]

['Linga', 'Brahmayugam', 'love AajKal']

In [28]:
transform = lambda i:i.title()
movie_title_rdd = movie_rdd.map(transform)
movie_title_rdd.collect()

['Rrr', 'Pushpa', 'Linga', 'Brahmayugam', 'Love Aajkal']

In [29]:
transform1 = lambda i: i[0]=='l'
movie_rdd.filter(transform1).collect()

['love AajKal']

In [30]:
# Actions
# Count the number of elements in the RDD
num_movies = movie_rdd.count()
print(f"Number of movies: {num_movies}")
#print('Number of movies:', num_movies)

# Find the first element in the RDD
first_movie = movie_rdd.first()
print(f"First movie: {first_movie}")

# Reduce the RDD to a single value (e.g., concatenate all movie titles)
concatenated_titles = movie_rdd.reduce(lambda x, y: x + ", " + y)
print(f"Concatenated movie titles: {concatenated_titles}")


# Transformations
# Map each movie title to its length
movie_lengths = movie_rdd.map(lambda x: len(x))
print("Movie lengths:", movie_lengths.collect())


# Filter movies with titles longer than 5 characters
long_movie_titles = movie_rdd.filter(lambda x: len(x) > 5)
print("Long movie titles:", long_movie_titles.collect())

# FlatMap: Explode a list into individual elements
movie_chars_rdd = movie_rdd.flatMap(lambda x : list(x))
print("Characters:",movie_chars_rdd.collect())

# Distinct elements
print("Distinct movie characters:",movie_chars_rdd.distinct().collect())

# GroupByKey: Group the elements based on a key
# We need key-value pairs for this transformation.
# Let's create a new RDD
movie_with_length = movie_rdd.map(lambda x: (x, len(x)))
grouped_movies = movie_with_length.groupByKey().mapValues(list).collect()
print("Movies grouped by length:", grouped_movies)

# SortByKey
sorted_movies = movie_with_length.sortByKey().collect()
print("Movies sorted by Title:", sorted_movies)

Number of movies: 5
First movie: RRR
Concatenated movie titles: RRR, Pushpa, Linga, Brahmayugam, love AajKal
Movie lengths: [3, 6, 5, 11, 11]
Long movie titles: ['Pushpa', 'Brahmayugam', 'love AajKal']
Characters: ['R', 'R', 'R', 'P', 'u', 's', 'h', 'p', 'a', 'L', 'i', 'n', 'g', 'a', 'B', 'r', 'a', 'h', 'm', 'a', 'y', 'u', 'g', 'a', 'm', 'l', 'o', 'v', 'e', ' ', 'A', 'a', 'j', 'K', 'a', 'l']
Distinct movie characters: ['R', 'p', 'L', 'g', 'l', 'o', ' ', 'A', 'j', 'K', 'P', 'u', 's', 'h', 'a', 'i', 'n', 'B', 'r', 'm', 'y', 'v', 'e']
Movies grouped by length: [('Pushpa', [6]), ('Linga', [5]), ('RRR', [3]), ('Brahmayugam', [11]), ('love AajKal', [11])]
Movies sorted by Title: [('Brahmayugam', 11), ('Linga', 5), ('Pushpa', 6), ('RRR', 3), ('love AajKal', 11)]


In [31]:
rdd1 = sc.parallelize([2,3,4,5,6,7,11])

In [32]:
rdd2 = rdd1.map(lambda x: x * 2)
rdd2.collect()

[4, 6, 8, 10, 12, 14, 22]

In [33]:
rdd2 = rdd1.filter(lambda x: x > 10)
rdd2.collect()

[11]

In [34]:
rdd3 = sc.parallelize([[2,3,4],[5,6,7],[11,12,13]])
rdd2 = rdd3.map(lambda iter: [x * 2 for x in iter])
rdd2.collect()

[[4, 6, 8], [10, 12, 14], [22, 24, 26]]

In [35]:
rdd3 = sc.parallelize([[2,3,4],[5,6,7],[11,12,13]])
rdd2 = rdd3.mapPartitions(lambda iter: [x * 2 for x in iter])
rdd2.collect()


[[2, 3, 4, 2, 3, 4], [5, 6, 7, 5, 6, 7], [11, 12, 13, 11, 12, 13]]

In [36]:
rdd2 = rdd1.union(rdd3)
rdd2.collect()

[2, 3, 4, 5, 6, 7, 11, [2, 3, 4], [5, 6, 7], [11, 12, 13]]

In [37]:
rdd3 = sc.parallelize([7,11,12,13])
rdd2 = rdd1.intersection(rdd3)
rdd2.collect()

[7, 11]

In [38]:
rdd3 = sc.parallelize([7,7,11,12,13])
rdd2 = rdd3.distinct()
rdd2.collect()

[12, 7, 11, 13]

In [39]:
rdd2 = rdd1.groupBy(lambda x: x % 2)
rdd2.collect()

[(0, <pyspark.resultiterable.ResultIterable at 0x7c45a03fcb90>),
 (1, <pyspark.resultiterable.ResultIterable at 0x7c45a15d4510>)]

In [40]:
rdd1.groupBy(lambda x: x % 2).mapValues(list).collect()

[(0, [2, 4, 6]), (1, [3, 5, 7, 11])]

In [41]:
rdd1.collect()

[2, 3, 4, 5, 6, 7, 11]

In [42]:
result = rdd1.count()
result

7

In [43]:
result = rdd1.top(3)
result

[11, 7, 6]