# **Spark**

* Data characterised by the four Vs (volume, velocity, variety, and veracity) is called Big Data.
* Hadoop MapReduce works **on disk** and is built for **batch data**. Its framework is lengthy and complex, but it is low cost because it runs on commodity hardware.
* Hadoop has two main components:
    * **HDFS** - stores data in a distributed fashion, which makes scaling easier.
    * **MapReduce** - used for distributed processing.
* To run SQL on Hadoop you need to learn Hive.
* Other tools in the Hadoop ecosystem: HBase, Apache Storm (handles real-time data), Oozie, Sqoop, and Pig.
* Spark supports both real-time and batch processing. It costs more than Hadoop because it relies on RAM.
* In-memory computation is supported, i.e. transformations are done in RAM, while reads and writes happen on disk. Spark ships with libraries such as Spark SQL, MLlib, GraphX, and Spark Streaming.
* Spark is a simple, user-friendly system.
* With Hadoop, doing 10 different things means operating 10 different tools; Spark was introduced to overcome this. It can be up to 100x faster than Hadoop MapReduce for in-memory workloads, which is made possible by reducing the number of read/write operations on disk.
* The Apache Software Foundation hosts 350+ projects; Spark is one of them.
* Spark can be used with Java, Scala, Python, SQL, and R.
* Databricks was introduced as a managed platform for running Spark.
* Other platforms that run Spark: Microsoft Fabric, Azure Synapse, AWS Glue.
* Databricks is preferred for running Spark. (Why??)
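
The MapReduce processing model mentioned in the list above can be sketched in plain Python. This is only an illustration of the map, shuffle, and reduce steps on a single machine, not Hadoop's actual API; the function names (`map_phase`, `shuffle_phase`, `reduce_phase`, `word_count`) and the sample sentences are made up for this example.

```python
from collections import defaultdict


def map_phase(lines):
    """Map: emit a (word, 1) pair for every word in every input line."""
    for line in lines:
        for word in line.lower().split():
            yield (word, 1)


def shuffle_phase(pairs):
    """Shuffle: group all values that share the same key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_phase(grouped):
    """Reduce: collapse each key's list of values into a single count."""
    return {key: sum(values) for key, values in grouped.items()}


def word_count(lines):
    """Run the three phases in order on an in-memory list of lines."""
    return reduce_phase(shuffle_phase(map_phase(lines)))


if __name__ == "__main__":
    sample = ["spark and hadoop", "spark streaming", "hadoop hdfs"]
    print(word_count(sample))
```

In Hadoop each phase reads its input from disk and writes its output back to disk, which is the overhead that Spark avoids by keeping intermediate results in memory.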


Apache Spark unifies
  * Batch Processing
  * Stream Analytics
  * Machine Learning
  * SQL Processing



#### **Spark's Basic Architecture**
------
![Alt text](https://hacked.work/blog/wp-content/uploads/2015/03/spark-cluster.png)



***Apache Spark works in a master-slave architecture where the master is called “Driver” and slaves are called “Workers”. When you run a Spark application, Spark Driver creates a context (Spark Context) that is an entry point to your application, and all operations (transformations and actions) are executed on worker nodes, and the resources are managed by Cluster Manager.***


* **Driver Program** – The process that runs the main() function of the application and creates the SparkContext. It is also the program/job, written by developers, that is submitted to Spark for processing. The driver splits the job into tasks and schedules them on the executors. There is exactly one driver program per application.


* **Spark Context** – SparkContext is the entry point to Spark Core services and features. It sets up internal services, establishes a connection to a Spark execution environment, communicates with the cluster, and is used to create RDDs. Every Spark job needs a SparkContext object before it can do any processing. It is created by the driver program and lets the application reach the cluster through the resource manager. ***Only one SparkContext can be active per JVM.***



* **Cluster Manager** – Spark uses a cluster manager to acquire resources across the cluster for executing a job. Spark itself is agnostic of the cluster manager and does not care how it obtains cluster resources. It supports the following cluster managers:

    * Spark standalone cluster manager - a simple cluster manager included with Spark that makes it easy to set up a cluster
    * YARN - the resource manager in Hadoop 2
    * Mesos
    * Kubernetes


* **Worker Node** – Worker Nodes are nodes which actually do data processing/heavy lifting on data.

* **Executor** – Executors are independent processes which run inside the Worker Nodes in their own JVMs. Data processing is actually done by these executor processes.


* **Cache** – Data stored in physical memory. Jobs can cache intermediate data so that RDDs do not need to be recomputed, which improves performance.


* **Task** – A task is a unit of work performed independently by the executor on one partition.


* **Partition** – Spark manages its data by splitting it into manageable chunks across the nodes in a cluster. These chunks are called partitions. The data is split in a way that reduces network traffic and optimises the operations to be performed on it.
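
The relationship between partitions and tasks described above can be imitated in plain Python. The sketch below is an analogy only: it uses a thread pool to stand in for executors, and the helper names (`split_into_partitions`, `double_task`, `run_job`) are invented for this example and are not part of Spark.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Split a list into num_partitions contiguous chunks of near-equal size."""
    size, remainder = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        # The first `remainder` partitions receive one extra element.
        end = start + size + (1 if i < remainder else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def double_task(partition):
    """One task: process a single partition independently of the others."""
    return [x * 2 for x in partition]


def run_job(data, num_partitions=3):
    """Run one task per partition, then gather the results in order."""
    partitions = split_into_partitions(data, num_partitions)
    # Each worker thread stands in for an executor slot running one task.
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        results = list(pool.map(double_task, partitions))
    # Concatenating the per-partition results plays the role of collect().
    return [x for part in results for x in part]


if __name__ == "__main__":
    print(split_into_partitions(list(range(1, 10)), 3))
    print(run_job(list(range(1, 10))))
```

Because every task touches only its own partition, the tasks need no coordination with each other, which is what lets Spark run them in parallel on different worker nodes.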

[Imp Link](https://www.mrstonewallin.com/post/spark-knowledge-series-i)



#### **Spark Deployment Modes: Client Mode vs Cluster Mode**
---
<img src="https://th.bing.com/th/id/R.5b4223cfa8490f2a8ac960b3e3d3738b?rik=sN29WUb1k7JWxw&riu=http%3a%2f%2fblog.brainlounge.de%2fmemoryleaks%2f2018-12-getting-started-with-spark-on-kubernetes-deploy-modes.png&ehk=zTXeqqjcdNpkjexQ77%2bl3JSIvFN1ljY4scGGGNdGo6Y%3d&risl=&pid=ImgRaw&r=0" width="550" height="300" />

* **Cluster Mode:** In cluster mode, the driver runs on one of the worker nodes, and that node is shown as the driver in your application's Spark Web UI. Cluster mode is used to run production jobs.
* **Client Mode:** In client mode, the driver runs locally on the machine from which you submit your application with the spark-submit command. Client mode is mainly used for interactive work and debugging. Note that in client mode only the driver runs locally; all tasks still run on the cluster's worker nodes.
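
The two modes are selected with the `--deploy-mode` option of spark-submit. The small helper below only builds and prints the command line, so it can be tried without a cluster. The helper name `build_spark_submit`, the script name `my_job.py`, and the choice of `yarn` as the master are assumptions made for illustration.

```python
import shlex


def build_spark_submit(app_path, deploy_mode, master="yarn"):
    """Return the spark-submit argument list for the given deploy mode."""
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")
    return [
        "spark-submit",
        "--master", master,            # which cluster manager to use
        "--deploy-mode", deploy_mode,  # where the driver process runs
        app_path,
    ]


if __name__ == "__main__":
    # my_job.py is a hypothetical application script.
    for mode in ("client", "cluster"):
        print(shlex.join(build_spark_submit("my_job.py", mode)))
```

The resulting list could be passed to `subprocess.run` on a machine where spark-submit is installed; only the value after `--deploy-mode` differs between the two modes.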


#### **Spark Toolset**

-------



<img src="https://miro.medium.com/max/1104/1*_Dy9w0lUXIeH6WHALkQC-g.png" width="400" height="400" />

#### **Data Structures in Spark: RDD, DataFrame, Dataset**
------

* **Resilient Distributed Dataset (RDD)**: a resilient, immutable, distributed collection of data.
  * **Resilient:** RDDs are fault tolerant.
  * **Collection of Data:** An RDD holds data and behaves like a Scala collection.
  * **Partition:** Spark breaks an RDD into smaller chunks of data.
  * **Distributed:** Spark distributes the partitions across the cluster.

* **DataFrame:** The most common Structured API. It represents a table of data with rows and columns, similar to a database table. The list that defines the columns and the types within those columns is called the schema.


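* **Dataset:** A strongly typed extension of the DataFrame API, available in Scala and Java. PySpark exposes only the DataFrame API.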

In [1]:
import pyspark
import findspark
findspark.init('/opt/anaconda3/lib/python3.11/site-packages/pyspark')
from pyspark import SparkContext
# pyspark.__version__

In [8]:
conf=pyspark.SparkConf().setMaster('local').setAppName("first")
sc = SparkContext(conf = conf)

In [9]:
rdd = sc.parallelize([1,2,3])

In [10]:
rdd.collect()

[1, 2, 3]

In [5]:
# sc.stop()  # stops the SparkContext; run this only at the end, because the cells below still use sc

In [11]:
sc

In [12]:
rdd2 = sc.parallelize(['Python','SQL','PySpark'])

In [13]:
rdd2.collect()

['Python', 'SQL', 'PySpark']

In [14]:
type(rdd2)

pyspark.rdd.RDD

In [15]:
rdd3 = sc.parallelize([1,2,3,4,5,6,7,8,9])
rdd3.collect()
type(rdd3)


pyspark.rdd.RDD

In [16]:
rdd4 = rdd3.map(lambda x:x*2)

In [17]:
rdd4.collect()

[2, 4, 6, 8, 10, 12, 14, 16, 18]

In [18]:
findspark.find()

'/opt/anaconda3/lib/python3.11/site-packages/pyspark'

In [19]:
rdd5 = rdd3.filter(lambda x:x%2==0)

In [20]:
rdd5.collect()

[2, 4, 6, 8]

In [21]:
from pyspark.sql import SparkSession

In [22]:
#Create a session
spark = SparkSession.builder.appName("RDDExample").getOrCreate()

In [23]:
df = spark.createDataFrame([(1,2,3)])
df.show()

+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  2|  3|
+---+---+---+



In [24]:
df = spark.createDataFrame([(1,2,3),(4,5,6)])
df.show()

+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
+---+---+---+



In [25]:
from datetime import datetime, date
from pyspark.sql import Row
# Build a DataFrame from Row objects; column names and types are inferred
df = spark.createDataFrame([Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
                            Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
                            Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [26]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [27]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [28]:
data = [
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
]

schema = 'a long, b double, c string, d date, e timestamp'

In [29]:
df2 = spark.createDataFrame(data, schema)
df2.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [30]:
df2.dtypes

[('a', 'bigint'),
 ('b', 'double'),
 ('c', 'string'),
 ('d', 'date'),
 ('e', 'timestamp')]

In [31]:
data2 = [(1, 'a', 'z', 777000, 'India'), (2, 'b', 'y', 89000, 'India')]

schema2 = 'id int, name string, last_name string, mo int, country string'

df3 = spark.createDataFrame(data2, schema2)



In [32]:
df3.show()

+---+----+---------+------+-------+
| id|name|last_name|    mo|country|
+---+----+---------+------+-------+
|  1|   a|        z|777000|  India|
|  2|   b|        y| 89000|  India|
+---+----+---------+------+-------+



In [33]:
df3.dtypes

[('id', 'int'),
 ('name', 'string'),
 ('last_name', 'string'),
 ('mo', 'int'),
 ('country', 'string')]

In [34]:
df3.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- mo: integer (nullable = true)
 |-- country: string (nullable = true)



In [35]:
users = [{
    "id":1,
    "first_name":"Saumya",
    "amount_paid":78000
},
{
    "id":2,
    "first_name":"Shreya",
    "amount_paid":8000
},
{
    "id":3,
    "first_name":"Anushka",
    "amount_paid":8990
}]

df4 = spark.createDataFrame(users)

In [36]:
df4.show()

+-----------+----------+---+
|amount_paid|first_name| id|
+-----------+----------+---+
|      78000|    Saumya|  1|
|       8000|    Shreya|  2|
|       8990|   Anushka|  3|
+-----------+----------+---+



In [37]:
#path = '/home/labuser/Desktop/Untitled Folder/sales_data.csv'
path = '/home/labuser/Desktop/Untitled Folder/employees.csv'
# Read the CSV with a header row and let Spark infer the column types
df = spark.read.option("header", True).option("inferSchema", True).csv(path)

In [38]:
# Equivalent shorthand: pass the same options as keyword arguments
df6 = spark.read.csv(path, header=True, inferSchema=True)

In [39]:
df.show()

+----------+------+----------+---------------+------+-------+-----------------+--------------------+
|First Name|Gender|Start Date|Last Login Time|Salary|Bonus %|Senior Management|                Team|
+----------+------+----------+---------------+------+-------+-----------------+--------------------+
|   Douglas|  Male|  8/6/1993|       12:42 PM| 97308|  6.945|             true|           Marketing|
|    Thomas|  Male| 3/31/1996|        6:53 AM| 61933|   4.17|             true|                null|
|     Maria|Female| 4/23/1993|       11:17 AM|130590| 11.858|            false|             Finance|
|     Jerry|  Male|  3/4/2005|        1:00 PM|138705|   9.34|             true|             Finance|
|     Larry|  Male| 1/24/1998|        4:47 PM|101004|  1.389|             true|     Client Services|
|    Dennis|  Male| 4/18/1987|        1:35 AM|115163| 10.125|            false|               Legal|
|      Ruby|Female| 8/17/1987|        4:20 PM| 65476| 10.012|             true|            

In [40]:
df6.show()

+----------+------+----------+---------------+------+-------+-----------------+--------------------+
|First Name|Gender|Start Date|Last Login Time|Salary|Bonus %|Senior Management|                Team|
+----------+------+----------+---------------+------+-------+-----------------+--------------------+
|   Douglas|  Male|  8/6/1993|       12:42 PM| 97308|  6.945|             true|           Marketing|
|    Thomas|  Male| 3/31/1996|        6:53 AM| 61933|   4.17|             true|                null|
|     Maria|Female| 4/23/1993|       11:17 AM|130590| 11.858|            false|             Finance|
|     Jerry|  Male|  3/4/2005|        1:00 PM|138705|   9.34|             true|             Finance|
|     Larry|  Male| 1/24/1998|        4:47 PM|101004|  1.389|             true|     Client Services|
|    Dennis|  Male| 4/18/1987|        1:35 AM|115163| 10.125|            false|               Legal|
|      Ruby|Female| 8/17/1987|        4:20 PM| 65476| 10.012|             true|            

In [41]:
df6.printSchema()

root
 |-- First Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Last Login Time: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Bonus %: double (nullable = true)
 |-- Senior Management: boolean (nullable = true)
 |-- Team: string (nullable = true)



In [42]:
df.dtypes

[('First Name', 'string'),
 ('Gender', 'string'),
 ('Start Date', 'string'),
 ('Last Login Time', 'string'),
 ('Salary', 'int'),
 ('Bonus %', 'double'),
 ('Senior Management', 'boolean'),
 ('Team', 'string')]

In [43]:
df.select("First Name","Gender")

DataFrame[First Name: string, Gender: string]

In [44]:
from pyspark.sql.functions import col, concat, lit
df.select(col("First Name").alias("Forename")).show()
# select() returns only the columns listed, so only the renamed column is shown

+--------+
|Forename|
+--------+
| Douglas|
|  Thomas|
|   Maria|
|   Jerry|
|   Larry|
|  Dennis|
|    Ruby|
|    null|
|  Angela|
| Frances|
|  Louise|
|   Julie|
| Brandon|
|    Gary|
|Kimberly|
| Lillian|
|  Jeremy|
|   Shawn|
|   Diana|
|   Donna|
+--------+
only showing top 20 rows



In [45]:
df.select("First Name", col("Gender")).show()

+----------+------+
|First Name|Gender|
+----------+------+
|   Douglas|  Male|
|    Thomas|  Male|
|     Maria|Female|
|     Jerry|  Male|
|     Larry|  Male|
|    Dennis|  Male|
|      Ruby|Female|
|      null|Female|
|    Angela|Female|
|   Frances|Female|
|    Louise|Female|
|     Julie|Female|
|   Brandon|  Male|
|      Gary|  Male|
|  Kimberly|Female|
|   Lillian|Female|
|    Jeremy|  Male|
|     Shawn|  Male|
|     Diana|Female|
|     Donna|Female|
+----------+------+
only showing top 20 rows



In [46]:
df.select("First Name", col("Gender"), df["Team"]).show()

+----------+------+--------------------+
|First Name|Gender|                Team|
+----------+------+--------------------+
|   Douglas|  Male|           Marketing|
|    Thomas|  Male|                null|
|     Maria|Female|             Finance|
|     Jerry|  Male|             Finance|
|     Larry|  Male|     Client Services|
|    Dennis|  Male|               Legal|
|      Ruby|Female|             Product|
|      null|Female|             Finance|
|    Angela|Female|         Engineering|
|   Frances|Female|Business Development|
|    Louise|Female|                null|
|     Julie|Female|               Legal|
|   Brandon|  Male|     Human Resources|
|      Gary|  Male|               Sales|
|  Kimberly|Female|             Finance|
|   Lillian|Female|             Product|
|    Jeremy|  Male|     Human Resources|
|     Shawn|  Male|             Product|
|     Diana|Female|     Client Services|
|     Donna|Female|             Product|
+----------+------+--------------------+
only showing top 20 rows

In [47]:
df.withColumnRenamed("First Name", "Name").show()

# withColumnRenamed() keeps every column, so the whole DataFrame is shown

+--------+------+----------+---------------+------+-------+-----------------+--------------------+
|    Name|Gender|Start Date|Last Login Time|Salary|Bonus %|Senior Management|                Team|
+--------+------+----------+---------------+------+-------+-----------------+--------------------+
| Douglas|  Male|  8/6/1993|       12:42 PM| 97308|  6.945|             true|           Marketing|
|  Thomas|  Male| 3/31/1996|        6:53 AM| 61933|   4.17|             true|                null|
|   Maria|Female| 4/23/1993|       11:17 AM|130590| 11.858|            false|             Finance|
|   Jerry|  Male|  3/4/2005|        1:00 PM|138705|   9.34|             true|             Finance|
|   Larry|  Male| 1/24/1998|        4:47 PM|101004|  1.389|             true|     Client Services|
|  Dennis|  Male| 4/18/1987|        1:35 AM|115163| 10.125|            false|               Legal|
|    Ruby|Female| 8/17/1987|        4:20 PM| 65476| 10.012|             true|             Product|
|    null|

In [48]:
df.printSchema()

root
 |-- First Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Last Login Time: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Bonus %: double (nullable = true)
 |-- Senior Management: boolean (nullable = true)
 |-- Team: string (nullable = true)



In [49]:
df.select(concat("Start Date", lit(" "), "Last Login Time").alias("Date & Time")).show()
# PySpark functions (from pyspark.sql.functions) can be used inside DataFrame methods.

# DataFrame methods are called as df.methodName() and are usually in camelCase.

+-------------------+
|        Date & Time|
+-------------------+
|  8/6/1993 12:42 PM|
|  3/31/1996 6:53 AM|
| 4/23/1993 11:17 AM|
|   3/4/2005 1:00 PM|
|  1/24/1998 4:47 PM|
|  4/18/1987 1:35 AM|
|  8/17/1987 4:20 PM|
| 7/20/2015 10:43 AM|
| 11/22/2005 6:29 AM|
|   8/8/2002 6:51 AM|
|  8/12/1980 9:01 AM|
| 10/26/1997 3:19 PM|
|  12/1/1980 1:08 AM|
| 1/27/2008 11:40 PM|
|  1/14/1999 7:13 AM|
|   6/5/2016 6:09 AM|
|  9/21/2010 5:56 AM|
|  12/7/1986 7:45 PM|
|10/23/1981 10:27 AM|
|  7/22/2010 3:48 AM|
+-------------------+
only showing top 20 rows



In [50]:
df_final  = df.drop("Start Date", "Last Login Time")

In [52]:
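# Spark writes a directory with this name that holds the part files;
# pass header=True as well if the column names should be written
df_final.write.csv("/home/labuser/Desktop/Untitled Folder/Final_employees2.csv")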

In [53]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [55]:
data = [(1, "a", 30), (2, "b", 66)]

# Define the schema explicitly with StructType instead of a DDL string
userSchema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("age", IntegerType()),
])

df = spark.createDataFrame(data, userSchema)

In [56]:
df

DataFrame[id: int, name: string, age: int]