# Libraries


In [0]:
# Import required libraries for PySpark
# pyspark: Core PySpark library
# SparkContext: Entry point for Spark functionality
# SparkSession: Entry point for working with DataFrames
import pyspark
from pyspark import SparkContext 
from pyspark.sql import SparkSession

In [0]:
#sc=SparkContext() #Uncomment this line if you're creating a SparkContext() for the first time

# Create a SparkSession using the existing SparkContext
# SparkSession is the entry point for DataFrame and SQL functionality
ss=SparkSession(sc)

# Rename

In [0]:
# Copy and rename the large_csv.gz file to large.csv.gz
dbutils.fs.cp('/FileStore/tables/large_csv.gz', '/FileStore/tables/large.csv.gz')

Out[3]: True

# Question 1

In [0]:
# Read the large.csv.gz file into a DataFrame
# header=True indicates that the first row contains column names
large=ss.read.csv('/FileStore/tables/large.csv.gz', header=True)

In [0]:
# Register the DataFrame as a temporary SQL table named 'large'
# This allows us to run SQL queries on the data using SparkSQL
large.registerTempTable('large')



## Answer

In [0]:
# Execute SQL query to count distinct sentences in the 'large' table
# The query selects the count of unique sentences and aliases it as 'large'
# show() displays the results
ss.sql('select count(distinct sentence) large from large').show()

+------+
| large|
+------+
|389639|
+------+



# Question 2

In [0]:
# Execute SQL query to select all sentences from the 'large' table
# Store the result in large_sentence DataFrame
large_sentence=ss.sql('select sentence from large')

In [0]:
# Repartition the DataFrame into a single partition and write it as a CSV file
# repartition(1) combines all data into one partition
# write.csv writes the data to the specified path
# mode='overwrite' overwrites any existing data at that path
large_sentence.repartition(1).write.csv('/FileStore/tables/large_sentence', mode='overwrite')

In [0]:
# Read the CSV file from the specified path into an RDD
# textFile() creates an RDD where each element is a line from the file
large_sentence=sc.textFile('/FileStore/tables/large_sentence')

## Answer

In [0]:
# Map each sentence to a tuple of (word count, sentence)
# Sort by word count in descending order
# Map to just get the word counts
# Take the top 10 results
large_sentence.map(lambda x: (len(x.split(' ')), x)).sortBy(lambda x: x[0], ascending=False).map(lambda x: x[0]).take(10)

Out[10]: [4571, 2499, 562, 528, 426, 413, 382, 381, 348, 335]

# Question 3

In [0]:
# Calculate the average number of bigrams in the large sentence dataset
# 1. Map each sentence to count of bigrams (words-1) 
# 2. Map each count to [count, 1] for averaging
# 3. Reduce by summing counts and number of sentences
average_number_bigrams_large_sentence=large_sentence.map(lambda x: len(x.split(' '))-1).map(lambda x: [x, 1]).reduce(lambda x, y: [x[0]+y[0], x[1]+y[1]])

## Answer

In [0]:
# Calculate the average by dividing total number of bigrams by total number of sentences
# average_number_bigrams_large_sentence[0] contains sum of bigram counts
# average_number_bigrams_large_sentence[1] contains total number of sentences
average_number_bigrams_large_sentence[0]/average_number_bigrams_large_sentence[1]

Out[12]: 18.036295

# Question 4

In [0]:
# Split each sentence into a list of words to prepare for bigram creation
large_word=large_sentence.map(lambda x: x.split(' '))

In [0]:
def bigram(word):
    # Initialize empty list to store bigrams
    my_list=[]
    # Check if word list has at least 2 words to form bigrams
    if len(word)>=2:
        # Iterate through words to create bigrams
        for i in range(0, len(word)-1):
            # Get next word index
            j=i+1
            # Create bigram by concatenating current and next word with space
            my_list.append(word[i]+' '+word[j])
    return my_list

In [0]:
# Apply bigram function to each sentence to create bigrams
# flatMap is used since each sentence produces multiple bigrams that need to be flattened into a single RDD
large_bigram=large_word.flatMap(bigram)

## Answer

In [0]:
# Map each bigram to a tuple of (bigram, 1)
# Reduce by key to count frequency of each bigram
# Sort in descending order by frequency
# Take top 10 most frequent bigrams
large_bigram.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y).sortBy(lambda x: x[1], ascending=False).take(10)

Out[16]: [('of the', 76290),
 ('in the', 54056),
 ('to the', 25484),
 ('at the', 21588),
 ('is a', 19261),
 ('for the', 17942),
 ('on the', 16049),
 ('and the', 15822),
 ('as a', 13240),
 ('with the', 11928)]

# Question 5

In [0]:
# Read the MAGPIE dataset containing idioms from JSON file into a Spark DataFrame
magpie=ss.read.json('/FileStore/tables/MAGPIE_unfiltered.jsonl')

In [0]:
# Register magpie DataFrame as a temporary SQL table named 'magpie' for querying
magpie.registerTempTable('magpie')

In [0]:
# Convert RDD of bigrams to DataFrame with single column 'bigrams'
# Map each bigram to a single-element list to create proper structure for DataFrame
# Register as temporary table for SQL querying
large_bigram.map(lambda x: [x]).toDF(['bigrams']).registerTempTable('large_bigrams')



## Answer


In [0]:
# Count how many unique bigrams from our text appear in the MAGPIE idioms dataset
# Uses SQL to:
# 1. Select distinct bigrams from large_bigrams table 
# 2. Check if each bigram exists in the idioms column of magpie table
# 3. Count the total matches
ss.sql('select count(distinct bigrams) large from large_bigrams where bigrams in (select idiom from magpie)').show()

+-----+
|large|
+-----+
|   67|
+-----+



# Question 6


In [0]:
# Create a DataFrame with bigram frequencies:
# 1. Map each bigram to a tuple (bigram, 1)
# 2. Reduce by key to sum the counts for each bigram
# 3. Convert to DataFrame with bigram and frequency columns
# 4. Register as temp table for SQL querying
large_bigram.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y).toDF(['bigrams', 'frequency']).registerTempTable('large_bigrams_frequency')

## Answer


In [0]:
# Query to find bigrams ranked 2500-2510 by frequency (descending) and alphabetically
# 1. Inner query: Select all bigrams and frequencies, excluding those that are idioms
# 2. Assign dense rank based on frequency (desc) and bigram text (asc) 
# 3. Outer query: Filter to only show ranks 2500-2510
ss.sql('select * from (select *, dense_rank() over(order by frequency desc, bigrams asc) rank from large_bigrams_frequency where bigrams not in (select idiom from magpie)) where rank between 2500 and 2510').show()

+----------------+---------+----+
|         bigrams|frequency|rank|
+----------------+---------+----+
|         2018 to|      174|2500|
| Communist Party|      174|2501|
|Cup competition.|      174|2502|
|      During the|      174|2503|
|       a meeting|      174|2504|
|         a state|      174|2505|
|      a two-year|      174|2506|
|           at St|      174|2507|
|  been described|      174|2508|
|       bishop of|      174|2509|
|        chose to|      174|2510|
+----------------+---------+----+

