# Introduction to Apache Spark

### Overview of Spark
Apache Spark is a powerful open-source engine for distributed querying and
processing. It provides the flexibility and extensibility of MapReduce but at significantly
higher speeds: up to 100 times faster than Apache Hadoop when data is stored in
memory and up to 10 times faster when accessing disk.


### Spark Ecosystem
Besides the Spark Core API, the Spark ecosystem contains additional libraries that provide further capabilities for Big Data analytics and Machine Learning.

These libraries include:

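* Spark Streaming: processing of live data streams.

* Spark SQL: querying structured data with SQL and DataFrames.

* Spark MLlib: machine learning algorithms and utilities.

* Spark GraphX: graph processing and graph-parallel computation.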


![spark ecosystem](ecosystem.png)

### Spark Architecture
The Spark architecture includes the following three main components:

* Data Storage
* API
* Management Framework


Spark can be deployed as a stand-alone server, or it can run on a distributed computing framework like Mesos or YARN.

![spark architecture](architecture.png)

### Spark running modes
Whether you install Spark on a local machine or use a cloud-based installation, there are a few different modes in which you can connect to the Spark engine.

The following table shows the Master URL parameter for the different modes of running Spark.
![spark modes](modes.png)

### Spark Web Console
When Spark is running in any mode, you can view Spark job results and other statistics in the Spark Web Console at the following URL (4040 is the default port):

http://localhost:4040


### Spark Architecture
Spark applications run as independent sets of processes on a cluster, as shown in the diagram below.
![spark cluster](cluster.png)




### Resilient Distributed Dataset


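RDDs support two kinds of operations:

* Transformation: builds a new RDD from an existing one (for example `map()` or `filter()`) and is evaluated lazily.
* Action: triggers the computation and returns a result to the driver (for example `collect()` or `take()`).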
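
The difference matters because transformations are lazy: they only describe a new RDD, and nothing is computed until an action asks for a result. The sketch below is a plain-Python analogy built on generators, not Spark code; the names `lazy_map`, `collect` and `log` are made up for illustration.

```python
# Plain-Python analogy (no Spark needed): a generator is lazy like a
# transformation, and consuming it plays the role of an action.
log = []  # records when the mapped function actually runs

def lazy_map(func, data):
    """Hypothetical stand-in for rdd.map(): returns a lazy generator."""
    for item in data:
        log.append(item)      # side effect shows when work happens
        yield func(item)

def collect(lazy_data):
    """Hypothetical stand-in for rdd.collect(): forces evaluation."""
    return list(lazy_data)

squares = lazy_map(lambda x: x * x, [1, 2, 3])   # "transformation"
work_done_before_action = list(log)              # nothing has run yet
result = collect(squares)                        # "action"

print(work_done_before_action)
print(result)
```

In Spark the same idea lets the engine plan a whole chain of transformations before it runs anything.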


### Spark Machine Learning library
The Spark MLlib module offers machine learning functionality over a number of
domains. The documentation available at the Spark website introduces the data
types used (for example, vectors and the LabeledPoint structure). This module
offers functionality that includes:
* Statistics
* Classification
* Regression
* Collaborative Filtering
* Clustering
* Dimensionality Reduction
* Feature Extraction
* Frequent Pattern Mining
* Optimization

### Performance issues
1. The cluster structure 
2. The Hadoop file system 
3. Data locality
4. Coding 
5. Cloud 

# First example 

Test whether Spark is running properly. We have to install findspark before we can run this code; from the command line (cmd), call:  
>pip install findspark

In [3]:
from pyspark import SparkContext
sc = SparkContext(master='local[2]',appName='my-spark') #master: use 2 local cores during the session; appName: name for the context


In [4]:
#Import the data
rdd = sc.textFile('mtcars.csv') #Get the data from csv to rdd
rdd.take(5) #take first 5 csv rows

[',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',
 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',
 'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',
 'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1',
 'Hornet 4 Drive,21.4,6,258,110,3.08,3.215,19.44,1,0,3,1']

In [5]:
rdd2=sc.parallelize([1,2,3]) #Convert list to RDD
rdd2.collect()

[1, 2, 3]

# How to start Spark

## Two entry points to Spark

With the release of Spark 2.0.0, a new abstraction became available to developers: the Spark Session, which can be instantiated and called upon just like the Spark Context that was previously available. The Spark Session encapsulates the existing Spark Context, so existing functionality should not be affected and developers may continue using the Spark Context as desired. However, the Spark Session is the entry point preferred by the Spark community from Spark 2.0.0 onward, so we will use it from here on.

* **SparkContext**: create *RDD* and broadcast variables on the cluster.
* **SparkSession**: create *DataFrame* (pyspark.sql.dataframe.DataFrame).

## SparkContext


## SparkSession



In [49]:
# Stop a running SparkContext before opening the new one
sc.stop()

In [None]:
#The new way to get a spark entry point

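In [6]:
from pyspark.sql import SparkSession
#getOrCreate() reuses a running SparkContext, or starts a new one if it was stopped
spark=SparkSession.builder.master('local[2]').appName('my-spark').getOrCreate()
sc=spark.sparkContext #later cells keep using sc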

In [7]:
#Create dataframe by loading file
df=spark.read.csv('mtcars.csv',header=True,inferSchema=True)
df.show(5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|              _c0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



In [8]:
#Another way to create dataframe
import pandas as pd
pdf=pd.DataFrame({
    'x1':range(1,6),
    'x2': list('abcde')
})
print(pdf)

   x1 x2
0   1  a
1   2  b
2   3  c
3   4  d
4   5  e


In [9]:
df=spark.createDataFrame(pdf)
df.show(5)

+---+---+
| x1| x2|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
|  4|  d|
|  5|  e|
+---+---+



# RDD object



The class pyspark.SparkContext creates a client which connects to a Spark cluster. This client can be used to create an RDD object. There are two methods from this class for directly creating RDD objects:
* `parallelize()`
* `textFile()`

## `parallelize()`

`parallelize()` distributes a local **python collection** to form an RDD. Common built-in python collections include `dict`, `list`, `tuple` or `set`.

Examples:

In [10]:
rdd=sc.parallelize(('cat','dog','fish'))
rdd.collect()

['cat', 'dog', 'fish']

## `textFile()`

The `textFile()` function reads a text file and returns it as an **RDD of strings**. Usually, you will need to apply some **map** functions to transform each element of the RDD into a data structure/type that is suitable for data analysis.

**When using `textFile()`, each line of the text file becomes an element in the resulting RDD.**

Examples:

In [11]:
rdd=sc.textFile('twitter.txt')
rdd.take(5)

['Fresh install of XP on new computer. Sweet relief! fuck vista\t1018769417\t1.0',
 'Well. Now I know where to go when I want my knives. #ChiChevySXSW http://post.ly/RvDl\t10284216536\t1.0',
 '"Literally six weeks before I can take off ""SSC Chair"" off my email. Its like the torturous 4th mile before everything stops hurting."\t10298589026\t1.0',
 'Mitsubishi i MiEV - Wikipedia, the free encyclopedia - http://goo.gl/xipe Cutest car ever!\t109017669432377344\t1.0',
 "'Cheap Eats in SLP' - http://t.co/4w8gRp7\t109642968603963392\t1.0"]

## Commonly used functions with RDD objects 

### Map functions
These functions are probably the most commonly used functions when dealing with an RDD object. 

* `map()`
* `mapValues()`
* `flatMap()`
* `flatMapValues()`

#### `map()`
`map()` applies a function to each element of the RDD.

In [12]:
rdd=sc.textFile('mtcars.csv')
rdd.take(5)

[',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',
 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',
 'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',
 'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1',
 'Hornet 4 Drive,21.4,6,258,110,3.08,3.215,19.44,1,0,3,1']

In [14]:
#convert csv to tuple with car model and list of values
rdd1=rdd.map(lambda x: x.split(',')) #inline (lambda) function splits each line on commas, giving a list of string values
rdd1.take(4)

[['',
  'mpg',
  'cyl',
  'disp',
  'hp',
  'drat',
  'wt',
  'qsec',
  'vs',
  'am',
  'gear',
  'carb'],
 ['Mazda RX4',
  '21',
  '6',
  '160',
  '110',
  '3.9',
  '2.62',
  '16.46',
  '0',
  '1',
  '4',
  '4'],
 ['Mazda RX4 Wag',
  '21',
  '6',
  '160',
  '110',
  '3.9',
  '2.875',
  '17.02',
  '0',
  '1',
  '4',
  '4'],
 ['Datsun 710',
  '22.8',
  '4',
  '108',
  '93',
  '3.85',
  '2.32',
  '18.61',
  '1',
  '1',
  '4',
  '1']]

In [16]:
rdd_2=rdd1.map(lambda x: (x[0],x[1:]))
rdd_2.take(4) #all numbers are strings

[('',
  ['mpg',
   'cyl',
   'disp',
   'hp',
   'drat',
   'wt',
   'qsec',
   'vs',
   'am',
   'gear',
   'carb']),
 ('Mazda RX4',
  ['21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4']),
 ('Mazda RX4 Wag',
  ['21', '6', '160', '110', '3.9', '2.875', '17.02', '0', '1', '4', '4']),
 ('Datsun 710',
  ['22.8', '4', '108', '93', '3.85', '2.32', '18.61', '1', '1', '4', '1'])]

In [17]:
#Remove the first element of the csv file, the header
rdd_temp=rdd_2.filter(lambda x: x[0] !='')
rdd_temp.take(5)

[('Mazda RX4',
  ['21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4']),
 ('Mazda RX4 Wag',
  ['21', '6', '160', '110', '3.9', '2.875', '17.02', '0', '1', '4', '4']),
 ('Datsun 710',
  ['22.8', '4', '108', '93', '3.85', '2.32', '18.61', '1', '1', '4', '1']),
 ('Hornet 4 Drive',
  ['21.4', '6', '258', '110', '3.08', '3.215', '19.44', '1', '0', '3', '1']),
 ('Hornet Sportabout',
  ['18.7', '8', '360', '175', '3.15', '3.44', '17.02', '0', '0', '3', '2'])]

In [18]:
#each element is a (model, list of strings) tuple. Convert all strings in the list to floats
rdd3=rdd_temp.map(lambda x: (x[0],[*map(float,x[1])]))
rdd3.take(4)


[('Mazda RX4',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]),
 ('Mazda RX4 Wag',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0]),
 ('Datsun 710',
  [22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]),
 ('Hornet 4 Drive',
  [21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0])]

#### `mapValues()`

This map function requires that each element in the RDD has a **key/value** pair structure, for example, a tuple of 2 items, or a list of 2 items.

The RDD objects **rdd_temp** and **rdd3** belong to this category. If we only want to operate on the values, we can use the `mapValues()` function.

In [19]:
rdd_temp.take(4) #Mazda RX4 is key and then values are stored in a list

[('Mazda RX4',
  ['21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4']),
 ('Mazda RX4 Wag',
  ['21', '6', '160', '110', '3.9', '2.875', '17.02', '0', '1', '4', '4']),
 ('Datsun 710',
  ['22.8', '4', '108', '93', '3.85', '2.32', '18.61', '1', '1', '4', '1']),
 ('Hornet 4 Drive',
  ['21.4', '6', '258', '110', '3.08', '3.215', '19.44', '1', '0', '3', '1'])]

In [21]:
#Convert all list elements to numbers
rdd_mapValues=rdd_temp.mapValues(lambda x: [*map(float,x)])
rdd_mapValues.take(5)
#mapValues only works on key/value pairs

[('Mazda RX4',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]),
 ('Mazda RX4 Wag',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0]),
 ('Datsun 710',
  [22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]),
 ('Hornet 4 Drive',
  [21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0]),
 ('Hornet Sportabout',
  [18.7, 8.0, 360.0, 175.0, 3.15, 3.44, 17.02, 0.0, 0.0, 3.0, 2.0])]

#### `flatMap()`

This function **first** applies a function to each element of an RDD and **then** flattens the results. With the identity function, we can also use it simply to flatten the elements of an RDD without any extra operation on each element.

Example:

In [22]:
rdd=sc.parallelize([('a','a'),('b','b')])
rdd.collect()

[('a', 'a'), ('b', 'b')]

In [23]:
rdd.flatMap(lambda x: x).collect() #every element inside 1 list - not saving results so use 'collect'

['a', 'a', 'b', 'b']

#### `flatMapValues()`

This function implements `flatMap` on the value of each **key/value** pair. It applies a function only to the value of each pair and then flattens the results, repeating the key for every flattened value.

A good use case is to use this function to **"melt"** a data frame, like the `melt()` function from the R package `reshape2`. To better explain this idea, we create a data frame with the **SparkSession** class.

In [26]:
df=spark.read.csv('airquality.csv',inferSchema=True,header=True,nullValue='NA')
df.toPandas().iloc[:4,] #convert df to pandas

   ozone  solar.r  wind  temp  month  day
0   41.0    190.0   7.4    67      5    1
1   36.0    118.0   8.0    72      5    2
2   12.0    149.0  12.6    74      5    3
3   18.0    313.0  11.5    62      5    4


In [27]:
air=df.rdd 

In [28]:
#Combine month and day into a tuple and the remaining values into a 2nd tuple
air1=air.map(lambda x: [x[4:],x[:4]])
air1.take(5)

[[(5, 1), (41, 190, 7.4, 67)],
 [(5, 2), (36, 118, 8.0, 72)],
 [(5, 3), (12, 149, 12.6, 74)],
 [(5, 4), (18, 313, 11.5, 62)],
 [(5, 5), (None, None, 14.3, 56)]]

In [31]:
air2=air1.mapValues(lambda x: [('ozone',x[0]),('solar.r', x[1]), ('wind',x[2]), ('temp',x[3])])
air2.take(5) #associate numeric values with a category

[((5, 1), [('ozone', 41), ('solar.r', 190), ('wind', 7.4), ('temp', 67)]),
 ((5, 2), [('ozone', 36), ('solar.r', 118), ('wind', 8.0), ('temp', 72)]),
 ((5, 3), [('ozone', 12), ('solar.r', 149), ('wind', 12.6), ('temp', 74)]),
 ((5, 4), [('ozone', 18), ('solar.r', 313), ('wind', 11.5), ('temp', 62)]),
 ((5, 5), [('ozone', None), ('solar.r', None), ('wind', 14.3), ('temp', 56)])]

In [32]:
air3=air2.flatMapValues(lambda x: x)
air3.take(4) #data format is (month,day) then (weather variable,value)

[((5, 1), ('ozone', 41)),
 ((5, 1), ('solar.r', 190)),
 ((5, 1), ('wind', 7.4)),
 ((5, 1), ('temp', 67))]

### Aggregate functions
Two aggregate functions:

* `aggregate()`
* `aggregateByKey()`

#### `aggregate(zeroValue, seqOp, combOp)`

* **zeroValue** is like a data container. Its structure should match with the data structure of the returned values from the seqOp function.
* **seqOp** is a function that takes two arguments: the first argument is the zeroValue and the second argument is an element from the RDD. The zeroValue gets updated with the returned value after every run.
* **combOp** is a function that takes two arguments: the first argument is the final zeroValue from one partition and the other is another final zeroValue from another partition.
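
To make the interplay of the three arguments concrete, here is a plain-Python model of the two-step fold. It is a sketch and not Spark's implementation: the function `aggregate` and the hand-made `partitions` list are assumptions for illustration, and a real RDD decides its own partitioning.

```python
from functools import reduce

def aggregate(partitions, zero_value, seq_op, comb_op):
    """Plain-Python model of RDD.aggregate(); `partitions` is a list of lists."""
    # Step 1: fold each partition on its own, starting from zero_value.
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # Step 2: merge the per-partition results, again starting from zero_value.
    return reduce(comb_op, partials, zero_value)

# Two hypothetical partitions holding the numbers 1..6.
partitions = [[1, 2, 3], [4, 5, 6]]

# The accumulator is (running sum, running count).
seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

total, count = aggregate(partitions, (0, 0), seq_op, comb_op)
print(total, count, total / count)
```

Because the zero value is used once per partition and once more in the merge step, it should be neutral for both operations, such as `(0, 0)` for sums and counts.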

In [34]:
mtcars_df=spark.read.csv('mtcars.csv',inferSchema=True, header=True).select(['mpg','disp'])
mtcars_df.take(5)

[Row(mpg=21.0, disp=160.0),
 Row(mpg=21.0, disp=160.0),
 Row(mpg=22.8, disp=108.0),
 Row(mpg=21.4, disp=258.0),
 Row(mpg=18.7, disp=360.0)]

In [36]:
#calculate average
mpg_mean = mtcars_df.select('mpg').rdd.map(lambda x: x[0]).mean()
disp_mean = mtcars_df.select('disp').rdd.map(lambda x: x[0]).mean()
print(mpg_mean,disp_mean)

20.090625000000003 230.721875


In [37]:
zeroValue =(0,0)
seqOp=lambda z, x: (z[0]+(x[0]-mpg_mean)**2, z[1]+(x[1]-disp_mean)**2) #z is zeroValue, x is element in RDD partition
#sum of squares calculation, values updated after every row in the dataframe
#function combines all zeroValues into 1
comboOp= lambda px, py: (px[0]+py[0],px[1]+py[1])

mtcars_df.rdd.aggregate(zeroValue, seqOp, comboOp)
#'.rdd' converts from spark df to spark RDD
#Output is sum of squares for mpg and disp values in the dataframe

(1126.0471874999998, 476184.7946875)

#### `aggregateByKey(zeroValue, seqOp, combOp)`

This function works like `aggregate()`, but where `aggregate()` reduces the whole RDD to a single result, `aggregateByKey()` aggregates the values of each key separately and returns an RDD of (key, aggregated value) pairs.
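
The per-key behaviour can be sketched in plain Python as follows. This is only a model under stated assumptions: `aggregate_by_key` is a made-up helper, the partitions are written by hand, and the (cyl, hp) pairs are a few rows picked from the mtcars data shown earlier.

```python
def aggregate_by_key(partitions, zero_value, seq_op, comb_op):
    """Plain-Python model of RDD.aggregateByKey() over (key, value) pairs."""
    merged = {}
    for part in partitions:
        # Fold the values inside one partition, one accumulator per key.
        local = {}
        for key, value in part:
            local[key] = seq_op(local.get(key, zero_value), value)
        # Merge this partition's accumulators into the overall result.
        for key, acc in local.items():
            merged[key] = comb_op(merged[key], acc) if key in merged else acc
    return sorted(merged.items())

# Hypothetical (cyl, hp) pairs split over two partitions.
partitions = [[(6, 110), (4, 93)], [(6, 110), (4, 62), (8, 175)]]

seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)    # (sum, count) per key
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])   # merge two (sum, count)

result = aggregate_by_key(partitions, (0, 0), seq_op, comb_op)
print(result)
```

Each key ends up with its own (sum, count) pair, from which a per-key mean follows directly.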

# DataFrame object
DataFrames, like RDDs, are immutable collections of data distributed among the
nodes in a cluster. However, unlike RDDs, in DataFrames data is organized into
named columns.



## Create a DataFrame object

#### Create DataFrame by reading a file

### Create DataFrame with `createDataFrame` function

#### From RDD object

In [39]:
from pyspark.sql import Row
df=sc.parallelize([
   Row(x=[1,2,3],y=['a','b','c']) 
])
df.collect()

[Row(x=[1, 2, 3], y=['a', 'b', 'c'])]

In [41]:
#convert to dataframe
df_1=spark.createDataFrame(df)
df_1.show()

+---------+---------+
|        x|        y|
+---------+---------+
|[1, 2, 3]|[a, b, c]|
+---------+---------+



#### From pandas DataFrame

#### From a list

In [42]:
#Create spark dataframe from a list
my_list=[['a',1],['b',2]]
df=spark.createDataFrame(my_list,['letter','number'])
df.show()

+------+------+
|letter|number|
+------+------+
|     a|     1|
|     b|     2|
+------+------+



#### From a list of tuples

## Operating on DataFrame columns

Column instances can be created in two ways:

1. directly select a column out of a *DataFrame*: `df.colName`
2. create from a column expression: `df.colName + 1`

The `Column` class comes with some methods that can operate on a column instance. ***However, almost all functions from the `pyspark.sql.functions` module take one or more column instances as argument(s)***. These functions are important tools for data manipulation.

## DataFrame column methods

### Methods that take column names as arguments:

* `corr(col1, col2)`: two column names.
* `cov(col1, col2)`: two column names.
* `crosstab(col1, col2)`: two column names.
* `describe(*cols)`: ***`*cols` refers to only column names (strings).***

In [44]:
mtcars_df.show(4)
mtcars_df.corr('mpg','disp')

+----+-----+
| mpg| disp|
+----+-----+
|21.0|160.0|
|21.0|160.0|
|22.8|108.0|
|21.4|258.0|
+----+-----+
only showing top 4 rows



-0.8475513792624786

### Methods that take column names or column expressions or **both** as arguments:

* `cube(*cols)`: column names (string) or column expressions or **both**.
* `drop(*cols)`: ***a list of column names OR a single column expression.***
* `groupBy(*cols)`: column name (string) or column expression or **both**.
* `rollup(*cols)`: column name (string) or column expression or **both**.
* `select(*cols)`: column name (string) or column expression or **both**.
* `sort(*cols, **kwargs)`: column name (string) or column expression or **both**.
* `sortWithinPartitions(*cols, **kwargs)`: column name (string) or column expression or **both**.
* `orderBy(*cols, **kwargs)`: column name (string) or column expression or **both**.
* `sampleBy(col, fractions, seed=None)`: a column name.
* `toDF(*cols)`: **a list of column names (string).**
* `withColumn(colName, col)`: `colName` refers to column name; `col` refers to a column expression.
* `withColumnRenamed(existing, new)`: takes column names as arguments.
* `filter(condition)`: **condition** refers to a column expression that evaluates to `types.BooleanType` values.

In [47]:
mtcars_df.drop('mpg').show(5)

+-----+
| disp|
+-----+
|160.0|
|160.0|
|108.0|
|258.0|
|360.0|
+-----+
only showing top 5 rows



# Conversion between RDD and DataFrame

## DataFrame to RDD
A **DataFrame** can be easily converted to an **RDD** through its `pyspark.sql.DataFrame.rdd` attribute (it is accessed without parentheses, as in the cell below). Each element in the returned RDD is a **pyspark.sql.Row** object. A Row is an ordered collection of key-value pairs, where the keys are the column names.

In [48]:
mtcars_df.rdd.take(5) #each element is a row object

[Row(mpg=21.0, disp=160.0),
 Row(mpg=21.0, disp=160.0),
 Row(mpg=22.8, disp=108.0),
 Row(mpg=21.4, disp=258.0),
 Row(mpg=18.7, disp=360.0)]

In [49]:
mtcars_df.rdd.map(lambda x: (x['mpg'],x['disp'])).take(4) #combine with map function to aggregate variables

[(21.0, 160.0), (21.0, 160.0), (22.8, 108.0), (21.4, 258.0)]

## RDD to DataFrame

To convert an RDD to a DataFrame, we can use the `SparkSession.createDataFrame()` function. In the approach described here, every element in the RDD is first converted to a Row.

Create an RDD

#### Convert RDD elements to Row objects
First we define a function which takes a list of column names and a list of values and creates a Row of key-value pairs. Since the keys of a Row object are given as keyword argument names, we can't simply pass a dictionary to the `Row()` function. Instead, we can think of a dictionary as an argument list and use `**` to unpack it into keyword arguments.
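
A minimal sketch of such a function is shown below. The helper name `list_to_row` and the sample values are assumptions for illustration. So that the snippet also runs where PySpark is not installed, it falls back to a tiny stand-in `Row` class that mimics only keyword construction and `asDict()`.

```python
try:
    from pyspark.sql import Row          # the real class, if PySpark is installed
except ImportError:
    # Minimal stand-in (assumption): only mimics Row(**fields) and asDict().
    class Row(dict):
        def __init__(self, **fields):
            super().__init__(**fields)

        def asDict(self):
            return dict(self)

def list_to_row(keys, values):
    """Hypothetical helper: pair column names with values and build a Row."""
    row_dict = dict(zip(keys, values))
    return Row(**row_dict)   # ** unpacks the dict into keyword arguments

row = list_to_row(['model', 'mpg', 'cyl'], ['Mazda RX4', 21.0, 6])
print(row.asDict())
```

With an RDD whose elements are lists of values, the same helper could be applied through `map()` before the result is passed to `createDataFrame()`.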

## Merge and split columns

Sometimes we need to merge multiple columns in a DataFrame into one column, or split a column into multiple columns. We can easily achieve this by converting a DataFrame to an RDD, applying map functions to manipulate the elements, and then converting the RDD back to a DataFrame.

#### Convert DataFrame to RDD and merge values

In [108]:
#Get the data
mtcars=spark.read.csv('mtcars.csv',inferSchema=True, header=True)
mtcars.show(4)

+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|           _c0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|     Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
| Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|    Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 4 rows



In [109]:
type(mtcars)

pyspark.sql.dataframe.DataFrame

In [118]:
#Convert dataframe to RDD
rdd_merged=mtcars.rdd.map(lambda x: Row(model=x[0], values=list(map(float, x[1:]))))
rdd_merged.collect()

[Row(model='Mazda RX4', values=[21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]),
 Row(model='Mazda RX4 Wag', values=[21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0]),
 Row(model='Datsun 710', values=[22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]),
 Row(model='Hornet 4 Drive', values=[21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0]),
 Row(model='Hornet Sportabout', values=[18.7, 8.0, 360.0, 175.0, 3.15, 3.44, 17.02, 0.0, 0.0, 3.0, 2.0]),
 Row(model='Valiant', values=[18.1, 6.0, 225.0, 105.0, 2.76, 3.46, 20.22, 1.0, 0.0, 3.0, 1.0]),
 Row(model='Duster 360', values=[14.3, 8.0, 360.0, 245.0, 3.21, 3.57, 15.84, 0.0, 0.0, 3.0, 4.0]),
 Row(model='Merc 240D', values=[24.4, 4.0, 146.7, 62.0, 3.69, 3.19, 20.0, 1.0, 0.0, 4.0, 2.0]),
 Row(model='Merc 230', values=[22.8, 4.0, 140.8, 95.0, 3.92, 3.15, 22.9, 1.0, 0.0, 4.0, 2.0]),
 Row(model='Merc 280', values=[19.2, 6.0, 167.6, 123.0, 3.92, 3.44, 18.3, 1.0, 0.0, 4.0, 4.0]),
 Row(model=

#### Convert RDD back to DataFrame

In [120]:
df_merged=spark.createDataFrame(rdd_merged)
df_merged.show(5, truncate=False)

+-----------------+-----------------------------------------------------------------+
|model            |values                                                           |
+-----------------+-----------------------------------------------------------------+
|Mazda RX4        |[21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]  |
|Mazda RX4 Wag    |[21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0] |
|Datsun 710       |[22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]  |
|Hornet 4 Drive   |[21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0]|
|Hornet Sportabout|[18.7, 8.0, 360.0, 175.0, 3.15, 3.44, 17.02, 0.0, 0.0, 3.0, 2.0] |
+-----------------+-----------------------------------------------------------------+
only showing top 5 rows



#### Split the values column to two columns


# Dealing with Categorical data

In [66]:
# Let's create some example dataset
import pandas as pd
pdf = pd.DataFrame({
        'x1': ['a','a','b','b', 'b', 'c'],
        'x2': ['apple', 'orange', 'orange','orange', 'peach', 'peach'],
        'x3': [1, 1, 2, 2, 2, 4],
        'x4': [2.4, 2.5, 3.5, 1.4, 2.1,1.5],
        'y1': [1, 0, 1, 0, 0, 1],
        'y2': ['yes', 'no', 'no', 'yes', 'yes', 'yes']
    })
df = spark.createDataFrame(pdf)
df.show()

+---+------+---+---+---+---+
| x1|    x2| x3| x4| y1| y2|
+---+------+---+---+---+---+
|  a| apple|  1|2.4|  1|yes|
|  a|orange|  1|2.5|  0| no|
|  b|orange|  2|3.5|  1| no|
|  b|orange|  2|1.4|  0|yes|
|  b| peach|  2|2.1|  0|yes|
|  c| peach|  4|1.5|  1|yes|
+---+------+---+---+---+---+



### Mapping String Column to Index column with StringIndexer

`StringIndexer` maps a string column to an index column that will be treated as a categorical column by Spark. **The indices start at 0 and are ordered by label frequency**, so the most frequent label gets index 0. If the input is a numerical column, it is first cast to a string column and then indexed by `StringIndexer`.

There are three steps to implement the StringIndexer:

1. Build the StringIndexer model: specify the input column and output column names.
2. Learn the StringIndexer model: fit the model with your data.
3. Execute the indexing: call the transform function to execute the indexing process.
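
The fit and transform steps can be mimicked in plain Python to show how the indices come about. This is a model of the default frequency ordering and not the Spark implementation. The function name `fit_string_indexer` is made up, and breaking ties alphabetically is an assumption.

```python
from collections import Counter

def fit_string_indexer(labels):
    """Plain-Python model: the most frequent label gets index 0."""
    counts = Counter(labels)
    # Sort by descending frequency; break ties alphabetically (assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}

x1 = ['a', 'a', 'b', 'b', 'b', 'c']    # the x1 column of the example dataset
mapping = fit_string_indexer(x1)        # "fit": learn label -> index
indexed = [mapping[v] for v in x1]      # "transform": apply the mapping
print(mapping)
print(indexed)
```

The label `b` occurs three times and therefore receives index 0.0, followed by `a` (1.0) and `c` (2.0).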

In [68]:
from pyspark.ml.feature import StringIndexer

s_indexer = StringIndexer(inputCol='x1',outputCol='indexed_x1')
s_indexer_model=s_indexer.fit(df)
df_s_indexer=s_indexer_model.transform(df)
df_s_indexer.show()

#'b' is the most frequent label, which is why it has a value of 0 in the 'indexed_x1' column

+---+------+---+---+---+---+----------+
| x1|    x2| x3| x4| y1| y2|indexed_x1|
+---+------+---+---+---+---+----------+
|  a| apple|  1|2.4|  1|yes|       1.0|
|  a|orange|  1|2.5|  0| no|       1.0|
|  b|orange|  2|3.5|  1| no|       0.0|
|  b|orange|  2|1.4|  0|yes|       0.0|
|  b| peach|  2|2.1|  0|yes|       0.0|
|  c| peach|  4|1.5|  1|yes|       2.0|
+---+------+---+---+---+---+----------+



In [69]:
#Look at the datatypes in the dataframe
df_s_indexer.dtypes

[('x1', 'string'),
 ('x2', 'string'),
 ('x3', 'bigint'),
 ('x4', 'double'),
 ('y1', 'bigint'),
 ('y2', 'string'),
 ('indexed_x1', 'double')]

### OneHotEncoder

In [71]:
from pyspark.ml.feature import OneHotEncoder
df_ohe=df.select('x1')
df_ohe.show()

+---+
| x1|
+---+
|  a|
|  a|
|  b|
|  b|
|  b|
|  c|
+---+



In [86]:
#String index this categorical column
df_x1_indexed = StringIndexer(inputCol='x1',outputCol='indexed_x1').fit(df_ohe).transform(df_ohe)
df_x1_indexed.show() #If assigning output to a name, use separate line for show function

+---+----------+
| x1|indexed_x1|
+---+----------+
|  a|       1.0|
|  a|       1.0|
|  b|       0.0|
|  b|       0.0|
|  b|       0.0|
|  c|       2.0|
+---+----------+



In [87]:
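OneHotEncoder(inputCol='indexed_x1',outputCol='encoded_x1').transform(df_x1_indexed).show()
#Converts categories to sparse vectors; the last category is dropped by default (dropLast=True)
#Note: this is the Spark 2.x API; in Spark 3.x OneHotEncoder is an estimator, so call .fit(df_x1_indexed) before .transform()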

+---+----------+-------------+
| x1|indexed_x1|   encoded_x1|
+---+----------+-------------+
|  a|       1.0|(2,[1],[1.0])|
|  a|       1.0|(2,[1],[1.0])|
|  b|       0.0|(2,[0],[1.0])|
|  b|       0.0|(2,[0],[1.0])|
|  b|       0.0|(2,[0],[1.0])|
|  c|       2.0|    (2,[],[])|
+---+----------+-------------+
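
The `encoded_x1` values are sparse vectors printed as `(size, [indices], [values])`. The plain-Python sketch below models how an index turns into that triple and how the `dropLast` parameter changes the vector size. The function `one_hot_sparse` is hypothetical and only mirrors the printed format.

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Model of the printed sparse vector: (size, [indices], [values])."""
    size = num_categories - 1 if drop_last else num_categories
    position = int(index)
    if position < size:
        return (size, [position], [1.0])
    return (size, [], [])    # dropped last category: a vector of all zeros

for idx in [1.0, 0.0, 2.0]:
    print(idx, one_hot_sparse(idx, 3), one_hot_sparse(idx, 3, drop_last=False))
```

Dropping the last category avoids a redundant dummy variable, because that category is already implied when all other positions are zero.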



In [89]:
OneHotEncoder(inputCol='indexed_x1',outputCol='encoded_x1',dropLast=False).transform(df_x1_indexed).show()
#We have 3 dummy variables 

+---+----------+-------------+
| x1|indexed_x1|   encoded_x1|
+---+----------+-------------+
|  a|       1.0|(3,[1],[1.0])|
|  a|       1.0|(3,[1],[1.0])|
|  b|       0.0|(3,[0],[1.0])|
|  b|       0.0|(3,[0],[1.0])|
|  b|       0.0|(3,[0],[1.0])|
|  c|       2.0|(3,[2],[1.0])|
+---+----------+-------------+



## Binarization from continuous to categorical

There are two functions we can use to split a continuous variable into categories:

* `pyspark.ml.feature.Binarizer`: splits a column of continuous features into two categories given a threshold.
* `pyspark.ml.feature.Bucketizer`: splits a column of continuous features into categories given several breaking points.
    + with n+1 split points, there are n categories (buckets).
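
The two rules can be written down in a few lines of plain Python. This is a sketch of the semantics rather than the Spark classes themselves: the helper names `binarize` and `bucketize` are made up, and the sample values are rounded for readability.

```python
from bisect import bisect_right

def binarize(value, threshold):
    """Model of Binarizer: 1.0 if the value is above the threshold, else 0.0."""
    return 1.0 if value > threshold else 0.0

def bucketize(value, splits):
    """Model of Bucketizer: index i of the bucket [splits[i], splits[i+1])."""
    if not splits[0] <= value <= splits[-1]:
        raise ValueError('value outside the split range')
    if value == splits[-1]:
        return float(len(splits) - 2)   # last bucket also includes its upper bound
    return float(bisect_right(splits, value) - 1)

splits = [0, 2.5, 5, 7.5, 10]           # 5 split points -> 4 buckets
print([bucketize(v, splits) for v in [0.14, 3.70, 6.83, 8.83]])
print([binarize(v, 0) for v in [0.47, -1.19]])
```

Five split points give four buckets, numbered 0.0 to 3.0.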


In [90]:
# Generating example data
import numpy as np
import pandas as pd
np.random.seed(seed=1234)
pdf = pd.DataFrame({
        'x1': np.random.randn(10),
        'x2': np.random.rand(10)*10
    })
np.random.seed(seed=None)
df = spark.createDataFrame(pdf)
df.show()

+--------------------+------------------+
|                  x1|                x2|
+--------------------+------------------+
| 0.47143516373249306| 6.834629351721363|
| -1.1909756947064645| 7.127020269829002|
|  1.4327069684260973|3.7025075479039495|
| -0.3126518960917129| 5.611961860656249|
| -0.7205887333650116| 5.030831653078097|
|  0.8871629403077386|0.1376844959068224|
|  0.8595884137174165| 7.728266216123741|
| -0.6365235044173491| 8.826411906361166|
|0.015696372114428918| 3.648859839013723|
| -2.2426849541854055| 6.153961784334937|
+--------------------+------------------+



#### Binarize the column x1

In [92]:
from pyspark.ml.feature import Binarizer, Bucketizer
binarizer = Binarizer(threshold=0,inputCol='x1',outputCol='x1_new')
temp=binarizer.transform(df)

In [93]:
temp.dtypes #the binarized column x1_new is stored as 'double' (0.0 or 1.0)

[('x1', 'double'), ('x2', 'double'), ('x1_new', 'double')]

#### Bucketize the column x2

In [94]:
bucketizer=Bucketizer(splits=[0,2.5,5,7.5,10],inputCol='x2',outputCol='x2_new')
bucketizer.transform(df).show()

+--------------------+------------------+------+
|                  x1|                x2|x2_new|
+--------------------+------------------+------+
| 0.47143516373249306| 6.834629351721363|   2.0|
| -1.1909756947064645| 7.127020269829002|   2.0|
|  1.4327069684260973|3.7025075479039495|   1.0|
| -0.3126518960917129| 5.611961860656249|   2.0|
| -0.7205887333650116| 5.030831653078097|   2.0|
|  0.8871629403077386|0.1376844959068224|   0.0|
|  0.8595884137174165| 7.728266216123741|   3.0|
| -0.6365235044173491| 8.826411906361166|   3.0|
|0.015696372114428918| 3.648859839013723|   1.0|
| -2.2426849541854055| 6.153961784334937|   2.0|
+--------------------+------------------+------+



#### Combining it all together with Pipeline

In [95]:
#Combine binarizer and bucketizer into one dataframe
#Must use ml library with Pipeline
from pyspark.ml import Pipeline
stages=[binarizer,bucketizer]
pipeline = Pipeline(stages=stages)
#transform the data
pipeline.fit(df).transform(df).show()

+--------------------+------------------+------+------+
|                  x1|                x2|x1_new|x2_new|
+--------------------+------------------+------+------+
| 0.47143516373249306| 6.834629351721363|   1.0|   2.0|
| -1.1909756947064645| 7.127020269829002|   0.0|   2.0|
|  1.4327069684260973|3.7025075479039495|   1.0|   1.0|
| -0.3126518960917129| 5.611961860656249|   0.0|   2.0|
| -0.7205887333650116| 5.030831653078097|   0.0|   2.0|
|  0.8871629403077386|0.1376844959068224|   1.0|   0.0|
|  0.8595884137174165| 7.728266216123741|   1.0|   3.0|
| -0.6365235044173491| 8.826411906361166|   0.0|   3.0|
|0.015696372114428918| 3.648859839013723|   1.0|   1.0|
| -2.2426849541854055| 6.153961784334937|   0.0|   2.0|
+--------------------+------------------+------+------+



# Data Import/Export

## Read tabular data

#### `spark.read.csv`

In [106]:
#Export data 
from pyspark.sql import DataFrameWriter

#Merge the data into a single partition
df=df.coalesce(numPartitions=1)
#mtcars.write.csv('saved_mtcars',header=True) does not work without Hadoop binary file,
#so convert the spark dataframe to pandas and write the csv from there
df.toPandas().to_csv('mycsv.csv')

In [107]:
df.show()

+--------------------+------------------+
|                  x1|                x2|
+--------------------+------------------+
| 0.47143516373249306| 6.834629351721363|
| -1.1909756947064645| 7.127020269829002|
|  1.4327069684260973|3.7025075479039495|
| -0.3126518960917129| 5.611961860656249|
| -0.7205887333650116| 5.030831653078097|
|  0.8871629403077386|0.1376844959068224|
|  0.8595884137174165| 7.728266216123741|
| -0.6365235044173491| 8.826411906361166|
|0.015696372114428918| 3.648859839013723|
| -2.2426849541854055| 6.153961784334937|
+--------------------+------------------+



In [121]:
sc.stop()

## Read non-tabular data