In [1]:
from pyspark.sql import SparkSession

In [2]:
# Start the Spark session for this notebook, reusing an active one if present.
spark = (
    SparkSession.builder
    .appName("Wizdome-task1")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/19 20:08:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the CVE records from JSON; the schema is inferred by Spark.
cves_df = spark.read.format("json").load("cves.json")

                                                                                

In [4]:
# Preview the first five CVE rows.
cves_df.show(n=5)

+-------------+---------------+--------------------+-----------+------------+-----------+--------------------+-----+--------------------+--------------------+-------------+------+
|          cve|            cwe|         description|is_rejected|modifiedDate|publishDate|          references|score|      score_metadata|        score_vector|score_version|status|
+-------------+---------------+--------------------+-----------+------------+-----------+--------------------+-----+--------------------+--------------------+-------------+------+
|CVE-1999-0095|[NVD-CWE-Other]|The debug command...|      false|  11-06-2019| 01-10-1988|[http://seclists....| 10.0|{false, HIGH, 10....|{LOW, NETWORK, NO...|          2.0|      |
|CVE-1999-0082|[NVD-CWE-Other]|CWD ~root command...|      false|  09-09-2008| 11-11-1988|[http://www.alw.n...| 10.0|{false, HIGH, 10....|{LOW, NETWORK, NO...|          2.0|      |
|CVE-1999-1471|[NVD-CWE-Other]|Buffer overflow i...|      false|  05-09-2008| 01-01-1989|[http://www

In [5]:
# Load the CPE dictionary entries from JSON; the schema is inferred by Spark.
cpes_df = spark.read.format("json").load("cpes.json")

                                                                                

In [6]:
# Preview the first five CPE dictionary rows.
cpes_df.show(n=5)

+-------+--------------------+--------------------+-------+--------------------+-------------+----+------------+-----+---------------+---------------+----------+----------+---------+---------+--------------------+--------------+------+
|corrupt|                 cpe|              cpe_id|edition|          format_cpe|is_deprecated|lang|modifiedDate|other|   product_name|product_version|references|sw_edition|target_hw|target_sw|               title|update_version|vendor|
+-------+--------------------+--------------------+-------+--------------------+-------------+----+------------+-----+---------------+---------------+----------+----------+---------+---------+--------------------+--------------+------+
|  false|cpe:2.3:a:3com:3c...|BAE41D20-D4AF-4AF...|      *|cpe:2.3:a:3com:3c...|        false|   *|  12-01-2011|    *|       3cdaemon|              -|        []|         *|        *|        *|       3Com 3CDaemon|             *|  3com|
|  false|cpe:2.3:h:3com:3c...|DFB483A6-4F16-4B8...|     

In [7]:
# Load the CVE-to-CPE mapping rows from JSON; the schema is inferred by Spark.
cve2cpe_df = spark.read.format("json").load("cve2cpe.json")

                                                                                

In [8]:
# Preview the first five CVE-to-CPE mapping rows.
cve2cpe_df.show(n=5)

+-------+--------------------+-------------+-------+--------------------+------------+--------------+------------+----+-----+------------+---------------+----------+---------+---------+----------+--------------+-----------+
|corrupt|                 cpe|          cve|edition|          format_cpe|from_version|including_from|including_to|lang|other|product_name|product_version|sw_edition|target_hw|target_sw|to_version|update_version|     vendor|
+-------+--------------------+-------------+-------+--------------------+------------+--------------+------------+----+-----+------------+---------------+----------+---------+---------+----------+--------------+-----------+
|  false|cpe:2.3:a:eric_al...|CVE-1999-0095|      *|cpe:2.3:a:eric_al...|            |         false|       false|   *|    *|    sendmail|           5.58|         *|        *|        *|          |             *|eric_allman|
|  false|cpe:2.3:a:ftp:ftp...|CVE-1999-0082|      *|cpe:2.3:a:ftp:ftp...|            |         false|   

In [9]:
from pyspark.sql.functions import col, collect_list

In [12]:
# Match every CVE-to-CPE mapping row against the CPE dictionary.
#
# A dictionary entry matches a mapping row when product name, vendor and
# product version are equal AND the version satisfies the optional
# from_version / to_version range. An empty string in from_version or
# to_version means "no bound on that side".
#
# The including_from / including_to columns mark a bound as inclusive, so a
# version equal to an inclusive bound must be accepted; a strict ">" / "<"
# comparison alone would reject it. The flags are cast to boolean so the
# condition works whether the JSON reader inferred them as booleans or as
# "true"/"false" strings.
#
# NOTE(review): the version columns appear to be strings, so ">" / "<" compare
# lexicographically ("10.0" sorts before "9.0") -- confirm, and switch to a
# real version comparison if that matters for this data.
# NOTE(review): requiring exact product_version equality in addition to the
# range check may be stricter than intended for ranged mapping rows -- verify.
same_product = (
    (cpes_df.product_name == cve2cpe_df.product_name)
    & (cpes_df.vendor == cve2cpe_df.vendor)
    & (cpes_df.product_version == cve2cpe_df.product_version)
)

above_lower_bound = (
    (cve2cpe_df.from_version == "")
    | (cpes_df.product_version > cve2cpe_df.from_version)
    | (
        cve2cpe_df.including_from.cast("boolean")
        & (cpes_df.product_version == cve2cpe_df.from_version)
    )
)

below_upper_bound = (
    (cve2cpe_df.to_version == "")
    | (cpes_df.product_version < cve2cpe_df.to_version)
    | (
        cve2cpe_df.including_to.cast("boolean")
        & (cpes_df.product_version == cve2cpe_df.to_version)
    )
)

# Left join: mapping rows with no dictionary match are kept, with null values
# in the columns taken from cpes_df.
match_cpes_df = (
    cve2cpe_df
    .join(
        cpes_df,
        same_product & above_lower_bound & below_upper_bound,
        how="left",
    )
    .select(
        cpes_df["cpe"],
        cpes_df["cpe_id"],
        cve2cpe_df["cve"],
        cpes_df["title"],
    )
)

In [13]:
# Report (row count, column count) of the joined frame; count() triggers a Spark job.
match_cpes_shape = (match_cpes_df.count(), len(match_cpes_df.columns))
print(match_cpes_shape)


[Stage 10:>                                                         (0 + 4) / 4]

(11944914, 4)


                                                                                

In [14]:
# Collapse to one row per CVE, gathering that CVE's matched CPE strings into
# an array column that is again named "cpe".
cve_match_cpes_df = (
    match_cpes_df
    .groupBy("cve")
    .agg(collect_list(col("cpe")).alias("cpe"))
)

In [16]:
# Report (row count, column count) of the per-CVE frame; count() triggers a Spark job.
cve_match_cpes_shape = (cve_match_cpes_df.count(), len(cve_match_cpes_df.columns))
print(cve_match_cpes_shape)



(197711, 2)


                                                                                

In [17]:
# Preview the first five per-CVE rows with their collected CPE arrays.
cve_match_cpes_df.show(n=5)



+-------------+--------------------+
|          cve|                 cpe|
+-------------+--------------------+
|CVE-1999-0002|[cpe:2.3:o:redhat...|
|CVE-1999-0003|[cpe:2.3:o:ibm:ai...|
|CVE-1999-0004|                  []|
|CVE-1999-0005|[cpe:2.3:a:netsca...|
|CVE-1999-0007|[cpe:2.3:a:micros...|
+-------------+--------------------+
only showing top 5 rows



                                                                                

In [18]:
# Inner-join cve_match_cpes_df with cves_df on the CVE id, so only CVEs that
# exist in cves_df remain, and keep the id, its description and its CPE array.
output_df = (
    cve_match_cpes_df
    .join(
        cves_df,
        on=cve_match_cpes_df["cve"] == cves_df["cve"],
        how="inner",
    )
    .select(
        cve_match_cpes_df["cve"],
        cves_df["description"],
        cve_match_cpes_df["cpe"],
    )
)

In [19]:
# Report (row count, column count) of the final frame; count() triggers a Spark job.
output_shape = (output_df.count(), len(output_df.columns))
print(output_shape)

                                                                                

(197711, 3)


In [20]:
# Write the result as JSON. Spark creates a directory named 'output1.json'
# containing one part file per partition, not a single file.
# The default save mode raises an error when the path already exists, which
# breaks a second "Restart & Run All"; overwrite makes this cell re-runnable.
output_df.write.mode("overwrite").json('output1.json')

                                                                                