# Requirements and environment setup
* Docker
## Standing up Jupyter + Spark environment
### I'm using the jupyter-all-spark Docker image to spin up both Jupyter and Spark. I'm using volume re-direction in Docker to allow for Notebook persistence outside of the Docker container.
```bash
docker run \
-d \
--restart=always \
--name jupyter-all-spark \
-p 8888:8888 -v /home/jasdav02/Jupyter:/home/jovyan/work \
jupyter/all-spark-notebook
```
### Once running, you'll need to fetch the authentication token
```bash
docker exec \
-it jupyter-all-spark \
jupyter notebook list
```
```
Currently running servers:
http://0.0.0.0:8888/?token=5a3cdff4f94f2b333774a49b763acc1b27e7bdeae1d4dc43 :: /home/jovyan
```
### With token in hand, point your browser to `http://localhost:8888`

# Reading in data stored in ElasticSearch into Spark

In [2]:
import pyspark
import os
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

import datetime
from datetime import timedelta, date

# Download `Elasticsearch Hadoop` library provided by Elastic.co

In [2]:
!rm -f /home/jovyan/elasticsearch-hadoop-7.4.2.zip; \
rm -rf /home/jovyan/elasticsearch-hadoop-7.4.2; \
cd /home/jovyan; \
wget https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-7.4.2.zip; \
unzip elasticsearch-hadoop-7.4.2.zip; \
rm -f /home/jovyan/elasticsearch-hadoop-7.4.2.zip

--2020-01-28 18:00:16--  https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-7.4.2.zip
Resolving artifacts.elastic.co (artifacts.elastic.co)... 151.101.130.222, 151.101.66.222, 151.101.194.222, ...
Connecting to artifacts.elastic.co (artifacts.elastic.co)|151.101.130.222|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8694734 (8.3M) [application/zip]
Saving to: ‘elasticsearch-hadoop-7.4.2.zip’


2020-01-28 18:00:19 (3.59 MB/s) - ‘elasticsearch-hadoop-7.4.2.zip’ saved [8694734/8694734]

Archive:  elasticsearch-hadoop-7.4.2.zip
   creating: elasticsearch-hadoop-7.4.2/
  inflating: elasticsearch-hadoop-7.4.2/NOTICE.txt  
  inflating: elasticsearch-hadoop-7.4.2/README.md  
  inflating: elasticsearch-hadoop-7.4.2/LICENSE.txt  
   creating: elasticsearch-hadoop-7.4.2/dist/
  inflating: elasticsearch-hadoop-7.4.2/dist/elasticsearch-hadoop-7.4.2.jar  
  inflating: elasticsearch-hadoop-7.4.2/dist/elasticsearch-hadoop-7.4.2-javadoc.jar  
  i

# Setup Spark Environment incorporating the library we downloaded earlier

In [3]:
#Change username to root to connect successfully to HDFS
os.environ["HADOOP_USER_NAME"] = "root"
#Give this Spark instance a name
jobName = "Read_Diskover"
#We need to add ES-Hadoop to the dependency party
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=5051 --jars /home/jovyan/elasticsearch-hadoop-7.4.2/dist/elasticsearch-spark-20_2.11-7.4.2.jar pyspark-shell'
os.environ["SPARK_HOME"] = "/usr/local/spark"

In [5]:
spark = SparkSession.builder \
       .appName(jobName) \
       .master("local[*]") \
       .config('spark.executor.memory', '16G')\
       .config('spark.driver.memory', '4G')\
       .config('spark.driver.maxResultSize', '4G')\
       .getOrCreate()

In [5]:
spark

# Elasticsearch cluster configuration. This is reading from a single node instance of Elasticsearch 7.x

In [6]:
es_reader = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("inferSchema", "true")
    .option("es.read.field.as.array.include", "tags")
    .option("es.nodes","elasticsearch.local:9200")
    .option("es.nodes.wan.only", "true"))

# Let's get a list of scanned file systems as provided by `Diskover`.

In [15]:
!curl -s -XGET 'http://elasticsearch.local:9200/_cat/indices?v&pretty' | grep home-jasdav02

green  open   diskover-home-jasdav02                                  aCO6Aia0Q8eTND7ziXCUog   1   0      25617         3007      9.2mb          9.2mb


# Let's load one of the indexes in as a Spark Dataframe

In [8]:
df = es_reader.load("diskover-home-jasdav02")

# Enable pretty output of Spark Dataframes

In [9]:
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

# Show the first 20 lines in the Dataframe

In [10]:
df

available,bulk_time,change_percent_filesize,change_percent_items,change_percent_items_files,change_percent_items_subdirs,costpergb,crawl_time,dir_count,dupe_md5,extension,file_count,filehash,filename,filesize,free,group,hardlinks,indexing_date,inode,items,items_files,items_subdirs,last_access,last_change,last_modified,owner,path,path_parent,state,tag,tag_custom1,tag_custom2,tag_custom3,total,type,used,worker_name
,,,,,,0.0,,,,rb,,21a90e297781c6306...,indirect_immediat...,1576,,jasdav02,1,2019-12-11 17:04:...,97627154,,,,2019-05-01 20:56:31,2018-09-26 19:20:21,2015-12-07 18:00:39,jasdav02,,/home/jasdav02/LS...,,,,,,,file,,hpc-admin0.9886
,,,,,,0.0,,,,rb,,77b7566d48ea39460...,stack_event_colle...,1189,,jasdav02,1,2019-12-11 17:04:...,85029547,,,,2019-05-01 20:56:20,2018-09-26 19:20:17,2015-12-07 18:00:25,jasdav02,,/home/jasdav02/LS...,,,,,,,file,,hpc-admin0.9868
,,,,,,0.0,,,,rb,,ba37c65f5814b701c...,java_thread_pool_...,4251,,jasdav02,1,2019-12-11 17:04:...,97627156,,,,2019-05-01 20:56:31,2018-09-26 19:20:21,2015-12-07 18:00:39,jasdav02,,/home/jasdav02/LS...,,,,,,,file,,hpc-admin0.9886
,,,,,,0.0,,,,rb,,f605d95a628410141...,stack_options.rb,2186,,jasdav02,1,2019-12-11 17:04:...,85029548,,,,2019-05-01 20:56:20,2018-09-26 19:20:17,2015-12-07 18:00:25,jasdav02,,/home/jasdav02/LS...,,,,,,,file,,hpc-admin0.9868
,,,,,,0.0,,,,rb,,06be9cfb84ca6e4ed...,ruby_thread_pool_...,10052,,jasdav02,1,2019-12-11 17:04:...,83761260,,,,2019-05-01 20:56:31,2018-09-26 19:20:21,2015-12-07 18:00:39,jasdav02,,/home/jasdav02/LS...,,,,,,,file,,hpc-admin0.9886
,,,,,,0.0,,,,h,,76f52c9d1f3cd53aa...,packet-sprt.h,1424,,docker,1,2019-12-11 17:04:...,77778711,,,,2019-05-01 20:56:54,2015-07-15 21:29:25,2015-06-17 16:19:58,1001,,/home/jasdav02/wi...,,,,,,,file,,hpc-admin0.10015
,,,,,,0.0,,,,rb,,e0a6d72098506eba3...,core_refinements_...,20062,,jasdav02,1,2019-12-11 17:04:...,83933278,,,,2019-05-01 20:59:24,2018-09-26 19:20:36,2015-12-07 18:03:14,jasdav02,,/home/jasdav02/LS...,,,,,,,file,,hpc-admin0.9896
,,,,,,0.0,,,,rb,,5f8a8cac9115eace9...,stack_resource_su...,1912,,jasdav02,1,2019-12-11 17:04:...,80709795,,,,2019-05-01 20:56:20,2018-09-26 19:20:17,2015-12-07 18:00:25,jasdav02,,/home/jasdav02/LS...,,,,,,,file,,hpc-admin0.9868
,,,,,,0.0,,,,c,,6a4377fa39040897e...,packet-tnef.c,31301,,docker,1,2019-12-11 17:04:...,77778714,,,,2019-05-01 20:56:54,2015-07-15 21:29:25,2015-06-17 16:19:58,1001,,/home/jasdav02/wi...,,,,,,,file,,hpc-admin0.10015
,,,,,,0.0,,,,rb,,f6b4c7fbf9dae712a...,delay_add_associa...,1959,,jasdav02,1,2019-12-11 17:04:...,78146582,,,,2019-05-01 20:59:24,2018-09-26 19:20:36,2015-12-07 18:03:14,jasdav02,,/home/jasdav02/LS...,,,,,,,file,,hpc-admin0.9896


# Show schema

In [11]:
df.printSchema()

root
 |-- available: long (nullable = true)
 |-- bulk_time: float (nullable = true)
 |-- change_percent_filesize: float (nullable = true)
 |-- change_percent_items: float (nullable = true)
 |-- change_percent_items_files: float (nullable = true)
 |-- change_percent_items_subdirs: float (nullable = true)
 |-- costpergb: double (nullable = true)
 |-- crawl_time: float (nullable = true)
 |-- dir_count: integer (nullable = true)
 |-- dupe_md5: string (nullable = true)
 |-- extension: string (nullable = true)
 |-- file_count: integer (nullable = true)
 |-- filehash: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- filesize: long (nullable = true)
 |-- free: long (nullable = true)
 |-- group: string (nullable = true)
 |-- hardlinks: integer (nullable = true)
 |-- indexing_date: timestamp (nullable = true)
 |-- inode: string (nullable = true)
 |-- items: long (nullable = true)
 |-- items_files: long (nullable = true)
 |-- items_subdirs: long (nullable = true)
 |-- last_ac

# Select only fields of interest and display

In [12]:
df.select('indexing_date','path_parent','filename','filesize','extension')

indexing_date,path_parent,filename,filesize,extension
2019-12-11 17:04:...,/home/jasdav02/LS...,indirect_immediat...,1576,rb
2019-12-11 17:04:...,/home/jasdav02/LS...,stack_event_colle...,1189,rb
2019-12-11 17:04:...,/home/jasdav02/LS...,java_thread_pool_...,4251,rb
2019-12-11 17:04:...,/home/jasdav02/LS...,stack_options.rb,2186,rb
2019-12-11 17:04:...,/home/jasdav02/LS...,ruby_thread_pool_...,10052,rb
2019-12-11 17:04:...,/home/jasdav02/wi...,packet-sprt.h,1424,h
2019-12-11 17:04:...,/home/jasdav02/LS...,core_refinements_...,20062,rb
2019-12-11 17:04:...,/home/jasdav02/LS...,stack_resource_su...,1912,rb
2019-12-11 17:04:...,/home/jasdav02/wi...,packet-tnef.c,31301,c
2019-12-11 17:04:...,/home/jasdav02/LS...,delay_add_associa...,1959,rb


# Calculate total size consumed by files in GiB

In [14]:
size = df.agg(sum('filesize')/1074000000)
size

(sum(filesize) / 1074000000)
10.432978512104285
