# RDDs and DataFrames

* Creating RDDs and DataFrames using SparkContext
* Interoperability between RDDs and DataFrames
* Multiple rows and multiple column specifications for DataFrames
* Creating DataFrames using SQLContext
* Selecting, editing and renaming columns in dataframes
* Interoperability between Pandas and Spark dataframes

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Day1") \
    .getOrCreate()



In [3]:
sc = spark.sparkContext
sc.uiWebUrl

'http://4KS3J72.ww930.my-it-solutions.net:4041'

Here we use the SparkContext directly. Normally, the SparkSession is used instead. As of Spark 2.0, SparkSession encapsulates the SparkContext, which gives:
- a simplified, single entry point
- no confusion about which context to use (SQLContext, HiveContext, etc.)
![](sparksession.jpg)


In [4]:
from pyspark.sql.types import Row
from datetime import datetime

#### Creating RDDs using sc.parallelize()
In this example, values of different types (integers and a string) are mixed in a single RDD.


In [5]:
simple_data = sc.parallelize([1, "Alice", 50])
simple_data

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:184

Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:

    data = [1, 2, 3, 4, 5]
    distData = sc.parallelize(data)

Once created, the distributed dataset (distData) can be operated on in parallel. For example, we can call distData.reduce(lambda a, b: a + b) to add up the elements of the list. We describe operations on distributed datasets later on.

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.

In [6]:
simple_data.count()

3

![](count.png)

In [5]:
simple_data.first()

1

Remember: count() and first() are ACTIONS on the RDD: they trigger the actual computation and return a result to the driver.


In [None]:
simple_data.take(2)

(take(2) returns the first 2 elements of the dataset as a Python list)

![](collect.png)

In [8]:
simple_data.collect()

[1, 'Alice', 50]

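The collect() action returns all the elements of the dataset to the driver as a Python list. Use it only on small datasets: everything must fit in the driver's memory.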

#### ERROR

* This RDD does not have "columns", so it cannot be represented as a tabular DataFrame
* DataFrames are structured datasets
* So, instead of working with RDDs, we want to work with DataFrames.

Error: each element of this RDD is a single value (an int or a string), not a record with fields, so Spark cannot infer a schema and the RDD cannot be converted to a DataFrame.

In [9]:
df = simple_data.toDF()

TypeError: Can not infer schema for type: <class 'int'>

#### RDDs with records using sc.parallelize()

In [11]:
records = sc.parallelize([[1, "Alice", 50], [2, "Bob", 80]])
# Notice: this has 2 records, one for Alice and one for Bob
records  
# The RDD is now created

ParallelCollectionRDD[6] at parallelize at PythonRDD.scala:184

In [12]:
records.collect()

[[1, 'Alice', 50], [2, 'Bob', 80]]

In [13]:
df = records.toDF()

In [14]:
df

DataFrame[_1: bigint, _2: string, _3: bigint]

In [15]:
df.show()
# Column names have been automatically generated and assigned

+---+-----+---+
| _1|   _2| _3|
+---+-----+---+
|  1|Alice| 50|
|  2|  Bob| 80|
+---+-----+---+



In [16]:

# getOrCreate() returns the SparkSession already created above,
# instead of starting a new one
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
records = spark.sparkContext.parallelize([[1, "Alice", 50], [2, "Bob", 80]])
help(sc.parallelize)

Help on method parallelize in module pyspark.context:

parallelize(c, numSlices=None) method of pyspark.context.SparkContext instance
    Distribute a local Python collection to form an RDD. Using xrange
    is recommended if the input represents a range for performance.
    
    >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
    [[0], [2], [3], [4], [6]]
    >>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
    [[], [0], [], [2], [4]]



In [14]:
# spark.sql("SET -v").show(n=200, truncate=False)

The show() method prints the first 20 rows by default.

In [17]:
df.show()

+---+-----+---+
| _1|   _2| _3|
+---+-----+---+
|  1|Alice| 50|
|  2|  Bob| 80|
+---+-----+---+



#### Creating dataframes using sc.parallelize() and Row() functions
* Row objects allow specifying column names for DataFrames

In [18]:
data = sc.parallelize([Row(id=1,
                           name="Alice",
                           score=50)])
# Row is imported in the first lines of this notebook
data

ParallelCollectionRDD[22] at parallelize at PythonRDD.scala:184

In [19]:
data.count()

1

In [20]:
data.collect()

[Row(id=1, name='Alice', score=50)]

In [21]:
df = data.toDF()
df.show()

+---+-----+-----+
| id| name|score|
+---+-----+-----+
|  1|Alice|   50|
+---+-----+-----+



So: structured data with a SCHEMA (named fields) can be converted to a DataFrame.

#### Working with multiple rows

In [22]:
data = sc.parallelize([
    Row(id=1, name="Alice", score=50),
    Row(id=2, name="Bob", score=80),
    Row(id=3, name="Charlee", score=75)
])

In [23]:
df = data.toDF()
df.show()

+---+-------+-----+
| id|   name|score|
+---+-------+-----+
|  1|  Alice|   50|
|  2|    Bob|   80|
|  3|Charlee|   75|
+---+-------+-----+



#### Multiple columns with complex data types

In [24]:
complex_data = sc.parallelize([Row(
                                col_float=1.44,
                                col_integer=10,
                                col_string="John")
                           ])

In [25]:
complex_data_df = complex_data.toDF()
complex_data_df.show()

+---------+-----------+----------+
|col_float|col_integer|col_string|
+---------+-----------+----------+
|     1.44|         10|      John|
+---------+-----------+----------+



In [26]:
complex_data = sc.parallelize([Row(
                                col_float=1.44, 
                                col_integer=10, 
                                col_string="John", 
                                col_boolean=True, 
                                col_list=[1, 2, 3])
                           ])
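# The last column is a list.
# Note: in the output the columns appear in alphabetical order; in Spark 2.x,
# Row(**kwargs) sorts its fields by name (Spark 3.0+ keeps the given order on Python 3.6+).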

In [27]:
complex_data_df = complex_data.toDF()
complex_data_df.show()

+-----------+---------+-----------+---------+----------+
|col_boolean|col_float|col_integer| col_list|col_string|
+-----------+---------+-----------+---------+----------+
|       true|     1.44|         10|[1, 2, 3]|      John|
+-----------+---------+-----------+---------+----------+



In [28]:
complex_data = sc.parallelize([Row(
                                col_list = [1, 2, 3], 
                                col_dict = {"k1": 0, "k2": 1, "k3": 2}, 
                                col_row = Row(columnA = 10, columnB = 20, columnC = 30), 
                                col_time = datetime(2014, 8, 1, 14, 1, 5)
                            )])

In [29]:
complex_data_df = complex_data.toDF()
complex_data_df.show()

+--------------------+---------+------------+-------------------+
|            col_dict| col_list|     col_row|           col_time|
+--------------------+---------+------------+-------------------+
|[k3 -> 2, k1 -> 0...|[1, 2, 3]|[10, 20, 30]|2014-08-01 14:01:05|
+--------------------+---------+------------+-------------------+



#### Multiple rows with complex data types

In [30]:
complex_data = sc.parallelize([Row(
                                col_list = [1, 2, 3],
                                col_dict = {"k1": 0},
                                col_row = Row(a=10, b=20, c=30),
                                col_time = datetime(2014, 8, 1, 14, 1, 5)
                            ),              
                            Row(
                                col_list = [1, 2, 3, 4, 5], 
                                col_dict = {"k1": 0,"k2": 1 }, 
                                col_row = Row(a=40, b=50, c=60),
                                col_time = datetime(2014, 8, 2, 14, 1, 6)
                            ),
                            Row(
                                col_list = [1, 2, 3, 4, 5, 6, 7], 
                                col_dict = {"k1": 0, "k2": 1, "k3": 2 }, 
                                col_row = Row(a=70, b=80, c=90),
                                col_time = datetime(2014, 8, 3, 14, 1, 7)
                            )]) 

In [18]:
complex_data_df = complex_data.toDF()
complex_data_df.show()

+--------------------+--------------------+------------+-------------------+
|            col_dict|            col_list|     col_row|           col_time|
+--------------------+--------------------+------------+-------------------+
|           [k1 -> 0]|           [1, 2, 3]|[10, 20, 30]|2014-08-01 14:01:05|
|  [k1 -> 0, k2 -> 1]|     [1, 2, 3, 4, 5]|[40, 50, 60]|2014-08-02 14:01:06|
|[k3 -> 2, k1 -> 0...|[1, 2, 3, 4, 5, 6...|[70, 80, 90]|2014-08-03 14:01:07|
+--------------------+--------------------+------------+-------------------+



#### Creating DataFrames using SQLContext

* SQLContext can create DataFrames directly from raw data (as of Spark 2.0, SparkSession replaces SQLContext, which is kept for backward compatibility)

https://spark.apache.org/docs/1.6.1/sql-programming-guide.html

Example: create a DataFrame directly from JSON:

    sqlContext = SQLContext(sc)
    df = sqlContext.read.json("examples/src/main/resources/people.json")

In [35]:
from pyspark.sql import SQLContext

In [36]:
# sqlContext  = SQLContext(spark.sparkContext)
sqlContext = SQLContext(sc)
help(sqlContext)

Help on SQLContext in module pyspark.sql.context object:

class SQLContext(builtins.object)
 |  The entry point for working with structured data (rows and columns) in Spark, in Spark 1.x.
 |  
 |  As of Spark 2.0, this is replaced by :class:`SparkSession`. However, we are keeping the class
 |  here for backward compatibility.
 |  
 |  A SQLContext can be used create :class:`DataFrame`, register :class:`DataFrame` as
 |  tables, execute SQL over tables, cache tables, and read parquet files.
 |  
 |  :param sparkContext: The :class:`SparkContext` backing this SQLContext.
 |  :param sparkSession: The :class:`SparkSession` around which this SQLContext wraps.
 |  :param jsqlContext: An optional JVM Scala SQLContext. If set, we do not instantiate a new
 |      SQLContext in the JVM, instead we make all calls to this object.
 |  
 |  Methods defined here:
 |  
 |  __init__(self, sparkContext, sparkSession=None, jsqlContext=None)
 |      Creates a new SQLContext.
 |      
 |      >>> from date

In [37]:
sqlContext

<pyspark.sql.context.SQLContext at 0xadb7630>

In [38]:
df = sqlContext.range(5)
df

DataFrame[id: bigint]

In [39]:
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [40]:
df.count()

5

#### Rows specified in tuples

In [41]:
data = [('Alice', 50),
        ('Bob', 80),
        ('Charlee', 75)]

In [42]:
sqlContext.createDataFrame(data).show()

+-------+---+
|     _1| _2|
+-------+---+
|  Alice| 50|
|    Bob| 80|
|Charlee| 75|
+-------+---+



Specify the column names explicitly:

In [43]:
sqlContext.createDataFrame(data, ['Name', 'Score']).show()

+-------+-----+
|   Name|Score|
+-------+-----+
|  Alice|   50|
|    Bob|   80|
|Charlee|   75|
+-------+-----+



In [44]:
complex_data = [
                 (1.0,
                  10,
                  "Alice", 
                  True, 
                  [1, 2, 3], 
                  {"k1": 0},
                  Row(a=1, b=2, c=3), 
                  datetime(2014, 8, 1, 14, 1, 5)),

                 (2.0,
                  20,
                  "Bob", 
                  True, 
                  [1, 2, 3, 4, 5], 
                  {"k1": 0,"k2": 1 }, 
                  Row(a=1, b=2, c=3), 
                  datetime(2014, 8, 1, 14, 1, 5)),

                  (3.0,
                   30,
                   "Charlee", 
                   False, 
                   [1, 2, 3, 4, 5, 6], 
                   {"k1": 0, "k2": 1, "k3": 2 }, 
                   Row(a=1, b=2, c=3), 
                   datetime(2014, 8, 1, 14, 1, 5))
                ] 

In [45]:
sqlContext.createDataFrame(complex_data).show()

+---+---+-------+-----+------------------+--------------------+---------+-------------------+
| _1| _2|     _3|   _4|                _5|                  _6|       _7|                 _8|
+---+---+-------+-----+------------------+--------------------+---------+-------------------+
|1.0| 10|  Alice| true|         [1, 2, 3]|           [k1 -> 0]|[1, 2, 3]|2014-08-01 14:01:05|
|2.0| 20|    Bob| true|   [1, 2, 3, 4, 5]|  [k1 -> 0, k2 -> 1]|[1, 2, 3]|2014-08-01 14:01:05|
|3.0| 30|Charlee|false|[1, 2, 3, 4, 5, 6]|[k3 -> 2, k1 -> 0...|[1, 2, 3]|2014-08-01 14:01:05|
+---+---+-------+-----+------------------+--------------------+---------+-------------------+



In [None]:
# The names follow the order of the tuple fields: the float comes first, then the integer
complex_data_df = sqlContext.createDataFrame(complex_data, [
        'col_float',
        'col_integer',
        'col_string',
        'col_boolean',
        'col_list',
        'col_dictionary',
        'col_row',
        'col_date_time']
    )
complex_data_df.show()

#### Creating dataframes using SQL Context and the Row function
* Row objects can also be created positionally, without specifying field names

In [46]:
data = sc.parallelize([
    Row(1, "Alice", 50),
    Row(2, "Bob", 80),
    Row(3, "Charlee", 75)
])


You can still set up the column names afterwards, as shown below:

In [47]:
column_names = Row('id', 'name', 'score')  # a Row "template" with field names only
students = data.map(lambda r: column_names(*r))
# Lambdas: functional programming within an OO world. VERY handy, but can be VERY confusing.

# map() is a transformation: it applies the lambda to every element of the RDD,
# here rebuilding each positional Row as a Row with named fields.


In [None]:
students

In [None]:
students.collect()

In [None]:
students_df = sqlContext.createDataFrame(students)
students_df

In [None]:
students_df.show()

#### Extracting specific rows from dataframes

In [None]:
complex_data_df.first()

In [None]:
complex_data_df.take(2)

#### Extracting specific cells from dataframes

In [None]:
cell_string = complex_data_df.collect()[0][2]
cell_string

In [None]:
cell_list = complex_data_df.collect()[0][4]
cell_list

In [None]:
cell_list.append(100)
cell_list

In [None]:
# The original DataFrame has not been modified:
# collect() returns local Python copies, so changing cell_list does not affect the DataFrame
complex_data_df.show()

#### Selecting specific columns

Every DataFrame exposes its records as an RDD of Row objects through its `rdd` attribute:

In [None]:
complex_data_df.rdd\
    .map(lambda x: (x.col_string, x.col_dictionary))\
    .collect()

In [None]:
complex_data_df.select(
    'col_string',
    'col_list',
    'col_date_time'
).show()

#### Editing columns

In [None]:
# A map() operation which appends " Boo" to every string in the column
complex_data_df.rdd\
    .map(lambda x: x.col_string + " Boo")\
    .collect()
# DataFrames are immutable: a column cannot be edited in place.
# To change it, you must create a new DataFrame (e.g. with withColumn()).

#### Adding a column

In [None]:
complex_data_df.select(
                   'col_integer',
                   'col_float'
            )\
           .withColumn(
                   "col_sum",
                    complex_data_df.col_integer + complex_data_df.col_float
           )\
           .show()

Another example: a new column holding the opposite (negation) of the boolean column

In [None]:
complex_data_df.select('col_boolean')\
               .withColumn(
                   "col_opposite",
                   complex_data_df.col_boolean == False )\
               .show()

#### Editing a column name / Renaming

In [None]:
complex_data_df.withColumnRenamed("col_dictionary","col_map").show()
# withColumnRenamed() returns a new DataFrame; complex_data_df itself is unchanged

In [None]:
complex_data_df.select(complex_data_df.col_string.alias("Name")).show()

#### Interoperability between Pandas DataFrames and Spark DataFrames
Remember: RDDs and DataFrames are distributed across the nodes of the cluster. A Pandas DataFrame lives on ONE machine (in memory), so toPandas() collects all the data to the driver.

In [None]:
import pandas

In [None]:
df_pandas = complex_data_df.toPandas()
df_pandas

From Pandas back to a Spark DataFrame:

In [None]:
# show() only prints and returns None, so keep the DataFrame and call show() separately
df_spark = sqlContext.createDataFrame(df_pandas)
df_spark.show()