### Resilient Distributed Datasets(RDDs) 
- PySpark (the Python API for Spark) loads datasets as RDDs, which are the foundational data structure of Spark.
- RDDs are fault-tolerant collections of elements, partitioned across the nodes of a cluster, that can be operated on in parallel.



In [13]:
from pyspark.sql import SparkSession
# Sample records used throughout this notebook. Each record is a 4-tuple of
# (name, integer value, decimal fraction, two-letter code).
student_data = [
    ("Chris", 1523, 0.72, "CA"),
    ("Jake", 1555, 0.83, "NY"),
    ("Cody", 1439, 0.92, "CA"),
    ("Lisa", 1442, 0.81, "FL"),
    ("Daniel", 1600, 0.88, "TX"),
    ("Kelvin", 1382, 0.99, "FL"),
    ("Nancy", 1442, 0.74, "TX"),
    ("Pavel", 1599, 0.82, "NY"),
    ("Josh", 1482, 0.78, "CA"),
    ("Cynthia", 1582, 0.94, "CA"),
]

# Reuse the active SparkSession if there is one, otherwise start a new one
spark = SparkSession.builder.getOrCreate()

# Print the session object to confirm it was built
print(spark)

<pyspark.sql.session.SparkSession object at 0x000001D2D5E6BDC0>


In [21]:
# Initialise the RDD by parallelizing student_data into 12 partitions
student_rdd = spark.sparkContext.parallelize(student_data, numSlices=12)

# Collect every element back to the driver and display it to confirm that the
# RDD contains the expected data
student_rdd.collect()

[('Chris', 1523, 0.72, 'CA'),
 ('Jake', 1555, 0.83, 'NY'),
 ('Cody', 1439, 0.92, 'CA'),
 ('Lisa', 1442, 0.81, 'FL'),
 ('Daniel', 1600, 0.88, 'TX'),
 ('Kelvin', 1382, 0.99, 'FL'),
 ('Nancy', 1442, 0.74, 'TX'),
 ('Pavel', 1599, 0.82, 'NY'),
 ('Josh', 1482, 0.78, 'CA'),
 ('Cynthia', 1582, 0.94, 'CA')]

In [22]:
# Display the number of partitions that student_rdd is split into
student_rdd.getNumPartitions()

12

## Transformations

In [23]:
# Transform the third column (index 2) of every record, across all of the
# RDD's partitions, from a decimal fraction into a whole-number percentage.
# round() is used rather than int(): int() truncates toward zero, and binary
# floating point can leave the product just below the intended value
# (e.g. 0.57 * 100 == 56.99999999999999, which int() would turn into 56).
rdd_transformation = student_rdd.map(
    lambda x: (x[0], x[1], round(x[2] * 100), x[3])
)

# Confirm the transformation is correct
rdd_transformation.collect()

[('Chris', 1523, 72, 'CA'),
 ('Jake', 1555, 83, 'NY'),
 ('Cody', 1439, 92, 'CA'),
 ('Lisa', 1442, 81, 'FL'),
 ('Daniel', 1600, 88, 'TX'),
 ('Kelvin', 1382, 99, 'FL'),
 ('Nancy', 1442, 74, 'TX'),
 ('Pavel', 1599, 82, 'NY'),
 ('Josh', 1482, 78, 'CA'),
 ('Cynthia', 1582, 94, 'CA')]

In [20]:
# Keep only the records whose third column (index 2) is strictly greater
# than 80, across the whole RDD
rdd_filtered = rdd_transformation.filter(lambda record: record[2] > 80)

# Collect the remaining records to inspect them
rdd_filtered.collect()

[('Jake', 1555, 83, 'NY'),
 ('Cody', 1439, 92, 'CA'),
 ('Lisa', 1442, 81, 'FL'),
 ('Daniel', 1600, 88, 'TX'),
 ('Kelvin', 1382, 99, 'FL'),
 ('Pavel', 1599, 82, 'NY'),
 ('Cynthia', 1582, 94, 'CA')]

## Actions


In [25]:
# View the first 5 elements of rdd_transformation
rdd_transformation.take(5)

[('Chris', 1523, 72, 'CA'),
 ('Jake', 1555, 83, 'NY'),
 ('Cody', 1439, 92, 'CA'),
 ('Lisa', 1442, 81, 'FL'),
 ('Daniel', 1600, 88, 'TX')]

In [24]:
# Sum grades in rdd_transformation: extract the third column (index 2) of
# every record, then add the values together pairwise
grade_column = rdd_transformation.map(lambda record: record[2])
sum_gpa = grade_column.reduce(lambda total, value: total + value)
print(sum_gpa)

# Average grade: the sum divided by the number of records
sum_gpa / rdd_transformation.count()

843


84.3