# Processing and Analyzing Certificate Transparency Data

## Introduction

The purpose of this project is to process and analyze a sample of Certificate Transparency (CT) data. The sample is only a small portion of the entire dataset. Additionally, the sample is expected to be larger-than-RAM in terms of processing. This means that common methodologies using libraries such as pandas or NumPy will not suffice at scale.

The first part of this project will aim to process the sample CT data and identify records that have duplicate fingerprints in the leaf certificate. 

The second part of this project will aim to identifty some phishing domains using the CT data. From this, we will be able to identify the top issuers for phishy domains as well as the most frequent Top Level Domains (TLD).

*Note: The DataFrame.show() lines aid in documenting the development process. Commenting these lines with a hash mark will speed up run time.*

## Summary of Analysis Results

After analyzing the data, we determined that the top issuer of certificates for phishing domains using the embedded keywords "google" or "paypal" is letsencrypt.org.

Furthermore, ".com" is the top TLD for the phishing domains identified followed by ".ms". The ".ms" TLD is associated with the British overseas territory, Montserrat. Similarly, there are multiple foreign TLDs associated with phishing domains in this sample.

## Part 1: Identify Duplicates in Larger-than-RAM Dataset

We will first read in the larger-than-RAM sample of the CT dataset. To do this, we will use the Python Application Programming Interface (API), PySpark. PySpark allows us to interact with Apache Spark, which is an analytics engine optimized for working with large sets of data via streaming and batching. 

Let's start by importing PySpark and SQLContext from PySpark SQL.

In [1]:
import pyspark
from pyspark.sql import SQLContext

In [2]:
#Instantiate SQLContext object and pass it as 'sc'
sc = pyspark.SparkContext()
sqlCtx = SQLContext(sc)

Now, we will use the SQLContext method read.json() to read the JSON dataset into a Spark DataFrame object.

In [3]:
df = sqlCtx.read.json('ctl_records_sample.jsonlines')

In [4]:
#Print type to confirm that we have successfully read it in as a Spark DataFrame
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


We can see that we have successfully read in the json data as a Spark DataFrame. Next, we will print the Schema that Spark inferred. This will allow us to understand the JSON object schema in depth. 

In [5]:
df.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- cert_index: long (nullable = true)
 |    |-- cert_link: string (nullable = true)
 |    |-- chain: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- as_der: string (nullable = true)
 |    |    |    |-- extensions: struct (nullable = true)
 |    |    |    |    |-- authorityInfoAccess: string (nullable = true)
 |    |    |    |    |-- authorityKeyIdentifier: string (nullable = true)
 |    |    |    |    |-- basicConstraints: string (nullable = true)
 |    |    |    |    |-- certificatePolicies: string (nullable = true)
 |    |    |    |    |-- crlDistributionPoints: string (nullable = true)
 |    |    |    |    |-- extendedKeyUsage: string (nullable = true)
 |    |    |    |    |-- extra: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- keyUsage: string (nullable = true)
 |    |    |    |    |-- subjectAltName: string (nullab

Let's print five rows to see what the data looks like.

In [6]:
df.show(5)

+--------------------+------------------+
|                data|      message_type|
+--------------------+------------------+
|{693605056, http:...|certificate_update|
|{183056883, http:...|certificate_update|
|{425694054, http:...|certificate_update|
|{426436820, http:...|certificate_update|
|{781260929, http:...|certificate_update|
+--------------------+------------------+
only showing top 5 rows



Everything under the "data" structure is listed in one column while the "message_type" string is in a separate column. This is important to note because we will have to combine these two columns into one JSON object again. Before that, let's create a new column by selecting the fingerprint information from the leaf certificate.

In [7]:
df2 = df.select('data','data.leaf_cert.fingerprint','message_type')

In [8]:
#Display one record 
df2.show(1)

+--------------------+--------------------+------------------+
|                data|         fingerprint|      message_type|
+--------------------+--------------------+------------------+
|{693605056, http:...|E3:74:8C:75:ED:6D...|certificate_update|
+--------------------+--------------------+------------------+
only showing top 1 row



We have generated a new "fingerprint" column with the fingerprint information from the leaf certificate. We will now identify the duplicate digital certificates based on their leaf certificate fingerprint. To do this, we will first import Window and Functions from pyspark.sql. PySpark Window allows us to calculate the count over a range of input rows with the same value. 

In [9]:
from pyspark.sql import Window
from pyspark.sql import functions as F

In [10]:
#create a window partition by the fingerprint column
fingerprint_partition = Window.partitionBy('fingerprint')
#create a new column "counts" with the count of the fingerprint for that row across the entire dataframe
repeatcounts = df2.select('*', F.count('*').over(fingerprint_partition).alias('counts'))
#Display five records
repeatcounts.show(5)
                          

+--------------------+--------------------+------------------+------+
|                data|         fingerprint|      message_type|counts|
+--------------------+--------------------+------------------+------+
|{695677893, http:...|00:01:AD:C6:69:15...|certificate_update|     1|
|{713348251, http:...|00:02:FF:63:DD:AA...|certificate_update|     1|
|{694982703, http:...|00:07:3D:7E:FE:56...|certificate_update|     1|
|{686067012, http:...|00:08:50:2A:49:B2...|certificate_update|     2|
|{425760954, http:...|00:08:50:2A:49:B2...|certificate_update|     2|
+--------------------+--------------------+------------------+------+
only showing top 5 rows



We have successfully created a "counts" column showing the number of times that the fingerprint value for that row appears across the entire dataset. We only want to keep the rows with a "counts" value greater than one as this indicates the duplicates.

In [11]:
#Create new dataframe where the counts value is greater than one
duplicates = repeatcounts.where(F.col('counts') > 1)
#Display one record
duplicates.show(1)

+--------------------+--------------------+------------------+------+
|                data|         fingerprint|      message_type|counts|
+--------------------+--------------------+------------------+------+
|{686067012, http:...|00:08:50:2A:49:B2...|certificate_update|     2|
+--------------------+--------------------+------------------+------+
only showing top 1 row



In [12]:
duplicates.show(5)

+--------------------+--------------------+------------------+------+
|                data|         fingerprint|      message_type|counts|
+--------------------+--------------------+------------------+------+
|{686067012, http:...|00:08:50:2A:49:B2...|certificate_update|     2|
|{425760954, http:...|00:08:50:2A:49:B2...|certificate_update|     2|
|{695275282, http:...|00:13:75:0F:BC:58...|certificate_update|     2|
|{427019740, http:...|00:13:75:0F:BC:58...|certificate_update|     2|
|{694084231, http:...|00:14:11:BE:87:A6...|certificate_update|     2|
+--------------------+--------------------+------------------+------+
only showing top 5 rows



Our new duplicates dataframe only contains rows where the fingerprint value occurs more than once in the dataframe. Let's create a new dataframe with only the "fingerprint", "data", and "message_type" columns.

In [13]:
#Drop "counts" column
duplicates2 = duplicates.select('fingerprint','data','message_type')
#Display five records
duplicates2.show(5)

+--------------------+--------------------+------------------+
|         fingerprint|                data|      message_type|
+--------------------+--------------------+------------------+
|00:08:50:2A:49:B2...|{686067012, http:...|certificate_update|
|00:08:50:2A:49:B2...|{425760954, http:...|certificate_update|
|00:13:75:0F:BC:58...|{695275282, http:...|certificate_update|
|00:13:75:0F:BC:58...|{427019740, http:...|certificate_update|
|00:14:11:BE:87:A6...|{694084231, http:...|certificate_update|
+--------------------+--------------------+------------------+
only showing top 5 rows



The final output needs to contain the fingerprint value followed by a list of the certificates. Each item of the list should be the entire JSON object. Below is an example of an output row:

{"fingerprint":"0C:E4:AF:24:F1:AE:B1:09:B0:42:67:CB:F8:FC:B6:AF:1C:07:D6:5B", "certificates":[A, B, C]}

To achieve this, we will need to group by fingerprint and aggregate the certificates. As identified earlier, we will first need to combine the data and message_type columns into one JSON object again. Let's start by importing pyspark.sql.types.

In [14]:
from pyspark.sql.types import *

We will now create a new column "certificates" with the columns data and message_type converted to a JSON object.

In [15]:
duplicates2 = duplicates2.withColumn('certificates', F.to_json(F.struct(F.col('data'), F.col('message_type'))))

In [16]:
duplicates2.show(1)

+--------------------+--------------------+------------------+--------------------+
|         fingerprint|                data|      message_type|        certificates|
+--------------------+--------------------+------------------+--------------------+
|00:08:50:2A:49:B2...|{686067012, http:...|certificate_update|{"data":{"cert_in...|
+--------------------+--------------------+------------------+--------------------+
only showing top 1 row



Now that we have the information we want in the "fingerprint" and "certificates" columns, we can drop the original "data" and "message_type" columns.

In [17]:
duplicates2 = duplicates2.select('fingerprint', 'certificates')

In [18]:
duplicates2.show(1)

+--------------------+--------------------+
|         fingerprint|        certificates|
+--------------------+--------------------+
|00:08:50:2A:49:B2...|{"data":{"cert_in...|
+--------------------+--------------------+
only showing top 1 row



Next, we will group the dataframe records by "fingerprint" and aggregate the "certificates" into a list. 

In [19]:
#GroupBy "fingerprint" and aggregate "certificates"
grouped_df = duplicates2.groupBy("fingerprint").agg(F.collect_list("certificates"))

In [20]:
#Display one record with truncate set to False so we can view the entire output
grouped_df.show(1, False)

+-----------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

We have successfully aggregated the certificates into a list associated with the same fingerprint. The final step of part 1 is writing the output back to a JSON with gzip compression. 

In [21]:
#Write grouped_df to gzip JSON
#Line is commented to prevent further writing of output

#grouped_df.coalesce(1).write.option("compression","gzip").json('RD_outputs.json')

## Part 2: Bonus Analytical Exercise - Identify Phishing Domains

In the second part of the project, we aim to identify phishing domains using the CT data and look for insights on the results. Primarily, we want to answer the following questions:

1) What are the top TLDs for phishing domains?

2) What are the top issuers of certificates for phishing domains?

To accomplish this, we will use select keywords for frequently exploited services and search for different patterns.

In [22]:
#Create a new dataframe "domain" that is identical to the first dataframe after we imported the JSONlines
domain = df

The domains are contained under the leaf certificate. Additionally, information on the certificate authority is also contained under the leaf certificate structure. Let's edit our dataframe to only contain these columns.

In [23]:
domain = df.select('data.leaf_cert.extensions.authorityInfoAccess','data.leaf_cert.all_domains')

In [24]:
domain.show(10)

+--------------------+--------------------+
| authorityInfoAccess|         all_domains|
+--------------------+--------------------+
|CA Issuers - URI:...|[autodiscover.chi...|
|CA Issuers - URI:...|[develsystem.ru, ...|
|CA Issuers - URI:...|[content.insta.ki...|
|CA Issuers - URI:...|[him.workylla.com...|
|OCSP - URI:http:/...|[cpanel.thetroves...|
|CA Issuers - URI:...|[*.5e764d1fd1ea49...|
|CA Issuers - URI:...|[shop.catholicama...|
|CA Issuers - URI:...|[airguninsight.co...|
|CA Issuers - URI:...|[www.ojsrvvaairou...|
|CA Issuers - URI:...|[autodiscover.the...|
+--------------------+--------------------+
only showing top 10 rows



In [25]:
#Check data types
domain.dtypes

[('authorityInfoAccess', 'string'), ('all_domains', 'array<string>')]

To extract the information we want, we will use a regular expression (regexp) on the target column. In this case, the target column is "all_domains". We will cast "all_domains" to a string type column so we can implement the regexp.

In [26]:
domain = domain.withColumn('domains', F.col('all_domains').cast('string'))

In [27]:
domain.dtypes

[('authorityInfoAccess', 'string'),
 ('all_domains', 'array<string>'),
 ('domains', 'string')]

We can see that we have successfully created a new column "domains" that is a string type. In this exercise, we will identify domains containing strings with the keyword "google" or "paypal". The regexp will search for these keywords that are embedded within a string of other characters. If there is a match, the function will return the top level domain (TLD) to the column "TLD".

In [28]:
#Create new column "TLD" with information extracted via REGEXP
embedded_domains = domain.withColumn('TLD',(F.regexp_extract(F.col('domains'), r'\b(?:\S+)(google|paypal)(?:\S*)([.].\w+)\b',2)))

In [29]:
#Show 5 examples
embedded_domains.show(5)

+--------------------+--------------------+--------------------+---+
| authorityInfoAccess|         all_domains|             domains|TLD|
+--------------------+--------------------+--------------------+---+
|CA Issuers - URI:...|[autodiscover.chi...|[autodiscover.chi...|   |
|CA Issuers - URI:...|[develsystem.ru, ...|[develsystem.ru, ...|   |
|CA Issuers - URI:...|[content.insta.ki...|[content.insta.ki...|   |
|CA Issuers - URI:...|[him.workylla.com...|[him.workylla.com...|   |
|OCSP - URI:http:/...|[cpanel.thetroves...|[cpanel.thetroves...|   |
+--------------------+--------------------+--------------------+---+
only showing top 5 rows



We can see that we successfully created the TLD column. However, there are blank 'TLD' values for rows where the regexp did not match the pattern. Let's make sure we successfully extracted values for other rows by sorting the TLD column by descending.

In [30]:
embedded_domains.sort(F.col('TLD').desc()).show(3)

+--------------------+--------------------+--------------------+---+
| authorityInfoAccess|         all_domains|             domains|TLD|
+--------------------+--------------------+--------------------+---+
|CA Issuers - URI:...|[bid.googlezone.c...|[bid.googlezone.c...|.za|
|CA Issuers - URI:...|[paypal.sikone-en...|[paypal.sikone-en...|.za|
|OCSP - URI:http:/...|[googledocs.bhayi...|[googledocs.bhayi...|.za|
+--------------------+--------------------+--------------------+---+
only showing top 3 rows



As we can see, we successfully extracted the TLD capture group for other rows. We will need to replace blank values with a null type so we can exclude those rows from our count.

In [31]:
#Replace blank values with Null
embedded_domains = embedded_domains.withColumn('TLD', F.when(F.col('TLD') == '', None).otherwise(F.col('TLD')))

In [32]:
#Filter dataframe to remove rows with null values for TLD
embedded_domains = embedded_domains.filter(F.col('TLD').isNotNull())

Now that we have successfully removed the rows with null values for TLD, we can count the number of unique TLDs and sort by descending.

In [33]:
embedded_domains.groupBy('TLD').count().sort(F.col('count').desc()).show()

+-----+-----+
|  TLD|count|
+-----+-----+
| .com|  350|
|  .ms|  156|
|.info|   58|
| .net|   33|
|.blog|   27|
|  .ru|   20|
| .org|   20|
|  .br|   19|
|  .uk|   14|
|  .io|   13|
|  .ga|   12|
|  .de|   10|
|  .il|    9|
|  .mx|    8|
|  .ml|    7|
|  .za|    7|
|  .co|    6|
|.best|    5|
|  .ir|    5|
|  .au|    4|
+-----+-----+
only showing top 20 rows



The top five TLDs for phishing domains using either of the keywords embedded in the string are ".com", ".ms", ".info", ".net", and ".blog", sequentially. Additionally, there are multiple foreign TLDs associated with phishing domains. 

Lastly, we will check for the top five issuers of certificates for phishing domains in this sample.

In [34]:
embedded_domains.groupBy('authorityInfoAccess').count().sort(F.col('count').desc()).show(5,False)

+-------------------------------------------------------------------------------------------------------------------------+-----+
|authorityInfoAccess                                                                                                      |count|
+-------------------------------------------------------------------------------------------------------------------------+-----+
|CA Issuers - URI:http://cert.int-x3.letsencrypt.org/
OCSP - URI:http://ocsp.int-x3.letsencrypt.org
                      |557  |
|OCSP - URI:http://ocsp.comodoca.com
CA Issuers - URI:http://crt.comodoca.com/cPanelIncCertificationAuthority.crt
        |141  |
|OCSP - URI:http://ocsp.msocsp.com
CA Issuers - URI:http://www.microsoft.com/pki/mscorp/Microsoft%20IT%20TLS%20CA%201.crt
|45   |
|OCSP - URI:http://ocsp.msocsp.com
CA Issuers - URI:http://www.microsoft.com/pki/mscorp/Microsoft%20IT%20TLS%20CA%202.crt
|43   |
|OCSP - URI:http://ocsp.msocsp.com
CA Issuers - URI:http://www.microsoft.com/pki/mscorp/Mi

In this sample, the top issuer of certificates for phishing domains using any of the embedded keywords is letsencrypt.org, with a count of 557. This issuer is followed by comodoca.com and Microsoft.

## Conclusion

In this project, we successfully processed a large sample of CT data and identified duplicate certificates based on the leaf certificate fingerprint. Additionally, we identified some phishing domains that were using the keywords of frequently exploited sites (google and paypal) embedded in their domains. From this, we derived the top TLDs and certificate issuers associated with the phishing domains in this sample.