In [0]:
#common 1st 4 lines

from pyspark import SparkContext

from pyspark.sql import SparkSession

sc=SparkContext.getOrCreate()

spark=SparkSession.builder.appName('pyspark first program').getOrCreate()


In [0]:
#create rdd
rdd = sc.parallelize([('C',85,76,87,91), ('B',85,76,87,91), ("A", 85,78,96,92), ("A", 92,76,89,96)])
print(type(rdd))
# Where type(rdd) is to check rdd creation.

<class 'pyspark.core.rdd.RDD'>


In [0]:
# creating data frames from rrd.

rdd=sc.parallelize ([('C',85,76,87,91), ('B',85,76,87,51), ("A", 85,78,96,92), ("A", 92,76,89,96)],4)

sub=['Division','English','Mathematics','Physics','Chemistry']

marks_df=spark.createDataFrame(rdd, schema=sub)

print(type(marks_df))

print(rdd)

marks_df.show()

marks_df.printSchema()


<class 'pyspark.sql.dataframe.DataFrame'>
ParallelCollectionRDD[1] at readRDDFromInputStream at PythonRDD.scala:450
+--------+-------+-----------+-------+---------+
|Division|English|Mathematics|Physics|Chemistry|
+--------+-------+-----------+-------+---------+
|       C|     85|         76|     87|       91|
|       B|     85|         76|     87|       51|
|       A|     85|         78|     96|       92|
|       A|     92|         76|     89|       96|
+--------+-------+-----------+-------+---------+

root
 |-- Division: string (nullable = true)
 |-- English: long (nullable = true)
 |-- Mathematics: long (nullable = true)
 |-- Physics: long (nullable = true)
 |-- Chemistry: long (nullable = true)



In [0]:
rdd.collect()

[('C', 85, 76, 87, 91),
 ('B', 85, 76, 87, 51),
 ('A', 85, 78, 96, 92),
 ('A', 92, 76, 89, 96)]

In [0]:
# creating a new table

spark = SparkSession.builder.appName('pyspark_ex').getOrCreate()

data = [('James','Smith','M',3000),
  ('Anna','Rose','F',4100),
  ('Robert','Williams','M',6200), 
]

columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()


+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|  3000|
|     Anna|    Rose|     F|  4100|
|   Robert|Williams|     M|  6200|
+---------+--------+------+------+



# Actions in PySpark RDDs

In [0]:
# 1. The .collect() Action
collect_rdd = sc.parallelize([1,2,3,4,5])
print(collect_rdd.collect())

[1, 2, 3, 4, 5]


In [0]:
# 2. The .count() Action

from pyspark import SparkContext
sc = SparkContext.getOrCreate()
count_rdd = sc.parallelize([1,2,3,4,5,5,6,7,8,9])
print(count_rdd.count())

10


In [0]:
# 3. The .first() Action

from pyspark import SparkContext
sc = SparkContext.getOrCreate()
count_rdd = sc.parallelize([1,2,3,4,5,5,6,7,8,9])
print(count_rdd.count())
first_rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
print(first_rdd.first())

10
1


In [0]:
# 4. The .take() Action
take_rdd = sc.parallelize([1,2,3,4,5])
print(take_rdd.take(3))

[1, 2, 3]


In [0]:
# 5. The .reduce() Action

from pyspark import SparkContext
sc = SparkContext.getOrCreate()
reduce_rdd = sc.parallelize([1,3,4,6])
print(reduce_rdd.reduce(lambda x, y : x + y))


14


In [0]:
# 6. The .saveAsTextFile() Action

from pyspark import SparkContext
sc = SparkContext.getOrCreate()
save_rdd = sc.parallelize([1,2,3,4,5,6])
save_rdd.saveAsTextFile('file.txt')

# Transformations in PySpark RDDs

In [0]:
# 1. The .map() Transformation

my_rdd = sc.parallelize([1,2,3,4])
print(my_rdd.map(lambda x: x+ 10).collect())

[11, 12, 13, 14]


In [0]:
# 2. The .filter() Transformation

filter_rdd = sc.parallelize([2, 3, 4, 5, 6, 7])
print(filter_rdd.filter(lambda x: x%2 == 0).collect())

[2, 4, 6]


In [0]:
# 3. The .union() Transformation

union_inp = sc.parallelize([2,4,5,6,7,8,9])
union_rdd_1 = union_inp.filter(lambda x: x % 2 == 0)
union_rdd_2 = union_inp.filter(lambda x: x % 3 == 0)
print(union_rdd_1.union(union_rdd_2).collect())

[2, 4, 6, 8, 6, 9]


In [0]:
# 4. The .flatMap() Transformation

flatmap_rdd = sc.parallelize(["Hey there", "This is PySpark RDD Transformations"])
(flatmap_rdd.flatMap(lambda x: x.split(" ")).collect())

['Hey', 'there', 'This', 'is', 'PySpark', 'RDD', 'Transformations']

# Pair RDD Operations

In [0]:
# create Pair RDDs in PySpark.
# tuple
marks = [('Rahul', 88), ('Swati', 92), ('Shreya', 83), ('Abhay', 93), ('Rohan', 78)]
sc.parallelize(marks).collect()


[('Rahul', 88), ('Swati', 92), ('Shreya', 83), ('Abhay', 93), ('Rohan', 78)]

# Actions in Pair RDDs

In [0]:
# 1. The countByKey() Action

marks_rdd = sc.parallelize([('Rahul', 25), ('Swati', 26), ('Rohan', 22), ('Rahul', 23), ('Swati', 19),
('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
dict_rdd = marks_rdd.countByKey().items()
for key, value in dict_rdd:
    print(key, value)

Rahul 2
Swati 2
Rohan 2
Shreya 1
Abhay 1


# Transformations in Pair RDDs

In [0]:
# 1. The .reduceByKey() Transformation

marks_rdd = sc.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
print(marks_rdd.reduceByKey(lambda x, y: x + y).collect())

[('Rohan', 44), ('Rahul', 48), ('Swati', 45), ('Shreya', 50), ('Abhay', 55)]


In [0]:
# 2. The .sortByKey() Transformation

marks_rdd = sc.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
print(marks_rdd.sortByKey('ascending').collect())

[('Abhay', 29), ('Abhay', 26), ('Rahul', 25), ('Rahul', 23), ('Rohan', 22), ('Rohan', 22), ('Shreya', 22), ('Shreya', 28), ('Swati', 26), ('Swati', 19)]


In [0]:
# 3. The .groupByKey() Transformation

marks_rdd = sc.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
dict_rdd = marks_rdd.groupByKey().collect()
for key, value in dict_rdd:
    print(key, list(value))

Rohan [22, 22]
Rahul [25, 23]
Swati [26, 19]
Shreya [22, 28]
Abhay [29, 26]
