In [None]:
# install pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=c8c121d64cceb13dd655bc9eb1cbeb7b3079867bd1be47cdd3c441a9a60be824
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, IntegerType, FloatType, DoubleType, BooleanType, DateType, TimestampType
from pyspark.sql.functions import col, when
import datetime

# Create (or reuse) the SparkSession for this notebook under the application
# name "SparkSQL". The resulting `spark` handle is the global that later cells
# use to build DataFrames (e.g. `spark.createDataFrame(...)`).
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [None]:
def create_df():
    """Interactively build an empty Spark DataFrame from a user-typed schema.

    The user is asked once for a table name and then for a comma-separated
    list of ``name:type`` column specifications. If any specification is
    malformed (missing name/type, or an unknown type) an error is printed and
    the column prompt is shown again; this is done with a loop rather than by
    recursion, so repeated mistakes cannot exhaust the call stack.

    Whitespace around column names and type names is ignored, so
    ``"id : int, name:string"`` is accepted. Type names remain case sensitive.

    Returns:
        tuple: ``(table_name, column_data, df)`` where
            * ``table_name`` is the string typed by the user,
            * ``column_data`` is a list of normalized ``"name:type"`` strings
              (one per column, in input order),
            * ``df`` is an empty DataFrame created via the global ``spark``
              session with one nullable field per column.
    """
    # Maps the user-facing type keyword to the Spark type class to instantiate.
    type_factories = {
        "int": IntegerType,
        "float": FloatType,
        "double": DoubleType,
        "boolean": BooleanType,
        "date": DateType,
        "timestamp": TimestampType,
        "string": StringType,
    }
    missing_part_msg = "An error occurred: you missed a column name or datatype, try again!"

    # Prompt the user for the name of the table
    table_name = input("Enter the name of the table:")

    while True:
        print("Available datatypes: string, int, float, double, boolean, date, timestamp.")
        # take column names and data types from user
        raw_specs = input(
            "Enter column names and data types (separated by commas, e.g. col1:int,col2:string):").split(",")

        fields = []
        column_data = []
        error = None
        for raw_spec in raw_specs:
            try:
                # Unpacking raises ValueError unless there is exactly one ":".
                col_name, col_type = (part.strip() for part in raw_spec.split(":"))
            except ValueError:
                error = missing_part_msg
                break
            if not col_name:
                # ":int" splits fine but carries no usable column name.
                error = missing_part_msg
                break
            if col_type not in type_factories:
                error = "Invalid datatype, try again!"
                break
            fields.append(StructField(col_name, type_factories[col_type](), True))
            # Store the cleaned spec so callers that re-split it see the same
            # name/type that went into the schema.
            column_data.append(f"{col_name}:{col_type}")

        if error is None:
            break
        print(error)

    # create DataFrame schema based on column names and data types
    schema = StructType(fields)

    # create empty DataFrame with schema
    df = spark.createDataFrame([], schema)

    return table_name, column_data, df

In [None]:
from pyspark.sql.functions import array_contains

print("Welcome to database")
print("Please, note that the program is case sensitive!")

# Words accepted for boolean columns (compared case-insensitively).
_TRUE_WORDS = {"true", "t", "yes", "y", "1"}
_FALSE_WORDS = {"false", "f", "no", "n", "0"}


def _parse_value(raw_value, col_type):
    """Convert a string typed by the user into the Python value for a column.

    Args:
        raw_value: the text returned by ``input()``.
        col_type: one of the type keywords stored in the table metadata
            ("int", "float", "double", "boolean", "date", "timestamp",
            "string"). Any other keyword is treated like "string".

    Returns:
        The converted value (int, float, bool, date, datetime or str).

    Raises:
        ValueError: if ``raw_value`` cannot be interpreted as ``col_type``.
    """
    if col_type == "int":
        return int(raw_value)
    if col_type in ("float", "double"):
        return float(raw_value)
    if col_type == "boolean":
        # bool("false") is True, so the text has to be interpreted explicitly.
        word = raw_value.strip().lower()
        if word in _TRUE_WORDS:
            return True
        if word in _FALSE_WORDS:
            return False
        raise ValueError(f"not a boolean: {raw_value!r}")
    # The notebook does `import datetime`, so the class that owns strptime is
    # `datetime.datetime` (the module itself has no `strptime`).
    if col_type == "date":
        return datetime.datetime.strptime(raw_value, "%Y-%m-%d").date()
    if col_type == "timestamp":
        return datetime.datetime.strptime(raw_value, "%Y-%m-%d %H:%M:%S")
    return raw_value


def _column_types(column_data):
    """Turn a list of "name:type" strings into an ordered {name: type} dict."""
    return dict(col_data.split(":") for col_data in column_data)


# create dictionary to store tables
tables = {}           # table name -> current DataFrame
tables_metadata = {}  # table name -> list of "name:type" column specs

# ask user if they want to create a table
create_table = input("Do you want to create a table? (Y/N) ")

if create_table.lower() == "y":

    table_name, column_data, df = create_df()

    # create temporary view of DataFrame to use in Spark SQL queries
    df.createOrReplaceTempView(table_name)

    # add table to dictionary
    tables[table_name] = df
    tables_metadata[table_name] = column_data

    # ask user if they want to add, delete, update an entry or print the table
    while True:

        df = tables[table_name]
        column_data = tables_metadata[table_name]
        schema = df.schema
        col_types = _column_types(column_data)

        print("Your are editing in Table", table_name)

        modify_entry = input(
            "Do you want to add, delete, update an entry (A/D/U), print the table (P), or switch table (S)? "
        ).lower()

        if modify_entry == "a":
            # Collect one value per column, re-prompting until it parses.
            entry_data = {}
            for col_name, col_type in col_types.items():
                while True:
                    entry_value = input(f"Enter value for {col_name}: ")
                    try:
                        entry_data[col_name] = _parse_value(entry_value, col_type)
                        break
                    except ValueError:
                        print(f"Invalid value for {col_name}. Please enter a value of type {col_type}.")

            # create DataFrame with new entry and append it to the table
            new_entry = spark.createDataFrame([entry_data], schema)
            df = df.union(new_entry)

            print("New entry added to table.")

        elif modify_entry == "d":

            print("Column names:", df.columns)
            # take column name and value to delete from user
            delete_col = input("Enter column name to delete entry from: ")
            delete_value = input("Enter value to delete: ")

            if delete_col not in df.columns:
                print("Column is not found.")
            else:
                delete_type = col_types.get(delete_col, "string")
                try:
                    typed_delete_value = _parse_value(delete_value, delete_type)
                except ValueError:
                    print(f"Invalid value for {delete_col}. Please enter a value of type {delete_type}.")
                else:
                    rows_before = df.count()
                    # Column expression instead of a string-built SQL filter:
                    # the user's text is never parsed as SQL, and the
                    # null-safe comparison keeps rows whose value is NULL
                    # instead of silently dropping them.
                    df = df.filter(~col(delete_col).eqNullSafe(typed_delete_value))
                    rows_removed = rows_before - df.count()
                    print(f"{rows_removed} row(s) deleted from table {table_name}.")

        elif modify_entry == "u":

            if df.count() == 0:
                print("Table is empty. You cannot update an empty table. Add entries first.")
            else:
                print("Column names:", df.columns)

                # take column name and value to update from user
                identifier_col = df.columns[0]  # use first column as identifier
                identifier_value = input(f"Enter {identifier_col} of row to update: ")

                # Build the row-matching condition; a value that cannot even be
                # parsed as the identifier's type cannot exist in the table.
                try:
                    typed_identifier = _parse_value(
                        identifier_value, col_types.get(identifier_col, "string")
                    )
                    row_matches = col(identifier_col) == typed_identifier
                except ValueError:
                    row_matches = None

                # check if identifier exists in table
                if row_matches is None or df.filter(row_matches).limit(1).count() == 0:
                    print(f"{identifier_col} '{identifier_value}' does not exist in table.")
                else:
                    update_col = input("Enter column name to update entry: ")

                    # check if update column name is valid
                    while update_col not in df.columns:
                        print(f"Invalid column name. Please select from the following column names:\n{df.columns}")
                        update_col = input("Enter column name to update entry: ")

                    update_col_new_value = input("Enter new value: ")
                    try:
                        typed_new_value = _parse_value(
                            update_col_new_value, col_types.get(update_col, "string")
                        )
                    except ValueError:
                        # Leave the table untouched when the value is invalid.
                        print(f"Invalid data type for column '{update_col}'. Please enter valid data type.")
                    else:
                        # Spark DataFrames are immutable: rebuild the column,
                        # replacing the value only on matching rows. The cast
                        # pins the column to its declared type so the schema
                        # does not drift (e.g. float -> double).
                        df = df.withColumn(
                            update_col,
                            when(row_matches, typed_new_value)
                            .otherwise(col(update_col))
                            .cast(schema[update_col].dataType),
                        )
                        print("Table updated successfully.")

        elif modify_entry == "p":
            # show the contents of the DataFrame
            df.show()

        elif modify_entry == "s":
            print("The following are the available tables:")
            print(list(tables))

            # take table name to switch to from user
            switch_table = input("Enter table name to switch to or enter 'C' to create a new table: ")

            # check if the user wants to create new table
            if switch_table.lower() == "c":
                table_name, column_data, df = create_df()

                # add table to dictionary
                tables[table_name] = df
                tables_metadata[table_name] = column_data

            # check if table exists in dictionary
            elif switch_table in tables:
                # switch to selected table
                df = tables[switch_table]
                table_name = switch_table
            else:
                print("Table does not exist.")
                continue

        else:
            print("Invalid input.")
            continue

        # update table in dictionary and keep the Spark SQL temp view in sync
        # with it (covers add, delete, update and newly created tables alike)
        tables[table_name] = df
        df.createOrReplaceTempView(table_name)

        # ask user if they want to continue
        continue_modifying = input("Do you want to continue? (Y/N) ")

        if continue_modifying.lower() != "y":
            break

else:
    print("No table created.")

Welcome to database
Please, note that the program is case sensitive!
Do you want to create a table? (Y/N) y
Enter the name of the table:bigdata
Available datatypes: string, int, float, double, boolean, date, timestamp.
Enter column names and data types (separated by commas, e.g. col1:int,col2:string):name:string,id:int
Your are editing in Table bigdata
Do you want to add, delete, update an entry (A/D/U), print the table (P), or switch table (S)? d
Column names: ['name', 'id']
Enter column name to delete entry from: name
Enter value to delete: kiran
A row deleted from table bigdata.
Do you want to continue? (Y/N) y
Your are editing in Table bigdata
Do you want to add, delete, update an entry (A/D/U), print the table (P), or switch table (S)? p
+----+---+
|name| id|
+----+---+
+----+---+

Do you want to continue? (Y/N) y
Your are editing in Table bigdata
Do you want to add, delete, update an entry (A/D/U), print the table (P), or switch table (S)? u
Table is empty. You cannot update an e

KeyboardInterrupt: ignored