# Introduction to ![spark](../data/pics/spark.png) using ![scala](../data/pics/python.png)

### Checking the version of spark

In [1]:
spark.version

'2.2.0'

### Create a dataframe

In [11]:
myRange = spark.range(1000).toDF("range")

In [12]:
myRange.show(5)

+-----+
|range|
+-----+
|    0|
|    1|
|    2|
|    3|
|    4|
+-----+
only showing top 5 rows



### Some transformations

- Spark will not act on transformations.
- All transformations in Spark are lazy => we wait until an action is called
- Spark will create a DAG (Directed Acyclic Graph) and act upon the source data
- Spark will optimize the pipeline
- Examples of [transformations](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations): map, filter, join, groupBy, sortByKey ... etc

In [13]:
div3 = myRange.where("range % 3 = 0")

### Action

- Trigger the computation on the logic transformation
- Examples of [actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions): reduce, count, collect, take, saveAsTextFile ... etc

In [14]:
div3.count()

334

We can check the results using the Spark UI: http://localhost:4040/

In [21]:
div3.filter(div3["range"] < 25).show()

+-----+
|range|
+-----+
|    0|
|    3|
|    6|
|    9|
|   12|
|   15|
|   18|
|   21|
|   24|
+-----+



In [22]:
div3.filter("range > 10 AND range < 25").count()

5

### Loading some data

In [23]:
data = spark.read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("../data/titanic.csv")

If we are calling this data frequently, it is better to cache it for faster access.

In [25]:
data.cache()

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [26]:
data.count()

891

In [29]:
#data.take(3)
data.head(3)

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S'),
 Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C'),
 Row(PassengerId=3, Survived=1, Pclass=3, Name='Heikkinen, Miss. Laina', Sex='female', Age=26.0, SibSp=0, Parch=0, Ticket='STON/O2. 3101282', Fare=7.925, Cabin=None, Embarked='S')]

In [35]:
#data.show(3, truncate=False)
#data.select("Survived", "Sex", "Age").show(10)
#data.show(3)
data.select(data.Age, data.Sex, data["Survived"], "Pclass").show(3)

+----+------+--------+------+
| Age|   Sex|Survived|Pclass|
+----+------+--------+------+
|22.0|  male|       0|     3|
|38.0|female|       1|     1|
|26.0|female|       1|     3|
+----+------+--------+------+
only showing top 3 rows



In [39]:
data.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [41]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [44]:
# summary statistics about the data
data.describe('Sex', "Age").show()
#data.describe().show()

+-------+------+------------------+
|summary|   Sex|               Age|
+-------+------+------------------+
|  count|   891|               714|
|   mean|  null| 29.69911764705882|
| stddev|  null|14.526497332334035|
|    min|female|              0.42|
|    max|  male|              80.0|
+-------+------+------------------+



### DataFrames overview

 - __Immutable__: once created cannot be changed. we applying transformation to the existing DF, a new one will be created
 - __Lazy__: unless there is an action performed on the DF, no transformation will be computed
 - __Distributed__

### Manipulating DataFrames (or SparkSQL)

- __sort()__ :

    - When we are using `sort`, spark will not perform anything on the data, because it is just a transformation. However, it will create a plan for when an action is called. We can use `explain` to see the plan.
    - When reading the `explain`, on top we have the end result and at the bottom is the data we start with.
    - Only when we call an action on the data frame, the entire DAG is computed as shown in the `explain` pipeline

In [45]:
data.sort("Survived").explain()

== Physical Plan ==
*Sort [Survived#92 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(Survived#92 ASC NULLS FIRST, 200)
   +- InMemoryTableScan [PassengerId#91, Survived#92, Pclass#93, Name#94, Sex#95, Age#96, SibSp#97, Parch#98, Ticket#99, Fare#100, Cabin#101, Embarked#102]
         +- InMemoryRelation [PassengerId#91, Survived#92, Pclass#93, Name#94, Sex#95, Age#96, SibSp#97, Parch#98, Ticket#99, Fare#100, Cabin#101, Embarked#102], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *FileScan csv [PassengerId#91,Survived#92,Pclass#93,Name#94,Sex#95,Age#96,SibSp#97,Parch#98,Ticket#99,Fare#100,Cabin#101,Embarked#102] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/resueman/Dev/spark/MLeap/data/titanic.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<PassengerId:int,Survived:int,Pclass:int,Name:string,Sex:string,Age:double,SibSp:int,Parch:...


In [50]:
from pyspark.sql.functions import desc

data.sort(desc("Survived")).select("Sex", "Survived").show(4)

+------+--------+
|   Sex|Survived|
+------+--------+
|female|       1|
|female|       1|
|female|       1|
|female|       1|
+------+--------+
only showing top 4 rows



### Manipulating DataFrames (or SparkSQL)

- __createOrReplaceTempView()__
    - Spark SQL will create a temporary table from your DataFrame, which you can query with normal SQL
    - The temporary table can be manipulated with DataFrame code also
    - There is no performance difference between SQL and DF code

In [None]:
data.createOrReplaceTempView("titanic_data")

In [None]:
# this is a SparSQL query

spark.sql("""
    SELECT Sex, Survived, count(Survived) as count FROM titanic_data GROUP BY Sex, Survived ORDER BY Sex
""").show()

In [None]:
# this is a Spark DataFrame query
data.groupBy("Sex", "Survived").count().sort("Sex").show()

### Manipulating DataFrames (or SparkSQL)

- __crosstab__(*col1, col2*)
    - pairwise frequency (contigency table)

In [None]:
data.crosstab("Sex", "Survived").show()

### Manipulating DataFrames (or SparkSQL)

- __distinct()__
    - this will return a new DF containing the distinct rows in the original DF

In [None]:
data.select('Embarked').distinct().show()

### Manipulating DataFrames (or SparkSQL)

- __dropna__(*how='any', thresh=None, subset=None*)
    - this will return a new DF omitting the rows containing null values
    

- __fillna__(*value, subset=None*)
    - it will replace null values

In [None]:
data.count(), data.dropna(subset="Embarked").count()
#data.count(), data.dropna().count()

In [None]:
data.fillna("X", subset="Embarked").select("Embarked").distinct().show()

### Manipulating DataFrames (or SparkSQL)

- __filter__(*condition*)
    - this will filter rows given a certain condition

In [None]:
# data.filter(data.Sex == "male").count()
# data.filter(data["Sex"] == "female").count()
data.filter(data.Age < 25).count()

### Manipulating DataFrames (or SparkSQL)

- __groupBy__(*\*cols*)
    - groups the specified columns and runs aggregations on it
    
    
- __agg__(*\*expression*)
    - aggregating on a DF

In [None]:
data.groupBy("Sex").agg({"Age": "average"}).show()

In [None]:
data.agg({"Age": "max"}).show()

In [None]:
data.groupBy("Sex").count().show()

### Manipulating DataFrames (or SparkSQL)

Something more complex:
 - transform the data frame into RDD (resilient distributed dataset)
 - apply a mapping function to each row in the data frame
 - transform it back to a data frame
 - rename the column to `gender`
 - join the newly formed data frame with the original data
 - drop the `id` column

In [None]:
def getGender(string):
    if(string == 'male'): return 0
    elif(string == 'female'): return 1
    else: return -1

In [None]:
rdd = data.select("PassengerId","Sex").rdd.map(lambda x: (x.PassengerId,getGender(x.Sex)))
df = spark.createDataFrame(rdd, ["id", "gender"])
data.join(df, data.PassengerId == df.id, how='inner').drop("id").show()

### Manipulating DataFrames (or SparkSQL)

Same thing as above, but using UDF (user defined functions)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

In [None]:
udf_gender = udf(lambda x: getGender(x))
data.withColumn('Gender', udf_gender(data.Sex)).show()