# Introduction

To explore options for making the PERCEIVE Hadoop ecosystem portable and to apply big data technology to the PERCEIVE project, this notebook builds an Apache NiFi + Kite SDK + Apache Spark pipeline and shows the potential value and uses of this data pipeline.

This article consists of three parts. The first part sets up the environment and the relevant components and builds the dependencies between them. The second part shows simple NiFi dataflows that move data from the local file system and from the web into HDFS, demonstrating several fundamental capabilities of NiFi.

The third part discusses the advantages of NiFi and why the pipeline could contribute to the PERCEIVE project in the future.

# Overview

Before moving on, here is a brief introduction to Apache NiFi and the Kite SDK.

#### a. Apache NiFi:

NiFi is an easy-to-use, powerful, and reliable dataflow tool that automates the flow of data between systems. Use cases include, but are not limited to:

- **Big Data Ingest**: Offers a simple, reliable, and secure way to collect data streams.
- **IoAT Optimization**: Allows organizations to overcome real-world constraints such as limited or expensive bandwidth while ensuring data quality and reliability.
- **Compliance**: Enables organizations to understand everything that happens to data in motion, from its creation to its final resting place, which is particularly important for regulated industries that must retain and report on chain of custody.
- **Digital Security**: Helps organizations collect large volumes of data from many sources and prioritize which data is brought back for analysis first, a critical capability given the time sensitivity of identifying security breaches.

NiFi is discussed in more detail later through examples that use NiFi and Kite.

![NiFi overview](img/nifi_intro.png)

#### b. Kite SDK:

The Kite SDK is a high-level data API that makes it easier to put data into Hadoop and to work with data once it's loaded. With Kite, datasets and their metadata are much easier to maintain through Avro schemas. Currently, Kite works with file formats including CSV, JSON, Avro, and Parquet. With the Kite SDK you can define where your data is stored (Hive, HDFS, the local file system, HBase, or Amazon S3) and how it is compressed: Snappy (default), Deflate, Bzip2, or LZO.

To fully leverage the Kite SDK, the best option is the Kite Dataset command line interface (CLI), which provides utility commands for essential tasks such as creating a schema and a dataset, importing data from a CSV file, and viewing the results.


The following use case example may help to understand how Kite works:

> Kite will handle how the data is stored. For example, if I wanted to store incoming CSV data into a Parquet formatted Hive table, I could use the Kite API to create a schema for my CSV data and then call the Kite API to create the Hive table for me. Kite also works with partitioned data and will automatically partition records when writing. [1]

Example of schema:
```
{
  "type":"record",
  "name":"Movie",
  "namespace":"org.kitesdk.examples.data",
  "fields":[
    {"name":"id","type":"int"},
    {"name":"title","type":"string"},
    {"name":"release_date","type":"string"},
    {"name":"imdb_url","type":"string"}
  ]
}
```
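
As a quick way to read the schema, the Python sketch below loads it with the standard `json` module and checks whether a record supplies every field with the declared primitive type. It is a hand-rolled illustration, not Kite's or Avro's own validation: it handles only the `int` and `string` types used here and ignores unions, defaults, and nested records. The record values are made up.

```python
import json

# The Movie schema shown above, copied verbatim as a JSON string.
MOVIE_SCHEMA = """
{
  "type":"record",
  "name":"Movie",
  "namespace":"org.kitesdk.examples.data",
  "fields":[
    {"name":"id","type":"int"},
    {"name":"title","type":"string"},
    {"name":"release_date","type":"string"},
    {"name":"imdb_url","type":"string"}
  ]
}
"""

# Only the Avro primitive types this schema uses are mapped.
AVRO_TO_PYTHON = {"int": int, "string": str}


def schema_errors(schema: dict, record: dict) -> list:
    """List the ways `record` fails to fit `schema`; an empty list means it fits."""
    errors = []
    for field in schema["fields"]:
        name, avro_type = field["name"], field["type"]
        if name not in record:
            errors.append(f"missing field: {name}")
        elif not isinstance(record[name], AVRO_TO_PYTHON[avro_type]):
            errors.append(f"{name}: expected Avro {avro_type}")
    return errors


schema = json.loads(MOVIE_SCHEMA)
# Hypothetical record, for illustration only.
movie = {"id": 1, "title": "Example Movie",
         "release_date": "01-Jan-1995", "imdb_url": "http://example.com/movie"}
print(schema_errors(schema, movie))         # []
print(schema_errors(schema, {"id": "1"}))   # id has the wrong type; three fields missing
```

The schema, not the code, is the contract: every record written to a Kite dataset is expected to carry these four fields with these types.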

## 1. Setting up the environment

### 1.1 Installation
This tutorial was tested using the following environment and components:

(Installation guides for these components are easy to find online, so we simply link to them here.)

- Ubuntu (64-bit) on VirtualBox:
    - https://linus.nci.nih.gov/bdge/installUbuntu.html
- Apache NiFi 1.3.0:
    - For installation: https://www.youtube.com/watch?v=ZTXRm7tqQs4
    - User guide: https://nifi.apache.org/docs.html
    - Requires a JDK (Java) environment
- Apache Hadoop 2.8.1 + Apache Spark 2.2.0 + Scala 2.12.3:
    - This tutorial uses different versions, so adjust them accordingly: https://medium.com/@ivanermilov/setting-up-hadoop-spark-hive-development-environment-on-ubuntu-94f0f8166ef1
- Kite SDK:
    - http://kitesdk.org/docs/1.1.0/Install-Kite.html
    - http://kitesdk.org/docs/1.0.0/Kite-SDK-Guide.html
- Apache Hive 2.3.0 (optional):
    - https://www.edureka.co/blog/apache-hive-installation-on-ubuntu

If you follow the tutorials above, pay attention to the versions they use, since they may differ from the versions listed here.

### 1.2 Building the dependency



#### a. Dependency between NiFi and Spark

"Apache Spark has become wildly popular for processing large quantities of data. One of the key features that Spark provides is the ability to process data in either a batch processing mode or a streaming mode with very little change to your code".[2] 

In many contexts, operating on data as soon as it is available provides great benefits. By connecting Apache NiFi to Apache Spark, a Spark Streaming application can receive data directly from NiFi and analyze it as it arrives.

Article [2] shows how to incorporate the NiFi Receiver into a Spark application written in Java. If your Spark application is written in Java and built with Maven, it is easy to follow this article and add the receiver as a dependency in the application's POM (Project Object Model).

However, since most analysis jobs in PERCEIVE are written in Python, building this dependency from Python remains an open problem.

#### b. Working with the Kite SDK within Apache NiFi

Although Kite is designed to be used through the Kite CLI, NiFi provides the following Kite processors, which implement part of the Kite SDK's functionality so that most jobs can be done outside the Kite CLI:

|Type|Version|Tags|Description|
|--|-----|-----|-------------|
|ConvertAvroSchema|1.3.0|convert,kite,avro|Converts records from one Avro schema to another|
|ConvertCSVToAvro|1.3.0|csv,kite,avro|Converts CSV files to Avro according to an Avro Schema|
|ConvertJSONToAvro|1.3.0|json,kite,avro|Converts JSON files to Avro according to an Avro Schema|
|InferAvroSchema|1.3.0|schema, infer,csv,json,kite,avro|Examines the contents of the incoming FlowFile to infer an Avro schema|
|StoreInKiteDataset|1.3.0|hive,hdfs,hadoop,kite,hbase|Stores Avro records in a Kite dataset|

However, some critical jobs still have to be done manually in the Kite CLI, such as creating an Avro schema, applying an updated Avro schema to existing files, creating a Hive table in Parquet format, defining a partition strategy for datasets, and creating an HDFS directory to store the Parquet data.
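
Creating a schema by hand is the first of those manual jobs. The Kite CLI (its `csv-schema` command) and the InferAvroSchema processor can infer one from a CSV file; the Python sketch below is a much simpler hand-rolled stand-in, not either tool's actual algorithm. It reads a CSV header plus sample rows and picks `long`, `double`, or `string` for each column. It does not produce nullable (union) fields, and the sample CSV is hypothetical.

```python
import csv
import io
import json
import re


def _column_type(values):
    """Narrowest of long/double/string that parses every non-empty sample value."""
    def parses_as(cast):
        try:
            for value in values:
                cast(value)
        except ValueError:
            return False
        return True

    if values and parses_as(int):
        return "long"      # long rather than int, to avoid 32-bit overflow
    if values and parses_as(float):
        return "double"
    return "string"


def infer_avro_schema(csv_text, record_name, namespace=None):
    """Build a flat Avro record schema from CSV text whose first row is a header."""
    header, *rows = csv.reader(io.StringIO(csv_text))
    fields = []
    for i, column in enumerate(header):
        # Avro field names must match [A-Za-z_][A-Za-z0-9_]*
        name = re.sub(r"[^A-Za-z0-9_]", "_", column.strip())
        if not name or name[0].isdigit():
            name = "_" + name
        values = [row[i] for row in rows if i < len(row) and row[i] != ""]
        fields.append({"name": name, "type": _column_type(values)})
    schema = {"type": "record", "name": record_name}
    if namespace:
        schema["namespace"] = namespace
    schema["fields"] = fields
    return schema


# Hypothetical CSV sample.
sample = "id,title,release date\n1,Example A,01-Jan-1995\n2,Example B,03-Feb-1996\n"
print(json.dumps(infer_avro_schema(sample, "Movie", "org.kitesdk.examples.data"), indent=2))
```

The inferred schema has the same shape as the Movie schema above, except that the sketch picks `long` for `id`. Inference only sees the sample rows, so it is worth reviewing the result before creating a dataset from it.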

## 2. Examples of Apache NiFi

Through a few examples, this section demonstrates several fundamental capabilities of NiFi, how to use NiFi and the Kite SDK together, and how to configure several core NiFi processors. These examples accomplish the following tasks:

- Load files from the local file system and from a web URL
- Filter files based on file format
- Add custom attributes using the NiFi Expression Language
- Split files based on month
- Store files in different HDFS folders based on a file attribute
- Work with Kite SDK processors within NiFi

### 2.1 Load files from local to HDFS

The NiFi flow is shown below. At a high level, this dataflow picks up all files from a local directory and stores them in per-month folders in HDFS. To configure the output path dynamically, we use the NiFi Expression Language to add a custom attribute to each file based on its file name. The details and the configuration of each processor are discussed below.

![NiFi flow: local files to HDFS](img/nifi_1.png)

#### Step 1. GetFile processor

In this example, the processor reads all files in the input directory. Its important properties are:

|Configuration|Description|Value in the example|
|--|-----|-----|
|Input Directory|The input directory from which to pull files |/home/yueliu/Documents/Full Disclosure/2002|
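|Keep Source File|Whether to keep the original file after it is picked up; false deletes it from the input directory|false|
|File Filter|Only files whose names match this regular expression are picked up; this value skips hidden files (names starting with a dot)|`[^\.].*`|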
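
To see what the File Filter value accepts, here is a small Python sketch that applies the same regular expression to the files in a directory. It assumes, as we understand GetFile's behaviour, that the pattern must match the entire file name (hence `fullmatch`), and unlike GetFile it does not recurse into subdirectories.

```python
import os
import re

# File Filter value from the table above: reject names that start with ".".
FILE_FILTER = re.compile(r"[^\.].*")


def passes_file_filter(filename: str) -> bool:
    # Assumption: the whole file name must match, so use fullmatch().
    return FILE_FILTER.fullmatch(filename) is not None


def list_input_files(input_directory: str) -> list:
    """Names of regular files directly inside input_directory that pass the filter."""
    with os.scandir(input_directory) as entries:
        return sorted(e.name for e in entries
                      if e.is_file() and passes_file_filter(e.name))


print(passes_file_filter("2002_Aug_1.txt"))  # True
print(passes_file_filter(".DS_Store"))       # False
```

Calling `list_input_files("/home/yueliu/Documents/Full Disclosure/2002")` on the machine used in this example would list the candidates GetFile would pick up.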

#### Step 2. RouteOnAttribute processor

In NiFi, every FlowFile has a minimum set of attributes: filename, path, uuid, entryDate, lineageStartDate, and fileSize. A detailed explanation of these common attributes can be found in [4]. The RouteOnAttribute processor routes each FlowFile based on its attributes. In this case, the processor routes a FlowFile to the fileFormat relationship (named after the property defined below) if its file name ends with "csv", and to the unmatched relationship otherwise.

|Configuration|Description|Value in the example|
|--|-----|-----|
|Routing Strategy|Specifies how to determine which relationship to use when evaluating the Expression Language|Route to Property name|
|fileFormat|A user-defined property; FlowFiles for which the expression is true are routed to the relationship with the same name|${filename:endsWith('csv')}|

Users can add any number of such properties; with this routing strategy, each property creates a relationship of the same name.
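
The "Route to Property name" strategy can be pictured as the Python sketch below: each user-defined property is a predicate over the FlowFile's attributes, and its name doubles as the relationship name. As we understand the strategy, a FlowFile that matches several properties is sent to each of them, which is why the function returns a list. The lambda is a plain-Python stand-in for the Expression Language, not NiFi code.

```python
from typing import Callable, Dict, List

Attributes = Dict[str, str]


def route_on_attribute(attrs: Attributes,
                       properties: Dict[str, Callable[[Attributes], bool]]) -> List[str]:
    """Relationships a FlowFile is sent to under 'Route to Property name'.

    Every property whose predicate is true contributes its own name;
    if none match, the FlowFile goes to 'unmatched'.
    """
    matched = [name for name, predicate in properties.items() if predicate(attrs)]
    return matched or ["unmatched"]


# Stand-in for the fileFormat property: ${filename:endsWith('csv')}
properties = {"fileFormat": lambda attrs: attrs["filename"].endswith("csv")}

print(route_on_attribute({"filename": "Full_Disclosure_Mainling_List_Aug2002.csv"}, properties))
print(route_on_attribute({"filename": "2002_Aug_1.txt"}, properties))
```

The first call lands in `fileFormat` and the second in `unmatched`, which is why the two UpdateAttribute processors below are attached to those two relationships.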

#### Step 3. UpdateAttribute processor

The UpdateAttribute processor adds or updates any number of user-defined attributes on a FlowFile. It is useful for adding statically configured values, as well as for deriving attribute values dynamically with the Expression Language.

In this example, we use two UpdateAttribute processors to add a custom attribute holding the month for the txt and csv FlowFiles, derived from their file names. For reference when reading the expressions below, a typical txt file name is '2002_Aug_1.txt', while a typical csv file name is 'Full_Disclosure_Mainling_List_Aug2002.csv'. More details about the NiFi Expression Language can be found in [5].

Add_Month_Attribute_For_CSV processor:

|Configuration|Description|Value in the example|
|--|-----|-----|
|month|Attribute added to each FlowFile: the fifth '_'-delimited field of the file name ('Aug2002.csv'), truncated to its first three characters ('Aug')|${filename:getDelimitedField(5,'_'):substring(0,3)}|

Connection between RouteOnAttribute and Add_Month_Attribute_For_CSV: select the fileFormat relationship.


Add_Month_Attribute_For_TXT processor:

|Configuration|Description|Value in the example|
|--|-----|-----|
|month|Attribute added to each FlowFile: the second '_'-delimited field of the file name ('Aug')|${filename:getDelimitedField(2,'_')}|

Connection between RouteOnAttribute and Add_Month_Attribute_For_TXT: select the unmatched relationship.
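
To check the two month expressions before running the flow, they can be reproduced in Python. The `get_delimited_field` helper below is a simplified, hypothetical analogue of the Expression Language's `getDelimitedField` (1-based index, surrounding whitespace trimmed); it ignores the quoting and escaping options NiFi's version supports.

```python
def get_delimited_field(subject: str, index: int, delimiter: str = ",") -> str:
    """Simplified getDelimitedField: 1-based index, surrounding whitespace trimmed.

    Returns "" when the index is out of range (a choice made for this sketch).
    """
    fields = subject.split(delimiter)
    if 1 <= index <= len(fields):
        return fields[index - 1].strip()
    return ""


def month_for_csv(filename: str) -> str:
    # ${filename:getDelimitedField(5,'_'):substring(0,3)}
    return get_delimited_field(filename, 5, "_")[0:3]


def month_for_txt(filename: str) -> str:
    # ${filename:getDelimitedField(2,'_')}
    return get_delimited_field(filename, 2, "_")


print(month_for_csv("Full_Disclosure_Mainling_List_Aug2002.csv"))  # Aug
print(month_for_txt("2002_Aug_1.txt"))                             # Aug
```

Both expressions depend on fixed field positions, so a file name with a different number of underscores would yield the wrong month; running a few real names through a check like this is a cheap safeguard.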


#### Step 4. PutHDFS processor

This processor writes FlowFiles to the Hadoop Distributed File System (HDFS). Because each FlowFile now carries the custom "month" attribute, we can reference it in the output directory, so that the FlowFiles are organized into per-month folders in HDFS.

|Configuration|Description|Value in the example|
|--|-----|-----|
|Hadoop Configuration Resources|A file or comma-separated list of files containing the Hadoop file system configuration; this is usually the path to core-site.xml|/home/yueliu/Documents/hadoop-2.8.1/etc/hadoop/core-site.xml|
|Directory|The parent HDFS directory to which files should be written. The directory will be created if it does not exist.|/sample/${month}|
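
How the dynamic Directory value spreads FlowFiles across folders can be previewed with Python's `string.Template`, which happens to share the `${name}` syntax for a plain attribute reference (it does not understand NiFi functions such as `:substring()`). The sketch assumes PutHDFS writes each file as `<Directory>/<filename>`; the FlowFiles, including the Sep file, are hypothetical.

```python
from collections import defaultdict
from string import Template

# The Directory value from the table above.
DIRECTORY = Template("/sample/${month}")


def hdfs_layout(flowfiles):
    """Group FlowFiles (as attribute dicts) by the HDFS directory they would land in."""
    layout = defaultdict(list)
    for attrs in flowfiles:
        directory = DIRECTORY.substitute(attrs)  # raises KeyError if "month" is missing
        layout[directory].append(f"{directory}/{attrs['filename']}")
    return dict(layout)


# Hypothetical FlowFiles after the UpdateAttribute step.
flowfiles = [
    {"filename": "2002_Aug_1.txt", "month": "Aug"},
    {"filename": "Full_Disclosure_Mainling_List_Aug2002.csv", "month": "Aug"},
    {"filename": "2002_Sep_1.txt", "month": "Sep"},
]
for directory, paths in hdfs_layout(flowfiles).items():
    print(directory, paths)
```

Raising on a missing attribute is a deliberate choice in this sketch: it surfaces FlowFiles that skipped the UpdateAttribute step instead of silently writing them somewhere unexpected.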

#### Step 5. Check within HDFS

Run the following command in a terminal to check the result of the dataflow above:
```
hadoop fs -ls /sample/
```

### 2.2 Load xml file from URL to HDFS

The NiFi flow is shown below. At a high level, this dataflow fetches an XML file from a URL and stores it in HDFS. In this example, we add a custom attribute representing the file format to show how attributes work in NiFi. The details and the configuration of each processor are discussed below.

![NiFi flow: XML file from a URL to HDFS](img/nifi_2.png)

#### Step 1. GetHTTP processor

The GetHTTP processor fetches data from an HTTP URL and writes it to the content of a FlowFile. In this example, we pull an XML file from the URL below.

|Configuration|Description|Value in the example|
|--|-----|-----|
|URL|The URL to pull from |http://capec.mitre.org/data/xml/capec_v2.11.xml|
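
For comparison, the Python sketch below does GetHTTP's job by hand with the standard library's `urllib.request` and wraps the result as a FlowFile-like dict of attributes plus content bytes. That dict layout is an assumption for illustration, not NiFi's data model. GetHTTP also has a required Filename property that sets the FlowFile's filename attribute, so the sketch takes the file name as a parameter.

```python
import urllib.request


def fetch_as_flowfile(url: str, filename: str, timeout: float = 30.0) -> dict:
    """Download url and wrap the bytes as a FlowFile-like dict.

    The {"attributes": ..., "content": ...} shape is an illustrative assumption.
    """
    with urllib.request.urlopen(url, timeout=timeout) as response:
        content = response.read()
    return {"attributes": {"filename": filename}, "content": content}


# Example call (needs network access; the URL is the one configured above):
# flowfile = fetch_as_flowfile("http://capec.mitre.org/data/xml/capec_v2.11.xml",
#                              "capec_v2.11.xml")
```

The function fetches once per call; in NiFi, the processor's run schedule decides how often the URL is polled.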

#### Step 2. UpdateAttribute processor

In this example, we use the Expression Language to extract the file extension from the FlowFile's name and assign it to a custom attribute named "fileFormat".


|Configuration|Description|Value in the example|
|--|-----|-----|
|fileFormat|Attribute added to each FlowFile: the text after the last '.' in the file name (e.g., 'xml')|${filename:substringAfterLast('.')}|
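
The extension logic can be mirrored in one line of Python, assuming the FlowFile's filename is `capec_v2.11.xml`. The sketch uses `str.rpartition`, which returns the whole string when there is no dot; see [5] for NiFi's exact rules in that case.

```python
def substring_after_last(subject: str, delimiter: str) -> str:
    """Text after the last occurrence of delimiter, like ${filename:substringAfterLast('.')}.

    str.rpartition returns ('', '', subject) when the delimiter is absent,
    so this sketch returns the whole name in that case.
    """
    return subject.rpartition(delimiter)[2]


print(substring_after_last("capec_v2.11.xml", "."))  # xml
```

Because this file name contains several dots ("v2.11"), taking the text after the *last* dot is what yields "xml"; `substringAfter('.')` would instead return "11.xml".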



#### Step 3. PutHDFS processor

This processor writes FlowFiles to the Hadoop Distributed File System (HDFS). Because each FlowFile now carries the custom "fileFormat" attribute, we can use it as the output directory for each FlowFile in HDFS.

|Configuration|Description|Value in the example|
|--|-----|-----|
|Hadoop Configuration Resources|A file or comma-separated list of files containing the Hadoop file system configuration; this is usually the path to core-site.xml|/home/yueliu/Documents/hadoop-2.8.1/etc/hadoop/core-site.xml|
|Directory|The parent HDFS directory to which files should be written. The directory will be created if it does not exist.|/sample/${fileFormat}|

#### Step 4. Check within HDFS


Run the following command in a terminal to check the result of the dataflow above:
```
hadoop fs -ls /sample/
```
Here is the result of the two NiFi dataflows above:

![HDFS listing after both dataflows](img/nifi_result.png)

## 3. Advantages of Apache NiFi

By answering the following questions, we discuss the advantages of NiFi and why the pipeline could contribute to the PERCEIVE project in the future:

- **Is the raw data still there?**

    It depends on the processor configuration. For example, setting the "Keep Source File" property of the GetFile processor to "true" keeps the raw data in the input directory; with "false" (as in the example above), the original files are deleted after they are picked up.


- **Is the processed data now sitting in HDFS?**

    Yes. As the HDFS screenshot above shows, all processed data is stored in the month subfolders in HDFS.
    
    
- **How to add multiple files?**
   
    If all files are stored in one directory, a single Get processor can retrieve all of them. If the files are stored in different directories or different systems, multiple Get processors (for example, GetFile and GetHTTP) may be needed to ingest them before processing.
    
    
- **If we do have multiple files, are we running in parallel?**

    In the GetFile processor, the "Batch Size" property sets the maximum number of files to pull in each iteration. Separate Get processors run in parallel when they are started at the same time; for example, in the first picture, the GetFile and GetHTTP processors run concurrently if both are started. Within a single processor, the "Concurrent Tasks" setting on the Scheduling tab controls how many threads may run that processor at once.
    
    
- **What do I need to do to run NiFi?**
    
    NiFi only needs a Java runtime environment to run.
    
    
- **What is going on after I click to execute the NiFi Pipeline?**

    At a high level, each FlowFile passes through multiple processors: it is routed to different flows based on the processor configuration, analyzed or processed, and then sent to the local file system or to other big data products.
    
    
- **NiFi's best selling point is data provenance. What can we gain by using that versus just doing everything with NiFi? What are the pros? What are the cons? Is it worth the overhead of a new project?**

    "NiFi keeps a very granular level of detail about each piece of data that it ingests. As the data is processed through the system and is transformed, routed, split, aggregated, and distributed to other endpoints, this information is all stored within NiFi’s Provenance Repository" [3]. Selecting "Data Provenance" from the Global Menu opens a table listing the provenance events recorded for each FlowFile. As shown below, NiFi can also trace how a specific FlowFile moved through the dataflow.

    ![Data provenance event list](img/nifi_dataprovenance_1.png)

    ![Lineage of a single FlowFile](img/nifi_dataprovenance_2.png)

    As introduced at the beginning, NiFi is a dataflow tool that integrates with many big data products and cloud systems. With NiFi, the big picture of how data is analyzed, processed, split, aggregated, and exchanged with other products is visible in one place, which makes the overall processing much easier to manage and change. In other words, NiFi's main advantage is that the processing procedures are visualized and can be reused many times, which makes it well suited to projects with streaming data. On the other hand, if a set of processing procedures will only run once, the time spent building the NiFi flow may outweigh the benefit.
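
To make concrete what the Provenance Repository records without extra code, here is a toy Python model of the idea: an append-only log of events keyed by FlowFile UUID, from which one FlowFile's lineage can be replayed. The class names, UUIDs, and events are hypothetical; the event-type names borrow NiFi's provenance event types (RECEIVE, ROUTE, ATTRIBUTES_MODIFIED, SEND), but which events each processor actually emits should be checked in the Data Provenance UI.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class ProvenanceEvent:
    flowfile_uuid: str
    component: str    # processor name as shown on the canvas
    event_type: str   # named after NiFi event types, e.g. RECEIVE, ROUTE, SEND
    details: str = ""


@dataclass
class ProvenanceLog:
    """Append-only event store: events are only ever added, never updated."""
    events: List[ProvenanceEvent] = field(default_factory=list)

    def record(self, uuid: str, component: str, event_type: str, details: str = "") -> None:
        self.events.append(ProvenanceEvent(uuid, component, event_type, details))

    def lineage(self, uuid: str) -> List[ProvenanceEvent]:
        """Every event for one FlowFile, in the order it was recorded."""
        return [e for e in self.events if e.flowfile_uuid == uuid]


log = ProvenanceLog()
# Hypothetical events for two FlowFiles in the flow from section 2.1.
log.record("ff-1", "GetFile", "RECEIVE", "2002_Aug_1.txt")
log.record("ff-2", "GetFile", "RECEIVE", "Full_Disclosure_Mainling_List_Aug2002.csv")
log.record("ff-1", "RouteOnAttribute", "ROUTE", "unmatched")
log.record("ff-1", "Add_Month_Attribute_For_TXT", "ATTRIBUTES_MODIFIED", "month=Aug")
log.record("ff-1", "PutHDFS", "SEND", "/sample/Aug")

for event in log.lineage("ff-1"):
    print(event.component, event.event_type, event.details)
```

Without NiFi, a pipeline would need bookkeeping like this (plus storage, retention, and a UI) written and maintained by hand; that is the main pro of provenance to weigh against the overhead of adopting NiFi.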

## Reference 
[1] HDF/NiFi to convert row-formatted text files to columnar Parquet and ORC: https://community.hortonworks.com/articles/70257/hdfnifi-to-convert-row-formatted-text-files-to-col.html

[2] Stream Processing: NiFi and Spark: https://blogs.apache.org/nifi/entry/stream_processing_nifi_and_spark

[3] NiFi data provenance: https://blogs.apache.org/nifi/entry/basic_dataflow_design

[4] Nifi common attribute: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_getting-started-with-apache-nifi/content/common-attributes.html

[5] NiFi Expression Language and function: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_getting-started-with-apache-nifi/content/ExpressionLanguage.html

[6] Converting CSV To Avro with Apache NiFi: https://community.hortonworks.com/articles/28341/converting-csv-to-avro-with-apache-nifi.html

[7] Stream data into HIVE like a Boss using NiFi HiveStreaming - Olympics 1896-2008: https://community.hortonworks.com/articles/52856/stream-data-into-hive-like-a-king-using-nifi.html

[8] Using NiFi to ingest and transform RSS feeds to HDFS using an external config file: https://community.hortonworks.com/articles/48816/nifi-to-ingest-and-transform-rss-feeds-to-hdfs-usi.html