###### Spark is a platform for cluster computing. Spark lets you spread data and computations over clusters with multiple nodes (think of each node as a separate computer). Splitting up your data makes it easier to work with very large datasets because each node only works with a small amount of data.

###### As each node works on its own subset of the total data, it also carries out a part of the total calculations required, so that both data processing and computation are performed in parallel over the nodes in the cluster. Parallel computation can make certain types of programming tasks much faster.

###### The cluster will be hosted on a remote machine that's connected to all other nodes. There will be one computer, called the *master*, that manages splitting up the data and the computations. The master is connected to the rest of the computers in the cluster, which are called *workers*. The master sends the workers data and calculations to run, and they send their results back to the master.
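
###### The master/worker flow described above can be sketched in plain Python. This is only an analogy, not how Spark is implemented: the helper names `split_into_chunks` and `worker_sum` are hypothetical, and threads on one machine stand in for the worker nodes.

```python
from concurrent.futures import ThreadPoolExecutor


def worker_sum(chunk):
    """A worker computes a partial result on its own slice of the data."""
    return sum(chunk)


def split_into_chunks(data, n_parts):
    """The master splits the data into roughly equal, contiguous chunks."""
    size = -(-len(data) // n_parts)  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]


data = list(range(1, 101))                    # the "full dataset": 1..100
chunks = split_into_chunks(data, n_parts=4)   # one chunk per worker

# The master hands each chunk to a worker and collects the partial results.
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_sums = list(pool.map(worker_sum, chunks))

total = sum(partial_sums)                     # the master combines the results
print(partial_sums, total)
```

###### Each worker only ever sees a quarter of the data, yet the combined result equals the sum over the whole list.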

###### When you're just getting started with Spark, it's simpler to run a cluster locally. Thus, for this course, instead of connecting to another computer, all computations run on a local cluster on this machine.

###### Creating the connection is as simple as creating an instance of the SparkContext class. The class constructor takes a few optional arguments that allow you to specify the attributes of the cluster you're connecting to.

###### An object holding all these attributes can be created with the SparkConf() constructor.

### http://spark.apache.org/docs/2.1.0/api/python/pyspark.html

### Use Spark only for big data and complex computations

In [1]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession  # entry point for working with DataFrames
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

### Using DataFrames

###### Spark's core data structure is the Resilient Distributed Dataset (RDD). This is a low-level object that lets Spark work its magic by splitting data across multiple nodes in the cluster.

###### The Spark DataFrame was designed to behave a lot like a SQL table (a table with variables in the columns and observations in the rows). Not only are DataFrames easier to understand, they are also more optimized for complicated operations than RDDs.

###### When you start modifying and combining columns and rows of data, there are many ways to arrive at the same result, but some take much longer than others. When using RDDs, it's up to the data scientist to figure out the right way to optimize the query, but the DataFrame implementation has much of this optimization built in.

###### To start working with Spark DataFrames, you first have to create a SparkSession object from your SparkContext. You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface with that connection.

In [2]:
from pyspark import SparkContext
sc = SparkContext("local", "First App")

###### SparkSession.builder.getOrCreate() returns the active SparkSession if one already exists, and creates a new one otherwise

In [3]:
SparkSession.builder.getOrCreate()

In [4]:
print(sc)

<SparkContext master=local appName=First App>


In [5]:
print(sc.version)

2.4.5


In [6]:
# Create the SparkSession (or retrieve the one that already exists)
spark = SparkSession.builder.getOrCreate()

# Print the session object
print(spark)

<pyspark.sql.session.SparkSession object at 0x000002084D3D57B8>


###### The SparkSession has an attribute called catalog, which lists all the data inside the cluster
###### .listTables() returns the names of all the tables in your cluster as a list.

In [32]:
spark.catalog.listTables()

[Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

###### We can query tables in the Spark catalog with SQL
spark.sql("FROM flights SELECT * LIMIT 10")

###### We can also convert a Spark DataFrame to a pandas DataFrame for easy local access
df = table1.toPandas()

###### Converting a pandas DataFrame to a Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)

###### However, this DataFrame won't be saved to the catalog
###### To add it to the catalog as a temporary view
spark_df.createOrReplaceTempView("temp")

In [37]:
titanic_df.createOrReplaceTempView("titanic_df")

#### Reading files directly into Spark DataFrames

In [19]:
titanic_df = spark.read.csv('train.csv', header=True, inferSchema=True)

In [36]:
display(titanic_df)

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [38]:
# Pulling data from the spark catalog and creating a dataframe out of it
titanic_df = spark.table("titanic_df")

In [45]:
# Spark DataFrames are immutable, so instead of editing one in place we reassign the variable to a new DataFrame
titanic_df.show(2)

+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+-------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|sum_survived_pclass|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+-------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|A/5 21171|   7.25| null|       S|                  3|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0| PC 17599|71.2833|  C85|       C|                  2|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+-------------------+
only showing top 2 rows



In [41]:
# Adding a new column
titanic_df = titanic_df.withColumn("sum_survived_pclass", titanic_df.Survived + titanic_df.Pclass)

In [46]:
titanic_df.show(2)

+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+-------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|sum_survived_pclass|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+-------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|A/5 21171|   7.25| null|       S|                  3|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0| PC 17599|71.2833|  C85|       C|                  2|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+-------------------+
only showing top 2 rows



###### Filtering a dataframe

In [52]:
fare_greater_3 = titanic_df.filter("Fare > 3")
fare_greater_3.show(2)

+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+-------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|sum_survived_pclass|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+-------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|A/5 21171|   7.25| null|       S|                  3|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0| PC 17599|71.2833|  C85|       C|                  2|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+-------------------+
only showing top 2 rows



In [53]:
fare_greater_3 = titanic_df.filter(titanic_df.Fare > 3)
fare_greater_3.show(2)

+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+-------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|sum_survived_pclass|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+-------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|A/5 21171|   7.25| null|       S|                  3|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0| PC 17599|71.2833|  C85|       C|                  2|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+-------------------+
only showing top 2 rows



###### Selecting columns from a dataset

In [54]:
columns = titanic_df.select("Survived","Pclass")

In [55]:
columns.show(2)

+--------+------+
|Survived|Pclass|
+--------+------+
|       0|     3|
|       1|     1|
+--------+------+
only showing top 2 rows



In [56]:
columns = titanic_df.select(titanic_df.Survived,titanic_df.Pclass)

In [59]:
columns.show(2)

+--------+------+
|Survived|Pclass|
+--------+------+
|       0|     3|
|       1|     1|
+--------+------+
only showing top 2 rows



###### Performing an operation while selecting columns, and naming the result

In [62]:
sum_Survived_Pclass = titanic_df.select(titanic_df.Survived + titanic_df.Pclass)

In [64]:
sum_Survived_Pclass.show(3)

+-------------------+
|(Survived + Pclass)|
+-------------------+
|                  3|
|                  2|
|                  4|
+-------------------+
only showing top 3 rows



In [65]:
# alias() must be applied to the column expression, not to the DataFrame returned by select()
sum_Survived_Pclass = titanic_df.select((titanic_df.Survived + titanic_df.Pclass).alias("sum"))

In [66]:
sum_Survived_Pclass.show(3)

+---+
|sum|
+---+
|  3|
|  2|
|  4|
+---+
only showing top 3 rows



In [21]:
titanic_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [22]:
passengers_count = titanic_df.count()

In [23]:
print(passengers_count)

891


In [24]:
titanic_df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 5 rows

In [25]:
titanic_df.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

In [26]:
titanic_df.select("Age").show()

+----+
| Age|
+----+
|22.0|
|38.0|
|26.0|
|35.0|
|35.0|
|null|
|54.0|
| 2.0|
|27.0|
|14.0|
| 4.0|
|58.0|
|20.0|
|39.0|
|14.0|
|55.0|
| 2.0|
|null|
|31.0|
|null|
+----+
only showing top 20 rows

