# **WET File Data Extraction**
---

## 1. Import libraries 

In [0]:
pip install langid==1.1.6

In [0]:
from pyspark.sql import functions as F
import boto3
import botocore
import os


## 2. Connect to Boto3 

### 2.1 Set Secrets

In [0]:
# Read the AWS key pair from the 'aws_cc' secret scope so that no credential
# value is written into the notebook itself.
aws_access_key_id = dbutils.secrets.get(
    scope="aws_cc",
    key="aws_access_key_id",
)
aws_secret_access_key = dbutils.secrets.get(
    scope="aws_cc",
    key="aws_secret_access_key",
)

### 2.2 Initialize boto3 client

In [0]:
# Optional: create the S3 client a single time so later cells can reuse it
# instead of rebuilding it (faster).
_s3_client_options = {
    "region_name": "us-east-1",
    "aws_access_key_id": aws_access_key_id,
    "aws_secret_access_key": aws_secret_access_key,
}
s3 = boto3.client("s3", **_s3_client_options)

## 3. Text transformation

In [0]:
# Load one gzip-compressed WET file as plain text: each input line becomes
# one row in the single `value` column that the parsing cell reads.
wet_file_path = (
    's3://mydbxbucketpractice/common_crawl/wet_files/'
    'CC-MAIN-20220629054527-20220629084527-00796.warc.wet.gz'
)
df = spark.read.text(wet_file_path)

display(df)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window

# Turn the raw text lines in `df` into one row per WARC record:
# number the lines, mark the lines that open a record, group the lines of each
# record back together in order, split headers from body, and extract the
# common header fields into typed columns (`warc_df`).

# 1) Add line_id column to preserve line order
#    (the window has no partitionBy, so the numbering is global over `df`)
df_with_id = df.withColumn(
    "line_id",
    F.row_number().over(Window.orderBy(F.monotonically_increasing_id()))
)

# 2) Add is_start column: 1 on a line that begins with "WARC/<major>.<minor>"
df_with_id = df_with_id.withColumn(
    'is_start',
    F.when(F.col('value').rlike(r"^WARC/\d+\.\d+"), F.lit(1)).otherwise(F.lit(0))
)

# 3) Cumulative sum to create record_id (ordered by line_id):
#    every line inherits the running count of record starts seen so far
w = Window.orderBy('line_id').rowsBetween(Window.unboundedPreceding, 0)
df_with_id = df_with_id.withColumn('record_id', F.sum('is_start').over(w))

# 4) Group lines back into records, but SORT lines by line_id inside the group
#    (collect_list gives no ordering guarantee; sorting the (line_id, value)
#    structs orders them by line_id before the values are joined)
records = (
    df_with_id.groupBy('record_id')
      .agg(
          F.expr("""
              concat_ws(
                '\n',
                transform(
                  array_sort(collect_list(named_struct('line_id', line_id, 'value', value))),
                  x -> x.value
                )
              )
          """).alias('record_text')
      )
      # drop any group that does not open with a WARC version line
      .filter(F.col('record_text').rlike(r'^WARC/\d+\.\d+'))
)

# 5) Normalize newlines to '\n' so splitting works for both \n\n and \r\n\r\n
rec2 = records.withColumn(
    'record_text_norm',
    F.regexp_replace('record_text', r'\r\n', '\n')
)

# 6) Find separator between WARC headers and the rest (first blank line);
#    instr is 1-based and returns 0 when there is no blank line
rec2 = rec2.withColumn('nn_idx', F.instr('record_text_norm', '\n\n'))

# 7) Extract headers and body using correct substring lengths
rec2 = (
    rec2
    .withColumn(
        'headers',
        F.when(
            F.col('nn_idx') > 0,
            F.expr('substring(record_text_norm, 1, nn_idx - 1)')
        ).otherwise(F.col('record_text_norm'))
    )
    .withColumn(
        'body',
        F.when(
            F.col('nn_idx') > 0,
            # skip the two separator characters, keep everything after them
            F.expr("""
                substring(
                    record_text_norm,
                    nn_idx + 2,
                    length(record_text_norm) - (nn_idx + 1)
                )
            """)
        ).otherwise(F.lit(""))
    )
)

# 8) Pull out common header fields
#    (regexp_extract yields '' when a header is absent)
parsed = (
    rec2
    .withColumn('warc_version',    F.regexp_extract('record_text_norm', r"^(WARC/\d+\.\d+)", 1))
    .withColumn('warc_type',       F.regexp_extract('headers', r"(?m)^WARC-Type:\s*(.*)$", 1))
    .withColumn('target_uri',      F.regexp_extract('headers', r"(?m)^WARC-Target-URI:\s*(.*)$", 1))
    .withColumn('warc_date',       F.regexp_extract('headers', r"(?m)^WARC-Date:\s*(.*)$", 1))
    # The capture group is lazy so the optional closing '>' is matched by
    # '>?' instead of being swallowed into the extracted value.
    .withColumn('warc_record_id',  F.regexp_extract('headers', r"(?m)^WARC-Record-ID:\s*<?(.*?)>?$", 1))
    .withColumn('warc_refers_to',  F.regexp_extract('headers', r"(?m)^WARC-Refers-To:\s*<?(.*?)>?$", 1))
    .withColumn('block_digest',    F.regexp_extract('headers', r"(?m)^WARC-Block-Digest:\s*(.*)$", 1))
    .withColumn('language',        F.regexp_extract('headers', r"(?m)^WARC-Identified-Content-Language:\s*(.*)$", 1))
    .withColumn('content_type',    F.regexp_extract('headers', r"(?m)^Content-Type:\s*(.*)$", 1))
    # The regex is applied through the DataFrame API rather than inside a SQL
    # string literal: the SQL parser unescapes literals, which would turn
    # '\s' and '\d' into plain 's' and 'd' and make the pattern never match.
    .withColumn(
        'content_length_raw',
        F.regexp_extract('headers', r"(?m)^Content-Length:\s*(\d+)$", 1)
    )
    .withColumn(
        'content_length',
        # '' (header missing) -> NULL; try_cast yields NULL instead of failing
        F.expr("try_cast(nullif(content_length_raw, '') as int)")
    )
    .drop('content_length_raw')
)

# 9) Proper dataframe
warc_df = parsed.select(
    "record_id","warc_version","warc_type","target_uri","warc_date",
    "warc_record_id","warc_refers_to","block_digest","language",
    "content_type","content_length","body"
)

display(warc_df.limit(5))
print("total records:", warc_df.count())

In [0]:
# Derive host-based columns from each record's target URI and keep only the
# records whose top-level domain is one of: com, org, gov, edu, info.

# Full host (e.g., www.census.gov); rows without a parsable host are dropped.
_with_host = (
    warc_df
    .withColumn("host", F.expr("parse_url(target_uri, 'HOST')"))
    .filter(F.col("host").isNotNull())
)

# Lower-case the host and split it on literal dots; a host needs at least
# two labels for a second-level domain to exist.
_with_labels = (
    _with_host
    .withColumn("host_lc", F.lower("host"))
    .withColumn("labels", F.split(F.col("host_lc"), r"\."))
    .filter(F.size(F.col("labels")) >= 2)
)

# The last two labels form the base domain (e.g., census.gov).
_last_label = F.element_at(F.col("labels"), -1)
_second_last_label = F.element_at(F.col("labels"), -2)

df_us = (
    _with_labels
    .withColumn("tld", _last_label)
    .withColumn("sld", _second_last_label)
    .withColumn("base_domain", F.concat_ws(".", F.col("sld"), F.col("tld")))
    .filter(F.col("tld").isin("com", "org", "gov", "edu", "info"))
)

display(df_us.select("target_uri", "host", "base_domain").limit(5))


In [0]:
import pandas as pd
import langid

# Helper function for detection
def detect_text_language(text: str):
    """Classify one text with langid.

    Returns the first element of ``langid.classify(text)``. Returns ``None``
    when ``text`` is not a string or when it has 20 or fewer characters after
    stripping surrounding whitespace.
    """
    if not isinstance(text, str):
        return None
    if len(text.strip()) <= 20:
        return None
    return langid.classify(text)[0]

# mapInPandas function
def detect_lang(batch_iter):
    """Yield each pandas batch with an added ``lang`` column.

    For every frame produced by ``batch_iter``, ``lang`` is filled by applying
    ``detect_text_language`` to the ``body`` column; the same frame object is
    then yielded.
    """
    for batch in batch_iter:
        body_values = batch["body"]
        batch["lang"] = body_values.apply(detect_text_language)
        yield batch

# Apply to dataframe
from pyspark.sql.types import StringType, StructField, StructType

# Build a NEW output schema instead of calling df_us.schema.add(...):
# StructType.add() appends to the schema object in place and returns that same
# object, so using it on df_us.schema can alter the schema held by df_us and
# makes a re-run of this cell try to add "lang" a second time.
# Any field already named "lang" is skipped so the cell stays idempotent.
lang_output_schema = StructType(
    [field for field in df_us.schema.fields if field.name != "lang"]
    + [StructField("lang", StringType(), True)]
)

warc_with_lang = df_us.mapInPandas(
    detect_lang,
    schema=lang_output_schema
)

# Keep only English
warc_en = warc_with_lang.filter(F.col("lang") == "en")

display(warc_en.limit(100))
print("English docs:", warc_en.count())

In [0]:
%run ./dictionary

In [0]:
from pyspark.sql import types as T
def normalize_text(col="body"):
    """Return a Column expression holding a normalized copy of column ``col``.

    The text is lower-cased, every run of whitespace is replaced by a single
    space, and leading/trailing spaces are trimmed.
    """
    lowered = F.lower(F.col(col))
    single_spaced = F.regexp_replace(lowered, r"\s+", " ")
    return F.trim(single_spaced)

# Keep the page-level columns, add the normalized text, and drop pages whose
# normalized text is 100 characters or shorter.
_page_columns = ["target_uri", "body", "host", "base_domain"]
docs = (
    warc_en
    .select(*_page_columns)
    .withColumn("text_norm", normalize_text("body"))
    .filter(F.length("text_norm") > 100)
)

# Add one boolean column per entry of the regex dictionary: the column is
# named after the key and is true when the pattern matches text_norm.
_text_norm = F.col("text_norm")
for key, pattern in acs_terms_2.items():
    docs = docs.withColumn(key, _text_norm.rlike(pattern))

# Derive the higher-level ACS signal flags by OR-ing the individual columns.
_any_acs = F.col("acs_phrase")
for _flag in ("acs_detail_tables", "acs_subject_tables", "acs_table_word", "acs_geo"):
    _any_acs = _any_acs | F.col(_flag)

_citation_phrase = F.col("acs_links")
for _flag in ("acs_near_table", "acs_near_moe"):
    _citation_phrase = _citation_phrase | F.col(_flag)

docs = (
    docs
    .withColumn("has_any_acs", _any_acs)
    .withColumn("has_citation_phrase", _citation_phrase)
)

In [0]:
def _label_page(has_acs, has_citation):
    """Map the two page flags to a page label.

    Both flags truthy -> "cites_census"; only ``has_acs`` truthy ->
    "repackages_census"; anything else -> "unrelated".
    """
    if has_citation and has_acs:
        return "cites_census"
    if has_acs and not has_citation:
        return "repackages_census"
    return "unrelated"


# String-returning UDF wrapper around the labelling rule above.
label_udf = F.udf(_label_page, T.StringType())

# Attach the label computed from has_any_acs / has_citation_phrase.
_page_label = label_udf(F.col("has_any_acs"), F.col("has_citation_phrase"))
labeled_pages = docs.withColumn("page_label", _page_label)

In [0]:
# Shell-level pip install of thefuzz (no version pin, unlike langid==1.1.6).
# NOTE(review): no visible cell imports thefuzz — confirm it is still needed.
!pip install thefuzz

In [0]:
def _pages_where(condition):
    """Aggregate expression counting the rows for which ``condition`` holds."""
    return F.sum(F.when(condition, 1).otherwise(0))


def _share_of_about(count_col):
    """``count_col / pages_about``, or 0.0 when ``pages_about`` is not positive."""
    about = F.col("pages_about")
    return F.when(about > 0, F.col(count_col) / about).otherwise(F.lit(0.0))


# Per-domain page counts by label.
_label = F.col("page_label")
_site_counts = (
    labeled_pages
    .groupBy("base_domain")
    .agg(
        _pages_where(_label == "cites_census").alias("pages_cite"),
        _pages_where(_label == "repackages_census").alias("pages_repackage"),
        _pages_where(_label != "unrelated").alias("pages_about"),
        F.count("*").alias("pages_total"),
    )
)

# Shares relative to the pages that are not "unrelated".
_site_shares = (
    _site_counts
    .withColumn("pct_cite", _share_of_about("pages_cite"))
    .withColumn("pct_repackage", _share_of_about("pages_repackage"))
)

# Site label: the citing share is checked first, then the repackaging share;
# a share of at least 0.5 decides the label, otherwise the site is unclear.
_site_label = (
    F.when(F.col("pct_cite") >= 0.5, "cites_census")
     .when(F.col("pct_repackage") >= 0.5, "repackages_census")
     .otherwise("mixed_or_unclear")
)
site_rollup = _site_shares.withColumn("site_label", _site_label)

In [0]:
## This block takes approximately 7 minutes

# Spark ML + MLflow imports
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, SQLTransformer, NGram, CountVectorizer, IDF, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import functions as F
import mlflow

# 0) Disable MLflow autologging on serverless + Connect
#    (best effort: any failure to disable it is deliberately ignored)
try:
    mlflow.autolog(disable=True)
except Exception:
    pass

# 1) Data: rows with a non-null text longer than 100 characters and a
#    non-null label; text_norm is lower-cased again before training
_has_long_text = F.col("text_norm").isNotNull() & (F.length("text_norm") > 100)
_has_label = F.col("page_label").isNotNull()

data = (
    labeled_pages
    .select("text_norm", "page_label", "base_domain")
    .where(_has_long_text)
    .where(_has_label)
    .withColumn("text_norm", F.lower(F.col("text_norm")))
)

# Class balance, largest class first, followed by the total row count.
_label_counts = data.groupBy("page_label").count().orderBy(F.desc("count"))
display(_label_counts)
print("Training rows:", data.count())

# 80/20 split with a fixed seed.
train, test = data.randomSplit([0.8, 0.2], seed=42)

# 2) Featurization

# Split text_norm into tokens on runs of non-word characters.
tokenizer = RegexTokenizer(
    inputCol="text_norm",
    outputCol="tokens",
    pattern=r"\W+"
)

# Remove stop words (StopWordsRemover defaults) from the tokens.
stopper   = StopWordsRemover(
    inputCol="tokens", 
    outputCol="tokens_nostop")

# Build 2-grams from the stop-word-filtered tokens.
bigrams   = NGram(
    n=2, 
    inputCol="tokens_nostop", 
    outputCol="bigrams")

# Merge unigrams and bigrams into one token array.
# Spark SQL concatenates arrays with concat(); array_concat is not a Spark
# SQL function and would fail when the statement is executed.
merge_tokens = SQLTransformer(
    statement="SELECT *, concat(tokens_nostop, bigrams) AS toks FROM __THIS__")


# Term vector over the merged tokens; binary=True records presence, not counts.
cv = CountVectorizer(
    inputCol="toks", 
    outputCol="tf",
    vocabSize=30000, 
    minDF=10, 
    binary=True)

# NOTE(review): no stage in this cell produces "tf_uni" or "tf_bi" (cv writes
# "tf" and idf reads "tf") — confirm whether this assembler is still used.
assembler_tf = VectorAssembler(
    inputCols=["tf_uni", "tf_bi"], 
    outputCol="tf_all")

# Re-weight the term vector by inverse document frequency.
idf   = IDF(inputCol="tf", 
            outputCol="tfidf", 
            minDocFreq=5)


