<a href="https://colab.research.google.com/github/sheikh495/Data_mining/blob/main/MORE_Spooky_authorship_via_Apache_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Getting Set Up (For Google Colab)

In [1]:
!pip install pyspark --quiet
!pip install -U -q PyDrive --quiet
!apt install openjdk-8-jdk-headless &> /dev/null

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("civComplaints") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()

In [5]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip &> /dev/null
!unzip ngrok-stable-linux-amd64.zip &> /dev/null
get_ipython().system_raw('./ngrok http 4050 &')

In [6]:
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

https://232e-35-229-80-221.ngrok.io


In [45]:
pip install pyodbc

Collecting pyodbc
  Downloading pyodbc-4.0.39-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (343 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/343.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━[0m [32m235.5/343.5 kB[0m [31m7.1 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m343.5/343.5 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyodbc
Successfully installed pyodbc-4.0.39


#Loading our Data

In [9]:
import pandas as pd

df = pd.read_csv('/content/sample_submission.csv').astype(str)

There are a lot of columns to this dataset, so let's set `vertical = True`.

In [10]:
complaints_df = spark.createDataFrame(df)

In [11]:
complaints_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- EAP: string (nullable = true)
 |-- HPL: string (nullable = true)
 |-- MWS: string (nullable = true)



In [15]:
import pandas as pd

# Read test.csv
test_data = pd.read_csv('/content/test.csv')
print("Test Data:")
print(test_data.head())

# Read train.csv
train_data = pd.read_csv('/content/train.csv')
print("\nTrain Data:")
print(train_data.head())


Test Data:
        id                                               text
0  id02310  Still, as I urged our leaving Ireland with suc...
1  id24541  If a fire wanted fanning, it could readily be ...
2  id00134  And when they had broken down the frail door t...
3  id27757  While I was thinking how I should possibly man...
4  id04081  I am not sure to what limit his knowledge may ...

Train Data:
        id                                               text author
0  id26305  This process, however, afforded me no means of...    EAP
1  id17569  It never once occurred to me that the fumbling...    HPL
2  id11008  In his left hand was a gold snuff box, from wh...    EAP
3  id27763  How lovely is spring As we looked from Windsor...    MWS
4  id12958  Finding nothing else, not even gold, the Super...    HPL


#Task 1: Spark SQL Mechanics

1. Use spark.sql statement to join the test and train data grouping by author and ordering by id.


In [18]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.getOrCreate()

# Read test data from CSV file
test_df = spark.read.csv('/content/test.csv', header=True, inferSchema=True)

# Read train data from CSV file
train_df = spark.read.csv('/content/train.csv', header=True, inferSchema=True)

# Register test DataFrame as a temporary view
test_df.createOrReplaceTempView('test')

# Register train DataFrame as a temporary view
train_df.createOrReplaceTempView('train')

# Perform the join, grouping, and ordering using Spark SQL
result_df = spark.sql('''
    SELECT tr.author, t.id, t.text
    FROM test t
    JOIN train tr ON t.id = tr.id
    GROUP BY tr.author, t.id, t.text
    ORDER BY t.id
''')


# Show the resulting DataFrame
result_df.show()



+------+---+----+
|author| id|text|
+------+---+----+
+------+---+----+



In [19]:
result_df = spark.sql('''
    SELECT tr.author, t.id, t.text
    FROM test t
    JOIN train tr ON t.id = tr.id
    GROUP BY tr.author, t.id, t.text
    ORDER BY t.id
''')

result_df.show()


+------+---+----+
|author| id|text|
+------+---+----+
+------+---+----+



2. Reverse engineer so these statement run


In [23]:
# Read the data from the CSV file into a DataFrame
spooky_sentences = spark.read.csv('/content/train.csv', header=True, inferSchema=True)

# Register the DataFrame as a temporary view
spooky_sentences.createOrReplaceTempView('spooky_sentences')

# Execute the SQL query
result_df = spark.sql("""
    SELECT text AS sentence,
    size(split(text, ' ')) AS word_count
    FROM spooky_sentences
    ORDER BY word_count DESC
""")

# Show the resulting DataFrame
result_df.show(10, truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

3. Write a subquery to count total words by author generating something like

In [24]:
result_df = spark.sql('''
    SELECT author, COUNT(*) as total_words
    FROM (
        SELECT author, split(text, ' ') as words
        FROM spooky_sentences
    )
    LATERAL VIEW explode(words) exploded_words AS word
    GROUP BY author
''')

result_df.show()


+--------------------+-----------+
|              author|total_words|
+--------------------+-----------+
| I'm all soul and...|         23|
| and the supposit...|          8|
|"" who preached a...|         14|
| at this period o...|          5|
| ""It gave me the...|         22|
| that these Blasp...|         35|
|      Madame Lalande|          7|
| and I cannot con...|         10|
| one of the ""Eng...|         10|
| you have straigh...|         69|
| and we continued...|          5|
| and in a few bri...|         17|
|      and very happy|          2|
| turning abruptly...|          4|
| who art called o...|          8|
| who gave me this...|          5|
|       Mr. Wyatt."""|         55|
|           Woodville|         10|
| and returned wit...|         26|
|  thet Afriky book?"|          1|
+--------------------+-----------+
only showing top 20 rows



#Task 2: Data Loading and Query Types

4. Write a user-defined function classifying word count >30 as wordy, <7 words as pity, and the difference
as not wordy generating an outcome something like


In [25]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the UDF to classify word counts
def classify_word_count(count):
    if count > 30:
        return "wordy"
    elif count < 7:
        return "pity"
    else:
        return "not wordy"

# Register the UDF
classify_word_count_udf = udf(classify_word_count, StringType())

# Apply the UDF to the DataFrame and create a new column
result_df = result_df.withColumn("classification", classify_word_count_udf("total_words"))

# Show the resulting DataFrame
result_df.show()


+--------------------+-----------+--------------+
|              author|total_words|classification|
+--------------------+-----------+--------------+
| I'm all soul and...|         23|     not wordy|
| and the supposit...|          8|     not wordy|
|"" who preached a...|         14|     not wordy|
| at this period o...|          5|          pity|
| ""It gave me the...|         22|     not wordy|
| that these Blasp...|         35|         wordy|
|      Madame Lalande|          7|     not wordy|
| and I cannot con...|         10|     not wordy|
| one of the ""Eng...|         10|     not wordy|
| you have straigh...|         69|         wordy|
| and we continued...|          5|          pity|
| and in a few bri...|         17|     not wordy|
|      and very happy|          2|          pity|
| turning abruptly...|          4|          pity|
| who art called o...|          8|     not wordy|
| who gave me this...|          5|          pity|
|       Mr. Wyatt."""|         55|         wordy|


In [26]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the UDF to classify word counts
def classify_author_category(count):
    if count > 30:
        return "wordy"
    elif count < 7:
        return "pity"
    else:
        return "not wordy"

# Register the UDF
classify_author_category_udf = udf(classify_author_category, StringType())

# Apply the UDF to the DataFrame and create a new column
result_df = result_df.withColumn("author_category", classify_author_category_udf("total_words"))

# Show the resulting DataFrame
result_df.show()


+--------------------+-----------+--------------+---------------+
|              author|total_words|classification|author_category|
+--------------------+-----------+--------------+---------------+
| I'm all soul and...|         23|     not wordy|      not wordy|
| and the supposit...|          8|     not wordy|      not wordy|
|"" who preached a...|         14|     not wordy|      not wordy|
| at this period o...|          5|          pity|           pity|
| ""It gave me the...|         22|     not wordy|      not wordy|
| that these Blasp...|         35|         wordy|          wordy|
|      Madame Lalande|          7|     not wordy|      not wordy|
| and I cannot con...|         10|     not wordy|      not wordy|
| one of the ""Eng...|         10|     not wordy|      not wordy|
| you have straigh...|         69|         wordy|          wordy|
| and we continued...|          5|          pity|           pity|
| and in a few bri...|         17|     not wordy|      not wordy|
|      and

In [27]:
result_df = result_df.withColumn("author_category", classify_author_category_udf("total_words"))

# Select the desired columns
result_df = result_df.select("author", "total_words", "author_category")

# Show the resulting DataFrame
result_df.show()


+--------------------+-----------+---------------+
|              author|total_words|author_category|
+--------------------+-----------+---------------+
| I'm all soul and...|         23|      not wordy|
| and the supposit...|          8|      not wordy|
|"" who preached a...|         14|      not wordy|
| at this period o...|          5|           pity|
| ""It gave me the...|         22|      not wordy|
| that these Blasp...|         35|          wordy|
|      Madame Lalande|          7|      not wordy|
| and I cannot con...|         10|      not wordy|
| one of the ""Eng...|         10|      not wordy|
| you have straigh...|         69|          wordy|
| and we continued...|          5|           pity|
| and in a few bri...|         17|      not wordy|
|      and very happy|          2|           pity|
| turning abruptly...|          4|           pity|
| who art called o...|          8|      not wordy|
| who gave me this...|          5|           pity|
|       Mr. Wyatt."""|         

#Task 3: Advanced SQL Functions and Expressions

5. Use functions “lower” and “concat” to combine all sentences into one string displaying something like

In [33]:
from pyspark.sql.functions import lower, concat, lit, collect_list, concat_ws

# Combine all sentences into a single string
combined_string = result_df.select(lower(concat(result_df["author"], lit(" "))).alias("combined_string")) \
    .agg(concat_ws("", collect_list("combined_string")).alias("combined_string")) \
    .select("combined_string")

# Show the combined string
combined_string.show(truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [42]:
# Show the train data DataFrame
train_data.show(truncate=False)


+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|id     |text                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 

In [93]:
import sqlite3

# Connect to the SQLite database
connection = sqlite3.connect('/content/train.db')  # Replace 'train.db' with your actual database file name

# Create a cursor object to execute SQL queries
cursor = connection.cursor()

# Execute the SQL query
cursor.execute("SELECT id, author, LOWER(text) AS concatenated_text FROM train")

# Fetch all the rows from the result
rows = cursor.fetchall()

# Display the result
print("+---+-----------------+------------------+")
print("|id |author           |concatenated_text |")
print("+---+-----------------+------------------+")
for row in rows:
    id_value, author, concatenated_text = row
    print(f"|{id_value} |{author} |{concatenated_text} |")
print("+---+-----------------+------------------+")

# Close the cursor and connection
cursor.close()
connection.close()


OperationalError: ignored

In [95]:
import sqlite3

# Connect to a new SQLite database file
connection = sqlite3.connect('/content/train.db')  # Replace 'train.db' with your desired database file name
connection.close()


In [97]:
import sqlite3
import csv

# Connect to the SQLite database
connection = sqlite3.connect('/content/train.db')  # Replace 'train.db' with your database file name

# Create a cursor object to execute SQL queries
cursor = connection.cursor()

# Create the 'train' table
cursor.execute('''CREATE TABLE IF NOT EXISTS train (
                    id INTEGER,
                    text TEXT,
                    author TEXT
                  )''')

# Open the CSV file
with open('/content/train.csv', 'r') as file:
    csv_reader = csv.reader(file)

    # Skip the header row if present
    next(csv_reader)

    # Iterate over the rows in the CSV file and insert them into the 'train' table
    for row in csv_reader:
        cursor.execute("INSERT INTO train VALUES (?, ?, ?)", row)

# Commit the changes to the database
connection.commit()

# Close the cursor and connection
cursor.close()
connection.close()


In [98]:
import sqlite3

# Connect to the SQLite database
connection = sqlite3.connect('/content/train.db')  # Replace 'train.db' with your database file name

# Create a cursor object to execute SQL queries
cursor = connection.cursor()

# Execute the SQL query to combine all sentences into one string
cursor.execute("SELECT id, author, LOWER(text) AS concatenated_text FROM train")

# Fetch all the rows from the result
result = cursor.fetchall()

# Display the result
for row in result:
    print(row)

# Close the cursor and connection
cursor.close()
connection.close()


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
('id23418', 'HPL', 'i say that i saw this thing, but it is only in conscious retrospection that i ever definitely traced its damnable approach to form.')
('id10659', 'HPL', 'its size must have been exaggerated, yet the stones lying about proved that it was no mere negro village.')
('id03247', 'EAP', 'the same individual submitted to me, without being at all aware of my intentions, a method of constructing balloons from the membrane of a certain animal, through which substance any escape of gas was nearly an impossibility.')
('id20632', 'EAP', "the paragraph beginning 'it is folly to suppose that the murder, etc.,' however it appears as printed in l'etoile, may be imagined to have existed actually thus in the brain of its inditer 'it is folly to suppose that the murder, if murder was committed on the body, could have been committed soon enough to have enabled her murderers to throw the body into the river before midnight; 

In [100]:
import sqlite3

# Connect to the SQLite database
connection = sqlite3.connect('/content/train.db')  # Replace 'train.db' with your database file name

# Create a cursor object to execute SQL queries
cursor = connection.cursor()

# Execute the SQL query to combine all sentences into one string
cursor.execute("SELECT id, author, GROUP_CONCAT(LOWER(text), ' ') AS concatenated_text FROM train")

# Fetch all the rows from the result
result = cursor.fetchall()

# Display the result in the desired format
print("+---+-----------------------------------------------------------------+")
print("|id |author |concatenated_text                                       |")
print("+---+-----------------------------------------------------------------+")
for row in result:
    id_value = str(row[0])  # Convert id_value to string
    author_value = row[1]
    concatenated_text = row[2]
    print(f"|{id_value:3s}|{author_value:6s}|{concatenated_text:65s}|")
print("+---+-----------------------------------------------------------------+")

# Close the cursor and connection
cursor.close()
connection.close()


IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [48]:
!pip install pyodbc



In [58]:
import pyodbc


In [67]:
server = 'Your_Server_Name'   # Replace with the name or IP address of your SQL Server
database = 'Your_Database_Name'   # Replace with the name of your SQL Server database
username = 'Your_Username'   # Replace with your SQL Server username
password = 'Your_Password'   # Replace with your SQL Server password
driver = '{ODBC Driver 17 for SQL Server}'   # Use the appropriate ODBC driver name


In [68]:
connection_string = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password}"


In [None]:
!apt-get install -y curl gnupg
!curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
!curl https://packages.microsoft.com/config/debian/10/prod.list > /etc/apt/sources.list.d/mssql-release.list
!apt-get update
!ACCEPT_EULA=Y apt-get install -y msodbcsql17


In [None]:
!pip install db-sqlite3


In [71]:
import pyodbc


In [87]:
import pyodbc
import sqlite3
import csv
from datetime import date

try:
    # Specify the connection details
    server = 'Your_Server_Name'   # Replace with the name or IP address of your SQL Server
    database = 'Your_Database_Name'   # Replace with the name of your SQL Server database
    username = 'Your_Username'   # Replace with your SQL Server username
    password = 'Your_Password'   # Replace with your SQL Server password
    driver = '{ODBC Driver 17 for SQL Server}'   # Use the appropriate ODBC driver name

    # Create the connection string
    connection_string = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password}"

    # Attempt to establish the connection
    connection = pyodbc.connect(connection_string)

    # Connection successful, perform further operations
    cursor = connection.cursor()

    # Open the CSV file
    with open('/content/train.csv', 'r') as file:
        csv_reader = csv.reader(file)

        # Skip the header row if present
        next(csv_reader)

        # Iterate over the rows in the CSV file
        for row in csv_reader:
            # Assuming the table has columns 'id', 'text', and 'author'
            id_value = row[0]
            text_value = row[1]
            author_value = row[2]

            # Prepare the SQL query with parameter placeholders
            sql_query = "INSERT INTO your_table (id, text, author) VALUES (?, ?, ?)"

            # Execute the SQL query with the parameter values
            cursor.execute(sql_query, id_value, text_value, author_value)

    # Commit the changes to the database
    connection.commit()

    # Close the cursor and connection
    cursor.close()
    connection.close()

except pyodbc.Error as e:
    # Error occurred, handle it appropriately
    print(f"Error connecting to SQL Server: {str(e)}")


Error connecting to SQL Server: ('HYT00', '[HYT00] [Microsoft][ODBC Driver 17 for SQL Server]Login timeout expired (0) (SQLDriverConnect)')


In [89]:
import sqlite3
import csv

try:
    # Connect to the SQLite database
    conn = sqlite3.connect('your_database.db')

    # Create a cursor object to execute SQL queries
    cursor = conn.cursor()

    # Open the CSV file
    with open('/content/train.csv', 'r') as file:
        csv_reader = csv.reader(file)

        # Skip the header row if present
        next(csv_reader)

        # Iterate over the rows in the CSV file
        for row in csv_reader:
            # Assuming the table has columns 'id', 'text', and 'author'
            id_value = row[0]
            text_value = row[1]
            author_value = row[2]

            # Prepare the SQL query with parameter placeholders
            sql_query = "INSERT INTO your_table (id, text, author) VALUES (?, ?, ?)"

            # Execute the SQL query with the parameter values
            cursor.execute(sql_query, (id_value, text_value, author_value))

    # Commit the changes to the database
    conn.commit()

    # Close the cursor and connection
    cursor.close()
    conn.close()

except sqlite3.Error as e:
    # Error occurred, handle it appropriately
    print(f"Error connecting to SQLite database: {str(e)}")


Error connecting to SQLite database: no such table: your_table


In [91]:
import sqlite3
import csv

try:
    # Connect to the SQLite database
    conn = sqlite3.connect('your_database.db')

    # Create a cursor object to execute SQL queries
    cursor = conn.cursor()

    # Create a table to store the train data if it doesn't exist
    create_table_query = '''
    CREATE TABLE IF NOT EXISTS train (
        id INTEGER PRIMARY KEY,
        text TEXT,
        author TEXT
    )
    '''
    cursor.execute(create_table_query)

    # Open the CSV file
    with open('/content/train.csv', 'r') as file:
        csv_reader = csv.reader(file)

        # Skip the header row if present
        next(csv_reader)

        # Iterate over the rows in the CSV file
        for row in csv_reader:
            # Assuming the CSV columns are in the order id, text, author
            id_value = row[0]
            text_value = row[1]
            author_value = row[2]

            # Insert the data into the 'train' table
            insert_query = "INSERT INTO train (id, text, author) VALUES (?, ?, ?)"
            cursor.execute(insert_query, (id_value, text_value, author_value))

    # Commit the changes to the database
    conn.commit()

    # Close the cursor and connection
    cursor.close()
    conn.close()

except sqlite3.Error as e:
    # Error occurred, handle it appropriately
    print(f"Error connecting to SQLite database: {str(e)}")


Error connecting to SQLite database: datatype mismatch


#Task 4: Views and Temporary Tables

6. Create a view using spark.sql to displaying any one sentence for each author with words >30.


In [101]:
# Create a view using spark.sql
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW author_sentence_view AS
    SELECT author, text
    FROM (
        SELECT author, text, size(split(text, ' ')) AS word_count,
               row_number() OVER (PARTITION BY author ORDER BY size(split(text, ' ')) DESC) AS rn
        FROM spooky_sentences
    ) tmp
    WHERE word_count > 30 AND rn = 1
""")

# Query the view to display the results
result_df = spark.sql("SELECT * FROM author_sentence_view")
result_df.show(truncate=False)


+--------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|author                                                                                                                                      |text                                                                                                                                                                                                                                                                                           

#Task 5: Error Handling and Debugging


7. Add to task.6 a “try-except” block for any item you chose as long as its valid. For instance, it could be
an error for reading a file, displaying an entry without a sentence and similar.


In [102]:
try:
    # Create a view using spark.sql
    spark.sql("""
        CREATE OR REPLACE TEMPORARY VIEW author_sentence_view AS
        SELECT author, text
        FROM (
            SELECT author, text, size(split(text, ' ')) AS word_count,
                   row_number() OVER (PARTITION BY author ORDER BY size(split(text, ' ')) DESC) AS rn
            FROM spooky_sentences
        ) tmp
        WHERE word_count > 30 AND rn = 1
    """)

    # Query the view to display the results
    result_df = spark.sql("SELECT * FROM author_sentence_view")
    result_df.show(truncate=False)

except Exception as e:
    # Handle the exception
    print(f"An error occurred: {str(e)}")


+--------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|author                                                                                                                                      |text                                                                                                                                                                                                                                                                                           

In [103]:
try:
    # Read the file and create a DataFrame
    df = spark.read.csv("/content/train.csv", header=True)

    # Create a view using spark.sql
    spark.sql("""
        CREATE OR REPLACE TEMPORARY VIEW author_sentence_view AS
        SELECT author, text
        FROM (
            SELECT author, text, size(split(text, ' ')) AS word_count,
                   row_number() OVER (PARTITION BY author ORDER BY size(split(text, ' ')) DESC) AS rn
            FROM spooky_sentences
        ) tmp
        WHERE word_count > 30 AND rn = 1
    """)

    # Query the view to display the results
    result_df = spark.sql("SELECT * FROM author_sentence_view")
    result_df.show(truncate=False)

except FileNotFoundError:
    print("Error reading the file: File not found.")

except KeyError:
    print("Error occurred displaying entry without a sentence: KeyError: 'text'")


+--------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|author                                                                                                                                      |text                                                                                                                                                                                                                                                                                           

#Task 6: Spark SQL for Machine Learning

8. Calculate the lexical density by author displaying something like

In [104]:
# Register the DataFrame as a temporary view
df.createOrReplaceTempView("spooky_sentences")

# Calculate the lexical density by author using Spark SQL
result_df = spark.sql("""
    SELECT author,
        SUM(size(split(text, ' '))) AS total_words,
        AVG(size(split(text, ' '))) AS average_words,
        COUNT(DISTINCT text) AS unique_sentences,
        (COUNT(DISTINCT text) / COUNT(*)) AS lexical_density
    FROM spooky_sentences
    GROUP BY author
    ORDER BY lexical_density DESC
""")

# Show the resulting DataFrame
result_df.show(truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+----------------+---------------+
|author                                                                                                                                                                     |total_words|average_words|unique_sentences|lexical_density|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+----------------+---------------+
| you have straightened out the Feet                                                                                                                                        |69         |69.0         |1               |1.0            |
| ""It gave me the greatest pleasure to receive a letter from my unc