In [1]:
# OpenINTEL PySpark notebook
#
# Author: Mattijs Jonker <m.jonker@utwente.nl> & Mathay Kahraman <mathayk@gmail.com>
# Copyright -- all rights reserved -- 2020

# Instructions
The first part of the file defines functions to generate the data.
At the Section 'Loading Data & Creating Columns' one should run the 'Run All Above' command

Imports

In [2]:
import os
from dateutil import rrule
from datetime import datetime, timedelta, date
import random
import itertools
import collections
import IPy
import numpy as np
import pandas as pd
import subprocess
import IPython.display
import re
import pickle
import matplotlib.pyplot as plt
import matplotlib.dates as mdate
import matplotlib.patches as mpatches
import time
import calendar
import networkx as nx
import sys
import slack
import asyncio
from IPython.display import display_html
from IPython.display import Javascript
from IPython.display import HTML

import pyarrow as pa
os.environ["ARROW_LIBHDFS_DIR"] = "/usr/local/hadoop/lib/native"

# Find Spark
import findspark
findspark.init()

# PySpark imports
import pyspark
import pyspark.sql.functions as psf
import pyspark.sql.types as pst
from pyspark.ml.feature import Tokenizer, RegexTokenizer, HashingTF, CountVectorizer, CountVectorizerModel
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf,col
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, IntegerType, MapType
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import lit
from pyspark.sql.functions import substring
from pyspark.sql.functions import length
from pyspark.sql.functions import desc, asc
from pyspark.sql import Row
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import arrays_zip
from pyspark.sql.functions import avg, first, regexp_extract, countDistinct
from pyspark.sql import Window

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages graphframes:graphframes:0.6.0-spark2.3-s_2.11 pyspark-shell'

Create Spark configuration

In [3]:
APP_NAME = "mathay-pyspark-calculations"

EXTERNAL_DRIVER_IP = "192.87.172.56" # Your external IP address, for callback connections to the Spark driver
OI_KRB_PRINCIPAL = "mathay"          # Your assigned username, i.e., Kerberos principal

OI_KRB_KEYTAB    = os.path.join(     # Your Kerberos keytab
    os.path.expanduser("~"),
    "{}.keytab".format(OI_KRB_PRINCIPAL)
)

## DO NOT CHANGE THE CONFIGURATION BELOW UNLESS YOU KNOW WHAT YOU ARE DOING
spark_conf = pyspark.SparkConf().setAppName(APP_NAME
).setMaster("yarn").set("spark.scheduler.pool", "root.users.{}".format(OI_KRB_PRINCIPAL)
).set("spark.submit.deployMode", "client"
).set("spark.authenticate", "true"
).set("spark.sql.parquet.binaryAsString", "true"
).set("spark.network.crypto.enabled", "true"
).set("spark.driver.host", EXTERNAL_DRIVER_IP
).set("spark.driver.bindAddress", "0.0.0.0"
).set("spark.driver.port", "33000").set("spark.blockManager.port", "33016").set("spark.ui.port", "33032"
).set("spark.driver.cores","2").set("spark.driver.memory","4G"
).set("spark.executor.cores", "5").set("spark.executor.memory", "12G").set("spark.executor.memoryOverhead", "4G"
# ).set("spark.executor.cores", "5").set("spark.executor.memory", "12G").set("spark.executor.memoryOverhead", "4G"
).set("spark.dynamicAllocation.enabled", "true").set("spark.shuffle.service.enabled", "true" # dynamic ori is True
# ).set("spark.ui.showConsoleProgress", "true").set("spark.eventLog.logBlockUpdates.enabled", "true"
# ).set("spark.eventLog.enabled", "true").set("spark.eventLog.dir", "file:/tmp/spark-events") # I added the last 2 lines
# 4, 12, 4, 5 cores
# ).set("spark.dynamicAllocation.maxExecutors", "12" # deze heb ik toegevoegd
)#.set("spark.sql.autoBroadcastJoinThreshold", "-1") # deze heb ik toegevoegd

### Helpers

In [4]:
# A helper to kinit, keytab-based
def kinit_helper(principal, keytab):
    
    kinit_cmd = "kinit -p {} -k -t {} -l 7d -r 7d".format(principal, keytab)
    
    # Call subprocess to execute cmd
    process = subprocess.Popen(kinit_cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, executable="/bin/bash")
    output, error = process.communicate()
    
    stdout_str = error.decode("utf-8")
    if len(stdout_str) > 0:
        print(stdout_str)
    
    stderr_str = error.decode("utf-8")
    if len(stderr_str) > 0:
        print(stderr_str)
        
# A helper to get a (pyarrow) hdfs client
def hdfs_fs_helper(principal, host="default"):
    
    # Create a HDFS connection
    return pa.hdfs.connect(host=u"{}".format(host), port=0, user=u"{}".format(principal), kerb_ticket=u"/tmp/krb5cc_{}".format(os.getuid()), driver=u"libhdfs")

#### Start the SparkContext
n.b.: 
1. Please do not keep the SparkContext open for an extended period unless you have to. The corresponding application will reserve YARN resources on the OpenINTEL cluster even when idle.
2. Once you have pulled results to the driver you can stop the SparkContext and do the post-processing in, e.g., Pandas. Obviously, you should keep the Python kernel running to maintain state.

In [5]:
kinit_helper(OI_KRB_PRINCIPAL, OI_KRB_KEYTAB)

# SparkContext
sc = pyspark.SparkContext(conf=spark_conf)
print(sc.applicationId)

# SQLContext
sqlc = pyspark.SQLContext(sc)

application_1597403588481_0923


#### Stop the SparkContext

In [6]:
# sc.stop()

Explicit GC

In [7]:
# Call the Garbage Collector explicitly
sc._jvm.System.gc()

Cache clearance (persisted DFs, etc.)

In [8]:
sqlc.clearCache()

### Configuration

In [9]:
# The list of sources, i.e., TLDs, to investigateinvestigateinvestigateinvestigate
SOURCES = ["alexa"]
# SOURCES = ["open-tld"]
# SOURCESSOURCES
# OpenINTEL measurement data base path
OI_MDATA_BASE = "/user/openintel/measurement_data/type=warehouse"

### Here begins my code
We create a Dataframe (DF) which consists of query_name and txt_text records


In [10]:
# Create a Dataframe that consists of all the SPF records in a dataset
# Output: is a Dataframe that has two columns: query_name (domainname), txt_text (SPF record)
def load_spark_df(day, month, year, one_day, datasets):
    # Create Spark DF
    SOURCES = datasets
    df = sqlc.read.option("basePath", "/").parquet(*[
        # n.b.: set paths list as desired: /user/openintel/measurement_data/type=warehouse/source=<tld>/year=<yyyy>/month=<mm>/day=<dd>
        os.path.join(OI_MDATA_BASE, "source={}".format(i_source), "year={:4d}".format(year), "month={:02d}".format(month), "day={:02d}".format(day)) for i_source in SOURCES
    ]).withColumn(
        # Add an ISO8601 date column
        # n.b.: read() will produce integers for the year, month and day partitions in the HDFS file hierarchy
        "date", psf.concat_ws('-', "year", psf.lpad("month", 2, '0'), psf.lpad("day", 2, '0')))

#     remove_first_last_character = udf(lambda x:x[1:-1],StringType())
#     remove_last_character = udf(lambda x:x[:-1],StringType())

    # Refactor DF such that only 1 day of data is taken, there is a non-empty txt_text record and it contains spf
#     if one_day:
#         df = df.filter(
#             (psf.col("year") == year) & (psf.col("month") == month) & (psf.col("day") == day)
#         )
#     else:
#         df = df.filter(
#             (psf.col("year") == year) & (psf.col("month") == month) & (psf.col("day") <= day)
#         )
    
    df = df.filter(
        psf.col("txt_text").isNotNull()
    ).select(
        ["query_name", "txt_text"]
    ).where(
        df.txt_text.contains("v=spf1")
    ).withColumn(
        'query_name', col('query_name').substr(lit(0), length(col('query_name')) - 1)
    ).withColumn(
        'txt_text', col('txt_text').substr(lit(2), length(col('txt_text')) - 2)
    )
    # ).withColumn(
    #     'txt_text', remove_first_last_character('txt_text')
    # ).withColumn(
    #     'query_name', remove_last_character('query_name')
    # )

    #ToDO Remove this line to make it work on all the data
    # df_original = df.limit(1000)
    df_original = df
#     print(df_original.take(10))
    return df_original

In [11]:
# Not used anymore
# Check if SPF record is valid
# Source: https://github.com/calumsbaird/sdvalidator/blob/master/sdvalidator/validate.py
SPF_REGEX_STRING = "^[Vv]=[Ss][Pp][Ff]1( +([-+?~]?([Aa][Ll][Ll]|[Ii][Nn][Cc][Ll][Uu][Dd][Ee]:(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*(\.([A-Za-z]|[A-Za-z]([-0-9A-Za-z]?)*[0-9A-Za-z])|%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\})|[Aa](:(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*(\.([A-Za-z]|[A-Za-z]([-0-9A-Za-z]?)*[0-9A-Za-z])|%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}))?((\/([1-9]|1[0-9]|2[0-9]|3[0-2]))?(\/\/([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8]))?)?|[Mm][Xx](:(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*(\.([A-Za-z]|[A-Za-z]([-0-9A-Za-z]?)*[0-9A-Za-z])|%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}))?((\/([1-9]|1[0-9]|2[0-9]|3[0-2]))?(\/\/([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8]))?)?|[Pp][Tt][Rr](:(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*(\.([A-Za-z]|[A-Za-z]([-0-9A-Za-z]?)*[0-9A-Za-z])|%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}))?|[Ii][Pp]4:([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\/([1-9]|1[0-9]|2[0-9]|3[0-2]))?|[Ii][Pp]6:(::|([0-9A-Fa-f]{1,4}:){7}[0-9A-Fa-f]{1,4}|([0-9A-Fa-f]{1,4}:){1,8}:|([0-9A-Fa-f]{1,4}:){7}:[0-9A-Fa-f]{1,4}|([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}){1,2}|([0-9A-Fa-f]{1,4}:){5}(:[0-9A-Fa-f]{1,4}){1,3}|([0-9A-Fa-f]{1,4}:){4}(:[0-9A-Fa-f]{1,4}){1,4}|([0-9A-Fa-f]{1,4}:){3}(:[0-9A-Fa-f]{1,4}){1,5}|([0-9A-Fa-f]{1,4}:){2}(:[0-9A-Fa-f]{1,4}){1,6}|[0-9A-Fa-f]{1,4}:(:[0-9A-Fa-f]{1,4}){1,7}|:(:[0-9A-Fa-f]{1,4}){1,8}|([0-9A-Fa-f]{1,4}:){6}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|([0-9A-Fa-f]{1,4}:){6}:([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|([0-9A-Fa-f]{1,4}:){5}:([0-9A-Fa-f]{1,4}:)?([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|([0-9A-Fa-f]{1,4}:){4}:([0-9A-Fa-f]{1,4}:){0,2}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|([0-9A-Fa-f]{1,4}:){3}:([0-9A-Fa-f]{1,4}:){0,3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|([0-9A-Fa-f]{1,4}:){2}:([0-9A-Fa-f]{1,4}:){0,4}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|[0-9A-Fa-f]{1,4}::([0-9A-Fa-f]{1,4}:){0,5}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|::([0-9A-Fa-f]{1,4}:){0,6}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))(\/([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8]))?|[Ee][Xx][Ii][Ss][Tt][Ss]:(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*(\.([A-Za-z]|[A-Za-z]([-0-9A-Za-z]?)*[0-9A-Za-z])|%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}))|[Rr][Ee][Dd][Ii][Rr][Ee][Cc][Tt]=(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*(\.([A-Za-z]|[A-Za-z]([-0-9A-Za-z]?)*[0-9A-Za-z])|%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\})|[Ee][Xx][Pp]=(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*(\.([A-Za-z]|[A-Za-z]([-0-9A-Za-z]?)*[0-9A-Za-z])|%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\})|[A-Za-z][-.0-9A-Z_a-z]*=(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*))* *$"

def validate_spf(spf_record, domain, __depth=0):
    """
    >>> validate_spf('csbaird.com')
    'VALID'
    
    Check validity of a domain's spf record.
    Uses a regex and checks recursive lookup depth.
    :param str domain: A domain such as example.com
    :param dict cache: Dictionary for storing spf/dmarc records
    :returns: 'VALID'|'INVALID'|'MISSING'
    :rtype: str
    """

    # Check record follow general syntax
#     SPF_REGEX = re.compile("^v=spf1[ \t]+[+?~-]?(?:(?:all)|(?:ip4(?:[:][0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})?(?:/[0-9]{1,2})?)|(?:ip6(?:[:]([0-9A-Fa-f]{0,4}:){1,5}[0-9A-Fa-f]{0,4})?(?:/[0-9]{1,2})?)|(?:a(?:[:][A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?(?:\.[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?)+)?(?:/[0-9]{1,2})?)|(?:mx(?:[:][A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?(?:\.[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?)+)?(?:/[0-9]{1,2})?)|(?:ptr(?:[:][A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?(?:\.[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?)+))|(?:exists(?:[:][A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?(?:\.[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?)+))|(?:include(?:[:][A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?(?:\.[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?)+))|(?:redirect(?:[=][A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?(?:\.[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?)+))|(?:exp(?:[:][A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?(?:\.[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?)+))|)(?:(?:[ \t]+[+?~-]?(?:(?:all)|(?:ip4(?:[:][0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})?(?:/[0-9]{1,2})?)|(?:ip6(?:[:]([0-9A-Fa-f]{0,4}:){1,5}[0-9A-Fa-f]{0,4})?(?:/[0-9]{1,2})?)|(?:a(?:[:][A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?(?:\.[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?)+)?(?:/[0-9]{1,2})?)|(?:mx(?:[:][A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?(?:\.[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?)+)?(?:/[0-9]{1,2})?)|(?:ptr(?:[:][A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?(?:\.[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?)+))|(?:exists(?:[:][A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?(?:\.[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?)+))|(?:include(?:[:][A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?(?:\.[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?)+))|(?:redirect(?:[:][A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?(?:\.[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?)+))|(?:exp(?:[:][A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?(?:\.[A-Za-z0-9_](?:[A-Za-z0-9_-]*[A-Za-z0-9_])?)+))|))*)?$")
    SPF_REGEX = re.compile("^[Vv]=[Ss][Pp][Ff]1( +([-+?~]?([Aa][Ll][Ll]|[Ii][Nn][Cc][Ll][Uu][Dd][Ee]:(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*(\.([A-Za-z]|[A-Za-z]([-0-9A-Za-z]?)*[0-9A-Za-z])|%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\})|[Aa](:(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*(\.([A-Za-z]|[A-Za-z]([-0-9A-Za-z]?)*[0-9A-Za-z])|%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}))?((\/([1-9]|1[0-9]|2[0-9]|3[0-2]))?(\/\/([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8]))?)?|[Mm][Xx](:(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*(\.([A-Za-z]|[A-Za-z]([-0-9A-Za-z]?)*[0-9A-Za-z])|%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}))?((\/([1-9]|1[0-9]|2[0-9]|3[0-2]))?(\/\/([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8]))?)?|[Pp][Tt][Rr](:(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*(\.([A-Za-z]|[A-Za-z]([-0-9A-Za-z]?)*[0-9A-Za-z])|%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}))?|[Ii][Pp]4:([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\/([1-9]|1[0-9]|2[0-9]|3[0-2]))?|[Ii][Pp]6:(::|([0-9A-Fa-f]{1,4}:){7}[0-9A-Fa-f]{1,4}|([0-9A-Fa-f]{1,4}:){1,8}:|([0-9A-Fa-f]{1,4}:){7}:[0-9A-Fa-f]{1,4}|([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}){1,2}|([0-9A-Fa-f]{1,4}:){5}(:[0-9A-Fa-f]{1,4}){1,3}|([0-9A-Fa-f]{1,4}:){4}(:[0-9A-Fa-f]{1,4}){1,4}|([0-9A-Fa-f]{1,4}:){3}(:[0-9A-Fa-f]{1,4}){1,5}|([0-9A-Fa-f]{1,4}:){2}(:[0-9A-Fa-f]{1,4}){1,6}|[0-9A-Fa-f]{1,4}:(:[0-9A-Fa-f]{1,4}){1,7}|:(:[0-9A-Fa-f]{1,4}){1,8}|([0-9A-Fa-f]{1,4}:){6}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|([0-9A-Fa-f]{1,4}:){6}:([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|([0-9A-Fa-f]{1,4}:){5}:([0-9A-Fa-f]{1,4}:)?([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|([0-9A-Fa-f]{1,4}:){4}:([0-9A-Fa-f]{1,4}:){0,2}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|([0-9A-Fa-f]{1,4}:){3}:([0-9A-Fa-f]{1,4}:){0,3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|([0-9A-Fa-f]{1,4}:){2}:([0-9A-Fa-f]{1,4}:){0,4}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|[0-9A-Fa-f]{1,4}::([0-9A-Fa-f]{1,4}:){0,5}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])|::([0-9A-Fa-f]{1,4}:){0,6}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))(\/([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8]))?|[Ee][Xx][Ii][Ss][Tt][Ss]:(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*(\.([A-Za-z]|[A-Za-z]([-0-9A-Za-z]?)*[0-9A-Za-z])|%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}))|[Rr][Ee][Dd][Ii][Rr][Ee][Cc][Tt]=(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*(\.([A-Za-z]|[A-Za-z]([-0-9A-Za-z]?)*[0-9A-Za-z])|%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\})|[Ee][Xx][Pp]=(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*(\.([A-Za-z]|[A-Za-z]([-0-9A-Za-z]?)*[0-9A-Za-z])|%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\})|[A-Za-z][-.0-9A-Z_a-z]*=(%\{[CDHILOPR-Tcdhilopr-t]([1-9][0-9]?|10[0-9]|11[0-9]|12[0-8])?[Rr]?[+-\/=_]*\}|%%|%_|%-|[!-$&-~])*))* *$")
    
    if not SPF_REGEX.match(spf_record):
        return False
    return True # Comment this line to perform the evualate record function
    
    # Check there isnt too many lookups
    try:
        __evaluate_record(spf_record, domain, depth=__depth)
        return True
    except LookupError:
        return False

    
SPF_MECHANISM_REGEX_STRING = "([+\-~?])?(mx|ip4|ip6|exists|include|all|a|redirect|exp|ptr|v)[:=]?([\w+/_.:\-{%}]*)"
SPF_MECHANISM_REGEX = re.compile(SPF_MECHANISM_REGEX_STRING)

    
def __evaluate_record(spf_record, domain, depth=0, void=0):
    """
    Test hidden record
    """ 
    # A maximum depth of 10 lookups are allowed as per SPF RFC
    if depth >= 10:
        raise LookupError("max 10 spf lookups exceeded")
 
   # A maximum depth of 2 void lookups are allowed as per SPF RFC
    if void >= 2:
        raise LookupError("max 2 void lookups exceeded")
    
   # Get each part of the spf record 
    for match in re.findall(SPF_MECHANISM_REGEX, spf_record):
        qual = match[0]
        mech = match[1].strip()
        value = match[2].strip()
        
        if mech == "include":
            depth+=1
            validate_spf(value,domain,__depth=depth)
            #process_domain_spf(value, depth)
        elif mech == "a":
            depth+=1
            #if value == "":
                #resolve_record(domain, "A")
            #else:
                #resolve_record(value, "A")
        elif mech == "mx":
            depth+=1
            #if value == "":
                #resolve_record(domain, "MX")
            #else:
                #resolve_record(value, "MX")
        elif mech == "ip4" or mech == "ip6":
            pass
            # validate_ip(value) # TODO validate ip with
            # ipaddress.ip_network(address)
        elif mech == "redirect":
            depth+=1
            validate_spf(value,domain, __depth=depth)
        elif mech == "exists":
            depth+=1
            # Resolution not implemented
            # J had commented out
            #if value == "":
            #    resolve_record(domain, "A")
            #else:
            #    resolve_record(value, "A")
        elif mech == "ptr":
            depth+=1
            # Resolution not implemented
            # J had commented out
            #if value == "":
            #    resolve_record(domain, "PTR")
            #else:
            #    resolve_record(value, "PTR")
            
            #print "Use of PTR is discouraged"
        elif mech == 'v':
            pass
        elif mech == 'all':
            assert value == ''
            assert qual in '+-?~' # define '+' as invalid
        else:
            # Invalid mechanism
            print('here')
            raise LookupError('invalid mechanism')
            
print(validate_spf("v=spf1 a:smtp.vogelsang.info a:vgsdom01.vogelsang-gmbh.com a:smtp.exedra.de a:smtp2.vogelsang.info a:hd91.hosting.punkt.de a:vpro0109.proserver.punkt.de ip4:217.29.42.0/24 ip4:217.29.41.0/24 ip4:212.227.126.128/25 ip4:128.127.48.0/23 ip4:80.228.28.0/27 ip""4:185.195.148.0/24 -all", "vogelsang.info"))
print(validate_spf("v=spf1 a=hello.com redirect=_spf.yandex.net", "wowfeed.online"))

True
True


In [12]:
# Not used anymore
# Add row to df such that it shows if SPF record is valid or not
def validate_spf(df):
    df = df.withColumn('is_valid',psf.when(df.txt_text.rlike(SPF_REGEX_STRING)
                                   ,True).otherwise(False))
#     print(df.show())
    return df

In [13]:
# Not used anymore
def insert_dns_queries(df):
    # Insert dns_queries row, which displays the amount of DNS queries will be performed by the SPF record
    mechanisms_queries = ['include', 'a', 'mx', 'ptr', 'exists', 'redirect']

    # print(df_mechanisms.take(1))

    def count_queries(row):
        words = row
        count = 0
        for word in words:
            if word in mechanisms_queries:
                count += 1
        return count

    udf_count_queries = udf(count_queries,IntegerType())
    df_mechanisms = df.withColumn("dns_queries", udf_count_queries(df.words_mechanisms))
#     print(df_mechanisms.show())
    return df_mechanisms

In [14]:
# Not used anymore
def check_queries(df):
    # Check if dns_queries has more than 10 than delete it
#     print(df_mechanisms.count())

    # df_invalid = df_mechanisms.where(df_mechanisms.dns_queries > 10)

    # df = df_mechanisms.where(df_mechanisms.dns_queries <= 10)

    df = df.withColumn('is_valid_queries',psf.when(df.dns_queries <= 10, True).otherwise(False))

#     print(df.count())
#     print(df.show())
    return df

In [15]:
# Not used anymore
def count_top_words(df, vocabularies, d, m, y, df_include, source):     
    # Count words in all SPF records
#     print(vocabularies)
    word_names = [{ str(j) : vocabularies[i][j] for j in range(len(vocabularies[i])) } for i in range(len(vocabularies))]
#     print(word_names)

    size = df.count()

    featureCount_udf = psf.udf(
        lambda r: [(x, int(x in r.indices)) for x in range(r.size)],
        ArrayType(
            StructType(
                [
                    StructField("featureNumber", IntegerType()),
                    StructField("count", IntegerType())
                ]
            )
        )
    )
    
    result = []
    
    for i in range(4):
    #     t = df.select(psf.explode(featureCount_udf("features")).alias("features"))\
        if i == 3: # @@@@@@@@@ watch out for these when adding more functions @@@@@@@@@
            df = df_include
        t = df.select(psf.explode(featureCount_udf((i==0 and "features_mechanisms")\
            or (i==1 and "features_qualifiers") or (i==2 and "features_combined")\
            or "features_include")).alias("features"))\
            .select("features.*")\
            .groupBy("featureNumber")\
            .agg(psf.sum("count").alias("count"))
        
        if i == 3: # include-top-analysis
            t = t.withColumn("featureWord", t["featureNumber"].cast(StringType()))\
            .na.replace(word_names[i], 1, "featureWord")

            t = t.withColumn("featureWord", psf.split(t.featureWord, ':')[1])\
                    .sort(desc("count"))\
#                     .show()
        else:
            t = t.withColumn("featureWord", t["featureNumber"].cast(StringType()))\
                .na.replace(word_names[i], 1, "featureWord")\
                .sort(desc("count"))\
#                 .show()
        t = t.withColumn("domains", lit(size))
#         t = t.cache()
        t.show()
        
        i_hdfs_path = "/user/mathay/results/"
        if source == 'alexa':
            i_hdfs_path += "alexa/"
        if i == 0:
            i_hdfs_path += "mechanisms-analysis/year={:4d}/month={:02d}/day={:02d}".format(y, m, d)
        elif i == 1:
            i_hdfs_path += "qualifiers-analysis/year={:4d}/month={:02d}/day={:02d}".format(y, m, d)
        elif i == 2:
            i_hdfs_path += "combination-analysis/year={:4d}/month={:02d}/day={:02d}".format(y, m, d)
        elif i == 3:
            i_hdfs_path += "include-top-analysis/year={:4d}/month={:02d}/day={:02d}".format(y, m, d)
        t.write.parquet(i_hdfs_path, mode="overwrite", compression="gzip")
        
    ########### adoption rate calculations ###########
    

In [16]:
# Validation-data: validates SPF records and displays how much there invalided and by which category.
# It uses data with only the original SPF record, so no included SPF records
# Output:
# +----------------+------------------+-------------+---------------+-------------+---------------------+------------+
# |valid_spf_string|invalid_spf_string|valid_spf_dns|invalid_spf_dns|total_domains|total_invalid_domains|both_invalid|
# +----------------+------------------+-------------+---------------+-------------+---------------------+------------+
# |        36357044|            234802|     36582602|           9244|     36591846|               242577|        1469|
# +----------------+------------------+-------------+---------------+-------------+---------------------+------------+
#
# Validation-full-data: Saved the full Dataframe to generate the results of the validation-data
# Output:
# +--------------------+--------------------+--------------------+----------------+-------------------+
# |          query_name|            txt_text|               words|valid_spf_string|valid_spf_dns_limit|
# +--------------------+--------------------+--------------------+----------------+-------------------+
# |turkeythawingchar...|v=spf1 a mx inclu...|[v=spf1, a, mx, i...|            true|               true|
# |      tushingham.com|v=spf1 include:sp...|[v=spf1, include:...|            true|               true|
# |          tugset.com|v=spf1 include:_s...|[v=spf1, include:...|            true|               true|
# | tunefulteaching.com|v=spf1 +a +mx +ip...|[v=spf1, +a, +mx,...|            true|               true|
# |        turnwall.com|v=spf1 +a +mx +ip...|[v=spf1, +a, +mx,...|            true|               true|
# |     tunesnation.com|v=spf1 ip6:fd92:5...|[v=spf1, ip6:fd92...|            true|               true|
# |        tukastil.com|v=spf1 a mx ip4:9...|[v=spf1, a, mx, i...|            true|               true|
# |      tubleasure.com|v=spf1 ip6:fd9c:d...|[v=spf1, ip6:fd9c...|            true|               true|
# |turkcellfaturasio...|v=spf1 +a +mx +ip...|[v=spf1, +a, +mx,...|            true|               true|
# |tuhostingvirtual.com|v=spf1 +a +mx +ip...|[v=spf1, +a, +mx,...|            true|               true|
# |  tulenderlatino.com|v=spf1 include:sp...|[v=spf1, include:...|            true|               true|
# |   tuspanaderias.com|v=spf1 include:sp...|[v=spf1, include:...|            true|               true|
# |   tuoyangdichan.com|v=spf1 include:sp...|[v=spf1, include:...|            true|               true|
# |       tucniacik.com|v=spf1 a mx inclu...|[v=spf1, a, mx, i...|            true|               true|
# |   tulsanewsdude.com|v=spf1 ip4:198.54...|[v=spf1, ip4:198....|            true|               true|
# |turkington-window...|v=spf1 include:sp...|[v=spf1, include:...|            true|               true|
# |    turbotimeusa.com|v=spf1 include:sh...|[v=spf1, include:...|            true|               true|
# |    turbotimeusa.com|v=spf1 include:sp...|[v=spf1, include:...|            true|               true|
# |turtleandthemonke...|v=spf1 include:sp...|[v=spf1, include:...|            true|               true|
# | tuspequerrechos.com|v=spf1 include:_s...|[v=spf1, include:...|            true|               true|
# +--------------------+--------------------+--------------------+----------------+-------------------+
#
# Invalid-dns-limit: Generates which mechanisms/modifiers are related to the fact that DNS lookup is exceeded
# Output:
# +-----+------+-------+-----+----+---------------------+--------+
# |    a|exists|include|   mx| ptr|invalid_records_count| domains|
# +-----+------+-------+-----+----+---------------------+--------+
# |64253|    25|  36681|19005|3968|                 9244|36591846|
# +-----+------+-------+-----+----+---------------------+--------+
def validation_spf(df, d, m, y, source):
    months = {1: "January", 2: "February", 3: "March", 4: "April", 5: "May", 6: "June", 7: "July", 8: "August",
              9: "September", 10: "October", 11: "November", 12: "December"}

    df_all_records = df

    tokenizer = Tokenizer(inputCol="txt_text", outputCol="words")

    tokenized = tokenizer.transform(df_all_records)

    qualifiers = ["+", "-", "~", "?"]
    mechanisms = ["all", "include", "a", "mx", "ptr", "ip4", "ip6", "exists", "redirect", "exp"]
    mechanisms_dns = ["include", "a", "mx", "ptr", "exists", "redirect"]
    combined = mechanisms[:]
    combined_dns = mechanisms_dns[:]
    for qualifier in qualifiers:
        for mechanism in mechanisms:
            combined.append(qualifier + mechanism)
        for mechanism in mechanisms_dns:
            combined_dns.append(qualifier + mechanism)
    
    def check_string(spf):
        if spf[0] != "v=spf1":
            return False
        for i in range(1, len(spf)):
            result = check_mechanism_string(spf[i])
            if not result:
                return False
        return True
    
    def check_mechanism_string(spf):
        if spf == ' ':
            return True
        spf = spf.replace(' ', '').replace('\t', '').replace('\n', '')
        if spf == '':
            return True
        if spf.startswith(tuple(combined)):
            return True
        return False
        
    def check_dns_limit(spf):
#         if spf[0] != "v=spf1":
#             return False
        dns_count = 0
        for i in range(1, len(spf)):
            result = check_mechanism_dns(spf[i], dns_count)
            if result:
                dns_count += 1
            if dns_count > 10:
                return False
        return True
    
    def check_mechanism_dns(spf, count):
        # returns array[0, 1] where 0 is if valid and 1 if we need to increment counter
        if spf.startswith(tuple(combined_dns)):
            return True
        return False

    udf_check_string = udf(check_string, BooleanType())
    df_string_checked = tokenized.withColumn("valid_spf_string", udf_check_string(tokenized.words))
    
    udf_check_dns_limit = udf(check_dns_limit, BooleanType())
    df_both_checked = df_string_checked.withColumn("valid_spf_dns_limit", udf_check_dns_limit(df_string_checked.words))

    invalid_string = df_both_checked.where(df_both_checked.valid_spf_string == False).count()
    valid_string = df_both_checked.where(df_both_checked.valid_spf_string == True).count()
    invalid_dns = df_both_checked.where(df_both_checked.valid_spf_dns_limit == False).count()
    valid_dns = df_both_checked.where(df_both_checked.valid_spf_dns_limit == True).count()
    
    total_domains = df_both_checked.count()
    total_invalid_domains = df_both_checked.where((col('valid_spf_string') == False) | (col('valid_spf_dns_limit') == False)).count()
    domains_both_invalid = df_both_checked.where((col('valid_spf_string') == False) & (col('valid_spf_dns_limit') == False)).count()
    
    df_result = sqlc.createDataFrame(
                [
                    (valid_string, invalid_string, valid_dns, invalid_dns, total_domains, total_invalid_domains, domains_both_invalid), # create your data here, be consistent in the types.
                ],
                ['valid_spf_string', 'invalid_spf_string', 'valid_spf_dns', 'invalid_spf_dns', 'total_domains', 'total_invalid_domains', 'both_invalid'] # add your columns label here
            )
    
    df_result.show()
    save_df(df_result, d, m, y, source, 'validation-data')
    
    df_both_checked.show()
    save_df(df_both_checked, d, m, y, source, 'validation-full-data') # this might be unnecessary
    
    #### Here starts new section that counts it
    
    # Counting where it goes wrong
    def count_dns(spf):
        result = {"include": 0, "a": 0, "mx": 0, "ptr": 0, "exists": 0, "redirect": 0}
        for i in range(len(spf)):
            if spf[i].startswith("a") | spf[i].startswith("+a") | spf[i].startswith("-a") | spf[i].startswith("~a") | spf[i].startswith("?a"):
                result["a"] += 1
            elif spf[i].startswith("include") | spf[i].startswith("+include") | spf[i].startswith("-include") | spf[i].startswith("~include") | spf[i].startswith("?include"):
                result["include"] += 1
            elif spf[i].startswith("mx") | spf[i].startswith("+mx") | spf[i].startswith("-mx") | spf[i].startswith("~mx") | spf[i].startswith("?mx"):
                result["mx"] += 1
            elif spf[i].startswith("ptr") | spf[i].startswith("+ptr") | spf[i].startswith("-ptr") | spf[i].startswith("~ptra") | spf[i].startswith("?ptr"):
                result["ptr"] += 1
            elif spf[i].startswith("exists") | spf[i].startswith("+exists") | spf[i].startswith("-exists") | spf[i].startswith("~exists") | spf[i].startswith("?exists"):
                result["exists"] += 1
            elif spf[i].startswith("redirect") | spf[i].startswith("+redirect") | spf[i].startswith("-redirect") | spf[i].startswith("~redirect") | spf[i].startswith("?redirect"):
                result["redirect"] += 1
        return result

    df = df_both_checked.where(df_both_checked.valid_spf_dns_limit == False)
    udf_count = udf(count_dns, MapType(StringType(), IntegerType()))
    df_counted = df.withColumn("counts", udf_count(df.words))
    
    columns = df_counted.select('counts').rdd.flatMap(lambda x: x).toDF().columns
    for i in columns:
        df_counted = df_counted.withColumn(i, lit(df_counted.counts[i]))
    
    a = df_counted.agg(psf.sum("a")).collect()[0][0]
    exists = df_counted.agg(psf.sum("exists")).collect()[0][0]
    include = df_counted.agg(psf.sum("include")).collect()[0][0]
    mx = df_counted.agg(psf.sum("mx")).collect()[0][0]
    ptr = df_counted.agg(psf.sum("ptr")).collect()[0][0]
    redirect = df_counted.agg(psf.sum("redirect")).collect()[0][0]
    invalid_records_count = df_counted.count()
    all_records_count = df_both_checked.count()
    
    df_result = sqlc.createDataFrame(
                [
                    (a, exists, include, mx, ptr, redirect, invalid_records_count, all_records_count), # create your data here, be consistent in the types.
                ],
                ['a', 'exists', 'include', 'mx', 'ptr', 'redirect', 'invalid_records_count', 'domains'] # add your columns label here
            )
    
    df_result.show()
    save_df(df_result, d, m, y, source, 'invalid-dns-limit')

In [17]:
# Adds an include row/column
def insert_include_row(df):
    df_include = df
    df_include = df.where(df.txt_text.contains("include"))
#     print(df_include.head())
#     print(df_include.count())

    INCLUDE_REGEX = "include:(\S+)"

    tokenizer = RegexTokenizer(inputCol="txt_text", outputCol="words", pattern=INCLUDE_REGEX, gaps=False)

    countvectorizer_vocabsize = 20
    countVectorizer = CountVectorizer(inputCol="words",\
                                        outputCol="features",\
                                        vocabSize=countvectorizer_vocabsize, minDF=1.0, minTF=1.0)

    # Perform KMeans clustering with K clusters.
    # ToDo: Adjust seed
    #     kmeans = KMeans().setK(6).setSeed(1)

    # Create pipeline with all the stages
    pipeline = Pipeline(stages=[tokenizer]) # Add countVectorizer, kmeans to the list to perform clustering

    # Run the pipeline with df as dataframe
    cvModel = pipeline.fit(df_include)

    # Return all stages to print vocabulary(s)
    stages = cvModel.stages
#     vectorizers = [s for s in stages if isinstance(s, CountVectorizerModel)]

#     vocabulary_include = [v.vocabulary for v in vectorizers][0]
#     print(vocabulary_include)

    # Apply transform to retrieve clusters
    #     result.append(cvModel.transform(df))
    df_include = cvModel.transform(df_include)
#     print(df_include.show())
    return df_include

In [18]:
# Generates all the linked domains in the 'combined' column
# The added column displays if a new domain has been added in this depth
# The generated (saved) path has an extra parameter: depth
# Output:
# +--------------------+--------------------+--------------------+-----+
# |                 src|        ori_includes|            combined|added|
# +--------------------+--------------------+--------------------+-----+
# |   0001attorneys.com|[websitewelcome.c...|[websitewelcome.c...|false|
# |007transportservi...|[spf.protection.o...|[spf.protection.o...|false|
# |        00duncan.com|[spf.protection.o...|[spf.protection.o...|false|
# |         0110666.com|       [spf.163.com]|       [spf.163.com]|false|
# |      0120145929.com|[spf12.gmoserver.jp]|[spf12.gmoserver.jp]|false|
# |         01321kk.com|[namebrightmail.com]|[namebrightmail.com]|false|
# |           018cn.com| [spf.mxhichina.com]| [spf.mxhichina.com]|false|
# |    01abdominaux.com|[relay.mailchanne...|[relay.mailchanne...|false|
# |    01disruption.com|   [_spf.google.com]|   [_spf.google.com]|false|
# |        01future.com|        [mx.ovh.com]|        [mx.ovh.com]|false|
# |        01zerone.com| [_spf.site4now.net]| [_spf.site4now.net]|false|
# |          020jhe.com| [spf7.gmoserver.jp]| [spf7.gmoserver.jp]|false|
# |         021-186.com| [spf.mxhichina.com]| [spf.mxhichina.com]|false|
# |   021shuangying.com|      [spf.cn4e.com]|      [spf.cn4e.com]|false|
# |       024miaomu.com|[spf.mail.faidns....|[spf.mail.faidns....|false|
# |         0298765.com|       [spf.163.com]|       [spf.163.com]|false|
# |          0307cs.com|        [mx.ovh.com]|        [mx.ovh.com]|false|
# |        0318mesh.com|  [spf.263xmail.com]|  [spf.263xmail.com]|false|
# |    032designltd.com|[spf.protection.o...|[spf.protection.o...|false|
# |         0376543.com|       [spf.163.com]|       [spf.163.com]|false|
# +--------------------+--------------------+--------------------+-----+
def include_analysis(df_spark, day, month, year, depth, source):
    last_df = df_spark
    
    # Find latest calculated depth
    latest_calculated = -1
    
    i_hdfs_path_without = "/user/mathay/results_new/source=" + str(source) + "/data=include-depth/"
    for d in reversed(range(0, depth + 1)):
        try: # deze was without
            df_include = sqlc.read.option("basePath", "/").parquet(*[
                        os.path.join(i_hdfs_path_without, "year={:4d}".format(year), "month={:02d}".format(month)
                                     , "day={:02d}".format(day), "depth={}".format(d))
                    ]).select("src", col("combined").alias("words"), "added", "ori_includes")
            latest_calculated = d
            break
        except:
            pass
        
    if latest_calculated == depth:
        return df_include
    
#     print("eerste keer: ", latest_calculated)
    did_minus_one = True if latest_calculated == -1 else False
    
    df_ori = None
    
    for d in range(latest_calculated + 1, depth + 1):
#         print("d:", d)
#         print("latest_calculated:", latest_calculated)
        df_include = None
        
#         if d == 0 and did_minus_one:
#             continue
        
        if d == -1 or d == 0:
            df_include = df_spark
            df_include = insert_include_row(df_include).drop('txt_text')
            
            def split_includes(row):
                result = []
                for i in row:
                    result.append(i.split(":")[1])
                return result
            udf_split_includes = udf(split_includes, ArrayType(StringType()))

            df_split_includes = df_include.withColumn("ori_includes", udf_split_includes(df_include.words)).drop('words')
#             print("het is -1 of 0")
        else:
            # deze was without
            df_include = sqlc.read.option("basePath", "/").parquet(*[
                            os.path.join(i_hdfs_path_without, "year={:4d}".format(year), "month={:02d}".format(month)
                                         , "day={:02d}".format(day), "depth={}".format(d-1))
                        ]).select("src", col("combined").alias("words"))
            df_split_includes = df_include.select(col("src").alias("query_name"), col("words").alias("ori_includes"))
            
        i_hdfs_path = "/user/mathay/results_new/source=" + str(source) + "/data=include-depth/"  
            
        if d == -1 or d == 0:
            i_hdfs_path += "year={:4d}/month={:02d}/day={:02d}/depth={}".format(year, month, day, 0)
#             print("in de -1 of 0 voor de show")
#             df_split_includes.show()
            df_split_includes = df_split_includes.select(col("query_name").alias("src"), col("ori_includes").alias("combined"))
            df_split_includes.write.parquet(i_hdfs_path, mode="overwrite", compression="gzip")
#             df_split_includes.show(truncate=False)
#             print("ik kom hier")
            continue
        
        df_exploded = df_split_includes.withColumn("ori_includes_exploded", psf.explode("ori_includes")) \
                .select(col("query_name").alias("src"), col("ori_includes_exploded").alias("dst"), "ori_includes").dropDuplicates()

        df_exploded_renamed = df_exploded.select(col("src").alias("ori_include"), col("dst").alias("new_include"))
        
        if df_ori is None:
            df_ori = sqlc.read.option("basePath", "/").parquet(*[
                            os.path.join(i_hdfs_path_without, "year={:4d}".format(year), "month={:02d}".format(month)
                                         , "day={:02d}".format(day), "depth={}".format(0))
                        ]).select('src', 'combined').withColumn('new_include', psf.explode("combined")).dropDuplicates()
            df_ori = df_ori.select(col('src').alias('ori_include'), col('new_include'))
#             print("df_ori")
#             df_ori.where(col('ori_include') == 'uhrig.com').show(truncate=False)
#             print("df_ori")
#             df_ori.where(col('ori_include') == 'digitalfirstmedia.com').show(truncate=False)
        
        if d > 1:
            df_joined = df_exploded.join(df_ori, df_exploded.dst == df_ori.ori_include, 'left').select('src', 'dst', 'new_include', 'ori_includes')
            df_joined = df_joined.na.fill("Empty")
        else:
            df_joined = df_exploded.join(df_exploded_renamed, df_exploded.dst == df_exploded_renamed.ori_include, 'left').select('src', 'dst', 'new_include', 'ori_includes')
            df_joined = df_joined.na.fill("Empty")
        
        df_joined = df_joined.dropDuplicates()
        
#         df_joined = df_exploded.join(df_exploded_renamed, df_exploded.dst == df_exploded_renamed.ori_include, 'left').select('src', 'dst', 'new_include', 'ori_includes')
#         df_joined = df_joined.na.fill("Empty")

        df_combined = df_joined.withColumn("combined", psf.array("dst", "new_include"))
        df_combined = df_combined.select("src", psf.array_remove(df_combined.combined, "Empty").alias("combined"), "ori_includes")

        df_aggregated = df_combined.groupby(["src", "ori_includes"]).agg(psf.collect_set("combined").alias("combined"))

        df_one_list = df_aggregated.withColumn("combined", psf.flatten(col("combined")))
        
        df_one_list_no_duplicates = df_one_list.withColumn("combined", psf.array_distinct("combined"))
        
        df_checked_added = df_one_list_no_duplicates.withColumn("added", \
                                     psf.when(psf.size(df_one_list_no_duplicates.combined) > \
                                              psf.size(df_one_list_no_duplicates.ori_includes), True)\
                                     .otherwise(False))
        
        i_hdfs_path = "/user/mathay/results_new/source=" + str(source) + "/data=include-depth/"
        i_hdfs_path += "year={:4d}/month={:02d}/day={:02d}/depth={}".format(year, month, day, d)
        df_checked_added.write.parquet(i_hdfs_path, mode="overwrite", compression="gzip")
        
        if d == depth:
            df_checked_added.show(truncate=False)

In [19]:
from graphframes import *

In [20]:
# Load a Dataframe, but without filtering on SPF records. So you will get all the records that are available
def load_spark_df_no_spf(day, month, year, one_day, datasets, opencc=False):
    # Create Spark DF
    if opencc:
        SOURCES = ['opencc']
    else:
        SOURCES = datasets
    df = sqlc.read.option("basePath", "/").parquet(*[
        # n.b.: set paths list as desired: /user/openintel/measurement_data/type=warehouse/source=<tld>/year=<yyyy>/month=<mm>/day=<dd>
        os.path.join(OI_MDATA_BASE, "source={}".format(i_source), "year={:4d}".format(year), "month={:02d}".format(month), "day={:02d}".format(day)) for i_source in SOURCES
    ]).withColumn(
        # Add an ISO8601 date column
        # n.b.: read() will produce integers for the year, month and day partitions in the HDFS file hierarchy
        "date", psf.concat_ws('-', "year", psf.lpad("month", 2, '0'), psf.lpad("day", 2, '0')))
    
#     df = df.filter(
#         psf.col("txt_text").isNotNull()
#     ).select(
#         ["query_name", "txt_text"]
#     ).where(
#         df.txt_text.contains("v=spf1")
#     ).withColumn(
#         'query_name', col('query_name').substr(lit(0), length(col('query_name')) - 1)
#     ).withColumn(
#         'txt_text', col('txt_text').substr(lit(2), length(col('txt_text')) - 2)
#     )
    df = df.select(
        ["query_name", "txt_text"]
#     ).withColumn(
#         'query_name', col('query_name').substr(lit(0), length(col('query_name')) - 1)
    ).withColumn(
        'txt_text', col('txt_text').substr(lit(2), length(col('txt_text')) - 2)
    ).withColumn("qname_sld", regexp_extract("query_name", "([^.]+[.][^.]+[.])$", 1))
    
    if opencc:
        df = df.filter(df.qname_sld.endswith('.' + str(datasets[0]) + '.'))
    

    df_original = df
    return df_original

In [21]:
# Calcultes the adoption rate of a given source
# Output:
# +-------------+-----------+-----------------+
# |total_domains|spf_domains|    adoption_rate|
# +-------------+-----------+-----------------+
# |    145418816|   36591846|0.251630751827879|
# +-------------+-----------+-----------------+
def adoption_rate(d, m, y, source, opencc=False):
    dataset = [source]
    df_spark = load_spark_df_no_spf(d, m, y, False, dataset, opencc) # need to change to com and org
    total_domains = df_spark.select("qname_sld").distinct().count()
    spf_domains = df_spark.where(df_spark.txt_text.contains("v=spf1")).count()
    adoption_rate = int(spf_domains) / int(total_domains)
    
    df_result = sqlc.createDataFrame(
                [
                    (total_domains, spf_domains, adoption_rate), # create your data here, be consistent in the types.
                ],
                ['total_domains', 'spf_domains', 'adoption_rate'] # add your columns label here
            )
    
    df_result.show()
    
    save_df(df_result, d, m, y, source, 'adoption-rate')

In [22]:
# Check if a dataframe already exists
def check_if_already_exists(d, m, y):
    try:
        sqlc.read.parquet("/user/mathay/results_new/source={}/data={}/year={:4d}/month={:02d}/day={:02d}".format('com', 'adoption-rate', 2020, 5, 1))        
        sqlc.read.parquet("/user/mathay/results/mechanisms-analysis/year={:4d}/month={:02d}/day={:02d}".format(y, m, d))
        sqlc.read.parquet("/user/mathay/results/qualifiers-analysis/year={:4d}/month={:02d}/day={:02d}".format(y, m, d))
        sqlc.read.parquet("/user/mathay/results/combination-analysis/year={:4d}/month={:02d}/day={:02d}".format(y, m, d))
        sqlc.read.parquet("/user/mathay/results/include-top-analysis/year={:4d}/month={:02d}/day={:02d}".format(y, m, d))
        sqlc.read.parquet("/user/mathay/results/validation-analysis/year={:4d}/month={:02d}/day={:02d}".format(y, m, d))
        sqlc.read.parquet("/user/mathay/results/validation-full-analysis/year={:4d}/month={:02d}/day={:02d}".format(y, m, d))
        sqlc.read.parquet("/user/mathay/results/invalid-dns-limit-analysis/year={:4d}/month={:02d}/day={:02d}".format(y, m, d))
        for i in range(6):
            sqlc.read.parquet("/user/mathay/results/spf-include-analysis/year={:4d}/month={:02d}/day={:02d}/depth={}".format(y, m, d, i))
    except:
        return False
    return True

In [23]:
# Saves a dataframe in the correct format
def save_df(df, d, m, y, source, data_name):
    if isinstance(source, list):
        source = "com-org-net-alexa"
    i_hdfs_path = "/user/mathay/results_new/source=" + str(source) + "/"
    i_hdfs_path += "data=" + str(data_name) + "/year={:4d}/month={:02d}/day={:02d}".format(y, m, d)
    df.write.parquet(i_hdfs_path, mode="overwrite", compression="gzip")

In [24]:
# Counts how often each qualifier, mechanism, modifier, and combination of qualifier and mechanism occurs
# Ouput:
# +---------+--------+-------------+
# |      key|   count|total_domains|
# +---------+--------+-------------+
# |+redirect|     353|     36591846|
# |-redirect|       0|     36591846|
# | redirect|  552143|     36591846|
# | +include| 1261471|     36591846|
# | ?include|   64455|     36591846|
# |?redirect|       0|     36591846|
# |       mx| 8602351|     36591846|
# | ~include|      29|     36591846|
# | -include|     205|     36591846|
# |~redirect|       0|     36591846|
# |        ?| 4287975|     36591846|
# |  include|25203690|     36591846|
# |     +all|   43530|     36591846|
# |     ~ptr|     101|     36591846|
# |  +exists|    1502|     36591846|
# |  -exists|      42|     36591846|
# |      ip6| 1685005|     36591846|
# |     +ip6|   18505|     36591846|
# |  ?exists|      20|     36591846|
# |  ~exists|       0|     36591846|
# +---------+--------+-------------+
qualifiers = ["+", "-", "~", "?"]
mechanisms = ["all", "include", "a", "mx", "ptr", "ip4", "ip6", "exists", "redirect", "exp"]
mechanisms_dns = ["include", "a", "mx", "ptr", "exists"]
combined = mechanisms[:]
for qualifier in qualifiers:
    for mechanism in mechanisms:
        combined.append(qualifier + mechanism)
        
def count_everything(df, d, m, y, dataset):
    tokenizer = Tokenizer(inputCol="txt_text", outputCol="words")
    df_tokenized = tokenizer.transform(df)
    
    def count_all(tokenized_record):
        result_dict = {i: 0 for i in combined}
        for qual in qualifiers:
            result_dict[qual] = 0
        for word in tokenized_record:
            for possible_start in combined: # check combined and mechanisms
                if word.startswith(possible_start):
                    if word == 'a' or word == '+a' or word == '-a' or word == '~a' or word == '?a' or\
                    (word.startswith('a') and len(word) > 1 and word[1] == ':') or\
                    (word.startswith('+a') and len(word) > 2 and word[2] != ':') or\
                    (word.startswith('-a') and len(word) > 2 and word[2] != ':') or\
                    (word.startswith('~a') and len(word) > 2 and word[2] != ':') or\
                    (word.startswith('?a') and len(word) > 2 and word[2] != ':'):
                        result_dict[possible_start] += 1
                    else:    
                        result_dict[possible_start] += 1
#                 if len(word) > 1: # check for mechanisms only
#                     if word[1:].startswith(possible_start):
#                         result_dict[possible_start] += 1
            
            for possible_start in qualifiers: # check qualifiers
                if word.startswith(possible_start):
                    result_dict[possible_start] += 1
        return result_dict
    
    udf_count_everything = udf(count_all, MapType(StringType(), IntegerType()))
    df_counted = df_tokenized.withColumn("countings", udf_count_everything(df_tokenized.words))
    df_result = df_counted.select(psf.explode("countings")).groupBy("key").sum("value").select(col('key'), col("sum(value)").alias("count"))
    total_domains = df.count()
    df_result = df_result.withColumn('total_domains', lit(total_domains))
    df_result.show()
    save_df(df_result, d, m, y, dataset, 'count-variables')

In [25]:
# Counts how often the domain is included using the include mechanism
# Output (you can sort on count to display the top x included domains):
# +--------------------+-------+-------------+
# |                 key|  count|total_domains|
# +--------------------+-------+-------------+
# |     _spf.google.com|2625365|     36591846|
# |      _spf.heteml.jp|  67228|     36591846|
# |_spf.spamgateway....|     19|     36591846|
# |       spf.hyatt.com|     40|     36591846|
# |   filter.npit.co.uk|     45|     36591846|
# |  ispf.taomail.co.uk|     25|     36591846|
# |ecbiz137.inmotion...|      1|     36591846|
# |    wpmx.wadax.ne.jp|    248|     36591846|
# |ultrasavvyagency.com|   1271|     36591846|
# |smtp.secureserver...|   8376|     36591846|
# |  spf.zamailgate.com|   1111|     36591846|
# |       wmxf.f-hs.com|    370|     36591846|
# |spf.inspectionsup...|    227|     36591846|
# |       marenavis.com|     58|     36591846|
# | mx16.secure-cms.net|     48|     36591846|
# |   spf.sg.aliyun.com|    277|     36591846|
# |          kiviuq.net|      5|     36591846|
# |     spf.h1.hitme.pl|    200|     36591846|
# |spf105.unisonserv...|    734|     36591846|
# |smart.spf.superho...|    549|     36591846|
# +--------------------+-------+-------------+
def count_included_domains(df, d, m, y, dataset):
    tokenizer = Tokenizer(inputCol="txt_text", outputCol="words")
    df_tokenized = tokenizer.transform(df)
    
    def count_includes(tokenized_record):
        result_dict = {}
        for word in tokenized_record:
            if word.startswith(('include', '+include', '-include', '?include', '~include')):
                splitted = word.split(':')
                if len(splitted) > 1:
                    url = splitted[1]
                    if url not in result_dict:
                        result_dict[url] = 1
                    else:
                        result_dict[url] += 1
        return result_dict
    
    udf_count_includes = udf(count_includes, MapType(StringType(), IntegerType()))
    df_counted = df_tokenized.withColumn("countings", udf_count_includes(df_tokenized.words))
    df_result = df_counted.select(psf.explode("countings")).groupBy("key").sum("value").select(col('key'), col("sum(value)").alias("count"))
    total_domains = df.count()
    df_result = df_result.withColumn('total_domains', lit(total_domains))
    df_result.show()
    save_df(df_result, d, m, y, dataset, 'included-domains-count')

In [26]:
# Verifies the SPF records using the linkage of SPF records (only DNS requests)
# Output:
# +-------------+-------------+---------------+
# |total_domains|valid_domains|invalid_domains|
# +-------------+-------------+---------------+
# |     23040052|     19910161|        3129891|
# +-------------+-------------+---------------+
def dns_lookup(d, m, y, source, df_spf):
    df_depth = sqlc.read.parquet("/user/mathay/results_new/source={}/data=include-depth/year={:4d}/month={:02d}/day={:02d}/depth={:1d}".format(source, y, m, d, 5))
    df_depth = df_depth.select("src", "combined")
    df_depth = df_depth.withColumn("combined", psf.array_union(df_depth.combined, psf.array(psf.lit(df_depth.src))))
    print("df_depth:", df_depth.count())

    df_exploded = df_depth.withColumn("domain_to_count", psf.explode("combined")).select("src", "domain_to_count")
    # df_exploded.filter(df_exploded.src == "spaceistanbul.com").show(truncate=False)
    print("df_exploded:", df_exploded.count())
    print("df_exploded:", df_exploded.show())

#     df_spf = load_spark_df(d, m, y, True, ['com', 'org', 'net', 'alexa'])
    # df_spf.filter(df_spf.query_name == "spaceistanbul.com").show(truncate=False)
    print("df_spf:", df_spf.count())
    print("df_spf:", df_spf.show())

    df_combined = df_exploded.join(df_spf, df_spf.query_name == df_exploded.domain_to_count)
    df_combined = df_combined.select("src", "domain_to_count", "txt_text")
    # df_combined.filter(df_combined.src == "0001attorneys.com").show()
    print("df_combined:", df_combined.count())
    print("df_combined:", df_combined.show())

    tokenizer = Tokenizer(inputCol="txt_text", outputCol="words")
    df_tokenized = tokenizer.transform(df_combined)
    # df_tokenized.show()
    print("df_tokenized:", df_tokenized.count())

    qualifiers = ["+", "-", "~", "?"]
    mechanisms = ["all", "include", "a", "mx", "ptr", "ip4", "ip6", "exists", "redirect", "exp"]
    mechanisms_dns = ["include", "a", "mx", "ptr", "exists"]
    combined = mechanisms[:]
    combined_dns = mechanisms_dns[:]
    for qualifier in qualifiers:
        for mechanism in mechanisms:
            combined.append(qualifier + mechanism)
        for mechanism in mechanisms_dns:
            combined_dns.append(qualifier + mechanism)

    def count_dns_requests(spf):
        dns_count = 0
        for i in range(1, len(spf)):
            result = check_mechanism_dns(spf[i], dns_count)
            if result:
                dns_count += 1
        return dns_count

    def check_mechanism_dns(spf, count):
        if spf.startswith(tuple(combined_dns)):
            if spf.endswith("all"): # edge case
                return False
            return True
        return False

    udf_count_dns_requests = udf(count_dns_requests, IntegerType())
    df_counted = df_tokenized.withColumn("dns_count", udf_count_dns_requests(df_tokenized.words))
    # df_counted.show()
    print("df_counted:", df_counted.count())

    df_joined = df_counted.groupby("src").agg(psf.sum("dns_count")).select("src", col("sum(dns_count)").alias("dns_count"))
    print("df_joined:", df_joined.count())
    print(df_joined.show())
    
#     invalid_domains = df_joined.filter(col("dns_count") > 10).count()
    valid_domains = df_joined.filter(col("dns_count") <= 10).count()
    total_domains = df_joined.count()
    invalid_domains = total_domains - valid_domains
    
    df_result = sqlc.createDataFrame(
                [
                    (total_domains, valid_domains, invalid_domains), # create your data here, be consistent in the types.
                ],
                ['total_domains', 'valid_domains', 'invalid_domains'] # add your columns label here
            )
    
    save_df(df_result, d, m, y, source, 'dns-limit-advanced')
    
    df_result.show()

In [27]:
# Verifies the SPF records using DNS requests and Syntax errors
# Output:
# +-------------+-------------+---------------+
# |total_domains|valid_domains|invalid_domains|
# +-------------+-------------+---------------+
# |     36591846|     32853069|        3738777|
# +-------------+-------------+---------------+
def verify_all(d, m, y, source, df_spf):
    df_depth = sqlc.read.parquet("/user/mathay/results_new/source={}/data=include-depth/year={:4d}/month={:02d}/day={:02d}/depth={:1d}".format(source, y, m, d, 5))
    df_depth = df_depth.select("src", "combined")
    df_depth = df_depth.withColumn("combined", psf.array_union(df_depth.combined, psf.array(psf.lit(df_depth.src))))
    print("df_depth:", df_depth.count())

    df_exploded = df_depth.withColumn("domain_to_count", psf.explode("combined")).select("src", "domain_to_count")
    # df_exploded.filter(df_exploded.src == "spaceistanbul.com").show(truncate=False)
    print("df_exploded:", df_exploded.count())
#     print("df_exploded:", df_exploded.show())

#     df_spf = load_spark_df(d, m, y, True, ['com', 'org', 'net', 'alexa'])
    # df_spf.filter(df_spf.query_name == "spaceistanbul.com").show(truncate=False)
    print("df_spf:", df_spf.count())
#     print("df_spf:", df_spf.show())

    df_combined = df_exploded.join(df_spf, df_spf.query_name == df_exploded.domain_to_count)
    df_combined = df_combined.select("src", "domain_to_count", "txt_text")
    # df_combined.filter(df_combined.src == "0001attorneys.com").show()
    print("df_combined:", df_combined.count())
#     print("df_combined:", df_combined.show())

    tokenizer = Tokenizer(inputCol="txt_text", outputCol="words")
    df_tokenized = tokenizer.transform(df_combined)
    # df_tokenized.show()
    print("df_tokenized:", df_tokenized.count())

    qualifiers = ["+", "-", "~", "?"]
    mechanisms = ["all", "include", "a", "mx", "ptr", "ip4", "ip6", "exists", "redirect", "exp"]
    mechanisms_dns = ["include", "a", "mx", "ptr", "exists"]
    combined = mechanisms[:]
    combined_dns = mechanisms_dns[:]
    for qualifier in qualifiers:
        for mechanism in mechanisms:
            combined.append(qualifier + mechanism)
        for mechanism in mechanisms_dns:
            combined_dns.append(qualifier + mechanism)

    def count_dns_requests(spf):
        dns_count = 0
        for i in range(1, len(spf)):
            result = check_mechanism_dns(spf[i], dns_count)
            if result:
                dns_count += 1
        return dns_count

    def check_mechanism_dns(spf, count):
        if spf.startswith(tuple(combined_dns)):
            if spf.endswith("all"): # edge case
                return False
            return True
        return False

    udf_count_dns_requests = udf(count_dns_requests, IntegerType())
    df_counted = df_tokenized.withColumn("dns_count", udf_count_dns_requests(df_tokenized.words))
    # df_counted.show()
    print("df_counted:", df_counted.count())

    df_joined = df_counted.groupby("src").agg(psf.sum("dns_count")).select("src", col("sum(dns_count)").alias("dns_count"))
#     print("df_joined:", df_joined.count())
    print("df_joined:", df_joined.count())
    
    df_count_ori = sqlc.read.parquet("/user/mathay/results_new/source={}/data=validation-full-data/year={:4d}/month={:02d}/day={:02d}".format(source, y, m, d))
    print("df_count_ori:", df_count_ori.count())
    df_all_joined = df_count_ori.join(df_joined, df_joined.src == df_count_ori.query_name, how='left')
    df_all_joined = df_all_joined.select("query_name", "valid_spf_string", "valid_spf_dns_limit", "dns_count")
    print("df_all_joined:", df_all_joined.count())
#     print(df_all_joined.show())
    
    df_corrected = df_all_joined.withColumn('dns_valid', (psf.when(col('dns_count').isNull(), True)).otherwise(False))
    print("df_corrected:", df_corrected.count())

    df_corrected = df_corrected.fillna(-1, subset=['dns_count'])
#     print(df_corrected.count())
    
    df_new = df_corrected.withColumn('dns_valid',
                                 psf.when((col('dns_count') >= 0) & (col('dns_count') <= 10), 
                                           (col('valid_spf_string')) & (col('valid_spf_dns_limit')))
                                 .otherwise(psf.when(col('dns_count') == -1, (col('valid_spf_string')) & (col('valid_spf_dns_limit'))).otherwise(False)))
    print("df_new:", df_new.count())
    
#     invalid_domains = df_joined.filter(col("dns_count") > 10).count()
    valid_domains = df_new.filter(col('dns_valid') == True).count()
    total_domains = df_new.count()
    invalid_domains = total_domains - valid_domains
    
    df_result = sqlc.createDataFrame(
                [
                    (total_domains, valid_domains, invalid_domains), # create your data here, be consistent in the types.
                ],
                ['total_domains', 'valid_domains', 'invalid_domains'] # add your columns label here
            )
    
    save_df(df_result, d, m, y, source, 'verify-all')
    
    df_result.show()

# Loading Data & Creating Columns

In [42]:
data = 'invalid-dns-limit'
sqlc.read.parquet("/user/mathay/results_new/source={}/data={}/year={:4d}/month={:02d}/day={:02d}".format('com', data, 2020, 5, 1)).show()

+-----+------+-------+-----+----+---------------------+--------+
|    a|exists|include|   mx| ptr|invalid_records_count| domains|
+-----+------+-------+-----+----+---------------------+--------+
|64253|    25|  36681|19005|3968|                 9244|36591846|
+-----+------+-------+-----+----+---------------------+--------+



In [None]:
# To generate the data run something like this this:
datasets = ['com', 'org', 'net']
for dataset in datasets:
    for y in range(2015, 2021):
        for m in range(1, 13):
            if y == 2015 and (m == 1 or m == 2):
                continue
            if y == 2015 and m <= 3 and dataset == 'com':
                continue
            if y == 2020 and m >= 6:
                continue
                
            if y == 2015 or (y == 2016 and m <= 1):
                complete_dataset = ["com", "org", "net"]
            else:
                complete_dataset = ["com", "org", "net", "alexa"]
            
                
            print(dataset)
            print(complete_dataset)
                
            d = 1
            print(d, m, y)
            df_spark = load_spark_df(d, m, y, False, [dataset]) # was zonder haakjes
            df_spf = load_spark_df(d, m, y, False, complete_dataset)
            adoption_rate(d, m, y, dataset, opencc=False)
            count_everything(df_spark, d, m, y, dataset)
            count_included_domains(df_spark, d, m, y, dataset)
            validation_spf(df_spark, d, m, y, dataset)
            include_analysis(df_spark, d, m, y, 5, dataset)
            include_analysis(df_spark, d, m, y, 5, 'com-org-net-alexa')
            validation_spf(df_spf, d, m, y, dataset)
            verify_all(d, m, y, dataset, df_spf)

In [31]:
for y in range(2015, 2021):
    df = load_spark_df(1, 5, y, False, ['com', 'org', 'net'])
    validation_spf(df, 1, 5, y, 'com-org-net')

+----------------+------------------+-------------+---------------+-------------+---------------------+------------+
|valid_spf_string|invalid_spf_string|valid_spf_dns|invalid_spf_dns|total_domains|total_invalid_domains|both_invalid|
+----------------+------------------+-------------+---------------+-------------+---------------------+------------+
|        26099097|            298139|     26387686|           9550|     26397236|               306941|         748|
+----------------+------------------+-------------+---------------+-------------+---------------------+------------+

+---------------+--------------------+--------------------+----------------+-------------------+
|     query_name|            txt_text|               words|valid_spf_string|valid_spf_dns_limit|
+---------------+--------------------+--------------------+----------------+-------------------+
|       01pc.net|v=spf1 include:sp...|[v=spf1, include:...|            true|               true|
|        04k.net|v=spf1 re

In [29]:
for y in range(2015, 2021):
    df_spark = load_spark_df(1, 5, y, False, ['com', 'org', 'net'])
    if y >= 2018:
        include_analysis(df_spark, 1, 5, y, 5, 'com-org-net')
    #     dns_lookup(1, 5, y, 'com-org-neverify_all df_spark)
    validation_spf(df_spark, 1, 5, y, 'com-org-net')
    verify_all(1, 5, y, 'com-org-net', df_spark)

+----------------+------------------+-------------+---------------+-------------+---------------------+------------+
|valid_spf_string|invalid_spf_string|valid_spf_dns|invalid_spf_dns|total_domains|total_invalid_domains|both_invalid|
+----------------+------------------+-------------+---------------+-------------+---------------------+------------+
|        26099097|            298139|     26387687|           9549|     26397236|               306940|         748|
+----------------+------------------+-------------+---------------+-------------+---------------------+------------+

+---------------+--------------------+--------------------+----------------+-------------------+
|     query_name|            txt_text|               words|valid_spf_string|valid_spf_dns_limit|
+---------------+--------------------+--------------------+----------------+-------------------+
|       01pc.net|v=spf1 include:sp...|[v=spf1, include:...|            true|               true|
|        04k.net|v=spf1 re



+-----+------+-------+-----+----+---------------------+--------+
|    a|exists|include|   mx| ptr|invalid_records_count| domains|
+-----+------+-------+-----+----+---------------------+--------+
|56634|     7|  25367|29028|7632|                 9549|26397236|
+-----+------+-------+-----+----+---------------------+--------+

df_depth: 11892583
df_exploded: 33371549
df_spf: 26397236
df_combined: 18341376
df_tokenized: 18341376
df_counted: 18341376
df_joined: 11818177
df_count_ori: 26397236
df_all_joined: 26397236
df_corrected: 26397236
df_new: 26397236
+-------------+-------------+---------------+
|total_domains|valid_domains|invalid_domains|
+-------------+-------------+---------------+
|     26397236|     25820579|         576657|
+-------------+-------------+---------------+

+----------------+------------------+-------------+---------------+-------------+---------------------+------------+
|valid_spf_string|invalid_spf_string|valid_spf_dns|invalid_spf_dns|total_domains|total_invalid_

In [29]:
df_spark = load_spark_df(1, 5, 2020, False, ['com'])
validation_spf(df_spark, 1, 5, 2020, 'com')

+----------------+------------------+-------------+---------------+-------------+---------------------+------------+
|valid_spf_string|invalid_spf_string|valid_spf_dns|invalid_spf_dns|total_domains|total_invalid_domains|both_invalid|
+----------------+------------------+-------------+---------------+-------------+---------------------+------------+
|        36357044|            234802|     36582602|           9244|     36591846|               242577|        1469|
+----------------+------------------+-------------+---------------+-------------+---------------------+------------+

+--------------------+--------------------+--------------------+----------------+-------------------+
|          query_name|            txt_text|               words|valid_spf_string|valid_spf_dns_limit|
+--------------------+--------------------+--------------------+----------------+-------------------+
|          timstx.com|v=spf1 include:_s...|[v=spf1, include:...|            true|               true|
|     



+-----+------+-------+-----+----+---------------------+--------+
|    a|exists|include|   mx| ptr|invalid_records_count| domains|
+-----+------+-------+-----+----+---------------------+--------+
|64253|    25|  36681|19005|3968|                 9244|36591846|
+-----+------+-------+-----+----+---------------------+--------+



In [28]:
d, m, y = 1, 3, 2020
for y in range(2015, 2021):
    df_spark = load_spark_df(d, m, y, False, ['com', 'org', 'net'])
    count_included_domains(df_spark, d, m, y, 'com-org-net')

+--------------------+-----+-------------+
|                 key|count|total_domains|
+--------------------+-----+-------------+
|    spf.shopserve.jp| 5944|     26832100|
|    wiregrassweb.biz|   83|     26832100|
|sg-autorepondeur.com|   60|     26832100|
|     wmx.wadax.ne.jp| 2508|     26832100|
|     esenturkiye.com|    1|     26832100|
|liberty.nswebhost...|  245|     26832100|
|jabba.chaiyohosti...|   99|     26832100|
|sayginsacprofil.c...|    1|     26832100|
|mojiko-hotel.com....|    1|     26832100|
|        uk1.m0uy.biz|   25|     26832100|
|       _spf.daum.net| 2209|     26832100|
|blackberry.client...|  159|     26832100|
|barracuda.supercp...|  132|     26832100|
|        wpengine.com|   16|     26832100|
|    mesanetworks.net|    1|     26832100|
|server1.voltagecr...|   11|     26832100|
|easternlanguagein...|    1|     26832100|
|             vif.net|  104|     26832100|
|   spf01.popurih.com|    1|     26832100|
|smptout.secureser...|    3|     26832100|
+----------

In [34]:
sqlc.read.parquet("/user/mathay/results_new/source={}/data={}/year={:4d}/month={:02d}/day={:02d}".format('com-org-net', 'included-domains-count', y, m, d)).sort(desc("count")).show(100, truncate=False)

+------------------------------+-------+-------------+
|key                           |count  |total_domains|
+------------------------------+-------+-------------+
|spf.protection.outlook.com    |4674246|42144226     |
|_spf.google.com               |3023218|42144226     |
|websitewelcome.com            |2428704|42144226     |
|spf.efwd.registrar-servers.com|2374471|42144226     |
|mx.ovh.com                    |1006800|42144226     |
|secureserver.net              |852760 |42144226     |
|bluehost.com                  |804787 |42144226     |
|relay.mailchannels.net        |757034 |42144226     |
|_spf.mailspamprotection.com   |658300 |42144226     |
|spf.mxhichina.com             |406201 |42144226     |
|zoho.com                      |360476 |42144226     |
|_mailcust.gandi.net           |323549 |42144226     |
|emailsrvr.com                 |319370 |42144226     |
|servers.mcsv.net              |319209 |42144226     |
|sendgrid.net                  |310576 |42144226     |
|spf.mandr

In [36]:
sqlc.read.parquet("/user/mathay/results_new/source={}/data={}/year={:4d}/month={:02d}/day={:02d}/depth=0".format('com-org-net-alexa', 'include-depth', 2020, 5, 1))\
.filter(col("combined") == []).show(truncate=False)

Py4JJavaError: An error occurred while calling o330.equalTo.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList []
	at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
	at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
	at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
	at scala.util.Try.getOrElse(Try.scala:79)
	at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:163)
	at org.apache.spark.sql.functions$.typedLit(functions.scala:127)
	at org.apache.spark.sql.functions$.lit(functions.scala:110)
	at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:263)
	at org.apache.spark.sql.Column.equalTo(Column.scala:286)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [43]:
sqlc.read.parquet("/user/mathay/results_new/source={}/data={}/year={:4d}/month={:02d}/day={:02d}".format('com', 'validation-full-data', 2020, 5, 1)).show(truncate=False)

+--------------------------+----------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------+----------------+-------------------+
|query_name                |txt_text                                                                                                                          |words                                                                                                                                     |valid_spf_string|valid_spf_dns_limit|
+--------------------------+----------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------+----------------+-------------

In [None]:
datasets = ['com', 'org', 'net']
for dataset in datasets:
    for y in range(2015, 2021):
        for m in range(1, 13):
            if y == 2015 and (m == 1 or m == 2):
                continue
            if y == 2015 and m <= 3 and dataset == 'com':
                continue
            if y == 2020 and m >= 6:
                continue
                
            if y == 2015 or (y == 2016 and m <= 1):
                complete_dataset = ["com", "org", "net"]
            else:
                complete_dataset = ["com", "org", "net", "alexa"]
            
                
            print(dataset)
            print(complete_dataset)
                
            d = 1
            print(d, m, y)
#             df_spark = load_spark_df(d, m, y, False, [dataset]) # was zonder haakjes
            df_spf = load_spark_df(d, m, y, False, complete_dataset)
#             adoption_rate(d, m, y, dataset, opencc=False)
#             count_everything(df_spark, d, m, y, dataset)
#             count_included_domains(df_spark, d, m, y, dataset)
#             validation_spf(df_spark, d, m, y, dataset)
#             include_analysis(df_spark, d, m, y, 5, dataset)
#             include_analysis(df_spark, d, m, y, 5, 'com-org-net-alexa')
#             validation_spf(df_spf, d, m, y, dataset)
            verify_all(d, m, y, dataset, df_spf)

com
['com', 'org', 'net']
1 4 2015
df_depth: 9871842
df_exploded: 27036745
df_spf: 25297015
df_combined: 15291412
df_tokenized: 15291412
df_counted: 15291412
df_joined: 9809951
df_count_ori: 21192267
df_all_joined: 21192267
df_corrected: 21192267
df_new: 21192267
+-------------+-------------+---------------+
|total_domains|valid_domains|invalid_domains|
+-------------+-------------+---------------+
|     21192267|     20457291|         734976|
+-------------+-------------+---------------+

com
['com', 'org', 'net']
1 5 2015
df_depth: 10105466
df_exploded: 27518425
df_spf: 26397236
df_combined: 15599393
df_tokenized: 15599393
df_counted: 15599393
df_joined: 10041028
df_count_ori: 22170274
df_all_joined: 22170274
df_corrected: 22170274
df_new: 22170274
+-------------+-------------+---------------+
|total_domains|valid_domains|invalid_domains|
+-------------+-------------+---------------+
|     22170274|     21440157|         730117|
+-------------+-------------+---------------+

com
['co

In [27]:
for y in range(2016, 2021):
    for m in range(1, 13):
        if y == 2016 and m <= 1:
            continue
        print(m, y)
        d = 1
#         df_spark = load_spark_df(d, m, y, False, ['com', 'org', 'net', 'alexa'])
#         include_analysis(df_spark, d, m, y, 5, 'alexa')
        df_spark = load_spark_df(d, m, y, False, ['com-org-net-alexa'])
        dns_lookup(d, m, y, 'com-org-net-alexa', df_spark)

2 2016


Py4JJavaError: An error occurred while calling o185.count.
: org.apache.spark.SparkException: Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:150)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:387)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:117)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:211)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:101)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:189)
	at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:65)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:189)
	at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:85)
	at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:206)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:189)
	at org.apache.spark.sql.execution.FileSourceScanExec.consume(DataSourceScanExec.scala:159)
	at org.apache.spark.sql.execution.ColumnarBatchScan$class.produceBatches(ColumnarBatchScan.scala:144)
	at org.apache.spark.sql.execution.ColumnarBatchScan$class.doProduce(ColumnarBatchScan.scala:83)
	at org.apache.spark.sql.execution.FileSourceScanExec.doProduce(DataSourceScanExec.scala:159)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.FileSourceScanExec.produce(DataSourceScanExec.scala:159)
	at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:125)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:96)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:40)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:544)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:598)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.python.EvalPythonExec.doExecute(EvalPythonExec.scala:87)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2836)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2835)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2835)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:146)
	... 149 more


In [102]:
datasets = ['com', 'net', 'org', 'alexa']
days_months = {1: 31, 2: 28, 3:31, 4: 30, 5: 31, 6: 30, 7: 31, 8: 31, 9: 30, 10: 31, 11: 30, 12: 31}
d, m = 1, 5
for y in range(2015, 2020):
    for dataset in datasets:
    df_spark = load_spark_df(d, m, y, False, dataset)
    include_analysis(df_spark, d, m, y, 5, dataset)
    dns_lookup(d, m, y, 'com-org-net-alexa')

+------------------------------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+-----+
|src                                                         |ori_includes                                                         |combined                                                             |added|
+------------------------------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+-----+
|0-250.com                                                   |[spf.websitewelcome.com, websitewelcome.com, spf2.websitewelcome.com]|[spf.websitewelcome.com, websitewelcome.com, spf2.websitewelcome.com]|false|
|0-d4y.com                                                   |[spf.websitewelcome.com, websitewelcome.com, spf2.websitewelcome.com]|[spf.websitewelcome.com, website

In [105]:
datasets = ['com', 'org', 'net', 'alexa'] # 'org', 'alexa', 'com'
datasets = [['com', 'net', 'org', 'alexa']]
days_months = {1: 31, 2: 28, 3:31, 4: 30, 5: 31, 6: 30, 7: 31, 8: 31, 9: 30, 10: 31, 11: 30, 12: 31}
for dataset in datasets:
    for y in range(2015, 2021):
        for m in range(1, 13):
            if dataset == 'alexa':
                if (y == 2016 and m == 1) or y == 2015:
                    continue
            elif dataset == 'com' or dataset == 'org' or dataset == 'net':
                if y == 2015 and (m == 1 or m == 2):
                    continue
            elif dataset == 'se':
                if (y == 2016 and m <= 6) or y == 2015:
                    continue
            else: # this can be deleted maybe
                if y == 2015 and (m == 1 or m == 2):
                    continue
            if y == 2020 and m >= 6:
                continue
            
            # delete these 2 conditions
            if y == 2015 or (y == 2016 and m == 1):
                dataset = ['com', 'org']
            elif y == 2016 and m >= 3 and m <= 6:
                dataset = ['com', 'org', 'alexa']
            else: # alexa starting from 1-3-2016, opencc (net) from 1-7-2016
                dataset = ['com', 'net', 'org', 'alexa']
            print(dataset)
                
            d = 1
            print(d, m, y)
            df_spark = load_spark_df(d, m, y, False, dataset) # was zonder haakjes
#             adoption_rate(d, m, y, dataset, opencc=False)
#             count_everything(df_spark, d, m, y, dataset)
#             count_included_domains(df_spark, d, m, y, dataset)
#             validation_spf(df_spark, d, m, y, dataset)
#             include_analysis(df_spark, d, m, y, 5, dataset)
            include_analysis(df_spark, d, m, y, 5, 'com-org-net-alexa')
            dns_lookup(d, m, y, 'com-org-net-alexa')

['com', 'org']
1 3 2015
+------------------------------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+-----+
|src                                                         |ori_includes                                                         |combined                                                             |added|
+------------------------------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+-----+
|0-250.com                                                   |[spf.websitewelcome.com, websitewelcome.com, spf2.websitewelcome.com]|[spf.websitewelcome.com, websitewelcome.com, spf2.websitewelcome.com]|false|
|0-d4y.com                                                   |[spf.websitewelcome.com, websitewelcome.com, spf2.websitewelcome.com]|[spf.web

Py4JJavaError: An error occurred while calling o68899.count.
: org.apache.spark.SparkException: Job 3434 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:932)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:930)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:930)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2128)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2041)
	at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1949)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1948)
	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:121)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2836)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2835)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2835)
	at sun.reflect.GeneratedMethodAccessor200.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [84]:
datasets = ['com', 'org', 'net', 'alexa']
for dataset in datasets:
    sqlc.read.parquet("/user/mathay/results_new/source={}/data={}/year={:4d}/month={:02d}/day={:02d}".format(dataset, 'adoption-rate', 2020, 5, 1)).show()

+-------------+-----------+-----------------+
|total_domains|spf_domains|    adoption_rate|
+-------------+-----------+-----------------+
|    145418816|   36591846|0.251630751827879|
+-------------+-----------+-----------------+

+-------------+-----------+-------------------+
|total_domains|spf_domains|      adoption_rate|
+-------------+-----------+-------------------+
|     10028070|    2571803|0.25646041561337324|
+-------------+-----------+-------------------+

+-------------+-----------+-------------------+
|total_domains|spf_domains|      adoption_rate|
+-------------+-----------+-------------------+
|     13145894|    2980577|0.22673064304337157|
+-------------+-----------+-------------------+

+-------------+-----------+------------------+
|total_domains|spf_domains|     adoption_rate|
+-------------+-----------+------------------+
|      1019464|     750469|0.7361407563190069|
+-------------+-----------+------------------+



In [98]:
# load_spark_df(1, 5, 2020, False, ['opencc']).filter(col('query_name').endswith('ee')).show(truncate=False)
for d in range(1, 20):
    adoption_rate(d, 5, 2017, 'com', False) # org is van 1 op 2

+-------------+-----------+-------------------+
|total_domains|spf_domains|      adoption_rate|
+-------------+-----------+-------------------+
|    126286662|   24427740|0.19343087870989892|
+-------------+-----------+-------------------+

+-------------+-----------+-------------------+
|total_domains|spf_domains|      adoption_rate|
+-------------+-----------+-------------------+
|    126373515|   24440465|0.19339863261696882|
+-------------+-----------+-------------------+

+-------------+-----------+-------------------+
|total_domains|spf_domains|      adoption_rate|
+-------------+-----------+-------------------+
|    126406126|   24439681|0.19334253626283904|
+-------------+-----------+-------------------+

+-------------+-----------+-------------------+
|total_domains|spf_domains|      adoption_rate|
+-------------+-----------+-------------------+
|    126430507|   28243052|0.22338795177021634|
+-------------+-----------+-------------------+

+-------------+-----------+---------

In [83]:
def check_difference(d, m, y, source):
    dataset = [source]
    df_spark = load_spark_df_no_spf(d, m, y, False, dataset) # need to change to com and org
    total_domains = df_spark.select("qname_sld").distinct().count()
    spf_domains = df_spark.where(df_spark.txt_text.contains("v=spf1")).count()
    adoption_rate = int(spf_domains) / int(total_domains)
    
    df_result = sqlc.createDataFrame(
                [
                    (total_domains, spf_domains, adoption_rate), # create your data here, be consistent in the types.
                ],
                ['total_domains', 'spf_domains', 'adoption_rate'] # add your columns label here
            )
    
    df_result.show()
    
    save_df(df_result, d, m, y, source, 'adoption-rate')

In [None]:
def load_spark_df_no_spf(day, month, year, one_day, datasets):
    # Create Spark DF
    SOURCES = datasets
    df = sqlc.read.option("basePath", "/").parquet(*[
        # n.b.: set paths list as desired: /user/openintel/measurement_data/type=warehouse/source=<tld>/year=<yyyy>/month=<mm>/day=<dd>
        os.path.join(OI_MDATA_BASE, "source={}".format(i_source), "year={:4d}".format(year), "month={:02d}".format(month), "day={:02d}".format(day)) for i_source in SOURCES
    ]).withColumn(
        # Add an ISO8601 date column
        # n.b.: read() will produce integers for the year, month and day partitions in the HDFS file hierarchy
        "date", psf.concat_ws('-', "year", psf.lpad("month", 2, '0'), psf.lpad("day", 2, '0')))
    
#     df = df.filter(
#         psf.col("txt_text").isNotNull()
#     ).select(
#         ["query_name", "txt_text"]
#     ).where(
#         df.txt_text.contains("v=spf1")
#     ).withColumn(
#         'query_name', col('query_name').substr(lit(0), length(col('query_name')) - 1)
#     ).withColumn(
#         'txt_text', col('txt_text').substr(lit(2), length(col('txt_text')) - 2)
#     )
    df = df.select(
        ["query_name", "txt_text"]
#     ).withColumn(
#         'query_name', col('query_name').substr(lit(0), length(col('query_name')) - 1)
    ).withColumn(
        'txt_text', col('txt_text').substr(lit(2), length(col('txt_text')) - 2)
    ).withColumn("qname_sld", regexp_extract("query_name", "([^.]+[.][^.]+[.])$", 1))
    

    df_original = df
    return df_original

In [28]:
print("5, 3")
adoption_rate(5, 3, 2017, 'com')
adoption_rate(5, 3, 2017, 'org')
adoption_rate(5, 3, 2017, 'net')
print("6, 3")
adoption_rate(6, 3, 2017, 'com')
adoption_rate(6, 3, 2017, 'org')
adoption_rate(6, 3, 2017, 'net')
print("3, 5")
adoption_rate(3, 5, 2017, 'com')
adoption_rate(3, 5, 2017, 'org')
adoption_rate(3, 5, 2017, 'net')
print("4, 5")
adoption_rate(4, 5, 2017, 'com')
adoption_rate(4, 5, 2017, 'org')
adoption_rate(4, 5, 2017, 'net')

5, 3
+-------------+-----------+------------------+
|total_domains|spf_domains|     adoption_rate|
+-------------+-----------+------------------+
|    126194027|   27531841|0.2181707142129635|
+-------------+-----------+------------------+

+-------------+-----------+------------------+
|total_domains|spf_domains|     adoption_rate|
+-------------+-----------+------------------+
|     10335916|    2104995|0.2036582921146031|
+-------------+-----------+------------------+

+-------------+-----------+-------------------+
|total_domains|spf_domains|      adoption_rate|
+-------------+-----------+-------------------+
|     14908875|    2807070|0.18828181200794827|
+-------------+-----------+-------------------+

6, 3
+-------------+-----------+-------------------+
|total_domains|spf_domains|      adoption_rate|
+-------------+-----------+-------------------+
|    126155586|   24186959|0.19172325036800195|
+-------------+-----------+-------------------+

+-------------+-----------+---------

In [None]:
adoption_rate(15, 5, 2017, 'com')

In [45]:
df_five = load_spark_df_no_spf(5, 3, 2017, False, ['com'])
df_six = load_spark_df_no_spf(6, 3, 2017, False, ['com'])
print(df_five.select("qname_sld").distinct().count())
print(df_six.select("qname_sld").distinct().count())

126194027
126155586


In [47]:
df_five = df_five.where(df_five.txt_text.contains("v=spf1"))
df_six = df_six.where(df_six.txt_text.contains("v=spf1"))

In [52]:
df_five.filter(df_five.qname_sld == "006682.com.").show()
df_six.filter(df_six.qname_sld == "006682.com.").show()

+-----------+--------------------+-----------+
| query_name|            txt_text|  qname_sld|
+-----------+--------------------+-----------+
|006682.com.|v=spf1 mx include...|006682.com.|
+-----------+--------------------+-----------+

+----------+--------+---------+
|query_name|txt_text|qname_sld|
+----------+--------+---------+
+----------+--------+---------+



In [50]:
df_dif = df_five.subtract(df_six)
print(df_dif.count())
print(df_dif.show(truncate=False))

3662163
+-------------------------+-----------------------------------------------------------------------+-------------------------+
|query_name               |txt_text                                                               |qname_sld                |
+-------------------------+-----------------------------------------------------------------------+-------------------------+
|006682.com.              |v=spf1 mx include:_spf.io.bouncemx.com ?all                            |006682.com.              |
|011234.com.              |v=spf1 include:smtp.mailgee.com ~all                                   |011234.com.              |
|020zhe.com.              |v=spf1 include:spf.mxhichina.com -all                                  |020zhe.com.              |
|03212016.com.            |v=spf1 include:spf.163.com ~all                                        |03212016.com.            |
|0517ha.com.              |v=spf1 mx include:_spf.io.bouncemx.com ?all                            |0517ha.com.

In [55]:
df_ns = load_spark_df_ns(5, 3, 2017, False, ['com'])
df_ns = df_ns.where(df_ns.query_type == "NS").select(col('query_name').alias('query_name_ns'), col('qname_sld').alias('qname_sld_ns'), 'ns_address')
df_ns.show()

+--------------------+--------------------+--------------------+
|       query_name_ns|        qname_sld_ns|          ns_address|
+--------------------+--------------------+--------------------+
|   jacqsjewelry.com.|   jacqsjewelry.com.| ns1.p05.dynect.net.|
|   jacqsjewelry.com.|   jacqsjewelry.com.| ns2.p05.dynect.net.|
|   jacqsjewelry.com.|   jacqsjewelry.com.| ns3.p05.dynect.net.|
|   jacqsjewelry.com.|   jacqsjewelry.com.| ns4.p05.dynect.net.|
|   jacqsjewelry.com.|   jacqsjewelry.com.|                null|
|  janelsjournal.com.|  janelsjournal.com.|lucy.ns.cloudflar...|
|  janelsjournal.com.|  janelsjournal.com.|apollo.ns.cloudfl...|
|  janelsjournal.com.|  janelsjournal.com.|                null|
|     jamesgruft.com.|     jamesgruft.com.|pdns1.registrar-s...|
|     jamesgruft.com.|     jamesgruft.com.|pdns2.registrar-s...|
|     jamesgruft.com.|     jamesgruft.com.|                null|
|  jaxfitacademy.com.|  jaxfitacademy.com.| dns1.p05.nsone.net.|
|  jaxfitacademy.com.|  j

In [57]:
df_joined_ns = df_dif.join(df_ns, df_ns.qname_sld_ns == df_dif.qname_sld)
df_joined_ns.show()

+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+
|         query_name|            txt_text|          qname_sld|      query_name_ns|       qname_sld_ns|          ns_address|
+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+
|       04grh6l.com.|v=spf1 ip4:210.15...|       04grh6l.com.|       04grh6l.com.|       04grh6l.com.|         01.dnsv.jp.|
|       04grh6l.com.|v=spf1 ip4:210.15...|       04grh6l.com.|       04grh6l.com.|       04grh6l.com.|         02.dnsv.jp.|
|       04grh6l.com.|v=spf1 ip4:210.15...|       04grh6l.com.|       04grh6l.com.|       04grh6l.com.|         03.dnsv.jp.|
|       04grh6l.com.|v=spf1 ip4:210.15...|       04grh6l.com.|       04grh6l.com.|       04grh6l.com.|         04.dnsv.jp.|
|       04grh6l.com.|v=spf1 ip4:210.15...|       04grh6l.com.|       04grh6l.com.|       04grh6l.com.|                null|
|       

In [None]:
df_keep_domain = df_joined_ns.

In [58]:
df_grouped_ns = df_joined_ns.groupBy('ns_address').count()
df_grouped_ns.orderBy(desc("count")).show(truncate=False)

+-----------------------+-------+
|ns_address             |count  |
+-----------------------+-------+
|null                   |3602816|
|ns58.domaincontrol.com.|182984 |
|ns57.domaincontrol.com.|182981 |
|ns54.domaincontrol.com.|107366 |
|ns53.domaincontrol.com.|107348 |
|ns10.domaincontrol.com.|105043 |
|ns09.domaincontrol.com.|104992 |
|ns52.domaincontrol.com.|91229  |
|ns51.domaincontrol.com.|91213  |
|ns44.domaincontrol.com.|88441  |
|ns43.domaincontrol.com.|88423  |
|ns42.domaincontrol.com.|87997  |
|ns41.domaincontrol.com.|87980  |
|ns70.domaincontrol.com.|87560  |
|ns69.domaincontrol.com.|87536  |
|ns02.domaincontrol.com.|87394  |
|ns01.domaincontrol.com.|87376  |
|ns72.domaincontrol.com.|86840  |
|ns71.domaincontrol.com.|86819  |
|ns68.domaincontrol.com.|85893  |
+-----------------------+-------+
only showing top 20 rows



In [143]:
df_five = df_five.where(df_five.txt_text.contains("v=spf1"))
df_five = df_five.select(col("query_name").alias("query_name_old"), col("txt_text").alias("txt_text_old"), col("qname_sld").alias("qname_sld_old"))
df_five.select("qname_sld_old").distinct().count()

2075413

In [144]:
df_five.show()
df_six.show()

+--------------------+--------------------+--------------------+
|      query_name_old|        txt_text_old|       qname_sld_old|
+--------------------+--------------------+--------------------+
|          mtmac.org.|v=spf1 include:sp...|          mtmac.org.|
|          hcluk.org.|\"v=spf1 include:...|          hcluk.org.|
|   total-secure.org.|v=spf1 include:_m...|   total-secure.org.|
|       siratali.org.|v=spf1 a mx ptr i...|       siratali.org.|
|industrialresearc...|v=spf1 +a +mx +ip...|industrialresearc...|
|invitacionesdebod...|v=spf1 a mx ip4:3...|invitacionesdebod...|
|        yogeeks.org.|v=spf1 redirect=_...|        yogeeks.org.|
|campamentosroboti...|v=spf1 include:_s...|campamentosroboti...|
|        100murs.org.|v=spf1 include:mx...|        100murs.org.|
|harmonybydesign.org.|v=spf1 include:_s...|harmonybydesign.org.|
|atakoyevdenevenak...|v=spf1 a mx inclu...|atakoyevdenevenak...|
|      mush-tech.org.|v=spf1 ip4:66.96....|      mush-tech.org.|
|jdswellbeingcentr...|v=s

In [147]:
cond = [df_five.qname_sld_old == df_six.qname_sld] #, df_five.txt_text_old != df_six.txt_text, df_six.txt_text == ''
df_joined = df_five.join(df_six, cond)
df_joined = df_joined.select('query_name', 'qname_sld', 'txt_text', 'txt_text_old')
df_joined.show(truncate=False)

+-------------------+--------------+---------------------------------------------------+---------------------------------------------------------------------------------------------------------+
|query_name         |qname_sld     |txt_text                                           |txt_text_old                                                                                             |
+-------------------+--------------+---------------------------------------------------+---------------------------------------------------------------------------------------------------------+
|05hb.org.          |05hb.org.     |null                                               |v=spf1 a mx ptr include:spf.raiolanetworks.com ~all                                                      |
|05hb.org.          |05hb.org.     |null                                               |v=spf1 a mx ptr include:spf.raiolanetworks.com ~all                                                      |
|www.05hb.org.      |05hb

In [148]:
df_joined.count()

27745990

In [105]:
grouped = df_joined.groupBy('txt_text_old').count()
grouped.orderBy(desc("count")).show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------+-----+
|txt_text_old                                                                                                                      |count|
+----------------------------------------------------------------------------------------------------------------------------------+-----+
|v=spf1 +a +mx +ip4:200.63.98.144 ?all                                                                                             |26   |
|v=spf1 +a +mx +ip4:50.87.144.85 ~all                                                                                              |18   |
|v=spf1 +a +mx +ip4:67.20.55.36 ~all                                                                                               |9    |
|v=spf1 +a +mx +ip4:64.119.182.254 +ip4:64.119.182.122 ~all                                                                        |8    |
|v=spf1 +ip4:46.101.120.27 

In [106]:
def load_spark_df_ns(day, month, year, one_day, datasets):
    # Create Spark DF
    SOURCES = datasets
    df = sqlc.read.option("basePath", "/").parquet(*[
        # n.b.: set paths list as desired: /user/openintel/measurement_data/type=warehouse/source=<tld>/year=<yyyy>/month=<mm>/day=<dd>
        os.path.join(OI_MDATA_BASE, "source={}".format(i_source), "year={:4d}".format(year), "month={:02d}".format(month), "day={:02d}".format(day)) for i_source in SOURCES
    ]).withColumn(
        # Add an ISO8601 date column
        # n.b.: read() will produce integers for the year, month and day partitions in the HDFS file hierarchy
        "date", psf.concat_ws('-', "year", psf.lpad("month", 2, '0'), psf.lpad("day", 2, '0')))
    
#     df = df.filter(
#         psf.col("txt_text").isNotNull()
#     ).select(
#         ["query_name", "txt_text"]
#     ).where(
#         df.txt_text.contains("v=spf1")
#     ).withColumn(
#         'query_name', col('query_name').substr(lit(0), length(col('query_name')) - 1)
#     ).withColumn(
#         'txt_text', col('txt_text').substr(lit(2), length(col('txt_text')) - 2)
#     )
#     df = df.select(
#         ["query_name", "txt_text"]
#     ).withColumn(
#         'query_name', col('query_name').substr(lit(0), length(col('query_name')) - 1)
#     ).withColumn(
#         'txt_text', col('txt_text').substr(lit(2), length(col('txt_text')) - 2)
#     ).withColumn("qname_sld", regexp_extract("query_name", "([^.]+[.][^.]+[.])$", 1))
    
    df = df.withColumn("qname_sld", regexp_extract("query_name", "([^.]+[.][^.]+[.])$", 1))
    df_original = df
    return df_original

In [127]:
df_ns = load_spark_df_ns(5, 3, 2017, False, ['org'])
df_ns = df_ns.where(df_ns.query_type == "NS").select(col('query_name').alias('query_name_ns'), col('qname_sld').alias('qname_sld_ns'), 'ns_address')
df_ns.show()

+-------------------+-------------------+--------------------+
|      query_name_ns|       qname_sld_ns|          ns_address|
+-------------------+-------------------+--------------------+
|    newbirthbc.org.|    newbirthbc.org.|  ns1.wordpress.com.|
|    newbirthbc.org.|    newbirthbc.org.|  ns2.wordpress.com.|
|    newbirthbc.org.|    newbirthbc.org.|  ns3.wordpress.com.|
|    newbirthbc.org.|    newbirthbc.org.|                null|
|        zygato.org.|        zygato.org.|   ns3.firstfind.nl.|
|        zygato.org.|        zygato.org.|   ns4.firstfind.nl.|
|        zygato.org.|        zygato.org.|  ns5.firstfind.net.|
|        zygato.org.|        zygato.org.|                null|
|         mtmac.org.|         mtmac.org.|dns1.registrar-se...|
|         mtmac.org.|         mtmac.org.|dns2.registrar-se...|
|         mtmac.org.|         mtmac.org.|dns3.registrar-se...|
|         mtmac.org.|         mtmac.org.|dns4.registrar-se...|
|         mtmac.org.|         mtmac.org.|dns5.registrar

In [128]:
df_joined_ns = df_joined.join(df_ns, df_ns.qname_sld_ns == df_joined.qname_sld)
df_joined_ns.show()

+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
|          query_name|           qname_sld|txt_text|        txt_text_old|       query_name_ns|        qname_sld_ns|          ns_address|
+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
|  cowetaschools.org.|  cowetaschools.org.|        |v=spf1 a mx ip4:1...|  cowetaschools.org.|  cowetaschools.org.|dns1.cowetaschool...|
|  cowetaschools.org.|  cowetaschools.org.|        |v=spf1 a mx ip4:1...|  cowetaschools.org.|  cowetaschools.org.|dns2.cowetaschool...|
|  cowetaschools.org.|  cowetaschools.org.|        |v=spf1 a mx ip4:1...|  cowetaschools.org.|  cowetaschools.org.|                null|
| stlucieschools.org.| stlucieschools.org.|        |v=spf1 include:sp...| stlucieschools.org.| stlucieschools.org.|ns1.stlucieschool...|
| stlucieschools.org.| stlucieschools.org

In [129]:
print(df_joined.count())
df_joined_ns_removed = df_joined_ns.dropDuplicates(['qname_sld'])
print(df_joined_ns_removed.show())
print(df_joined_ns_removed.count())

820
+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
|          query_name|           qname_sld|txt_text|        txt_text_old|       query_name_ns|        qname_sld_ns|          ns_address|
+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
|  cowetaschools.org.|  cowetaschools.org.|        |v=spf1 a mx ip4:1...|  cowetaschools.org.|  cowetaschools.org.|dns1.cowetaschool...|
| stlucieschools.org.| stlucieschools.org.|        |v=spf1 include:sp...| stlucieschools.org.| stlucieschools.org.|ns1.stlucieschool...|
|      vivagente.org.|      vivagente.org.|        |v=spf1 +a +mx +ip...|      vivagente.org.|      vivagente.org.|ns1.todayswebhost...|
|   chrisfarrell.org.|   chrisfarrell.org.|        |v=spf1 a mx ip4:6...|   chrisfarrell.org.|   chrisfarrell.org.| ns1.seetheinfo.com.|
|aberaeronallotmen...|aberaeronallotm

In [121]:
df_grouped_ns = df_joined_ns.groupBy('ns_address').count()
df_grouped_ns.orderBy(desc("count")).show(truncate=False)

+------------------------+-----+
|ns_address              |count|
+------------------------+-----+
|null                    |822  |
|dns1.aguamarket.cl.     |26   |
|dns2.aguamarket.cl.     |26   |
|ns1422.hostgator.com.   |18   |
|ns1421.hostgator.com.   |18   |
|ns2.inmotionhosting.com.|17   |
|ns.inmotionhosting.com. |15   |
|ns1.asmallorange.com.   |12   |
|ns2.asmallorange.com.   |12   |
|ns2.myhostcenter.com.   |10   |
|ns1.myhostcenter.com.   |10   |
|ns2.byethost38.org.     |9    |
|ns2.arobuddhism.org.    |9    |
|ns1.arobuddhism.org.    |9    |
|ns1.byethost38.org.     |9    |
|ns2.whspn.net.          |8    |
|ns1.whspn.net.          |8    |
|ns1.webzdns.com.        |8    |
|ns2.webzdns.com.        |8    |
|ns3.webzdns.com.        |8    |
+------------------------+-----+
only showing top 20 rows



In [118]:
load_spark_df_ns(5, 3, 2017, False, ['org']).where(col('qname_sld') == 'wikipedia.org.').show(truncate=False)

+----------+-------------------+-------------+-------------------+-------------+---------+-----------+--------------+------------------+-------+-----+-------+----------+----------+---------------------+-------------+--------------------+----------------------------------------------------------------+------------------+--------------------+----------------------------------------------------------------+----------------------------------------------------------------------+------------------+----------------------------------------------------------------+----------+------------+--------------+---------+------------+---------------+----------------+---------------+---------------+---------------------+-------------------+-------------------+---------------+---------------+---------------+---------------+---------------+--------------+---------------------+----------------------+--------------------+-----------+----------------+----------+---------------------------+--------------------

In [27]:
def load_spark_df_no_spf_new(day, month, year, one_day, datasets):
    # Create Spark DF
    SOURCES = datasets
    df = sqlc.read.option("basePath", "/").parquet(*[
        # n.b.: set paths list as desired: /user/openintel/measurement_data/type=warehouse/source=<tld>/year=<yyyy>/month=<mm>/day=<dd>
        os.path.join(OI_MDATA_BASE, "source={}".format(i_source), "year={:4d}".format(year), "month={:02d}".format(month), "day={:02d}".format(day)) for i_source in SOURCES
    ]).withColumn(
        # Add an ISO8601 date column
        # n.b.: read() will produce integers for the year, month and day partitions in the HDFS file hierarchy
        "date", psf.concat_ws('-', "year", psf.lpad("month", 2, '0'), psf.lpad("day", 2, '0')))
    
#     df = df.filter(
#         psf.col("txt_text").isNotNull()
#     ).select(
#         ["query_name", "txt_text"]
#     ).where(
#         df.txt_text.contains("v=spf1")
#     ).withColumn(
#         'query_name', col('query_name').substr(lit(0), length(col('query_name')) - 1)
#     ).withColumn(
#         'txt_text', col('txt_text').substr(lit(2), length(col('txt_text')) - 2)
#     )
    df = df.select(
        ["query_name", "txt_text"]
#     ).withColumn(
#         'query_name', col('query_name').substr(lit(0), length(col('query_name')) - 1)
    ).withColumn(
        'txt_text', col('txt_text').substr(lit(2), length(col('txt_text')) - 2)
    )
#     .withColumn("qname_sld", regexp_extract("query_name", "([^.]+[.][^.]+[.])$", 1))
    

    df_original = df
    return df_original

In [38]:
df_old = load_spark_df_no_spf(1, 5, 2020, True, ['alexa'])
print(df_old.select("qname_sld").distinct().count())
df_new = load_spark_df_no_spf_new(1, 5, 2020, True, ['alexa'])
print(df_new.select("query_name").distinct().count())

1019464
2468439


In [44]:
df_saved = sqlc.read.parquet("/user/mathay/results_new/source=com/data=adoption-rate/year=2020/month=05/day=01")
df_saved.show()

+-------------+-----------+-----------------+
|total_domains|spf_domains|    adoption_rate|
+-------------+-----------+-----------------+
|    145418816|   36591846|0.251630751827879|
+-------------+-----------+-----------------+



In [43]:
df_old.filter(col('qname_sld') != 'myshopify.com.').show(truncate=False)

+-------------------------+--------------------------------------------------------------------+---------------------+
|query_name               |txt_text                                                            |qname_sld            |
+-------------------------+--------------------------------------------------------------------+---------------------+
|dubaidedoktorluk.com.    |null                                                                |dubaidedoktorluk.com.|
|dubaidedoktorluk.com.    |null                                                                |dubaidedoktorluk.com.|
|www.dubaidedoktorluk.com.|null                                                                |dubaidedoktorluk.com.|
|www.dubaidedoktorluk.com.|null                                                                |dubaidedoktorluk.com.|
|www.dubaidedoktorluk.com.|null                                                                |dubaidedoktorluk.com.|
|dubaidedoktorluk.com.    |null                 

In [29]:
load_spark_df(1, 5, 2020, True, ['tlsa']).show()
load_spark_df_no_spf(1, 5, 2020, True, ['tlsa']).show()

+----------+--------+
|query_name|txt_text|
+----------+--------+
+----------+--------+

+--------------------+--------+--------------------+
|          query_name|txt_text|           qname_sld|
+--------------------+--------+--------------------+
|_465._tcp.dondevi...|    null|dondevivelalogist...|
|_465._tcp.legalne...|    null|  legalnezrodlo.org.|
|_465._tcp.atlassn...|    null|       atlassnap.ru.|
|_465._tcp.mx.bfoj...|    null|        bfojigs.com.|
|_465._tcp.leather...|    null|leatherfederation...|
|_465._tcp.leather...|    null|leatherfederation...|
|_465._tcp.email.t...|    null|           topas.cz.|
|_465._tcp.mx.bost...|    null|bostondogtrainer....|
|_465._tcp.rewriti...|    null|rewritingthenarra...|
|_465._tcp.mx.mgir...|    null|      mgireland.com.|
|_465._tcp.cringen...|    null| cringenturvion.com.|
|_465._tcp.artones...|    null|        artones.com.|
|_465._tcp.artones...|    null|        artones.com.|
|_465._tcp.rentals...|    null|  rentalsnstuff.com.|
|_465._tcp

# DNS lookup

In [97]:
df_result = dns_lookup(1, 5, 2020, 'com-org-net-alexa')

+-------------+-------------+---------------+
|total_domains|valid_domains|invalid_domains|
+-------------+-------------+---------------+
|     26537574|     22963642|        3573932|
+-------------+-------------+---------------+



In [98]:
for y in range(2015, 2020):
    dns_lookup(1, 5, y, 'com-org-net-alexa')

AnalysisException: 'Path does not exist: hdfs://openintel/user/mathay/results_new/source=com-org-net-alexa/data=include-depth/year=2015/month=05/day=01/depth=5;'

In [80]:
df_counted.filter(df_counted.src == "spaceistanbul.com").show(truncate=False)

+-----------------+-----------------+-------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|src              |domain_to_count  |txt_text                                                                                                                                   |words                                                                                                                                               |dns_count|
+-----------------+-----------------+-------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+

In [81]:
df_joined.filter(df_joined.src == "spaceistanbul.com").show()

+-----------------+---------+
|              src|dns_count|
+-----------------+---------+
|spaceistanbul.com|       17|
+-----------------+---------+



In [40]:
df_combined = df_depth.join(df_spf, df_spf.query_name == df_depth.src)
df_combined = df_combined.select("src", "combined", "txt_text")
df_combined.show()

+--------------------+--------------------+--------------------+
|                 src|            combined|            txt_text|
+--------------------+--------------------+--------------------+
|    0-ei47j69dvt.com|[spf.0-ei47j69dvt...|v=spf1 include:sp...|
|         0004185.com|[spf.efwd.registr...|v=spf1 include:sp...|
|         0007885.com|[spf.efwd.registr...|v=spf1 include:sp...|
|          007hgg.com|[spf.cn-webdesign...|v=spf1\194\160mx/...|
|       007powers.com|[bluehost.com, _s...|v=spf1 +a +mx +ip...|
|         0086991.com|    [webhostbox.net]|v=spf1 a mx inclu...|
|       010gongmu.com|[spf.mail.faidns....|v=spf1 include:sp...|
|       010gymbox.com|   [_spf.hostnet.nl]|v=spf1 include:_s...|
|          011465.com|    [spf.011465.com]|v=spf1 include:sp...|
|          01dash.com|[aspmx.googlemail...|v=spf1 include:as...|
|        01ddsv01.com|[websitewelcome.c...|v=spf1 +a +mx +ip...|
|          01home.com|   [spf.mail.qq.com]|v=spf1 include:sp...|
|          027xxo.com|[sp

In [41]:
df_all = load_spark_df_no_spf(1, 5, 2020, True, ['opencc'])
df_spf = load_spark_df(1, 5, 2020, True, ['opencc'])

In [43]:
df_all.filter(df_all.query_name.endswith('.gov.')).show()
df_spf.filter(df_spf.query_name.endswith('.gov')).show()

+----------+--------+---------+
|query_name|txt_text|qname_sld|
+----------+--------+---------+
+----------+--------+---------+

+--------------------+--------------------+
|          query_name|            txt_text|
+--------------------+--------------------+
|       alabamada.gov|v=spf1 include:_s...|
|          chhdwv.gov|v=spf1 ip4:173.16...|
|     highlandsfl.gov|v=spf1 include:sp...|
|          fincen.gov|v=spf1 redirect=_...|
|northhampton-nh-p...|v=spf1 include:sp...|
|         admongo.gov|         v=spf1 -all|
|townofnorthhudson...|v=spf1 include:_s...|
|   barrington-il.gov|v=spf1 mx a ip4:6...|
|cityofkingsburg-c...|v=spf1 include:sp...|
|             its.gov|v=spf1 include:tw...|
|  bentoncountyms.gov|v=spf1 ip4:63.247...|
|         golearn.gov|         v=spf1 -all|
|      blacksburg.gov|v=spf1 mx ptr:tob...|
| webstercountymo.gov|v=spf1 include:em...|
|        perry-ga.gov|v=spf1 redirect=_...|
|           azgfd.gov|v=spf1 ip4:159.87...|
|scottsvalley-nsn.gov|v=spf1 includ

In [52]:
def adoption_rate_test(d, m, y, source, opencc=False):
    dataset = [source]
    df_spark = load_spark_df_no_spf(d, m, y, False, dataset, opencc) # need to change to com and org
    df_spark = df_spark.filter(df_spark.query_name.endswith('.gov.'))
    total_domains = df_spark.select("qname_sld").distinct().count()
    spf_domains = df_spark.where(df_spark.txt_text.contains("v=spf1")).count()
    adoption_rate = int(spf_domains) / int(total_domains)
    
    df_result = sqlc.createDataFrame(
                [
                    (total_domains, spf_domains, adoption_rate), # create your data here, be consistent in the types.
                ],
                ['total_domains', 'spf_domains', 'adoption_rate'] # add your columns label here
            )
    
    df_result.show()
    
#     save_df(df_result, d, m, y, source, 'adoption-rate')
for m in range(10, 13):
    print("1-" + str(m) + "-2017")
    adoption_rate_test(1, m, 2017, 'opencc', False)
    
for m in range(1, 6):
    print("1-" + str(m) + "-2018")
    adoption_rate_test(1, m, 2018, 'opencc', False)

1-10-2017
+-------------+-----------+------------------+
|total_domains|spf_domains|     adoption_rate|
+-------------+-----------+------------------+
|         1278|        655|0.5125195618153364|
+-------------+-----------+------------------+

1-11-2017
+-------------+-----------+------------------+
|total_domains|spf_domains|     adoption_rate|
+-------------+-----------+------------------+
|         1267|        717|0.5659037095501184|
+-------------+-----------+------------------+

1-12-2017
+-------------+-----------+------------------+
|total_domains|spf_domains|     adoption_rate|
+-------------+-----------+------------------+
|         1264|        748|0.5917721518987342|
+-------------+-----------+------------------+

1-1-2018
+-------------+-----------+-----------------+
|total_domains|spf_domains|    adoption_rate|
+-------------+-----------+-----------------+
|         1258|        844|0.670906200317965|
+-------------+-----------+-----------------+

1-2-2018
+------------

In [31]:
for y in range(2016, 2021):
    for m in range(1, 13):
        try:
            print(m, y)
            sqlc.read.parquet("/user/mathay/results_new/source=alexa/data=verify-all/year={:4d}/month={:02d}/day={:02d}".format(y, m, 1)).show()
        except:
            continue

1 2016
2 2016
+-------------+-------------+---------------+
|total_domains|valid_domains|invalid_domains|
+-------------+-------------+---------------+
|       472080|       406314|          65766|
+-------------+-------------+---------------+

3 2016
+-------------+-------------+---------------+
|total_domains|valid_domains|invalid_domains|
+-------------+-------------+---------------+
|       478655|       412605|          66050|
+-------------+-------------+---------------+

4 2016
+-------------+-------------+---------------+
|total_domains|valid_domains|invalid_domains|
+-------------+-------------+---------------+
|       471243|       405631|          65612|
+-------------+-------------+---------------+

5 2016
+-------------+-------------+---------------+
|total_domains|valid_domains|invalid_domains|
+-------------+-------------+---------------+
|       478038|       411525|          66513|
+-------------+-------------+---------------+

6 2016
+-------------+-------------+-----

In [32]:
def load_spark_df_ns(day, month, year, one_day, datasets):
    # Create Spark DF
    SOURCES = datasets
    df = sqlc.read.option("basePath", "/").parquet(*[
        # n.b.: set paths list as desired: /user/openintel/measurement_data/type=warehouse/source=<tld>/year=<yyyy>/month=<mm>/day=<dd>
        os.path.join(OI_MDATA_BASE, "source={}".format(i_source), "year={:4d}".format(year), "month={:02d}".format(month), "day={:02d}".format(day)) for i_source in SOURCES
    ]).withColumn(
        # Add an ISO8601 date column
        # n.b.: read() will produce integers for the year, month and day partitions in the HDFS file hierarchy
        "date", psf.concat_ws('-', "year", psf.lpad("month", 2, '0'), psf.lpad("day", 2, '0')))
    
#     df = df.filter(
#         psf.col("txt_text").isNotNull()
#     ).select(
#         ["query_name", "txt_text"]
#     ).where(
#         df.txt_text.contains("v=spf1")
#     ).withColumn(
#         'query_name', col('query_name').substr(lit(0), length(col('query_name')) - 1)
#     ).withColumn(
#         'txt_text', col('txt_text').substr(lit(2), length(col('txt_text')) - 2)
#     )
#     df = df.select(
#         ["query_name", "txt_text"]
#     ).withColumn(
#         'query_name', col('query_name').substr(lit(0), length(col('query_name')) - 1)
#     ).withColumn(
#         'txt_text', col('txt_text').substr(lit(2), length(col('txt_text')) - 2)
#     ).withColumn("qname_sld", regexp_extract("query_name", "([^.]+[.][^.]+[.])$", 1))
    
    df = df.withColumn("qname_sld", regexp_extract("query_name", "([^.]+[.][^.]+[.])$", 1))
    df_original = df
    return df_original

In [34]:
def verify_all(d, m, y, source, df_spf):
    df_depth = sqlc.read.parquet("/user/mathay/results_new/source={}/data=include-depth/year={:4d}/month={:02d}/day={:02d}/depth={:1d}".format(source, y, m, d, 5))
    df_depth = df_depth.select("src", "combined")
    df_depth = df_depth.withColumn("combined", psf.array_union(df_depth.combined, psf.array(psf.lit(df_depth.src))))
    print("df_depth:", df_depth.count())

    df_exploded = df_depth.withColumn("domain_to_count", psf.explode("combined")).select("src", "domain_to_count")
    # df_exploded.filter(df_exploded.src == "spaceistanbul.com").show(truncate=False)
    print("df_exploded:", df_exploded.count())
#     print("df_exploded:", df_exploded.show())

#     df_spf = load_spark_df(d, m, y, True, ['com', 'org', 'net', 'alexa'])
    # df_spf.filter(df_spf.query_name == "spaceistanbul.com").show(truncate=False)
    print("df_spf:", df_spf.count())
#     print("df_spf:", df_spf.show())

    df_combined = df_exploded.join(df_spf, df_spf.query_name == df_exploded.domain_to_count)
    df_combined = df_combined.select("src", "domain_to_count", "txt_text")
    # df_combined.filter(df_combined.src == "0001attorneys.com").show()
    print("df_combined:", df_combined.count())
#     print("df_combined:", df_combined.show())

    tokenizer = Tokenizer(inputCol="txt_text", outputCol="words")
    df_tokenized = tokenizer.transform(df_combined)
    # df_tokenized.show()
    print("df_tokenized:", df_tokenized.count())

    qualifiers = ["+", "-", "~", "?"]
    mechanisms = ["all", "include", "a", "mx", "ptr", "ip4", "ip6", "exists", "redirect", "exp"]
    mechanisms_dns = ["include", "a", "mx", "ptr", "exists"]
    combined = mechanisms[:]
    combined_dns = mechanisms_dns[:]
    for qualifier in qualifiers:
        for mechanism in mechanisms:
            combined.append(qualifier + mechanism)
        for mechanism in mechanisms_dns:
            combined_dns.append(qualifier + mechanism)

    def count_dns_requests(spf):
        dns_count = 0
        for i in range(1, len(spf)):
            result = check_mechanism_dns(spf[i], dns_count)
            if result:
                dns_count += 1
        return dns_count

    def check_mechanism_dns(spf, count):
        if spf.startswith(tuple(combined_dns)):
            if spf.endswith("all"): # edge case
                return False
            return True
        return False

    udf_count_dns_requests = udf(count_dns_requests, IntegerType())
    df_counted = df_tokenized.withColumn("dns_count", udf_count_dns_requests(df_tokenized.words))
    # df_counted.show()
    print("df_counted:", df_counted.count())

    df_joined = df_counted.groupby("src").agg(psf.sum("dns_count")).select("src", col("sum(dns_count)").alias("dns_count"))
#     print("df_joined:", df_joined.count())
    print("df_joined:", df_joined.count())
    
    df_count_ori = sqlc.read.parquet("/user/mathay/results_new/source={}/data=validation-full-data/year={:4d}/month={:02d}/day={:02d}".format(source, y, m, d))
    print("df_count_ori:", df_count_ori.count())
    df_all_joined = df_count_ori.join(df_joined, df_joined.src == df_count_ori.query_name, how='left')
    df_all_joined = df_all_joined.select("query_name", "valid_spf_string", "valid_spf_dns_limit", "dns_count")
    print("df_all_joined:", df_all_joined.count())
#     print(df_all_joined.show())
    
    df_corrected = df_all_joined.withColumn('dns_valid', (psf.when(col('dns_count').isNull(), True)).otherwise(False))
    print("df_corrected:", df_corrected.count())

    df_corrected = df_corrected.fillna(-1, subset=['dns_count'])
#     print(df_corrected.count())
    
    df_new = df_corrected.withColumn('dns_valid',
                                 psf.when((col('dns_count') >= 0) & (col('dns_count') <= 10), 
                                           (col('valid_spf_string')) & (col('valid_spf_dns_limit')))
                                 .otherwise(psf.when(col('dns_count') == -1, (col('valid_spf_string')) & (col('valid_spf_dns_limit'))).otherwise(False)))
    return df_new

In [57]:
# performing analysis on verify-all
df_spf_01 = load_spark_df(1, 1, 2020, True, ['com', 'org', 'net'])
df_spf_02 = load_spark_df(1, 2, 2020, True, ['com', 'org', 'net'])

df_01_2016 = verify_all(1, 1, 2020, 'com', df_spf_01)
df_02_2016 = verify_all(1, 2, 2020, 'com', df_spf_02)

# df_01_2016.show()

df_newly_invalid = df_02_2016.subtract(df_01_2016)
# df_newly_invalid.show()

# df_newly_invalid.count()

df_ns = load_spark_df_ns(1, 2, 2020, False, ['com', 'org', 'net'])
df_ns = df_ns.where(df_ns.query_type == "NS").select(col('query_name').alias('query_name_ns'), col('qname_sld').alias('qname_sld_ns'), 'ns_address')
# df_ns.show()

df_ns = df_ns.withColumn("qname_sld_ns",psf.expr("substring(qname_sld_ns, 1, length(qname_sld_ns)-1)"))
# df_ns.show()

df_joined_ns = df_newly_invalid.join(df_ns, df_ns.qname_sld_ns == df_newly_invalid.query_name)
# df_joined_ns.show()

df_joined_ns_splitted = df_joined_ns.withColumn("ns_address", regexp_extract("ns_address", "([^.]+[.][^.]+[.])$", 1))
# df_joined_ns_splitted.show()

# df_joined_ns_removed = df_joined_ns.dropDuplicates(['query_name'])

df_grouped_ns = df_joined_ns_splitted.groupBy('ns_address').count()
df_grouped_ns.orderBy(desc("count")).show(truncate=False)

df_depth: 22111381
df_exploded: 61332159
df_spf: 40060394
df_combined: 29695821
df_tokenized: 29695821
df_counted: 29695821
df_joined: 21898352
df_count_ori: 34741992
df_all_joined: 34741992
df_corrected: 34741992
df_depth: 22384048
df_exploded: 61937142
df_spf: 40361511
df_combined: 29900588
df_tokenized: 29900588
df_counted: 29900588
df_joined: 22168478
df_count_ori: 34998052
df_all_joined: 34998052
df_corrected: 34998052
+----------------------+-------+
|ns_address            |count  |
+----------------------+-------+
|null                  |1838124|
|domaincontrol.com.    |319449 |
|registrar-servers.com.|259365 |
|ztomy.com.            |220926 |
|googledomains.com.    |191830 |
|foundationapi.com.    |176710 |
|bluehost.com.         |125425 |
|cloudflare.com.       |91563  |
|dan.com.              |87171  |
|xserver.jp.           |82512  |
|superhosting.bg.      |67686  |
|hostgator.com.        |66447  |
|mochahost.com.        |59018  |
|sedoparking.com.      |58579  |
|ovh.net.  

In [36]:
# performing analysis on verify-all
df_spf_01 = load_spark_df(1, 1, 2019, True, ['com', 'org', 'net'])
df_spf_02 = load_spark_df(1, 5, 2020, True, ['com', 'org', 'net'])

df_spf_01 = df_spf_01.select(col("query_name").alias("query_name_old"), col("txt_text").alias("txt_text_old"))

df_joined = df_spf_02.join(df_spf_01, df_spf_01.query_name_old == df_spf_02.query_name, how='left')

df_filtered = df_joined.filter((df_joined.txt_text.contains('+exists')) & (~(df_joined.txt_text_old.contains('+exists'))))
df_filtered.show(truncate=False)
print(df_filtered.count())
# df_01_2016.show()

# df_newly_invalid = df_02_2016.subtract(df_01_2016)
# df_newly_invalid.show()

# df_newly_invalid.count()

df_ns = load_spark_df_ns(1, 5, 2020, False, ['com', 'org', 'net'])
df_ns = df_ns.where(df_ns.query_type == "NS").select(col('query_name').alias('query_name_ns'), col('qname_sld').alias('qname_sld_ns'), 'ns_address')
# df_ns.show()

df_ns = df_ns.withColumn("qname_sld_ns",psf.expr("substring(qname_sld_ns, 1, length(qname_sld_ns)-1)"))
# df_ns.show()

df_joined_ns = df_filtered.join(df_ns, df_ns.qname_sld_ns == df_filtered.query_name)
# df_joined_ns.show()

df_joined_ns_splitted = df_joined_ns.withColumn("ns_address", regexp_extract("ns_address", "([^.]+[.][^.]+[.])$", 1))
# df_joined_ns_splitted.show()

# df_joined_ns_removed = df_joined_ns.dropDuplicates(['query_name'])

df_grouped_ns = df_joined_ns_splitted.groupBy('ns_address').count()
df_grouped_ns.orderBy(desc("count")).show(truncate=False)

+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|query_name            |txt_text                                                                                                                                                                                                                                                     

In [61]:
adoption_rate(1, 5, 2020, 'opencc', False)

+-------------+-----------+-------------------+
|total_domains|spf_domains|      adoption_rate|
+-------------+-----------+-------------------+
|      1844432|     533202|0.28908737215576397|
+-------------+-----------+-------------------+



In [60]:
datasets = [['com', 'org', 'net'], ['alexa'], ['opencc']]
for dataset in datasets:
    print(dataset)
    df_all = load_spark_df_no_spf(1, 5, 2020, True, dataset)
#     df_spf = load_spark_df(1, 5, 2020, True, dataset)
#     df_unique_spf = df_spf.select("txt_text").distinct().count()
    print(df_all.select("query_name").distinct().count())
#     print(df_spf.count())
#     print(df_unique_spf)
    print()

['com', 'org', 'net']
321836211

['alexa']
2468439

['opencc']
4417991



In [30]:
df = sqlc.read.parquet("/user/mathay/results_new/source={}/data={}/year={:4d}/month={:02d}/day={:02d}".format('com', 'count-variables', 2020, 5, 1))

In [31]:
df.show(truncate=False)

+---------+--------+-------------+
|key      |count   |total_domains|
+---------+--------+-------------+
|+redirect|353     |36591846     |
|-redirect|0       |36591846     |
|redirect |552143  |36591846     |
|+include |1261471 |36591846     |
|?include |64455   |36591846     |
|?redirect|0       |36591846     |
|mx       |8602351 |36591846     |
|~include |29      |36591846     |
|-include |205     |36591846     |
|~redirect|0       |36591846     |
|?        |4287975 |36591846     |
|include  |25203690|36591846     |
|+all     |43530   |36591846     |
|~ptr     |101     |36591846     |
|+exists  |1502    |36591846     |
|-exists  |42      |36591846     |
|ip6      |1685005 |36591846     |
|+ip6     |18505   |36591846     |
|?exists  |20      |36591846     |
|~exists  |0       |36591846     |
+---------+--------+-------------+
only showing top 20 rows



In [34]:
df_prev = load_spark_df(1, 4, 2020, True, ['alexa'])
df_next = load_spark_df(1, 5, 2020, True, ['alexa'])

In [29]:
df_prev.show()

+--------------------+--------------------+
|          query_name|            txt_text|
+--------------------+--------------------+
| churchatthemill.com|v=spf1 include:sp...|
|    daddymarkets.com|v=spf1 a mx ptr i...|
|   burkeconcrete.com|v=spf1 a mx ptr i...|
|         app1pro.com|v=spf1 +a +mx +ip...|
|  datacrunchcorp.com|v=spf1 include:sp...|
|      cartonmahdi.ir|v=spf1 a mx ip4:1...|
|           corked.nl|v=spf1 include:eu...|
| economic-news.space|v=spf1 include:mx...|
|cleaneatzkitchen.com|v=spf1 include:_s...|
| dublindailynews.com|v=spf1 include:sp...|
|     bdexchange.info|v=spf1 +a +mx +ip...|
|          easynik.me|v=spf1 include:sp...|
|   arizonaautism.com|v=spf1 include:sp...|
|           ceped.org|v=spf1 include:sp...|
|       citadines.com|v=spf1 ip4:52.28....|
|     clubllondon.com|v=spf1 include:sp...|
|         carwars.com|v=spf1 ip4:52.207...|
|        drukpoint.pl|v=spf1 a mx inclu...|
|  duongtrunghieu.com|v=spf1 a mx ptr i...|
|ceylanlarotokurta...|v=spf1 inc

In [35]:
df_prev = df_prev.select('query_name')
df_next = df_next.select('query_name')

In [36]:
df_old = df_prev.subtract(df_next)
df_new = df_next.subtract(df_prev)

In [37]:
print(df_old.show())
print(df_old.count())
print(df_new.show())
print(df_new.count())

+--------------------+
|          query_name|
+--------------------+
|  100poundsocial.com|
|         16safety.ca|
|          193sow.org|
|             1984.is|
|        1balcony.com|
|      1cmarketing.ru|
|             1jam.ru|
|       1nomer.com.ua|
|            1peec.ga|
|           1phim.net|
|         22mabhas.ir|
|       230-fifth.com|
|         24homkin.ru|
|         29tanin.com|
|            2itb.com|
|              2sc.al|
|30minutemarketing...|
|          360kia.com|
|       360piksel.com|
|     3dcadcam.com.tr|
+--------------------+
only showing top 20 rows

None
335035
+--------------------+
|          query_name|
+--------------------+
|              114.by|
|15minuteweightlos...|
|            1mja.com|
|         2-times.com|
|    2020insights.net|
|   247sattamatka.com|
|           2kojja.ru|
|             2url.kr|
|360football-suppl...|
|             3arv.ir|
|           3gorki.ru|
|              3iq.ca|
|            3jil.net|
|4-wheeling-in-wes...|
|      44gosuslugi.

In [32]:
df = load_spark_df(1, 5, 2020, True, ['com', 'org', 'net'])
df.show()

+-------------------+--------------------+
|         query_name|            txt_text|
+-------------------+--------------------+
|01045intelifone.net|v=spf1 mx ip4:91....|
|        0020zdz.net|         v=spf1 -all|
|          02853.net|v=spf1 ip6:fd92:5...|
|           05gj.net|v=spf1 ip6:fd18:d...|
|           0642.net|v=spf1 ip6:fd92:5...|
|           0684.net|v=spf1 ip6:fd92:5...|
|           03ns.net|v=spf1 include:_s...|
|            03j.net|v=spf1 ip4:133.24...|
|   1000memories.net|v=spf1 include:sp...|
|      100ideias.net|v=spf1 ip4:194.39...|
|         10ancy.net|v=spf1 include:_s...|
|            0b8.net|v=spf1 ip6:fd18:d...|
|            0b6.net|v=spf1 ip4:46.254...|
|         0neday.net|v=spf1 include:_s...|
|    0ursdesigns.net|v=spf1 include:sp...|
|       0bitrate.net|v=spf1 ip4:66.96....|
|            0wn.net|v=spf1 include:mx...|
|         0xnull.net|v=spf1 +a +mx inc...|
|        0xpower.net|v=spf1 include:sp...|
|    1000legends.net|         v=spf1 -all|
+----------

In [46]:
print(df_contains.count())

5037915


In [34]:
df_contains = df.filter(df.txt_text.contains("?all"))
df_contains.show(truncate=False)

+--------------------+-----------------------------------------------------------------------------------------+
|query_name          |txt_text                                                                                 |
+--------------------+-----------------------------------------------------------------------------------------+
|0bitrate.net        |v=spf1 ip4:66.96.128.0/18 ?all                                                           |
|0x21b.net           |v=spf1 include:_mailcust.gandi.net ?all                                                  |
|10px.net            |v=spf1 ip4:67.40.81.89/29 ?all                                                           |
|12stonesfarm.net    |v=spf1 include:servers.mcsv.net ?all                                                     |
|1800kidhurt.net     |v=spf1 ip4:66.96.128.0/18 ?all                                                           |
|1eighty.net         |v=spf1 mx a include:spf.host-h.net ?all                                   

In [42]:
def count_qualifiers(row):
    result = {'+': 0, '-': 0, '~': 0, '?': 0}
    splitted = row.split()
    for word in splitted:
        if word == "v=spf1":
            continue
        if word[0] == "+":
            result['+'] += 1
        elif word[0] == "-":
            result['-'] += 1
        elif word[0] == "~":
            result['~'] += 1
        elif word[0] == "?":
            if word != "?all":
                result['?'] += 1
        else:
            result['+'] += 1
    return result

udf_count_queries = udf(count_qualifiers, MapType(StringType(), IntegerType()))
df_counted = df_contains.withColumn("qualifiers", udf_count_queries(df_contains.txt_text))
df_counted.show(truncate=False)

+--------------------+-----------------------------------------------------------------------------------------+--------------------------------+
|query_name          |txt_text                                                                                 |qualifiers                      |
+--------------------+-----------------------------------------------------------------------------------------+--------------------------------+
|0bitrate.net        |v=spf1 ip4:66.96.128.0/18 ?all                                                           |[+ -> 1, - -> 0, ~ -> 0, ? -> 0]|
|0x21b.net           |v=spf1 include:_mailcust.gandi.net ?all                                                  |[+ -> 1, - -> 0, ~ -> 0, ? -> 0]|
|10px.net            |v=spf1 ip4:67.40.81.89/29 ?all                                                           |[+ -> 1, - -> 0, ~ -> 0, ? -> 0]|
|12stonesfarm.net    |v=spf1 include:servers.mcsv.net ?all                                                     |[+ -> 1, - -

In [43]:
columns = df_counted.select('qualifiers').rdd.flatMap(lambda x: x).toDF().columns
for i in columns:
    df_counted = df_counted.withColumn(i, lit(df_counted.qualifiers[i]))
df_counted.show()

+--------------------+--------------------+--------------------+---+---+---+---+
|          query_name|            txt_text|          qualifiers|  +|  -|  ?|  ~|
+--------------------+--------------------+--------------------+---+---+---+---+
|        0bitrate.net|v=spf1 ip4:66.96....|[+ -> 1, - -> 0, ...|  1|  0|  0|  0|
|           0x21b.net|v=spf1 include:_m...|[+ -> 1, - -> 0, ...|  1|  0|  0|  0|
|            10px.net|v=spf1 ip4:67.40....|[+ -> 1, - -> 0, ...|  1|  0|  0|  0|
|    12stonesfarm.net|v=spf1 include:se...|[+ -> 1, - -> 0, ...|  1|  0|  0|  0|
|     1800kidhurt.net|v=spf1 ip4:66.96....|[+ -> 1, - -> 0, ...|  1|  0|  0|  0|
|         1eighty.net|v=spf1 mx a inclu...|[+ -> 3, - -> 0, ...|  3|  0|  0|  0|
|          1stpla.net|v=spf1 ip4:66.96....|[+ -> 1, - -> 0, ...|  1|  0|  0|  0|
|             1yo.net|v=spf1 ip4:173.19...|[+ -> 6, - -> 0, ...|  6|  0|  0|  0|
|          242477.net|v=spf1 include:_m...|[+ -> 1, - -> 0, ...|  1|  0|  0|  0|
|  22cortlandtnyc.net|v=spf1

In [44]:
plus = df_counted.agg(psf.sum("+")).collect()[0][0]
mini = df_counted.agg(psf.sum("-")).collect()[0][0]
tilde = df_counted.agg(psf.sum("~")).collect()[0][0]
question = df_counted.agg(psf.sum("?")).collect()[0][0]
print("+", plus)
print("-", mini)
print("~", tilde)
print("?", question)
#5,037,915 domains with ?all

+ 11174793
- 1839
~ 3470
? 10169
