In [1]:
# Yes we need both these imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import *
from pyspark.sql.types import StructField, StructType
from pyspark.sql.catalog import UserDefinedFunction
import os

In [2]:
fs_prefix = "s3a://mailinglists/" # Create with mc as in ch1

In [3]:
os.environ["PYSPARK_PYTHON"] = "python3.6"
# See https://medium.com/@szinck/setting-up-pyspark-jupyter-and-minio-on-kubeflow-kubernetes-aab98874794f
session = (SparkSession.builder
           .appName("fetchMailingListData")
           .config("spark.executor.instances", "8")
           .config("spark.driver.memoryOverhead", "0.25")
           .config("spark.executor.memory", "6g")
           .config("spark.dynamicAllocation.enabled", "false")
           .config("spark.ui.enabled", "true")
           .config("spark.kubernetes.container.image",
                   "gcr.io/boos-demo-projects-are-rad/kubeflow/spark-worker/spark-py-36:v3.0.0-preview2-22")
           .config("spark.driver.bindAddress", "0.0.0.0")
           .config("spark.kubernetes.namespace", "kubeflow-programmerboo")
           .config("spark.master", "k8s://https://kubernetes.default")
           .config("spark.driver.host", "spark-driver.kubeflow-programmerboo.svc.cluster.local")
           .config("spark.kubernetes.executor.annotation.sidecar.istio.io/inject", "false")
           .config("spark.driver.port", "39235")
           .config("spark.blockManager.port", "39236")
            # If using minio - see https://github.com/minio/cookbook/blob/master/docs/apache-spark-with-minio.md
           .config("spark.hadoop.fs.s3a.endpoint", "minio-service.kubeflow.svc.cluster.local:9000")
           .config("fs.s3a.connection.ssl.enabled", "false")
           .config("fs.s3a.path.style.access", "true")
           # You can also add an account using the minio command as described in chapter 1
           .config("spark.hadoop.fs.s3a.access.key", "minio")
           .config("spark.hadoop.fs.s3a.secret.key", "minio123")
          ).getOrCreate()
sc = session.sparkContext

In [4]:
# Data fetch pipeline: Download mailing list data

In [5]:
list_name="spark-user"

In [6]:
mailing_list_template="http://mail-archives.apache.org/mod_mbox/{list_name}/{date}.mbox"

In [7]:
# Generate the possible dates

In [8]:
start_year=2020 # Change to 2002 once you've verified
end_year=2021
dates = ["{:d}{:02d}".format(year, month) for year in range(start_year, end_year) for month in range (1,12)]

In [9]:
def download_emails(date):
    import subprocess
    from mailbox import mbox
    import os
    mbox_filename = "{date}.mbox".format(date=date)
    url=mailing_list_template.format(list_name=list_name,date=date)
    subprocess.call(["wget", url])
    # Skip years that don't exist
    if not os.path.exists(mbox_filename):
        return []
    mail = mbox(mbox_filename.format(date=date), create=False)
    # LC the keys since the casing is non-consistent
    emails = list(map(lambda message: dict((k.lower(), v) for k, v in message.items()),
                                          mail.itervalues()))
    os.remove(mbox_filename)
    return emails

In [10]:
# Optional: test that it works locally
# download_emails("202001")

In [11]:
emails_rdd = sc.parallelize(dates).flatMap(download_emails).cache()

In [12]:
emails_rdd.count()

263

In [13]:
mailing_list_posts_mbox_df = emails_rdd.toDF(sampleRatio=1.0)



In [14]:
mailing_list_posts_mbox_df.cache()

DataFrame[accept-language: string, arc-authentication-results: string, arc-message-signature: string, arc-seal: string, authentication-results: string, cc: string, content-language: string, content-type: string, date: string, delivered-to: string, dkim-signature: string, from: string, list-help: string, list-id: string, list-post: string, list-unsubscribe: string, mailing-list: string, message-id: string, mime-version: string, precedence: string, received: string, received-spf: string, return-path: string, subject: string, thread-index: string, thread-topic: string, to: string, x-forefront-antispam-report: string, x-forefront-prvs: string, x-microsoft-antispam: string, x-microsoft-antispam-message-info: string, x-microsoft-antispam-prvs: string, x-ms-exchange-crosstenant-fromentityheader: string, x-ms-exchange-crosstenant-id: string, x-ms-exchange-crosstenant-mailboxtype: string, x-ms-exchange-crosstenant-network-message-id: string, x-ms-exchange-crosstenant-originalarrivaltime: string

In [15]:
mailing_list_posts_mbox_df.select("list-id", "In-Reply-To").take(5)

[Row(list-id='<user.spark.apache.org>', In-Reply-To=None),
 Row(list-id='<user.spark.apache.org>', In-Reply-To='\n <VI1PR07MB3520B3B51F6643C085171A0281200@VI1PR07MB3520.eurprd07.prod.outlook.com>'),
 Row(list-id='<user.spark.apache.org>', In-Reply-To='<1577751363377-0.post@n3.nabble.com>'),
 Row(list-id='<user.spark.apache.org>', In-Reply-To=None),
 Row(list-id='<user.spark.apache.org>', In-Reply-To=None)]

In [16]:
spark_mailing_list_data = mailing_list_posts_mbox_df.filter(
    mailing_list_posts_mbox_df["list-id"] == "<user.spark.incubator.apache.org>").repartition(60).cache()

In [17]:
spark_mailing_list_data.show()

+---------------+--------------------------+---------------------+--------+----------------------+---+----------------+------------+----+------------+--------------+----+---------+-------+---------+----------------+------------+----------+------------+----------+--------+------------+-----------+-------+------------+------------+---+---------------------------+----------------+--------------------+---------------------------------+-------------------------+------------------------------------------+----------------------------+-------------------------------------+--------------------------------------------+---------------------------------------------+-------------------------------------------+---------------------------+-------------------------------------------------+------------------------------+---------------+---------------------------------------+---------------------------+---------------------------+----------------------+--------------------+--------------------------+--

In [18]:
spark_mailing_list_data.printSchema()

root
 |-- accept-language: string (nullable = true)
 |-- arc-authentication-results: string (nullable = true)
 |-- arc-message-signature: string (nullable = true)
 |-- arc-seal: string (nullable = true)
 |-- authentication-results: string (nullable = true)
 |-- cc: string (nullable = true)
 |-- content-language: string (nullable = true)
 |-- content-type: string (nullable = true)
 |-- date: string (nullable = true)
 |-- delivered-to: string (nullable = true)
 |-- dkim-signature: string (nullable = true)
 |-- from: string (nullable = true)
 |-- list-help: string (nullable = true)
 |-- list-id: string (nullable = true)
 |-- list-post: string (nullable = true)
 |-- list-unsubscribe: string (nullable = true)
 |-- mailing-list: string (nullable = true)
 |-- message-id: string (nullable = true)
 |-- mime-version: string (nullable = true)
 |-- precedence: string (nullable = true)
 |-- received: string (nullable = true)
 |-- received-spf: string (nullable = true)
 |-- return-path: string (null

In [19]:
def extract_date_from_email_datefield(datefield):
    if datefield is None:
        return None
    from datetime import datetime
    import time
    import email.utils
    parsed_date = email.utils.parsedate(datefield)
    return datetime.fromtimestamp(time.mktime((parsed_date)))


extract_date_from_email_datefield_udf = UserDefinedFunction(
    extract_date_from_email_datefield, StringType(), "extract_date_from_email_datefield")

session.catalog._jsparkSession.udf().registerPython(
    "extract_date_from_email_datefield",
    extract_date_from_email_datefield_udf._judf)

In [20]:
# Manually verify that our date parser is looking ok
spark_mailing_list_data.select(spark_mailing_list_data["Date"],
                               extract_date_from_email_datefield_udf(spark_mailing_list_data["Date"]).alias("email_date"),
                               to_date(spark_mailing_list_data["Date"])).take(5)

[]

In [21]:
mailing_list_posts_in_reply_to = spark_mailing_list_data.filter(
    spark_mailing_list_data["In-Reply-To"].isNotNull()).alias("mailing_list_posts_in_reply_to")
initial_posts = spark_mailing_list_data.filter(
    spark_mailing_list_data["In-Reply-To"].isNull()).alias("initial_posts").cache()

In [22]:
# See how many start-of-thread posts we have
initial_posts.count()

0

In [23]:
mailing_list_posts_in_reply_to.select("In-Reply-To").take(10)

[]

In [24]:
# Ok now it's time to save these
initial_posts.write.format("parquet").save(fs_prefix + "/initial_posts")
mailing_list_posts_in_reply_to.write.format("parquet").save(fs_prefix + "/initial_posts")

Py4JJavaError: An error occurred while calling o242.save.
: java.lang.NoClassDefFoundError: com/amazonaws/services/s3/model/MultiObjectDeleteException
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2532)
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2497)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2593)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3269)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:452)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:552)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:309)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:236)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.MultiObjectDeleteException
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	... 28 more


In [None]:
# Find the initial posts where no one replied
posts_without_replies = (initial_posts.join(
        mailing_list_posts_in_reply_to,
        col("mailing_list_posts_in_reply_to.In-Reply-To") == col("initial_posts.Message-Id"),
        "left_outer")
       .filter(col("mailing_list_posts_in_reply_to.Message-Id").isNull())).cache()
posts_without_replies.count()

In [None]:
posts_by_date_count = spark_mailing_list_data.select(
    date_trunc("dd", from_unixtime(spark_mailing_list_data.timestamp)).alias("date")) \
    .groupBy("date").count()

In [None]:
posts_by_date_count.toPandas()

In [None]:
tokenizer = None
# TODO - fix spacy tokenizetransformer
#try:
#    tokenizer = SpacyTokenizeTransformer(inputCol="body", outputCol="body_tokens")
#except:
#tokenizer = Tokenizer(inputCol="body", outputCol="body_tokens")
spacy_tokenizer = SpacyTokenizeTransformer(inputCol="body", outputCol="body_tokens")
builtin_tokenizer = tokenizer = Tokenizer(inputCol="body", outputCol="body_tokens")
tokenizer = spacy_tokenizer

In [None]:
# Todo - UDF to exctract & UDF to detect programming language & UDF to extract files in a stack trace


In [None]:
def extract_links(body):
    import re
    link_regex_str = r'(http(|s)://(.*?))([\s\n]|$)'
    itr = re.finditer(link_regex_str, body, re.MULTILINE)
    return list(map(lambda elem: elem.group(1), itr))

def extract_domains(links):
    from urllib.parse import urlparse
    def extract_domain(link):
        try:
            nloc = urlparse(link).netloc
            # We want to drop www and any extra spaces wtf nloc on the spaces.
            regex_str = r'^(www\.|)(.*?)\s*$'
            match = re.search(regex_str, nloc)
            return match.group(2)
        except:
            return None
    return list(map(extract_domain, links))

def contains_python_stack_trace(body):
    return "Traceback (most recent call last)" in body



def contains_probably_java_stack_trace(body):
    # Look for something based on regex
    # Tried https://stackoverflow.com/questions/20609134/regular-expression-optional-multiline-java-stacktrace - more msg looking
    # Tried https://stackoverflow.com/questions/3814327/regular-expression-to-parse-a-log-file-and-find-stacktraces
    # Yes the compile is per call, but it's cached so w/e
    import re
    stack_regex_str = r'^\s*(.+Exception.*):\n(.*\n){0,3}?(\s+at\s+.*\(.*\))+'
    match = re.search(stack_regex_str, body, re.MULTILINE)
    return match is not None


def contains_exception_in_task(body):
    # Look for a line along the lines of ERROR Executor: Exception in task 
    return "ERROR Executor: Exception in task" in body
    

In [None]:
extract_links_udf = UserDefinedFunction(
    extract_links, ArrayType(StringType()), "extract_links")

session.catalog._jsparkSession.udf().registerPython(
    "extract_links",
    extract_links_udf._judf)


extract_domains_udf = UserDefinedFunction(
    extract_domains, ArrayType(StringType()), "extract_domains")

session.catalog._jsparkSession.udf().registerPython(
    "extract_domains",
    extract_domains_udf._judf)


contains_python_stack_trace_udf = UserDefinedFunction(
    contains_python_stack_trace, BooleanType(), "contains_python_stack_trace")

session.catalog._jsparkSession.udf().registerPython(
    "contains_python_stack_trace",
    contains_python_stack_trace_udf._judf)


contains_probably_java_stack_trace_udf = UserDefinedFunction(
    contains_probably_java_stack_trace, BooleanType(), "contains_probably_java_stack_trace")

session.catalog._jsparkSession.udf().registerPython(
    "contains_probably_java_stack_trace",
    contains_probably_java_stack_trace_udf._judf)


contains_exception_in_task_udf = UserDefinedFunction(
    contains_exception_in_task, BooleanType(), "contains_exception_in_task")

session.catalog._jsparkSession.udf().registerPython(
    "contains_exception_in_task",
    contains_exception_in_task_udf._judf)

We could make this a transformer stage, but I'm lazy so we'll just use a UDF directly.

In [None]:
annotated_spark_mailing_list_data = spark_mailing_list_data.select(
    "*",
    extract_links_udf(spark_mailing_list_data.body).alias("links_in_email"),
    contains_python_stack_trace_udf(spark_mailing_list_data.body).alias("contains_python_stack_trace").cast("double"),
    contains_probably_java_stack_trace_udf(spark_mailing_list_data.body).alias("contains_java_stack_trace").cast("double"),
    contains_exception_in_task_udf(spark_mailing_list_data.body).alias("contains_exception_in_task").cast("double"))

In [None]:
annotated_spark_mailing_list_data.cache()

In [None]:
annotated_spark_mailing_list_data.show()

In [None]:
further_annotated = annotated_spark_mailing_list_data.withColumn(
    "domain_links",
    extract_domains_udf(annotated_spark_mailing_list_data.links_in_email)).withColumn(
    "is_thread_start",
    isnull(annotated_spark_mailing_list_data.in_reply_to).cast(DoubleType()))
# Long story, allow mixed UDF types
further_annotated.cache()
further_annotated.count()

In [None]:
body_hashing = HashingTF(inputCol="body_tokens", outputCol="raw_body_features", numFeatures=10000)
body_idf =IDF(inputCol="raw_body_features", outputCol="body_features")

In [None]:
body_word2Vec = Word2Vec(vectorSize=5, minCount=0, numPartitions=10, inputCol="body_tokens", outputCol="body_features")

In [None]:
assembler = VectorAssembler(
    inputCols=["body_features", "contains_python_stack_trace", "contains_java_stack_trace", 
              "contains_exception_in_task", "is_thread_start", "domain_features"],
    outputCol="features")

In [None]:
kmeans = KMeans(featuresCol="features", k=2, seed=42)

In [None]:
featureprep_pipeline = Pipeline(stages=[tokenizer, body_hashing, body_idf, domains_hashing, domains_idf, assembler])
pipeline = Pipeline(stages=[featureprep_pipeline, kmeans])

In [None]:
from pyspark.ml.pipeline import Transformer
isinstance(tokenizer, Transformer)

In [None]:
test = further_annotated.limit(10).cache()
test.count()

In [None]:
test_model = pipeline.fit(test)

In [None]:
test_result = test_model.transform(test)
test_result.toPandas()

In [None]:
data_prep_transformer = dataprep_pipeline.fit(further_annotated)
preped_data = data_prep_model.transform(further_annotated)
preped_data.count()