# Big Data Systems Concepts

## Types and Storage of Data

### Types of Data
- Structured data
    - data that has a defined length, format, and schema
    - stored in relational databases, CRUD operations on records, ACID semantics
    - examples: numbers, dates, strings
- Unstructured data
    - data that has no defined format or structure
    - examples: text, images, audio, video
- Semi-structured data
    - data that has a defined structure, but not a defined schema
    - attributes for every record could be different
    - examples: XML, JSON

3 Vs of Big Data (plus a commonly added 4th)
- Volume (amount of data, terabytes, petabytes)
- Velocity (speed of data generation, real-time, near real-time, batch, streaming)
- Variety (types of data, structured, unstructured, semi-structured)
- Veracity (uncertainty of data, trustworthiness, quality, accuracy, completeness) (4th V, newer)

Issues with RDBMS:
- Big data doesn't always need strong ACID semantics (especially systems of engagement)
- Fixed schema is not sufficient: as an application becomes popular, more attributes need to be captured and DB modelling becomes an issue
- Very wide de-normalized attribute sets
- Data layout formats - column or row major - depends on use case
- Expensive to retain and query long term data - need low cost solution

Characteristics of Big Data Systems:
- Applications need not handle common issues like sharding and replication themselves
    - devs focus on application logic rather than data management
- Easier to model with flexible schema
    - not necessary every record has same set of attributes
- If possible, treat data as immutable
    - keep adding timestamped versions of data values
    - avoid human errors by not destroying a good copy
- Built as distributed and incrementally scalable systems
    - add new nodes to scale as in a Hadoop cluster
- Options to have cheaper long term data retention
    - long term data reads can have more latency and can be less expensive to store on commodity hardware, e.g. Hadoop file system (HDFS)
- Generalized programming models that work close to the data
    - e.g. Hadoop map-reduce that runs tasks on data nodes
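
The "treat data as immutable" idea above can be sketched as an append-only store in which every write adds a timestamped version instead of overwriting. This is only an illustrative in-memory sketch; the class name `VersionedStore` and the sample key are hypothetical, not part of any particular big data system.

```python
import time
from collections import defaultdict


class VersionedStore:
    """Append-only key-value store: a write adds a version, nothing is overwritten."""

    def __init__(self):
        # key -> list of (timestamp, value) pairs, in insertion order
        self._versions = defaultdict(list)

    def put(self, key, value, timestamp=None):
        ts = time.time() if timestamp is None else timestamp
        self._versions[key].append((ts, value))

    def get(self, key, as_of=None):
        """Return the newest value, or the newest value written at or before `as_of`."""
        candidates = [
            (ts, value)
            for ts, value in self._versions.get(key, [])
            if as_of is None or ts <= as_of
        ]
        if not candidates:
            return None
        return max(candidates, key=lambda pair: pair[0])[1]


store = VersionedStore()
store.put("user:1:city", "Pune", timestamp=100)
store.put("user:1:city", "Delhi", timestamp=200)

latest = store.get("user:1:city")               # newest version
earlier = store.get("user:1:city", as_of=150)   # older version is still readable
print(latest, earlier)
```

Because old versions are never destroyed, a bad write can be recovered from by reading the value as of an earlier timestamp.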

Challenges in Big Data Systems:
- Latency issues in algorithms and data storage working with large data sets
- Basic design considerations of Distributed and Parallel systems - reliability, availability, consistency
- What data to keep and for how long - depends on analysis use case
- Cleaning / Curation of data
- Overall orchestration involving large volumes of data
- Choose the right technologies from many options, including open source, to build the Big Data System for the use cases
- Programming models for analysis
- Scale out for high volume, search and analytics
- Cloud is cost-effective in the long term, but requires hosting Big Data outside the enterprise
- Data privacy and governance
- Skilled coordinated teams to build/maintain Big Data Systems and analyse data

Types of scalability:
1. Vertical scaling (scaling up)
    - increase the resources of a single node, demand architecture-aware algorithm design
    - processing on x TB of data takes time t, then processing on (n*x) TB of data takes equal, less or much less than (n*t)
    - e.g. more powerful CPU, more memory
2. Horizontal scaling (scaling out)
    - increase the number of nodes, distribute the processing and storage tasks in parallel
    - processing on x TB of data takes time t on one node, then processing on (p*x) TB of data on p nodes takes t or slightly more than t
    - e.g. parallelization of jobs at several levels: distributing separate tasks onto separate threads on the same CPU, distributing separate tasks onto separate CPUs on the same computer, distributing separate tasks onto separate computers
3. Elastic scaling (scaling up and down)
    - dynamic provisioning based on computational need
    - cloud computing
    - on-demand service, resource pooling, scalability, accountability, broad network access

Types of big data systems:
1. batch processing of big data sources at rest
    - building ML models, statistical aggregates
2. real-time processing of big data in motion
    - fraud detection from real-time financial transaction data
3. interactive exploration with ad-hoc queries

#### Big data architecture style

<img alt="picture 2" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/5e126d8f3906955d4fa2864535921f0b36e80537bc9f6c72908e9dd31a8a7683.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

- big data solutions typically involve one or more of the following types of workload:
    - batch processing of big data sources at rest
    - real-time processing of big data in motion
    - interactive exploration of big data
    - predictive analytics and machine learning
- components are usually: data sources, data storage, batch processing, real-time message ingestion, stream processing, analytical data store, analysis and reporting, orchestration, etc
- benefits : choice in technology, performance through parallelism, scalability, interoperability with existing solutions
- challenges : complexity, lack of standards, lack of skills, etc
- look at Lambda architecture, Kappa architecture

### Locality of reference

Levels of storage:
- computational data is stored in primary memory aka memory
- persistent data is stored in secondary memory aka storage
- remote data access from another computer's memory or storage is done over network

Types:
1. Temporal locality
    - recently accessed data is likely to be accessed again
    - e.g. loop iterations, function calls
2. Spatial locality
    - data near recently accessed data is likely to be accessed again
    - e.g. sequential access, array traversal
    - this is why columnar storage suits analytical scans better than row storage: values of the same column are stored contiguously, so searching, filtering, or aggregating on one column reads adjacent data
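
To make the row versus column layout point concrete, here is a small sketch with hypothetical order records. It only illustrates how the two layouts order the values; Python lists hold references, so it does not demonstrate actual hardware cache behaviour.

```python
# Hypothetical records: (order_id, customer, amount)
rows = [(1, "asha", 250.0), (2, "ravi", 100.0), (3, "meena", 75.5)]

# Row-major layout: all fields of one record sit next to each other.
row_major = [field for record in rows for field in record]

# Column-major layout: all values of one attribute sit next to each other.
columns = {
    "order_id": [r[0] for r in rows],
    "customer": [r[1] for r in rows],
    "amount": [r[2] for r in rows],
}

# Summing `amount` in the row layout has to stride over unrelated fields.
n_fields = 3
total_from_rows = sum(row_major[i] for i in range(2, len(row_major), n_fields))

# In the column layout the same query scans one contiguous sequence.
total_from_columns = sum(columns["amount"])

print(total_from_rows, total_from_columns)
```

A query that needs every field of a single record favours the row layout instead, which is why the choice depends on the access pattern.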

#### Cache performance

- cache hit: data requested by processor found in cache
- cache miss: data requested by processor not found in cache, must be retrieved from main memory
- cache hit ratio: fraction of memory accesses found in cache, $h = \frac{hits}{hits + misses}$
- average access time of any memory access: $t_{avg} = h*t_{cache} + (1-h)*t_{memory}$
    - $t_{cache}$ - access time of cache
    - $t_{memory}$ - access time of main memory
- time required to access main memory block = $\text{block\_size} * t_{memory}$
- time required to update cache block = $\text{cache\_block\_size} * t_{cache}$
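
As a quick worked example of the hit-ratio and average-access-time formulas above, assuming made-up numbers (950 hits, 50 misses, a 2 ns cache and 100 ns main memory):

```python
def hit_ratio(hits, misses):
    """h = hits / (hits + misses)"""
    return hits / (hits + misses)


def average_access_time(h, t_cache, t_memory):
    """t_avg = h * t_cache + (1 - h) * t_memory"""
    return h * t_cache + (1 - h) * t_memory


h = hit_ratio(hits=950, misses=50)                             # 0.95
t_avg = average_access_time(h, t_cache=2.0, t_memory=100.0)    # in ns
print(f"h = {h:.2f}, t_avg = {t_avg:.2f} ns")
```

With these assumed figures, the 5% of accesses that miss contribute 5 ns of the roughly 6.9 ns average, which shows how strongly the miss penalty dominates.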

Distributed Cache in Hadoop is a mechanism that allows copying small read-only files from HDFS to the local disks of the worker nodes, where they are accessible by the MapReduce tasks. These files are called localized and are tracked by the NodeManager. The files are deleted when they are not used by any task or when the cache size exceeds a limit. The cache size can be configured by a property.

### Storage for Big Data

RDBMS decline for Big Data due to:
1. Scalability: RDBMS scale vertically; NoSQL scales horizontally for better cost-effective expansion.
2. Schema flexibility: NoSQL adapts to dynamic data structures, unlike RDBMS with fixed schemas.
3. Data variety: NoSQL handles diverse data types effectively compared to RDBMS.
4. Query performance: NoSQL databases often outperform RDBMS for certain queries, crucial for Big Data analytics.
5. Cost considerations: NoSQL's horizontal scaling on commodity hardware is more budget-friendly than vertical scaling of RDBMS.
6. Concurrency and transaction overhead: NoSQL relaxes transaction constraints, prioritizing performance and scalability for Big Data use cases.

Database Sharding:
- Horizontal partitioning into shards for scalability.
- Shards have dedicated hardware.
- Enhances performance via workload distribution.
- Enables parallel processing and better query performance.
- Addresses vertical scaling limitations.
- Handles increased data volumes and user loads effectively.
- Sharding Types:
    1. Range: Divides data based on value ranges.
    2. Hash: Distributes data evenly using a hashing algorithm.
    3. Directory-Based: Uses a lookup directory for flexible shard assignments.
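
The three sharding types can be sketched as three routing functions. The shard names, range boundaries, and directory entries below are hypothetical, chosen only to show how a key is mapped to a shard in each scheme.

```python
import hashlib
from bisect import bisect_right

SHARDS = ["shard-0", "shard-1", "shard-2"]


def range_shard(user_id, boundaries=(1000, 2000)):
    """Range sharding: ids below 1000 -> shard-0, 1000..1999 -> shard-1, rest -> shard-2."""
    return SHARDS[bisect_right(boundaries, user_id)]


def hash_shard(key):
    """Hash sharding: a stable hash of the key, modulo the number of shards."""
    digest = hashlib.sha256(str(key).encode("utf-8")).hexdigest()
    return SHARDS[int(digest, 16) % len(SHARDS)]


# Directory-based sharding: an explicit lookup table decides placement.
DIRECTORY = {"tenant-a": "shard-2", "tenant-b": "shard-0"}


def directory_shard(tenant, default="shard-1"):
    return DIRECTORY.get(tenant, default)


print(range_shard(1500), hash_shard("order-42"), directory_shard("tenant-a"))
```

Range sharding keeps neighbouring keys together (good for range scans, prone to hot spots), hashing spreads keys evenly, and a directory allows arbitrary reassignment at the cost of an extra lookup.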

## Comparing Parallel and Distributed Systems

| Parallel System | Distributed System |
| --- | --- |
| Computer system with several processing units attached to it | Independent, autonomous systems connected in a network accomplishing specific tasks |
| A common shared memory can be directly accessed by every processing unit in a network | Coordination is possible between connected computers with own memory and CPU |
| Tight coupling of processing resources that are used for solving single, complex problem | Loose coupling of computers connected in network, providing access to data and remotely located resources |
| Programs may demand fine grain parallelism | Programs have coarse grain parallelism |


#### Speedup calculations on parallel systems

$$ \text{Execution time after improvement} = \frac{\text{Execution time affected by improvement}}{\text{Amount of improvement}} + \text{Execution time unaffected} $$
Let's say $f$ is the fraction of the code that is infinitely parallelizable, and $N$ is the number of processors. Then:
1. Amdahl’s Law
    A rule stating that the performance enhancement possible with a given improvement is limited by the amount that the improved feature is used.
    $$ \text{Speedup} = \frac{\text{Execution time single processor}}{\text{Execution time on N parallel processors}} = \frac{T(1-f) + Tf}{T(1-f) + \frac{Tf}{N}} = \frac{(1-f) + f}{(1-f) + \frac{f}{N}} = \frac{1}{(1-f) + \frac{f}{N}} $$
    It is used when the workload is fixed, and the number of processors is increased.
2. Gustafson’s Law
    A rule stating that the speedup with 1 processor for a given workload $W$ should be compared with the speedup with N processors for a workload of size $W(N)$, where $W(N) = (1 - f)W + fNW$ (the parallelizable work can increase $N$ times)
    $$ \text{Speedup} = \frac{T \times W(N)}{T \times W} = \frac{W(N)}{W} = \frac{(1 - f)W + fNW}{W} = 1 - f + fN $$
    It is used when the workload size is increased proportionally to the number of processors.
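
The two laws are easy to compare numerically. The sketch below evaluates both formulas for an assumed parallel fraction of $f = 0.9$; the function names are illustrative.

```python
def amdahl_speedup(f, n):
    """Fixed workload: speedup = 1 / ((1 - f) + f / n)."""
    return 1.0 / ((1.0 - f) + f / n)


def gustafson_speedup(f, n):
    """Workload scaled with n: speedup = (1 - f) + f * n."""
    return (1.0 - f) + f * n


f = 0.9  # assumed parallelizable fraction
for n in (1, 8, 64, 1024):
    print(n, round(amdahl_speedup(f, n), 2), round(gustafson_speedup(f, n), 2))
```

Under Amdahl's Law the speedup can never exceed $\frac{1}{1-f}$ (10 for $f = 0.9$) no matter how many processors are added, while Gustafson's scaled speedup keeps growing linearly in $N$.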

#### Memory access models
- Shared memory
    - multiple processors share a single memory space
    - processors communicate by reading and writing to the same memory location
    - e.g. OpenMP, Pthreads
- Distributed memory
    - each processor has its own private memory
    - processors communicate by sending messages to each other
    - e.g. MPI, PVM

#### Shared Memory vs Message Passing:

Shared Memory:
- Tasks on different processors access a common address space.
- Easier for programmers to conceptualize.
- Single logical address space mapped onto physical memory.
- Typically implemented as threads that share one process's memory.

Distributed Memory (Message Passing):
- Tasks access data from separate, isolated address spaces.
- Communicate via sending/receiving messages; data exchange requires explicit communication.
- Data is moved across virtual memories.
- Harder programming abstraction than shared memory because communication is explicit.
- Options:
    - Send: Sync/Async, Blocking/Non-blocking.
    - Receive: Sync/Async, Blocking/Non-blocking.
    - Handling complexities in terms of programming and wait times.
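
The difference between the two models can be sketched with Python threads. This is only an analogy: the threads here all live in one process, so the "message passing" half uses a `queue.Queue` as a stand-in for the network channel that MPI or sockets would provide between machines.

```python
import queue
import threading

# Shared memory: every thread updates the same object in a common address
# space, so access has to be synchronized with a lock.
shared = {"counter": 0}
lock = threading.Lock()


def add_shared(n):
    for _ in range(n):
        with lock:
            shared["counter"] += 1


threads = [threading.Thread(target=add_shared, args=(1000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Message passing: each worker keeps private state and communicates only
# by explicitly sending a message with its result.
mailbox = queue.Queue()


def add_private(n):
    local = 0
    for _ in range(n):
        local += 1
    mailbox.put(local)  # explicit send


workers = [threading.Thread(target=add_private, args=(1000,)) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()

total = sum(mailbox.get() for _ in workers)  # explicit, blocking receive
print(shared["counter"], total)
```

The shared-memory version needs no data movement but does need locking; the message-passing version needs no lock but has to send and receive every result explicitly.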

#### Data access strategies - Replication, Partitioning, Messaging

1. Partition 
    - Strategy: Partition data – typically, equally – to the nodes of the (distributed) system
    - Cost: Network access and merge cost when query needs to go across partitions
    - Advantage(s): Works well if task/algorithm is (mostly) data parallel, Works well when there is Locality of Reference within a partition
    - Concerns: Merge across data fetched from multiple partitions, Partition balancing, Row vs Columnar layouts - what improves locality of reference ?
2. Replication
    - Strategy: Replicate all data across nodes of the (distributed) system
    - Cost: Higher storage cost
    - Advantage(s): All data accessed from local disk: no (runtime) communication on the network, High performance with parallel access, Fail over across replicas
    - Concerns: Keep replicas in sync — various consistency models between readers and writers
3. (Dynamic) Communication
    - Strategy: Communicate (at runtime) only the data that is required
    - Cost: High network cost for loosely coupled systems when the data set to be exchanged is large
    - Advantage(s): Minimal communication cost when only a small portion of the data is actually required by each node
    - Concerns: Highly available and performant network, Fairly independent parallel data processing
4. Networked Storage
    - Common Storage on the Network:
        - Storage Area Network (for raw access – i.e. disk block access)
        - Network Attached Storage (for file access)
    - Common Storage on the Cloud:
        - Use Storage as a Service
        - e.g. Amazon S3

#### Computer clusters
- type of distributed system that consists of a collection of inter-connected stand-alone computers working together as a single, integrated computing resource
- examples:
    - High Availability Clusters
        - ServiceGuard, Lifekeeper, Failsafe, heartbeat, HACMP, failover clusters
    - High Performance Clusters
        - Beowulf; 1000 nodes; parallel programs; MPI
    - Database Clusters
        - Oracle Parallel Server (OPS)
    - Storage Clusters
        - Cluster filesystems; same view of data from each node
- goals:
    - continuous availability
    - data integrity
    - linear scalability
    - open access
    - parallelism in processing
    - distributed systems management
- built with high-performance commodity hardware, high availability, and open source software

#### Cloud computing
- on-demand availability of computer system resources, especially data storage and computing power, without direct active management by the user
- cluster is a building block for a datacenter, which is a building block for a cloud service
- motivation to use clusters:
    - rate of obsolescence of computers is high
    - solution: build a cluster of commodity workstations
    - scale-out clusters with commodity workstations as nodes are suitable for software environments that are resilient
    - on the other hand, (public) cloud infrastructure is typically built as clusters of servers due to higher reliability of individual servers
- typical cluster components:
    - processor and memory
    - network stack
    - local storage
    - OS and runtimes
- split brain: caused by failure of heartbeat network connection(s), two halves of a cluster keep running, recovery options: allow cluster half with majority number of nodes to survive, force cluster with minority number of nodes to shut down
- cluster middleware: single system image infrastructure, cluster services for availability, redundancy, fault-tolerance, recovery from failures
- execution time on clusters depends on: distributed scheduling, local on-node scheduling, communication, synchronization, etc
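
The majority rule for split-brain recovery mentioned above can be sketched as a small quorum check. The function name and node names are hypothetical; real cluster managers add fencing and tie-breaker mechanisms that are not shown here.

```python
def surviving_partition(partitions, cluster_size):
    """Return the partition that holds a strict majority of nodes, or None."""
    quorum = cluster_size // 2 + 1
    for nodes in partitions:
        if len(nodes) >= quorum:
            return nodes
    return None  # no side has a majority, so neither may continue on its own


# The heartbeat link fails and a 5-node cluster splits into 3 + 2.
halves = [{"n1", "n2", "n3"}, {"n4", "n5"}]
winner = surviving_partition(halves, cluster_size=5)
print(sorted(winner))

# An even 2 + 2 split has no majority.
tie = surviving_partition([{"a", "b"}, {"c", "d"}], cluster_size=4)
print(tie)
```

The tie case is one reason clusters are often sized with an odd number of voting nodes.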



## Reliability and Availability

#### Metrics for reliability
1. Mean Time To Failure (MTTF)
    - average time the system operates before a failure occurs
    - $MTTF = \frac{\text{Total hours of operation}}{\text{Number of failures}} = \frac{1}{\text{Failure rate}}$
2. Failure Rate
    - number of failures per unit time
    - $\text{Failure rate} = \frac{1}{MTTF}$
3. Mean Time To Repair (MTTR)
    - average time to repair a failed component
    - $MTTR = \frac{\text{Total hours for maintenance}}{\text{Total number of repairs}}$
4. Mean Time To Diagnose (MTTD)
5. Mean Time Between Failures (MTBF)
    - average time between failures
    - $MTBF = MTTD + MTTR + MTTF$

#### Metrics for availability
1. Availability
    - $Availability = \frac{\text{Time system is UP and accessible}}{\text{Total time observed}} = \frac{MTTF}{MTBF}$
    - system is highly available when MTTF is high and MTTR is low
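
A short worked example of these metrics, using assumed figures (990 hours to failure, 2 hours to diagnose, 8 hours to repair):

```python
def mtbf(mttf, mttd, mttr):
    """MTBF = MTTD + MTTR + MTTF"""
    return mttd + mttr + mttf


def availability(mttf, mttd, mttr):
    """Availability = MTTF / MTBF"""
    return mttf / mtbf(mttf, mttd, mttr)


cycle = mtbf(mttf=990.0, mttd=2.0, mttr=8.0)       # hours per failure cycle
a = availability(mttf=990.0, mttd=2.0, mttr=8.0)
print(f"MTBF = {cycle} h, availability = {a:.2%}")
```

Halving the diagnosis and repair time in this example raises availability more cheaply than doubling MTTF would, which is why fast detection and failover matter.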

#### Metrics for system with multiple components
For a system with multiple components, the combined availability of the system is:
1. Serial assembly of components
    - failure of any component results in system failure
    - $A_c = A_a \times A_b$, where $A_a$ is the availability of component A and $A_b$ is the availability of component B
    - failure rate of C = failure rate of A + failure rate of B = $\frac{1}{MTTF_a} + \frac{1}{MTTF_b}$
    - $MTTF_c = \frac{1}{\frac{1}{MTTF_a} + \frac{1}{MTTF_b}}$
2. Parallel assembly of components
    - failure of all components results in system failure
    - $A_c = 1 - (1 - A_a)(1 - A_b)$
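    - $MTTF_c = MTTF_a + MTTF_b$ (holds for standby redundancy, where B takes over only after A fails; if both components run concurrently with independent, constant failure rates, $MTTF_c = MTTF_a + MTTF_b - \frac{1}{\frac{1}{MTTF_a} + \frac{1}{MTTF_b}}$)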
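
The serial and parallel availability formulas generalize to any number of components. The sketch below uses assumed component availabilities of 0.99 and 0.95 and assumed MTTFs of 1000 and 500 hours.

```python
def serial_availability(*parts):
    """All components must be up: product of the availabilities."""
    result = 1.0
    for a in parts:
        result *= a
    return result


def parallel_availability(*parts):
    """System is down only if every component is down."""
    all_down = 1.0
    for a in parts:
        all_down *= (1.0 - a)
    return 1.0 - all_down


def serial_mttf(*mttfs):
    """Failure rates add up in a serial assembly."""
    return 1.0 / sum(1.0 / m for m in mttfs)


a_serial = serial_availability(0.99, 0.95)
a_parallel = parallel_availability(0.99, 0.95)
mttf_serial = serial_mttf(1000.0, 500.0)
print(round(a_serial, 4), round(a_parallel, 4), round(mttf_serial, 1))
```

A serial chain is always less available than its weakest component, while a parallel pair is more available than its strongest one.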

#### Fault tolerance configurations

| Configuration                 | Failover Time        | Active Component  | Replication Type                   |
|-------------------------------|----------------------|-------------------|-------------------------------------|
| Active-Active (load balanced)  | No failover time     | All components    | Bidirectional replication         |
| Active-Passive (hot standby)   | Few seconds          | One active (passive up to date)        | Unidirectional replication        |
| Warm standby                  | Few minutes          | One active (passive not fully up to date)         | Unidirectional replication with delay |
| Cold standby                  | Few hours            | One active (passive not up-to-date, not running)        | Replication from secondary backup  |

Different topologies:
1. N+1 : N active nodes, 1 passive node 
2. N+M : N active nodes, M passive nodes
3. N to 1 : N active nodes, 1 temporary passive node which returns services to active nodes after failure
4. N to N : Any node failure is handled by distributing the load to other nodes

Recovery:
1. Diagnostic : Using heartbeat messages, the system detects the failure of a node
2. Backward recovery : The system rolls back the transactions that were in progress at failure from the last checkpoint
3. Forward recovery : The system re-executes the transactions that were in progress at failure from diagnosis data

### Big Data Analytics
Types of analytics:
1. Descriptive (what happened)
    - objective: summarize, interpret historical data for insights into past events
    - methodology: involves data aggregation, summarization, visualization
    - example: creating charts to illustrate trends in monthly website traffic
2. Diagnostic (why did it happen)
    - objective: identify reasons behind past events or trends
    - methodology: investigates patterns, anomalies in data to understand root causes
    - example: analyzing a decrease in customer satisfaction scores to find the contributing factors
3. Predictive (what will happen)
    - objective: forecast future outcomes using historical data and patterns
    - methodology: utilizes statistical models, machine learning algorithms for predictions
    - example: predicting next quarter sales based on previous trends
4. Prescriptive (how can we make it happen)
    - objective: recommend actions to optimize future outcomes based on analysis
    - methodology: combines predictive models with optimization techniques for suggestions
    - example: recommending pricing adjustments for products based on predicted market trends

Different aspects of big data analytics:
1. Working with datasets of huge volume, velocity, variety beyond the capabilities of traditional data processing applications
2. Processing data in parallel across multiple nodes in a distributed system
3. Using specialized tools and techniques for data storage, processing, and analysis
4. Use principles of locality to optimize performance and minimize network traffic
5. Use of specialized programming models and languages for distributed computing
6. Better faster decisions in real-time
7. Richer faster insights from data of customers, products, operations, etc

#### Big data analytics lifecycle
1. Business Case Evaluation
2. Data Identification
3. Data Acquisition & Filtering
4. Data Extraction
5. Data Validation & Cleansing
6. Data Aggregation & Representation
7. Data Analysis
8. Data Visualization
9. Utilization of Analysis Results


## Hadoop
- open source framework for distributed storage and processing of large datasets
- key components:
    - HDFS (Hadoop Distributed File System)
    - MapReduce
    - YARN (Yet Another Resource Negotiator)

<img alt="picture 0" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/ebd34154f5a8c257141e1647b0a8a5d7638bbef6f1e1dde8398ff3ce7677aab1.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

- distributed storage: HDFS, stores data across multiple nodes for scalability and fault tolerance
- distributed processing: MapReduce, allows parallel processing of vast datasets across a cluster of computers
- scalability: scales horizontally by adding more nodes to the cluster to accommodate growing data volumes
- fault tolerance: data redundancy and automatic recovery mechanisms ensure reliability in the event of hardware or software failures
- ecosystem: offers a rich ecosystem with additional tools like:
    - data ingestion : 
        - Sqoop: transfers data between Hadoop and relational databases, from RDBMS to HDFS, populating live tables in Hive and HBase
        - Flume: collects, aggregates, and moves large amounts of streaming data into HDFS, from web servers, log files, etc
    - data processing : 
        - MapReduce: distributed processing framework for batch processing of large datasets, supports Java, Python, C++, etc
        - Spark: in-memory data processing engine, faster than MapReduce, supports multiple programming languages
    - data analysis : 
        - Hive: data warehouse infrastructure, provides SQL-like query language called HiveQL, supports MapReduce and Spark
        - Pig: data flow language and execution framework, supports MapReduce and Tez
        - Impala: SQL query engine, supports low-latency queries on Hadoop datasets, supports HiveQL and SQL
- programming language agnostic: supports various programming languages, allowing developers to use the language of their choice for writing MapReduce jobs
- data locality: optimizes performance by processing data on the same node where it is stored, reducing data transfer overhead
- use cases: 
    - widely used for batch processing, large-scale data analytics, and handling unstructured or semi-structured data
    - Hadoop is a cornerstone in the big data landscape, providing a cost-effective and scalable solution for managing and analyzing massive datasets
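
The MapReduce programming model referred to above can be illustrated with a single-process word count. This is a plain-Python sketch of the map, shuffle, and reduce phases, not Hadoop's actual API; the input strings stand in for input splits that a real cluster would process on different nodes.

```python
from collections import defaultdict


def map_phase(line):
    """Map: emit a (word, 1) pair for every word in one input record."""
    return [(word.lower(), 1) for word in line.split()]


def shuffle(pairs):
    """Shuffle: group all values by key so each key reaches one reducer."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_phase(key, values):
    """Reduce: combine the values of one key into a single result."""
    return key, sum(values)


splits = ["big data systems", "big data analytics", "data locality"]
mapped = [pair for line in splits for pair in map_phase(line)]
counts = dict(reduce_phase(k, v) for k, v in shuffle(mapped).items())
print(counts)
```

Because each map call only looks at its own split and each reduce call only looks at one key, both phases can run in parallel on the nodes that already hold the data.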

#### HDFS Architecture

<img alt="picture 1" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/1bad92038b580b8fd5e36ac5de6de728a0a9fc8458b89f14e7016228cdb8df75.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

- Master-slave architecture within an HDFS cluster
- Master node with NameNode
    - maintains namespace - filename to blocks and their replica mappings
    - serves as arbitrator and doesn't handle actual data flow
    - HDFS client app interacts with NameNode for metadata
- Slave nodes with DataNode
    - serves block read/write from clients
    - serves create/delete/replicate requests from NameNode
    - DataNodes interact with each other for pipeline reads and writes
- NameNode functions:
    - maintains and manages the file system namespace, with two files:
        1. FsImage: contains mapping of blocks to file, hierarchy, file properties, permissions
        2. EditLog: transaction log of changes to metadata in FsImage
    - does not store any data, only metadata about files
    - runs on master node while DataNodes run on slave nodes
    - records each change that takes place to the metadata, e.g. if a file is deleted in HDFS, the NameNode will immediately record this in the EditLog
    - receives periodic heartbeat and a block report from all the DataNodes in the cluster to ensure that the DataNodes are live
    - ensures replication factor is maintained across DataNode failures
    - in case of the DataNode failure, the NameNode chooses new DataNodes for new replicas, balance disk usage and manages the communication traffic to the DataNodes
- DataNode functions:
    - stores data in the local file system
    - sends heartbeat messages to the NameNode periodically to confirm that it is alive
    - sends block report to the NameNode periodically to report the list of blocks it is storing
    - serves read/write requests from clients
    - serves create/delete/replicate requests from NameNode
    - in case of a block failure, the NameNode will choose a new DataNode to create a replica of the block
- Secondary NameNode functions:
    - performs periodic checkpoints of the namespace by merging the FsImage and EditLog, which is why it is often described as a checkpoint node
    - downloads the FsImage and EditLog from the NameNode, merges them, and uploads the new FsImage back to the NameNode
    - does not store any data, only metadata about files
    - runs on a separate node from the NameNode
    - is a checkpointing helper, not a hot standby: it does not take over automatically if the NameNode fails
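
The checkpoint step (merge the FsImage with the EditLog, then start a fresh log) can be sketched as replaying a list of edits onto a snapshot. The dictionary layout and the `create`/`delete` operation names below are hypothetical simplifications, not HDFS's real on-disk formats.

```python
def apply_edit(namespace, edit):
    """Replay one logged metadata change onto the namespace."""
    op, path = edit["op"], edit["path"]
    if op == "create":
        namespace[path] = {"blocks": list(edit["blocks"])}
    elif op == "delete":
        namespace.pop(path, None)
    else:
        raise ValueError(f"unknown op: {op}")


def checkpoint(fsimage, edit_log):
    """Merge the snapshot with the log; return the new snapshot and an empty log."""
    new_image = {path: {"blocks": list(meta["blocks"])} for path, meta in fsimage.items()}
    for edit in edit_log:
        apply_edit(new_image, edit)
    return new_image, []


fsimage = {"/logs/a.txt": {"blocks": ["blk_1", "blk_2"]}}
edit_log = [
    {"op": "create", "path": "/logs/b.txt", "blocks": ["blk_3"]},
    {"op": "delete", "path": "/logs/a.txt"},
]
new_fsimage, new_edit_log = checkpoint(fsimage, edit_log)
print(new_fsimage, new_edit_log)
```

Checkpointing keeps the edit log short, so a restarting NameNode has fewer changes to replay before it can serve requests.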
    
#### YARN Architecture

<img alt="picture 2" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/8e7bb7d535fc51c60dcb642eb68e8e9ebc5c9688579c2011d20d3b059d71596d.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

YARN workflow:
1. client program submits the application / job with specs to start AppMaster
2. ResourceManager asks a NodeManager to start a container which can host the ApplicationMaster and then launches ApplicationMaster
3. ApplicationMaster on start-up registers with ResourceManager. So now the client can contact the ApplicationMaster directly also for application specific details
4. As the application executes, AppMaster negotiates resources in the form of containers via the resource request protocol involving the ResourceManager
5. As a container is allocated successfully for an application, AppMaster works with the NodeManager on same or diff node to launch the container as per the container spec. The spec involves how the AppMaster can communicate with the container
6. App specific code inside container provides runtime information to AppMaster for progress, status etc. via application-specific protocol
7. Client that submitted the app / job can directly communicate with the AppMaster for progress and status updates via the application-specific protocol
8. On completion of the app / job, AppMaster de-registers from ResourceManager and shuts down. So the containers allocated can be re-purposed.

#### Modes of operation
1. Local (Standalone) Mode
    - default mode, runs on a single node, no HDFS, no YARN
    - used for debugging purposes
2. Pseudo-Distributed Mode
    - runs on a single node, HDFS, YARN
    - each daemon runs as a separate Java process in its own JVM
    - used for development purposes
3. Fully-Distributed Mode
    - runs on clusters with multiple nodes, HDFS, YARN
    - few of the nodes run master daemons like NameNode, ResourceManager, etc
    - rest of the nodes run slave daemons like DataNode, NodeManager, etc
    - each daemon runs as a separate Java process in its own JVM
    - used for production purposes

## CAP Theorem
- Brewer's conjecture: a distributed system cannot simultaneously provide all three of the following guarantees:
    - Consistency: every read receives the most recent write or an error
    - Availability: every request receives a (non-error) response, without the guarantee that it contains the most recent write
    - Partition tolerance: the system continues to operate despite an arbitrary number of messages being dropped (or delayed) by the network between nodes
- Different design choices for distributed systems:
    - CA: traditional RDBMS (single node or single data center), strong consistency, assumes no network partition
    - CP: HDFS, MongoDB, Redis, single data center, strong consistency, network partition
    - AP: Cassandra, CouchDB, DynamoDB, multiple data centers, eventual consistency, network partition

#### ACID properties
- Atomicity: all or nothing, transaction is either fully completed or not at all
- Consistency: transaction must bring the database from one valid state to another
- Isolation: concurrent transactions do not interfere with each other
- Durability: once a transaction is committed, it will remain so

#### BASE properties
A database design that sacrifices consistency for availability and partition tolerance
- Basically Available: system guarantees availability (will always return a response) but not consistency (may return stale data)
- Soft state: state of the system may be inconsistent, thus results might change over time even without input (as data is updated asynchronously)
- Eventual consistency: system will become consistent over time, given that the system doesn't receive input during that time

## MongoDB vs Cassandra

| Feature                    | MongoDB                         | Cassandra                       |
|----------------------------|---------------------------------|---------------------------------|
| Data Model             | Document-based (BSON format)    | Wide-column store               |
| Query Language         | MongoDB Query Language (MQL)     | CQL (Cassandra Query Language)  |
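| Schema                 | Dynamic schema (schema-less)     | Schema defined per table in CQL; columns can be added later |
| Consistency Model      | Strong consistency by default (reads from primary); eventual when reading from secondaries | Tunable consistency (can be adjusted per query) |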
| Scaling                | Horizontal scaling              | Linearly scalable               |
| Indexing               | Rich indexing options            | Primary and secondary indexes   |
| Transactions           | Supports multi-document transactions | Limited support for transactions |
| Joins                  | Supports joins (with limitations) | No support for traditional joins |
| Data Distribution      | Sharding for horizontal scaling | Automatic data distribution across nodes |
| Use Case               | General-purpose, diverse use cases | Time-series data, write-intensive applications |
| ACID Compliance        | Supports ACID transactions (in certain configurations) | Not fully ACID; row-level atomicity and isolation, lightweight transactions, tunable consistency |

### Common MongoDB commands:
1. show databases: `show databases` / `show dbs`
2. switch database: `use <database_name>`
3. show collections: `show collections`
4. create collection: `db.createCollection("<collection_name>")`
5. insert document: `db.<collection_name>.insertOne({ key: value })` (or `insertMany([ ... ])`; the older `insert()` is deprecated)
6. find documents: `db.<collection_name>.find()`
7. query with criteria: 
    - `db.<collection_name>.find({ key: value })`
    - `db.<collection_name>.find({ key: value }, { key: 1, _id: 0 })` (projection)
    - `db.<collection_name>.find({ key: { $gt: value } })` (greater than)
    - `db.<collection_name>.find({ key1: value1, key2: value2 })` (AND)
    - `db.<collection_name>.find({ $or: [{ key1: value1 }, { key2: value2 }] })` (OR)
    - `db.<collection_name>.find({ key: { $in: [value1, value2] } })` (IN)
8. update document: 
    - `db.<collection_name>.updateOne({ key: value }, { $set: { new_key: new_value } })`
    - `db.<collection_name>.updateOne({ key: value }, { $set: { new_key: new_value } }, { upsert: true })`
    - use `updateMany()` to modify every matching document; the older `update()` is deprecated
9. delete documents: `db.<collection_name>.deleteMany({ key: value })` (or `deleteOne()` for a single document; the older `remove()` is deprecated)
10. aggregate: `db.<collection_name>.aggregate([ ... ])`
11. create index: `db.<collection_name>.createIndex({ key: 1 })`
12. drop index: `db.<collection_name>.dropIndex("index_name")`
13. count documents: `db.<collection_name>.countDocuments({})` (the older `count()` is deprecated)
14. limit results: `db.<collection_name>.find().limit(5)`
15. sort results: 
    - `db.<collection_name>.find().sort({ key: 1 })`
    - `db.<collection_name>.find().sort({ key1: 1, key2: -1 })` (sort by key1 ascending, then by key2 descending)
16. projection (select fields): `db.<collection_name>.find({}, { key: 1, _id: 0 })`
17. bulk write operations: `db.<collection_name>.bulkWrite([ ... ])`
18. show help: `db.<collection_name>.help()`

#### Aggregation pipeline
- framework for data aggregation modeled on the concept of data processing pipelines
- documents enter a multi-stage pipeline that transforms the documents into an aggregated result
- each stage transforms the documents as they pass through the pipeline
- `db.<collection_name>.aggregate(pipeline, options)`
- e.g. a typical sequence of stages:
    1. `$match` stage filters the documents by some criteria
    2. `$group` stage groups the documents by some criteria
    3. `$sort` stage sorts the documents by some criteria
    4. `$project` stage selects some fields from the documents
    5. `$limit` stage limits the number of documents to be returned
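
To make the stage-by-stage flow concrete, here is a minimal pure-Python sketch that imitates `$match`, `$group`, `$sort`, and `$limit` on an in-memory list. It is not MongoDB code: the `orders` documents and the helper names (`match`, `group_sum`, `sort_desc`, `limit`) are hypothetical and exist only for illustration. In `mongosh` the same idea would be written as `[{ $match: { status: "shipped" } }, { $group: { _id: "$customer", total: { $sum: "$amount" } } }, { $sort: { total: -1 } }, { $limit: 1 }]`.

```python
from collections import defaultdict

# Hypothetical sample documents, for illustration only
orders = [
    {"customer": "a", "status": "shipped", "amount": 10},
    {"customer": "b", "status": "shipped", "amount": 25},
    {"customer": "a", "status": "shipped", "amount": 5},
    {"customer": "c", "status": "pending", "amount": 99},
]

def match(docs, predicate):
    """Imitates $match: keep only documents that satisfy the predicate."""
    return [d for d in docs if predicate(d)]

def group_sum(docs, key, field):
    """Imitates $group with $sum: one output document per distinct key value."""
    totals = defaultdict(int)
    for d in docs:
        totals[d[key]] += d[field]
    return [{"_id": k, "total": v} for k, v in totals.items()]

def sort_desc(docs, field):
    """Imitates $sort with -1: order documents by field, descending."""
    return sorted(docs, key=lambda d: d[field], reverse=True)

def limit(docs, n):
    """Imitates $limit: pass through only the first n documents."""
    return docs[:n]

# Each stage consumes the output of the previous one
stage1 = match(orders, lambda d: d["status"] == "shipped")
stage2 = group_sum(stage1, "customer", "amount")
stage3 = sort_desc(stage2, "total")
result = limit(stage3, 1)
print(result)  # [{'_id': 'b', 'total': 25}]
```

Placing the filter first mirrors the usual advice to put `$match` early, so that later stages see fewer documents.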


Import data from CSV:

`mongoimport --db <database_name> --collection <collection_name> --type csv --headerline --file <file_name>`

Export data to CSV:

`mongoexport --db <database_name> --collection <collection_name> --type csv --fields <field1,field2> --out <file_name>`


## Types of parallelism

- Data Parallelism:
  - Definition: Distribute data across multiple processing units or nodes; perform the same operation concurrently.
  - Example: Distribute portions of a large dataset to multiple processors; each processor independently processes its assigned data.

- Tree Parallelism:
  - Definition: Organize parallel tasks hierarchically in a tree-like structure; tasks are divided into sub-tasks forming a tree structure.
  - Example: Main task divided into sub-tasks; each sub-task further divided into more specific tasks for efficient resource utilization.

- Task Parallelism:
  - Definition: Break down a program into independent tasks or processes; execute tasks concurrently.
  - Example: Execute different program functions or modules concurrently without relying on each other's output; common in parallel programming frameworks.

- Request Parallelism:
  - Definition: Serve many independent requests concurrently; each request is handled by its own worker, so throughput grows with the number of workers.
  - Example: A web or mail server handling requests from many clients at once. (Dividing one computation into stages, with each stage handled by a separate unit, is pipeline parallelism, which suits operations with sequential dependence.)
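
The difference between data and task parallelism can be sketched with Python's standard `concurrent.futures` module. The functions `square_sum`, `count_even`, and `find_max` are hypothetical examples. Threads are used only to keep the sketch short; for CPU-bound work in CPython a process pool would be the more realistic choice.

```python
from concurrent.futures import ThreadPoolExecutor

def square_sum(chunk):
    """The same operation applied to every partition (data parallelism)."""
    return sum(x * x for x in chunk)

def count_even(data):
    """An independent task."""
    return sum(1 for x in data if x % 2 == 0)

def find_max(data):
    """Another independent task."""
    return max(data)

data = list(range(1, 9))
# Split the dataset into partitions of two elements each
chunks = [data[i:i + 2] for i in range(0, len(data), 2)]

with ThreadPoolExecutor(max_workers=4) as pool:
    # Data parallelism: one function, many partitions
    partials = list(pool.map(square_sum, chunks))
    data_parallel_total = sum(partials)

    # Task parallelism: different functions run concurrently on the same data
    f1 = pool.submit(count_even, data)
    f2 = pool.submit(find_max, data)
    task_results = (f1.result(), f2.result())

print(data_parallel_total)  # 204
print(task_results)         # (4, 8)
```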

## Map Reduce 
- programming model, and the framework that implements it, for processing large datasets in parallel across a cluster of computers

A MapReduce framework (or system) is usually composed of three operations (or steps):
1. Map: each worker node applies the map function to the local data, and writes the output to a temporary storage. A master node ensures that only one copy of the redundant input data is processed.
  `Map(k1,v1) → list(k2,v2)`
2. Shuffle: worker nodes redistribute data based on the output keys (produced by the map function), such that all data belonging to one key is located on the same worker node.
3. Reduce: worker nodes now process each group of output data, per key, in parallel.
  `Reduce(k2, list(v2)) → list((k3, v3))`

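```python
from collections import defaultdict

def map_fn(name, document):
    # name: document name
    # document: document contents
    for word in document.split():
        yield (word, 1)

def reduce_fn(word, partial_counts):
    # word: a word
    # partial_counts: an iterable of partial counts for that word
    return (word, sum(partial_counts))

def map_reduce(documents):
    # Shuffle step: group every mapped value under its key before reducing
    groups = defaultdict(list)
    for name, document in documents.items():
        for word, count in map_fn(name, document):
            groups[word].append(count)
    return dict(reduce_fn(word, counts) for word, counts in groups.items())

print(map_reduce({"d1": "a b a", "d2": "b c"}))  # {'a': 2, 'b': 2, 'c': 1}
```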

## Apache Spark

- open-source, distributed computing system that provides a fast and general-purpose cluster-computing framework for big data processing
- developed to address the limitations of the MapReduce model and offers a more flexible and efficient alternative
- features:
    - speed: performs in-memory processing, which significantly improves the processing speed compared to the traditional MapReduce model that relies heavily on disk-based storage
    - ease of use: provides high-level APIs in Java, Scala, Python, and R, making it accessible to a broad audience
    - versatility: supports a wide range of data processing tasks, including batch processing, interactive queries, streaming analytics, and machine learning
    - in-memory processing: utilizes resilient distributed datasets (RDDs), an immutable distributed collection of objects, to store data in-memory across a cluster
    - fault tolerance: provides fault tolerance through lineage information stored in RDDs
    - data processing libraries: comes with built-in libraries for various data processing tasks, such as Spark SQL for structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for real-time data processing
    - ease of integration: can be easily integrated with other popular big data technologies, such as Apache Hadoop, Apache Hive, Apache HBase, and more
    - lazy evaluation: transformations on RDDs are not executed immediately; Spark records them and runs them only when an action requests a result
    - community support: has a large and active open-source community, which contributes to its development and provides support through forums, mailing lists, and documentation
    - cluster manager integration: can run on various cluster managers, including Apache Mesos, Apache Hadoop YARN, and its standalone built-in cluster manager

<img alt="picture 3" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/165f411f2ed28f5b894819b57bc4f58dc4aebf98a14e9194a18cb50f9d98d8b4.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

2 main abstractions:
1. Resilient Distributed Dataset (RDD)
    - immutable, distributed collection of objects
    - partitioned across nodes in a cluster
    - can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs
    - can be cached in memory across machines, can be recomputed if lost due to node failure
2. Directed Acyclic Graph (DAG)
    - graph of the computations to be performed on the data
    - each node in the graph represents an RDD, each edge represents a transformation on the data
    - transformations are lazy and run only when an action is called
    - execution is parallel and fault-tolerant: lost partitions can be recomputed after a node failure
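
The interplay of immutability, lineage, and lazy evaluation can be illustrated with a toy class. This is a sketch of the idea only, not Spark's API: `ToyRDD`, `traced_double`, and the `calls` list are hypothetical names, and there is no distribution or partitioning here.

```python
class ToyRDD:
    """Toy stand-in for an RDD: records lineage, computes only on an action."""

    def __init__(self, source, lineage=()):
        self._source = source            # base data (stands in for stable storage)
        self._lineage = tuple(lineage)   # ordered record of pending transformations

    def map(self, func):
        # Transformation: returns a new ToyRDD, nothing is computed yet
        return ToyRDD(self._source, self._lineage + (("map", func),))

    def filter(self, pred):
        # Transformation: also lazy, also returns a new object
        return ToyRDD(self._source, self._lineage + (("filter", pred),))

    def collect(self):
        # Action: replay the recorded lineage over the source data
        items = iter(self._source)
        for kind, func in self._lineage:
            if kind == "map":
                items = map(func, items)
            else:
                items = filter(func, items)
        return list(items)

calls = []

def traced_double(x):
    calls.append(x)  # lets us observe when the function really runs
    return x * 2

base = ToyRDD([1, 2, 3, 4])
doubled = base.map(traced_double).filter(lambda x: x > 4)
calls_before_action = len(calls)   # 0: the transformations have not run
result = doubled.collect()         # the action triggers execution
print(calls_before_action, result)  # 0 [6, 8]
```

Because each `ToyRDD` keeps the full recipe for producing its contents, calling `collect()` again simply recomputes the result, which is the same principle Spark relies on to rebuild lost partitions.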

## Apache Flume

- distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data
- designed to ingest streaming data from various sources and deliver it to a centralized data store
- key features:
    - data ingestion: collects and aggregates log data from multiple sources, such as web servers, application logs, and social media feeds
    - reliability: events are staged in a channel and removed only after they have been stored in the next agent's channel or in the final destination; each hand-off runs inside a channel transaction
    - scalability: scales horizontally to handle large volumes of data by distributing the workload across multiple nodes
    - extensibility: supports custom data sources and sinks through a flexible plugin architecture
    - fault tolerance: a durable file channel persists events to local disk so they survive an agent restart, whereas the memory channel loses buffered events if the agent process dies
- Flume event : unit of data flow having a byte payload and an optional set of string attributes 
- components:
    - Source: receives events from an external source (e.g. a web server) and puts them into one or more channels
    - Channel: stores events until they are consumed by the sink
    - Sink: removes events from the channel and delivers them to the destination (or forwards them to the next Flume agent in the flow)
- supports multi-hop flows, where events pass from one agent to the next in a chain, as well as fan-in flows (several agents feed one agent) and fan-out flows (one source feeds several channels)

<img alt="picture 5" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/a40d6417c862e8ac4fcc9752f5fb3f90b08de40422e048ea1285659b4558c067.png" width="500"  style="display: block; margin-left: auto; margin-right: auto;">

- sample configuration file:
    ```
    # example.conf: A single-node Flume configuration  
    # Name the components on this agent  
    a1.sources = r1  
    a1.sinks = k1  
    a1.channels = c1  
    # Describe/configure the source  
    a1.sources.r1.type = netcat  
    a1.sources.r1.bind = localhost  
    a1.sources.r1.port = 44444  
    # Describe the sink  
    a1.sinks.k1.type = logger  
    # Use a channel which buffers events in memory  
    a1.channels.c1.type = memory  
    a1.channels.c1.capacity = 1000  
    a1.channels.c1.transactionCapacity = 100  
    # Bind the source and sink to the channel  
    a1.sources.r1.channels = c1  
    a1.sinks.k1.channel = c1  
    ```
- sample command to start Flume agent:
    `bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console`
- the config file defines a Flume agent with a source that listens on localhost:44444, a sink that logs events to the console, and a memory channel with a capacity of 1000 events
- the `--conf` directory holds agent-wide settings, typically `flume-env.sh` for environment variables and a log4j configuration file for logging
- agent configuration can also be stored centrally in ZooKeeper instead of a local file (marked as experimental in the Flume user guide)
- doing consolidation:
    <img alt="picture 6" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/9e4067cddea1a1375f15e964b7f2c467cf6227c2f49b5963b18b0fa44239afff.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">
- multiplexing the flow:
    <img alt="picture 7" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/57d4cf63a41c045f3bd5efc76991e87e3ec4e905636769d0b7d40408eb97e3f3.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">
- channel configuration parameters: `capacity`, `transactionCapacity`, `keepAlive`, `byteCapacity`, etc
- source configuration parameters: `type`, `bind`, `port`, `channels`, `selector.type`, `selector.optional`, etc
- sink configuration parameters: `type`, `channel`, `batchSize`, `eventSize`, `writeFormat`, etc
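
The source → channel → sink flow and the roles of `capacity` and `transactionCapacity` can be sketched in plain Python. This is a simplified model, not Flume code: `MemoryChannel`, `source`, and `logger_sink` are hypothetical names, and the sketch leaves out transaction rollback and real network input.

```python
from collections import deque

class MemoryChannel:
    """Bounded in-memory buffer, loosely modelled on Flume's memory channel."""

    def __init__(self, capacity, transaction_capacity):
        self.capacity = capacity                          # max events held at once
        self.transaction_capacity = transaction_capacity  # max events per take
        self._events = deque()

    def put(self, event):
        if len(self._events) >= self.capacity:
            raise OverflowError("channel full")
        self._events.append(event)

    def take_batch(self):
        # Hand out at most transaction_capacity events per call
        n = min(self.transaction_capacity, len(self._events))
        return [self._events.popleft() for _ in range(n)]

    def __len__(self):
        return len(self._events)

def source(lines, channel):
    """Source: wraps each input line in an event and puts it on the channel."""
    for line in lines:
        channel.put({"headers": {}, "body": line.encode("utf-8")})

def logger_sink(channel, delivered):
    """Sink: drains one batch from the channel and records its payloads."""
    batch = channel.take_batch()
    delivered.extend(event["body"].decode("utf-8") for event in batch)
    return len(batch)

channel = MemoryChannel(capacity=1000, transaction_capacity=2)
delivered = []
source(["a", "b", "c"], channel)
first = logger_sink(channel, delivered)    # takes 2 events
second = logger_sink(channel, delivered)   # takes the remaining 1
print(first, second, delivered)  # 2 1 ['a', 'b', 'c']
```

The event shape (a byte payload plus a dictionary of string headers) follows the Flume event description above; everything else is an assumption made for brevity.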



## Apache Sqoop

- tool designed for efficiently transferring bulk data between Apache Hadoop and structured data stores, such as relational databases
- you can use Sqoop to import data from a relational database management system (RDBMS) into HDFS, Hive, or HBase, transform the data in Hadoop MapReduce, and then export the data back into an RDBMS
- automates the process of importing and exporting data using the database schema to generate the necessary Hadoop code
- uses MapReduce to import and export data, providing parallel data transfer
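
Parallel transfer works by giving each mapper its own slice of the table: Sqoop looks up the minimum and maximum of the split column and divides that range among the mappers (four by default). The sketch below shows the idea for an integer column. `split_ranges` is a hypothetical helper, and Sqoop's real splitter may choose slightly different boundaries.

```python
def split_ranges(min_id, max_id, num_mappers=4):
    """Divide [min_id, max_id] into contiguous, near-equal integer ranges."""
    total = max_id - min_id + 1
    base, extra = divmod(total, num_mappers)
    ranges = []
    lo = min_id
    for i in range(num_mappers):
        # The first `extra` ranges get one additional value each
        size = base + (1 if i < extra else 0)
        if size == 0:
            continue  # more mappers than values: skip empty ranges
        hi = lo + size - 1
        ranges.append((lo, hi))
        lo = hi + 1
    return ranges

for lo, hi in split_ranges(1, 10, num_mappers=4):
    # Each mapper would run the import query restricted to its own range
    print(f"WHERE id >= {lo} AND id <= {hi}")
```

For ids 1 to 10 and four mappers this yields the ranges (1, 3), (4, 6), (7, 8), and (9, 10). A heavily skewed split column would leave some mappers with far more rows than others, which is why the choice of split column matters.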

Commands:
```sh
# Import a table from a database into HDFS
sqoop import --connect jdbc:mysql://hostname/database --username user --password pass --table tablename --target-dir /path/to/hdfs/dir

# Import the result of a free-form query; each mapper replaces $CONDITIONS with its own split predicate
# (connection options omitted here; they are the same as in the first example)
sqoop import \
  --query 'SELECT a.*, b.* FROM a JOIN b ON (a.id = b.id) WHERE $CONDITIONS' \
  --split-by a.id --target-dir /user/foo/joinresults

# Export an HDFS dataset back to a database table
sqoop export --connect jdbc:mysql://hostname/database --username user --password pass --table tablename --export-dir /path/to/hdfs/dir

# List the databases on a server
sqoop list-databases --connect jdbc:mysql://hostname --username user --password pass

# List the tables in a database
sqoop list-tables --connect jdbc:mysql://hostname/database --username user --password pass

# Run a SQL statement and print the result to the console
sqoop eval --connect jdbc:mysql://hostname/database --username user --password pass --query "SELECT * FROM tablename LIMIT 10"

# Save an import or export as a named job that can be re-run later
sqoop job --create jobname -- import/export (other options)
```

| Feature                    | Sqoop                           | Flume                           |
|----------------------------|---------------------------------|---------------------------------|
| Data Flow | Various RDBMS, NoSQL, can map to Hive/HBase from RDBMS | Streaming data, e.g. logs |
| Loading type | Not event driven | Event driven |
| Source integration | Connector driven | Agent driven |
| Usage | Structured sources | Streaming systems with semi-structured data, e.g. twitter feed, web server logs |
| Performance and reliability | Parallel transfer with high utilisation | Reliable transfer with aggregations at low latency |


## Apache Zookeeper [[Link]](https://zookeeper.apache.org/doc/r3.9.2/zookeeperOver.html) [[SO Link]](https://stackoverflow.com/a/70486661)

- centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services
- used for managing distributed systems and ensuring consistency across a cluster of machines
- key features:
    - configuration management: stores and manages configuration information for distributed systems
    - naming service: provides a hierarchical namespace for distributed systems
    - synchronization: provides distributed synchronization and coordination services
    - group services: allows distributed applications to form groups and perform group operations
    - fault tolerance: provides fault tolerance through replication and leader election mechanisms
    - consistency: ensures consistency across distributed systems through atomic broadcast and consensus algorithms
- working:
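    - ZooKeeper maintains a hierarchical namespace of data nodes called znodes, similar to a file system directory structure
    - znodes are either persistent or ephemeral (deleted when the session that created them ends), may optionally be sequential (a monotonically increasing counter is appended to the name), and can store data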
    - servers replicated across a set of nodes called an ensemble, with one node acting as the leader and others as followers
    - servers maintain an in-memory image of state, along with transaction logs and snapshots
    - service is available as long as a majority of the ensemble is available
    - clients connect to any server in the ensemble (TCP connection) and can read, write, and watch znodes, and send heartbeats to maintain session
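- CP system (not A): if no leader can be elected because a quorum is missing, no new writes are processed
- Every update is stamped with a transaction id (zxid) that reflects a single total order of all ZooKeeper transactions
- Designed for read-heavy workloads, with a read-to-write ratio of roughly 10:1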


<img alt="picture 8" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/035b4d5650d6f9a5b86ab84e1b7bba004278bb67226e4d2833c67401c62c10d3.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

- Hierarchical Namespace
<img alt="picture 9" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/076643ad5d2dd8abd12080828d6b8bf77f0df754b586f0373bb8ab511101c587.png" style="display: block; margin-left: auto; margin-right: auto;">

It supports only these operations:
- create : creates a node at a location in the tree
- delete : deletes a node
- exists : tests if a node exists at a location
- get data : reads the data from a node
- set data : writes data to a node
- get children : retrieves a list of children of a node
- sync : waits for data to be propagated
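
The operations above map naturally onto a small tree of paths. The following is a toy in-memory model, not a ZooKeeper client: `ZNodeTree` is a hypothetical class, and it leaves out `sync`, watches, sessions, and ephemeral or sequential nodes.

```python
class ZNodeTree:
    """Toy in-memory znode tree keyed by absolute path."""

    def __init__(self):
        self._data = {"/": b""}  # the root znode always exists

    @staticmethod
    def _parent(path):
        parent = path.rsplit("/", 1)[0]
        return parent or "/"

    def create(self, path, data=b""):
        if path in self._data:
            raise KeyError(f"node exists: {path}")
        if self._parent(path) not in self._data:
            raise KeyError(f"no parent for: {path}")
        self._data[path] = data

    def exists(self, path):
        return path in self._data

    def get_data(self, path):
        return self._data[path]

    def set_data(self, path, data):
        if path not in self._data:
            raise KeyError(f"no node: {path}")
        self._data[path] = data

    def get_children(self, path):
        prefix = path.rstrip("/") + "/"
        names = (p[len(prefix):] for p in self._data
                 if p.startswith(prefix) and p != path)
        # Keep direct children only (no further "/" in the remaining name)
        return sorted(n for n in names if "/" not in n)

    def delete(self, path):
        # A znode that still has children cannot be deleted
        if self.get_children(path):
            raise ValueError(f"node has children: {path}")
        del self._data[path]

tree = ZNodeTree()
tree.create("/app")
tree.create("/app/config", b"v1")
tree.set_data("/app/config", b"v2")
children = tree.get_children("/app")
print(children, tree.get_data("/app/config"))  # ['config'] b'v2'
```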


## Apache Oozie

- workflow scheduler system to manage Apache Hadoop jobs
- allows users to define workflows to execute a series of actions in a specific order (DAGs of actions) - called Oozie workflow jobs
- supports various Hadoop jobs, such as MapReduce, Pig, Hive, Sqoop, and Spark
- Oozie coordinator jobs are recurrent Oozie workflow jobs triggered by time (frequency) and data availability
- Oozie actions can be Hadoop filesystem actions, MapReduce jobs, Pig jobs, Hive jobs, Sqoop jobs, or Oozie sub-workflows

Example workflow definition:
```xml
<workflow-app name='wordcount-wf' xmlns="uri:oozie:workflow:0.1">
    <start to='wordcount'/>
    <action name='wordcount'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.myorg.WordCount.Map</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.myorg.WordCount.Reduce</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to='end'/>
        <error to='kill'/>
    </action>
    <kill name='kill'>
        <message>Something went wrong: ${wf:errorCode('wordcount')}</message>
    </kill>
    <end name='end'/>
</workflow-app>
```
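
How a workflow engine follows `ok` and `error` transitions can be sketched in a few lines of Python. This is an illustrative model rather than Oozie's implementation: `run_workflow`, the `nodes` dictionary, and the two sample actions are hypothetical, and the sketch assumes that failures are routed to the kill node.

```python
def run_workflow(nodes, start, actions):
    """Walk a workflow graph until an 'end' or 'kill' node is reached."""
    current = start
    visited = []
    while True:
        node = nodes[current]
        visited.append(current)
        if node["type"] in ("end", "kill"):
            return node["type"], visited
        try:
            actions[current]()        # run the job behind this action node
            current = node["ok"]      # success transition
        except Exception:
            current = node["error"]   # failure transition

# Hypothetical graph with one action node plus the two terminal nodes
nodes = {
    "wordcount": {"type": "action", "ok": "end", "error": "kill"},
    "end": {"type": "end"},
    "kill": {"type": "kill"},
}

def succeeding_job():
    return None

def failing_job():
    raise RuntimeError("job failed")

ok_run = run_workflow(nodes, "wordcount", {"wordcount": succeeding_job})
failed_run = run_workflow(nodes, "wordcount", {"wordcount": failing_job})
print(ok_run)      # ('end', ['wordcount', 'end'])
print(failed_run)  # ('kill', ['wordcount', 'kill'])
```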

<img alt="picture 10" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/51ccbd6ff6e968b54e00c973b48c18a04d0936d1805246afa5e97f17d3de2cd3.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

# NoSQL Databases

Characteristics of NoSQL databases:
- Not Only SQL: non-relational databases that do not use SQL as their primary query language
- Non-relational data model: schema-less, flexible data model that can handle unstructured, semi-structured, and structured data
- Schema-less design: allows for dynamic schema changes without predefined schema constraints
- Loosen consistency to address scalability and availability requirements in large-scale applications
- Open source movement born out of web-scale applications
- Distributed for scale
- Cluster-friendly
- Focus on CP or AP in CAP, while RDBMS focuses on CA for single nodes (ACID properties)
- Auto sharding and replication
    - sharding : horizontal partitioning of data across multiple nodes to distribute the load and improve performance
    - replication : copies of data stored on multiple nodes to ensure fault tolerance and high availability
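
A minimal sketch of how sharding and replication fit together: a stable hash of the key selects the first node, and the following nodes hold the extra copies. The node names and the helpers `shard_for` and `replicas_for` are hypothetical, and real systems use more elaborate placement schemes such as consistent hashing.

```python
import hashlib

NODES = ["node-0", "node-1", "node-2", "node-3"]  # hypothetical cluster

def shard_for(key, num_shards):
    """Hash partitioning: a stable hash of the key picks the shard index."""
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_shards

def replicas_for(key, nodes, replication_factor):
    """Place copies on the key's own node and on the next nodes in order."""
    first = shard_for(key, len(nodes))
    return [nodes[(first + i) % len(nodes)] for i in range(replication_factor)]

placement = {k: replicas_for(k, NODES, 3) for k in ["user:1", "user:2", "user:3"]}
for key, owners in placement.items():
    print(key, "->", owners)
```

With a replication factor of 3 on four nodes, any single node can fail and every key still has two live copies.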

Use cases: big data, real-time web applications, cloud-based applications, content management systems, social media applications, IoT applications, e-commerce applications

Typical web-scale systems, such as social networks, recommendation engines, retail catalogs, reviews, and blogs, do not need strict consistency and durability guarantees, but they must be highly available and partition tolerant.

Types of NoSQL databases:
1. Key-Value Stores: simple data model with a hash table of key and a value, e.g. Redis, DynamoDB
    - value can be a complex data structure or a simple object/record
2. Document Stores: store data in documents, e.g. MongoDB, CouchDB
    - formats like JSON
    - documents accessed by unique key, or indexed fields
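3. Column-Family (Wide-Column) Stores: group related columns into column families, and each row may hold a different set of columns, e.g. Cassandra, HBase
    - allows versioning of data (cells carry timestamps); not to be confused with columnar analytics formats, where each storage block contains data from only one column and scans of a few columns are efficient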
4. Graph Databases: store data in graph structures with nodes, edges, and properties, e.g. Neo4j, Amazon Neptune
    - optimized for graph operations like traversals, shortest path, etc

## MongoDB 
- open-source, document-oriented NoSQL database that provides high performance, high availability, and easy scalability
- key features:
    - document-based data model: stores data as flexible, JSON-like documents, encoded as BSON (Binary JSON)
    - dynamic schema: allows for schema-less design, enabling changes to the data structure without downtime
    - high availability: supports replica sets for automatic failover and data redundancy
    - horizontal scalability: scales horizontally through sharding, distributing data across multiple nodes
    - rich query language: supports a powerful query language with features like secondary indexes, aggregation, and full-text search
    - flexible data model: supports complex data structures, nested arrays, and sub-documents
    - secondary indexes: allows for efficient querying and indexing of data
    - aggregation framework: provides a powerful framework for data aggregation and analysis
    - geospatial indexing: supports geospatial queries and indexing for location-based data
    - text search: provides full-text search capabilities for text data
    - transactions: supports multi-document ACID transactions (on replica sets since version 4.0, on sharded clusters since 4.2)
- data model:
    - collections: equivalent to tables in relational databases, store documents
    - documents: equivalent to rows in relational databases, store data in BSON format
    - fields: equivalent to columns in relational databases, store key-value pairs
    - indexes: improve query performance by creating indexes on fields

Example of a join:
```javascript
// Sample 'orders' collection
db.orders.insertMany([
  { _id: 1, productId: 101, quantity: 2 },
  { _id: 2, productId: 102, quantity: 1 },
  { _id: 3, productId: 101, quantity: 3 }
]);

// Sample 'products' collection
db.products.insertMany([
  { _id: 101, name: 'Product 1', price: 10 },
  { _id: 102, name: 'Product 2', price: 20 }
]);

// Perform a join operation using aggregate
db.orders.aggregate([
  {
    $lookup: {
      from: 'products',
      localField: 'productId',
      foreignField: '_id',
      as: 'product'
    }
  },
  {
    $unwind: '$product'
  },
  {
    $project: {
      _id: 1,
      productId: 1,
      quantity: 1,
      productName: '$product.name',
      productPrice: '$product.price'
    }
  }
]);
```

Replication:
1. Primary-Secondary Replication: primary node for read and write operations, secondary nodes for read-only operations
2. Replica Sets: group of MongoDB instances that maintain the same data set, provide redundancy and high availability
3. Automatic Failover: in case of primary node failure, a secondary node is automatically elected as the new primary
4. Data Redundancy: data is replicated across multiple nodes to ensure fault tolerance and data durability

Read concern levels:
1. "local": returns the node's most recent data, with no guarantee that it has been written to a majority of nodes (so it may later be rolled back)
2. "majority": returns data that has been acknowledged by a majority of nodes
3. "linearizable": returns data that reflects all successful majority-acknowledged writes that completed before the read began

Write concern levels:
1. `w: <number>`: returns after the requested number of nodes has acknowledged the write
    - `w: 1` ("acknowledged"): returns after the write operation has been acknowledged by the primary node
    - `w: 0`: returns as soon as the write has been sent, without waiting for any acknowledgment
    - `w: n`: returns after the write operation has been acknowledged by n nodes
2. `w: "majority"`: returns after the write operation has been acknowledged by a majority of nodes
3. `j: true` ("journaled"): returns after the acknowledging nodes have written the operation to their on-disk journal

Consistency scenarios:
1. read="majority", write="majority": causally consistent and durable
2. read="majority", write="acknowledged": causally consistent but not durable
3. read="local", write="majority": eventually consistent and durable writes
4. read="local", write="acknowledged": eventually consistent but not durable

## Apache Cassandra
- open-source, distributed NoSQL database designed for scalability and high availability without compromising performance
- key features:
    - linear scalability: scales linearly by adding more nodes to the cluster
    - fault tolerance: provides fault tolerance through data replication and automatic data distribution
    - tunable consistency: allows users to configure consistency levels based on their requirements
    - flexible data model: supports wide-column store with a schema-less design
    - eventual consistency: provides eventual consistency for read and write operations
    - built-in caching: supports in-memory caching for improved read performance
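    - limited transaction support: writes are atomic and isolated at the row level and durable through the commit log, and lightweight transactions provide compare-and-set, but Cassandra does not offer full relational-style ACID transactions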
    - support for secondary indexes: allows for efficient querying of data
    - support for custom compaction strategies: provides flexibility in managing data compaction
- AP system (not C) since it focuses on availability and partition tolerance
- data model:
    - keyspace: equivalent to a database in relational databases, contains column families
    - column family: equivalent to a table in relational databases, contains rows and columns
    - partition key: used to distribute data across nodes, determines the partition where the data is stored
    - clustering key: used to define the order of data within a partition
    - secondary index: allows for querying data based on non-primary key columns
- replication strategies:
    - SimpleStrategy: used for single data center deployments (specify replication factor = N)
    - NetworkTopologyStrategy: used for multi-data center deployments
- consistency levels: set per read or write operation
    - ONE, TWO, THREE, ALL - number of replicas that must respond to a read/write operation
    - QUORUM - majority of replicas must respond
    - LOCAL_QUORUM - majority of replicas in the local data center must respond
    - ANY - writes only; succeeds once any node has accepted the write, even if only as a hint for a replica that is currently down

Consistency scenarios:
1. QUORUM reads and writes: strongly consistent and durable, because the replicas read always overlap the replicas written (R + W > N)
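
A common rule of thumb for tunable consistency is that a read is guaranteed to reach at least one replica holding the latest acknowledged write when R + W > N, where N is the replication factor and R and W are the numbers of replicas that must respond to reads and writes. The helpers below (`quorum`, `is_strongly_consistent`) are hypothetical names used to check a few combinations.

```python
def quorum(replication_factor):
    """Majority of replicas: floor(N / 2) + 1."""
    return replication_factor // 2 + 1

def is_strongly_consistent(n, r, w):
    """Read and write replica sets must overlap: R + W > N."""
    return r + w > n

N = 3
Q = quorum(N)

print(Q)                                 # 2
print(is_strongly_consistent(N, Q, Q))   # True:  QUORUM reads and writes
print(is_strongly_consistent(N, 1, 1))   # False: ONE reads and writes
print(is_strongly_consistent(N, 1, N))   # True:  ONE reads with ALL writes
```

Lower levels such as ONE trade this overlap for lower latency and higher availability.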

Sample queries:
```sql
CREATE KEYSPACE my_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };

CREATE TABLE my_keyspace.my_table (
    id UUID PRIMARY KEY,
    name TEXT,
    age INT
);

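-- use a fixed UUID so that later statements can address the row by its primary key
INSERT INTO my_keyspace.my_table (id, name, age) VALUES (123e4567-e89b-12d3-a456-426614174000, 'Alice', 30);

-- name is not part of the primary key, so filtering on it needs a secondary index or ALLOW FILTERING
SELECT * FROM my_keyspace.my_table WHERE name = 'Alice' ALLOW FILTERING;

-- UPDATE and DELETE must identify the row by its full primary key
UPDATE my_keyspace.my_table SET age = 31 WHERE id = 123e4567-e89b-12d3-a456-426614174000;

DELETE FROM my_keyspace.my_table WHERE id = 123e4567-e89b-12d3-a456-426614174000;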
```

## Neo4j

- open-source, graph database that uses graph structures with nodes, edges, and properties to represent and store data
- key features:
    - graph data model: stores data in nodes, relationships, and properties
    - native graph storage: optimized for storing and querying graph data
    - property graph model: supports properties on nodes and relationships
    - Cypher query language: expressive query language for graph data
    - ACID transactions: supports atomicity, consistency, isolation, and durability for transactions
    - high performance: optimized for graph traversals and pattern matching
    - scalability: scales horizontally through clustering and sharding
    - built-in indexing: provides indexing for efficient querying of graph data
    - graph algorithms: supports a wide range of graph algorithms for analyzing graph data
    - visualization: provides tools for visualizing and exploring graph data
    - integration: integrates with various programming languages and frameworks

Data model:
- nodes: entities in the graph, represent entities like people, products, etc
- relationships: connect nodes, represent relationships between entities
- properties: key-value pairs associated with nodes and relationships, represent attributes of entities and relationships
- labels: used to categorize nodes, represent types of entities
- indexes: used to quickly look up nodes based on properties

Sample queries:
```cypher
CREATE (alice:Person { name: 'Alice', age: 30 });
CREATE (bob:Person { name: 'Bob', age: 35 });

MATCH (alice:Person { name: 'Alice' }) RETURN alice;

MATCH (alice:Person { name: 'Alice' }) SET alice.age = 31 RETURN alice;

MATCH (alice:Person { name: 'Alice' }) DELETE alice;

MATCH (:Person { name: 'Tom Hanks' })-[:ACTED_IN]->(m:Movie), (:Person { name: 'Ron Howard' })-[:DIRECTED]->(m) WHERE m.released > 2000 RETURN m LIMIT 10;

MATCH (p:Person)-[r:ACTED_IN]->(m:Movie) WHERE p.name = 'Tom Hanks' RETURN m.title;
```

# Spark
- Spark covers a variety of workloads that previously required separate distributed systems:
    - batch processing: Hadoop MapReduce
    - interactive queries: Hive
    - streaming analytics: Storm
    - machine learning: Mahout
    - graph processing: Giraph
- can be used from Python, Scala, Java, and R

## RDDs (Resilient Distributed Datasets)
- fundamental data structure in Spark, an immutable distributed collection of objects that can be operated on in parallel
- created either by loading data from stable storage such as HDFS or S3 (through Hadoop InputFormats) or by applying transformations such as `map`, `filter`, `groupByKey`, and `join` to other RDDs
- can be cached in memory across machines, can be recomputed if lost due to node failure
- transformations are lazy, only executed when an action is called, evaluated in parallel, fault-tolerant, can be recomputed if lost due to node failure
- operations:
    - transformations: create a new RDD from an existing one, eg. `map`, `filter`, `reduceByKey`, `join`
        - optimized by Spark through lazy evaluation and pipelining
        - lost partitions are recovered by replaying the recorded transformations (lineage) on the original data
    - actions: return a value to the driver program after running a computation on the dataset, eg. `collect`, `count`, `reduce`, `saveAsTextFile`
        - trigger the execution of the DAG of transformations
        - materialize the result of the computation
- transformations:
    1. `map(func)`: returns a new RDD by applying a function to each element of the RDD
    2. `filter(func)`: returns a new RDD by selecting elements that satisfy a predicate
    3. `flatMap(func)`: similar to `map`, but each input item can be mapped to 0 or more output items, useful for tokenizing
    4. `sample(withReplacement, fraction, seed)`: returns a sampled subset of the RDD
    5. `union(otherDataset)`: returns the union of the RDD with another one
    6. `intersection(otherDataset)`: returns the intersection of the RDD with another one
    7. `distinct([numPartitions])`: returns a new RDD with distinct elements
    8. `groupByKey([numPartitions])`: when called on a dataset of `(K, V)` pairs, returns a new RDD of `(K, Iterable[V])` pairs
    9. `reduceByKey(func, [numPartitions])`: when called on a dataset of `(K, V)` pairs, returns a dataset of `(K, V)` pairs where the values for each key are aggregated using the given reduce function
    10. `sortByKey([ascending], [numPartitions])`: returns an RDD sorted by key
    11. `join(otherDataset, [numPartitions])`: returns an RDD containing all pairs of elements with matching keys in self and other
    12. `cogroup(otherDataset, [numPartitions])`: returns an RDD of `(K, (Iterable[V], Iterable[W]))` pairs
    13. `cartesian(otherDataset)`: returns the Cartesian product of the two datasets
- actions:
    1. `reduce(func)`: aggregate the elements of the dataset using a function `func` (which takes two arguments and returns one)
    2. `collect()`: return all elements of the dataset as an array at the driver program
    3. `count()`: return the number of elements in the dataset
    4. `first()`: return the first element of the dataset
    5. `take(n)`: return an array with the first `n` elements of the dataset
    6. `takeSample(withReplacement, num, [seed])`: return an array with a random sample of `num` elements of the dataset
    7. `saveAsTextFile(path)`: write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS, or any other Hadoop-supported file system
    8. `countByKey()`: only available on RDDs of type `(K, V)`, returns a hashmap of `(K, Long)` pairs with the count of each key
    9. `foreach(func)`: run a function `func` on each element of the dataset
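
The split between lazy transformations and eager actions can be modelled in a few lines of plain Python. This is a toy sketch, not Spark's API: `LazyDataset` and its methods are hypothetical names that only mimic how transformations record a lineage and how an action replays it.

```python
from functools import reduce


class LazyDataset:
    """Toy stand-in for an RDD: transformations only record a lineage."""

    def __init__(self, source, lineage=()):
        self._source = list(source)   # original data, kept for recomputation
        self._lineage = lineage       # tuple of (kind, func) steps, not yet run

    # Transformations: return a new dataset, compute nothing
    def map(self, func):
        return LazyDataset(self._source, self._lineage + (("map", func),))

    def filter(self, func):
        return LazyDataset(self._source, self._lineage + (("filter", func),))

    def flat_map(self, func):
        return LazyDataset(self._source, self._lineage + (("flat_map", func),))

    # Actions: replay the lineage and return a value to the caller
    def collect(self):
        data = self._source
        for kind, func in self._lineage:
            if kind == "map":
                data = [func(x) for x in data]
            elif kind == "filter":
                data = [x for x in data if func(x)]
            else:  # flat_map
                data = [y for x in data for y in func(x)]
        return list(data)

    def count(self):
        return len(self.collect())

    def reduce(self, func):
        return reduce(func, self.collect())


calls = []  # records every time the map function actually runs


def tracked_upper(word):
    calls.append(word)
    return word.upper()


lines = LazyDataset(["Hello world", "How are you world"])
words = lines.flat_map(str.split).map(tracked_upper)   # nothing runs yet
calls_before_action = len(calls)
result = words.filter(lambda w: w.startswith("H")).collect()  # action triggers execution
```

Because every dataset keeps its source and lineage, a lost result can always be rebuilt by calling the action again, which is the same idea behind recomputing lost RDD partitions.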

Dataframes : 
- provides a more structured and optimized way to work with data compared to RDDs (named columns, schema, etc) - fastest option

Datasets :
- provides the type safety of RDDs with the optimization benefits of DataFrames - best of both worlds
- can be used with Java, Scala only

<img alt="picture 0" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/2d656b48ab268f1f9e135d2624301114f7759880295fb8c32b3a82ccf0b93fb5.png" width="1000" style="display: block; margin-left: auto; margin-right: auto;">

```scala
// Create two DataFrames
val df1 = spark.createDataFrame(Seq(
  (1, "Alice"),
  (2, "Bob"),
  (3, "Charlie")
)).toDF("id", "name")

val df2 = spark.createDataFrame(Seq(
  (1, "Engineering"),
  (2, "Finance"),
  (3, "Marketing")
)).toDF("id", "department")

// Perform inner join
val joinedDF = df1.join(df2, Seq("id"), "inner")
// can also be "outer", "left", "right", "left_outer", "right_outer", "left_semi", "left_anti"
```

Spark stack:
<img alt="picture 1" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/767e579dc6f96f27cfda5046e3825d9c4db14cc77127f5416898850e98f9fd6c.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">
1. Spark Core : basic functionality and API of Spark, including RDDs and transformations, and components for task scheduling, memory management, fault recovery, etc
2. Spark SQL : provides support for structured and semi-structured data, including DataFrames, Datasets, and SQL queries
    - allows using Hive variant of SQL called HiveQL, intermixing SQL queries with Spark programs in Java, Scala, Python, and R
3. MLlib : provides machine learning algorithms and utilities for classification, regression, clustering, collaborative filtering, etc
    - lower level primitives for ML algorithms like gradient descent optimization algorithms, linear algebra, etc
4. Spark Streaming : enables scalable, high-throughput, fault-tolerant stream processing of live data streams
    - can ingest data from various sources like Kafka, Flume, Kinesis, etc
    - can process data using complex algorithms expressed with high-level functions like map, reduce, join, window, etc
5. GraphX : provides an API for graphs and graph-parallel computation, including common graph algorithms like PageRank, connected components, triangle counting, etc

Transformations can be broadly classified into two categories:
1. Narrow transformations: each input partition contributes to at most one output partition, so no shuffle is needed, eg. `map`, `filter`, `flatMap`, `mapPartitions`, `mapPartitionsWithIndex`, `sample`, `union`
2. Wide transformations: each input partition may contribute to multiple output partitions, which requires a shuffle, eg. `groupByKey`, `reduceByKey`, `join`, `cogroup`, `sortByKey`, `distinct`, `intersection`
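
The difference between the two categories can be sketched in plain Python by modelling a dataset as a list of partitions. The function names below are hypothetical and the hash partitioner is a simplification; the point is only that a narrow step touches each partition on its own, while a key-based aggregation has to move records between partitions.

```python
import zlib
from collections import defaultdict


def narrow_map(partitions, func):
    """Narrow: output partition i depends only on input partition i."""
    return [[func(x) for x in part] for part in partitions]


def stable_hash(key):
    # zlib.crc32 is used instead of hash() so the result is stable across runs
    return zlib.crc32(str(key).encode("utf-8"))


def wide_reduce_by_key(partitions, func, num_partitions):
    """Wide: records are re-bucketed by key, so any input partition
    may send data to any output partition (a shuffle)."""
    buckets = [defaultdict(list) for _ in range(num_partitions)]
    sources = [set() for _ in range(num_partitions)]  # input partitions feeding each output
    for in_idx, part in enumerate(partitions):
        for key, value in part:
            out_idx = stable_hash(key) % num_partitions
            buckets[out_idx][key].append(value)
            sources[out_idx].add(in_idx)
    reduced = []
    for bucket in buckets:
        out = {}
        for key, values in bucket.items():
            acc = values[0]
            for v in values[1:]:
                acc = func(acc, v)
            out[key] = acc
        reduced.append(out)
    return reduced, sources


input_partitions = [["a", "b", "a"], ["b", "c"], ["a", "c", "c"]]
pairs = narrow_map(input_partitions, lambda w: (w, 1))
counts, sources = wide_reduce_by_key(pairs, lambda x, y: x + y, num_partitions=2)
merged = {k: v for part in counts for k, v in part.items()}
```

`sources` shows that at least one output partition received records from several input partitions, which is exactly the dependency pattern that forces a stage boundary.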

Spark SQL example
```scala
// Assumes a SparkSession named `spark` with Hive support enabled (e.g. in spark-shell)
spark.sql("CREATE DATABASE temp")
spark.sql("SHOW DATABASES").show()
spark.sql("""CREATE TABLE employee(id INT, name STRING, age INT)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'""")
spark.sql("SHOW TABLES").show()
spark.sql("DESC employee").show()
spark.sql("LOAD DATA INPATH '/employee.csv' INTO TABLE employee")
val res = spark.sql("SELECT id, name, age FROM employee WHERE age > 40")
res.show()

val DF = spark.read.option("header", true).csv("bank.csv") // read CSV file into a DataFrame
DF.show() // examine the DataFrame
DF.createOrReplaceTempView("BANK") // register the DataFrame as a temporary view named BANK
spark.sql("DESC BANK").show() // display schema
spark.sql("SELECT age, job, balance FROM BANK").show(5) // SQL select query
// DataFrame filter applied on top of a SQL result
spark.sql("SELECT age, job, balance FROM BANK").where("job = 'admin.'").show(10)
// sort by age
spark.sql("""SELECT age, job, balance FROM BANK WHERE job IN ('admin.', 'services') ORDER BY age""").show(10)
// SQL GROUP BY clause
spark.sql("""SELECT job, count(*) AS count FROM BANK GROUP BY job""").show()
// SQL JOIN (assumes tables or views named EMP and DEPT already exist)
val joinDF = spark.sql("SELECT * FROM EMP e, DEPT d WHERE e.emp_dept_id = d.dept_id")
```

### Spark Runtime architecture

<img alt="picture 2" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/cbc682ec248518f3f44d6b994283423f96d69668243c17ffcb2235b926593930.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

1. Driver: manages the execution of the Spark application, including creating the SparkContext, distributing the code to the executors, and collecting the results
    It contains:
    - DAGScheduler: computes a DAG of stages for each job, breaks each stage into tasks, and submits them to the TaskScheduler
    - TaskScheduler: responsible for scheduling tasks on the cluster, managing task execution, and handling failures
    - RDD graph compiler: optimizes the logical execution plan generated by the DAGScheduler
    - SchedulerBackend: interface to the cluster manager, responsible for launching executors and scheduling tasks
    - BlockManager: manages data storage for RDDs, caching, and shuffle data
2. SparkContext: represents the connection to a Spark cluster, used to create RDDs, broadcast variables, and accumulators
3. Executors: run the tasks and store the data for the application, managed by the driver
    - provides in-memory storage for RDDs that are cached by user programs
4. Tasks: individual units of work that are sent to the executors for execution
5. Cluster Manager: manages the resources of the cluster and allocates them to the Spark application
    - can be standalone, Mesos, YARN, Kubernetes, etc

`spark-submit` command:
```bash
spark-submit --class <main-class> --master <master-url> --deploy-mode <deploy-mode> --conf <key=value> <application-jar> [application-arguments]
```
- it submits a Spark application to the cluster for execution with the specified configuration
- `--class`: main class of the application (eg. `org.apache.spark.examples.SparkPi`)
- `--master`: URL of the cluster manager (eg. `spark://host:port`, `mesos://host:port`, `yarn`, `local`)
- `--deploy-mode`: deployment mode for the application (eg. `client`, `cluster`)
- `--conf`: configuration properties for the application (eg. `spark.executor.memory=4g`)
- `application-jar`: JAR file containing the application code and dependencies (eg. `myApp.jar`)
- `application-arguments`: arguments passed to the application (eg. `arg1 arg2`)

Shuffle operations:
- operations that require data to be shuffled across the network, such as `groupByKey`, `reduceByKey`, `join`, `sortByKey`, etc
- can be expensive in terms of network and disk I/O, and can impact the performance of the application
- Spark optimizes shuffle operations through various mechanisms like pipelining, combiners, partitioning, and spill to disk

Broadcast variables:
- used to efficiently distribute large read-only data to all tasks in a Spark application
- broadcast variables are cached in serialized form and distributed to all executors
- can be used to reduce the cost of shipping large objects to tasks, especially when they are used multiple times across multiple tasks
- can be created using the `broadcast` method on the SparkContext

Accumulators:
- used to aggregate values from tasks and return the aggregated result to the driver
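- accumulators are add-only from the point of view of tasks: tasks can only add to them, and only the driver can read their value
- can be used for tasks like counting the number of occurrences of an event, summing values, etc
- can be created using methods on the SparkContext such as `longAccumulator` or `doubleAccumulator` (the older `accumulator` method is deprecated since Spark 2.0)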

RDD Persistence:
- RDDs can be persisted in memory or disk to avoid recomputation of the same data
- persistence levels: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `OFF_HEAP`
    - `MEMORY_ONLY`: store RDD partitions in memory
    - `MEMORY_AND_DISK`: store RDD partitions in memory, spill to disk if memory is full
    - `MEMORY_ONLY_SER`: store RDD partitions in memory as serialized Java objects
    - `MEMORY_AND_DISK_SER`: store RDD partitions in memory as serialized Java objects, spill to disk if memory is full
    - `DISK_ONLY`: store RDD partitions on disk
    - `OFF_HEAP`: store RDD partitions off-heap in serialized form
- can be persisted using the `persist` method on the RDD
- Spark monitors cache usage on each node and automatically drops old cached partitions using an LRU (least recently used) eviction policy when memory is needed
    - can manually unpersist RDDs using the `unpersist` method
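
A rough picture of memory-only caching with LRU eviction, written as a plain-Python sketch. `LruPartitionCache` is a hypothetical class, not part of Spark; it assumes that an evicted partition is simply recomputed the next time it is requested.

```python
from collections import OrderedDict


class LruPartitionCache:
    """Toy cache keyed by partition id; evicts the least recently used entry."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._store = OrderedDict()
        self.recomputations = 0

    def get(self, partition_id, compute):
        if partition_id in self._store:
            self._store.move_to_end(partition_id)  # mark as most recently used
            return self._store[partition_id]
        # Cache miss: recompute the partition (Spark would replay the lineage)
        self.recomputations += 1
        value = compute(partition_id)
        self._store[partition_id] = value
        if len(self._store) > self.capacity:
            self._store.popitem(last=False)        # drop the least recently used
        return value

    def unpersist(self, partition_id):
        """Manually remove a partition from the cache."""
        self._store.pop(partition_id, None)

    def cached_ids(self):
        return list(self._store)


def square(pid):
    return pid * pid


cache = LruPartitionCache(capacity=2)
cache.get(0, square)   # miss
cache.get(1, square)   # miss
cache.get(0, square)   # hit, partition 0 becomes most recent
cache.get(2, square)   # miss, evicts partition 1
```

After these calls only partitions 0 and 2 remain cached, and asking for partition 1 again costs one more recomputation.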

### MLlib
- scalable machine learning library built on top of Spark that provides a wide range of machine learning algorithms and utilities
- key features:
    - distributed algorithms: provides distributed implementations of popular machine learning algorithms
    - high-level APIs: supports high-level APIs for building machine learning pipelines
    - integration with Spark SQL: integrates with Spark SQL for data preprocessing and feature engineering
    - model persistence: supports model persistence for saving and loading machine learning models
    - hyperparameter tuning: provides utilities for hyperparameter tuning and model selection
- tasks supported by MLlib:
    - feature transformations: supports feature transformations like vectorization, normalization, and encoding
    - classification: provides algorithms for classification tasks like logistic regression, decision trees, random forests, etc
    - regression: provides algorithms for regression tasks like linear regression, decision trees, random forests, survival regression, etc
    - clustering: provides algorithms for clustering tasks like k-means, Gaussian mixture models, etc
    - frequent pattern mining: provides algorithms for frequent itemsets, association rules, and sequential pattern mining
    - collaborative filtering: provides algorithms for collaborative filtering tasks like alternating least squares, matrix factorization, etc
    - dimensionality reduction: provides algorithms for dimensionality reduction tasks like singular value decomposition, principal component analysis, etc
    - evaluation metrics: provides utilities for evaluating machine learning models using metrics like accuracy, precision, recall, F1 score, etc
- primary API is the dataframe-based API in `spark.ml` package, which provides higher-level APIs built on top of DataFrames

Recommendation system:
1. Collaborative filtering: based on the idea that users who have agreed in the past will agree in the future
    - user-based collaborative filtering: recommend items by finding similar users based on their ratings
    - item-based collaborative filtering: recommend items by finding similar items based on user ratings
2. Matrix factorization: factorize the user-item interaction matrix into two lower-dimensional matrices
    - alternating least squares (ALS): popular algorithm for matrix factorization in collaborative filtering
    - ALS is implemented in MLlib as `ALS` algorithm
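
As an illustration of item-based collaborative filtering, the sketch below computes cosine similarity between items from a tiny, made-up ratings table. The data, the function names, and the choice of plain cosine similarity over co-rating users are all assumptions for the example; this is not how MLlib's `ALS` works internally.

```python
import math

# Hypothetical ratings: user -> {item: rating}; missing items are unrated
ratings = {
    "u1": {"A": 5, "B": 4, "C": 1},
    "u2": {"A": 4, "B": 5},
    "u3": {"A": 1, "C": 5},
    "u4": {"B": 4, "C": 2},
}


def item_vector(item):
    """Ratings for one item, keyed by user."""
    return {u: r[item] for u, r in ratings.items() if item in r}


def cosine(v1, v2):
    """Cosine similarity over the users who rated both items."""
    common = set(v1) & set(v2)
    if not common:
        return 0.0
    dot = sum(v1[u] * v2[u] for u in common)
    norm1 = math.sqrt(sum(v1[u] ** 2 for u in common))
    norm2 = math.sqrt(sum(v2[u] ** 2 for u in common))
    return dot / (norm1 * norm2)


def most_similar(item):
    """Return the other item whose rating pattern is closest to `item`."""
    others = sorted({i for r in ratings.values() for i in r if i != item})
    target = item_vector(item)
    return max(others, key=lambda other: cosine(target, item_vector(other)))


sim_ab = cosine(item_vector("A"), item_vector("B"))
sim_ac = cosine(item_vector("A"), item_vector("C"))
closest_to_a = most_similar("A")
```

Users who rated both A and B gave them similar scores, so B comes out as the closest item to A and would be the first candidate to recommend to someone who liked A.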

## Spark Streaming
- extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams
- key features:
    - micro-batch processing: processes data in small, user-defined batches, enabling low-latency processing
    - fault tolerance: provides fault tolerance through checkpointing and write-ahead logs
    - window operations: supports windowed computations over data streams
    - integration with Spark Core: integrates seamlessly with Spark Core, allowing users to combine batch and streaming processing
    - support for various data sources: supports various data sources like Kafka, Flume, Kinesis, etc
    - high-level APIs: provides high-level APIs for stream processing, including transformations and output operations
    - exactly-once semantics: supports exactly-once semantics for end-to-end fault-tolerant stream processing
    - integration with MLlib: integrates with MLlib for real-time machine learning and analytics

Continuous Operator Model:
- the traditional model: there is a set of worker nodes, each of which runs one or more continuous operators
- each continuous operator processes the streaming data one record at a time and forwards the records to other operators in the pipeline
- there are "source" operators for receiving data from ingestion systems, and "sink" operators that output to downstream systems

Spark Streaming is designed to meet the following requirements:
- fast failure and straggler recovery: recover quickly from failures and stragglers to provide results in real time
- load balancing: dynamically adapt the resource allocation based on the workload
- unification of batch, streaming, and interactive workloads: combine batch, streaming, and interactive queries
- advanced analytics like machine learning and SQL queries: support complex workloads like continuously learning and updating data models, or querying the "latest" view of streaming data with SQL queries
- to meet these requirements, Spark Streaming uses an architecture called Discretized Streams (DStreams)

Discretized Streams (DStreams):
- basic abstraction provided by Spark Streaming, represents a continuous stream of data
    - allows live data streams to be processed in a fault-tolerant manner, with fast recovery from failures and dynamic scheduling of batches that leads to better load balancing
- receivers accept data in parallel and buffer it in the memory of Spark's worker nodes, then the latency-optimized Spark engine runs short tasks to process the batches and output the results to other systems
- unlike the traditional continuous operator model, where the computation is statically allocated to a node, Spark tasks are assigned dynamically to the workers based on the locality of the data and available resources
- internally, a DStream is represented as a sequence of RDDs
- can be created from various input sources like Kafka, Flume, Kinesis, etc
- supports various transformations like `map`, `filter`, `reduceByKey`, `join`, `window`, etc

Basic Streaming Transformations:
- Stateless transformations: (these transformations do not depend on previous data)
    - `map`: transforms one record to one record
    - `filter`: filters out records that don't satisfy a predicate
    - `flatMap`: transforms one record to zero or more records
- Stateful transformations: (these transformations depend on previous data)
    - `aggregation`: combines all records to produce a single value
    - `group-and-aggregate`: extracts a grouping key from the record and computes a separate aggregated value for each key
    - `join`: joins same-keyed records from several streams
    - `sort`: sorts the records observed in the stream

Transformations on DStreams:
- `map(func)`: return a new DStream by passing each element of the source DStream through a function `func`
- `flatMap(func)`: similar to `map`, but each input item can be mapped to 0 or more output items
- `filter(func)`: return a new DStream by selecting only the records of the source DStream on which `func` returns true
- `repartition(numPartitions)`: changes the level of parallelism in this DStream by creating more or fewer partitions
- `union(otherStream)`: return a new DStream that contains the union of the elements in the source DStream and otherDStream
- `count()`: return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream
- `reduce(func)`: return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function `func` (which takes two arguments and returns one)
- `countByValue()`: when called on a DStream of elements of type K, return a new DStream of `(K, Long)` pairs where the value of each key is its frequency in each RDD of the source DStream
- `reduceByKey(func, [numTasks])`: when called on a DStream of `(K, V)` pairs, return a new DStream of `(K, V)` pairs where the values for each key are aggregated using the given reduce function
- `join(otherStream, [numTasks])`: when called on two DStreams of `(K, V)` and `(K, W)` pairs, return a new DStream of `(K, (V, W))` pairs with all pairs of elements for each key
- `cogroup(otherStream, [numTasks])`: when called on a DStream of `(K, V)` and `(K, W)` pairs, return a new DStream of `(K, Seq[V], Seq[W])` tuples
- `transform(func)`: return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream
- `updateStateByKey(func)`: return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key
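
The stateful behaviour of `updateStateByKey` can be imitated in plain Python by carrying a dictionary from one micro-batch to the next. The helper below is a hypothetical model, not the Spark API; it assumes the update function receives the new values for a key and the previous state (or `None`), and that returning `None` removes the key.

```python
def update_state_by_key(state, batch, update_func):
    """Apply update_func(new_values, previous_state) to every key seen so far."""
    grouped = {}
    for key, value in batch:
        grouped.setdefault(key, []).append(value)
    new_state = {}
    for key in set(state) | set(grouped):
        updated = update_func(grouped.get(key, []), state.get(key))
        if updated is not None:      # returning None drops the key
            new_state[key] = updated
    return new_state


def running_count(new_values, previous):
    return (previous or 0) + sum(new_values)


# Each inner list stands for the records of one micro-batch
batches = [
    [("spark", 1), ("kafka", 1), ("spark", 1)],
    [("spark", 1)],
    [("flume", 1), ("kafka", 1)],
]

state = {}
history = []
for batch in batches:
    state = update_state_by_key(state, batch, running_count)
    history.append(dict(state))
```

Keys that receive no new records in a batch still keep their previous count, which is what distinguishes this from a stateless per-batch `reduceByKey`.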

Window operations:
- apply transformations over a sliding window of data: the window slides over a source DStream, and the source RDDs that fall within the window are combined and operated upon
- must specify the window length and sliding interval (eg. window length of 3 RDDs and sliding interval of 2 RDDs means the window slides every 2 RDDs and contains the last 3 RDDs)
- functions:
    - `window(windowLength, slideInterval)`: return a new DStream which is computed based on windowed batches of the source DStream
    - `countByWindow(windowLength, slideInterval)`: return a sliding window count of elements in the stream
    - `reduceByWindow(func, windowLength, slideInterval)`: return a new single-element stream, created by aggregating elements in the stream over a sliding interval using `func`
    - `reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])`: when called on a DStream of `(K, V)` pairs, return a new DStream of `(K, V)` pairs where the values for each key are aggregated using the given reduce function
    - `reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])`: a more efficient version of the above `reduceByKeyAndWindow()` where the reduce value of each window is calculated incrementally using the reduce values of the previous window
    - `countByValueAndWindow(windowLength, slideInterval, [numTasks])`: when called on a DStream of `(K, V)` pairs, return a new DStream of `(K, Long)` pairs where the value of each key is its frequency within a sliding window
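
To make the window length and sliding interval concrete, here is a plain-Python sketch that treats a stream as a list of non-empty batches and measures both parameters in batches. The function names are hypothetical. The second function mirrors the idea behind the `invFunc` variant: reuse the previous window's result, add what entered, and remove what left.

```python
from functools import reduce


def window_bounds(num_batches, window_length, slide_interval):
    """Yield (start, end) batch indices; lengths are counted in batches."""
    for end in range(slide_interval, num_batches + 1, slide_interval):
        yield max(0, end - window_length), end


def flatten(batches):
    return [x for batch in batches for x in batch]


def reduce_by_window(batches, func, window_length, slide_interval):
    """Recompute every window from scratch."""
    return [
        reduce(func, flatten(batches[start:end]))
        for start, end in window_bounds(len(batches), window_length, slide_interval)
    ]


def reduce_by_window_incremental(batches, func, inv_func, window_length, slide_interval):
    """Reuse the previous result: add entering batches, subtract leaving ones."""
    results = []
    prev_start = prev_end = 0
    current = None
    for start, end in window_bounds(len(batches), window_length, slide_interval):
        entering = flatten(batches[prev_end:end])
        leaving = flatten(batches[prev_start:start])
        if current is None:
            current = reduce(func, entering)
        else:
            current = reduce(func, entering, current)
        current = reduce(inv_func, leaving, current)
        results.append(current)
        prev_start, prev_end = start, end
    return results


batches = [[1, 2], [3], [4, 5], [6], [7, 8], [9]]
full = reduce_by_window(batches, lambda a, b: a + b, 3, 2)
incremental = reduce_by_window_incremental(
    batches, lambda a, b: a + b, lambda a, b: a - b, 3, 2
)
```

With a window of 3 batches sliding every 2 batches, both versions produce the same sums, but the incremental one only touches the batches that entered or left the window.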
    
Structured Streaming:
- treats a live data stream as a table that is being continuously appended
- express streaming computation as standard batch-like query as on a static table, each query runs as an incremental query on the unbounded input table
- does not materialize the entire table, reads the latest available data from the streaming data source, processes data incrementally to update the result, and discards the source data after processing
- Spark is responsible for updating the Result Table when there is new data
- output modes:
    - `append`: only the new rows in the streaming DataFrame/Dataset will be written to the sink
    - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink every time there are some updates
    - `update`: only the rows that were updated in the streaming DataFrame/Dataset will be written to the sink every time there are some updates
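
The difference between the `complete` and `update` output modes can be sketched with a running word count in plain Python. `run_query` is a hypothetical function, not the Structured Streaming API, and `append` is not modelled here since the rows of a running count keep changing.

```python
from collections import Counter


def run_query(micro_batches, output_mode):
    """Running word count over an unbounded input, emitting per the output mode."""
    result_table = Counter()
    emitted = []
    for batch in micro_batches:
        changed = Counter(batch)          # only the new input is processed
        result_table.update(changed)      # incremental update of the result table
        if output_mode == "complete":
            emitted.append(dict(result_table))
        elif output_mode == "update":
            emitted.append({w: result_table[w] for w in changed})
        else:
            raise ValueError("unsupported mode in this sketch: " + output_mode)
    return emitted


micro_batches = [["cat", "dog"], ["dog", "owl"], ["cat"]]
complete_output = run_query(micro_batches, "complete")
update_output = run_query(micro_batches, "update")
```

In `complete` mode every trigger writes the whole result table, while in `update` mode the last trigger writes only the row for `cat`, because that is the only count that changed.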

| Info | Spark | Kafka |
| --- | --- | --- |
| Description | General purpose distributed processing system used for big data workloads. It uses in-memory caching and optimized query execution to provide fast analytic queries against data of any size. | Distributed Streaming platform that allows developers to create applications that continuously produce and consume data streams. |
| Processing Model | Batch processing and streaming | Event Streaming |
| Throughput & Latency | High throughput, low latency | High throughput, low latency |
| Scalability | Horizontally scalable as a distributed system, though scaling is expensive due to RAM requirement | Horizontally scalable to support a growing number of users and use cases, meaning that it can handle an increasing amount of data by adding more nodes & partitions to the system |
| Fault Tolerance | Fault tolerant by nature (RDDs). In case of node failure, RDDs are automatically recreated. | Fault-tolerant as it replicates data across multiple servers and can automatically recover from node failures |

### Windowing
- 4 types:
    1. Tumbling window: non-overlapping, fixed-size windows
    2. Sliding window: overlapping, fixed-size windows
        - window slides over the stream and is re-evaluated only when its contents change, ie. when a new record arrives
        - eg. calculate the sum of the last 5 elements in a stream every 2 elements (or moving average of 5 elements)
    3. Session window: windows based on session boundaries
        - window is re-evaluated based on the arrival of new data and the session boundaries, every time there is activity in the session, the boundary is extended
        - eg. calculate the total time spent by a user on a website in a session
    4. Hopping window: fixed-size windows that hop by a fixed interval
        - window is re-evaluated on fixed time intervals irrespective of the data stream arrival
        - can be used to calculate metrics like hourly, daily, weekly, etc
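
A minimal sketch of how records could be assigned to tumbling, hopping, and session windows, using integer timestamps. The function names and the half-open `[start, end)` convention are assumptions for the example, not taken from any particular streaming engine.

```python
def tumbling_window(ts, size):
    """The single non-overlapping window [start, end) that contains ts."""
    start = (ts // size) * size
    return (start, start + size)


def hopping_windows(ts, size, hop):
    """All fixed-size windows, starting at multiples of hop, that contain ts."""
    first = ((ts - size) // hop + 1) * hop   # earliest start with start + size > ts
    first = max(first, 0)
    return [(s, s + size) for s in range(first, ts + 1, hop)]


def session_windows(timestamps, gap):
    """Group events into sessions; a new session starts after `gap` of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][1] <= gap:
            sessions[-1][1] = ts          # activity extends the session boundary
        else:
            sessions.append([ts, ts])
    return [tuple(s) for s in sessions]


tumbling = tumbling_window(12, size=10)
hopping = hopping_windows(12, size=10, hop=5)
sessions = session_windows([1, 2, 3, 10, 11, 30], gap=5)
```

A record belongs to exactly one tumbling window but to `size / hop` hopping windows, and session boundaries depend entirely on the gaps between events.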

<img alt="picture 3" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/2426dd54973a39d86e1c0df1f2fad404da016265f75306b0eed0547003ea3b18.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

### Exam answers

HBase vs Cassandra:
- Hadoop vs HBase: the limitation of Hadoop that HBase addresses is the lack of real-time, random read and write access to data. HDFS, Hadoop's primary storage system, is optimized for batch processing, which makes it inefficient for real-time queries and updates.
- Updates and deletions in HBase go through the same write path. A write is first appended to the Write-Ahead Log (WAL) for durability and then written to the MemStore, an in-memory data structure. Periodically, MemStore data is flushed to disk as an HFile, which is an immutable, sorted file. Because HFiles are immutable, an update is stored as a new timestamped version of the cell, and a deletion is stored as a special "delete marker" written to the WAL and MemStore. During major compaction, HBase removes the delete markers and the data they mask from the HFiles, which physically deletes the row.
- In Cassandra, an update is also written as a new timestamped value (the newest timestamp wins on read), while a deletion writes a special marker called a "tombstone" for the affected row or column. During compaction, Cassandra merges SSTables and removes data shadowed by tombstones; the tombstones themselves are dropped once the configured grace period (`gc_grace_seconds`) has passed.
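
Both stores follow the same log-structured pattern: writes and deletes are appended, files are immutable, and compaction does the physical cleanup. The plain-Python sketch below models only that pattern. `TinyLsmStore` is a hypothetical class that leaves out the WAL or commit log, cell timestamps, and grace periods.

```python
TOMBSTONE = object()   # sentinel standing in for a delete marker


class TinyLsmStore:
    """Toy log-structured store: writes go to an in-memory table, flushes
    produce immutable files, and compaction drops deleted rows."""

    def __init__(self):
        self.memtable = {}
        self.files = []            # oldest first; each is treated as immutable

    def put(self, key, value):
        self.memtable[key] = value

    def delete(self, key):
        self.memtable[key] = TOMBSTONE   # a delete is just another write

    def flush(self):
        if self.memtable:
            self.files.append(dict(self.memtable))
            self.memtable = {}

    def get(self, key):
        # Newest data wins: check the memtable, then files from newest to oldest
        for table in [self.memtable] + self.files[::-1]:
            if key in table:
                value = table[key]
                return None if value is TOMBSTONE else value
        return None

    def compact(self):
        """Merge all files into one and physically drop deleted rows."""
        merged = {}
        for table in self.files:          # later files overwrite earlier ones
            merged.update(table)
        live = {k: v for k, v in merged.items() if v is not TOMBSTONE}
        self.files = [live] if live else []


store = TinyLsmStore()
store.put("row1", "v1")
store.put("row2", "v1")
store.flush()
store.put("row1", "v2")    # update: a newer version in a newer file
store.delete("row2")       # delete: a tombstone in a newer file
store.flush()
files_before = len(store.files)
store.compact()
```

Before compaction the old value of `row2` still sits in the first file and is only hidden by the tombstone; after compaction a single file with the latest `row1` remains.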

Flume config file to ingest data from a web server log file and write it to HDFS in real-time, only when 1000 or more entries are made to the webserver log
```bash
# Define the agent name, source, channel, and sink
agent.sources = source1
agent.channels = channel1
agent.sinks = sink1

# Configure the source
agent.sources.source1.type = exec
agent.sources.source1.command = tail -F /path/to/webserver/log/file

# Configure the channel
agent.channels.channel1.type = memory
agent.channels.channel1.capacity = 10000
agent.channels.channel1.transactionCapacity = 1000

# Configure the sink
agent.sinks.sink1.type = hdfs
agent.sinks.sink1.hdfs.path = hdfs://<namenode>:<port>/path/to/hdfs/folder
agent.sinks.sink1.hdfs.filePrefix = logs-
agent.sinks.sink1.hdfs.fileSuffix = .log
agent.sinks.sink1.hdfs.rollInterval = 0
agent.sinks.sink1.hdfs.rollSize = 0
agent.sinks.sink1.hdfs.rollCount = 1000
agent.sinks.sink1.hdfs.batchSize = 1000
agent.sinks.sink1.hdfs.txnEventMax = 1000

# Bind the source and sink to the channel
agent.sources.source1.channels = channel1
agent.sinks.sink1.channel = channel1
```

Carefully go through Scala Code1 and Code2 given below and answer the questions that follow.
```scala
val lines = new Array[String](2) 
lines(0) = "Hello world" 
lines(1) = "How are you world" 
val stringRDD=sc.parallelize(lines,1) 
val wordsRDD = stringRDD.flatMap(x => x.split(" ")) 
val wordRDD = wordsRDD.collect()
```
```scala
val lines = new Array[String](2) 
lines(0) = "Hello world" 
lines(1) = "How are you world" 
val stringRDD=sc.parallelize(lines,1) 
val wordsRDD = stringRDD.flatMap(x => x.split(" ")) 
val filtRDD = wordsRDD.filter(x => x.startsWith("H")) 
val wordRDD = wordsRDD.collect() 
```

1. Will there be any difference in the execution time for Code1 and Code2 ? 
- No meaningful difference: `filter` is a lazy transformation and Code2 calls `collect()` on `wordsRDD`, not on `filtRDD`, so the filter is never executed; it would only add work (negligible for small datasets) if an action were called on `filtRDD`
2. Modify Code1 to run as 2 tasks in parallel 
- `sc.parallelize(lines, 2)` creates the RDD with 2 partitions, so each stage runs as 2 tasks in parallel
3. Modify the Code2 to output only words having a length of 5 
- `val filtRDD = wordsRDD.filter(x => x.length == 5)` to filter words with a length of 5, then `filtRDD.collect()` to output them
4. Modify Code2 to output the number of unique words in the list.
- `val uniqueWords = wordsRDD.distinct().count()` to get the count of unique words
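
The expected results for questions 3 and 4 can be checked with plain-Python equivalents of the Scala pipeline. This is only a sanity check on the same two input lines and does not use Spark.

```python
lines = ["Hello world", "How are you world"]

# Equivalent of flatMap(x => x.split(" ")) followed by collect()
words = [word for line in lines for word in line.split(" ")]

# Question 3: keep only words of length 5
length_five = [word for word in words if len(word) == 5]

# Question 4: number of unique words, as in distinct().count()
unique_count = len(set(words))
```

Here `length_five` holds `Hello`, `world`, `world`, and `unique_count` is 5 because `world` appears twice.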

Concepts relating to Cassandra:
- primary key design: crucial for data distribution and retrieval efficiency
- composite primary key: allows for efficient distribution and retrieval of data based on multiple criteria
- partition key: determines how data is distributed across nodes in the cluster
- clustering key: determines the physical sorting order of data within a partition
- data locality: storing related data together based on the primary key can improve data locality and reduce the need to query multiple nodes for related data
- query optimization: understanding query patterns and access patterns is essential for designing an effective primary key structure that minimizes the need for data scans and improves query performance
- trade-offs: balancing data distribution, data locality, and query performance based on the specific requirements and access patterns of the application
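
The roles of the partition key and the clustering key can be sketched in plain Python. The ring layout, node names, and sensor data below are made up, and MD5 stands in for Cassandra's default Murmur3 partitioner only because it is available in the standard library.

```python
import hashlib
from bisect import bisect_left, insort

RING_SIZE = 2 ** 32
# Hypothetical 4-node ring; each node owns the token range ending at its token
NODES = [(RING_SIZE // 4 * (i + 1) - 1, f"node-{i}") for i in range(4)]
TOKENS = [token for token, _ in NODES]


def token_for(partition_key):
    """Hash the partition key onto the ring (MD5 as a stand-in hash)."""
    digest = hashlib.md5(partition_key.encode("utf-8")).hexdigest()
    return int(digest, 16) % RING_SIZE


def owner(partition_key):
    """The node whose token range contains the key's token."""
    idx = bisect_left(TOKENS, token_for(partition_key)) % len(NODES)
    return NODES[idx][1]


storage = {}   # node -> partition key -> rows kept sorted by clustering key


def insert(partition_key, clustering_key, value):
    node = storage.setdefault(owner(partition_key), {})
    partition = node.setdefault(partition_key, [])
    insort(partition, (clustering_key, value))


def read_partition(partition_key):
    """A single-partition query touches exactly one node."""
    return storage.get(owner(partition_key), {}).get(partition_key, [])


insert("sensor-1", "2024-01-02", 21.5)
insert("sensor-1", "2024-01-01", 20.0)
insert("sensor-2", "2024-01-01", 18.0)
rows = read_partition("sensor-1")
```

All rows for `sensor-1` land on the same node and come back ordered by the clustering key, which is why queries that fix the partition key and range over the clustering key are cheap.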

## Architecture Diagrams

### Hadoop

<img alt="picture 12" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/c8d13a26353d5a6c9d27fcdf1a9248edadf083645aeeee53330acd17c52f7135.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

### HDFS

<img alt="picture 11" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/96d8065c92beac6ec11fc327ce744524d8e2a15895954e5006786160366bbac3.png" width="500" />
<img alt="picture 13" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/324c076408e8129db78d87ec8ceaaffb695494ef780439e5dec42a0b6995316b.png" width="500" />  

### YARN

<img alt="picture 14" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/24853241be11497c90cf30b4885c9bb76b7244ad8f816e2df2c8e8d392c5dea7.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

YARN scheduling policies:
- FIFO: first-in, first-out scheduling policy, where jobs are executed in the order they are submitted
- Capacity: allows for multiple queues, each with a guaranteed capacity, and jobs are scheduled based on the available capacity in each queue
- Fair: dynamically allocates resources to jobs based on the demand, ensuring that all jobs get a fair share of the cluster resources
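
The contrast between FIFO and fair scheduling can be illustrated with a small allocation sketch in plain Python. The job names and numbers are invented, the fair policy is modelled as simple max-min fairness over a single resource, and queues (as used by the Capacity scheduler) are not modelled, so this is far simpler than what YARN actually does.

```python
def fifo_allocate(capacity, jobs):
    """jobs: list of (name, demand) in submission order."""
    allocation = {}
    remaining = capacity
    for name, demand in jobs:
        allocation[name] = min(demand, remaining)
        remaining -= allocation[name]
    return allocation


def fair_allocate(capacity, jobs):
    """Max-min fairness: split spare capacity equally among unsatisfied jobs."""
    demands = dict(jobs)
    allocation = {name: 0.0 for name in demands}
    remaining = float(capacity)
    active = [n for n, d in demands.items() if d > 0]
    while active and remaining > 1e-9:
        share = remaining / len(active)
        for name in active:
            grant = min(share, demands[name] - allocation[name])
            allocation[name] += grant
            remaining -= grant
        # Jobs whose demand is met drop out; the rest share what is left
        active = [n for n in active if demands[n] - allocation[n] > 1e-9]
    return allocation


jobs = [("etl", 70), ("report", 20), ("adhoc", 50)]
fifo = fifo_allocate(90, jobs)
fair = fair_allocate(90, jobs)
```

Under FIFO the last job gets nothing until earlier jobs finish, while the fair allocation satisfies the small job fully and splits the rest evenly between the two large ones.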

### HBase

<img alt="picture 16" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/c2c102a530317488c2b5d49fb2fd6a7ff8865ff19464c03f77b7b34b84815527.png" width="600" style="display: block; margin-left: auto; margin-right: auto;">

### Pig

<img alt="picture 17" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/d825829d1afd71e83ce17514b47474db1305923375501199f32e8c4360940106.png" width="300" style="display: block; margin-left: auto; margin-right: auto;">

### Hive

<img alt="picture 18" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/ceddd053d34825eecef17433e30fd6546088091eff24329b6126ede9e8a21aeb.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

### Sqoop

<img alt="picture 19" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/b3a13b09e3cf3b2ab71fa1d27acff4dd19b87a49316d3ce1fe3a88e34b3fefbe.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

### Flume

<img alt="picture 20" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/07307b3ab41d9a193d341c27262bbd49102c63c57bed72742ca6f699f9d19a0a.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

### Oozie

<img alt="picture 21" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/8a74d673a3e0a5f65dfe63f8d45c4bfe5e8fc1318a8b37b2005d548d4036f880.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

### MongoDB
<img alt="picture 22" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/becbf7882812def4c5bfea4359575b7ed611b7a4bc813ff4eaa3d477427f9586.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

### Cassandra

<img alt="picture 23" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/ca810848243d3c4b95177562ff27a7763f7f16c08bff770b8ccee58c8a33a10e.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">  

### Spark
<img alt="picture 24" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/165f411f2ed28f5b894819b57bc4f58dc4aebf98a14e9194a18cb50f9d98d8b4.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">

### Kafka

<img alt="picture 25" src="https://cdn.jsdelivr.net/gh/sharatsachin/images-cdn@master/images/b28bd84242c4fb13329a1bf75fe5f1a649a649bf6cdee0dcca0108df27406f07.png" width="500" style="display: block; margin-left: auto; margin-right: auto;">