Frameworks for ML scaling and production
----

# A simple API with Flask and Heroku

Create and pickle a model as `model.pkl`, then just create an app that accepts `POST` requests to the root path:

```python
import pickle

import pandas as pd
from flask import Flask, jsonify, request

# load the pickled model once, at startup
with open('model.pkl', 'rb') as f:
    model = pickle.load(f)

# app
app = Flask(__name__)

# routes
@app.route('/', methods=['POST'])
def predict():
    # get the JSON body: a mapping of feature name -> value
    data = request.get_json(force=True)

    # convert the single record into a one-row dataframe
    data_df = pd.DataFrame([data])

    # predictions
    result = model.predict(data_df)

    # cast to a plain int so the value is JSON-serializable
    output = {'results': int(result[0])}

    # return data
    return jsonify(output)

if __name__ == '__main__':
    app.run(port=5000, debug=True)
```
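
To exercise the endpoint, a client only needs to send a JSON object of feature values in a `POST` request. The following is a minimal sketch using only the standard library; the URL assumes the app is running locally on port 5000, and the feature names (`age`, `income`) are hypothetical placeholders for whatever columns the model was trained on.

```python
import json
import urllib.request


def build_request(url, features):
    """Build a POST request whose JSON body maps feature names to values."""
    body = json.dumps(features).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def predict(url, features):
    """Send the request and return the decoded JSON response."""
    with urllib.request.urlopen(build_request(url, features)) as response:
        return json.loads(response.read().decode("utf-8"))


# Hypothetical feature names; they must match the model's training columns.
example = {"age": 42, "income": 50000}
req = build_request("http://localhost:5000/", example)
print(req.get_method(), req.full_url)
```

Calling `predict("http://localhost:5000/", example)` while the app is running should return the JSON produced by the route.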

Save the required packages to `requirements.txt`, where each line is of the format `package==version`. If using a clean environment, this can be done with:
>`pip freeze > requirements.txt`

To deploy to Heroku, create a `Procfile` with the following contents (this assumes the app is saved as `app.py` and that `gunicorn` is listed in `requirements.txt`):
>`web: gunicorn app:app`

From within the Heroku web interface, GitHub repos can be deployed with a few clicks.

# Hadoop

https://hadoop.apache.org/

## Introduction and Use Case

Hadoop is an open-source distributed data management system. It combines tools to store, analyze, and process large-scale pools of data on clusters of servers, without requiring specialized hardware. The "vanilla" version maintained by the Apache Software Foundation is quite intricate and not entirely stable, so there are many commercial distributions offered by third parties (such as Cloudera, Hortonworks). The major cloud services ([Google](https://cloud.google.com/dataproc?hl=en), Amazon, Microsoft) can also host Hadoop, either with their own out-of-the-box solutions or with commercial distributions.

This [table](https://hadoopecosystemtable.github.io/) summarizes libraries and applications within the Hadoop "ecosystem," including those produced by Apache itself and many others.

Cloud data systems like Hadoop represent an alternative to relational databases, providing greater scalability and speed at large scales. It is often said that a distributed data store can optimize on only 2 of 3 goals (the CAP theorem): consistency, availability, and partition tolerance (i.e. continuing to work when the network between nodes fails, which is a precondition for scaling across many machines). SQL databases prioritize C and A, while Hadoop prioritizes A and P. Because it lacks the transaction control of relational databases, it is better suited to "behavioral" rather than "line of business" data (such as customer accounts, supply chains, etc). Behavioral data is collected *in aggregate* as a side effect of user activity. Rather than being tracked and queried on the level of individuals, this data is primarily useful for the general patterns that can be seen in it -- hence it is acceptable to deprioritize consistency in a way that would not be workable for business-critical data.


## Alternatives for Running Hadoop

1. Apache Hadoop open source versus vendor services
1. Docker images versus virtual machines
1. Local file system, pseudo-distributed, fully distributed on own servers, versus on the cloud
1. Versioning: Apache Hadoop updates frequently, and there are incompatibilities between some versions. MapReduce in particular went through a major 1.0 to 2.0 transition.

## Elements of the Hadoop Ecosystem

### Hadoop File System (HDFS)

Developed out of a system published by Google (the Google File System), HDFS promises scalability on "commodity" hardware. By default, it stores 3 replicas of each block of data and uses much larger block sizes than conventional file systems (128 MB by default in current versions). It is also possible to use the native file systems of cloud services.

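HDFS is effectively immutable (write once, read many): operations on data are saved as new files in the system rather than altering existing data in place. This includes re-executing operations: by default, a MapReduce job refuses to write to an output directory that already exists, so each run produces new output files instead of overwriting old ones.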

The HDFS command-line interface syntax is `hadoop fs -command` (or `hdfs dfs -command`), where `command` covers many familiar Linux shell commands like `cat`, `mkdir`, `ls` etc, plus distinctive commands like `put` and `get` to move files between HDFS and other storage (local/cloud). HDFS locations are written as URLs of the form `hdfs://...`
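
As an illustration of the command syntax, the sketch below builds `hadoop fs` command lines from Python. It assumes a Hadoop client is installed and on the `PATH` if the commands are actually executed; the file and directory paths are hypothetical.

```python
import shlex
import subprocess


def hadoop_fs(command, *args):
    """Build the argument list for `hadoop fs -<command> <args...>`."""
    return ["hadoop", "fs", f"-{command}", *args]


def run(argv):
    """Execute a command built by hadoop_fs (needs a Hadoop client on PATH)."""
    return subprocess.run(argv, check=True, capture_output=True, text=True).stdout


# Hypothetical paths, for illustration only.
upload = hadoop_fs("put", "data/local.csv", "hdfs:///user/me/input/local.csv")
listing = hadoop_fs("ls", "hdfs:///user/me/input")

# Show the equivalent shell commands without running them.
print(shlex.join(upload))
print(shlex.join(listing))
```

Passing either list to `run()` would execute it on a machine with Hadoop configured.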

### MapReduce

The distributed processing framework for Hadoop. Implemented in Java, MapReduce processes (and anything else executing on a Hadoop server) are executed in Java virtual machines (JVM). Each process is a distinct VM that does not share state. The quirk this introduces is that although the syntax is object-oriented (being Java, everything is a class, in this case typically static nested classes), the paradigm is much closer to functional programming, as each process can only take in data and output results without being able to reference the results of other parallel instances.

There are now also APIs for languages more commonly used in data science like Python and R, as well as interfaces for other systems programming languages like C# and C++.

The basic unit of a MapReduce routine is the **Job**, which is instantiated to carry out Map and Reduce operations on data. The **Map** functionality applies some set of operations *on each node* in the Hadoop cluster. It returns a set of key/value pairs. The **Reduce** functionality aggregates key/value pairs (on some subset of nodes) and returns a combined list, which is stored as a new file in the system. In between these two steps, the data (duplicated across nodes) is "shuffled and sorted" to processing nodes, so that all values for a given key arrive at the same reducer. For efficiency, it is possible to do a preliminary **Combine** stage on the original node, so as to reduce the volume of data that needs to be transferred across nodes for sorting and later reduction.

So, for example, in a basic word count operation -- producing a list of the unique words in a text and their corresponding counts -- the map function would turn the text into a list of words (each with 1 instance) and the reduce function would look at each word and sum up the instances.
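
The word count can be sketched in plain Python to show how the stages fit together. This is only a single-process simulation of the data flow, not Hadoop code: the function names and the two input "splits" are made up for illustration, and each split stands in for the data held on one node.

```python
from collections import defaultdict
from itertools import chain


def map_words(text):
    """Map: emit a (word, 1) pair for every word in one split of the text."""
    return [(word, 1) for word in text.lower().split()]


def combine(pairs):
    """Combine: pre-aggregate counts locally before anything is transferred."""
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    return list(counts.items())


def shuffle_and_sort(pairs):
    """Shuffle and sort: group all values by key, ordered by key."""
    grouped = defaultdict(list)
    for word, n in pairs:
        grouped[word].append(n)
    return sorted(grouped.items())


def reduce_counts(word, values):
    """Reduce: sum the partial counts for one word."""
    return word, sum(values)


# Each string stands in for the split of the text stored on one node.
splits = ["the cat sat", "the dog sat on the cat"]
mapped = [combine(map_words(split)) for split in splits]
grouped = shuffle_and_sort(chain.from_iterable(mapped))
word_counts = dict(reduce_counts(word, values) for word, values in grouped)
print(word_counts)
```

Note that the combine step shrinks the second split from six pairs to five before the shuffle, which is the saving the Combine stage is meant to provide.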

It is considered good practice to subdivide tasks so that each routine performs only a single operation, and more complex operations are the result of chains of jobs. Pre-processing, for example, can be run as a "map only" job.

Jobs are run by submitting them to the scheduler: this takes the form of indicating a `.jar` file and the class name to run as main, plus needed arguments like source and output locations. From the command line, the syntax is `hadoop jar filename.jar ClassName input output`.

MapReduce 1.0 was limited because it could only process in batch and was not easy to customize. The 2.0 update allows more "real-time" operations and finer control over how operations are carried out.

### YARN

Yet Another Resource Negotiator: an abstraction layer added along with MapReduce 2.0 that allows a wider range of data processing on top of HDFS.


### Apache Spark

An alternative distributed data processing engine, which primarily operates in memory. See the notes on PySpark below.

### HBase

A wide-column, schema-on-read (NoSQL) database format that acts as a relatively accessible front-end to data stored in a Hadoop cluster.

### Hive

A query language interface for data stored in Hadoop (including HBase), which acts as a MapReduce front-end. Its language is known as HiveQL or HQL. The syntax is similar to SQL, but the backend is fundamentally different. For one thing, because it is a front-end for MapReduce (often via HBase), it is executing batch jobs on the cluster, which can take substantial time.

`CREATE TABLE` commands pull requested fields from data into a wide table, then `SELECT...WHERE` commands can pull out specific records. NB, since the underlying data is not relational, `JOIN` statements are often impractical.

### Pig

A scripting tool for Hadoop, used especially for data input and cleaning (ETL: extract, transform, load). Its native language is called Pig Latin.

### Oozie

A workflow manager used to coordinate scripts from multiple libraries. Jobs are scripted using XML, so commercial GUIs are often used in practice.

### Sqoop

Command-line utility for transferring data between relational databases and Hadoop clusters.

### ZooKeeper

Centralized service for maintaining configuration information and coordinating the programs running on a Hadoop cluster; it is itself run as an "ensemble" of servers. It keeps its data in memory, which supports more real-time operations.

# PySpark


## Introduction

PySpark is the Python API for Apache Spark. From the website:
> Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python, and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, pandas API on Spark for pandas workloads, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

The Spark API is used to define a graph of operations to be performed in parallel on a large, distributed dataset. The framework will try to optimize this for maximum parallelized efficiency. Accordingly, the transformation methods of the Spark API are lazily evaluated: nothing is computed until an action (such as `show()` or `collect()`) asks for a result.

The connection with a Spark cluster through PySpark is managed by instances of the `SparkContext` class.

## The Spark data structure

Spark's core data structure is the Resilient Distributed Dataset (RDD). This is a low level object that lets Spark work its magic by splitting data across multiple nodes in the cluster. However, RDDs are hard to work with directly, so Spark provides a DataFrame abstraction built on top of RDDs.

The Spark DataFrame was designed to behave a lot like a SQL table (a table with variables in the columns and observations in the rows). Not only are they easier to understand, DataFrames are also more optimized for complicated operations than RDDs. They flatten out the nested complexity of the RDD structure, but unlike reading the data out directly into a Python object, they keep the operations within the Spark API.

When you start modifying and combining columns and rows of data, there are many ways to arrive at the same result, but some often take much longer than others. When using RDDs, it's up to the data scientist to figure out the right way to optimize the query, but the DataFrame implementation has much of this optimization built in.

### Sessions

Within PySpark, the interface with data is encapsulated in `pyspark.sql.SparkSession` objects. To prevent conflicting coexisting sessions, `SparkSession.builder.getOrCreate()` returns the existing session if there is one and only opens a new one if not.

Direct interface with the database is held in the session attribute `catalog` (which returns a `Catalog` instance). E.g. `session.catalog.listTables()` will list the available tables in the current database. The session's `table("name")` method returns a **PySpark DataFrame** of the requested table from the catalog.

Alternatively, SQL queries can be made with `session.sql()`, which also returns a DataFrame. Note that to print the contents, call the `show()` method (i.e. printing does not work). A Spark DataFrame can be converted to a Pandas DataFrame with the `toPandas()` method. Conversely, to create a Spark DataFrame from Pandas, use `session.createDataFrame(df)`. Or data can be read directly from CSV with `session.read.csv(file_name, header=True)`.

A newly created DataFrame is not registered in the catalog, so it cannot be referenced by name in SQL queries. To add it, use the DataFrame's `createTempView('name')` or `createOrReplaceTempView('name')` method, which registers it as a temporary view for the current session (NB the former will throw an exception if a view with the given name already exists).

### Manipulating Data

PySpark DataFrames are immutable, so any mutating operation is actually creating a copy, which can then be assigned over the previous variable name.

The columns of the DataFrame are accessible as attributes or by indexing. These are Column objects, and they have overloaded operators as with Pandas Series. An alias for display can be defined for a Column object with the `alias()` method (used for tables produced by `select()`). Note that Columns appear to be lazily evaluated, so e.g. exceptions based on types are only raised when a column is joined to a DataFrame. Moreover, Columns created by operations on Columns are linked to specific names and IDs, so they can be used with `select()` or `withColumn()` only if the calling DataFrame has a matching column (NB the ID is changed by column overwriting).

To create a new DataFrame with an added or transformed column: `df.withColumn("column_name", column)`, where `column` is a Column object. Rename a column with the `withColumnRenamed('oldName', 'newName')` method.

Column data types can be changed with the Column object's `cast('type')` method.

DataFrames can be filtered using the `filter()` method, which accepts either a query string (akin to what follows `WHERE` in SQL) or a boolean Column expression (e.g. `df.col > 0`). Similarly, the `select()` method returns a DataFrame with the columns specified as positional arguments, which can be either the column names as strings or Column objects, allowing transformed columns. To use SQL syntax to transform columns, use `selectExpr()`, where each positional argument is a SQL-style column identifier (i.e. something separated by a comma in a `SELECT` statement).

Operations like min, max, and count can be performed in `selectExpr()` or as methods of a `GroupedData` object. To do the latter, it is necessary to call `df.groupBy()`, even with no argument (in which case the whole DataFrame is treated as a single group). Functions on columns are available in the `pyspark.sql.functions` module, e.g. `functions.stddev('colname')`, returning a Column that can be passed to the `agg()` method of a GroupedData object. NB, the aggregation methods of GroupedData return DataFrames, not Columns, so inside `agg()` it is necessary to use the functions from `pyspark.sql.functions` instead. Note also the lazy evaluation of the function: the column name is only resolved when the Column is passed to `agg()`.

Joins can be done with the method `join(other, on, how)`.

## Modeling

Machine Learning models are implemented in the `pyspark.ml` module. Different models have different APIs:
1. Transformer models, which have a `transform()` method, which takes and returns a DataFrame, performing transformations on the column(s) identified in the constructor with `inputCol=''` or `inputCols=[]` and creating `outputCol` or `outputCols`.
1. Estimator models that perform fitted transformation. The constructor takes `inputCol` (etc), and the object's `fit()` method takes a DataFrame and returns a Transformer. 
1. Predictor models that carry out machine learning. The constructor takes a *single* `featuresCol` (a feature vector, which can be created with the `VectorAssembler()` transformer) and `labelCol`. Note that `featuresCol` defaults to `'features'`. The `fit()` method takes a DataFrame and returns a Transformer. NB the fitted models do have a `predict()` method, but it takes a single feature vector.

For example, to apply one-hot encoding to string values:
```python
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# map each distinct string to a numeric category index
indexer = StringIndexer(inputCol='string_column', outputCol='cat_column')
fit_indexer = indexer.fit(df)
indexed_df = fit_indexer.transform(df)

# encode the category index as a one-hot vector
encoder = OneHotEncoder(inputCol='cat_column', outputCol='encoded_col')
fit_encoder = encoder.fit(indexed_df)
encoded_df = fit_encoder.transform(indexed_df)
```
Note that this does not precisely generate one column per value but instead creates a single column containing a sparse vector that can be interpreted by the models.

Estimators and transformers can be combined into a `pyspark.ml.Pipeline(stages=[])`, where the parameter is a list of model objects.

### Model evaluation and tuning

A train-test split can be created with the DataFrame's `randomSplit()` method, which takes a list of $n$ proportions and returns a list of $n$ DataFrames.

Predictor models have built-in `evaluate()` methods (though it's not clear from the docs what the metric is), but custom evaluations are defined in the `pyspark.ml.evaluation` module.

To tune a logistic regression model with 5-fold cross-validation:
```python
import numpy as np
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# `train` and `test` are Spark DataFrames with `features` and `label` columns

# The estimator must exist before the grid, since the grid refers to its params
lr = LogisticRegression()

# Create the parameter grid
grid = ParamGridBuilder()
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.build()

evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')

cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=5  # the default is 3
)

cv_results = cv.fit(train)
best_lr = cv_results.bestModel

best_predictions = best_lr.transform(test)
print(evaluator.evaluate(best_predictions))
```
This should also work with a pipeline: `CrossValidator` accepts a `Pipeline` as its `estimator`, and the grid can refer to the parameters of the individual stage objects (e.g. `lr.regParam`). The DataCamp course recommended doing all transformations in a pipeline, then splitting the data, then doing cross-validation on the training data, but this would contaminate the fit, since the fitted transformations would already have seen the test data. The Cloudera presentation splits beforehand and applies the

# Containerization

In order to reproduce the environment for applications, it is often helpful to encapsulate them through virtualization. This can be done with various virtual environment tools, but *containers* add an additional step of portability. **Docker** is one popular containerization tool, providing functionality to reproduce and run containers, as well as hosting Docker Hub as a repository for Docker images.

## Docker

It is important to distinguish between:
- **Dockerfile**: A Dockerfile is a text file that specifies how an image will be created.
- **Docker Images**: Images are created by building a Dockerfile.
- **Docker Containers**: A Docker container is a running instance of an image.

### The Dockerfile

```
+------------+-----------------------------------------------------+
| Command    | Description                                         |
+------------+-----------------------------------------------------+
| FROM       | The base Docker image for the Dockerfile.           |
| LABEL      | Key-value pair for specifying image metadata.       |
| RUN        | Executes commands on top of the current image as    |
|            | new layers.                                         |
| COPY       | Copies files from the local machine to the          |
|            | container filesystem.                               |
| EXPOSE     | Exposes runtime ports for the Docker container.     |
| CMD        | Specifies the command to execute when running the   |
|            | container. This command is overridden if another    |
|            | command is specified at runtime.                    |
| ENTRYPOINT | Specifies the command to execute when running the   |
|            | container. Entrypoint commands are not overridden   |
|            | by a command specified at runtime.                  |
| WORKDIR    | Sets the working directory of the container.        |
| VOLUME     | Creates a mount point in the container for a        |
|            | volume held outside the container filesystem.       |
| ARG        | Sets a variable as a key-value pair that is         |
|            | available only while building the image.            |
| ENV        | Sets an environment variable as a key-value pair    |
|            | that is available in the container after building.  |
+------------+-----------------------------------------------------+
```

### Docker Images

The command `docker build -t <image-name> .` builds an image from the `Dockerfile` in the current directory (the trailing `.` is the build context). Docker keeps a record of local images, which can be managed with these commands:
```
+---------------------------------+--------------------------------+
| Command                         | Description                    |
+---------------------------------+--------------------------------+
| docker images                   | List all images on the         |
|                                 | machine.                       |
| docker rmi [IMAGE_NAME]         | Remove the image with name     |
|                                 | IMAGE_NAME on the machine.     |
| docker rmi $(docker images -q)  | Remove all images from the     |
|                                 | machine.                       |
+---------------------------------+--------------------------------+
```

### Running Containers

The syntax for running a container from an image is as follows:
```bash
docker run [-d -it --rm --name <CONTAINER_NAME> -p <host:container>] <IMAGE_NAME>
```

- `-d`: run the container in detached mode. This mode runs the container in the background.
- `-it`: run in interactive mode, with a terminal session attached.
- `--rm`: remove the container when it exits.
- `--name`: specify a name for the container.
- `-p`: port forwarding from the host to the container (i.e. `host:container`).

```
+-------------------------------+----------------------------------+
| Command                       | Description                      |
+-------------------------------+----------------------------------+
| docker ps                     | List running containers. Append  |
|                               | -a to also list containers not   |
|                               | running.                         |
| docker stop [CONTAINER_ID]    | Gracefully stop the container    |
|                               | with [CONTAINER_ID] on the       |
|                               | machine.                         |
| docker kill [CONTAINER_ID]    | Forcefully stop the container    |
|                               | with [CONTAINER_ID] on the       |
|                               | machine.                         |
| docker rm [CONTAINER_ID]      | Remove the container with        |
|                               | [CONTAINER_ID] from the machine. |
| docker rm $(docker ps -a -q)  | Remove all containers from the   |
|                               | machine.                         |
+-------------------------------+----------------------------------+
```

## Examples

### A simple script

We create a simple script `date-script.sh`:
```bash
#! /bin/sh
DATE="$(date)"
echo "Today's date is $DATE"
```

And a `Dockerfile`:
```bash
    # base image for building container
    FROM docker.io/alpine
    # add maintainer label
    LABEL maintainer="mark.simon.cohen@gmail.com"
    # copy script from local machine to container filesystem
    COPY date-script.sh /date-script.sh
    # execute script
    CMD sh date-script.sh
```

The Docker image will be built on top of the Alpine Linux base image. See https://hub.docker.com/_/alpine

`docker build -t simple .` followed by `docker run simple` will print the date.

### Serve a Webpage on an nginx Web Server with Docker

Create an `index.html` file in an `html/` directory, and then a `Dockerfile`:
```bash
    # base image for building container
    FROM docker.io/nginx
    # add maintainer label
    LABEL maintainer="mark.simon.cohen@gmail.com"
    # copy html file from local machine to container filesystem
    COPY html/index.html /usr/share/nginx/html
    # port to expose to the container
    EXPOSE 80
```

Note that 80 is the default port for receiving HTTP requests. So, as a web server, this container will be listening on port 80.

Now, `docker build -t nginx-server .` and:
```
    docker run -d -it -p 8081:80 nginx-server
```

Two points: this runs in the background, and instructs Docker to capture local port 8081 and forward it to port 80 inside the container. Docker will print the ID of the new container, and navigating a browser to `http://localhost:8081` will bring up the index page.