# ECE 454 - Distributed Computing

## Introduction

### Distributed System

- **Distributed System**: A collection of autonomous computing elements that appears to its users as a single coherent system.

#### Motivations for Distributed Systems

1. Resource Sharing
2. Simplify Processes by Integrating Multiple Systems
3. Limitations in Centralized Systems: Weak/Unreliable
4. Distributed/Mobile Users

#### Goals for Distributed Systems

1. Resource Sharing
    - CPUs, Data, Peripherals, Storage.
2. Transparency
    - Access, Location, Migration, Relocation, Replication, Concurrency, Failure.
3. Open
    - Interoperability, Composability, Extensibility.
4. Scalable
    - Size, Geography, Administration.
    
#### Types of Distributed Systems

- Web Services
- High Performance Computing, Cluster Computing, Cloud Computing, Grid Computing
- Transaction Processing
- Enterprise Application Integration
- Internet of Things, Sensor Networks

### Middleware

- **Middleware**: A layer of software that separates applications from the underlying platforms.
    - Supports Heterogeneous Computers/Networks.
    - *e.g.*: Communication, Transactions, Service Composition, Reliability.
    - **Single-System View**

### Scaling Techniques

1. *Hiding Communication Latencies*: At Server vs. At Client?
2. *Partitioning*
3. *Replication*

### Fallacies of Networked and Distributed Computing

1. Network is reliable.
2. Network is secure.
3. Network is homogeneous.
4. Topology is static.
5. Latency is zero.
6. Bandwidth is infinite.
7. Transport cost is zero.
8. There is only one administrator.

### Shared Memory vs. Message Passing

- **Shared Memory**:
    - Less Scalable
    - Faster
    - CPU-Intensive Problems
    - Parallel Computing
- **Message Passing**:
    - More Scalable
    - Slower
    - Resource Sharing / Coordination Problems
    - Distributed Computing
- Apache Hadoop is an example of a hybrid computing framework: it uses message passing at the coarse-grained level (between nodes) and shared memory at the fine-grained level (within a node).

### Cloud and Grid Computing

- **IaaS**: Infrastructure as a Service
    - VM Computation, Block File Storage
- **PaaS**: Platform as a Service
    - Software Frameworks, Databases
- **SaaS**: Software as a Service
    - Web Services, Business Apps

### Transaction Processing Systems

- **Transaction Processing Monitor**: Coordinates Distributed Transactions

## Architectures

### Definitions

- **Component**: A modular unit with well-defined interfaces.
- **Connector**: A mechanism that mediates communication, coordination, or cooperation among components.
- **Software Architecture**: Organization of software components.
- **System Architecture**: Instantiation of software architecture in which software components are placed on real machines.
- **Autonomic System**: Adapts to its environment by monitoring its own behavior and reacting accordingly.

### Architectural Styles

- Layered
    - *Note: Assignment Topic*
- Object-Based
- Data-Centered
- Event-Based

### Layered Architecture

![Layers](images/Architecture_1.png)

- **Examples**:
    - Database Server, Application Server, Client
    - SSH Server, SSH Client
- Requests Flow Down Stack
- Responses Flow Up Stack
- *Handle-Upcall*: Async Notification
    - Subscribe with Handle
    - Publish with Upcall

### Client-Server Interactions

![Client-Server Interactions](images/Architecture_2.png)

- *Bolded Lines* = Busy
- *Dashed Lines* = Idle
- **Client**: Initiates with a Request
- **Server**: Follows with a Response
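- **Total Round-Trip Time**: $(N - 1) \times t_{\text{Request-Response}}$, where $N$ is the number of layers and each adjacent pair of layers adds one request-response exchange.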
    - Layering can reduce the amount of processing time per layer, but the additional communication overhead between the layers introduces diminishing returns.
- An intermediate layer can be both a client and a server to the others.

### Multi-Tiered Architecture

- Logical Software Layers $\mapsto$ Physical Tiers
    - *Trade-Offs*: Ease of Maintenance vs. Reliability

### Horizontal vs. Vertical Distribution

- **Vertical Distribution**: When the logical layers of a system are organized as separate physical tiers.
    - *Performance*: High.
    - *Scalability*: Low.
    - *Dependability*: Low-Medium.
- **Horizontal Distribution**: When one logical layer is split across multiple machines (**sharding**).
    - *Performance*: Low.
    - *Scalability*: High.
    - *Dependability*: Medium-High.
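
A minimal sketch of horizontal distribution, assuming a key-value layer that picks a shard with a stable hash. The names `shard_for` and `ShardedStore` are hypothetical, and plain dictionaries stand in for separate machines.

```python
import hashlib


def shard_for(key: str, num_shards: int) -> int:
    """Map a key to a shard index using a stable hash (illustrative scheme)."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


class ShardedStore:
    """One logical layer split across several shards (dicts stand in for machines)."""

    def __init__(self, num_shards: int):
        self.shards = [dict() for _ in range(num_shards)]

    def put(self, key, value):
        # Only the shard that owns the key is touched.
        self.shards[shard_for(key, len(self.shards))][key] = value

    def get(self, key):
        return self.shards[shard_for(key, len(self.shards))].get(key)
```

Each request touches exactly one shard, so shards can be added to spread load. Note that plain modulo hashing remaps most keys when the shard count changes.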

### Object-Based Architecture

- In an object-based architecture, components communicate using remote object references and method calls.

#### Problems with Object-Based Architecture

- Complex Communication Interfaces
- Complex Communication Costs
- Not Scalable
- Not Language Agnostic

### Data-Centered Architecture

- In a data-centered architecture, components communicate by accessing a shared data repository.

### Event-Based Architecture

![Publish/Subscribe Middleware](images/Architecture_3.png)

- In an event-based architecture, components communicate by propagating events using a publish/subscribe system.
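
A minimal in-process sketch of publish/subscribe, assuming topic-based routing. `PubSubBroker` is a hypothetical name. Real middleware would deliver events over the network, and usually asynchronously.

```python
from collections import defaultdict


class PubSubBroker:
    """Routes each published event to every callback subscribed to its topic."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, event):
        # The publisher never names a subscriber; the broker does the routing.
        callbacks = list(self._subscribers.get(topic, []))
        for callback in callbacks:
            callback(event)
        return len(callbacks)  # number of subscribers notified
```

For example, after `broker.subscribe("news", inbox.append)`, a call to `broker.publish("news", "hello")` appends `"hello"` to `inbox`.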

#### Handling Asynchronous Delivery Failure

- *At-Least Once Delivery*: Do Retransmit
- *At-Most Once Delivery*: Do Not Retransmit
- *Exactly Once Delivery*: Unknown/Unachievable
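
The difference between the first two semantics can be sketched with a simulated lossy channel. The names `Receiver` and `send`, and the `outcomes` list that scripts which transmissions are lost, are assumptions made for illustration.

```python
class Receiver:
    """Records deliveries; can optionally drop duplicates by message id."""

    def __init__(self, deduplicate=False):
        self.deduplicate = deduplicate
        self.seen_ids = set()
        self.delivered = []

    def receive(self, msg_id, payload):
        if self.deduplicate and msg_id in self.seen_ids:
            return  # duplicate retransmission, already processed
        self.seen_ids.add(msg_id)
        self.delivered.append(payload)


def send(receiver, msg_id, payload, outcomes, retransmit):
    """Send one message over a simulated lossy channel.

    outcomes: list of (request_arrives, ack_arrives) booleans, one per attempt.
    retransmit=True  -> at-least-once (retry until an ack arrives).
    retransmit=False -> at-most-once (single attempt only).
    Returns True if the sender saw an acknowledgement.
    """
    attempts = outcomes if retransmit else outcomes[:1]
    for request_arrives, ack_arrives in attempts:
        if request_arrives:
            receiver.receive(msg_id, payload)
            if ack_arrives:
                return True
    return False
```

With a lost acknowledgement, at-least-once delivers the payload twice. A receiver that deduplicates by message id processes it once, which is how systems commonly approximate exactly-once processing.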

### Peer-to-Peer Systems

![Chord's Finger Table](images/Architecture_4.png)

- In a peer-to-peer system, decentralized processes are organized in an overlay network that defines a set of communication channels.
- In a peer-to-peer distributed hash table, the keyspace is represented as a consistent hash ring, and the nodes partition the ring into ranges amongst themselves.
- Each node maintains a finger table that maps partition ranges to nodes, which lets a lookup complete in a number of steps that is logarithmic in the number of nodes.
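
A minimal sketch of how a finger table could be built, assuming the standard Chord rule `finger[i] = successor((n + 2^i) mod 2^m)` on an `m`-bit identifier ring. The function names are hypothetical, and node joins, failures, and the lookup routing itself are left out.

```python
from bisect import bisect_left


def successor(sorted_nodes, ident):
    """Return the first node whose id is >= ident, wrapping around the ring."""
    idx = bisect_left(sorted_nodes, ident)
    return sorted_nodes[idx % len(sorted_nodes)]


def finger_table(node, nodes, m):
    """Build the m-entry finger table of `node` on a ring of size 2**m."""
    ring = sorted(nodes)
    return [successor(ring, (node + 2 ** i) % (2 ** m)) for i in range(m)]
```

Because entry `i` points roughly `2^i` identifiers ahead, each routing hop can at least halve the remaining distance to the key, which is where the logarithmic lookup cost comes from.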

### Hybrid Architectures

- BitTorrent is an example of a hybrid architecture combining a client-server architecture and a peer-to-peer architecture.

### Self-Management

![Self-Management Systems](images/Architecture_5.png)

- In self-management, systems use a feedback control loop that monitors system behaviors and adjusts system operations.
- **Assignment Note**: Useful for Unknown Assignment

## Processes

### IPC

- **Inter-Process Communication (IPC)**: Expensive because it requires context switching.

### Threads

- Typically, an operating system kernel supports multi-threading through **lightweight processes (LWP)**.
- **Assignment Note**: Do Not Spawn Too Many Threads

### Multi-Threaded Servers

- **Dispatcher/Worker Design**: A dispatcher thread receives requests from the network and feeds them to a pool of worker threads.
- **Assignment Note**: Useful for Assignment 1 & Partition into Sequential Work and Parallel Work
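
A minimal sketch of the dispatcher/worker design, assuming requests arrive as an in-memory list rather than from a socket. `run_server` and its parameters are hypothetical names, and handler exceptions are not dealt with here.

```python
import queue
import threading


def run_server(requests, handler, num_workers=4):
    """Feed requests to a fixed pool of worker threads; return results in request order."""
    tasks = queue.Queue()
    results = {}
    lock = threading.Lock()

    def worker():
        while True:
            item = tasks.get()
            if item is None:  # sentinel: no more work for this worker
                break
            req_id, payload = item
            outcome = handler(payload)
            with lock:
                results[req_id] = outcome

    workers = [threading.Thread(target=worker) for _ in range(num_workers)]
    for w in workers:
        w.start()

    # Dispatcher role: a real server would read these requests from the network.
    for req_id, payload in enumerate(requests):
        tasks.put((req_id, payload))
    for _ in workers:
        tasks.put(None)  # one sentinel per worker
    for w in workers:
        w.join()
    return [results[i] for i in range(len(requests))]
```

The pool size is fixed up front, so a burst of requests queues up instead of spawning an unbounded number of threads.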

### Hardware and Software Interfaces

![Hardware and Software Interfaces](images/Processes_1.png)

### Virtualization

![VMs](images/Processes_2.png)

- **Advantage**:
    - Portability
    - Live Migration of VMs
    - Replication for Availability/Fault Tolerance
- **Disadvantage**:
    - Performance

### Server Clusters

![Three Physical Tier](images/Processes_3.png)

- **Assignment Note**: Useful for Assignment 2

## Communication

### Layered Network Model

![Layered Network Model](images/Communication_1.png)

### Remote Procedure Calls

- **Remote Procedure Calls**: A transient communication abstraction implemented using a client-server protocol.
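- **Client Stub**: Marshals the call's parameters into a request message and unpacks the reply on the client.
- **Server Stub**: Unpacks the request, invokes the service handler, and packs the result into a reply on the server.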

### Steps of a RPC

![Steps of a RPC](images/Communication_2.png)


1. The client process invokes the client stub using an ordinary procedure call.
2. The client stub builds a message and passes it to the client's OS.
3. The client's OS sends the message to the server's OS.
4. The server's OS delivers the message to the server stub.
5. The server stub unpacks the parameters and invokes the appropriate service handler in the server process.
6. The **service handler** does the work and returns the result to the server stub.
7. The server stub packs the result into a message and passes it to the server's OS.
8. The server's OS sends the message to the client's OS.
9. The client's OS delivers the message to the client stub.
10. The client stub unpacks the result and returns it to the client process.


- **Parameter Marshalling**: Packing Parameter $\to$ Message
    - Processor Architectures, Network Protocols, and VMs $\implies$ **Little-Endian** vs. **Big-Endian**
- **Number of System Calls**: 4
    1. Client Process $\to$ Client OS Socket
    2. Server OS Socket $\to$ Server Process
    3. Server Process $\to$ Server OS Socket
    4. Client OS Socket $\to$ Client Process
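
The stub roles and parameter marshalling above can be sketched as follows, assuming a hypothetical `add` service whose two 32-bit integer parameters are packed in network (big-endian) byte order. The `transport` argument is a plain function call that stands in for the OS and network steps.

```python
import struct


def add(a, b):
    """Service handler: the actual work done on the server."""
    return a + b


def server_stub(request: bytes) -> bytes:
    """Unpack the parameters, invoke the handler, and pack the result."""
    a, b = struct.unpack("!ii", request)
    return struct.pack("!i", add(a, b))


def client_stub_add(a, b, transport=server_stub):
    """Looks like a local call to the client, but marshals into a message."""
    request = struct.pack("!ii", a, b)  # "!" = network byte order (big-endian)
    reply = transport(request)          # stands in for the OS/network hops
    (result,) = struct.unpack("!i", reply)
    return result
```

Fixing the byte order in the message format means a little-endian client and a big-endian server still agree on the parameter values.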

### Defining RPC Interfaces

- **Interface Definition Language (IDL)**: Specify RPC Signatures $\to$ Client/Server Stubs
    - High-Level Format
    - Parameter Ordering
    - Byte Sizes

### Synchronous vs. Asynchronous RPCs

- **Synchronous RPC**: The client blocks to wait for the return value.
- **Asynchronous RPC**: The client blocks to wait for the server acknowledgement of the receipt of the request.
- **One-Way RPC**: The client does not block to wait.
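
An in-process analogy of the three calling styles, assuming a thread pool plays the role of the server and `remote_square` is a hypothetical remote procedure. Here `submit` returning stands in for the server's acknowledgement. No real network is involved.

```python
from concurrent.futures import ThreadPoolExecutor


def remote_square(x):
    """Hypothetical remote procedure."""
    return x * x


def demo():
    with ThreadPoolExecutor(max_workers=1) as server:
        # Synchronous: block until the return value is available.
        sync_result = server.submit(remote_square, 3).result()

        # Asynchronous: continue once the request is accepted, collect the value later.
        future = server.submit(remote_square, 4)
        other_work = "client keeps working"
        async_result = future.result()

        # One-way: fire and forget; the result is never read.
        server.submit(remote_square, 5)
    return sync_result, other_work, async_result
```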

### Message Queuing Model

![Message Queue Interface](images/Communication_3.png)

- **Message Queue**: Alternative to RPCs
- **Persistent Communication**: Loose Coupling between Client/Server
    - *Advantage*: Resilient to Client/Server Hardware Failure
    - *Disadvantage*: Guaranteed Delivery = Impossible
- **Message-Oriented Middleware (MOM)**: Asynchronous Message Passing

### Process Coupling

- **Referential Coupling**: When one process explicitly references another.
    - *Positive Example*: RPC client connects to server using an IP address and a port number
    - *Negative Example*: Publisher inserts a news item into a pub-sub system without knowing which subscriber will read it.
- **Temporal Coupling**: Communicating processes must both be up and running.
    - *Positive Example*: A client cannot execute a RPC if the server is down.
    - *Negative Example*: A producer appends a job to a message queue today, and a consumer extracts the job tomorrow.
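
A minimal sketch of temporal decoupling, assuming a file-backed queue as a stand-in for message-oriented middleware. `FileQueue` is a hypothetical name, and the sketch ignores concurrent access and crash safety.

```python
import json
import os
import tempfile


class FileQueue:
    """Append-only queue persisted to a file, so messages outlive the producer."""

    def __init__(self, path):
        self.path = path

    def put(self, message):
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(message) + "\n")

    def drain(self):
        """Return all pending messages in order and clear the queue."""
        if not os.path.exists(self.path):
            return []
        with open(self.path, encoding="utf-8") as f:
            messages = [json.loads(line) for line in f if line.strip()]
        os.remove(self.path)
        return messages


def demo():
    path = os.path.join(tempfile.mkdtemp(), "jobs.jsonl")
    FileQueue(path).put({"job": "resize-image"})  # producer runs "today" and exits
    return FileQueue(path).drain()                # consumer runs "tomorrow"
```

The producer and consumer never hold a reference to each other and never need to be running at the same time; they only share the queue's location.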

### RPC vs. MOM

#### RPC

- Used mostly for two-way communication, particularly where the client requires immediate response from the server.
- The middleware is linked into the client and the server processes.
- Tighter coupling means that server failure can prevent client from making progress.

#### MOM

- Used mostly for one-way communication where one party does not require an immediate response from another.
- The middleware is a separate component between the sender/publisher/producer and the receiver/subscriber/consumer.
- Looser coupling isolates one process from another which contributes to flexibility and scalability.

## Distributed File Systems

### Accessing Remote Files

![DFS Models](images/DFS_1.png)

- **Remote Access Model**
- **Upload/Download Model**

### Network File System (NFS)

![Overview of NFS](images/DFS_2.png)

- *Supports Client-Side Caching*
    - Modifications are flushed to the server when the client closes the file.
    - Consistency is implementation dependent.

![Authority Delegation](images/DFS_3.png)
    
- *Supports Authority Delegation*
    - A server can delegate authority to a client and recall it through a callback mechanism.
    
![Compound Procedure](images/DFS_4.png)
    
- *Supports Compound Procedures*
    - Multiple Round Trips to Single Round Trip

![Partial Exports](images/DFS_5.png)

- *Supports Partial Exports*

### Google File System (GFS)

![Google File System](images/DFS_6.png)

- **GFS**: A distributed file system that stripes files across inexpensive commodity servers without RAID.
    - *Layered Above Linux File System*
    - *Fault Tolerance Through Software*
- **GFS Master**: *Stores Metadata About Files/Chunks*
    - *Metadata Cache in Main Memory*
    - *Updated Log in Local Storage*
    - *Periodically Polls Chunk Servers for Consistency*
    
#### Reading a File

1. A client sends the file name and chunk index to the master.
2. The master responds with a contact address: the location of a chunk server that holds the requested chunk.
3. The client then pulls data directly from a chunk server, bypassing the master.
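
The read path above can be sketched as follows. The class and function names are hypothetical, the 64 MB chunk size is the value reported for GFS and is treated here as an assumption, and reads that span more than one chunk are not handled.

```python
CHUNK_SIZE = 64 * 1024 * 1024  # assumed chunk size in bytes


class Master:
    """Holds metadata only: (file name, chunk index) -> (chunk handle, server addresses)."""

    def __init__(self):
        self.chunks = {}

    def lookup(self, file_name, chunk_index):
        return self.chunks[(file_name, chunk_index)]


class ChunkServer:
    """Holds chunk contents: chunk handle -> bytes."""

    def __init__(self):
        self.data = {}

    def read(self, handle, start, length):
        return self.data[handle][start:start + length]


def read_file(master, servers, file_name, offset, length, chunk_size=CHUNK_SIZE):
    # Step 1: the client turns the byte offset into a chunk index and asks the master.
    chunk_index, start = divmod(offset, chunk_size)
    # Step 2: the master answers with metadata only, never file data.
    handle, locations = master.lookup(file_name, chunk_index)
    # Step 3: the data is pulled directly from a chunk server.
    return servers[locations[0]].read(handle, start, length)
```

Keeping the master out of the data path is what lets a single master serve many clients.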

#### Updating a File

1. The client pushes its updates to the nearest chunk server holding the data.
2. The nearest chunk server pushes the update to the next closest chunk server holding the data, and so on.
3. When all replicas have received the data, the primary chunk server assigns a sequence number to the update operation and passes it on to the secondary chunk servers.
4. The primary replica informs the client that the update is complete.

### File Sharing Semantics

![File Sharing Semantics](images/DFS_7.png)

## Apache Hadoop MapReduce

### High-Level Architecture

![Hadoop High-Level Architecture](images/Hadoop_1.png)

- Transform lists of input data elements into lists of output data elements by applying *Mappers* and *Reducers*
    - *Immutable Data*
    - *No Communication*
    
#### Mapper

- A list of input data elements is iterated, and each element is individually transformed into zero or more output data elements.

#### Reducer

- A list of input data elements is iterated, and the elements are aggregated into a single output data element.

#### Combiner

- An optional component that consumes the outputs of a mapper to produce a summary as the inputs for a reducer.

#### Terms

- **InputSplit**: A unit of work assigned to one map task.
    - Usually corresponds to a chunk of an input file.
    - Each record in a file belongs to exactly one input split and the framework takes care of dealing with record boundaries.
- **InputFormat**: Determines how the input files are parsed, and defines the input splits.
- **OutputFormat**: Determines how the output files are formatted.
- **RecordReader**: Reads data from an input split and creates key-value pairs for the mapper.
- **RecordWriter**: Writes key-value pairs to output files.
- **Partitioner**: Determines which partition a given key-value pair will go to.

### Data Flow

![MapReduce Data Flow 1](images/Hadoop_2.png)

![MapReduce Data Flow 2](images/Hadoop_3.png)

- **Shuffle**: The process of partitioning by reducer, sorting and copying data partitions from mappers to reducers.
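
A single-process sketch of the data flow for a word count, assuming each input split is a list of lines. All function names are hypothetical, and this simulates the stages in plain Python rather than using the Hadoop API.

```python
from collections import defaultdict


def mapper(line):
    """Emit (word, 1) for every word in one input record."""
    for word in line.split():
        yield word.lower(), 1


def combiner(pairs):
    """Pre-aggregate one map task's output before it leaves the node."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return list(totals.items())


def partitioner(key, num_reducers):
    """Pick the reducer for a key (a simple deterministic byte-sum scheme)."""
    return sum(key.encode("utf-8")) % num_reducers


def reducer(key, values):
    """Aggregate all values for one key into a single output element."""
    return key, sum(values)


def run_job(splits, num_reducers=2):
    partitions = [defaultdict(list) for _ in range(num_reducers)]
    for split in splits:  # one map task per input split
        mapped = [kv for line in split for kv in mapper(line)]
        for key, value in combiner(mapped):
            # Shuffle: route each pair to the partition of its reducer.
            partitions[partitioner(key, num_reducers)][key].append(value)
    output = {}
    for part in partitions:  # one reduce task per partition
        for key in sorted(part):  # each reducer sees its keys in sorted order
            k, v = reducer(key, part[key])
            output[k] = v
    return output
```

For example, `run_job([["a b a"], ["b c"]])` counts `a` twice, `b` twice, and `c` once. The combiner shrinks the first split's output from three pairs to two before the shuffle.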

### Fault Tolerance

- *Primarily: Restart Failed Tasks*
    1. Individual *TaskTrackers* periodically emit a heartbeat to the *JobTracker*.
    2. If a *TaskTracker* fails to emit a heartbeat to the *JobTracker*, the *JobTracker* assumes that the *TaskTracker* crashed.
    3. If the failed node was mapping, then other *TaskTrackers* will be asked to re-execute all the map tasks previously run by the failed *TaskTracker*.
        - *Must be Side-Effect Free*
    4. If the failed node was reducing, then other *TaskTrackers* will be asked to re-execute all reduce tasks that were in progress on the failed *TaskTracker*.
        - *Must be Side-Effect Free*
- *Secondarily: Speculative Execution*
    - If **straggler** nodes slow down the rest of the job, Hadoop schedules redundant copies of the remaining tasks on nodes that have no other work to perform, and uses whichever copy finishes first.

### MapReduce Design Patterns

#### Counts and Summations

- A mapper can emit a tuple of the element and the count 1 for each element it reads.
- A mapper can aggregate the counts for each element and emit a tuple of the element and its count.
- A combiner can aggregate the counts across all the elements processed by a mapper.

#### Selection

- A mapper can emit a tuple for each element that satisfies a predicate.

#### Projection

- A mapper can emit a tuple whose fields are a subset of each element.
- A reducer can eliminate duplicates.

#### Inverted Index

- A mapper can emit a tuple of a value and a key in that specific order.
- A reducer can aggregate all the keys for a distinct value.
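
A minimal sketch of the inverted index pattern, assuming documents are given as a mapping from document id to text. The function names are hypothetical, and the grouping step that the framework's shuffle would perform is simulated with a dictionary.

```python
from collections import defaultdict


def map_inverted(doc_id, text):
    """Emit (term, doc_id): the value comes first and the original key second."""
    for term in set(text.split()):
        yield term, doc_id


def reduce_inverted(term, doc_ids):
    """Collect every document id in which the term appears."""
    return term, sorted(set(doc_ids))


def build_index(documents):
    grouped = defaultdict(list)  # stands in for the shuffle's group-by-key
    for doc_id, text in documents.items():
        for term, d in map_inverted(doc_id, text):
            grouped[term].append(d)
    return dict(reduce_inverted(term, ids) for term, ids in grouped.items())
```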

#### Cross-Correlation

- **Problem**: Given a set of tuples of items, for each possible pair of items, calculate the number of tuples where these items co-occur.

##### Pairs Approach (Slow)

```
class Mapper
  method Map(void, items [i1, i2, ...])
    for all item i in [i1, i2, ...]
      for all item j in [i1, i2, ...] such that j > i
        Emit(pair [i, j], count 1)

class Reducer
  method Reduce(pair [i, j], counts [c1, c2, ...])
    s = sum([c1, c2, ...])
    Emit(pair [i, j], count s)
```

##### Stripes Approach (Fast)

```
class Mapper
  method Map(void, items [i1, i2, ...])
    for all item i in [i1, i2, ...]
      H = new AssociativeArray : item -> counter
      for all item j in [i1, i2, ...] such that j > i
        H{j} = H{j} + 1
      Emit(item i, stripe H)
      
class Reducer
  method Reduce(item i, stripes [H1, H2, ...])
    H = new AssociativeArray : item -> counter
    H = merge-sum([H1, H2, ...])
    for all item j in H.keys()
      Emit(pair [i, j], H{j})
```
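
Both approaches can be simulated in a single process to check that they produce the same counts. This is a sketch with hypothetical function names, not Hadoop code; a `Counter` plays the role of the reducer's summation.

```python
from collections import Counter, defaultdict


def pairs_job(tuples):
    """Pairs approach: one emission per co-occurring pair, summed per pair."""
    counts = Counter()
    for items in tuples:  # one Map call per tuple
        for i in items:
            for j in items:
                if j > i:
                    counts[(i, j)] += 1  # Emit(pair [i, j], 1), summed as in Reduce
    return dict(counts)


def stripes_job(tuples):
    """Stripes approach: one associative array per item, merged per item."""
    stripes = defaultdict(Counter)
    for items in tuples:  # one Map call per tuple
        for i in items:
            h = Counter(j for j in items if j > i)  # the mapper's stripe H for item i
            stripes[i].update(h)                    # merge-sum, as in Reduce
    return {(i, j): c for i, h in stripes.items() for j, c in h.items()}
```

The stripes mapper emits one record per item instead of one per pair, so fewer intermediate records need to be shuffled. The cost is a larger value per record.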