Resilient Distributed Datasets (RDDs) are a distributed collection of immutable JVM objects that allow you to perform calculations very quickly, and they are the backbone of Apache Spark.

the dataset is distributed; it is split into chunks based on some key and distributed to executor nodes.

RDDs keep track (log) of all the transformations applied to each chunk to speed up the computations and provide a fallback if things go wrong


In [1]:
import re
import numpy as np
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName("CDSC").getOrCreate()
sc = spark.sparkContext

##### Creating RDDs

In [2]:
data = sc.parallelize(
    [
        ('Amber', 22),
        ('Alfred', 23),
        ('Skye', 4),
        ('Albert', 12),
        ('Amber', 9)
    ]
)
data

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

sc.textFile(..., n) specifies the number of partitions the data dataset is divided into.

Spark can read from multitude of filesystems

Local
    
        * NTFS
        * FAT
        * HFS+
        
Distributed FS
    
        * HDFS
        * Cassandra
        
Spark can automatically work with compressed datasets

In [3]:
data_from_file = sc.\
    textFile(
        '/home/ictpuser/Downloads/VS14MORT.txt.gz',4
)
data_from_file

/home/ictpuser/Downloads/VS14MORT.txt.gz MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:0

##### Schema

RDDs are schema-less data structures

In [4]:
data_heterogenous = sc.parallelize([
    ('Ferrari', 'fast'),
    {'porsche': 100000},
    ['spain', 'visited, 4504']
]).collect()
data_heterogenous

[('Ferrari', 'fast'), {'porsche': 100000}, ['spain', 'visited, 4504']]

In [5]:
data_heterogenous[1]['porsche']

100000

In [6]:
data_from_file.take(1)

['                   1                                          2101  M1087 432311  4M4                2014U7CN                                    I64 238 070   24 0111I64                                                                                                                                                                           01 I64                                                                                                  01  11                                 100 601']

#### Lambda expressions

In [7]:
def extractInformation(row):
    selected_indices = [
        2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,
         19,21,22,23,24,25,27,28,29,30,32,33,34,
         36,37,38,39,40,41,42,43,44,45,46,47,48,
         49,50,51,52,53,54,55,56,58,60,61,62,63,
         64,65,66,67,68,69,70,71,72,73,74,75,76,
         77,78,79,81,82,83,84,85,87,89
    ]
    record_split = re\
        .compile(
            r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' + 
            r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' + 
            r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +
            r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +
            r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' + 
            r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' + 
            r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')
    try:
        rs = np.array(record_split.split(row))[selected_indices]
    except:
        rs = np.array(['-99'] * len(selected_indices))
    return rs

In [8]:
data_from_file_conv = data_from_file.map(extractInformation)
data_from_file_conv

PythonRDD[5] at RDD at PythonRDD.scala:53

In [9]:
data_from_file_conv.take(1)

[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

#### Transformation

Transformations shape your dataset. These include mapping, filtering, joining, and transcoding the values in your dataset.

In [10]:
# The .map(...) transformation
data_2014 = data_from_file_conv.map(lambda row: int(row[16]))
data_2014.take(10)

[2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, -99]

In [11]:
data_2014_2 = data_from_file_conv.map(
    lambda row: (row[16], int(row[16]))
)
data_2014_2.take(10)

[('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('-99', -99)]

In [12]:
# The .filter(...) transformation
# .filter(...) method, which allows you to select elements from your dataset that fit specified criteria. 
data_filtered = data_from_file_conv.filter(
    lambda row: row[16 == '2014' and row[21] == '0']
)

data_filtered.count()

0

In [13]:
# The .flatMap(...) transformation
# The .flatMap(...) method works similarly to .map(...), but it returns a flattened result instead of a list. 

data_2014_flat = data_from_file_conv.flatMap(
        lambda row: (row[16],
                    int(row[16]) + 1)
)
data_2014_flat.take(10)

['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]

In [14]:
# The .distinct(...) transformation
# This method returns a list of distinct values in a specified column. It is extremely useful if you want to get to know your dataset or validate it.
distinct_gender = data_from_file_conv.map(
        lambda row: row[5]).distinct()
distinct_gender.collect()

['M', 'F', '-99']

In [15]:
# The .sample(...) transformation
#The .sample(...) method returns a randomized sample from the dataset.
fraction = 0.1
data_sample = data_from_file_conv.sample(False, fraction, 666)
print('Original dataset {0}, sample: {1}'\
     .format(data_from_file_conv.count(), data_sample.count()))

Original dataset 2631171, sample: 263247


In [16]:
# The .leftOuterJoin(...) transformation
# .leftOuterJoin(...), just like in the SQL world, joins two RDDs based on the values found in both datasets, and returns records from the left RDD with records from the right one appended in places where the two RDDs match

rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.collect()

[('a', (1, 4)), ('a', (1, 1)), ('b', (4, '6')), ('c', (10, None))]

In [17]:
rdd4 = rdd1.join(rdd2)
rdd4.collect()

[('a', (1, 4)), ('a', (1, 1)), ('b', (4, '6'))]

In [18]:
rdd5 = rdd1.intersection(rdd2)
rdd5.collect()

[('a', 1)]

In [19]:
#The .repartition(...) transformation
#.leftOuterJoin(...), just like in the SQL world, joins two RDDs based on the values found in both datasets, and returns records from the left RDD with records from the right one appended in places where the two RDDs match
rdd1 = rdd1.repartition(6)
len(rdd1.glom().collect())

6

In [20]:
rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)

15

In [22]:
data_key = sc.parallelize(
    [('a', 4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3)], 4)
data_key.reduceByKey(lambda x, y: x+y).collect()

[('b', 4), ('c', 2), ('a', 12), ('d', 5)]

In [23]:
data_key.count()

7

In [24]:
len(data_key.collect())

7

In [25]:
data_key.countByKey().items()

dict_items([('a', 2), ('b', 2), ('c', 1), ('d', 2)])

In [26]:
data_key.saveAsTextFile(
'/home/ictpuser/Downloads/data_key.txt')

In [37]:
def parseInput(row):
    pattern = re.compile(r'\(\'([a-z])\', ([0-9])\)')
    row_split = pattern.split(row)
    
    return (row_split[1], int(row_split[2]))

In [38]:
data_key_reread = sc \
    .textFile(
        '/home/ictpuser/Downloads/data_key.txt') \
    .map(parseInput)


In [39]:
data_key_reread.collect()

[('a', 4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3)]