
## Overview

This notebook shows how to create and query a table or DataFrame from a file uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is the Databricks File System, which lets you store data for querying inside Databricks. The notebook assumes that the file you want to read is already in DBFS.

This notebook is written in **Python**, so the default cell type is Python. You can use a different language in any cell by starting it with a `%LANGUAGE` magic command such as `%scala`, `%sql`, or `%r`. Python, Scala, SQL, and R are all supported.

In [0]:

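# Load the Wikipedia sentence CSV; its first line holds the column names (sentence, source)
# Spark's CSV escape character defaults to a backslash; if the file escapes embedded quotes by doubling them (""), add escape='"'
df = spark.read.csv("/FileStore/tables/large.csv", header=True, inferSchema=True)
df.show(5)
# Load the MAGPIE idiom corpus; spark.read.json expects one JSON object per line (JSON Lines)
magpie_df = spark.read.json("/FileStore/tables/MAGPIE_unfiltered.jsonl")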

+--------------------+--------------------+
|            sentence|              source|
+--------------------+--------------------+
|"The specific epi...|' in many cases i...|
|Adult and pediatr...|    pages_articles24|
|"He received the ...|    pages_articles24|
|He competed for G...|    pages_articles24|
|Despite an increa...|    pages_articles24|
+--------------------+--------------------+
only showing top 5 rows
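The first row above looks mis-split: part of the sentence has landed in the `source` column. One plausible cause is that the file follows the RFC 4180 convention of escaping an embedded quote by doubling it (`""`). Spark's CSV reader does not handle that with its default backslash escape character. The plain-Python sketch below uses a hypothetical sample line and the standard `csv` module, which treats `""` inside a quoted field as one literal quote by default (`doublequote=True`). It shows how a parser that understands this convention splits such a line.

```python
import csv
import io

# Hypothetical CSV line in the RFC 4180 style: the sentence field is quoted,
# contains a comma, and escapes its inner quotes by doubling them ("").
sample = '"After the match, he said ""thanks"".",pages_articles24\n'

# csv.reader collapses "" to " inside quoted fields (doublequote=True is the
# default), so the line splits into exactly two fields.
rows = list(csv.reader(io.StringIO(sample)))
sentence, source = rows[0]

print(sentence)  # After the match, he said "thanks".
print(source)    # pages_articles24
```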



In [0]:
# Register the DataFrame as a temporary view so it can be queried with SQL

temp_table_name = "large_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `large_csv`

sentence,source
"""The specific epithet """"seemannii"""" refers to someone with the surname 'Seemann","' in many cases it's botanist Berthold Carl Seemann (1825–1871)."""
Adult and pediatric nurse practitioner programs began in 1971.,pages_articles24
"""He received the """"Naim Frashëri"""" award from the Albanian presidency.""",pages_articles24
He competed for Germany in the 2018 Winter Olympics.,pages_articles24
"Despite an increase in the number of votes, Fan failed to win the re-election.",pages_articles24
"After the match, Cabana challenged Aries to a steel cage match for the title at Third Anniversary Celebration.",pages_articles24
Alexander Moissi (; ; 2 April 1879 – 22 March 1935) was an Austrian stage actor (and occasional film actor) of Albanian origin.,pages_articles24
His brother is the biomathematician Wolfgang Alt.,pages_articles24
"The frontage, refurbished in 2017, displays the carved door crowned with a curved fan light and several cartouches, rosettes and other motifs in the Art Nouveau fashion.",pages_articles24
"Tanaka was born in New York City, U.S. on 23 November 1986.",pages_articles24


In [0]:
# As a temp view, the data is available only to this notebook. If you'd like other users to be able to query it, you can also save the DataFrame as a table.
# A saved table persists across cluster restarts and can be queried by other users from their own notebooks.
# To do so, choose a table name and uncomment the last line of this cell.

# Note: this name matches the temp view above; unqualified references resolve to the temp view first, so a different name may avoid confusion
permanent_table_name = "large_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [0]:
# Q1: number of distinct sentences
unique_sentence_count = df.select("sentence").distinct().count()
print(f"Number of unique sentences: {unique_sentence_count}")


Number of unique sentences: 389639


In [0]:
# Q2: the 10 largest sentence lengths, in words
from pyspark.sql.functions import size, split
# Split each sentence on single spaces and count the resulting tokens
df_with_word_count = df.withColumn("word_count", size(split("sentence", " ")))
# Keep only the word_count column, sort it in descending order, and take the top 10
top10_word_counts = df_with_word_count.select("word_count").orderBy("word_count", ascending=False).limit(10)

top10_word_counts.show()


+----------+
|word_count|
+----------+
|      4571|
|      2499|
|       562|
|       528|
|       426|
|       413|
|       382|
|       381|
|       348|
|       335|
+----------+
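`split("sentence", " ")` splits on every single space. Runs of consecutive spaces therefore produce empty tokens, and those are counted as words, which may inflate some of the counts above. Python's `str.split(" ")` behaves the same way for inputs like these. The plain-Python sketch below uses hypothetical sentences to contrast it with splitting on runs of whitespace. In Spark, the analogous change would be a regular-expression pattern such as `"\\s+"`, since `split` takes a regex.

```python
# Hypothetical sentences; the second contains a double space.
sentences = ["He competed for Germany.", "He  competed for Germany."]

# Splitting on a single space keeps the empty token between consecutive
# spaces, mirroring size(split(sentence, " ")) in the Spark cell.
single_space_counts = [len(s.split(" ")) for s in sentences]

# str.split() with no argument splits on runs of whitespace and drops
# empty tokens.
whitespace_counts = [len(s.split()) for s in sentences]

print(single_space_counts)  # [4, 5]
print(whitespace_counts)    # [4, 4]
```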



In [0]:
# Q3: average number of bigrams per sentence
from pyspark.sql.functions import size, split, col, avg, greatest, lit
# Count the words in each sentence
df_with_word_count = df.withColumn("word_count", size(split("sentence", " ")))
# A sentence with n words has n - 1 bigrams; clamp at 0 so rows without words (e.g. null sentences) never go negative
df_with_bigram = df_with_word_count.withColumn("bigram_count", greatest(col("word_count") - 1, lit(0)))
# Average the per-sentence bigram counts
average_bigrams = df_with_bigram.select(avg("bigram_count")).first()[0]

print(f"Average number of bigrams per sentence: {average_bigrams}")


Average number of bigrams per sentence: 18.0365125
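The per-sentence bigram count rests on a simple identity: a sentence of n words has n - 1 adjacent pairs, and a sentence with no words has none. The plain-Python sketch below applies the same clamp to hypothetical word counts and averages the result. When every sentence has at least one word, the average bigram count is exactly the average word count minus one.

```python
# Hypothetical per-sentence word counts; 0 stands in for a row that
# produced no usable words.
word_counts = [10, 21, 1, 0]

# n words form n - 1 adjacent pairs; clamp at 0 so empty rows never go negative.
bigram_counts = [max(n - 1, 0) for n in word_counts]

# Mean over all sentences, as avg("bigram_count") computes.
average = sum(bigram_counts) / len(bigram_counts)

print(bigram_counts)  # [9, 20, 0, 0]
print(average)        # 7.25
```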


In [0]:
# Q4: the 10 most frequent bigrams
from pyspark.sql.functions import col, concat_ws, explode, expr, size, split
# Split each sentence into a list of words; keep only sentences with at least two words,
# because sequence(0, size(words) - 2) counts down for one-word sentences and would emit pseudo-bigrams
words_df = df.withColumn("words", split(col("sentence"), " ")).filter(size(col("words")) >= 2)
# Build the bigrams: pair each word with the word that follows it
bigrams_df = words_df.withColumn("bigram", expr("transform(sequence(0, size(words) - 2), i -> array(words[i], words[i + 1]))"))
# Flatten into one row per bigram
flat_bigrams = bigrams_df.select(explode("bigram").alias("bigram_pair"))
# Join each pair into a single string (e.g. "of the")
flat_bigrams_str = flat_bigrams.select(concat_ws(" ", col("bigram_pair")).alias("bigram"))
# Count each bigram and keep the 10 most frequent
top10_bigrams = flat_bigrams_str.groupBy("bigram") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(10)

top10_bigrams.show(truncate=False)


+--------+-----+
|bigram  |count|
+--------+-----+
|of the  |76294|
|in the  |54058|
|to the  |25486|
|at the  |21596|
|is a    |19316|
|for the |17946|
|on the  |16050|
|and the |15824|
|as a    |13240|
|with the|11929|
+--------+-----+
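The `transform(sequence(...))` expression pairs each word with its successor. In Spark, `sequence(0, -1)` counts down rather than returning an empty array. One-word sentences therefore need a guard, such as filtering on `size(words) >= 2`, to avoid one-word pseudo-bigrams. The plain-Python sketch below applies the same idea to hypothetical sentences, tokenized on single spaces as in the Spark cell. `zip(words, words[1:])` yields no pairs for a one-word sentence, and `collections.Counter.most_common` does the top-k step.

```python
from collections import Counter

# Hypothetical input sentences.
sentences = [
    "He was born in the city of the river.",
    "She lived in the city now.",
    "Hello",  # a single word contributes no bigrams
]

bigram_counts = Counter()
for sentence in sentences:
    # Tokenize on single spaces, as split(col("sentence"), " ") does
    words = sentence.split(" ")
    # zip stops at the shorter input, so a one-word sentence yields no pairs
    bigram_counts.update(f"{a} {b}" for a, b in zip(words, words[1:]))

# Elements with equal counts keep first-seen order in most_common
top2 = bigram_counts.most_common(2)
print(top2)  # [('in the', 2), ('the city', 2)]
```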



In [0]:
# Q5: how many distinct Wikipedia bigrams exactly match a MAGPIE idiom
# Collect the distinct, non-empty idiom strings to the driver
idioms_list = [row["idiom"] for row in magpie_df.select("idiom").distinct().collect() if row["idiom"]]
idioms_set = set(idioms_list)
# Keep Wikipedia bigrams equal to an idiom string (exact, case-sensitive match, so only two-word idioms can match)
matched_bigrams = flat_bigrams_str.filter(col("bigram").isin(idioms_set))
# Count the distinct matching bigrams
matched_count = matched_bigrams.select("bigram").distinct().count()
print(f"Number of Wikipedia bigrams that appear in MAGPIE idioms: {matched_count}")


Number of Wikipedia bigrams that appear in MAGPIE idioms: 67
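`isin` tests exact string equality. A bigram is counted only when it is identical to a whole MAGPIE idiom, which in practice means only two-word idioms can match and casing must agree. The plain-Python sketch below shows the same exact-match set intersection on hypothetical idioms and bigrams, along with the subset of idioms that could ever match.

```python
# Hypothetical MAGPIE-style idioms and Wikipedia bigrams.
idioms = {"in the red", "break even", "Cold feet", "off the hook"}
bigrams = {"break even", "in the", "cold feet", "of the"}

# Exact match, as col("bigram").isin(idioms_set) does: "in the" does not
# match the three-word idiom "in the red", and "cold feet" differs in case.
exact_matches = bigrams & idioms
print(sorted(exact_matches))  # ['break even']

# Only two-word idioms can ever equal a bigram.
two_word_idioms = {i for i in idioms if len(i.split(" ")) == 2}
print(sorted(two_word_idioms))  # ['Cold feet', 'break even']
```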


In [0]:

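# Q6: bigrams ranked 2500 to 2510, excluding those that match a MAGPIE idiom
# Drop bigrams that exactly match an idiom, as in Q5
filtered_bigrams = flat_bigrams_str.filter(~col("bigram").isin(idioms_set))
# Count each bigram; break ties alphabetically so the ranking is deterministic
ranked_bigrams = filtered_bigrams.groupBy("bigram") \
    .count() \
    .orderBy(col("count").desc(), col("bigram").asc())
# Keep the top 2510 rows and re-sort explicitly so tail() sees a well-defined order;
# the last 11 rows are ranks 2500 to 2510 (DataFrame.tail requires Spark 3.0+)
ranked_bigrams_subset = ranked_bigrams.limit(2510).orderBy(col("count").desc(), col("bigram").asc())
target_bigrams = ranked_bigrams_subset.tail(11)

# Print each bigram with its 1-based rank and count
for idx, row in enumerate(target_bigrams, start=2500):
    print(f"{idx}: {row['bigram']} – {row['count']}")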


2500: to complete – 176
2501: was responsible – 176
2502: which took – 176
2503: working for – 176
2504: The show – 175
2505: a meeting – 175
2506: back in – 175
2507: built on – 175
2508: featured on – 175
2509: first team – 175
2510: from which – 175
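Ranks 2500 through 2510 inclusive are 11 rows, which is why the cell keeps the first 2510 rows and takes the last 11. In plain Python, sorting on `(-count, bigram)` reproduces the descending-count, ascending-bigram tie-break, and the zero-based slice `[start - 1:end]` then selects the rank window. The sketch below uses hypothetical counts and a hypothetical helper, `rank_slice`. Python compares strings by code point, so uppercase entries such as `The show` sort before lowercase ones, as in the output above. Whether Spark's string ordering agrees with Python's in every case is an assumption here.

```python
def rank_slice(counts, start, end):
    """Return (rank, item, count) tuples for 1-based ranks start..end inclusive.

    Hypothetical helper: ties in count are broken alphabetically so the
    ranking is deterministic, mirroring orderBy(count desc, bigram asc).
    """
    ordered = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    window = ordered[start - 1:end]
    return [(rank, item, n) for rank, (item, n) in enumerate(window, start=start)]


# Hypothetical bigram counts.
counts = {"of the": 5, "in the": 5, "The show": 2, "a meeting": 2, "back in": 1}
for rank, item, n in rank_slice(counts, 2, 4):
    print(f"{rank}: {item} - {n}")
# 2: of the - 5
# 3: The show - 2
# 4: a meeting - 2
```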
