In [1]:
import os
from collections import defaultdict
from collections import Counter

In [2]:
with open("testlist.tsv", 'r') as fp:
    test_hosts = fp.readlines()
len(test_hosts)

1370

# Detection heuristic

In [3]:
def analyse_log(isp, correct_sni, host_ip):

    ### safe SNI logs analysis ###
    safe_sni_rst_hop = None
    safe_sni_fin_hop = None
    safe_sni_tls_record_hop = None

    with open("./%s_logs/%s_%s_safe" % (isp, host_ip, correct_sni), 'r') as fp:
        safe_sni_logs = fp.read().split("\n\n\n")

    for it, line in enumerate(safe_sni_logs):
        if ("TLS Server Hello found" in line or "TLS Alert found" in line) and safe_sni_tls_record_hop is None:
            safe_sni_tls_record_hop = it + 1
        elif "RST recevied at" in line and safe_sni_rst_hop is None:
            safe_sni_rst_hop = it + 1
        elif "FIN recevied at" in line and safe_sni_fin_hop is None:
            safe_sni_fin_hop = it + 1


    if not safe_sni_tls_record_hop:
        return "No TLS ServerHello/Alert via safe SNI test"

    ### Correct SNI logs analysis ###
    correct_sni_rst_hop = None
    correct_sni_fin_hop = None
    correct_sni_tls_record_hop = None

    with open("./%s_logs/%s_%s_correct" % (isp, host_ip, correct_sni), 'r') as fp:
        correct_sni_logs = fp.read().split("\n\n\n")
        
    for it, line in enumerate(correct_sni_logs):
        if ("TLS Server Hello found" in line or "TLS Alert found" in line) and correct_sni_tls_record_hop is None:
            correct_sni_tls_record_hop = it + 1
        elif "RST recevied at" in line and correct_sni_rst_hop is None:
            correct_sni_rst_hop = it + 1
        elif "FIN recevied at" in line and correct_sni_fin_hop is None:
            correct_sni_fin_hop = it + 1
            

    if not (correct_sni_rst_hop or correct_sni_fin_hop) and not correct_sni_tls_record_hop:
        return "Not enough hops for correct SNI test"

    if correct_sni_tls_record_hop:
        return "SNI inspection based censorship not present"
    elif safe_sni_tls_record_hop and correct_sni_rst_hop:
        if correct_sni_rst_hop < safe_sni_tls_record_hop:
            return "SNI inspection based censorship present (RST) [< last hop]"
        elif correct_sni_rst_hop == safe_sni_tls_record_hop:
            return "SNI inspection based censorship present (RST) [last hop]"
    elif safe_sni_tls_record_hop and correct_sni_fin_hop:
        if correct_sni_fin_hop < safe_sni_tls_record_hop:
            return "SNI inspection based censorship present (FIN) [< last hop]"
        elif correct_sni_fin_hop == safe_sni_tls_record_hop:
            return "SNI inspection based censorship present (FIN) [last hop]"
    else:
        return "Unkown"

    print (host_ip, correct_sni)
    print ("returing None")

# Analysing airtel logs

In [4]:
isp = "airtel"
results = defaultdict(lambda : 0)
fp = open("%s_processed.log" % (isp), 'w')

with open("./airtel_logs/fail.log", 'r') as fp:
    failures = set(list(map(lambda x : x.split()[0], fp.readlines())))

for line in test_hosts:
    correct_sni, host_ip = line.strip("\n").split()

    if correct_sni in failures:
        results["TestRunFailure"] += 1
        continue

    try:
        results[analyse_log(isp, correct_sni, host_ip)] += 1

    except Exception as e:
        results[e] += 1

fp.close()
for result, value in sorted(results.items(), key = lambda x : -1 * x[1]):
    print (result, ":", value)

SNI inspection based censorship present (RST) [< last hop] : 1058
SNI inspection based censorship not present : 290
TestRunFailure : 15
No TLS ServerHello/Alert via safe SNI test : 4
SNI inspection based censorship present (RST) [last hop] : 3
