In [1]:
%pip install pyspark datasketch

You should consider upgrading via the '/Users/divyansh/.pyenv/versions/3.9.5/envs/.pyspark/bin/python -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
import pyspark

# Application-level Spark configuration for this notebook.
conf = pyspark.SparkConf().setAppName("ds8106-midterm")
# Optional settings, kept for reference (both need `import os` before they can be enabled):
# conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4040') ## to setup SPARK UI
# conf = conf.set('spark.jars', os.environ['GRAPHFRAMES_PATH']) ## graphframes in spark configuration

# SparkSession is the supported entry point (SQLContext.getOrCreate is deprecated
# since Spark 3.0). getOrCreate() reuses a session/context that is already running
# in this kernel, so re-running the cell is safe and needs no try/except.
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()

# Keep the low-level SparkContext handle available under its usual name.
sc = spark.sparkContext

from pyspark.sql import functions as F
from pyspark.sql.window import Window

24/11/10 21:28:18 WARN Utils: Your hostname, Divyanshs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.138 instead (on interface en0)
24/11/10 21:28:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 21:28:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.ml.feature import NGram, Tokenizer

# Estimate P(w3 | w1 w2) = count(w1 w2 w3) / count(w1 w2) for the ten most
# frequent trigrams in the corpus. N-grams are built per input line.

# One row per line of text; spark.read.text exposes it as the column "value".
text_df = spark.read.text("data/hw1text")

# Normalise each line: lowercase, turn every non [0-9a-z] character into a space,
# collapse whitespace runs, then trim. The trim matters: Tokenizer splits on
# whitespace, so a leading space would produce an empty first token and therefore
# bogus n-grams that start with an empty "word".
processed_df = text_df.select(
    F.trim(
        F.regexp_replace(
            F.regexp_replace(F.lower(F.col("value")), "[^0-9a-z]", " "),
            "\\s+", " "
        )
    ).alias("cleaned_text")
)

tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="words")
tokenized_df = tokenizer.transform(processed_df)

# Trigram frequencies; keep the ten most frequent. The trigram text is a
# secondary sort key so that ties on count are resolved deterministically.
trigram = NGram(n=3, inputCol="words", outputCol="trigrams")
trigram_df = trigram.transform(tokenized_df)
trigrams_df = trigram_df.select(F.explode(F.col("trigrams")).alias("trigram"))
top_ten_trigrams = (
    trigrams_df.groupBy("trigram")
    .count()
    .orderBy(F.desc("count"), F.asc("trigram"))
    .limit(10)
)

# The conditioning context is the first two words of each trigram.
top_ten_trigrams = top_ten_trigrams.withColumn("first_two_words", F.expr("substring_index(trigram, ' ', 2)"))

# Bigram frequencies over the same tokens (the denominator of the probability).
bigram = NGram(n=2, inputCol="words", outputCol="bigrams")
bigram_df = bigram.transform(tokenized_df)
bigrams_df = bigram_df.select(F.explode(F.col("bigrams")).alias("bigram"))
bigram_counts = bigrams_df.groupBy("bigram").count().withColumnRenamed("count", "bigram_count")

# Attach each trigram's context count and divide.
result_df = top_ten_trigrams.join(bigram_counts, top_ten_trigrams.first_two_words == bigram_counts.bigram, "inner")
result_df = result_df.withColumn("conditional probability", F.col("count") / F.col("bigram_count"))

final_result = (
    result_df
    .select("trigram", "count", "bigram_count", "conditional probability")
    .orderBy(F.desc("count"), F.asc("trigram"))
)
final_result.show(truncate=False)

                                                                                

+------------------+-----+------------+-----------------------+
|trigram           |count|bigram_count|conditional probability|
+------------------+-----+------------+-----------------------+
|lt p gt           |1928 |1930        |0.9989637305699481     |
|the covid 19      |1718 |1760        |0.9761363636363637     |
|do n t            |1662 |1662        |1.0                    |
|of covid 19       |1589 |1621        |0.9802590993214065     |
|the spread of     |1196 |1306        |0.9157733537519143     |
|the number of     |1037 |1103        |0.9401631912964642     |
|p gt lt           |1037 |1936        |0.5356404958677686     |
|gt lt p           |1023 |1792        |0.5708705357142857     |
|one of the        |953  |1491        |0.6391683433936955     |
|of the coronavirus|907  |17484       |0.0518760009151224     |
+------------------+-----+------------+-----------------------+



In [4]:
# Top three best-selling items for each part of the day.

# Read the bakery transactions; the header row supplies column names and the
# column types are inferred from the data.
bakery = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("data/Bakery.csv")
)

# Bucket every sale by the hour of its "Time" value:
#   [6, 11) morning, [11, 14) noon, [14, 17) afternoon, everything else evening
# (note that "evening" therefore also covers hours before 6).
sale_hour = F.hour("Time")
transformedBakery = bakery.withColumn(
    "DayPart",
    F.when((sale_hour >= 6) & (sale_hour < 11), "morning")
     .when((sale_hour >= 11) & (sale_hour < 14), "noon")
     .when((sale_hour >= 14) & (sale_hour < 17), "afternoon")
     .otherwise("evening")
)

# Number of sales rows per (DayPart, Item).
transformedBakery = transformedBakery \
    .groupBy("DayPart", "Item") \
    .agg(F.count("Item").alias("qty"))

# Rank items inside each day part by quantity. "Item" is a secondary key so that
# equal quantities always receive the same rank order from run to run.
windowSpec = Window.partitionBy("DayPart").orderBy(F.desc("qty"), F.asc("Item"))

rankedBakery = transformedBakery.withColumn("Rank", F.row_number().over(windowSpec))

top3Bakery = rankedBakery.filter(F.col("Rank") <= 3)

# collect_list gives no ordering guarantee after a shuffle, so collect
# (Rank, Item) structs, sort them by Rank, and only then project the item names.
finalResult = (
    top3Bakery
    .groupBy("DayPart")
    .agg(F.sort_array(F.collect_list(F.struct("Rank", "Item"))).alias("ranked"))
    .select("DayPart", F.col("ranked.Item").alias("Top3Items"))
    .orderBy("DayPart")
)

finalResult.show(truncate=False)

+---------+-----------------------+
|DayPart  |Top3Items              |
+---------+-----------------------+
|afternoon|[Coffee, Bread, Tea]   |
|evening  |[Coffee, Bread, Tea]   |
|morning  |[Coffee, Bread, Pastry]|
|noon     |[Coffee, Bread, Tea]   |
+---------+-----------------------+



In [5]:
import re
from datasketch import MinHash
from pyspark.sql.types import ArrayType, StringType

# Load the HuffPost records, keep only rows that have a non-empty
# short_description, keep one row per link, and narrow to the columns used below.
has_description = (F.col("short_description").isNotNull()) & (F.col("short_description") != "")
huffpost = (
    spark.read.json("data/Huffpost.json")
    .filter(has_description)
    .dropDuplicates(["link"])
    .select("link", "headline", "category", "short_description")
)

def preprocess(text):
    """Lowercase ``text`` and split it into word tokens.

    A token is a maximal run of word characters (``\\w``); everything else acts
    as a separator.

    Args:
        text: The string to tokenise. ``None`` or an empty string is accepted,
            so the function is safe to use as a UDF on nullable columns.

    Returns:
        A list of lowercase tokens, empty when there is nothing to tokenise.
    """
    if not text:
        return []
    return re.findall(r'\w+', text.lower())

def compute_minhash(tokens, num_perm=128):
    """Build a MinHash signature from a collection of string tokens.

    Args:
        tokens: Iterable of string tokens; ``None`` is treated as empty.
        num_perm: Number of permutations for the signature (default 128).
            Signatures can only be compared when built with the same value.

    Returns:
        A ``MinHash`` updated with the UTF-8 encoding of every distinct token.
    """
    m = MinHash(num_perm=num_perm)
    # Updating with the same token twice does not change the signature, so
    # de-duplicate first to avoid hashing repeats.
    for token in set(tokens or ()):
        m.update(token.encode('utf8'))
    return m

# Tokenise every short_description with preprocess(), exposed as a Spark UDF
# that returns an array of strings.
preprocess_udf = F.udf(preprocess, returnType=ArrayType(StringType()))
tokenized_data = huffpost.withColumn("tokens", preprocess_udf("short_description"))

# Reference text that every record is compared against, and its signature.
# NOTE(review): this string looks like a headline, while the records are
# tokenised on short_description — confirm that comparing the two is intended.
base_item = "Kitten Born With Twisted Arms And Legs Finds A Mom Who Knows She\u2019s Perfect"
base_tokens = preprocess(base_item)
base_minhash = compute_minhash(base_tokens)

def jaccard_similarity(m1, m2):
    """Return ``m1``'s Jaccard estimate against ``m2`` (delegates to ``m1.jaccard``)."""
    estimate = m1.jaccard(m2)
    return estimate

def _score_row(row):
    """Copy the descriptive fields of ``row`` and add its similarity to the base item."""
    scored = {
        field: row[field]
        for field in ("link", "headline", "category", "short_description")
    }
    scored["similarity"] = jaccard_similarity(base_minhash, compute_minhash(row["tokens"]))
    return scored


# Score every record against the base signature (one dict per record).
similar_items_with_scores = tokenized_data.rdd.map(_score_row)

# Back to a DataFrame (schema inferred from the dicts), highest similarity first,
# keeping the five best matches.
top_5_similar_items = (
    spark.createDataFrame(similar_items_with_scores)
    .orderBy(F.desc("similarity"))
    .limit(5)
)

top_5_similar_items.show(truncate=False)



+--------------+-----------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+----------+
|category      |headline                                                               |link                                                                                                                                   |short_description                                                                     |similarity|
+--------------+-----------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+----------+
|STYLE & BEAUTY|How A Simple Sh

                                                                                