# MScA, Advanced Machine Learning (32009)

# Week 2 Workshop: More Programming Tools

## Yuri Balasanov, &copy; iLykei, 2017

Main text: Jeffrey Aven, Sams Teach Yourself Apache SPARK in 24 Hours, Pearson Education, Inc., 2017

## Tracking Jobs, Stages and Storage

The main principle of working with RDDs is to keep them in memory as much as possible and to discard them as soon as they are no longer needed.
When an RDD needs to be reused, it can be cached in memory or persisted to disk.



<font color=green>
**Example** <br>
<br>
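In the following example `newrdd` is created, processed, and then recomputed in order to process it again.

Open http://localhost:4040 in your browser to see the Spark UI: the Jobs and Stages tabs show the log of scheduler jobs and stages, and http://localhost:4040/storage/ shows the Storage tab.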

In [3]:
originalrdd=sc.parallelize([0,1,2,3,4,5,6,7,8])

<font color=green>
This command creates a new RDD, but it is not executed yet (lazy execution). <br>
There are no new records in the log at port `:4040`.

In [4]:
originalrdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8]

<font color=green>
Function `.collect()` forces evaluation: the scheduler log shows one `collect` job and stage 0 completed. No memory is used according to the Storage tab.

In [5]:
newrdd=originalrdd.filter(lambda x: x % 2)

<font color=green>
RDD `newrdd` created, but not evaluated yet.

In [6]:
noelements=newrdd.count()       # processing

<font color=green>
The first action on `newrdd` is `count`. It appears in the log as a new job and stage 1.
Click on each stage to see the corresponding DAG. <br>
Stage 0 is `parallelize`, stage 1 is `count`. <br>
Check the timeline of each job. <br>
There is still no storage used. <br>

In [7]:
listofelements=newrdd.collect() # re-processing
print("There are %s elements in the collection %s" % (noelements,listofelements))

There are 4 elements in the collection [1, 3, 5, 7]


<font color=green>
RDD `newrdd` is reprocessed: the `collect` action caused the RDD to be reevaluated because `newrdd` was not stored. <br>
Find the corresponding DAG in the log. <br>

<font color=green>
<br>
Now persist `newrdd` to avoid recomputing it. <br>
Shut down and restart the notebook kernel to reset the scheduler log.

In [1]:
originalrdd=sc.parallelize([0,1,2,3,4,5,6,7,8])
newrdd=originalrdd.filter(lambda x: x % 2)

<font color=green>
Again, `originalrdd` is first transformed into `newrdd`. <br>
There is no information in the log yet.

In [2]:
newrdd.persist()

PythonRDD[1] at RDD at PythonRDD.scala:48

<font color=green>
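The RDD is only marked for persistence; with no argument, `persist()` uses the default memory-only storage level rather than disk. Nothing is stored yet and no records appear in the log.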

In [3]:
noelements=newrdd.count()       # processing

<font color=green>
The first action forced evaluation. <br>
There is a new `Job 0` in the log: the DAG of `Stage 0` shows the process from creation by `parallelize` to `persist` to `count`. <br>
All partitions of the RDD are stored.

In [4]:
listofelements=newrdd.collect() # re-processing
print("There are %s elements in the collection %s" % (noelements,listofelements))

There are 4 elements in the collection [1, 3, 5, 7]


<font color=green>
The log shows the same DAG for the second job, but this time the persisted partitions are read from storage instead of being recomputed.
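
The difference between the two runs above can be mimicked without Spark. The following is a minimal plain-Python sketch, not the Spark API: the class `ToyRDD` and its `evaluations` counter are hypothetical names introduced only for illustration. It shows that a lazily evaluated object is recomputed by every action unless it has been marked with `persist()`, and that `persist()` itself computes nothing.

```python
# Toy model in plain Python (NOT the Spark API): a lazily evaluated dataset
# that recomputes its lineage on every action unless it is persisted.

class ToyRDD:
    def __init__(self, compute):
        self._compute = compute   # function that rebuilds the data from scratch
        self._persist = False     # set by persist(); nothing is stored yet
        self._stored = None       # filled in by the first action after persist()
        self.evaluations = 0      # how many times this object was recomputed

    def filter(self, predicate):
        # Transformation: returns a new lazy object and computes nothing.
        return ToyRDD(lambda: [x for x in self._materialize() if predicate(x)])

    def persist(self):
        # Only marks the dataset; storage happens during the first action.
        self._persist = True
        return self

    def _materialize(self):
        if self._stored is not None:
            return self._stored           # reuse stored data, no recomputation
        self.evaluations += 1
        data = self._compute()
        if self._persist:
            self._stored = data
        return data

    def count(self):
        return len(self._materialize())

    def collect(self):
        return list(self._materialize())


original = ToyRDD(lambda: [0, 1, 2, 3, 4, 5, 6, 7, 8])

# Without persist(): both actions recompute the filter.
recomputed = original.filter(lambda x: x % 2)
recomputed.count()
recomputed.collect()
print("evaluations without persist:", recomputed.evaluations)

# With persist(): the first action stores the result, the second reuses it.
persisted = original.filter(lambda x: x % 2).persist()
persisted.count()
persisted.collect()
print("evaluations with persist:", persisted.evaluations)
```

In this toy model the first object is evaluated twice and the second only once, which mirrors what the scheduler log and the Storage tab report for the two notebook runs.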

## RDD Lineage and Storage Level

Spark plans the execution of a program as a Directed Acyclic Graph (**DAG**).
A DAG is a sequence of operations separated into stages.
Some stages, like `map`, are completely parallelized; others, like `reduce`, require a shuffle.
This results in dependencies between stages.

The Spark driver keeps track of RDD lineages and is able to reevaluate any RDDs when necessary.

A summary of the lineage can be obtained with `toDebugString()`. <br>

RDDs are stored in their partitions on worker nodes.
There are 6 main storage levels available to RDDs: <br>

1 MEMORY_ONLY: default level; <br>
2 MEMORY_AND_DISK: RDDs are stored in memory and if they do not fit in memory are stored on disk; <br>
3 MEMORY_ONLY_SER: RDD partitions are stored as serialized (encoded) objects in memory; serialization requires less memory; <br>
4 MEMORY_AND_DISK_SER: Serialized RDD partitions are stored in memory and if they don't fit are spilled to disk; <br>
5 DISK_ONLY: RDD partitions are stored on disk only; <br>
6 OFF_HEAP: RDDs are stored using external storage system. <br>

Storage level is controlled by the flags of the `StorageLevel` constructor: <br>

`StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)`

All flags except `replication` are Boolean; `replication` is an integer defaulting to 1.

Storage levels expressed as flags: <br>

MEMORY_ONLY=(False, True, False, True, 1) <br>
MEMORY_AND_DISK=(True, True, False, True, 1) <br>
MEMORY_ONLY_SER=(False, True, False, False, 1) <br>
MEMORY_AND_DISK_SER=(True, True, False, False, 1) <br>
DISK_ONLY=(True, False, False, False, 1) <br>

These are the definitions on the JVM (Scala) side. PySpark keeps Python objects in serialized form, so, depending on the Spark version, the Python constants may report `deserialized=False` for the memory levels as well. <br>

Check the storage level with `getStorageLevel()`. <br>

Choosing the storage level helps to tune Spark jobs and to accommodate large-scale operations that may not fit in the default memory.
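
To read the flags returned by `getStorageLevel()`, it can help to decode them into words. The sketch below is plain Python and does not import PySpark: `Flags`, `LEVELS` and `describe` are hypothetical stand-ins introduced for illustration, and the flag values are simply copied from the list above.

```python
from collections import namedtuple

# Stand-in for illustration only; the real class is pyspark.StorageLevel.
Flags = namedtuple("Flags", "useDisk useMemory useOffHeap deserialized replication")

# Flag values copied from the list in the text above.
LEVELS = {
    "MEMORY_ONLY":         Flags(False, True, False, True, 1),
    "MEMORY_AND_DISK":     Flags(True, True, False, True, 1),
    "MEMORY_ONLY_SER":     Flags(False, True, False, False, 1),
    "MEMORY_AND_DISK_SER": Flags(True, True, False, False, 1),
    "DISK_ONLY":           Flags(True, False, False, False, 1),
}

def describe(flags):
    """Turn a tuple of storage flags into a short readable description."""
    places = [name for name, used in (("memory", flags.useMemory),
                                      ("disk", flags.useDisk),
                                      ("off-heap", flags.useOffHeap)) if used]
    where = " + ".join(places) if places else "not persisted"
    form = "deserialized" if flags.deserialized else "serialized"
    return "{}, {}, replication={}".format(where, form, flags.replication)

for name, flags in LEVELS.items():
    print(name, "->", describe(flags))

# An RDD that was never persisted reports all flags as False.
print(describe(Flags(False, False, False, False, 1)))
```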

<font color=green>
**Example** <br>
<br>
Consider an example of RDD lineage. <br>
Shut down and restart the kernel to reset the scheduler log before running the following cells.

In [1]:
txtDoc=sc.parallelize(['These violent delights have violent ends',
                       'And in their triump die like fire and powder',
                       'Which as they kiss consume'])
txtDoc.collect()

['These violent delights have violent ends',
 'And in their triump die like fire and powder',
 'Which as they kiss consume']

In [2]:
print(txtDoc.toDebugString())
txtDoc.getStorageLevel()

b'(8) ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480 []'


StorageLevel(False, False, False, False, 1)

<font color=green>
The RDD has been created and evaluated by `.collect()`; `toDebugString()` shows the same information that can be found in the DAG. <br>
No storage level has been set: all flags are `False`. <br>

In [3]:
words=txtDoc.flatMap(lambda x: x.split())
print(words.toDebugString())
print(words.collect())
words.getStorageLevel()

b'(8) PythonRDD[1] at RDD at PythonRDD.scala:48 []\n |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480 []'
['These', 'violent', 'delights', 'have', 'violent', 'ends', 'And', 'in', 'their', 'triump', 'die', 'like', 'fire', 'and', 'powder', 'Which', 'as', 'they', 'kiss', 'consume']


StorageLevel(False, False, False, False, 1)

<font color=green>
A new transformation has been applied: the RDD `words` is a transformation of `txtDoc`. <br>
This transformation is reflected in `.toDebugString()` and in the `Stage 1` DAG.

In [4]:
longwords=words.filter(lambda x:len(x)>5)
print(longwords.toDebugString())
print(longwords.take(100))

b'(8) PythonRDD[2] at RDD at PythonRDD.scala:48 []\n |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480 []'
['violent', 'delights', 'violent', 'triump', 'powder', 'consume']


In [5]:
numwords=longwords.count()
numwords

6

In [6]:
print(longwords.toDebugString())

b'(8) PythonRDD[2] at RDD at PythonRDD.scala:48 []\n |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480 []'


In [7]:
longwords_distinct=longwords.distinct()
longwords_distinct.collect()
print(longwords_distinct.toDebugString())

b'(8) PythonRDD[11] at collect at <ipython-input-7-e70847af13b8>:2 []\n |  MapPartitionsRDD[10] at mapPartitions at PythonRDD.scala:427 []\n |  ShuffledRDD[9] at partitionBy at NativeMethodAccessorImpl.java:0 []\n +-(8) PairwiseRDD[8] at distinct at <ipython-input-7-e70847af13b8>:1 []\n    |  PythonRDD[7] at distinct at <ipython-input-7-e70847af13b8>:1 []\n    |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480 []'


<font color=green>
Observe the new jobs and stages in the log. <br>
Note that using `distinct()` required a shuffle stage.

## Caching, Persistence and Checkpointing

### Caching

Caching an RDD persists its data in memory. <br>
Cached data can be reused multiple times without reevaluation when actions are called. <br>
Caching does not trigger execution or computation. It is a suggestion: if there is not enough memory for an RDD, it will be reevaluated every time it is processed. <br>
Caching does not spill to disk because it uses memory only. <br>
Calling `cache()` is equivalent to calling `persist()` with its default, memory-only storage level (`MEMORY_ONLY_SER`; in PySpark 2.x this constant is an alias of `MEMORY_ONLY`). <br>

<font color=green>
**Example** <br>
<br>
Count frequencies of words from Shakespeare.

In [8]:
txtDoc=sc.parallelize(['These violent delights have violent ends',
                       'And in their triump die like fire and powder',
                       'Which as they kiss consume'])
words=txtDoc.flatMap(lambda x: x.split()) \
.map(lambda x: (x,1)) \
.reduceByKey(lambda x,y: x+y)

words.cache()

PythonRDD[17] at RDD at PythonRDD.scala:48

<font color=green>
Counting words will trigger computation for the first time.

In [9]:
words.count()

19

<font color=green>
Now the Storage tab shows cached partitions. <br>
The log contains 1 job (`count`) and 2 stages (`reduceByKey` and `count`).

In [10]:
words.take(3)

[('These', 1), ('ends', 1), ('Which', 1)]

<font color=green>
A new job appeared in the log with skipped stages. <br>
This is a result of caching: the new stage did not require reevaluating the lineage of RDDs. <br>

In [11]:
words.count()

19

<font color=green>
The second `count` job also has a skipped stage due to caching.
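
As a cross-check of what the `flatMap`, `map` and `reduceByKey` chain computes, the same word count can be reproduced in plain Python on the driver. This is only a sketch of the logic, under the assumption that the data fits in local memory; it involves no RDDs, caching or shuffling, and the variable names are illustrative.

```python
lines = ['These violent delights have violent ends',
         'And in their triump die like fire and powder',
         'Which as they kiss consume']

# flatMap: split every line and flatten the results into one list of words
tokens = [word for line in lines for word in line.split()]

# map: turn every word into a (key, value) pair
pairs = [(word, 1) for word in tokens]

# reduceByKey: combine the values of equal keys with the function x + y
counts = {}
for key, value in pairs:
    counts[key] = counts[key] + value if key in counts else value

print(len(tokens), "words,", len(counts), "distinct keys")
print(counts['violent'])
```

The number of distinct keys corresponds to `words.count()` in the notebook, because after `reduceByKey` the RDD holds one pair per distinct word.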

### Persisting

Cached partitions are stored in memory on executor JVMs on Spark workers. <br>
If a worker node fails, the cached data has to be recreated. <br>
<br>
The `persist` method offers additional storage options, including MEMORY_AND_DISK, MEMORY_ONLY_SER and MEMORY_AND_DISK_SER. <br>
When persistence is used with one of the disk storage options, the persisted partitions are stored as local files on the worker nodes running Spark executors for the application. <br>
Data persisted to disk can be used if the copy in memory is lost. <br>
<br>
`persist` can also replicate partitions on other nodes, which helps to avoid recreating data when some nodes fail. <br>
<br>
**Spark RDDs are fault-tolerant regardless of persistence. Persistence makes recovery more efficient.** <br>
<br>
Persistence is also a suggestion: it only takes place after an action is called to trigger evaluation of an RDD. <br>
If there is not enough memory, persistence may not take place. <br>
The storage level requested for an RDD can be checked with `getStorageLevel()`.

#### Persist()

Syntax is:

`.persist(storageLevel=StorageLevel.MEMORY_ONLY_SER)`

The method specifies the storage level and storage attributes. <br>
The argument `storageLevel` can be either a static constant or a set of storage flags passed to the `StorageLevel` constructor.
<br>
The following instructions are equivalent: <br>

`.persist(StorageLevel.MEMORY_AND_DISK_SER_2)` <br>
`.persist(StorageLevel(True,True,False,False,2))`

#### Unpersist()

Syntax is:

`unpersist()`

The method `unpersist` removes the persisted RDD from storage. <br>
This method can also be used to remove an RDD that was cached using `cache`.

### Checkpointing

Checkpointing saves data to a file and can be used to transport results beyond the application to other driver processes or applications. <br>

Checkpointing is an expensive operation. It is executed when an action is called on the RDD.

#### SetCheckpointDir()

Syntax is:

`SparkContext.setCheckpointDir(dirName)`

This method sets the directory in which RDDs are checkpointed.

#### Checkpoint()

Syntax is:

`.checkpoint()`

This method marks the RDD for checkpointing; it must be called before any action is executed on the RDD. <br>
After checkpointing is complete, all references to the parent RDDs in the lineage of the checkpointed RDD are removed. <br>

The checkpoint directory must be specified using `setCheckpointDir()` before checkpointing an RDD.

#### IsCheckpointed()

Syntax is:

`.isCheckpointed()`

Returns a Boolean telling whether the RDD has been checkpointed.

#### GetCheckpointFile()

Syntax is:

`.getCheckpointFile()`

Returns the name of the file to which the RDD was checkpointed.

## Saving RDD Output

Typically it is useful to save the final result of a chain of RDD transformations to external storage, which may be HDFS, S3 or a local file system.

#### SaveAsTextFile()

Syntax is:

`.saveAsTextFile(path, compressionCodecClass=None)`

This method is an action that saves the RDD to a directory of text files, with one file per partition. Each file contains string representations of the elements. <br>

The directory specified by `path` must not exist before the Spark job is run. Spark does not overwrite existing files and folders because of the immutability rule.
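
Because the target directory must not exist, one common approach is to build a fresh, timestamped directory name for every run and to fail early if it is already taken. The sketch below uses only the Python standard library; the helper name `fresh_output_dir` and the default root `savedData` are assumptions made for illustration, and the check only covers a local file system.

```python
import os
from datetime import datetime, timezone

def fresh_output_dir(root_dir="savedData"):
    """Return a timestamped directory name that does not exist yet (hypothetical helper)."""
    # UTC timestamp with one-second resolution, e.g. 20171009012726
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    path = "{}-{}".format(root_dir, stamp)
    if os.path.exists(path):
        # A save action would fail on an existing directory, so stop early.
        raise FileExistsError("output directory already exists: " + path)
    return path

data_dir = fresh_output_dir()
print(data_dir)
```

The returned string could then be passed as the `path` argument of a save action such as `saveAsTextFile`.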

#### SaveAsSequenceFile()

Syntax is:

`.saveAsSequenceFile(path, compressionCodecClass=None)`

This is a Spark action that saves the RDD to a directory of Sequence Files consisting of uniform key-value pairs.

<font color=green>
**Example** <br>
<br>
Create name of directory where the data will be saved.

In [12]:
from datetime import datetime
now=datetime.utcnow().strftime("%Y%m%d%H%M%S")
root_dir="savedData"
dataDir="{}-{}".format(root_dir,now)
dataDir

'savedData-20171009012726'

In [13]:
txtDoc=sc.parallelize(['These violent delights have violent ends',
                       'And in their triump die like fire and powder',
                       'Which as they kiss consume'])
words=txtDoc.flatMap(lambda x: x.split()).keyBy(lambda x: x)
words.saveAsSequenceFile(dataDir)

<font color=green>
Check that the directory appeared in the same folder where the notebook is located.

## Broadcast Variables

Broadcast variables are read-only variables set by the Spark driver and made available to the worker nodes, and through them to any tasks running on executors on those workers. <br>
Broadcast variables are shared using a special protocol that makes such sharing more efficient than pushing variables directly from the driver to executor processes. <br>

#### Broadcast()

Syntax is:

`SparkContext.broadcast(value)`

The method creates an instance of a Broadcast object within the specific SparkContext. <br>

Argument `value` is an object that will be serialized and used in the Broadcast object. It can be any valid Python object.

Once initialized, the broadcast variable can be referenced by tasks running within that SparkContext.

#### Value

Syntax is:

`.value`

This property returns the value of the broadcast variable. It can be used within a lambda function in a `map` or `filter` operation.

#### Unpersist()

Syntax is:

`.unpersist(blocking=False)`

This method removes the broadcast variable from memory on all the workers where it was present. <br>

Boolean argument `blocking` specifies whether the operation should block until the variable is unpersisted on all nodes. If memory needs to be released immediately, set `blocking=True`.

<font color=green>
**Example** <br>
<br>
Create RDD `words` from a Shakespeare quote and broadcast its collected contents. <br>
Then unpersist the broadcast variable.

In [14]:
txtDoc=sc.parallelize(['These violent delights have violent ends',
                       'And in their triump die like fire and powder',
                       'Which as they kiss consume'])
words=txtDoc.flatMap(lambda x: x.split()) \
.map(lambda x: (x,1)) \
.reduceByKey(lambda x,y: x+y)
print(words.collect())
br_words=sc.broadcast(words.collect())

[('These', 1), ('ends', 1), ('Which', 1), ('And', 1), ('like', 1), ('their', 1), ('die', 1), ('as', 1), ('powder', 1), ('in', 1), ('violent', 2), ('delights', 1), ('fire', 1), ('have', 1), ('and', 1), ('triump', 1), ('they', 1), ('consume', 1), ('kiss', 1)]


In [15]:
print(br_words.value[1])
br_words

('ends', 1)


<pyspark.broadcast.Broadcast at 0x1935379cd68>

In [16]:
br_words.unpersist(blocking=True)

In [17]:
print(br_words.value[1])
br_words

('ends', 1)


<pyspark.broadcast.Broadcast at 0x1935379cd68>

<font color=green>
Note that after unpersisting, the value of the broadcast variable can still be read.
The variable was removed from all workers but stayed on the driver.
When it is used again, the driver has to send it to the workers again.

## Accumulators

Accumulators are another type of shared variable in Spark. They are numeric values that can only be updated in a special way: by incrementing them. <br>
Accumulators allow aggregating values while the program is running. <br>
Accumulators are updated once per successfully completed task in a Spark application. <br>
Worker nodes send accumulator updates to the driver, because the driver is the only process that can read the accumulator value. <br>
Accumulators can hold integer or float values.


<font color=green>
**Example** <br>
<br>
In this example accumulator variable is created, updated and read.

In [18]:
acc=sc.accumulator(0)
def addone(x):
    global acc
    acc+=1
    return x+1
myrdd=sc.parallelize([1,2,3,4,5])
myrdd.map(lambda x: addone(x)).collect()

[2, 3, 4, 5, 6]

In [19]:
print("records processed: " + str(acc.value))

records processed: 5


## Spark SQL DataFrames

A Spark SQL DataFrame is similar to a data frame object in R. <br>
DataFrames, like RDDs, are evaluated as DAGs with lazy evaluation. <br>
They also provide lineage and fault tolerance. <br>
Caching and persistence methods can also be applied to DataFrames. <br>

**In recent versions of Spark it is recommended to use DataFrames rather than RDDs whenever possible.**

### Creating DataFrame from Existing RDD

#### CreateDataFrame()

Syntax is:

`SQLContext.createDataFrame(data, schema=None, samplingRatio=None)`

This method creates DataFrame object from an existing RDD. <br>
Argument `data` is a reference to a named RDD object consisting of tuples or list elements. <br>


<font color=green>
**Example** <br>
<br>

In [91]:
myrdd=sc.parallelize([('Jeff',46),('Kellie',44)])
sqlContext.createDataFrame(myrdd).collect()

[Row(_1='Jeff', _2=46), Row(_1='Kellie', _2=44)]

<font color=green>
The result is a list of `Row(pyspark.sql.Row)` objects. <br>
Since `schema` was not specified the fields are referenced by `<fieldnumber>` starting at 1. <br>

### Creating DataFrames from Files using DataFrameReader

Starting from version 1.4, Spark has an interface for loading DataFrames from external storage systems, the DataFrameReader. <br>
The DataFrameReader interface is accessed through the `SQLContext.read` property. <br>

#### Text()

Syntax is:

`SQLContext.read.text(paths)`

This method is used to load DataFrames from text files in an external file system: local, NFS, HDFS, S3, etc. <br>
It is similar to `sc.textFile()`. <br>
Argument `paths` refers to a file, directory or file glob.

<font color=green>
**Example** <br>
<br>
Create an RDD and save it as partitioned text files.

In [20]:
now=datetime.utcnow().strftime("%Y%m%d%H%M%S")
root_dir="savedData"
dataDir="{}-{}".format(root_dir,now)
print(dataDir)

savedData-20171009125800


In [21]:
myrdd=sc.parallelize([('Jeff',46),('Kellie',44)])
myrdd.saveAsTextFile(dataDir+"/myTestFile.txt")

<font color=green>
Read the saved text files into a DataFrame and show one row.

In [22]:
df=sqlContext.read.text(dataDir+"/myTestFile.txt/")
df.take(1)

[Row(value="('Kellie', 44)")]

<font color=green>
Each `Row` object returned corresponds to one line of the text file or files and contains that line as one complete string. <br>

Remove the folder named in `dataDir` if you are planning to write to it again. <br>

#### Spark.read.csv()

Syntax is:

`spark.read.csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None)`

This method was added starting Spark version 2.0. <br>
It reads a `.csv` file pretty much like in R. <br>
Argument `schema` describes the column structure of the data. <br>

<font color=green>
**Example** <br>
<br>
Describe the columns as `schema`. <br>
Read file with table of data for ANOVA. <br>

In [1]:
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, Row

df_struct = StructType([StructField('y', DoubleType()), \
                        StructField('c', StringType())])
#df=spark.read.csv('./Examples/data.csv',sep=" ",schema=df_struct)
df = spark.read.csv('./data1.csv',sep=" ",schema=df_struct)
print(df)
df.take(5)

DataFrame[y: double, c: string]


[Row(y=1.08931931750671, c='C2'),
 Row(y=4.03892811665213, c='C4'),
 Row(y=0.710538426311777, c='C1'),
 Row(y=4.54878758817833, c='C4'),
 Row(y=3.54970766958625, c='C3')]

 <font color=green>
 Write the DataFrame to a single `csv` file and read it back.

In [2]:
df.toPandas().to_csv("sample_file.csv", sep=",",index=False,header=False,)
dfback=spark.read.csv('sample_file.csv', sep=",",schema=df_struct)
print(dfback)
dfback.take(5)

DataFrame[y: double, c: string]


[Row(y=1.08931931750671, c='C2'),
 Row(y=4.03892811665213, c='C4'),
 Row(y=0.710538426311777, c='C1'),
 Row(y=4.54878758817833, c='C4'),
 Row(y=3.54970766958625, c='C3')]

### Converting DataFrame to RDD

Converting a DataFrame to an RDD object is done with the attribute `.rdd`.

<font color=green>
**Example** <br>
<br>
Read the data for ANOVA as DataFrame, check that the result is a DataFrame. <br>
Show first 5 rows.

In [3]:
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, Row

df_struct = StructType([StructField('y', DoubleType()), \
                        StructField('c', StringType())])
df=spark.read.csv('./data1.csv',sep=" ",schema=df_struct)
print(df)
df.take(5)

DataFrame[y: double, c: string]


[Row(y=1.08931931750671, c='C2'),
 Row(y=4.03892811665213, c='C4'),
 Row(y=0.710538426311777, c='C1'),
 Row(y=4.54878758817833, c='C4'),
 Row(y=3.54970766958625, c='C3')]

Convert the DataFrame into RDD, show first 5 elements.

In [4]:
print(df.rdd)
df.rdd.take(5)

MapPartitionsRDD[15] at javaToPython at NativeMethodAccessorImpl.java:0


[Row(y=1.08931931750671, c='C2'),
 Row(y=4.03892811665213, c='C4'),
 Row(y=0.710538426311777, c='C1'),
 Row(y=4.54878758817833, c='C4'),
 Row(y=3.54970766958625, c='C3')]

### DataFrame Schemas

For a DataFrame object, the schema can be explicitly defined or inferred. <br>
Generally, it is better to define the schema.

#### Inferring the Schema

Spark SQL can analyze an object (an RDD that is converted to a DataFrame) and infer its composition, which it then uses as the schema for the created DataFrame. This process is called **reflection**.

During reflection each record of the RDD is converted into a Row object, and each field is assigned a value. <br>

Data types are inferred from the first record, so it is important that the first row is representative of the dataset and has no missing values.
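
Why the first record matters can be illustrated with a toy version of the idea. The function below is a plain-Python sketch and not Spark's inference code: the name `infer_from_first`, the type table `TYPE_NAMES` and the label `unknown` are assumptions made for illustration. It names fields `_1`, `_2`, ... and takes each type from the first record only, so a missing value in that record leaves the type undetermined.

```python
# Toy illustration only: this is NOT Spark's inference code.
TYPE_NAMES = {int: "long", float: "double", str: "string", bool: "boolean"}

def infer_from_first(records):
    """Name fields _1, _2, ... and take each type from the first record."""
    first = records[0]
    return [("_{}".format(i + 1), TYPE_NAMES.get(type(value), "unknown"))
            for i, value in enumerate(first)]

# A representative first record gives a complete schema.
salespeople = [(1, "Henry", 100), (2, "Karen", 100), (3, "Paul", 101)]
print(infer_from_first(salespeople))

# A missing value in the first record leaves the third field undetermined.
with_gap = [(1, "Henry", None), (2, "Karen", 100)]
print(infer_from_first(with_gap))
```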


<font color=green>
**Example** <br>
<br>
Create an RDD `salespeople`, convert it to a DataFrame and print the inferred schema.

In [36]:
salespeople=sc.parallelize(['1\tHenry\t100', \
                            '2\tKaren\t100', \
                            '3\tPaul\t101', \
                            '4\tJimmy\t102', \
                            '5\tJanice\t103']) \
.map(lambda x: x.split('\t')) \
.map(lambda x: (int(x[0]),x[1],int(x[2])))
salespeople.collect()

[(1, 'Henry', 100),
 (2, 'Karen', 100),
 (3, 'Paul', 101),
 (4, 'Jimmy', 102),
 (5, 'Janice', 103)]

In [37]:
df=sqlContext.createDataFrame(salespeople)
df.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)



In [38]:
df.take(1)

[Row(_1=1, _2='Henry', _3=100)]

<font color=green>
Field identifiers are field numbers by default. Fields have the property `nullable=True`, meaning that their values may be missing (null).

#### Defining the Schema Programmatically

It is preferable to define the schema rather than infer it. <br>
Defining a schema requires creating a `StructType` object containing a collection of `StructField` objects. <br>
This schema is then applied to a DataFrame when it is created. <br>

<font color=green>
**Example** <br>
<br>


In [39]:
from pyspark.sql.types import *
myschema=StructType([ \
                    StructField("SalesPerson_ID",IntegerType(),True), \
                    StructField("SalesPerson_Name",StringType(),nullable=True), \
                    StructField("Store_ID",IntegerType(),nullable=True) \
                    ])

In [40]:
df=sqlContext.createDataFrame(salespeople,myschema)
df.printSchema()

root
 |-- SalesPerson_ID: integer (nullable = true)
 |-- SalesPerson_Name: string (nullable = true)
 |-- Store_ID: integer (nullable = true)



In [41]:
df.take(1)

[Row(SalesPerson_ID=1, SalesPerson_Name='Henry', Store_ID=100)]

<font color=green>
Note the differences from the inferred schema of the previous example:
-  Fields have custom names
-  Types are not chosen "conservatively": integer instead of long.

## Using Spark SQL DataFrames

DataFrames have been the fastest developing area of the Spark project. <br>

### DataFrame Metadata Operations

Metadata functions return information about the data structure instead of data themselves. <br>

#### Columns

Syntax is:

`.columns`

This attribute returns a list of column names for the given DataFrame.

<font color=green>
**Example** <br>
<br>

In [105]:
df.columns

['SalesPerson_ID', 'SalesPerson_Name', 'Store_ID']

#### Dtypes

Syntax is:

`.dtypes`

This attribute returns a list of tuples, each consisting of the column name and data type of one column of the given DataFrame object.

<font color=green>
**Example** <br>
<br>

In [106]:
df.dtypes

[('SalesPerson_ID', 'int'),
 ('SalesPerson_Name', 'string'),
 ('Store_ID', 'int')]

### Basic DataFrame Operations

#### Count()

Function `count` returns the number of rows in the DataFrame. As in case of RDD it is an action that triggers evaluation of the DataFrame and its lineage.

#### Collect()

Same function as for RDD is available for DataFrames.

#### Take(n)

Same function as for RDD is available for DataFrames.

#### Show()

Syntax is:

`.show(n=20,truncate=True)`

Prints the first `n` rows of a DataFrame to the console. Unlike `collect()` or `take(n)`, its output cannot be assigned to a variable: `show()` returns `None`. <br>
Argument `truncate` is used to truncate long strings and align cells to the right.

<font color=green>
**Example** <br>
<br>

In [107]:
print(df.count())
df.collect()

5


[Row(SalesPerson_ID=1, SalesPerson_Name='Henry', Store_ID=100),
 Row(SalesPerson_ID=2, SalesPerson_Name='Karen', Store_ID=100),
 Row(SalesPerson_ID=3, SalesPerson_Name='Paul', Store_ID=101),
 Row(SalesPerson_ID=4, SalesPerson_Name='Jimmy', Store_ID=102),
 Row(SalesPerson_ID=5, SalesPerson_Name='Janice', Store_ID=103)]

In [108]:
df.take(2)

[Row(SalesPerson_ID=1, SalesPerson_Name='Henry', Store_ID=100),
 Row(SalesPerson_ID=2, SalesPerson_Name='Karen', Store_ID=100)]

In [109]:
df.show(2)

+--------------+----------------+--------+
|SalesPerson_ID|SalesPerson_Name|Store_ID|
+--------------+----------------+--------+
|             1|           Henry|     100|
|             2|           Karen|     100|
+--------------+----------------+--------+
only showing top 2 rows



#### Select, drop, filter and distinct

These functions are used to prune columns or filter rows from a DataFrame. <br>
A new DataFrame is created as a result. <br>

Syntax:

`.drop(col)`

Returns a new DataFrame with column `col` removed. <br>

Syntax:

`.filter(condition)`

Returns a new DataFrame containing only rows satisfying the condition.

Syntax:

`.distinct()`

Returns DataFrame containing only distinct rows, equivalent to filtering out duplicate rows.

<font color=green>
**Example** <br>
<br>

In [110]:
df.drop('Store_ID').show(2)

+--------------+----------------+
|SalesPerson_ID|SalesPerson_Name|
+--------------+----------------+
|             1|           Henry|
|             2|           Karen|
+--------------+----------------+
only showing top 2 rows



In [111]:
df.filter(df.SalesPerson_Name=='Henry').show()

+--------------+----------------+--------+
|SalesPerson_ID|SalesPerson_Name|Store_ID|
+--------------+----------------+--------+
|             1|           Henry|     100|
+--------------+----------------+--------+



In [112]:
df.distinct().show()

+--------------+----------------+--------+
|SalesPerson_ID|SalesPerson_Name|Store_ID|
+--------------+----------------+--------+
|             2|           Karen|     100|
|             1|           Henry|     100|
|             5|          Janice|     103|
|             4|           Jimmy|     102|
|             3|            Paul|     101|
+--------------+----------------+--------+



#### Select, map, flatMap

These methods are used to apply column-level functions to rows in Spark SQL DataFrames, similar to `apply` in R. <br>
Use of lambda functions is a little different. <br>
Functions `map()` and `flatMap()` used to be transformations that operated on DataFrames but returned an RDD. <br>
Starting from PySpark 2 they have been removed from the DataFrame API. <br>
Use `.rdd.map()` or `.rdd.flatMap()` instead.

<font color=green>
**Example** <br>
<br>

In [113]:
df.select(["SalesPerson_Name","Store_ID"]).show()

+----------------+--------+
|SalesPerson_Name|Store_ID|
+----------------+--------+
|           Henry|     100|
|           Karen|     100|
|            Paul|     101|
|           Jimmy|     102|
|          Janice|     103|
+----------------+--------+



In [114]:
print(df.rdd.flatMap(lambda x: x).collect())
df.rdd.map(lambda x: x).collect()

[1, 'Henry', 100, 2, 'Karen', 100, 3, 'Paul', 101, 4, 'Jimmy', 102, 5, 'Janice', 103]


[Row(SalesPerson_ID=1, SalesPerson_Name='Henry', Store_ID=100),
 Row(SalesPerson_ID=2, SalesPerson_Name='Karen', Store_ID=100),
 Row(SalesPerson_ID=3, SalesPerson_Name='Paul', Store_ID=101),
 Row(SalesPerson_ID=4, SalesPerson_Name='Jimmy', Store_ID=102),
 Row(SalesPerson_ID=5, SalesPerson_Name='Janice', Store_ID=103)]

# ANOVA Coding: DataFrame Approach

Create a structure type for 1-way ANOVA and name it `df_struct`.

In [1]:
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, Row

In [2]:
#Skipped code
df_struct = StructType([StructField('y', DoubleType()), \
                        StructField('c', StringType())])

Read file `data0.csv` into a DataFrame `df` and cache it.

df.take(10) returns output: <br>

[Row(y=0.0893193175067113, c='C2'), <br>
 Row(y=1.03892811665213, c='C4'), <br>
 Row(y=0.710538426311777, c='C1'), <br>
 Row(y=1.54878758817833, c='C4'), <br>
 Row(y=1.54970766958625, c='C3'), <br>
 Row(y=2.62544730346494, c='C2'), <br>
 Row(y=-0.890027143624024, c='C3'), <br>
 Row(y=-0.231707058414097, c='C2'), <br>
 Row(y=1.10632061985913, c='C4'), <br>
 Row(y=1.79791643753108, c='C2')] <br>

In [3]:
#Skipped code
# Read the file with the explicit schema and mark the DataFrame for caching
df = spark.read.csv('./data0.csv', sep=" ", schema=df_struct).cache()
print(df)
df.take(10)


DataFrame[y: double, c: string]


[Row(y=0.0893193175067113, c='C2'),
 Row(y=1.03892811665213, c='C4'),
 Row(y=0.710538426311777, c='C1'),
 Row(y=1.54878758817833, c='C4'),
 Row(y=1.54970766958625, c='C3'),
 Row(y=2.62544730346494, c='C2'),
 Row(y=-0.890027143624024, c='C3'),
 Row(y=-0.231707058414097, c='C2'),
 Row(y=1.10632061985913, c='C4'),
 Row(y=1.79791643753108, c='C2')]

Create a DataFrame that contains columns of classes, sums, numbers of elements per class and mean values:

[Row(c='C3', sum=106.72212894321214, n=100, mean=1.0672212894321214), <br>
 Row(c='C4', sum=100.32253230729513, n=100, mean=1.0032253230729513), <br>
 Row(c='C1', sum=102.26684498157968, n=100, mean=1.0226684498157967), <br>
 Row(c='C2', sum=95.44484415126036, n=100, mean=0.9544484415126036)] <br>

In [14]:
# use rdd to create this 
# make c the key, and y the value



[Row(y=0.0893193175067113, c='C2'),
 Row(y=1.03892811665213, c='C4'),
 Row(y=0.710538426311777, c='C1'),
 Row(y=1.54878758817833, c='C4'),
 Row(y=1.54970766958625, c='C3'),
 Row(y=2.62544730346494, c='C2'),
 Row(y=-0.890027143624024, c='C3'),
 Row(y=-0.231707058414097, c='C2'),
 Row(y=1.10632061985913, c='C4'),
 Row(y=1.79791643753108, c='C2'),
 Row(y=-0.147657009236351, c='C1'),
 Row(y=0.043108118674381, c='C2'),
 Row(y=2.47025699708299, c='C2'),
 Row(y=0.130217126313507, c='C2'),
 Row(y=-0.720650662077186, c='C4'),
 Row(y=0.551825763396604, c='C2'),
 Row(y=0.498621682204911, c='C3'),
 Row(y=1.07734227692233, c='C4'),
 Row(y=0.933533225922491, c='C4'),
 Row(y=1.00837095999603, c='C1'),
 Row(y=0.556615195636073, c='C3'),
 Row(y=4.26641451887544, c='C4'),
 Row(y=0.0254475312855701, c='C4'),
 Row(y=1.9514984696255, c='C3'),
 Row(y=0.761613049940232, c='C4'),
 Row(y=2.00736436564955, c='C4'),
 Row(y=0.595012841515629, c='C4'),
 Row(y=1.77214218580453, c='C1'),
 Row(y=0.930982702501831, c=

In [14]:
import pyspark
from pyspark.sql import Row
# Register df as a temporary view so that the SQL query below can refer to it by name
df.createOrReplaceTempView("df")
df_classes=spark.sql("SELECT c, SUM(y) AS sum, \
            COUNT(y) AS n, \
            AVG(y) AS mean \
            FROM df \
            GROUP BY c")
df_classes.show()
classes_array = df_classes.collect()
print(classes_array)

+---+------------------+---+------------------+
|  c|               sum|  n|              mean|
+---+------------------+---+------------------+
| C3|106.72212894321214|100|1.0672212894321214|
| C4|100.32253230729513|100|1.0032253230729513|
| C1|102.26684498157968|100|1.0226684498157967|
| C2| 95.44484415126036|100|0.9544484415126036|
+---+------------------+---+------------------+

[Row(c='C3', sum=106.72212894321214, n=100, mean=1.0672212894321214), Row(c='C4', sum=100.32253230729513, n=100, mean=1.0032253230729513), Row(c='C1', sum=102.26684498157968, n=100, mean=1.0226684498157967), Row(c='C2', sum=95.44484415126036, n=100, mean=0.9544484415126036)]


Create dictionaries of class sizes, class sums and class means:

{'C3': 100, 'C4': 100, 'C1': 100, 'C2': 100} <br>
{'C3': 106.72212894321214, 'C4': 100.32253230729513, 'C1': 102.26684498157968, 'C2': 95.44484415126036}  <br>
{'C3': 1.0672212894321214, 'C4': 1.0032253230729513, 'C1': 1.0226684498157967, 'C2': 0.9544484415126036}  <br>

In [24]:
#Skipped code
d1={row[0]:row[2] for row in classes_array}
d2={row[0]:row[1] for row in classes_array}
d3={row[0]:row[3] for row in classes_array}
print(d1,d2,d3)

{'C3': 100, 'C1': 100, 'C4': 100, 'C2': 100} {'C3': 106.72212894321214, 'C1': 102.26684498157968, 'C4': 100.32253230729513, 'C2': 95.44484415126036} {'C3': 1.0672212894321214, 'C1': 1.0226684498157967, 'C4': 1.0032253230729513, 'C2': 0.9544484415126036}


Calculate `nClasses`, `nObs` and `grandMean`, so that line of code
`nClasses, nObs, grandMean` returns:

(4, 400, 1.011890875958368)

In [30]:
#Skipped code
nClasses = df_classes.count() # count the number of rows in sqldf
nObs= df.count()
grandMean=(sum([row[3] for row in classes_array]))/nClasses
print(nClasses, nObs,grandMean)

4 400 1.0118908759583682


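Calculate the between-class sum of squares `betweenSS` (the sum of squared deviations of the class means from `grandMean`) and `meanSqClass = n * betweenSS / dfClass`, where `n = 100` is the common class size.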

In [32]:
#meanSqClass
betweenSS = sum([(row[3]-grandMean)**2 for row in classes_array])
dfClass = nClasses -1
meanSqClass = 100*betweenSS/dfClass
print(meanSqClass)

0.2184111945098246
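
The between-class calculation can be cross-checked in plain Python from the class summary printed earlier. The sketch below is an illustration rather than the workshop solution: it copies the class means and sizes from the `df_classes` output, weights each class by its size (so the same formulas would also cover unequal class sizes), and uses hypothetical variable names.

```python
# Class means and sizes copied from the df_classes output above.
class_means = {"C3": 1.0672212894321214, "C4": 1.0032253230729513,
               "C1": 1.0226684498157967, "C2": 0.9544484415126036}
class_sizes = {c: 100 for c in class_means}

n_classes = len(class_means)
n_obs = sum(class_sizes.values())

# Grand mean as the size-weighted average of class means.
# With equal class sizes this equals the plain average of the class means.
grand_mean = sum(class_sizes[c] * class_means[c] for c in class_means) / n_obs

# Between-class sum of squares: squared deviation of each class mean
# from the grand mean, weighted by the class size.
between_ss = sum(class_sizes[c] * (class_means[c] - grand_mean) ** 2
                 for c in class_means)

df_class = n_classes - 1
mean_sq_class = between_ss / df_class
print(n_classes, n_obs, grand_mean, mean_sq_class)
```

Up to floating-point rounding, `grand_mean` and `mean_sq_class` should agree with the `grandMean` and `meanSqClass` values printed by the notebook cells.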


Calculate within sum of squares `withinSS` and `meanSqResid=withinSS / dfResid`

0.9281210461910595

In [62]:
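# calc withinSS: use Spark again
#meanSqResid
# Attach the class mean to every observation, then square the deviations
large_df = df.join(df_classes,'c')
large_df = large_df.withColumn('sq',(large_df.y - large_df.mean)**2)
large_df.show(5)
# Sum the squared deviations over the whole DataFrame
wSS = large_df.groupBy().sum('sq').collect()
withinSS = wSS[0][0]
# Residual degrees of freedom: observations minus number of classes
dfResid = nObs-nClasses
meanSqResid = withinSS/dfResid
meanSqResid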

+---+------------------+------------------+---+------------------+--------------------+
|  c|                 y|               sum|  n|              mean|                  sq|
+---+------------------+------------------+---+------------------+--------------------+
| C2|0.0893193175067113| 95.44484415126036|100|0.9544484415126036|  0.7484484012032026|
| C4|  1.03892811665213|100.32253230729513|100|1.0032253230729513|0.001274689469357445|
| C1| 0.710538426311777|102.26684498157968|100|1.0226684498157967| 0.09742515157261993|
| C4|  1.54878758817833|100.32253230729513|100|1.0032253230729513|  0.2976381851069114|
| C3|  1.54970766958625|106.72212894321214|100|1.0672212894321214| 0.23279310703423428|
+---+------------------+------------------+---+------------------+--------------------+
only showing top 5 rows



0.9281210461910595

Calculate the F-statistic and the p-value

f_statistics:

0.2353261952265473

p-value:

0.87173536999418566

In [63]:
#Skipped code
f_stat=meanSqClass/meanSqResid
f_stat

0.2353261952265473

In [65]:
#p-value
from scipy.stats import f
1-f.cdf(f_stat,dfClass ,dfResid)

0.87173536999418877
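
As a sanity check that does not depend on SciPy, the p-value can be approximated by integrating the density of the F distribution numerically. The sketch below is one possible approach, not the method used in the workshop: the helper names `f_pdf` and `f_cdf` are hypothetical, the two mean squares are copied from the outputs above, and composite Simpson integration is assumed to be accurate enough here because the numerator degrees of freedom exceed 2, so the density is finite at zero.

```python
import math

def f_pdf(x, d1, d2):
    """Density of the F distribution with (d1, d2) degrees of freedom."""
    if x <= 0:
        return 0.0  # valid for d1 > 2, where the density vanishes at zero
    log_beta = math.lgamma(d1 / 2) + math.lgamma(d2 / 2) - math.lgamma((d1 + d2) / 2)
    log_pdf = ((d1 / 2) * math.log(d1 / d2)
               + (d1 / 2 - 1) * math.log(x)
               - ((d1 + d2) / 2) * math.log1p(d1 * x / d2)
               - log_beta)
    return math.exp(log_pdf)

def f_cdf(x, d1, d2, steps=2000):
    """Approximate P(F <= x) by composite Simpson integration on [0, x]."""
    if steps % 2:
        steps += 1                      # Simpson's rule needs an even number of intervals
    h = x / steps
    total = f_pdf(0.0, d1, d2) + f_pdf(x, d1, d2)
    for i in range(1, steps):
        total += (4 if i % 2 else 2) * f_pdf(i * h, d1, d2)
    return total * h / 3

# Values copied from the notebook outputs above.
mean_sq_class = 0.2184111945098246
mean_sq_resid = 0.9281210461910595
df_class, df_resid = 3, 396

f_stat = mean_sq_class / mean_sq_resid
p_value = 1 - f_cdf(f_stat, df_class, df_resid)
print(f_stat, p_value)
```

The approximation should agree with the SciPy result above to several decimal places; for routine work the SciPy call remains the simpler choice.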