# Dimension Creation Script
### We will create the following dimensions/facts
 - dim_book
 - dim_review
 - dim_genre
 - dim_author
 - facts_interactions (AKA facts)

In [1]:
!pip install gcsfs
!pip install openpyxl

[0m

In [2]:
import pandas as pd
import gcsfs
import calendar
from google.cloud import storage
from google.cloud.exceptions import NotFound
from pyspark.sql import Row
from pyspark.sql.functions import col, lower, instr, date_format, split, rand, regexp_replace, trim, max, expr, element_at
from pyspark.sql.functions import monotonically_increasing_id # virtually the same as factorize() from pandas.
from pyspark.sql import DataFrame as PySparkDataFrame

In [3]:
bucket_name = "goodreads_bucket"
PROJECT = 'cis4400-individual-project'
FRACTION = 0.01
SEED = 645

# Reading data

In [4]:
# We will read from the following folders
#
# cleaned/genre
# cleaned/authors
# cleaned/books
# cleaned/interactions
# cleaned/reviews

In [5]:
# Path to the GCS bucket
folder_name = "authors"
gcs_path = f"gs://{bucket_name}/cleaned/{folder_name}/"

# Read the Parquet files
authors_df = spark.read.parquet(gcs_path)

# Show the DataFrame schema and data
authors_df.printSchema()
authors_df.count()

                                                                                

root
 |-- author_id: integer (nullable = true)
 |-- average_rating: float (nullable = true)
 |-- name: string (nullable = true)
 |-- ratings_count: integer (nullable = true)
 |-- text_reviews_count: integer (nullable = true)



                                                                                

829529

In [6]:
# Path to the GCS bucket
folder_name = "genre"
gcs_path = f"gs://{bucket_name}/cleaned/{folder_name}/"

# Read the Parquet files
genre_df = spark.read.parquet(gcs_path)

# Show the DataFrame schema and data
genre_df.printSchema()
genre_df.count()

root
 |-- book_id: string (nullable = true)
 |-- children: long (nullable = true)
 |-- comics_graphic: long (nullable = true)
 |-- fantasy_paranormal: long (nullable = true)
 |-- fiction: long (nullable = true)
 |-- history: long (nullable = true)
 |-- mystery: long (nullable = true)
 |-- non_fiction: long (nullable = true)
 |-- poetry: long (nullable = true)
 |-- romance: long (nullable = true)
 |-- young_adult: long (nullable = true)



                                                                                

2360655

In [7]:
# Path to the GCS bucket
folder_name = "books"
gcs_path = f"gs://{bucket_name}/cleaned/{folder_name}/"

# Read the Parquet files
books_df = spark.read.parquet(gcs_path)#.sample(withReplacement=False, fraction=FRACTION, seed=SEED)

# Show the DataFrame schema and data
books_df.printSchema()
books_df.count()

root
 |-- asin: string (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- average_rating: float (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- country_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- edition_information: string (nullable = true)
 |-- format: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- is_ebook: integer (nullable = true)
 |-- isbn: integer (nullable = true)
 |-- isbn13: long (nullable = true)
 |-- kindle_asin: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- link: string (nullable = true)
 |-- num_pages: integer (nullable = true)
 |-- popular_shelves: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- pu

                                                                                

2360655

In [8]:
# Path to the GCS bucket
folder_name = "interactions"
gcs_path = f"gs://{bucket_name}/cleaned/{folder_name}/"

# Read the Parquet files
interactions_df = spark.read.parquet(gcs_path).sample(withReplacement=False, fraction=0.4, seed=SEED)

# Show the DataFrame schema and data
interactions_df.printSchema()
interactions_df.count()

root
 |-- user_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- is_read: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- is_reviewed: string (nullable = true)



                                                                                

91457444

In [9]:
# Path to the GCS bucket
folder_name = "reviews"
gcs_path = f"gs://{bucket_name}/cleaned/{folder_name}/"

# Read the Parquet files
reviews_df = spark.read.parquet(gcs_path)#.sample(withReplacement=False, fraction=FRACTION, seed=SEED)

# Show the DataFrame schema and data
reviews_df.printSchema()
reviews_df.count()

root
 |-- book_id: integer (nullable = true)
 |-- n_comments: integer (nullable = true)
 |-- n_votes: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_text: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date_added: timestamp (nullable = true)
 |-- date_updated: timestamp (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- read_at: timestamp (nullable = true)



                                                                                

15588978

# Function to help with dimension creation

In [10]:
def create_ids(df,id_column_name=None):
    #Transforms PySpark dataframe
    if isinstance(df, PySparkDataFrame):
        print("Transforming PySpark DataFrame...")
        
        if id_column_name is None:
            id_column_name = "id"
        
        # Extract the original schema
        original_schema = df.schema

        # Add an ID column using zipWithIndex
        df_with_index = (
            df.rdd
            .zipWithIndex()  # Add an index to each row
            .map(lambda x: Row(**dict(x[0].asDict(), id=x[1])))  # Add 'id' to each row
        )

        # Define the new schema with the ID column added
        new_schema = original_schema.add(id_column_name, "long")

        # Create a new DataFrame with the updated schema
        df_with_index = spark.createDataFrame(df_with_index, schema=new_schema)
        
        print("Completed transforming PySpark DataFrame.")
        return df_with_index
    
    #No valid dataframe found
    else:
        print("ERROR: INVALID DATAFRAME - NO PROCEDURE APPLIED.")
        print("Did you try inputting a PySpark Dataframe?")

In [11]:
# Function to check how unique a column is. This will be used to verify all ID's in an ID column are unique.
def check_uniqueness(df, col_name):
    if isinstance(df, PySparkDataFrame):
        total_count = df.count()
        distinct_count = df.select(col_name).distinct().count()
    elif isinstance(df, pd.DataFrame):
        total_count = len(df)
        distinct_count = df[col_name].nunique()

    if total_count == distinct_count:
        print(f"The column '{col_name}' contains all unique values.")
    else:
        print(f"The column '{col_name}' contains duplicates. Total rows: {total_count}, Distinct rows: {distinct_count}")

In [12]:
# initialize array. Append all dims and tables here. At the end, we loop through this list to save all dataframes.
tables = []

# DIMENSION CREATIONS

# Dim Review

In [13]:
#Create Dim
dim_review = reviews_df.drop("rating")

#Remove duplicates
dim_review = dim_review.distinct()

#Add unique ID
dim_review = create_ids(dim_review,"dim_review_id")

#Add to tables
tables.append([dim_review, "dim_review"])

#Show
dim_review.printSchema()

Transforming PySpark DataFrame...




Completed transforming PySpark DataFrame.
root
 |-- book_id: integer (nullable = true)
 |-- n_comments: integer (nullable = true)
 |-- n_votes: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_text: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date_added: timestamp (nullable = true)
 |-- date_updated: timestamp (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- read_at: timestamp (nullable = true)
 |-- dim_review_id: long (nullable = true)




                                                                                

# Dim Author

In [14]:
#Create Dim
dim_author = authors_df

#Remove duplicates
dim_author = dim_author.distinct()

#Add unique ID
dim_author = create_ids(dim_author,"dim_author_id")

#Add to tables
tables.append([dim_author, "dim_author"])

#Show
dim_author.printSchema()

Transforming PySpark DataFrame...




Completed transforming PySpark DataFrame.
root
 |-- author_id: integer (nullable = true)
 |-- average_rating: float (nullable = true)
 |-- name: string (nullable = true)
 |-- ratings_count: integer (nullable = true)
 |-- text_reviews_count: integer (nullable = true)
 |-- dim_author_id: long (nullable = true)




                                                                                

# Dim Genre

In [15]:
#Create Dim
dim_genre = genre_df

#Remove duplicates
dim_genre = dim_genre.distinct()

#Add unique ID
dim_genre = create_ids(dim_genre,"dim_genre_id")

#Add to tables
tables.append([dim_genre, "dim_genre"])

#Show
dim_genre.printSchema()

Transforming PySpark DataFrame...




Completed transforming PySpark DataFrame.
root
 |-- book_id: string (nullable = true)
 |-- children: long (nullable = true)
 |-- comics_graphic: long (nullable = true)
 |-- fantasy_paranormal: long (nullable = true)
 |-- fiction: long (nullable = true)
 |-- history: long (nullable = true)
 |-- mystery: long (nullable = true)
 |-- non_fiction: long (nullable = true)
 |-- poetry: long (nullable = true)
 |-- romance: long (nullable = true)
 |-- young_adult: long (nullable = true)
 |-- dim_genre_id: long (nullable = true)




                                                                                

# Dim Book

In [16]:
#Create Dim
dim_book = books_df

#Drop unnecessary columns
dim_book = dim_book.drop("similar_books","popular_shelves","series","work_id")

#Remove duplicates
dim_book = dim_book.distinct()

# Flatten the array for "authors".
# Although this may exclude any author past the 3rd one, we are doing simple data analysis.
# We could also extract flatten for the roles too but I believe this would be too much for only specific analysis.
dim_book = dim_book.withColumn("author_1_id", element_at(expr("transform(authors, x -> x.author_id)"), 1)) \
       .withColumn("author_2_id", element_at(expr("transform(authors, x -> x.author_id)"), 2)) \
       .withColumn("author_3_id", element_at(expr("transform(authors, x -> x.author_id)"), 3))
dim_book = dim_book.drop("authors")

# Change author_id's to int
dim_book = dim_book.withColumn("author_1_id", col("author_1_id").cast("int"))
dim_book = dim_book.withColumn("author_2_id", col("author_2_id").cast("int"))
dim_book = dim_book.withColumn("author_3_id", col("author_3_id").cast("int"))

# Transfer new author ids to facts table and remove them here. 
# To start, create a new df with only book_id and the author ids. We will join this on facts
author_ids_df = dim_book.select("book_id","author_1_id","author_2_id","author_3_id")

# Drop them in book dimension as they are no longer needed here. 
dim_book = dim_book.drop("author_1_id","author_2_id","author_3_id")

#Add unique ID
dim_book = create_ids(dim_book,"dim_book_id")

#Add to tables
tables.append([dim_book, "dim_book"])

#Show
dim_book.printSchema()

Transforming PySpark DataFrame...




Completed transforming PySpark DataFrame.
root
 |-- asin: string (nullable = true)
 |-- average_rating: float (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- country_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- edition_information: string (nullable = true)
 |-- format: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- is_ebook: integer (nullable = true)
 |-- isbn: integer (nullable = true)
 |-- isbn13: long (nullable = true)
 |-- kindle_asin: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- link: string (nullable = true)
 |-- num_pages: integer (nullable = true)
 |-- publication_day: integer (nullable = true)
 |-- publication_month: integer (nullable = true)
 |-- publication_year: integer (nullable = true)
 |-- publisher: string (nullable = true)
 |-- ratings_count: integer (nullable = true)
 |-- text_reviews_count: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- title_wi



                                                                                

# Facts

In [None]:
# Create facts
facts = interactions_df

# Add foreign IDs, then drop foreign columns excluding its id.
facts = facts.join(
    reviews_df,
    on=["user_id", "book_id"], 
    how="left" 
)

# Join the new df we made with the author id's. 
facts = facts.join(
    author_ids_df,
    on="book_id", 
    how="left" 
)
# Delete it as we dont need it anymore. 
del author_ids_df

facts = facts.drop("date_added", "date_updated", "n_comments",
                  "n_votes", "read_at", "review_text",
                  "review_id", "review_text", "started_at")


#Remove duplicates
facts = facts.distinct()

# Add unique IDs
facts = facts.withColumn("fact_id", monotonically_increasing_id()+1)

# Change book_id and rating to int
facts = facts.withColumn("book_id", col("book_id").cast("int"))
facts = facts.withColumn("rating", col("rating").cast("int"))

# Add to tables
tables.append([facts, "facts"])

# Show
facts.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- is_read: string (nullable = true)
 |-- is_reviewed: string (nullable = true)
 |-- fact_id: long (nullable = false)



# Functions for saving

In [18]:
def check_folder_exists(folder_name):
    
    # Grab the bucket
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    
    # List objects in the bucket with the folder prefix
    blobs = bucket.list_blobs(prefix=folder_name + '/')
    
    # Check if any blob exists under this folder
    for blob in blobs:
        if blob.name.startswith(folder_name + '/'):
            print(f"Folder '{folder_name}' exists in bucket '{bucket_name}'.")
            return True
    
    print(f"Folder '{folder_name}' does not exist in bucket '{bucket_name}'.")
    return False

In [19]:
def create_folder(path, name):
    
    if check_folder_exists(name): 
        print(path+name+'/'+' already exists. Abandoning folder creation')
        return
    
    # Initialize the GCS client
    client = storage.Client()

    # Get the bucket
    bucket = client.get_bucket(bucket_name)

    # Define the folder path
    folder_path = path+name+'/'

    # Create a dummy file in the folder (this will simulate the folder in GCS)
    # The 'blob' will not be a real file, but will create the "folder"
    blob = bucket.blob(folder_path + 'placeholder.txt')  # You can name it anything, like 'placeholder.txt'

    # Upload an empty string or any content to simulate the folder creation
    blob.upload_from_string('')

    print(f"Folder '{folder_path}' created successfully in the bucket '{bucket_name}'")

In [20]:
def delete_files_named(name_of_file_to_delete):
    # Initialize a client
    client = storage.Client()

    # Specify the bucket name
    _bucket = client.bucket(bucket_name)

    # List and delete all files named "placeholder.txt"
    blobs = _bucket.list_blobs()

    count_deleted = 0
    for blob in blobs:
        if blob.name.endswith(name_of_file_to_delete):
            blob.delete()
            print(f"Deleted: {blob.name}")
            count_deleted += 1
            
    
    print(f"Deletion of all '{name_of_file_to_delete}' files is complete. Total deleted: "+str(count_deleted))

In [21]:
def save_df(df,name,save_as="parquet",gcs_path=None):

    #we must define the bucket variable at the start of the script, before calling this function.
    
    # Define the GCS path for saving the file
    if gcs_path == None:
        gcs_path = 'gs://'+bucket_name+'/dim_ready/'+name+'/'+name+'.'+save_as
    
    
    
    print(f"Creating '{name}' folder...")
    create_folder('dim_ready/',name)
    delete_files_named("placeholder.txt")
    
    print("Beginning saving process...")
    flag = 0 # this var will let us know is there as been an error
    
    #Saves Pandas dataframe
    if isinstance(df, pd.DataFrame):
        # Initialize GCS file system
        fs = gcsfs.GCSFileSystem(project=PROJECT)

        # Save DataFrame to the "cleaned" folder in the GCS bucket
        with fs.open(gcs_path, 'w') as f:
            if save_as == "csv":
                df.to_csv(f, index=False)
            else:
                print("INVALD FILE FORMAT. TRY [csv]")
                flag+=1
        
        if flag == 0:
            print("Successfully saved "+name+ "!")
    
    #Saves PySpark dataframe
    elif isinstance(df, PySparkDataFrame):
        if save_as == "parquet":
            df.write.parquet(gcs_path,mode="overwrite")
        elif save_as == "csv":
            df.write.csv(gcs_path,mode="overwrite")
        elif save_as == "avro":
            df.write.format("avro").save(gcs_path,mode="overwrite")
        else:
            print("INVALD FILE FORMAT. TRY [csv] or [parquet] or [avro]")
            flag+=1
        
        if flag == 0:
            print("Successfully saved "+name+ "!")
        
    #No valid dataframe found
    else:
        print("ERROR: INVALID DATAFRAME - NO PROCEDURE APPLIED")

In [22]:
#this function accepts a list of tables, that each contain 2 elements, the table itself, and the name of the table.
#Example, table[0] = [df_1,"df_1_name"], table[1][0] = df_2, table[3][1] = "df_4_name"
def save_list_of_df(list_of_tables,pandas_save="csv",pyspark_save="parquet"):
    for table in list_of_tables:
        if isinstance(table[0], pd.DataFrame):
            save_df(table[0],table[1],save_as=pandas_save)
        elif isinstance(table[0], PySparkDataFrame):
            save_df(table[0],table[1],save_as=pyspark_save)
        else:
            print("ERROR: INVALID LIST TO SAVE. TRY PASSING FORMAT - list = ([table_name, df])")

In [23]:
#We will simply save all Pandas dataframes as CSV's and all PySpark ones as parquet

ENABLE_SAVE = True #boolean to quickly turn on/off saving. Good to turn off saving when debugging.

if ENABLE_SAVE:
    save_list_of_df(tables)

Creating 'dim_review' folder...
Folder 'dim_review' does not exist in bucket 'goodreads_bucket'.
Folder 'dim_ready/dim_review/' created successfully in the bucket 'goodreads_bucket'
Deleted: dim_ready/dim_review/placeholder.txt
Deletion of all 'placeholder.txt' files is complete. Total deleted: 1
Beginning saving process...


                                                                                

Successfully saved dim_review!
Creating 'dim_author' folder...
Folder 'dim_author' does not exist in bucket 'goodreads_bucket'.
Folder 'dim_ready/dim_author/' created successfully in the bucket 'goodreads_bucket'
Deleted: dim_ready/dim_author/placeholder.txt
Deletion of all 'placeholder.txt' files is complete. Total deleted: 1
Beginning saving process...


                                                                                

Successfully saved dim_author!
Creating 'dim_genre' folder...
Folder 'dim_genre' does not exist in bucket 'goodreads_bucket'.
Folder 'dim_ready/dim_genre/' created successfully in the bucket 'goodreads_bucket'
Deleted: dim_ready/dim_genre/placeholder.txt
Deletion of all 'placeholder.txt' files is complete. Total deleted: 1
Beginning saving process...


                                                                                

Successfully saved dim_genre!
Creating 'dim_book' folder...
Folder 'dim_book' does not exist in bucket 'goodreads_bucket'.
Folder 'dim_ready/dim_book/' created successfully in the bucket 'goodreads_bucket'
Deleted: dim_ready/dim_book/placeholder.txt
Deletion of all 'placeholder.txt' files is complete. Total deleted: 1
Beginning saving process...


24/12/10 10:29:32 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Successfully saved dim_book!
Creating 'facts' folder...
Folder 'facts' does not exist in bucket 'goodreads_bucket'.
Folder 'dim_ready/facts/' created successfully in the bucket 'goodreads_bucket'
Deleted: dim_ready/facts/placeholder.txt
Deletion of all 'placeholder.txt' files is complete. Total deleted: 1
Beginning saving process...


                                                                                

Successfully saved facts!
