# Big Data

`Hadoop` is a general term for two units: storage and processing. The storage component is HDFS and the processing component is YARN, which manages cluster resources and schedules jobs.

`HDFS` lets you store any kind of data across the cluster, and YARN enables parallel processing of that data. Hadoop is described as "data intensive" because the code is shipped to the nodes where the data lives, rather than the data being moved to the code. The default block size is 128 MB and the default replication factor is 3.
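As a rough illustration of what the block size and replication factor mean for storage, here is a minimal sketch. The function name `hdfs_footprint` and the 500 MB file are made up for the example; only the two defaults come from the notes above.

```python
import math

BLOCK_SIZE_MB = 128       # default HDFS block size, as stated above
REPLICATION_FACTOR = 3    # default replication factor, as stated above


def hdfs_footprint(file_size_mb):
    """Return (number of blocks, raw storage in MB across the cluster)."""
    num_blocks = math.ceil(file_size_mb / BLOCK_SIZE_MB)
    # The last block only occupies as much space as the data it holds,
    # so raw storage is file size times replication, not blocks times 128.
    raw_storage_mb = file_size_mb * REPLICATION_FACTOR
    return num_blocks, raw_storage_mb


blocks, raw_mb = hdfs_footprint(500)  # hypothetical 500 MB file
print(blocks, raw_mb)  # 4 1500
```

So a 500 MB file is split into 4 blocks, and with three replicas of each block it takes about 1500 MB of raw disk across the cluster.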

`MapReduce`: A way of splitting a computational task across a distributed set of files. Map handles the parallel processing and Reduce handles the aggregation. In classic (Hadoop 1) MapReduce it consists of a Job Tracker (Master) and Task Trackers (Slaves). The Job Tracker sends code to run on the Task Trackers.

The Task Tracker allocates CPU and memory to tasks and monitors them on the worker nodes.
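The map and reduce phases described above can be imitated in plain Python. This is only a single-process sketch of the idea (word count is the usual teaching example), not how Hadoop itself is implemented; the function names are made up for illustration.

```python
from collections import defaultdict


def map_phase(line):
    """Map: emit a (word, 1) pair for every word in one input line."""
    return [(word.lower(), 1) for word in line.split()]


def shuffle(pairs):
    """Shuffle: group all emitted values by key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_phase(key, values):
    """Reduce: aggregate the values collected for one key."""
    return key, sum(values)


def word_count(lines):
    # On a cluster, each line (or block) would be mapped on a different node.
    mapped = [pair for line in lines for pair in map_phase(line)]
    grouped = shuffle(mapped)
    return dict(reduce_phase(k, v) for k, v in grouped.items())


counts = word_count(["big data big cluster", "data node"])
print(counts)  # {'big': 2, 'data': 2, 'cluster': 1, 'node': 1}
```

The map step is independent per line, which is what makes it easy to run in parallel; only the reduce step needs all values for a key in one place.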

`Pig` has two parts: Pig Latin is the language and the Pig runtime is the execution engine. Whenever you run a Pig script, a MapReduce job runs in the background.

`Hive` - Powerful Analytical tool, uses HiveQL.

`Spark` - Very popular for near-real-time data processing. Written in Scala. Performs computations in memory, and is often cited as up to 100 times faster than MapReduce for in-memory workloads.

`Oozie` - Schedules Hadoop jobs.

`Flume` - Ingestion tool for unstructured/real-time data.

`Sqoop` - Ingestion tool for structured/batch data.

`HDFS` - Hadoop Distributed File System. HDFS has NameNode and DataNode.

`NameNode` - Master daemon. Maintains and manages the DataNodes. Stores metadata such as block locations, block sizes, permissions, and the directory hierarchy, and receives heartbeats from the DataNodes.

*Secondary NameNode:*

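- Checkpointing is the process of merging the EditLog into the FsImage. Checkpoints happen periodically, by default once per hour.

- Allows faster recovery because a recent copy of the metadata is available. Note that the Secondary NameNode is not a hot standby for the NameNode.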


*FsImage:* A file that contains the state of the file system as of the last checkpoint. It captures the metadata resulting from all the transactions applied since the NameNode was set up.

*EditLog:* The recent modifications made to the file system since the last checkpoint.
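The relationship between the FsImage, the EditLog, and a checkpoint can be sketched with plain dictionaries. This is a toy model under simplifying assumptions: the operation names (`create`, `delete`), the paths, and the metadata fields are invented for illustration and are not the real on-disk formats.

```python
def checkpoint(fsimage, edit_log):
    """Apply the logged edits to a copy of the image and return the new image."""
    new_image = dict(fsimage)
    for op, path, meta in edit_log:
        if op == "create":
            new_image[path] = meta
        elif op == "delete":
            new_image.pop(path, None)
    return new_image


# Snapshot of the namespace at the previous checkpoint (hypothetical paths).
fsimage = {"/data/a.csv": {"blocks": 2}}

# Changes recorded since that checkpoint.
edit_log = [
    ("create", "/data/b.csv", {"blocks": 1}),
    ("delete", "/data/a.csv", None),
]

fsimage = checkpoint(fsimage, edit_log)
edit_log = []  # after a checkpoint the log starts again from empty
print(fsimage)  # {'/data/b.csv': {'blocks': 1}}
```

Keeping the log short through regular checkpoints matters because, on restart, the edits since the last image have to be replayed before the file system is usable.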

### Hive

Hive is a data warehouse tool. You use Hive when you are dealing with structured data. It creates tables on top of HDFS to give structure to the data.

*HDFS is schema on read. RDBMS is schema on write.* In an RDBMS you need to specify the schema before storing the data; in HDFS you do not. HDFS doesn't maintain any schema. You store the data in HDFS as flat files and give it structure with Hive whenever you need to.
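To make "schema on read" concrete, here is a small sketch using only the standard library. The raw text stands in for a flat file in storage, and the column names and types are assumptions chosen by the reader at query time, which is roughly the role a Hive table definition plays.

```python
import csv
import io

# Raw flat file as it might sit in storage: no types, no column names.
raw = "1,alice,34.5\n2,bob,12.0\n"

# The schema is declared by the reader, not by the storage layer.
schema = [("id", int), ("name", str), ("amount", float)]


def read_with_schema(text, schema):
    """Parse each row and cast its fields according to the given schema."""
    rows = []
    for record in csv.reader(io.StringIO(text)):
        rows.append({name: cast(value) for (name, cast), value in zip(schema, record)})
    return rows


rows = read_with_schema(raw, schema)
print(rows[0])  # {'id': 1, 'name': 'alice', 'amount': 34.5}
```

The same bytes could be read again later with a different schema, which is not possible in a schema-on-write system without reloading the data.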

When you create tables on top of HDFS files using Hive, you are giving structure to those files. That structure information is stored in the metastore, which holds the file structure, Hive table and database definitions, and the mapping to the data. The metastore is an RDBMS maintained by Hive.

Whatever query you write in Hive is internally converted into MapReduce jobs and run on Hadoop.


**Uses of Hive**

- Hive provides tools for Extract, Transform, and Load (ETL).

- Provides structure on a variety of data formats.

- Used for OLAP.

- With Hive we can access files stored in HBase and HDFS.


**Limitations of Hive**

- It is not a procedural language.

- It doesn't have triggers, stored procedures, etc.

### Tez vs MapReduce

Pig and Hive jobs are written in Pig Latin and HiveQL (a SQL-like language) respectively; they are converted to MapReduce jobs and run on data in HDFS.

Tez takes the place of MapReduce. Instead of translating your queries into mappers and reducers, it translates them into Directed Acyclic Graphs (DAGs). It is the same idea that Spark uses to run faster.
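A DAG here simply means a set of stages with dependencies between them, executed in an order that respects those dependencies. The sketch below uses Python's standard `graphlib` module (Python 3.9+) to order a made-up query plan; the stage names are hypothetical and this is not how Tez represents plans internally.

```python
from graphlib import TopologicalSorter

# Each stage maps to the set of stages it depends on (hypothetical plan).
dag = {
    "scan_orders": set(),
    "scan_customers": set(),
    "join": {"scan_orders", "scan_customers"},
    "aggregate": {"join"},
}

# static_order() yields every stage after all of its dependencies.
order = list(TopologicalSorter(dag).static_order())
print(order)
```

The two scans have no dependency on each other, so an engine is free to run them in parallel and to pass their output straight to the join, instead of forcing every step into a separate map-then-reduce job.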

Tez is the default execution engine for Hive on EMR, but you can configure Hive to use MapReduce instead.

Unlike EC2 instances, EMR clusters cannot be stopped, only terminated. Metadata for a terminated cluster is kept for two months, and you can clone the cluster from it.

## Spark Basics

Spark is a distributed data processing engine used to process large amounts of data. On top of the processing engine there are libraries for SQL, machine learning, graph computation, and stream processing, which can be used together in one application.

Spark supports Java, Python, Scala, and R. The tasks most frequently associated with Spark include ETL and SQL batch jobs across large datasets.

SparkSession is the unified entry point into a Spark application. It combines the different contexts: SparkContext, HiveContext, and SQLContext.

### Spark vs MapReduce

Spark is an alternative to MapReduce.

MapReduce writes most of the data to disk after each map and reduce operation, whereas Spark keeps data in memory after each transformation. Spark can spill data to disk if memory fills up.

Spark can use data stored in a variety of formats and systems such as Cassandra, S3, and HDFS, whereas MapReduce jobs typically read data stored in HDFS.

| MapReduce | Spark |
| --- | --- |
| Batch processing only | Batch processing as well as real-time processing |
| Slower because of its I/O latency | Up to 100x faster in memory and 10x faster on disk |
| Data processing engine | Data analytics engine |
| Supports SQL through HiveQL | Supports SQL through Spark SQL |
| Not interactive | Interactive |
| More lines of code | Fewer lines of code |

### Features of Spark

`In-Memory Computing`: Keeps data in the servers' RAM, which makes the data fast to access and lets machine learning algorithms run faster.

`Lazy Evaluation`: Execution does not start until an action is triggered.

Supports `Multiple Languages`: Spark allows you to write applications in Java, Python, Scala, and R.

`100x faster`: Up to 100x faster than MapReduce for in-memory workloads.

`Advanced Analytics`: Spark supports not only 'Map' and 'Reduce' but also SQL queries, streaming data, machine learning algorithms, and graph algorithms.

`Real-Time Processing`: Spark can handle real-time processing.

### Transformation vs Action

The two types of operations in Spark are transformations and actions.

A transformation is a function that produces a new RDD from an existing RDD. An action is performed when we want to work with the actual data: it triggers the computation and returns a result.

- map(), filter(), union() are transformations.

- reduce(), collect(), count() are actions.
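The split between transformations and actions is what makes lazy evaluation possible. The class below is a toy stand-in written in plain Python, not the Spark API: `LazyDataset` is a made-up name, and it only imitates the behavior that transformations are recorded while actions run them.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them on an action."""

    def __init__(self, source, ops=()):
        self._source = source
        self._ops = tuple(ops)

    # Transformations: return a new dataset and compute nothing.
    def map(self, fn):
        return LazyDataset(self._source, self._ops + (("map", fn),))

    def filter(self, fn):
        return LazyDataset(self._source, self._ops + (("filter", fn),))

    def _run(self):
        data = iter(self._source)
        for kind, fn in self._ops:
            data = map(fn, data) if kind == "map" else filter(fn, data)
        return data

    # Actions: trigger the recorded work and return a plain value.
    def collect(self):
        return list(self._run())

    def count(self):
        return sum(1 for _ in self._run())


calls = []  # records when the map function is really invoked
ds = LazyDataset(range(5)).map(lambda x: calls.append(x) or x * 2).filter(lambda x: x > 2)
print(calls)         # [] because nothing has run yet
print(ds.collect())  # [4, 6, 8]
```

Only the call to `collect()` causes the mapped function to execute, which mirrors how Spark defers work until an action such as `collect()` or `count()` is called.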

**Spark DataFrames are the standard way of using Spark's machine learning capabilities.**

## How to train your ML models with Python in Spark

RDDs are the fundamental data structure in Spark, but DataFrames are the newer one. DataFrames are easier to read and perform much better.

#### What does the PySpark API do

- Loads data into memory.

- Reads and manipulates data in Spark.

- Pushes processing to the cluster nodes.

#### Sklearn in Spark

Open-source machine learning libraries like sklearn do not scale out of the box in Spark: they are written to perform processing on a single machine.

### Using pandas UDFs to train big data

When you have a dataset of millions of records, you often can't train on it on your local system. You have to use big data tools like Spark to distribute the processing across different nodes. But Python and its libraries such as NumPy, pandas, and sklearn don't scale to distributed processing on their own.

You would have to recode everything using Spark machine learning packages like spark.mllib and spark.ml, but they are not as powerful as sklearn.

When you want to run your Python code in a parallelized, distributed fashion, you need to convert it into User Defined Functions (UDFs).

Pandas UDFs can be used in a variety of applications, ranging from feature generation to statistical testing.

Before using Pandas UDFs, make sure that your data is in a Spark DataFrame, because the Spark DataFrame is the key data type in PySpark. If you want to distribute computation using PySpark, you need to perform transformations on Spark DataFrames, not on other Python data types.

It is also possible to use pandas DataFrames, but pandas DataFrames don't support lazy evaluation. When you load a large dataset, pandas loads the entire dataset into memory on a single node.

1. Import all required libraries, both PySpark and Python libraries.

2. Load your data with pandas using 'read_csv'.

3. Declare the sklearn function using a Python def, and try running it normally on your system.

4. Create the pandas UDF using 'pandas_udf' as a decorator to wrap your original function.

5. Create a Spark DataFrame from your pandas DataFrame using 'createDataFrame'.

6. Assign a unique ID and a partition ID to each record using Spark SQL. (This is used to distribute the data.)

7. Execute the pandas UDF on the Spark DataFrame.



Pandas UDFs generally work in three steps:

1. You first split a Spark DataFrame using a groupBy statement.

2. Each partition is sent to a worker node, translated into a pandas DataFrame, and passed to the UDF.

3. The UDF returns a transformed pandas DataFrame, which is combined with the results from all the other partitions and translated back into a Spark DataFrame.

When we use pandas UDFs to scale a model, the dataset is distributed across the Spark cluster, and PyArrow handles the transfer between the Spark and pandas DataFrame representations.
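The three-step split, apply, combine flow can be imitated without Spark to show its shape. The sketch below is plain Python on lists of dictionaries, so it runs in a single process and involves no PyArrow; the function names, the `store` and `amount` columns, and the centering logic are all made up for illustration. In PySpark the equivalent would be a groupBy followed by a grouped pandas UDF.

```python
from collections import defaultdict


def split(rows, key):
    """Step 1: group rows by key, as a groupBy would."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    return groups


def center_amount(group_rows):
    """Step 2: the 'UDF'. Receives one whole group and returns transformed rows."""
    mean = sum(r["amount"] for r in group_rows) / len(group_rows)
    return [{**r, "centered": r["amount"] - mean} for r in group_rows]


def apply_and_combine(rows, key, udf):
    """Step 3: run the UDF on each group and concatenate the results."""
    combined = []
    for group_rows in split(rows, key).values():
        # On a cluster, each of these calls would run on a worker node.
        combined.extend(udf(group_rows))
    return combined


rows = [
    {"store": "a", "amount": 10.0},
    {"store": "a", "amount": 20.0},
    {"store": "b", "amount": 5.0},
]
result = apply_and_combine(rows, "store", center_amount)
print([r["centered"] for r in result])  # [-5.0, 5.0, 0.0]
```

The important property is that the function sees one complete group at a time, so ordinary single-machine code (for example an sklearn fit) can be applied per group as long as each group fits in a worker's memory.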

- https://databricks.com/session/automating-predictive-modeling-at-zynga-with-pyspark-and-pandas-udfs
- https://towardsdatascience.com/a-brief-introduction-to-pyspark-ff4284701873
- https://towardsdatascience.com/scalable-python-code-with-pandas-udfs-a-data-science-application-dd515a628896
- https://changhsinlee.com/pyspark-udf/
- https://towardsdatascience.com/beginners-guide-to-create-first-end-to-end-machine-learning-pipeline-in-pyspark-d3df25a08dfd

### Differences between Pandas DataFrame and Spark DataFrames


The key difference between pandas and Spark DataFrames is eager versus lazy evaluation.

Example: With a Spark DataFrame you can specify an operation that loads a dataset from S3 and applies a number of transformations, but the operations won't be applied immediately. Instead, a graph of transformations is recorded, and the transformations are applied once the data is actually needed. This approach avoids pulling the full DataFrame into memory. With pandas DataFrames, everything is pulled into memory and every transformation is applied immediately.

When reading CSV files into Spark DataFrames, Spark may perform the read eagerly (for example, when it has to scan the file to infer the schema), meaning the data is read before the next step begins execution. Lazy evaluation is used when reading files in Parquet format, which stores its schema with the data. For this reason, large CSV files are best converted to Parquet before executing a pipeline.

Spark output can be stored to S3 as Parquet files, or sent directly to NoSQL databases. The simplest approach is usually to store the output in S3 and have the NoSQL database load it from there.

#### Differences between pandas and Spark

In pandas you can read CSV files directly using 'read_csv()'. Spark supports formats like JSON, Parquet, and Hive tables, and can read from S3, HDFS, the local file system, or an RDBMS. In Spark 1.x, CSV was not supported natively and required the external 'spark-csv' library; since Spark 2.0, CSV can be read natively with 'spark.read.csv()'.

Other differences lie in the behavior of methods such as head(), describe(), and count().

### Hive vs Spark




![image.png](attachment:image.png)

![image.png](attachment:image.png)