In [1]:
import boto3
from io import BytesIO
import gzip
import tempfile

In [2]:
# ---- Spark initialisation ---------------------------------------------------
# findspark has to run before any pyspark import so that the interpreter can
# locate the Spark installation.
import findspark
import os

SPARK_INSTALL_DIR = "/opt/apache-spark"
findspark.init(SPARK_INSTALL_DIR)

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Local session with 12 worker threads.
# NOTE(review): in local mode there is no separate executor JVM, so
# "spark.executor.memory" is most likely ignored; the heap is bounded by
# spark.driver.memory instead — confirm and adjust if memory is tight.
spark = (
    SparkSession.builder
    .master("local[12]")
    .appName("CommonCrawl")
    .config("spark.executor.memory", "1gb")
    .getOrCreate()
)
# Raise the field limit used when rendering wide plans/schemas as strings.
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

# Underlying SparkContext, used for RDD access such as sc.textFile.
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/02 10:12:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Function to parse each line
def parse_line(line):
    """Parse one CommonCrawl CDX index line into a row tuple for `schema`.

    A line has three space-separated parts: the index key (``MetaData``), the
    capture timestamp, and a JSON object with the capture details. Everything
    after the second space is treated as JSON, so spaces inside it are fine.

    Returns a 12-tuple in `schema` column order:
    (MetaData, date, url, mime, mime-detected, status, digest, length, offset,
    filename, charset, languages). Optional JSON fields default to "".

    Raises:
        ValueError: the line has fewer than three parts, the JSON is invalid,
            or status/length/offset are not integers.
        KeyError: the JSON lacks url, status, length or offset.
    """
    # Imported locally: the notebook's module-level `import json` only happens
    # in a later cell, so this function must not depend on that cell having run.
    import json

    meta, date, payload = line.split(' ', maxsplit=2)
    record = json.loads(payload)
    return (
        meta,
        date,
        record['url'],
        record.get('mime', ""),
        record.get('mime-detected', ""),
        # `schema` declares status as StringType: validate that it is numeric,
        # then hand Spark the normalised decimal string rather than a Python int.
        str(int(record['status'])),
        record.get('digest', ""),
        int(record['length']),
        int(record['offset']),
        record.get('filename', ""),
        record.get('charset', ""),
        record.get('languages', ""),
    )

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType, ArrayType, BooleanType, DoubleType
import json
from pyspark.sql.functions import unix_timestamp, col
# Columns of one parsed CDX index line, in the exact order parse_line returns
# them (createDataFrame maps tuple elements to fields by position).
# Every field is nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("MetaData", StringType()),
        ("date", StringType()),
        ("url", StringType()),
        ("mime", StringType()),
        ("mime-detected", StringType()),
        ("status", StringType()),
        ("digest", StringType()),
        ("length", IntegerType()),
        ("offset", IntegerType()),
        ("filename", StringType()),
        ("charset", StringType()),
        ("languages", StringType()),
    )
])

# Explicit schema for the BH JSON export (all fields nullable); it is passed to
# spark.read so the JSON file is parsed with these types instead of inferred ones.
bhSchema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("_id", StringType()),
        ("_index", StringType()),
        ("author", StringType()),
        ("category", ArrayType(StringType())),
        ("checked", BooleanType()),
        ("confirmed", BooleanType()),
        ("content", StringType()),
        ("content_origin", StringType()),
        ("content_raw", StringType()),
        ("datasource", StringType()),
        ("date", TimestampType()),
        ("date_entry", TimestampType()),
        ("domain", StringType()),
        ("keywords", ArrayType(StringType())),
        ("language", StringType()),
        ("language_origin", StringType()),
        ("latitude", DoubleType()),
        ("location", StringType()),
        ("longitude", DoubleType()),
        ("source", StringType()),
        ("submitted_by", StringType()),
        ("subtitle", StringType()),
        ("subtitle_origin", StringType()),
        ("summary", StringType()),
        ("title", StringType()),
        ("title_origin", StringType()),
        ("translated", BooleanType()),
        ("updated_by", StringType()),
    )
])

In [5]:
from pyspark.sql.functions import unix_timestamp, col, split, reverse, split, regexp_replace,udf
from functools import reduce
from pyspark.sql import DataFrame

# Collect the downloaded CommonCrawl CDX index shards. Sorted so the shard order
# (and hence the order of `dfs` and of any per-shard writes later) is
# reproducible; os.listdir returns entries in arbitrary order.
index_filenames = sorted("data/" + x for x in os.listdir("data/") if "indexes_cdx" in x)
print(index_filenames)


@udf(returnType=StringType())
def reverse_domain(domain):
    """Reverse a comma-separated host key into a dotted host name.

    e.g. "com,example,www" -> "www.example.com". A null input yields null
    instead of raising AttributeError and failing the whole Spark task.
    """
    if domain is None:
        return None
    return '.'.join(reversed(domain.split(',')))


def _load_cdx_shard(path):
    """Parse one CDX shard into a DataFrame with a timestamp Date and TLD/Domain/Path columns."""
    shard_df = spark.createDataFrame(sc.textFile(path).map(parse_line), schema)
    # Raw timestamps are yyyyMMddHHmmss strings.
    shard_df = shard_df.withColumn(
        "Date", unix_timestamp(col("Date"), "yyyyMMddHHmmss").cast(TimestampType())
    )

    # MetaData looks like "com,example)/some/path": the text before the first ','
    # is the TLD, the text before the first ')' is the reversed host, and the
    # rest is the path.
    shard_df = shard_df.withColumn('TLD', split(shard_df['MetaData'], ',').getItem(0))
    # limit=2 splits only at the first ')', so paths that themselves contain ')'
    # (e.g. "/wiki/foo_(bar)") are kept whole instead of being truncated.
    host_and_path = split(shard_df['MetaData'], '\\)', limit=2)
    shard_df = shard_df.withColumn('Domain', reverse_domain(host_and_path.getItem(0)))
    return shard_df.withColumn('Path', host_and_path.getItem(1))


if not index_filenames:
    raise FileNotFoundError("No '*indexes_cdx*' files found in data/")

dfs = []
for temp_path in index_filenames:
    print(temp_path)
    dfs.append(_load_cdx_shard(temp_path))

# One DataFrame over all shards (unionAll matches columns by position; every
# shard is built from the same `schema` and the same column additions).
df = reduce(DataFrame.unionAll, dfs)


['data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00023', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00024', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00025', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00026', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00027', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00028', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00029', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00030', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00031', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00032', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00033', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00034', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00096', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00097', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx-00098', 'data/cc-index_collections_CC-MAIN-2024-30_indexes_cdx

In [6]:
# ---- Load the BH dataset -----------------------------------------------------
# Read the multi-line JSON export with the explicit bhSchema (no schema
# inference), then preview a few rows and the resulting schema.
BH_EXPORT_PATH = "../export_osdp_2024-04-25_07-58-58_1.json"
dfBH = (
    spark.read
    .schema(bhSchema)
    .json(BH_EXPORT_PATH, multiLine=True)
)
dfBH.show(5)
dfBH.printSchema()

[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+------+--------------------+---------+-------+---------+--------------------+--------------------+-----------+----------+-------------------+-------------------+-----------------+--------------------+--------+---------------+--------+--------+---------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+----------+
|                 _id|_index|              author| category|checked|confirmed|             content|      content_origin|content_raw|datasource|               date|         date_entry|           domain|            keywords|language|language_origin|latitude|location|longitude|              source|submitted_by|            subtitle|     subtitle_origin|             summary|               title|        title_origin|translated|updated_by|
+--------------------+------+--------------------+---------+-------+---------+--------------------+--------------------+------

                                                                                


# Inspect the combined CDX index, then count how many distinct BH domains also
# appear in the CommonCrawl index.
df.printSchema()
df.createOrReplaceTempView("index")
dfBH.createOrReplaceTempView("bh")

# LEFT SEMI JOIN keeps each bh row at most once (no bh x index row explosion),
# and DISTINCT collapses multiple BH articles of the same domain. Together they
# make the count the number of domains present in both datasets, rather than
# the number of matching (bh row, index row) pairs.
result = spark.sql("""
  SELECT DISTINCT bh.domain
  FROM bh
  LEFT SEMI JOIN index
  ON bh.domain = index.Domain
""")

# number of domains that are present in both datasets
print(result.count())


In [10]:
# Keep only CDX rows whose Domain also occurs in the BH data, writing shard by
# shard. The first shard overwrites previous output so that re-running this
# cell does not duplicate rows; later shards append.
# The loop variable is deliberately not named `df`, so the global union `df`
# is left intact for later cells. A dedicated temp view also leaves the global
# "index" view untouched.
dfBH.createOrReplaceTempView("bh")
for shard_idx, shard_df in enumerate(dfs):
    shard_df.createOrReplaceTempView("index_shard")

    filtered_df = spark.sql("""
      SELECT *
      FROM index_shard
      WHERE Domain IN (SELECT DISTINCT domain FROM bh)
    """)

    write_mode = 'overwrite' if shard_idx == 0 else 'append'
    filtered_df.write.mode(write_mode).csv('data/only_relevant_domains.csv', header=True)

                                                                                

# Per-domain comparison of CommonCrawl coverage vs. the BH dataset.
# Union all shards explicitly rather than relying on the global `df`, which an
# earlier loop over `dfs` can leave pointing at a single shard.
index_df = reduce(DataFrame.unionAll, dfs)
index_df.printSchema()
index_df.createOrReplaceTempView("index")
dfBH.createOrReplaceTempView("bh")

# Aggregate each dataset separately before joining. COUNTs over a raw
# bh x index join count (BH row, CC row) pairs per domain, which made
# url_count_cc and url_count_bh identical (both n_cc * n_bh).
#   url_count_cc: CC index rows (captures) with a non-null url for the domain
#   url_count_bh: BH rows with a non-null source for the domain
#   url_overlap : BH rows whose source URL also appears in the CC index
# Only domains present in both datasets are reported (inner join), as before.
result2 = spark.sql("""
  WITH cc AS (
    SELECT Domain AS domain, COUNT(url) AS url_count_cc
    FROM index
    GROUP BY Domain
  ),
  bh_counts AS (
    SELECT domain, COUNT(source) AS url_count_bh
    FROM bh
    GROUP BY domain
  ),
  overlap AS (
    SELECT bh.domain, COUNT(bh.source) AS url_overlap
    FROM bh
    LEFT SEMI JOIN index
    ON bh.domain = index.Domain AND bh.source = index.url
    GROUP BY bh.domain
  )
  SELECT b.domain,
         c.url_count_cc,
         b.url_count_bh,
         COALESCE(o.url_overlap, 0) AS url_overlap
  FROM bh_counts b
  INNER JOIN cc c ON b.domain = c.domain
  LEFT JOIN overlap o ON b.domain = o.domain
  ORDER BY url_count_cc DESC
""")

result2.write.csv('stats.csv', header=True, mode='overwrite')

In [None]:
#result2.count()