# RDD Transformations and Actions

In this lecture we take a closer look at using Spark with Python. Please view the video lecture for a full explanation.

## Important Terms

Let's quickly go over some important terms:

Term                   |Definition
----                   |-------
RDD                    |Resilient Distributed Dataset
Transformation         |Spark operation that produces an RDD
Action                 |Spark operation that produces a local object
Spark Job              |Sequence of transformations on data with a final action

## Creating an RDD

There are two common ways to create an RDD:

Method                      |Result
----------                               |-------
`sc.parallelize(array)`                  |Create RDD of elements of array (or list)
`sc.textFile(path/to/file)`                      |Create RDD of lines from file

## RDD Transformations

We can use transformations to build a set of instructions we want to perform on the RDD; they are only executed once we call an action.

Transformation Example                          |Result
----------                               |-------
`filter(lambda x: x % 2 == 0)`           |Discard non-even elements
`map(lambda x: x * 2)`                   |Multiply each RDD element by `2`
`map(lambda x: x.split())`               |Split each string into words
`flatMap(lambda x: x.split())`           |Split each string into words and flatten sequence
`sample(withReplacement=True, fraction=0.25)` |Create sample of roughly 25% of elements with replacement
`union(rdd)`                             |Append `rdd` to existing RDD
`distinct()`                             |Remove duplicates in RDD
`sortBy(lambda x: x, ascending=False)`   |Sort elements in descending order

## RDD Actions

Once your 'recipe' of transformations is ready, you execute it by calling an action. Here are some common actions:

Action                             |Result
----------                             |-------
`collect()`                            |Convert RDD to in-memory list 
`take(3)`                              |First 3 elements of RDD 
`top(3)`                               |Largest 3 elements of RDD, in descending order
`takeSample(withReplacement=True, num=3)` |Create sample of 3 elements with replacement
`sum()`                                |Find element sum (assumes numeric elements)
`mean()`                               |Find element mean (assumes numeric elements)
`stdev()`                              |Find element standard deviation (assumes numeric elements)

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .appName("new")\
        .getOrCreate()

In [3]:
spark

In [3]:
sc = spark.sparkContext

In [4]:
rdd = sc.parallelize(range(10))
rdd

PythonRDD[1] at RDD at PythonRDD.scala:53

In [5]:
rdd.first()
rdd.take(2)
rdd.takeSample(True,3)
rdd.takeSample(False,3)
rdd.count()
rdd.mean()

4.5

In [6]:
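# collect() returns plain Python lists, so rdd2, rdd3 and rdd4 are lists, not RDDs
rdd2 = rdd.map(lambda x: x*x).collect()          # square of each element
rdd3 = rdd.map(lambda x: [x,x]).collect()        # one nested list per element
rdd4 = rdd.flatMap(lambda x: [x,x]).collect()    # nested lists flattened into one sequence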

In [54]:
sc.parallelize(range(20)) \
.map(lambda x: x * 2) \
.filter(lambda x: x != 2) \
.reduce(lambda x,y: x + y)

378

In [7]:
rdd11a = sc.parallelize(('aa','bb','cc','dd','aa','cc','ee','ff','dd','dd','aa'))
rdd11b = rdd11a.map(lambda k: (k,1))
rdd11b.countByKey().items()

dict_items([('aa', 3), ('bb', 1), ('cc', 2), ('dd', 3), ('ee', 1), ('ff', 1)])

In [8]:
rdda1 = sc.parallelize(('aa','bb','cc','dd','ee','ff','gg','aa')).map(lambda k: (k,1))
rdda2 = sc.parallelize(('aa','cc','mm','rr','tt')).map(lambda k: (k,1))
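# inner join: only keys present in both RDDs
rdda1.join(rdda2).collect()

# left outer join: every key from rdda1, None where rdda2 has no match
rdda1.leftOuterJoin(rdda2).collect()

# right outer join: every key from rdda2; only this last result is displayed
rdda1.rightOuterJoin(rdda2).collect()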

[('aa', (1, 1)),
 ('aa', (1, 1)),
 ('tt', (None, 1)),
 ('cc', (1, 1)),
 ('mm', (None, 1)),
 ('rr', (None, 1))]

In [9]:
%%file example.txt
first
second line
the third line
then a fourth line

Overwriting example.txt


In [10]:
text_rdd = sc.textFile('example.txt')

In [11]:
text_rdd.first()

'first'

In [12]:
text_rdd.map(lambda line: line.split()).collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [13]:
def nasza_fun(line):
    return line.split()

text_rdd.map(nasza_fun).collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [14]:
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

In [15]:
rdd = sc.parallelize([(1, 2, 3, 'a b c'),
             (4, 5, 6, 'd e f'),
             (7, 8, 9, 'g h i')])
df = rdd.toDF(['col1', 'col2', 'col3','col4'])
df.show()  
df.printSchema()

+----+----+----+-----+
|col1|col2|col3| col4|
+----+----+----+-----+
|   1|   2|   3|a b c|
|   4|   5|   6|d e f|
|   7|   8|   9|g h i|
+----+----+----+-----+

root
 |-- col1: long (nullable = true)
 |-- col2: long (nullable = true)
 |-- col3: long (nullable = true)
 |-- col4: string (nullable = true)



In [16]:
dfe = spark.createDataFrame([
                        ('1', 'Joe',   '70000', '1'),
                        ('2', 'Henry', '80000', '2'),
                        ('3', 'Sam',   '60000', '2'),
                        ('4', 'Max',   '90000', '1')],
                        ['Id', 'Name', 'Sallary','DepartmentId']
                       )
dfe.show()
dfe.printSchema()

+---+-----+-------+------------+
| Id| Name|Sallary|DepartmentId|
+---+-----+-------+------------+
|  1|  Joe|  70000|           1|
|  2|Henry|  80000|           2|
|  3|  Sam|  60000|           2|
|  4|  Max|  90000|           1|
+---+-----+-------+------------+

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sallary: string (nullable = true)
 |-- DepartmentId: string (nullable = true)



In [17]:
from pyspark.sql.functions import udf, array
from pyspark.sql.types import StringType
from datetime import datetime as Date
data = [
[10,'Direct Sales',Date(2019,1,1)],
[12,'Direct Sales',Date(2019,1,2)],
[20,'Online Sales',Date(2019,1,1)],
[25,'Online Sales',Date(2019,1,2)],
]
df = spark.createDataFrame(data , ['Revenue','Department','Date'])
df.show()


+-------+------------+-------------------+
|Revenue|  Department|               Date|
+-------+------------+-------------------+
|     10|Direct Sales|2019-01-01 00:00:00|
|     12|Direct Sales|2019-01-02 00:00:00|
|     20|Online Sales|2019-01-01 00:00:00|
|     25|Online Sales|2019-01-02 00:00:00|
+-------+------------+-------------------+



## Reading Data from a File

In [5]:
adultDF = spark.read.csv("adult.data", inferSchema=True, ignoreLeadingWhiteSpace=True)

In [6]:
adultDF.take(1)

[Row(_c0=39, _c1='State-gov', _c2=77516, _c3='Bachelors', _c4=13, _c5='Never-married', _c6='Adm-clerical', _c7='Not-in-family', _c8='White', _c9='Male', _c10=2174, _c11=0, _c12=40, _c13='United-States', _c14='<=50K')]

In [7]:
adultDF.show(2)

+---+----------------+-----+---------+---+------------------+---------------+-------------+-----+----+----+----+----+-------------+-----+
|_c0|             _c1|  _c2|      _c3|_c4|               _c5|            _c6|          _c7|  _c8| _c9|_c10|_c11|_c12|         _c13| _c14|
+---+----------------+-----+---------+---+------------------+---------------+-------------+-----+----+----+----+----+-------------+-----+
| 39|       State-gov|77516|Bachelors| 13|     Never-married|   Adm-clerical|Not-in-family|White|Male|2174|   0|  40|United-States|<=50K|
| 50|Self-emp-not-inc|83311|Bachelors| 13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|   0|   0|  13|United-States|<=50K|
+---+----------------+-----+---------+---+------------------+---------------+-------------+-----+----+----+----+----+-------------+-----+
only showing top 2 rows



In [8]:
col_names = ["age", "workclass", "fnlwgt", 
             "education", "education-num", 
             "material-status", "occupation",
             "relationship", "race", "sex", 
             "capital-gain", "capital-loss",
             "hours-per-week", "native-country",
             "earings"]
adultDF = adultDF.toDF(*col_names)

In [10]:
adultDF.show(2, vertical=True)

-RECORD 0-----------------------------
 age             | 39                 
 workclass       | State-gov          
 fnlwgt          | 77516              
 education       | Bachelors          
 education-num   | 13                 
 material-status | Never-married      
 occupation      | Adm-clerical       
 relationship    | Not-in-family      
 race            | White              
 sex             | Male               
 capital-gain    | 2174               
 capital-loss    | 0                  
 hours-per-week  | 40                 
 native-country  | United-States      
 earings         | <=50K              
-RECORD 1-----------------------------
 age             | 50                 
 workclass       | Self-emp-not-inc   
 fnlwgt          | 83311              
 education       | Bachelors          
 education-num   | 13                 
 material-status | Married-civ-spouse 
 occupation      | Exec-managerial    
 relationship    | Husband            
 race            | White 

In [11]:
adultDF.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- material-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- earings: string (nullable = true)



In [12]:
adultDF = adultDF.drop("fnlwgt").dropna("any")

In [14]:
adultDF.show(1, vertical=True)

-RECORD 0------------------------
 age             | 39            
 workclass       | State-gov     
 education       | Bachelors     
 education-num   | 13            
 material-status | Never-married 
 occupation      | Adm-clerical  
 relationship    | Not-in-family 
 race            | White         
 sex             | Male          
 capital-gain    | 2174          
 capital-loss    | 0             
 hours-per-week  | 40            
 native-country  | United-States 
 earings         | <=50K         
only showing top 1 row



In [15]:
df = adultDF.toPandas()

In [16]:
df.describe()

       age        education-num  capital-gain  capital-loss  hours-per-week
count  32561.0    32561.0        32561.0       32561.0       32561.0
mean   38.581647  10.080679      1077.648844   87.30383      40.437456
std    13.640433  2.57272        7385.292085   402.960219    12.347429
min    17.0       1.0            0.0           0.0           1.0
25%    28.0       9.0            0.0           0.0           40.0
50%    37.0       10.0           0.0           0.0           40.0
75%    48.0       12.0           0.0           0.0           45.0
max    90.0       16.0           99999.0       4356.0        99.0


In [17]:
adultDF.select(['age','education-num']).describe().show()

+-------+------------------+-----------------+
|summary|               age|    education-num|
+-------+------------------+-----------------+
|  count|             32561|            32561|
|   mean| 38.58164675532078| 10.0806793403151|
| stddev|13.640432553581356|2.572720332067397|
|    min|                17|                1|
|    max|                90|               16|
+-------+------------------+-----------------+



In [18]:
adultDF.write.saveAsTable("adult")

In [19]:
new = spark.sql("select age, sex, education from adult where age > 50")

In [20]:
new.show(10)

+---+------+------------+
|age|   sex|   education|
+---+------+------------+
| 53|  Male|        11th|
| 52|  Male|     HS-grad|
| 54|Female|     HS-grad|
| 59|Female|     HS-grad|
| 56|  Male|   Bachelors|
| 54|  Male|Some-college|
| 53|  Male|   Bachelors|
| 57|  Male|   Bachelors|
| 53|  Male|     HS-grad|
| 53|Female|     HS-grad|
+---+------+------------+
only showing top 10 rows

