OpenINTEL data quickstart notebook<br/>
Author: Mattijs Jonker <m.jonker@utwente.nl><br/>
Copyright -- all rights reserved -- 2019<br/>

## Instructions

This notebook, and the Dockerfile that comes with it, is meant to serve as a quickstart for exploring and analyzing (public) OpenINTEL data. In essence, we will build a custom Docker image that comes with Jupyter, Spark and Hadoop installed. A container started from this image runs a Jupyter notebook server, which serves the notebook you are currently reading. Within this notebook we will create a Spark application, load some OpenINTEL measurement data (Avro), and perform a few basic analyses to get you on your way.

Please note that this notebook essentially starts a local Spark application that reads data from a local Hadoop filesystem. If you have a Hadoop cluster in place and use, for example, Spark on YARN and perhaps even Kerberos authentication, your docker container needs additional packages and configuration files, and your notebook needs additional configuration directives. Within the OpenINTEL project we have a Dockerfile specific to our setup. Feel free to reach out to us should you need help setting up something similar for your own environment.

**Please perform the below steps to get started**

(for now, these instructions are for Unix-based systems only)

### Step 1: Build OpenINTEL quickstart docker image

1. We will assume that you already have a docker daemon up and running on the host machine. In case you do not, instructions for installation can easily be found online.
2. Clone the git repository with the OpenINTEL quickstart docker files: *git clone https://github.com/mattijsjonker/openintel-quickstart-dockerfile*
3. Build the OpenINTEL quickstart docker image: *cd openintel-quickstart-dockerfile && sudo docker build -t openintel-quickstart .* (alternatively, use *sudo ./build-docker.sh*)
    1. Monitor the build output. A successful build is indicated with: *Successfully built <ID>*.
    2. Optionally, you can verify that the image has been built with the command: *sudo docker images*. The output should show an *openintel-quickstart* image of about 2.3 GiB.

### Step 2: Download OpenINTEL data
    
1. Download a sample of the (public) OpenINTEL data and place it in the *data* folder. You can do this on the host machine, as the directory will be shared with our docker container:
    1. cd data && mkdir -p source=alexa/year=2019/month=09/day=01 && cd $_
    2. wget https://data.openintel.nl/data/alexa1m/2019/openintel-alexa1m-20190901.tar && tar -xvf openintel-alexa1m-20190901.tar && rm openintel-alexa1m-20190901.tar
    3. Repeat this for a few more days if you wish (naturally, more data means longer execution times for the examples)
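Repeating the download for several days can be scripted. The sketch below uses hypothetical helpers (`download_plan`, `shell_command`; not part of the quickstart repository) and assumes that every day's archive follows the URL pattern of the wget command above, which covers the Alexa top list only. It prints the commands rather than running them, so you can review them before pasting them into a shell inside the *data* folder.

```python
from datetime import date, timedelta

# Assumption: every day's archive follows the URL pattern of the wget command in Step 2
URL_TEMPLATE = "https://data.openintel.nl/data/alexa1m/{d:%Y}/openintel-alexa1m-{d:%Y%m%d}.tar"
# Partition directory layout used by this notebook (zero-padded month and day)
DIR_TEMPLATE = "source={source}/year={d:%Y}/month={d:%m}/day={d:%d}"


def download_plan(start, end, source="alexa"):
    """Return (url, target_dir) pairs for every day from start to end, inclusive.

    Note: `source` only affects the target directory; the URL template is Alexa-specific.
    """
    plan = []
    day = start
    while day <= end:
        plan.append((URL_TEMPLATE.format(d=day), DIR_TEMPLATE.format(source=source, d=day)))
        day += timedelta(days=1)
    return plan


def shell_command(url, target_dir):
    """Build the Step 2 commands for one day, to be run from inside the data folder."""
    tar_name = url.rsplit("/", 1)[1]
    return "(mkdir -p {d} && cd {d} && wget {u} && tar -xvf {t} && rm {t})".format(
        d=target_dir, u=url, t=tar_name)


if __name__ == "__main__":
    # Print (rather than execute) the commands for 2019-09-01 through 2019-09-04
    for url, target_dir in download_plan(date(2019, 9, 1), date(2019, 9, 4)):
        print(shell_command(url, target_dir))
```

Each command runs in a subshell, so the working directory stays at the *data* folder between days.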
    
### Step 3: Run a container using the newly built image
    
1. Run a docker container using the image we just built
    1. You can run a container with the command: *sudo ./run-interactive-docker.sh*
    2. This will create a docker container with the name *openintel-quickstart*
    3. This will "expose" the 8888/TCP port for the notebook server running inside the container. Please be aware that on Linux systems, the port will be world-accessible unless you have explicitly taken steps to prevent this. More information can be found [here](https://docs.docker.com/network/iptables/).
    
The parameters in *run-interactive-docker.sh* will run the container in attached mode. You can detach with Ctrl+P followed by Ctrl+Q, and reattach with *sudo docker attach openintel-quickstart*. You can also stop and start the container as you please. These topics are out of scope here; please refer to the Docker [documentation](https://docs.docker.com/get-started/) for more information.

### Step 4: Access the Jupyter server running inside the container

1. You can access JupyterLab at http://localhost:8888/lab
2. The password is 'openintel'
3. This quickstart notebook can be found under */home/openintel/notebooks* (you may have to open the file browser with Ctrl+B)

### Step 5: Run example code

1. You can run the example code below. Please make sure to run the *Imports*, *Configuration*, *Create Spark Configuration* and *Start the SparkContext* code blocks before running any of the example code.
    1. Note that each analytic example comes in two separate forms: one uses DataFrame abstractions and the other uses Spark SQL. The results for each pair of examples are the same, so you can pick the approach you are more comfortable or familiar with.
    2. You can follow the Spark application's progress in the Jupyter server's console output, which is visible when you are attached to the running docker container.



Imports

In [1]:
import os
from dateutil import rrule
from datetime import datetime, timedelta, date
import random
import itertools
import collections
import numpy as np
import pandas as pd
import subprocess
import IPython.display
import re
import pyarrow as pa

# Find Spark
import findspark
findspark.init()

# PySpark imports
import pyspark
import pyspark.sql.functions as psf
import pyspark.sql.types as pst

Configuration

In [2]:
# Create a (local) HDFS connection
# n.b.: not strictly required for this quickstart notebook
hdfs_fs = pa.hdfs.connect(host=u"default", port=0, user=u"openintel", driver=u"libhdfs")

# Set the OpenINTEL data directory
DATA_DIRECTORY = "/home/openintel/data"

# The list of sources (e.g., TLDs or top lists such as Alexa) to investigate
SOURCES = ["alexa"]

print(hdfs_fs.ls(DATA_DIRECTORY))

['file:/home/openintel/data/source=alexa']


#### Create Spark Configuration

In [3]:
# Create a SparkConf
APP_NAME = "openintel-quickstart"
spark_conf = (
    pyspark.SparkConf()
    .setAppName(APP_NAME)
    # "local" runs Spark with a single worker thread; "local[*]" would use all available cores
    .setMaster("local")
    .set("spark.driver.cores", "2")
    .set("spark.driver.memory", "2G")
    # Note: in local mode, tasks run inside the driver JVM, so spark.driver.memory is the relevant memory setting.
    #       The executor cores, memory and memoryOverhead settings (and dynamic allocation) take effect when running
    #       on a cluster, e.g., Spark on YARN; fit them to your available resources there.
    #       For quickstart purposes, the default settings suffice.
    .set("spark.executor.cores", "4")
    .set("spark.executor.memory", "6G")
    .set("spark.executor.memoryOverhead", "2G")
    .set("spark.dynamicAllocation.enabled", "true")
)

#### Start the SparkContext

In [4]:
# SparkContext
sc = pyspark.SparkContext(conf=spark_conf)

# SQLContext
sqlc = pyspark.SQLContext(sc)

#### Stop the SparkContext

In [None]:
# Commented to prevent accidental sequential execution
#sc.stop()

### Example code (verbatim SQL)
Here we show how to work with Spark SQL. This involves loading a dataframe, registering a view, and then performing queries using that view.
1. Load Spark DataFrame
2. Register view / temp table
3. Perform Spark SQL queries
4. Drop temp table

1. Load Spark DF and register view *mdata*

In [5]:
# Create Spark DF
spark_df = sqlc.read.option("basePath", "/").format("avro").load(*[
    # n.b.: set paths list as desired: /home/openintel/data/source=<tld>/year=<yyyy>/month=<mm>/day=<dd>
    os.path.join(DATA_DIRECTORY, "source={}".format(i_source)) for i_source in SOURCES
])

# Note that you may have to specify the Avro schema explicitly if the Avro files you load have "evolved" with respect to each other. This
# can happen with the public OpenINTEL data, but only if your analysis involves older data. You can either extract the schema from a newer
# file and supply it as a read option, i.e., .option("avroSchema", ...), or reach out to us for the latest .avsc.

# Register temporary table
tt_name = "mdata"
spark_df.createOrReplaceTempView(tt_name)

# Show the columns to work with
# see also: https://openintel.nl/background/dictionary/
print(spark_df.columns)

['query_type', 'query_name', 'response_type', 'response_name', 'response_ttl', 'timestamp', 'rtt', 'worker_id', 'status_code', 'ip4_address', 'ip6_address', 'country', 'as', 'as_full', 'ip_prefix', 'cname_name', 'dname_name', 'mx_address', 'mx_preference', 'mxset_hash_algorithm', 'mxset_hash', 'ns_address', 'nsset_hash_algorithm', 'nsset_hash', 'txt_text', 'txt_hash_algorithm', 'txt_hash', 'ds_key_tag', 'ds_algorithm', 'ds_digest_type', 'ds_digest', 'dnskey_flags', 'dnskey_protocol', 'dnskey_algorithm', 'dnskey_pk_rsa_n', 'dnskey_pk_rsa_e', 'dnskey_pk_rsa_bitsize', 'dnskey_pk_eccgost_x', 'dnskey_pk_eccgost_y', 'dnskey_pk_dsa_t', 'dnskey_pk_dsa_q', 'dnskey_pk_dsa_p', 'dnskey_pk_dsa_g', 'dnskey_pk_dsa_y', 'dnskey_pk_eddsa_a', 'dnskey_pk_wire', 'nsec_next_domain_name', 'nsec_owner_rrset_types', 'nsec3_hash_algorithm', 'nsec3_flags', 'nsec3_iterations', 'nsec3_salt', 'nsec3_next_domain_name_hash', 'nsec3_owner_rrset_types', 'nsec3param_hash_algorithm', 'nsec3param_flags', 'nsec3param_itera
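The comment in the cell above suggests narrowing the paths list down to individual day partitions. A minimal sketch of how such a list could be built for a date range follows; `partition_paths` is a hypothetical helper, and it assumes the zero-padded `month=<mm>`/`day=<dd>` directory names created in Step 2.

```python
import os
from datetime import date, timedelta


def partition_paths(data_directory, sources, start, end, only_existing=False):
    """Return .../source=<src>/year=<yyyy>/month=<mm>/day=<dd> paths for each day in [start, end].

    Hypothetical helper; assumes the zero-padded directory layout created in Step 2.
    With only_existing=True, days without a local partition directory are skipped.
    """
    paths = []
    for source in sources:
        day = start
        while day <= end:
            path = os.path.join(
                data_directory,
                "source={}".format(source),
                "year={:%Y}".format(day),
                "month={:%m}".format(day),
                "day={:%d}".format(day),
            )
            if not only_existing or os.path.isdir(path):
                paths.append(path)
            day += timedelta(days=1)
    return paths


if __name__ == "__main__":
    for p in partition_paths("/home/openintel/data", ["alexa"], date(2019, 9, 1), date(2019, 9, 3)):
        print(p)
```

In the notebook, the result could be passed as `load(*partition_paths(DATA_DIRECTORY, SOURCES, date(2019, 9, 1), date(2019, 9, 7), only_existing=True))`. Skipping missing days matters because Spark raises an error for load paths that do not exist.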

3. Perform some example Spark SQL queries

In [6]:
## Number of records per (source, day)
# We simply count the number of measurement data records per (source, day)

# Verbatim Spark SQL query
spark_sql_query_string = """
SELECT source,
       CONCAT_WS("-", year, LPAD(month, 2, "0"), LPAD(day, 2, "0")) AS date, 
       COUNT(1) AS record_count
FROM {} 
WHERE year = 2019 AND month = 9 AND day <= 7
GROUP BY source, date
ORDER BY source, date
""".format(tt_name)

spark_sql_query_r = sqlc.sql(spark_sql_query_string)
spark_sql_query_r.show(truncate=False)

+------+----------+------------+
|source|date      |record_count|
+------+----------+------------+
|alexa |2019-09-01|5022012     |
|alexa |2019-09-02|2677058     |
|alexa |2019-09-03|2683449     |
|alexa |2019-09-04|2872323     |
+------+----------+------------+
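The `CONCAT_WS`/`LPAD` expression builds an ISO 8601 date string from the integer `year`, `month` and `day` partition columns. As a plain-Python sanity check (not Spark code), the same construction matches `datetime.date.isoformat()` for four-digit years; `partition_date_string` is a hypothetical name used only for illustration.

```python
from datetime import date


def partition_date_string(year, month, day):
    """Plain-Python equivalent of CONCAT_WS("-", year, LPAD(month, 2, "0"), LPAD(day, 2, "0"))."""
    # str.rjust(2, "0") left-pads like LPAD; unlike LPAD it never truncates, which is
    # irrelevant here because month and day have at most two digits
    return "-".join([str(year), str(month).rjust(2, "0"), str(day).rjust(2, "0")])


if __name__ == "__main__":
    d = date(2019, 9, 1)
    s = partition_date_string(d.year, d.month, d.day)
    print(s)                   # 2019-09-01
    print(s == d.isoformat())  # True
```

Because the string is zero-padded, sorting it lexicographically (as `ORDER BY date` does) also sorts it chronologically; without `LPAD`, "2019-9-10" would sort before "2019-9-2".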



In [7]:
## Number of distinct A records per (source, day)
# We count the number of distinct IPv4 addresses per (source, day)

# Verbatim Spark SQL query
spark_sql_query_string = """
SELECT source,
       CONCAT_WS("-", year, LPAD(month, 2, "0"), LPAD(day, 2, "0")) AS date,
       COUNT(DISTINCT(ip4_address)) AS ip4_address_count
FROM {} 
WHERE year = 2019 AND month = 9 AND day <= 7
GROUP BY source, date
ORDER BY source, date
""".format(tt_name)

spark_sql_query_r = sqlc.sql(spark_sql_query_string)
spark_sql_query_r.show(3, truncate=False)

+------+----------+-----------------+
|source|date      |ip4_address_count|
+------+----------+-----------------+
|alexa |2019-09-01|262105           |
|alexa |2019-09-02|161305           |
|alexa |2019-09-03|157303           |
+------+----------+-----------------+
only showing top 3 rows
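Note that this query counts distinct IPv4 addresses rather than A records: an address shared by many domains counts once. Records that do not carry an IPv4 address, such as rows for NS or MX responses, are expected to have a NULL `ip4_address`, and `COUNT(DISTINCT ...)` skips NULLs, whereas `COUNT(1)` in the previous query counts every row. A toy plain-Python illustration with made-up rows (not OpenINTEL data):

```python
from collections import defaultdict

# Made-up toy rows: (date, query_name, ip4_address); None stands in for SQL NULL
rows = [
    ("2019-09-01", "example.com.", "192.0.2.1"),
    ("2019-09-01", "example.net.", "192.0.2.1"),  # same address, different domain
    ("2019-09-01", "example.org.", "192.0.2.7"),
    ("2019-09-01", "example.org.", None),         # e.g. a row for an NS record
    ("2019-09-02", "example.com.", "192.0.2.1"),
]


def count_rows_per_day(rows):
    """Mimic COUNT(1) ... GROUP BY date: every row counts, NULL or not."""
    counts = defaultdict(int)
    for day, _qname, _ip4 in rows:
        counts[day] += 1
    return dict(sorted(counts.items()))


def count_distinct_per_day(rows):
    """Mimic COUNT(DISTINCT ip4_address) ... GROUP BY date: NULLs are ignored."""
    seen = defaultdict(set)
    for day, _qname, ip4 in rows:
        addresses = seen[day]  # create the group even if all its values are NULL
        if ip4 is not None:
            addresses.add(ip4)
    return {day: len(addresses) for day, addresses in sorted(seen.items())}


print(count_rows_per_day(rows))      # {'2019-09-01': 4, '2019-09-02': 1}
print(count_distinct_per_day(rows))  # {'2019-09-01': 2, '2019-09-02': 1}
```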



In [8]:
## Number of distinct domain names with a CloudFlare name server
# We count the number of distinct domain names per (source, day),
# provided the domain name has an NS resource record that ends in .ns.cloudflare.com.

# Verbatim Spark SQL query
spark_sql_query_string = """
SELECT source,
       CONCAT_WS("-", year, LPAD(month, 2, "0"), LPAD(day, 2, "0")) AS date, 
       COUNT(DISTINCT(query_name)) AS qname_count
FROM {} 
WHERE year = 2019 AND month = 9 AND day = 1
      AND ns_address LIKE '%.ns.cloudflare.com.'
GROUP BY source, date
ORDER BY source, date
""".format(tt_name)

spark_sql_query_r = sqlc.sql(spark_sql_query_string)
spark_sql_query_r.show(truncate=False)

+------+----------+-----------+
|source|date      |qname_count|
+------+----------+-----------+
|alexa |2019-09-01|39506      |
+------+----------+-----------+



In [9]:
## Distinct CloudFlare name server names
# We get the distinct CloudFlare NS record names on 2019-09-01

# Verbatim Spark SQL query
spark_sql_query_string = """
SELECT DISTINCT source,
       CONCAT_WS("-", year, LPAD(month, 2, "0"), LPAD(day, 2, "0")) AS date, 
       REGEXP_EXTRACT(ns_address, '([^.]+)[.]ns[.]cloudflare[.]com[.]$', 1) AS cf_ns_name
FROM {} 
WHERE year = 2019 AND month = 9 AND day = 1
      AND ns_address LIKE '%.ns.cloudflare.com.'
ORDER BY cf_ns_name
""".format(tt_name)

spark_sql_query_r = sqlc.sql(spark_sql_query_string)
spark_sql_query_r.show(3, truncate=False)

+------+----------+----------+
|source|date      |cf_ns_name|
+------+----------+----------+
|alexa |2019-09-01|abby      |
|alexa |2019-09-01|ada       |
|alexa |2019-09-01|adam      |
+------+----------+----------+
only showing top 3 rows
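The `LIKE` filter and the `REGEXP_EXTRACT` pattern can be tried out in plain Python before running them on Spark. Spark evaluates regular expressions with Java's regex engine, but for a simple pattern like this one Python's `re` behaves the same. In a regular expression, an unescaped `.` (as in `cloudflare.com`) matches any character, whereas `[.]` only matches a literal dot. The helper `cf_ns_name` is hypothetical; the sample names reuse values from the output above plus made-up non-matching ones.

```python
import re

# Dots escaped as "[.]" so each one only matches a literal dot
CF_NS_PATTERN = re.compile(r"([^.]+)[.]ns[.]cloudflare[.]com[.]$")
# Same pattern but with an unescaped "." before "com", which matches any character
LOOSE_PATTERN = re.compile(r"([^.]+)[.]ns[.]cloudflare.com[.]$")


def cf_ns_name(ns_address):
    """Mimic the LIKE '%.ns.cloudflare.com.' filter followed by REGEXP_EXTRACT(..., 1).

    Returns None for rows the filter would drop, and "" when the pattern does not
    match (REGEXP_EXTRACT also returns an empty string in that case).
    """
    if ns_address is None or not ns_address.endswith(".ns.cloudflare.com."):
        return None
    match = CF_NS_PATTERN.search(ns_address)
    return match.group(1) if match else ""


if __name__ == "__main__":
    samples = ["abby.ns.cloudflare.com.", "ada.ns.cloudflare.com.", "ns1.example.com.", None]
    print(sorted({cf_ns_name(s) for s in samples if cf_ns_name(s) is not None}))  # ['abby', 'ada']
    # The loose pattern also accepts a name that is not under cloudflare.com
    print(bool(LOOSE_PATTERN.search("abby.ns.cloudflareXcom.")))  # True
    print(bool(CF_NS_PATTERN.search("abby.ns.cloudflare-com.")))  # False
```

In the queries above the `LIKE` filter already rules out such names, so the escaping mainly matters if the pattern is reused without that filter.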



4. Drop the temporary view *mdata*

In [10]:
# Drop the temporary table
sqlc.dropTempTable(tt_name)
print(sqlc.tableNames())

[]


### Example code (DF abstractions)
Here we show how to use DataFrame (DF) abstractions to accomplish the same as the SQL queries above. This involves loading a DataFrame and then performing operations on it.
1. Load Spark DataFrame
2. Perform Spark DF operations

1. Load a Spark DataFrame
- Note that, unlike above, we add an ISO 8601 date column to the DF for convenience

In [11]:
# Create Spark DF
spark_df = sqlc.read.option("basePath", "/").format("avro").load(*[
    # n.b.: set paths list as desired: /home/openintel/data/source=<tld>/year=<yyyy>/month=<mm>/day=<dd>
    os.path.join(DATA_DIRECTORY, "source={}".format(i_source)) for i_source in SOURCES
]).withColumn(
    # Add an ISO8601 date column
    # n.b.: partition discovery infers integer types for the year, month and day partition columns, hence the zero-padding
    "date", psf.concat_ws('-', "year", psf.lpad("month", 2, '0'), psf.lpad("day", 2, '0')))

# Note that you may have to specify the Avro schema explicitly if the Avro files you load have "evolved" with respect to each other. This
# can happen with the public OpenINTEL data, but only if your analysis involves older data. You can either extract the schema from a newer
# file and supply it as a read option, i.e., .option("avroSchema", ...), or reach out to us for the latest .avsc.

# Show the columns to work with
# see also: https://openintel.nl/background/dictionary/
print(spark_df.columns)

['query_type', 'query_name', 'response_type', 'response_name', 'response_ttl', 'timestamp', 'rtt', 'worker_id', 'status_code', 'ip4_address', 'ip6_address', 'country', 'as', 'as_full', 'ip_prefix', 'cname_name', 'dname_name', 'mx_address', 'mx_preference', 'mxset_hash_algorithm', 'mxset_hash', 'ns_address', 'nsset_hash_algorithm', 'nsset_hash', 'txt_text', 'txt_hash_algorithm', 'txt_hash', 'ds_key_tag', 'ds_algorithm', 'ds_digest_type', 'ds_digest', 'dnskey_flags', 'dnskey_protocol', 'dnskey_algorithm', 'dnskey_pk_rsa_n', 'dnskey_pk_rsa_e', 'dnskey_pk_rsa_bitsize', 'dnskey_pk_eccgost_x', 'dnskey_pk_eccgost_y', 'dnskey_pk_dsa_t', 'dnskey_pk_dsa_q', 'dnskey_pk_dsa_p', 'dnskey_pk_dsa_g', 'dnskey_pk_dsa_y', 'dnskey_pk_eddsa_a', 'dnskey_pk_wire', 'nsec_next_domain_name', 'nsec_owner_rrset_types', 'nsec3_hash_algorithm', 'nsec3_flags', 'nsec3_iterations', 'nsec3_salt', 'nsec3_next_domain_name_hash', 'nsec3_owner_rrset_types', 'nsec3param_hash_algorithm', 'nsec3param_flags', 'nsec3param_itera
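The note in the cell above suggests extracting the schema from a newer file. Every Avro object container file stores its writer schema in the file header under the `avro.schema` metadata key, so a dependency-free sketch can read it directly. `read_avro_schema` is a hypothetical helper written against the container-file layout in the Avro specification (the magic bytes `O`, `b`, `j`, `0x01`, followed by a metadata map whose lengths are zig-zag varints); a library such as fastavro does the same more robustly.

```python
import io
import json

AVRO_MAGIC = b"Obj\x01"


def _read_long(stream):
    """Decode an Avro long: a zig-zag encoded variable-length integer."""
    shift = 0
    result = 0
    while True:
        byte = stream.read(1)
        if not byte:
            raise EOFError("truncated Avro header")
        b = byte[0]
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)


def _read_bytes(stream):
    """Decode Avro bytes/string: a long length followed by that many bytes."""
    length = _read_long(stream)
    data = stream.read(length)
    if len(data) != length:
        raise EOFError("truncated Avro header")
    return data


def read_avro_schema_from_stream(stream):
    """Return the writer schema (a JSON string) from an Avro container file header."""
    if stream.read(4) != AVRO_MAGIC:
        raise ValueError("not an Avro object container file")
    metadata = {}
    while True:
        count = _read_long(stream)
        if count == 0:  # a zero-count block terminates the map
            break
        if count < 0:
            # Negative count: use the absolute value; a block size in bytes follows
            count = -count
            _read_long(stream)
        for _ in range(count):
            key = _read_bytes(stream).decode("utf-8")
            metadata[key] = _read_bytes(stream)
    return metadata["avro.schema"].decode("utf-8")


def read_avro_schema(path):
    """Read the writer schema from the Avro file at `path`."""
    with open(path, "rb") as f:
        return read_avro_schema_from_stream(f)
```

In the notebook, the returned string could then be supplied as `.option("avroSchema", read_avro_schema(path_to_a_recent_avro_file))`, where `path_to_a_recent_avro_file` is a placeholder for one of the extracted `.avro` files; `json.loads` on the result lets you inspect the fields first.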

In [12]:
## Number of records per (source, day)
# We simply count the number of measurement data records per (source, day)
spark_df.filter(
    (psf.col("year") == 2019) & (psf.col("month") == 9) & (psf.col("day") <= 7)
).select(
    ["source", "date"]
).groupby(
    ["source", "date"]
).agg(
    psf.count(psf.lit("1")).alias("record_count")
).orderBy("source", "date"
).show(truncate=False)

+------+----------+------------+
|source|date      |record_count|
+------+----------+------------+
|alexa |2019-09-01|5022012     |
|alexa |2019-09-02|2677058     |
|alexa |2019-09-03|2683449     |
|alexa |2019-09-04|2872323     |
+------+----------+------------+



In [13]:
## Number of distinct A records per (source, day)
# We count the number of distinct IPv4 addresses per (source, day)
spark_df.filter(
    (psf.col("year") == 2019) & (psf.col("month") == 9) & (psf.col("day") <= 7)
).select(
    ["source", "date", "ip4_address"]
).groupby(
    ["source", "date"]
).agg(
    psf.countDistinct(psf.col("ip4_address")).alias("ip4_address_count")
).orderBy("source", "date"
).show(truncate=False)

+------+----------+-----------------+
|source|date      |ip4_address_count|
+------+----------+-----------------+
|alexa |2019-09-01|262105           |
|alexa |2019-09-02|161305           |
|alexa |2019-09-03|157303           |
|alexa |2019-09-04|191823           |
+------+----------+-----------------+



In [14]:
## Number of distinct domain names with a CloudFlare name server
# We count the number of distinct domain names per (source, day),
# provided the domain name has an NS resource record that ends in .ns.cloudflare.com.
spark_df.filter(
    (psf.col("year") == 2019) & (psf.col("month") == 9) & (psf.col("day") == 1) &
    (psf.col("ns_address").endswith(".ns.cloudflare.com."))
).select(
    ["source", "date", "query_name"]
).groupby(
    ["source", "date"]
).agg(
    psf.countDistinct(psf.col("query_name")).alias("qname_count")
).orderBy("source", "date"
).show(truncate=False)

+------+----------+-----------+
|source|date      |qname_count|
+------+----------+-----------+
|alexa |2019-09-01|39506      |
+------+----------+-----------+



In [15]:
## Distinct CloudFlare name server names
# We get the distinct CloudFlare NS record names on 2019-09-01
spark_df.filter(
    (psf.col("year") == 2019) & (psf.col("month") == 9) & (psf.col("day") == 1) &
    (psf.col("ns_address").endswith(".ns.cloudflare.com."))
).select(
    ["source", "date", psf.regexp_extract(psf.col("ns_address"), "([^.]+)[.]ns[.]cloudflare[.]com[.]$", 1).alias("cf_ns_name")]
).distinct(
).orderBy("cf_ns_name"
).show(3, truncate=False)

+------+----------+----------+
|source|date      |cf_ns_name|
+------+----------+----------+
|alexa |2019-09-01|abby      |
|alexa |2019-09-01|ada       |
|alexa |2019-09-01|adam      |
+------+----------+----------+
only showing top 3 rows

