### Pyspark RDD

1.Create rdd from list

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tst').getOrCreate()
li=[1,2,3,4]
rdd = spark.sparkContext.parallelize(li)
rdd1 = rdd.map(lambda x: x * 2)
print(rdd.collect())
print(rdd1.collect())

[1, 2, 3, 4]
[2, 4, 6, 8]


2.From Pair RDDs

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pair-rdd').getOrCreate()
data = [("Ram", 3), ("Meena", 2), ("Tharan", 5)]
rdd = spark.sparkContext.parallelize(data)
print(rdd.collect())

[('Ram', 3), ('Meena', 2), ('Tharan', 5)]


3.convert a DataFrame to an RDD using the rdd method.

In [0]:
data = [("Lakshmi", 25), ("Karthi", 30), ("Jovikha", 22)]
df=spark.createDataFrame(data,['Name','age'])
print(df.show())
print(type(df))
rdd1=df.rdd
print(rdd1.collect())
print(type(rdd1))


+-------+---+
|   Name|age|
+-------+---+
|Lakshmi| 25|
| Karthi| 30|
|Jovikha| 22|
+-------+---+

None
<class 'pyspark.sql.dataframe.DataFrame'>
[Row(Name='Lakshmi', age=25), Row(Name='Karthi', age=30), Row(Name='Jovikha', age=22)]
<class 'pyspark.rdd.RDD'>


4. from file

In [0]:
file_loc='dbfs:/FileStore/tables/single_line.txt'
display(dbutils.fs.ls("/FileStore/tables/single_line.txt"))
rdd=spark.sparkContext.textFile(file_loc)
print(rdd.collect())

path,name,size,modificationTime
dbfs:/FileStore/tables/single_line.txt,single_line.txt,60,1696233365000


['Lakshmipathy|30|M|Alagu|27|F|Meena|32|F|Varshi|6|M|Krish|3|M']


rdd transformation

In [0]:
sc= spark.sparkContext
rdd = sc.parallelize([1, 2, 3, 4, 5])
mapped_rdd = rdd.map(lambda x: x * 2)
print(mapped_rdd.collect())

[2, 4, 6, 8, 10]


In [0]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
print(filtered_rdd.collect())

[2, 4]


In [0]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
flat_mapped_rdd = rdd.flatMap(lambda x: (x, x * 2))
print(flat_mapped_rdd.collect())

[1, 2, 2, 4, 3, 6, 4, 8, 5, 10]


In [0]:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
union_rdd = rdd1.union(rdd2)
print(union_rdd.collect())

[1, 2, 3, 4, 5, 6]


In [0]:
rdd = sc.parallelize([1, 2, 3, 1, 2, 4])
distinct_rdd = rdd.distinct()
print(distinct_rdd.collect())

[1, 2, 3, 4]


In [0]:
pair_rdd = sc.parallelize([(1, 'apple'), (2, 'banana'), (1, 'orange')])
grouped_rdd = pair_rdd.groupByKey()
grouped_rdd.collect()

Out[61]: [(1, <pyspark.resultiterable.ResultIterable at 0x7f95dc720070>),
 (2, <pyspark.resultiterable.ResultIterable at 0x7f95dc5a84c0>)]

In [0]:
pair_rdd = sc.parallelize([(1, 3), (2, 5), (1, 2)])
reduced_rdd = pair_rdd.reduceByKey(lambda x, y: x + y)
reduced_rdd.collect()

Out[62]: [(1, 5), (2, 5)]

In [0]:
pair_rdd = sc.parallelize([(3, 'bala'), (1, 'lax'), (2, 'Arjun')])
sorted_rdd = pair_rdd.sortByKey()
sorted_rdd.collect()

Out[64]: [(1, 'lax'), (2, 'Arjun'), (3, 'bala')]

In [0]:
rdd1 = sc.parallelize([(1, 'india'), (2, 'srilanka')])
rdd2 = sc.parallelize([(1, 3), (2, 5)])
joined_rdd = rdd1.join(rdd2)
joined_rdd.collect()

Out[65]: [(1, ('india', 3)), (2, ('srilanka', 5))]

In [0]:
rdd1 = sc.parallelize([(1, 'apple'), (2, 'banana')])
rdd2 = sc.parallelize([(1, 3), (2, 5), (1, 2)])
cogrouped_rdd = rdd1.cogroup(rdd2)
cogrouped_rdd.collect()

Out[67]: [(1,
  (<pyspark.resultiterable.ResultIterable at 0x7f95e4719400>,
   <pyspark.resultiterable.ResultIterable at 0x7f95e4988190>)),
 (2,
  (<pyspark.resultiterable.ResultIterable at 0x7f95e49887f0>,
   <pyspark.resultiterable.ResultIterable at 0x7f95e4988970>))]

In [0]:
result = rdd.collect()
print(result)

[1, 2, 3, 1, 2, 4]


In [0]:
count = rdd.count()
print(count)

6


In [0]:
first_element = rdd.first()
print(first_element)

1


In [0]:
first_n_elements = rdd.take(3)
print(first_n_elements)

[1, 2, 3]


In [0]:
top_elements = rdd.top(3)
print(top_elements)

[4, 3, 2]


In [0]:
from operator import add
print(rdd.collect())
total_sum = rdd.reduce(add)
print(total_sum)


[1, 2, 3, 1, 2, 4]
13
