## Apache Spark
What is Spark?

Spark is a distributed computing framework.
* Apache Spark is a fast, general-purpose engine for large-scale data processing that runs on a cluster. It relies on a cluster manager (such as YARN) to allocate resources.
    * Spark Core (the heart of Spark)
        * Cluster computing engine
            * Memory management, task scheduling, fault recovery, and interaction with the cluster manager
        * Set of core APIs (libraries) with support for Java, Python, Scala and R
            * Structured APIs (DataFrames, Datasets)
            * Unstructured APIs (RDDs)
* <b> Working on table-like data, as in a database, is the good case; in the worst case we need to work on raw collections. </b>


### How does Spark execute our programs on a cluster?
* Master-slave architecture
    * The master is the driver
        * The driver is responsible for analysing, distributing, scheduling and monitoring work across the executors
    * The slaves are the executors
* When you run spark-submit, the request goes to the YARN resource manager, which launches an application master.
* The application master launches the driver.
* The executors perform the tasks (one task per partition; by default Spark creates one partition per block).
* Stages are split at shuffle boundaries, not between transformations and actions. In the word count program, the transformations before reduceByKey run in one stage; at reduceByKey the data of all partitions is shuffled so that equal keys end up together, and the reduce step and the final action run in a second stage.

* By default, spark-submit creates one executor per data node
    * Suppose the file size is around 600 MB; the file then splits into 600 MB / 128 MB (default block size) = 4.7, rounded up to 5 blocks
        * default tasks created = 5 tasks (one per block)
        * default partitions = 5 partitions (one per block)
        * default executors = 1 (single-node cluster)
* <b> 1 core = 1 thread/process </b>

* <b> We might think that more concurrent tasks per executor always give better performance. A commonly cited rule of thumb is that an executor running more than 5 concurrent tasks performs poorly, so the recommended value is 5 cores per executor. </b>
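
The block arithmetic above can be sketched in plain Python. The helper name `default_partitions` is hypothetical (it is not a Spark API), and it assumes one partition and one task per 128 MB block, as in the example.

```python
import math

def default_partitions(file_size_mb, block_size_mb=128):
    """Number of blocks, and hence default partitions and tasks, for one file.

    Hypothetical helper: assumes one partition and one task per block.
    """
    return math.ceil(file_size_mb / block_size_mb)

blocks = default_partitions(600)
print(blocks)  # 600 / 128 = 4.69, rounded up to 5
```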
    


### Case 1 Hardware
* 6 nodes, each with 16 cores and 64 GB RAM
    * 1 core and 1 GB of RAM on each node are allocated to the OS and Hadoop daemons
    * So 15 cores and 63 GB are available on each node
        * 15 cores * 6 nodes = 90 cores
        * 90 cores / 5 cores max per executor for max throughput = 18 executors
        * 1 executor is set aside for the Application Master (AM)
        * 17 executors available / 6-node cluster ≈ 3 executors per data node
        * 63 GB RAM available per node / 3 executors per data node = 21 GB of RAM per executor
        * Spark memory is divided into 3 parts
            * Execution and storage memory (75%)
            * User memory (25%)
            * Reserved memory (300 MB)
        * 21 GB * (1 - 0.07) ≈ 19 GB available as executor memory, after leaving about 7% for memory overhead
    * <b> So, finally: --num-executors=17 --executor-memory=19gb --executor-cores=5 </b>
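
The sizing steps above can be written as a small calculator. This is only a sketch of the arithmetic in this section: the function name `size_executors`, its parameters, and the rounding choices are assumptions made for illustration, not a Spark API.

```python
def size_executors(nodes, cores_per_node, ram_gb_per_node,
                   cores_per_executor=5, overhead_fraction=0.07):
    """Rule-of-thumb executor sizing (hypothetical helper, not part of Spark)."""
    # Reserve 1 core and 1 GB per node for the OS and Hadoop daemons.
    usable_cores = cores_per_node - 1
    usable_ram_gb = ram_gb_per_node - 1

    total_executors = (usable_cores * nodes) // cores_per_executor
    num_executors = total_executors - 1           # one slot is left for the Application Master
    executors_per_node = round(num_executors / nodes)
    ram_per_executor_gb = usable_ram_gb // executors_per_node
    # Leave a fraction of each executor's share for memory overhead.
    executor_memory_gb = int(ram_per_executor_gb * (1 - overhead_fraction))

    return {
        "num_executors": num_executors,
        "executor_cores": cores_per_executor,
        "executor_memory_gb": executor_memory_gb,
    }

case1 = size_executors(nodes=6, cores_per_node=16, ram_gb_per_node=64)
print(case1)  # {'num_executors': 17, 'executor_cores': 5, 'executor_memory_gb': 19}
```

The same function can be called with other node shapes to repeat the calculation for a different cluster.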
 

### Case 2 Hardware
* 6 nodes, each with 32 cores and 64 GB RAM
    * 1 core and 1 GB of RAM on each node are allocated to the OS and Hadoop daemons
    * So 31 cores and 63 GB are available on each node
        * 31 cores * 6 nodes = 186 cores
        * 186 cores / 5 cores max per executor for max throughput ≈ 37 executors
        * 1 executor is set aside for the Application Master (AM)
        * 36 executors available / 6-node cluster = 6 executors per data node
        * 63 GB RAM available per node / 6 executors per data node ≈ 10 GB of RAM per executor
        * Spark memory is divided into 3 parts
            * Execution and storage memory (75%)
            * User memory (25%)
            * Reserved memory (300 MB)
        * 10 GB * (1 - 0.07) ≈ 9 GB available as executor memory, after leaving about 7% for memory overhead
    * <b> So, finally: --num-executors=36 --executor-memory=9gb --executor-cores=5 </b>

### Case 3 Hardware: when available memory is more than required
* Suppose in case 1 you think 19 GB is more memory than needed and feel 10 GB is sufficient; then the calculation changes
    * 10 GB per executor, so 63 GB available on each node / 10 GB ≈ 6 executors per node
    * 6 executors * 6 nodes = 36 executors in total
    * 15 cores * 6 nodes = 90 cores
    * 90 cores / 36 executors = 2.5, rounded up to 3 cores per executor (this oversubscribes slightly, since 36 * 3 = 108 > 90; rounding down to 2 stays within the available cores)
    * For good throughput we should not go above 5 cores per executor, and 3 is within that limit
    * <b> So, finally: --num-executors=36 --executor-memory=10gb --executor-cores=3 </b>
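
When the executor memory is fixed first, the calculation runs in the other direction. The sketch below follows the steps of this case; `size_by_memory` is a hypothetical helper, and it returns the exact cores-per-executor ratio so that the rounding decision stays visible.

```python
def size_by_memory(nodes, usable_cores_per_node, usable_ram_gb_per_node,
                   executor_memory_gb):
    """Start from a chosen executor memory and derive the other settings.

    Hypothetical helper for illustration; not part of Spark.
    """
    executors_per_node = usable_ram_gb_per_node // executor_memory_gb
    num_executors = executors_per_node * nodes
    total_cores = usable_cores_per_node * nodes
    # Exact ratio; round down to stay within the available cores.
    cores_per_executor = total_cores / num_executors
    return num_executors, cores_per_executor

num_executors, cores_per_executor = size_by_memory(
    nodes=6, usable_cores_per_node=15, usable_ram_gb_per_node=63,
    executor_memory_gb=10)
print(num_executors, cores_per_executor)  # 36 2.5
```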

<b> Spark's default format for reading from and writing to Hive is Parquet </b>

* <b> Caching (cache()) stores data at Spark's default storage level and suits small, frequently used datasets </b>
* <b> Persistence (persist()) is when we ask Spark to store the data in RAM, on disk, or both; this way Spark can hold larger amounts of data </b>


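### Where do we use varargs (*) and kwargs (**)?
* <b> *args are used to rename all of a DataFrame's columns </b>

     <code> 
     import re
     def reNameColumns(cols):
         # Lower-case each name and replace every non-alphanumeric character with "_"
         reNamedCols = []
         for col in cols:
             reNamedCols.append(re.sub("[^a-zA-Z0-9]", "_", col.lower()))
         return reNamedCols
     cols = reNameColumns(jsonDF.columns)
     # *cols unpacks the list into positional arguments: toDF(c1, c2, ...)
     jsonDF = jsonDF.toDF(*cols)
     </code>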
     
* <b>**kwargs are used to pass the JDBC connection properties as a map </b>
     <code>
     import configparser as cp

     class JDBCConnection:
         def __init__(self):
             self.config = cp.ConfigParser()
             self.config.read(
                 r"C:\Users\91889\PycharmProjects\pyspark_poc\src\main\python\com\rposam\spark\config\properties.txt")

         def getPostgresConnectoin(self):
             # Read each connection setting from the [postegres_props] section
             host = self.config.get("postegres_props", "host")
             port = self.config.get("postegres_props", "port")
             user = self.config.get("postegres_props", "user")
             pwd = self.config.get("postegres_props", "password")
             driver = self.config.get("postegres_props", "driver")
             dbname = self.config.get("postegres_props", "dbname")
             props = {
                 "url": "jdbc:postgresql://{}:{}/{}".format(host, port, dbname),
                 "user": user,
                 "password": pwd,
                 "driver": driver,
                 "batchsize": 25000
             }
             return props

     props = JDBCConnection().getPostgresConnectoin()
     # **props unpacks the dict into keyword arguments: options(url=..., user=..., ...)
     jsonDF.coalesce(1).write.format("jdbc"). \
         mode(saveMode="append").options(**props).option("dbtable", "awspostgredb.json_tab").save()
     </code>

### What's the most complicated issue you faced?
* The most complicated issue I faced was a missing encode step in one of my tasks. The task was to load several delimiter-separated files, join them, and store the results in a database over JDBC.
* The delimiter in all the files was two characters long, but the Spark DataFrame reader supported only a single-character delimiter (true of Spark versions before 3.0).
* So I chose to load the data as an RDD instead of a DataFrame, to get around the delimiter limitation.
    <code>
        # Failed: the CSV reader rejected the two-character delimiter
        df = spark.read.format("csv").option("delimiter", "::").load("two_delimitors_sep_file.txt")
        # Succeeded: read as an RDD, split each line manually, then convert to a DataFrame
        rdd = spark.sparkContext.textFile("two_delimitors_sep_file.txt")
        df = rdd.map(lambda line: line.encode("utf-8")) \
                .map(lambda line: line.split("::")) \
                .map(lambda cols: (cols[0], cols[1], cols[2], cols[3])) \
                .toDF()
    </code>       
* I converted the RDDs to DataFrames and verified that the data of every file looked as expected, but while joining two DataFrames I got an error (an escape character or similar) and got stuck there.
* I tried inspecting the datasets and printing some lines; everything looked fine, but the join still failed.
* After struggling for some hours, I found that the issue was in how the RDD data was read.
* <b> After the RDD is created, the first thing we need to do is encode it (convert from unicode to string). </b>
    <code>
        # Python 2: unicode -> str; on Python 3, textFile already yields str
        rdd.map(lambda line: line.encode("utf-8"))
    </code>
* After that, joining all the DataFrames finally succeeded.

### What is lineage?
* Lineage is a flow of transformations; Spark remembers which transformation depends on which.
* Let's consider the word count program as an example.
    <code>
        rdd = spark.sparkContext.textFile(".../dir")
        rdd1 = rdd.map(lambda x: x.encode("utf-8"))
        rdd2 = rdd1.flatMap(lambda x: x.split(","))
        rdd3 = rdd2.map(lambda x: (x, 1))
        rdd4 = rdd3.reduceByKey(lambda x, y: x + y)
        rdd4.repartition(1).saveAsTextFile(".../dir")
     </code>
* As you know, transformations are lazy and run only when an action is called. So the transformations (rdd1, rdd2, rdd3, rdd4) each depend on the previous one; Spark only remembers them and does not execute them.
* A series of dependent transformations is called a lineage.
* reduceByKey is itself a transformation. When the action (saveAsTextFile) is called, Spark executes the transformations it depends on, in order.
            rdd1 = rdd.map --> rdd1 has data, and the data of the RDD it depends on (rdd) is released (memory freed)
            rdd2 = rdd1.flatMap --> rdd2 has data, and the data of rdd1 is freed
            rdd3 = rdd2.map --> rdd3 has data, and the data of rdd2 is freed
            rdd4 = rdd3.reduceByKey --> rdd4 has data, and the data of rdd3 is freed

### What is persistence and when do we go for persistence?
 Persistence is simply storing data (in memory, on disk, or both).
 
 In the above example there is only one action. What if there are other actions as well?
 Let's see an example:
    <code>
     r1 = sc.textFile(".../dir")
     r2 = r1.t1()
     r3 = r2.t2()
     r4 = r3.t3()
     r5 = r4.ac1()
     r6 = r3.t4()
     r7 = r6.ac2()    
    </code>
    
   In the above example there are 2 actions, and both depend on r3. When the first action (r4.ac1) finishes, the data in r3 is freed, so r6 has to start execution again from the beginning (i.e. from textFile).
   
   Here r1 to r3 execute twice. To speed things up we persist the r3 data so that it can be reused, which reduces execution time.
   
   <code>
     r1 = sc.textFile(".../dir")
     r2 = r1.t1()
     r3 = r2.t2()
     r3.persist(StorageLevel.MEMORY_ONLY)
     r4 = r3.t3()
     r5 = r4.ac1()
     r6 = r3.t4()
     r7 = r6.ac2()    
    </code>
  <b> Persist is lazy: the data is stored only when an action that scans the whole dataset runs, for example count, or an action that follows a groupBy, join or set operation</b>
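
The effect of persisting can be imitated with a toy model in plain Python. `ToyRDD` is a made-up class for illustration only: it is not Spark's API. It records a parent and a function (the lineage), computes only when an action (`collect`) is called, and keeps its rows if `persist` was requested. The counter shows how often the source is scanned.

```python
class ToyRDD:
    """Toy stand-in for an RDD: remembers its lineage and computes lazily."""

    def __init__(self, compute, parent=None):
        self._compute = compute   # how to build this dataset's rows
        self._parent = parent     # lineage: the dataset this one depends on
        self._persist = False
        self._cached = None

    def map(self, fn):
        # Transformation: nothing runs here, the lineage is only extended.
        return ToyRDD(lambda rows: [fn(r) for r in rows], parent=self)

    def persist(self):
        self._persist = True
        return self

    def collect(self):
        # Action: walk the lineage back to the source and compute.
        if self._cached is not None:
            return self._cached
        parent_rows = self._parent.collect() if self._parent else None
        rows = self._compute(parent_rows)
        if self._persist:
            self._cached = rows
        return rows


reads = {"count": 0}

def read_source(_):
    reads["count"] += 1           # count how often the "file" is scanned
    return [1, 2, 3]

def run(persist_r3):
    reads["count"] = 0
    r1 = ToyRDD(read_source)
    r3 = r1.map(lambda x: x + 1).map(lambda x: x * 10)
    if persist_r3:
        r3.persist()
    first = r3.map(lambda x: x + 1).collect()    # first action
    second = r3.map(lambda x: x - 1).collect()   # second action, also built on r3
    return first, second, reads["count"]

print(run(False)[2], run(True)[2])  # 2 1
```

Without persisting, both actions walk the whole lineage and the source is read twice; with persisting, the second action starts from the stored r3 rows.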

### Types of persistence?
PySpark has 6 types of persistence, while Java/Scala has 8, because Java/Scala can also store data in deserialized form, whereas Python always stores data in serialized form.

<b> What are serialization and deserialization? </b>

Serialization is converting a JVM object into a binary form, and deserialization is the reverse. The benefit is similar to that of compression:

* When we compress a file, the compressed zip file is much smaller than the original file.

* When we download a file from the internet, we prefer compressed tar/zip files over the original uncompressed files.

* In the same way, when Spark shuffles (redistributes) data, it transfers serialized objects to reduce network traffic and deserializes them for processing.
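
As a rough illustration of the idea in plain Python, the standard-library `pickle` module turns an object into bytes and back. This is only an analogy for what happens when data moves between executors; the `record` value is made up.

```python
import pickle

record = {"word": "spark", "count": 3}

payload = pickle.dumps(record)     # serialize: Python object -> bytes
restored = pickle.loads(payload)   # deserialize: bytes -> Python object

print(type(payload).__name__)  # bytes
print(restored == record)      # True
```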

### What's a DAG?
A DAG (directed acyclic graph) represents the flow of execution (transformations/actions).
The DAG shows the execution flow in stages, and a new stage is created whenever a shuffle operation is performed.

In the word count program, the second stage is created when the reduceByKey transformation triggers a shuffle.

### What's checkpoint?
All information about a Spark application's execution, including persisted data, is tracked through the lineage. If the memory holding it crashes, the lineage information is lost too, and Spark forgets everything it had processed before the crash.

<b>Regardless of a lineage crash, is it possible to save intermediate data somewhere on disk?</b>

Yes, that is checkpointing.

### Log4j.properties
* We create a Log4j wrapper class, or use any available Log4j class
* We need to include the following in SPARK_HOME/conf/spark-defaults.conf
    <code>
    spark.driver.extraJavaOptions	   -Dlog4j.configuration=file:log4j.properties -Dlogfile.name=filenameyourwish -Dspark.yarn.app.container.log.dir=app-logs
    </code>
* The Log4j class accepts a SparkSession and creates a logger object
    <code>
    from com.rposam.pyspark.log.Log4j import Log4j

    logger = Log4j(spark)
    logger.info("Starting spark session...")
    </code>

### What does Spark do?
* Spark is used to create and execute programs
* How do we execute Spark programs?
    * Interactive clients
        * Spark shell
        * Notebook
    * spark-submit
* <b>Spark is a distributed processing engine, so how does it work?</b>
    * When you submit an application to Spark, it creates a master process, and the master process creates a bunch of slaves to distribute the work and do the computation
    * The master is the driver
    * The slaves are the executors
    * Spark asks the cluster manager (YARN) to allocate a container to start the driver process, and then asks for more containers to run the executors
    * This repeats for each spark-submit
    

### Execution Modes
* If you submit your application in client mode, the driver is launched on the client machine; if you submit in cluster mode, the cluster manager assigns a node on which to create the driver process
* In client mode, if your machine fails, is corrupted or is lost, the spark-submit job fails.

<b> If Spark dynamic allocation is enabled, executors that are no longer in use are killed and their resources released</b>

### Spark Transformations
* Spark DataFrames/RDDs are immutable, so how do you process or compute anything?
    * Because DataFrames/RDDs are immutable, we use transformations to create new DataFrames/RDDs from existing ones.
    * Transformations are lazy, so when are they evaluated? They are evaluated only when an action is invoked
    * There are two types of transformations
        * Narrow transformations
            * A transformation that can be performed on a single partition and still produce a valid result, such as where
        * Wide transformations
            * A transformation that requires data from other partitions to produce a valid result, such as group by or distinct
            * It shuffles the data of all partitions, bringing related rows together, and then produces the result
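
The difference can be sketched in plain Python, with lists standing in for partitions. The data, the helper names and the toy partitioner below are all made up for illustration; this is not how Spark is implemented.

```python
from collections import defaultdict

# Two partitions of (word, count) pairs.
partitions = [
    [("a", 1), ("b", 1), ("a", 1)],
    [("b", 1), ("c", 1), ("a", 1)],
]

def narrow_filter(parts, predicate):
    # Narrow: each output partition is built from exactly one input partition.
    return [[row for row in part if predicate(row)] for part in parts]

def wide_reduce_by_key(parts, num_partitions=2):
    # Wide: rows with the same key may sit in different partitions, so each row
    # is first shuffled to a target partition chosen from its key.
    shuffled = [defaultdict(int) for _ in range(num_partitions)]
    for part in parts:
        for key, value in part:
            target = sum(key.encode("utf-8")) % num_partitions  # deterministic toy partitioner
            shuffled[target][key] += value
    return [sorted(p.items()) for p in shuffled]

print(narrow_filter(partitions, lambda row: row[0] != "c"))
# [[('a', 1), ('b', 1), ('a', 1)], [('b', 1), ('a', 1)]]
print(wide_reduce_by_key(partitions))
# [[('b', 2)], [('a', 3), ('c', 1)]]
```

The filter never looks outside its own partition, while the reduce has to move rows between partitions before it can add up the counts for a key.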

### Jobs and Stages
* Each action creates a job
* Each shuffle creates a new stage
* The link between two stages is an exchange. The exchange is an internal buffer that holds the output of the previous stage until the next stage reads it
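
As a toy model of the rule that each shuffle starts a new stage, the sketch below groups a linear pipeline of operations into stages. The function `split_into_stages` and the `(name, is_wide)` representation are assumptions made for illustration, not Spark's scheduler.

```python
def split_into_stages(operations):
    """Group a linear pipeline into stages, starting a new stage at each wide operation.

    `operations` is a list of (name, is_wide) pairs. Toy model only.
    """
    stages = [[]]
    for name, is_wide in operations:
        if is_wide:
            stages.append([])   # a shuffle marks the boundary to a new stage
        stages[-1].append(name)
    return stages

word_count = [
    ("textFile", False),
    ("flatMap", False),
    ("map", False),
    ("reduceByKey", True),      # wide: needs a shuffle
    ("saveAsTextFile", False),  # the action that triggers the job
]

stages = split_into_stages(word_count)
print(len(stages))  # 2
print(stages)
# [['textFile', 'flatMap', 'map'], ['reduceByKey', 'saveAsTextFile']]
```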

### Spark RDD
* The Spark community recommends using DataFrames instead of RDDs, because DataFrame queries are optimized by the Catalyst optimizer while RDD code is not
* RDD stands for resilient distributed dataset. Put simply, an RDD is a collection; it has no row/column structure and no schema.
    * <b>Resilient</b> - Fault tolerant. How? RDDs also store information about how they were created. Consider an RDD partition assigned to one executor for processing. Sometimes the executor fails or crashes. The driver notices the failure and assigns the RDD partition to another executor core. The new executor reloads the partition and starts processing it. This is easy, because the RDD comes with the information on how to create it and how to process it.
    * Put simply, an RDD can be recreated and reprocessed; that is why we call it resilient.
* If you want to create an RDD, you can create it using the Spark context
* The Spark context comes with a bunch of methods for reading binary files, sequence files, Hadoop files and object files
* RDDs provide basic transformations such as map, reduce and flatMap
* The group-by implementation on an RDD is not obvious and might not make sense at first. We need to hand-code everything, including regular operations such as grouping and aggregations.

### Catalyst Optimizer/SQL Engine
* The Spark SQL engine is a powerful compiler that optimizes your code and also generates efficient Java bytecode.
* The overall work of the Spark SQL engine can be broken down into 4 phases.
    * <b>Analysis</b>
        * The Spark SQL engine reads your code and generates an abstract syntax tree for your SQL/DataFrame queries. In this phase your code is analyzed, and column names, tables, views and SQL functions are resolved.
    * <b>Logical optimization</b>
        * The SQL engine applies rule-based optimization and constructs a set of candidate execution plans
        * The Catalyst optimizer then uses cost-based optimization to assign a cost to each plan
        * Examples of rules: predicate pushdown, boolean expression simplification
    * <b>Physical plan</b>
        * The SQL engine picks the most effective logical plan and generates a physical plan, which describes how the query is going to execute on the cluster
    * <b>Code generation</b>
        * This phase generates efficient Java bytecode to run on each machine/node/executor. It was introduced in Spark 2.0
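
To give a feel for what a rule-based step such as boolean expression simplification does, here is a toy simplifier in plain Python. The tuple-based expression format and the `simplify` function are made up for illustration and have nothing to do with Catalyst's actual classes.

```python
def simplify(expr):
    """Apply boolean simplification rules bottom-up to a tiny expression tree.

    An expression is True/False, a column name (str), or a tuple
    ("and" | "or", left, right). Toy model only.
    """
    if not isinstance(expr, tuple):
        return expr
    op, left, right = expr
    left, right = simplify(left), simplify(right)
    if op == "and":
        if left is False or right is False:
            return False          # x AND false -> false
        if left is True:
            return right          # true AND x -> x
        if right is True:
            return left           # x AND true -> x
    elif op == "or":
        if left is True or right is True:
            return True           # x OR true -> true
        if left is False:
            return right          # false OR x -> x
        if right is False:
            return left           # x OR false -> x
    return (op, left, right)

print(simplify(("and", True, ("or", "is_active", False))))  # is_active
```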

### Spark DataSet API
* The Dataset APIs are language-native APIs in Scala and Java. They are strongly typed in JVM-based languages such as Scala, and they are not available in dynamically typed languages such as Python. If you want to use the Dataset API, you must use a JVM-based language (Scala or Java).

### Spark Dataframe
* CSV and JSON files do not come with metadata (column names, data types), so we need to define the schema explicitly
* Parquet is a binary file format that comes with metadata (column names, data types). If you have a choice, prefer Parquet with Snappy compression
* Just as Java, Scala and Python work with their own data types, Spark works with its own data types, which the Spark SQL engine requires

<b> DataFrame Writer </b>
* Controls the number of files and the file size (maxRecordsPerFile)
* Partitions and buckets
* Sorted data