In [76]:
%run ./spark-instance.ipynb

SparkConf created
Started SparkSession
Spark version 3.3.1


In [1]:
clean_spark()  # run by EOB

NameError: name 'clean_spark' is not defined

In [51]:
%matplotlib inline

from collections import defaultdict
from datetime import datetime
import math
import os

import matplotlib.pyplot as plt
import pandas as pd

from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes


import pyspark.sql.functions as psf
import pyspark.sql.types as pst
from pyspark.storagelevel import StorageLevel

### Extracting Censys data

In [52]:
DATASET = "universal-internet-dataset"
CENSYS_BASE_PATH_FMT = "censys/dataset={dataset}/format=parquet"
CENSYS_PATH_FMT = os.path.join(CENSYS_BASE_PATH_FMT, "year={year}/month={month:02d}/day={day:02d}")

# Censys snapshot of 2022-Nov
timestamps = [
    datetime(2022, 11, 1),
    datetime(2022, 11, 8),
    datetime(2022, 11, 15),
    datetime(2022, 11, 22),
    datetime(2022, 11, 29),
]


def load_censys_data(ts):
    censys_base_path = CENSYS_PATH_FMT.format(dataset=DATASET, year=ts.year, month=ts.month, day=ts.day)
    try:
        censys_df = spark.read.option("basePath", f"s3a://{censys_base_path}").parquet(f"s3a://{censys_base_path}")
    except AnalysisException as e:
        print(e)
    return censys_df


def filter_df_by_label(df, ts, label: str):
    llabel = label.lower()

    ser_filtered_df = df.select('*').where((psf.array_contains(psf.col("service_names_list"), label.upper())))
    ser_filtered_df = ser_filtered_df.withColumn("filter_source", psf.lit("service_name"))

    if False:
        dns_filtered_df = df.select('*').filter(
            (psf.expr(f"exists(dns_names, x -> lower(x) like '%.{llabel}.%') or exists(dns_names, x -> lower(x) like '{llabel}.%')")) | 
            (psf.expr(f"exists(r_dns_names, x -> lower(x) like '%.{llabel}.%') or exists(r_dns_names, x -> lower(x) like '{llabel}.%')"))
        )
        dns_filtered_df = dns_filtered_df.withColumn("filter_source", psf.lit("dns_rdns"))

        filtered_df = ser_filtered_df.unionByName(dns_filtered_df)
        uniq_filtered_df = filtered_df.dropDuplicates(["ipv4"])
        uniq_filtered_df = uniq_filtered_df.withColumn("date", psf.lit(ts).cast(pst.TimestampType()))

    return ser_filtered_df

In [22]:
CENSYS_TS_DICT = {}
for ts in timestamps:
    print(ts)
    _censys_df = load_censys_data(ts)

    select_df = _censys_df.select("host_identifier.ipv4",
                                 _censys_df.dns.names.alias("dns_names"),
                                 _censys_df.dns.reverse_dns.names.alias("r_dns_names"),
                                 "service_names_list",
                                 "services.port",
                                 _censys_df.services.tls.certificates.leaf_data.names.alias("leaf_data_names"),
                                 _censys_df.services.tls.version_selected.alias("tls_version"),
                                 _censys_df.services.tls.cipher_selected.alias("tls_cipher"),
                                 "services.tls.certificates.leaf_data.pubkey_bit_size",
                                 _censys_df.services.tls.certificates.leaf_data.signature.self_signed.alias("tls_signature_self_signed"),
                                 _censys_df.services.tls.certificates.leaf_data.signature.signature_algorithm.alias("tls_signature_algorithm"),
                                 _censys_df.services.tls.certificates.leaf_data.subject.common_name.alias("cert_cn"),
                                 _censys_df.services.tls.certificates.leaf_data.issuer.common_name.alias("issuer_cn")
                                )

    uniq_filtered_df = filter_df_by_label(select_df, ts, "ldap")

    when = ts.strftime("%Y%m%d")
    uniq_filtered_df = uniq_filtered_df.withColumn("date", psf.lit(ts).cast(pst.TimestampType()))

    CENSYS_TS_DICT[ts] = uniq_filtered_df
print("-------")

2022-11-01 00:00:00
2022-11-08 00:00:00
2022-11-15 00:00:00
2022-11-22 00:00:00
2022-11-29 00:00:00
-------


In [23]:
dfs = [df for df in CENSYS_TS_DICT.values()]

censys_compact_df = dfs[0].unionByName(dfs[1])
for i in range(2, len(dfs)):
    censys_compact_df = censys_compact_df.unionByName(dfs[i])

In [None]:
output = "luvizottocesarg-tmp/ldap-dependency-censys_compact"
censys_compact_df.coalesce(1).write.parquet(f"s3a://{output}")

In [None]:
dfs = []
for df in CENSYS_TS_DICT.values():
    df2 = df.select("ipv4", "filter_source", "date",
                    psf.explode_outer(psf.arrays_zip(
                                        df.dns_names.alias("dns_name"),
                                        df.r_dns_names.alias("r_dns_name"),
                                        df.leaf_data_names.alias("leaf_data_name"),
                                        df.service_names_list.alias("service_name"),
                                        df.port.alias("port"),
                                        df.tls_version,
                                        df.tls_cipher,
                                        df.pubkey_bit_size,
                                        df.tls_signature_self_signed,
                                        df.tls_signature_algorithm,
                                        df.cert_cn,
                                        df.issuer_cn)))
    df2 = df2.select("ipv4", "filter_source", "date",
                     "col.dns_name", "col.r_dns_name", "col.leaf_data_name", "col.service_name", "col.port", "col.tls_version",
                     "col.tls_cipher", "col.pubkey_bit_size", "col.tls_signature_self_signed",
                     "col.tls_signature_algorithm", "col.cert_cn", "col.issuer_cn")

    # look these examples. There are 2 LDAP services for the same IP, there is ldap0.acc.umu.se in the dns_names, dns_names with one element (first) mapps to the first service name (probably right?)
    #df.filter((psf.col("ipv4") == "100.18.51.128") | (psf.col("ipv4") == "100.37.175.222") | (psf.col("ipv4") == "116.203.25.90") | (psf.col("ipv4") == "130.239.18.143")).select("ipv4", "filter_source", "dns_names", "service_names_list", "cert_cn").show(truncate=False)
    #df2.filter((psf.col("ipv4") == "100.18.51.128") | (psf.col("ipv4") == "100.37.175.222") | (psf.col("ipv4") == "116.203.25.90") | (psf.col("ipv4") == "130.239.18.143")).select("ipv4", "filter_source", "dns_name", "service_name", "cert_cn").show(truncate=False)

    if False:
        df2_filtered = df2.filter((psf.col("dns_name").like("%.ldap.%")) | (psf.col("dns_name").like("ldap.%")) | (psf.col("r_dns_name").like("%.ldap.%")) | (psf.col("r_dns_name").like("ldap.%")) | (psf.col("service_name") == "LDAP"))
    df2_filtered = df2.filter(psf.col("service_name") == "LDAP")

    #display(df2_filtered.count())  # 94997
    #display(df2_filtered.dropDuplicates(["ipv4"]).count())  # 74824
    dfs.append(df2_filtered)

In [None]:
censys_df = dfs[0].unionByName(dfs[1])
for i in range(2, len(dfs)):
    censys_df = censys_df.unionByName(dfs[i])

output = "luvizottocesarg-tmp/ldap-dependency-censys"
censys_df.coalesce(1).write.parquet(f"s3a://{output}")

### Extracting Goscanner data

In [109]:
def load_hosts_data(port, ts):
    hosts_base_path = HOSTS_PATH_FMT.format(port=port, year=ts.year, month=ts.month, day=ts.day)
    hosts_df = spark.read.option("header", "true") \
                         .option("lineSep", "\n") \
                         .option("quote", "\"") \
                         .option("escape", "\"") \
                         .option("inferSchema", "true") \
                         .csv(f"s3a://{hosts_base_path}")
    return hosts_df


def load_certs_data(port, ts):
    certs_base_path = CERTS_PATH_FMT.format(port=port, year=ts.year, month=ts.month, day=ts.day)
    certs_df = spark.read.option("header", "true") \
                         .option("multiline", "true") \
                         .option("wholeFile", "true") \
                         .option("inferSchema", "true") \
                         .csv(f"s3a://{certs_base_path}")
    return certs_df


def load_tls_data(port, ts):
    tls_base_path = TLS_PATH_FMT.format(port=port, year=ts.year, month=ts.month, day=ts.day)
    tls_df = spark.read.option("header", "true") \
                       .option("quote", "\"") \
                       .option("escape", "\"") \
                       .option("multiline", "true") \
                       .option("wholeFile", "true") \
                       .option("inferSchema", "true") \
                       .csv(f"s3a://{tls_base_path}")
    return tls_df


def load_ldap_data(port, ts):
    ldap_base_path = LDAP_PATH_FMT.format(port=port, year=ts.year, month=ts.month, day=ts.day)
    ldap_df = spark.read.option("header", "true") \
                        .option("lineSep", "\n") \
                        .option("quote", "\"") \
                        .option("escape", "\"") \
                        .option("inferSchema", "true") \
                        .csv(f"s3a://{ldap_base_path}")
    return ldap_df


def load_ldapstarttls_data(port, ts):
    starttls_base_path = STARTTLS_PATH_FMT.format(port=port, year=ts.year, month=ts.month, day=ts.day)
    starttls_df = spark.read.option("header", "true") \
                            .option("lineSep", "\n") \
                            .option("quote", "\"") \
                            .option("escape", "\"") \
                            .option("inferSchema", "true") \
                            .csv(f"s3a://{starttls_base_path}")
    return starttls_df


def load_cert_validator(port, ts):
    cert_validator_base_path = CERTVAL_PATH_FMT.format(port=port, year=ts.year, month=ts.month, day=ts.day)
    df = spark.read.parquet(f"s3a://{cert_validator_base_path}")
    return df

def convert_ldap_attributes(df):
    df = df.withColumn("asd", eval_list_list_str_udf(psf.col("attribute_names")))
    df = df.drop("attribute_names")
    df = df.withColumnRenamed("asd", "attribute_names")
    df = df.withColumn("qwe", eval_list_list_list_str_udf(psf.col("attribute_values_list")))
    df = df.drop("attribute_values_list")
    df = df.withColumnRenamed("qwe", "attribute_values_list")
    return df


def flatten_ldap_metadata(df):
    df = df.select("id", "ip", "port", "result_code", "error_data", "matched_dns",
                                    psf.explode_outer(psf.arrays_zip(
                                                      df.attribute_names,
                                                      df.attribute_values_list)))
    df = df.select("id", "ip", "port", "result_code", "error_data", "matched_dns", "col.attribute_names", "col.attribute_values_list")
    df = df.select("id", "ip", "port", "result_code", "error_data", "matched_dns",
                                    psf.explode_outer(psf.arrays_zip(
                                                      df.attribute_names,
                                                      df.attribute_values_list)))
    df = df.select("id", "ip", "port", "result_code", "error_data", "matched_dns", "col.attribute_names", "col.attribute_values_list")
    df = df.withColumnRenamed("attribute_names", "attribute_name")
    return df


def flat_ldap_attr_values(df):
    # good for vendorName filters for example
    df = df.select("id", "ip", "port", "result_code", "error_data", "matched_dns", "attribute_name", psf.explode_outer(df.attribute_values_list))
    df = df.withColumnRenamed("col", "attribute_value")
    return df


def load_ldap_root_dse(port, ts):
    root_dse_base_path = ROOT_DSE_PATH_FMT.format(port=port, year=ts.year, month=ts.month, day=ts.day)
    df = spark.read.option("header", "true") \
                   .option("multiline", "true") \
                   .option("wholeFile", "true") \
                   .option("inferSchema", "true") \
                   .csv(f"../dataset/{root_dse_base_path}")

    df = convert_ldap_attributes(df)
    return df


def load_ldap_schema(port, ts):
    schema_base_path = SCHEMA_PATH_FMT.format(port=port, year=ts.year, month=ts.month, day=ts.day)
    df = spark.read.option("header", "true") \
                   .option("multiline", "true") \
                   .option("wholeFile", "true") \
                   .option("inferSchema", "true") \
                   .csv(f"../dataset/{schema_base_path}")

    df = convert_ldap_attributes(df)
    return df


def convert_output_df(df):
    df = df.select("id", "generic_error", psf.explode("root_stores").alias("store_name", "result"))
    df = df.select("id", "generic_error", "store_name", "result.*")
    df = df.withColumn("vc", eval_list_list_str_udf(psf.col("valid_chains")))
    df = df.drop("valid_chains")
    df = df.withColumnRenamed("vc", "valid_chains")
    return df


len_udf = psf.udf(lambda chain: [len(x) for x in chain], pst.ArrayType(pst.IntegerType()))


def chain_len(df):
    df = df.withColumn("chain_len", len_udf(psf.col("valid_chains")))
    df.select("chain_len").groupBy(psf.col("chain_len")).count().show()
    return df


def peer_certs_len(peer_certs_str):
    peer_certs = eval_list(peer_certs_str)
    return len(peer_certs)


peer_certs_len_udf = psf.udf(peer_certs_len, pst.IntegerType())

def eval_list_list_list_str(my_list):
    try:
        if isinstance(my_list, str):
            return eval(my_list)
        else:
            return [[[]]]
    except:
        return [[[]]]


eval_list_list_list_str_udf = psf.udf(eval_list_list_list_str, pst.ArrayType(pst.ArrayType(pst.ArrayType(pst.StringType()))))


def eval_list_list_str(my_list):
    try:
        if isinstance(my_list, str):
            return eval(my_list)
        else:
            return [[]]
    except:
        return [[]]


eval_list_list_str_udf = psf.udf(eval_list_list_str, pst.ArrayType(pst.ArrayType(pst.StringType())))


def eval_list(my_list):
    try:
        if isinstance(my_list, str):
            return eval(my_list)
        else:
            return []
    except:
        return []


eval_udf = psf.udf(eval_list, pst.ArrayType(pst.IntegerType()))


SHORT1 = "Signed by unknown authority"
SHORT2 = "Expired/Not yet valid"
SHORT3 = "Not authorized to sign other certificates"
SHORT4 = "Too many intermediate certificates"
SHORT5 = "Unhandled critical extension"
SHORT6 = "Other errors"
SHORT7 = "Self-signed"
SHORT8 = "Valid chain"
SHORT9 = "Invalid signature"  # under SHORT1


short_error_name_map = {
    'x509: certificate signed by unknown authority': SHORT1,
    'x509: certificate signed by unknown authority - With possible explanation': SHORT1,
    'x509: certificate has expired or is not yet valid': SHORT2,
    'x509: certificate is not authorized to sign other certificates': SHORT3,
    'x509: too many intermediates for path length constraint': SHORT4,
    'x509: unhandled critical extension': SHORT5,
    'x509: certificate signed by unknown authority (possibly because of "x509: invalid signature': SHORT9,
    '': SHORT8
}


def error_str(error_list, skid, akid, peer_certs_len):
    # test that all elements are empty
    if all(not element for element in error_list):
        return SHORT8

    # get the first non-empty error
    for error_data in error_list:
        if error_data != "":
            break

    parsed_error = ':'.join(error_data.split(":")[:3])
    if "certificate signed by unknown authority" in parsed_error:
        #if peer_certs_len == 1:
        if skid == "" or akid == "":
            if peer_certs_len == 1:
                return SHORT7
        elif skid == akid:
            return SHORT7

    error = short_error_name_map.get(parsed_error, None)
    if error is None:
        parsed_error = ':'.join(error_data.split(":")[:2])
        return short_error_name_map[parsed_error]
    return error


error_str_udf = psf.udf(error_str, pst.StringType())


def error_str2(error_list, peer_certs_len):
    # test that all elements are empty
    if all(not element for element in error_list):
        return SHORT8

    # get the first non-empty error
    for error_data in error_list:
        if error_data != "":
            break

    parsed_error = ':'.join(error_data.split(":")[:3])
    if "certificate signed by unknown authority" in parsed_error:
        if peer_certs_len == 1:
            return SHORT7

    error = short_error_name_map.get(parsed_error, None)
    if error is None:
        parsed_error = ':'.join(error_data.split(":")[:2])
        return short_error_name_map[parsed_error]
    return error


error_str2_udf = psf.udf(error_str2, pst.StringType())


# port 636 scans occurs one day earlier than port 389
PORT_SCANDATE_MAP = {
    636: [
        datetime(2023, 11, 7),
        datetime(2023, 11, 14),
        datetime(2023, 11, 21),
        datetime(2023, 11, 28)
    ],
    389: [
        #datetime(2023, 11, 1),  # does not have a 636 pair
        datetime(2023, 11, 8),
        datetime(2023, 11, 15),
        datetime(2023, 11, 22),
        datetime(2023, 11, 29)
    ]
}

"""
PORT_SCANDATE_MAP = {
    636: [datetime(2024, 4, 23)],
    389: [datetime(2024, 4, 24)],
}
"""

"""
PORT_SCANDATE_MAP = {
    636: [
        datetime(2024, 2, 20),
        datetime(2024, 2, 27),
        datetime(2024, 3, 5),
        datetime(2024, 3, 12),
        datetime(2024, 3, 19)
    ],
    389: [
        datetime(2024, 2, 21),
        datetime(2024, 2, 28),
        datetime(2024, 3, 6),
        datetime(2024, 3, 13),
        datetime(2024, 3, 20)
    ]
}
"""

HOSTS_PATH_FMT = "catrin/measurements/tool=goscanner/format=raw/port={port}/scan=tcp/result=hosts/year={year}/month={month:02d}/day={day:02d}"
CERTS_PATH_FMT = "catrin/measurements/tool=goscanner/format=raw/port={port}/scan=tls/result=certs/year={year}/month={month:02d}/day={day:02d}"
TLS_PATH_FMT = "catrin/measurements/tool=goscanner/format=raw/port={port}/scan=tls/result=tls_verbose/year={year}/month={month:02d}/day={day:02d}"
LDAP_PATH_FMT = "catrin/measurements/tool=goscanner/format=raw/port={port}/scan=ldap/result=ldap/year={year}/month={month:02d}/day={day:02d}"
STARTTLS_PATH_FMT = "catrin/measurements/tool=goscanner/format=raw/port={port}/scan=starttls_ldap/result=starttls_ldap/year={year}/month={month:02d}/day={day:02d}"
CERTVAL_PATH_FMT = "catrin/data_processing/tool=cert-validator/format=parquet/port={port}/year={year}/month={month:02d}/day={day:02d}"
SCHEMA_PATH_FMT = "catrin/measurements/tool=goscanner/format=raw/port={port}/scan=ldap_crawl/result=ldap_schema/year={year}/month={month:02d}/day={day:02d}"
ROOT_DSE_PATH_FMT = "catrin/measurements/tool=goscanner/format=raw/port={port}/scan=ldap_crawl/result=ldap_root_dse/year={year}/month={month:02d}/day={day:02d}"


goscanner_timestamps = []
for dates in PORT_SCANDATE_MAP.values():
    goscanner_timestamps.extend(dates)


tls_version_str_dict = {
    int("0x0301", 16): "TLSv1.0",
    int("0x0302", 16): "TLSv1.1",
    int("0x0303", 16): "TLSv1.2",
    int("0x0304", 16): "TLSv1.3",
    int("0x0300", 16): "SSLv3"
}


def tls_version_to_string(version_number: int):
    return tls_version_str_dict.get(version_number, str(version_number))


tls_version_udf = psf.udf(tls_version_to_string, pst.StringType())


def convert_cipher(x):
    try:
        parts = [int(part, 16) for part in x.split(",")]
    except ValueError:
        # to cover reserved values like 0x00,0x1C-1D
        return None
    cipher = parts[0] << 8
    cipher |= parts[1]
    return hex(cipher)[2:]


# https://www.iana.org/assignments/tls-parameters/tls-parameters.xhtml#tls-parameters-4
tls_parameters_pdf = pd.read_csv("tls-parameters-4.csv")
tls_parameters_pdf["Cipher"] = tls_parameters_pdf["Value"].apply(convert_cipher)
tls_parameters_pdf["DTLS-OK"] = tls_parameters_pdf["DTLS-OK"].apply(lambda x: str(x))
tls_parameters_pdf["Recommended"] = tls_parameters_pdf["Recommended"].apply(lambda x: str(x))
tls_parameters_pdf["Reference"] = tls_parameters_pdf["Reference"].apply(lambda x: str(x))
tls_parameter_dict = tls_parameters_pdf[["Cipher", "Description"]].set_index("Cipher").to_dict()["Description"]


def cipher_to_description(cipher):
    return tls_parameter_dict.get(cipher, "Unknown")


cipher_to_description_udf = psf.udf(cipher_to_description, pst.StringType())

In [60]:
# Decoding X.509 certificates
def get_x509(pem: str):
    return x509.load_pem_x509_certificate(str.encode(pem), default_backend())


def get_extensions(cert):
    tls_key_exchange = []
    san_list = []
    try:
        extensions = cert.extensions
        for extension in extensions:
            if isinstance(extension.value, x509.TLSFeature):
                tls_key_exchange.append(extension.value.key_exchange)
                continue
            if isinstance(extension.value, x509.SubjectAlternativeName):
                subject_alt_name = extension.value
                san_list += [name for name in subject_alt_name.get_values_for_type(x509.DNSName)]
                continue
    except:
        pass

    return tls_key_exchange, san_list


def get_x509_fields(pem: str):
    try:
        cert = get_x509(pem)
    except ValueError:
        # the certificate contains bytes that cannot be interpreted. Probably invalid cert
        # https://github.com/pyca/cryptography/issues/6804
        print(pem)
        return 11 * [None]  # CHANGE HERE IN CASE ADDITIONAL RETURN PARAMETER

    _, san_list = get_extensions(cert)
    
    subject_rdns = []
    issuer_rdns = []
    try:
        issuer_rdns = [rdn.rfc4514_string() for rdn in cert.issuer.rdns]
        subject_rdns = [rdn.rfc4514_string() for rdn in cert.subject.rdns]
    except ValueError:
        # the certificate contains bytes that cannot be interpreted. Probably invalid cert
        # https://github.com/pyca/cryptography/issues/6804
        pass

    not_valid_after = None
    try:
        if cert.not_valid_after > datetime.min:
            not_valid_after = cert.not_valid_after
    except ValueError:
        # ValueError: year 0 is out of range
        pass
    not_valid_before = None
    try:
        if cert.not_valid_before > datetime.min:
            not_valid_before = cert.not_valid_before
    except ValueError:
        # ValueError: year 0 is out of range
        pass

    public_key_size = None
    try:
        public_key_size = cert.public_key().key_size
    except AttributeError:
        #'cryptography.hazmat.bindings._rust.openssl.ed25519' object has no attribute 'key_size'
        pass

    cert_fp = cert.fingerprint(hashes.SHA256()).hex().upper()

    skid = ""
    try:
        skid = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_KEY_IDENTIFIER).value.digest.hex().upper()
    except:
        # x509.ExtensionNotFound
        # Parse error
        pass
    
    akid = ""
    try:
        akid = cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.AUTHORITY_KEY_IDENTIFIER).value.key_identifier.hex().upper()
    except:
        # x509.ExtensionNotFound
        # Parse error
        pass
    
    return (cert.signature_algorithm_oid._name,
            public_key_size,
            san_list,
            cert_fp,
            skid,
            akid,
            subject_rdns,
            issuer_rdns,
            cert.version.name,
            not_valid_after,
            not_valid_before,
           )


pem_decoded_schema = pst.StructType([pst.StructField("tls_signature_algorithm", pst.StringType(), True),
                                     pst.StructField("pubkey_bit_size", pst.IntegerType(), True),
                                     pst.StructField("leaf_data_names", pst.ArrayType(pst.StringType()), True),  # SAN
                                     pst.StructField("fingerprint", pst.StringType(), True),
                                     pst.StructField("subject_key_identifier", pst.StringType(), True),
                                     pst.StructField("authority_key_identifier", pst.StringType(), True),
                                     pst.StructField("subject_rdns", pst.ArrayType(pst.StringType()), True),
                                     pst.StructField("issuer_rdns", pst.ArrayType(pst.StringType()), True),
                                     pst.StructField("version", pst.StringType(), True),
                                     pst.StructField("not_valid_after", pst.TimestampType(), True),
                                     pst.StructField("not_valid_before", pst.TimestampType(), True),
                                    ])


decode_cert_udf = psf.udf(get_x509_fields, pem_decoded_schema)

In [67]:
dfs = []
for port, ts_list in PORT_SCANDATE_MAP.items():
    print(port)
    for ts in ts_list:
        print(ts)
        hosts_df = load_hosts_data(port, ts)

        hosts_df = hosts_df.withColumnRenamed("id", "host_id")
        hosts_df = hosts_df.withColumnRenamed("ip", "ipv4")
        hosts_df = hosts_df.withColumnRenamed("cert_id", "hosts_cert_id")
        hosts_df = hosts_df.select("host_id", "port", "ipv4", "protocol", "cipher", "hosts_cert_id", "peer_certificates")
        hosts_df = hosts_df.withColumn("peer_certs_len", peer_certs_len_udf(psf.col("peer_certificates"))).drop("peer_certificates")

        certval_df = convert_output_df(load_cert_validator(port, ts))
        certval_df = certval_df.withColumn("parsed_root_store_error", parse_error_udf(psf.col("root_store_error")))
        g_certval_df = certval_df.groupby("id") \
                                .agg(psf.collect_set("store_name").alias("store_name_list"),
                                    psf.collect_list("is_valid").alias("valid_list"),
                                    psf.collect_list("parsed_root_store_error").alias("root_store_error_list")
                                    )
        g_certval_df = g_certval_df.withColumnRenamed("id", "certval_id")

        certs_df = load_certs_data(port, ts)
        certs_df = certs_df.withColumnRenamed("id", "cert_id")
        certs_df = certs_df.withColumn("decoded_cert", decode_cert_udf(psf.col("cert"))).drop("cert", "system_cert_store").select("cert_id", "decoded_cert.*")

        ldap_df = None
        if port == 636:
            ldap_df = load_ldap_data(port, ts)
        else:  # port 389
            ldap_df = load_ldapstarttls_data(port, ts)

        ldap_df = ldap_df.withColumnRenamed("id", "ldap_id").select("ldap_id", "ldap_server")

        ldap_id_df = ldap_df.filter(psf.col("ldap_server") == 1).select("ldap_id")

        ldap_hosts_df = hosts_df.join(ldap_id_df, ldap_id_df.ldap_id == hosts_df.host_id, "inner").filter(psf.col("ldap_id").isNotNull()).drop("ldap_id")

        ldap_hosts_cert_df = ldap_hosts_df.join(certs_df, ldap_hosts_df.hosts_cert_id == certs_df.cert_id, "inner").drop("hosts_cert_id", "cert_id")

        ldap_hosts_cert_val_df = ldap_hosts_cert_df.join(g_certval_df, ldap_hosts_cert_df.host_id == g_certval_df.certval_id, "inner").drop("certval_id")


        ldap_hosts_cert_val_df = ldap_hosts_cert_val_df.withColumn("tls_version", tls_version_udf(psf.col("protocol"))).drop("protocol")
        ldap_hosts_cert_val_df = ldap_hosts_cert_val_df.withColumn("tls_cipher", cipher_to_description_udf(psf.col("cipher"))).drop("cipher")
        
        when = ts.strftime("%Y%m%d")
        ldap_hosts_cert_val_df = ldap_hosts_cert_val_df.withColumn("date", psf.lit(when))

        dfs.append(ldap_hosts_cert_val_df)
    print("------------------")

636
2024-04-16 00:00:00


AnalysisException: Path does not exist: s3a://catrin/data_processing/tool=cert-validator/format=parquet/port=636/year=2024/month=04/day=16

In [None]:
goscanner_df = dfs[0].unionByName(dfs[1])
for i in range(2, len(dfs)):
    goscanner_df = goscanner_df.unionByName(dfs[i])

output = "luvizottocesarg-tmp/2023-ldap-dependency-goscanner"
output = "luvizottocesarg-tmp/2024-04-ldap-dependency-goscanner"
goscanner_df.coalesce(1).write.parquet(f"s3a://{output}")

### Retrieve SAN from goscanner dataset

In [8]:
dfs = []
for port, ts_list in PORT_SCANDATE_MAP.items():
    print(port)
    for ts in ts_list:
        print(ts)
        hosts_df = load_hosts_data(port, ts)

        hosts_df = hosts_df.withColumnRenamed("id", "host_id")
        hosts_df = hosts_df.withColumnRenamed("cert_id", "hosts_cert_id")
        hosts_df = hosts_df.select("host_id", "hosts_cert_id")

        certs_df = load_certs_data(port, ts)
        certs_df = certs_df.withColumnRenamed("id", "cert_id")
        certs_df = certs_df.withColumn("decoded_cert", decode_cert_udf(psf.col("cert"))).drop("cert", "system_cert_store").select("cert_id", "decoded_cert.*")

        ldap_df = None
        if port == 636:
            ldap_df = load_ldap_data(port, ts)
        else:  # port 389
            ldap_df = load_ldapstarttls_data(port, ts)

        ldap_df = ldap_df.withColumnRenamed("id", "ldap_id").select("ldap_id", "ldap_server")

        ldap_id_df = ldap_df.filter(psf.col("ldap_server") == 1).select("ldap_id")

        ldap_hosts_df = hosts_df.join(ldap_id_df, ldap_id_df.ldap_id == hosts_df.host_id, "inner").filter(psf.col("ldap_id").isNotNull()).drop("ldap_id")

        ldap_hosts_cert_df = ldap_hosts_df.join(certs_df, ldap_hosts_df.hosts_cert_id == certs_df.cert_id, "inner").drop("hosts_cert_id", "cert_id")
        
        when = ts.strftime("%Y%m%d")
        ldap_hosts_cert_df = ldap_hosts_cert_df.withColumn("date", psf.lit(when))

        dfs.append(ldap_hosts_cert_df)
    print("------------------")

636
2024-02-20 00:00:00
2024-02-27 00:00:00
2024-03-05 00:00:00
2024-03-12 00:00:00
2024-03-19 00:00:00
------------------
389
2024-02-21 00:00:00
2024-02-28 00:00:00
2024-03-06 00:00:00
2024-03-13 00:00:00
2024-03-20 00:00:00
------------------


In [9]:
goscanner_df = dfs[0].unionByName(dfs[1])
for i in range(2, len(dfs)):
    goscanner_df = goscanner_df.unionByName(dfs[i])

output = "luvizottocesarg-tmp/2024-goscanner-ldap-san"
goscanner_df.coalesce(1).write.parquet(f"s3a://{output}")

### Cert validator

In [56]:
dfs = []
for port, timestamps in PORT_SCANDATE_MAP.items():
    print(port)
    for ts in timestamps:
        print(ts)
        _hosts_df = load_hosts_data(port, ts)

        _hosts_df = _hosts_df.withColumnRenamed("id", "host_id")
        _hosts_df = _hosts_df.withColumnRenamed("ip", "ipv4")
        _hosts_df = _hosts_df.withColumnRenamed("cert_id", "hosts_cert_id")  # to use skid and akid
        _hosts_df = _hosts_df.select("host_id", "ipv4", "port", "peer_certificates", "hosts_cert_id")
        _hosts_df = _hosts_df.withColumn("peer_certs_len", peer_certs_len_udf(psf.col("peer_certificates"))).drop("peer_certificates")

        _certs_df = load_certs_data(port, ts)
        _certs_df = _certs_df.withColumnRenamed("id", "cert_id")
        _certs_df = _certs_df.withColumn("decoded_cert", decode_cert_udf(psf.col("cert"))).drop("cert", "system_cert_store").select("cert_id", "decoded_cert.*")
        
        _cert_val_df = load_cert_validator(port, ts)
        _cert_val_df = convert_output_df(_cert_val_df)

        _cert_val_df = _cert_val_df.groupby("id") \
                                     .agg(psf.collect_set("store_name").alias("store_name_list"),
                                          psf.collect_list("is_valid").alias("valid_list"),
                                          psf.collect_list("root_store_error").alias("root_store_error_list")
                                         )

        _cert_val_df = _cert_val_df.withColumn("date", psf.lit(ts).cast(pst.TimestampType()))

        _cert_val_hosts_df = _cert_val_df.join(_hosts_df, _cert_val_df.id == _hosts_df.host_id, "inner").drop("host_id")
        joined_df = _cert_val_hosts_df.join(_certs_df, _cert_val_hosts_df.hosts_cert_id == _certs_df.cert_id, "inner").drop("hosts_cert_id", "cert_id")
        joined_df = joined_df.withColumn("validation_error", error_str_udf(psf.col("root_store_error_list"),
                                                                           psf.col("subject_key_identifier"),
                                                                           psf.col("authority_key_identifier"),
                                                                           psf.col("peer_certs_len")
                                                                          ))
        #joined_df = _cert_val_hosts_df
        #joined_df = joined_df.withColumn("validation_error", error_str2_udf(psf.col("root_store_error_list"),
        #                                                                   psf.col("peer_certs_len")
        #                                                                  ))
        dfs.append(joined_df)


cert_val_df = dfs[0].unionByName(dfs[1])
for i in range(2, len(dfs)):
    cert_val_df = cert_val_df.unionByName(dfs[i])

636
2023-11-07 00:00:00
2023-11-14 00:00:00
2023-11-21 00:00:00
2023-11-28 00:00:00
389
2023-11-01 00:00:00
2023-11-08 00:00:00
2023-11-15 00:00:00
2023-11-22 00:00:00
2023-11-29 00:00:00


In [45]:
cert_val_df.persist()
cert_val_df.groupBy("validation_error").count().show(truncate=False)

+-----------------------------------------+------+
|validation_error                         |count |
+-----------------------------------------+------+
|Self-signed                              |77748 |
|Signed by unknown authority              |378235|
|Valid chain                              |486062|
|Unhandled critical extension             |63    |
|Expired/Not yet valid                    |95705 |
|Not authorized to sign other certificates|177   |
|Invalid signature                        |319   |
|Too many intermediate certificates       |4     |
+-----------------------------------------+------+



In [57]:
output = "luvizottocesarg-tmp/2023-Nov-cert-validator-processing"
cert_val_df.coalesce(1).write.parquet(f"s3a://{output}")
cert_val_df.unpersist()

DataFrame[id: int, store_name_list: array<string>, valid_list: array<boolean>, root_store_error_list: array<string>, date: timestamp, ipv4: string, port: int, peer_certs_len: int, tls_signature_algorithm: string, pubkey_bit_size: int, leaf_data_names: array<string>, fingerprint: string, subject_key_identifier: string, authority_key_identifier: string, subject_rdns: array<string>, issuer_rdns: array<string>, version: string, not_valid_after: timestamp, not_valid_before: timestamp, validation_error: string]

### Extracting hosts and ldap

without preprocess... Let the user do this

In [107]:
dfs = []
for port, timestamps in PORT_SCANDATE_MAP.items():
    print(port)
    for ts in timestamps:
        print(ts)
        _hosts_df = load_hosts_data(port, ts)

        _hosts_df = _hosts_df.withColumnRenamed("id", "host_id")
        _hosts_df = _hosts_df.withColumnRenamed("ip", "ipv4")
        _hosts_df = _hosts_df.select("host_id", "ipv4", "port")

        _ldap_df = None
        if port == 636:
            _ldap_df = load_ldap_data(port, ts)
        else:  # port 389
            _ldap_df = load_ldapstarttls_data(port, ts)

        _ldap_df = _ldap_df.withColumnRenamed("id", "ldap_id").select("ldap_id", "ldap_server")
        _ldap_id_df = _ldap_df.filter(psf.col("ldap_server") == 1).select("ldap_id")

        joined_df = _hosts_df.join(_ldap_id_df, _ldap_id_df.ldap_id == _hosts_df.host_id, "inner").filter(psf.col("ldap_id").isNotNull()).drop("ldap_id")

        joined_df = joined_df.withColumn("date", psf.lit(ts).cast(pst.TimestampType()))
        dfs.append(joined_df)


all_df = dfs[0].unionByName(dfs[1])
for i in range(2, len(dfs)):
    all_df = all_df.unionByName(dfs[i])

636
2023-11-07 00:00:00
2023-11-14 00:00:00
2023-11-21 00:00:00
2023-11-28 00:00:00
389
2023-11-08 00:00:00
2023-11-15 00:00:00
2023-11-22 00:00:00
2023-11-29 00:00:00


In [108]:
output = f"luvizottocesarg-tmp/{ts.year}-{ts.month:02d}-hosts_ldap-nopreprocess"
all_df.coalesce(1).write.parquet(f"s3a://{output}")

### Extracting hosts, certs, cert_val, tls_verbose

In [106]:
dfs = []
for port, timestamps in PORT_SCANDATE_MAP.items():
    print(port)
    for ts in timestamps:
        print(ts)
        _hosts_df = load_hosts_data(port, ts)

        _hosts_df = _hosts_df.withColumnRenamed("id", "host_id")
        _hosts_df = _hosts_df.withColumnRenamed("ip", "ipv4")
        _hosts_df = _hosts_df.withColumnRenamed("cert_id", "hosts_cert_id")
        _hosts_df = _hosts_df.select("host_id", "ipv4", "port", "protocol", "cipher", "peer_certificates", "hosts_cert_id")

        _certs_df = load_certs_data(port, ts)
        _certs_df = _certs_df.withColumnRenamed("id", "cert_id")
        _certs_df = _certs_df.select("cert_id", "cert")
        
        _tls_df = load_tls_data(port, ts)
        _tls_df = _tls_df.withColumnRenamed("id", "tls_id")
        _tls_df = _tls_df.withColumnRenamed("fingerprint", "tls_fingerprint")
        _tls_df = _tls_df.select("tls_id", "tls_fingerprint")

        _cert_val_df = load_cert_validator(port, ts)
        _cert_val_df = convert_output_df(_cert_val_df)

        _cert_val_df = _cert_val_df.groupby(
            "id"
        ).agg(
            psf.collect_set("store_name").alias("store_name_list"),
            psf.collect_list("is_valid").alias("valid_list"),
            psf.collect_list("root_store_error").alias("root_store_error_list")
        )
        
        _ldap_df = None
        if port == 636:
            _ldap_df = load_ldap_data(port, ts)
        else:  # port 389
            _ldap_df = load_ldapstarttls_data(port, ts)

        _ldap_df = _ldap_df.withColumnRenamed("id", "ldap_id").select("ldap_id", "ldap_server")
        _ldap_id_df = _ldap_df.filter(psf.col("ldap_server") == 1).select("ldap_id")
        print("ldap", _ldap_id_df.count())
        _ldap_hosts_df = _hosts_df.join(_ldap_id_df, _ldap_id_df.ldap_id == _hosts_df.host_id, "inner").filter(psf.col("ldap_id").isNotNull()).drop("ldap_id")
        print("ldap join host", _ldap_hosts_df.count())
        _cert_val_ldap_hosts_df = _ldap_hosts_df.join(_cert_val_df, _ldap_hosts_df.host_id == _cert_val_df.id, "inner").drop("host_id")
        print("ldap join host join certval", _cert_val_ldap_hosts_df.count())
        _cert_val_ldap_hosts_certs_df = _cert_val_ldap_hosts_df.join(_certs_df, _cert_val_ldap_hosts_df.hosts_cert_id == _certs_df.cert_id, "inner").drop("hosts_cert_id", "cert_id")
        
        joined_df = _cert_val_ldap_hosts_certs_df.join(_tls_df, _tls_df.tls_id == _cert_val_ldap_hosts_certs_df.id, "inner").drop("tls_id")
        
        joined_df = joined_df.withColumn("date", psf.lit(ts).cast(pst.TimestampType()))
        dfs.append(joined_df)


all_df = dfs[0].unionByName(dfs[1])
for i in range(2, len(dfs)):
    all_df = all_df.unionByName(dfs[i])

636
2023-11-07 00:00:00
ldap 36965
ldap join host 36965
ldap join host join certval 36905
2023-11-14 00:00:00
ldap 37012
ldap join host 37012
ldap join host join certval 36950
2023-11-21 00:00:00
ldap 37081
ldap join host 37081
ldap join host join certval 36996
2023-11-28 00:00:00
ldap 36973
ldap join host 36973
ldap join host join certval 36889
389
2023-11-08 00:00:00
ldap 78472
ldap join host 78472
ldap join host join certval 38139
2023-11-15 00:00:00
ldap 78005
ldap join host 78005
ldap join host join certval 38043
2023-11-22 00:00:00
ldap 78203
ldap join host 78203
ldap join host join certval 38029
2023-11-29 00:00:00
ldap 78243
ldap join host 78243
ldap join host join certval 37983


In [88]:
output = f"luvizottocesarg-tmp/{ts.year}-{ts.month:02d}-hosts_certs_certval_tls-nopreprocess"
all_df.coalesce(1).write.parquet(f"s3a://{output}")

### Extract peer certificates from LDAP servers

In [104]:
dfs = []
for port, timestamps in PORT_SCANDATE_MAP.items():
    print(port)
    for ts in timestamps:
        print(ts)
        _hosts_df = load_hosts_data(port, ts)

        _hosts_df = _hosts_df.withColumnRenamed("id", "host_id")
        _hosts_df = _hosts_df.withColumnRenamed("ip", "ipv4")
        _hosts_df = _hosts_df.select("host_id", "ipv4", "port", "peer_certificates")

        _certs_df = load_certs_data(port, ts)
        _certs_df = _certs_df.withColumnRenamed("id", "cert_id")
        _certs_df = _certs_df.select("cert_id", "cert")

        _cert_val_df = load_cert_validator(port, ts)
        _cert_val_df = convert_output_df(_cert_val_df)

        _cert_val_df = _cert_val_df.groupby(
            "id"
        ).agg(
            psf.collect_set("store_name").alias("store_name_list"),
            psf.collect_list("is_valid").alias("valid_list"),
            psf.collect_list("root_store_error").alias("root_store_error_list")
        )

        _ldap_df = None
        if port == 636:
            _ldap_df = load_ldap_data(port, ts)
        else:  # port 389
            _ldap_df = load_ldapstarttls_data(port, ts)

        _ldap_df = _ldap_df.withColumnRenamed("id", "ldap_id").select("ldap_id", "ldap_server")
        _ldap_id_df = _ldap_df.filter(psf.col("ldap_server") == 1).select("ldap_id")

        _hosts_df = _hosts_df.withColumn("a", eval_udf(psf.col("peer_certificates"))).drop("peer_certificates").withColumnRenamed("a", "peer_certificates")
        _hosts_df = _hosts_df.select("host_id", "ipv4", "port", psf.explode_outer(_hosts_df.peer_certificates).alias("peer_certificate"))

        _ldap_hosts_df = _hosts_df.join(_ldap_id_df, _ldap_id_df.ldap_id == _hosts_df.host_id, "inner").filter(psf.col("ldap_id").isNotNull()).drop("ldap_id")

        _cert_val_ldap_hosts_df = _ldap_hosts_df.join(_cert_val_df, _ldap_hosts_df.host_id == _cert_val_df.id, "inner").drop("host_id")

        joined_df = _cert_val_ldap_hosts_df.join(_certs_df, _cert_val_ldap_hosts_df.peer_certificate == _certs_df.cert_id, "inner").drop("cert_id")

        joined_df = joined_df.withColumn("date", psf.lit(ts).cast(pst.TimestampType()))

        joined_df = joined_df.groupBy("id", "ipv4", "port", "store_name_list", "valid_list", "root_store_error_list", "date"
                                     ).agg(psf.collect_list("peer_certificate").alias("peer_certificates"),
                                           psf.collect_list("cert").alias("certificates")
                                          )
        dfs.append(joined_df)

all_df = dfs[0].unionByName(dfs[1])
for i in range(2, len(dfs)):
    all_df = all_df.unionByName(dfs[i])

636
2023-11-07 00:00:00
2023-11-14 00:00:00
2023-11-21 00:00:00
2023-11-28 00:00:00
389
2023-11-08 00:00:00
2023-11-15 00:00:00
2023-11-22 00:00:00
2023-11-29 00:00:00


In [105]:
output = f"luvizottocesarg-tmp/{ts.year}-{ts.month:02d}-hosts_peercerts_certval-nopreprocess"
all_df.coalesce(1).write.parquet(f"s3a://{output}")

### Extracting crawl and schema with TLS and X509 data

In [None]:
dfs = []
for port, timestamps in PORT_SCANDATE_MAP.items():
    print(port)
    for ts in timestamps:
        print(ts)
        