# Hi!

This notebook was created using a local Apache Spark cluster.

If you are curious on what the deployment looks like,
see [this](https://github.com/kamadorueda/machine/blob/main/nixos-modules/spark/default.nix).

The cell below is the only thing that needs to be changed if you want to run it,
either on DataBricks or TALC.

Thank you!


In [27]:
import pyspark

# Step 1: Please change `sc` and `spark`
# so that they point to your spark cluster (TALC, DataBricks, etc)

conf = pyspark.SparkConf().setMaster("local[4]")
sc = pyspark.SparkContext.getOrCreate(conf)
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()

# Step 2: Please set the path to the files
ISSUES_DIR = "../step-1-structured-data/issues"
LABELS = "../step-2-labeling/labels-filled.csv"

In [28]:
import csv

with open(LABELS) as file:
    labels = tuple(csv.DictReader(file))

labels[0]

{'issue-number': '18544', 'was-fixed': 'yes'}

In [29]:
import nltk

nltk.download("punkt")
nltk.download("stopwords")

[nltk_data] Downloading package punkt to
[nltk_data]     /home/kamadorueda/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /home/kamadorueda/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [30]:
from itertools import (
    filterfalse,
)
from nltk.stem import (
    PorterStemmer,
)
import regex
import string

STEMMER = PorterStemmer()
PUNCTUATION = set(string.punctuation)
STOP_WORDS = set(nltk.corpus.stopwords.words("english"))
ALPHABETIC_AND_SPACE = set(string.ascii_letters + string.whitespace)
SPACE = regex.compile("\s+")
WORD_COUNTS = dict()

def normalize(text: str) -> list[str]:
    words = nltk.tokenize.word_tokenize(text)
    words = map(STEMMER.stem, words)
    words = filterfalse(STOP_WORDS.__contains__, words)
    text = " ".join(words)
    text = "".join(filterfalse(PUNCTUATION.__contains__, text))
    text = "".join(filter((ALPHABETIC_AND_SPACE).__contains__, text))
    text = SPACE.sub(" ", text).strip()
    words = text.split(" ")
    for word in words:
        WORD_COUNTS.setdefault(word, 0)
        WORD_COUNTS[word]+=1
        
    return words


In [31]:
# issue_numbers

import json
import os.path

data = []

for label in labels:
    issue_number = label["issue-number"]
    assert label["was-fixed"] in ("yes", "no")

    with open(os.path.join(ISSUES_DIR, f"{issue_number}.json")) as file:
        issue = json.load(file)

    assert issue["title"] is not None, issue_number

    data.append(
        {
            "issue_number": issue_number,
            "content": normalize(issue["title"] or "") + normalize(issue["body"] or ""),
            "was_fixed": label["was-fixed"] == "yes",
        }
    )


data = spark.createDataFrame(data)

data.show()


+--------------------+------------+---------+
|             content|issue_number|was_fixed|
+--------------------+------------+---------+
|[powershel, ssh, ...|       18544|     true|
|[handl, quotat, m...|       18540|     true|
|[problem, psnativ...|       18501|    false|
|[question, downlo...|       18500|     true|
|[defin, env, var,...|       18497|     true|
|[writeerror, pref...|       18490|    false|
|[semanticvers, do...|       18489|     true|
|[errorresponseexc...|       18485|     true|
|[pleas, remov, re...|       18478|     true|
|[class, regist, e...|       18476|     true|
|[gethelp, paramet...|       18463|     true|
|[invokepowershel,...|       18460|     true|
|[sendmailmessag, ...|       18459|    false|
|[switchprocess, b...|       18436|     true|
|[switchprocess, u...|       18433|     true|
|[switchprocess, e...|       18432|     true|
|[disablewindowsop...|       18431|     true|
|[systemprivateuri...|       18424|     true|
|[windowstyl, hidd...|       18423

In [32]:
with open("content.csv", mode="w", encoding="utf-8") as file:
    writer = csv.DictWriter(
        file, fieldnames=["issue_number", "was_fixed", "content"]
    )
    writer.writeheader()

    for row in data.collect():
        writer.writerow(
            {
                "issue_number": row.issue_number,
                "was_fixed": row.was_fixed,
                "content": " ".join(row.content),
            }
        )


In [33]:
from pyspark.ml.feature import (
    HashingTF,
    IDF,
)

tf = HashingTF(inputCol="content", outputCol="temp")
idf = IDF(inputCol="temp", outputCol="features")

data = tf.transform(data)
idf_model = idf.fit(data)
data = idf_model.transform(data)
data = data.select("issue_number", "features", "was_fixed")

data.show()

22/12/04 17:52:09 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
+------------+--------------------+---------+
|issue_number|            features|was_fixed|
+------------+--------------------+---------+
|       18544|(262144,[5940,829...|     true|
|       18540|(262144,[2470,124...|     true|
|       18501|(262144,[13828,18...|    false|
|       18500|(262144,[5078,853...|     true|
|       18497|(262144,[2966,392...|     true|
|       18490|(262144,[8297,226...|    false|
|       18489|(262144,[15769,19...|     true|
|       18485|(262144,[892,7777...|     true|
|       18478|(262144,[1206,243...|     true|
|       18476|(262144,[4274,594...|     true|
|       18463|(262144,[1968,338...|     true|
|       18460|(262144,[921,4214...|     true|
|       18459|(262144,[1206,433...|    false|
|       18436|(262144,[8297,119...|     true|
|       18433|(262144,[2139,777...|     true|
|       18432|(262144,[2139,777...|     true|
|       18431|(262144,[1182,680...|     

In [34]:
rows = data.collect()

22/12/04 17:52:09 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB


In [35]:
print("keywords:", len(WORD_COUNTS))
# take approximately the top 1000 keywords
threshold = sorted(WORD_COUNTS.values(), reverse=True)[999]

WORDS = sorted(
    [word for word in WORD_COUNTS if WORD_COUNTS[word] >= threshold],
    key=lambda word: WORD_COUNTS[word],
    reverse=True
)
WORD_INDEXES = list(map(tf.indexOf, WORDS))
print("taken keywords:", len(WORDS))

with open("features.csv", mode="w", encoding="utf-8") as file:
    writer = csv.DictWriter(
        file, fieldnames=["issue_number", "was_fixed", *WORDS]
    )
    writer.writeheader()

    for row in rows:
        writer.writerow(
            {
                "issue_number": row.issue_number,
                "was_fixed": row.was_fixed,
                **{
                    word: row.features[index]
                    for word, index in zip(WORDS, WORD_INDEXES)
                },
            }
        )


keywords: 13337
taken keywords: 1029
