In [0]:
# import packages and create spark session

from pyspark.sql.functions import *
from pyspark.sql import SparkSession

# getOrCreate() returns the already-active session if one exists (as on Databricks),
# otherwise it builds a new one named "ML Model".
spark = SparkSession.builder.appName("ML Model").getOrCreate()

# Underlying SparkContext handle
sc = spark.sparkContext

In [0]:

# Create a UDF (User Defined Function) function to filter and transform 
# the data and generate model result

# User defined function
def transform_and_predict(df, ml_model, stringindexer):
    from pyspark.sql.functions import col, regexp_replace, lower, trim
    from pyspark.ml import PipelineModel

    # Preprocessing of the feature column
    df = df.withColumn("text", lower(df.text)) # Lowercase Conversion

    # Remove non letter characters and replace with nothing
    df = df.withColumn("text", regexp_replace("text", "[^a-zA-Z\\s]", "")) 

    # Remove Extra Spaces
    df = df.withColumn("text", regexp_replace("text", "\\s+", " ")) 

    # Remove Leading and Trailing Spaces
    df = df.withColumn("text", trim(df.text))

    # Load the saved machine learning pipeline model
    model = PipelineModel.load(ml_model)

    # Making the prediction
    prediction = model.transform(df)

    # predicted = prediction.select(col('text'), col('prediction'))
   
    # Decoding the indexer
    from pyspark.ml.feature import StringIndexerModel, IndexToString

    # Load in the StringIndexer that was saved
    indexer = StringIndexerModel.load(stringindexer)

    # Initialize the IndexToString converter
    i2s = IndexToString(inputCol = 'prediction', outputCol = 'decoded', labels = indexer.labels)
    # converted = i2s.transform(predicted)
    converted = i2s.transform(prediction)

    # Display the important columns
    return converted


In [0]:
# Write the per-review ratings and the aggregated ratings to their (temporary) output folders.

def saveFiles_multiple_call(ratings_path, ratings_agg_path, dfR, dfRAgg):
    """Write two DataFrames as headered CSV, each coalesced to one partition.

    Each path is written as a Spark CSV output folder. ``coalesce(1)`` makes
    the folder hold a single CSV part file, so it can later be copied out
    under a stable name. Existing output at either path is overwritten.

    Args:
        ratings_path: Output folder for ``dfR``.
        ratings_agg_path: Output folder for ``dfRAgg``.
        dfR: Per-review ratings DataFrame.
        dfRAgg: Aggregated ratings DataFrame.
    """
    # Same writer settings for both outputs; ratings first, then the aggregate.
    for frame, out_path in ((dfR, ratings_path), (dfRAgg, ratings_agg_path)):
        (frame.coalesce(1)
              .write.option("header", "true")
              .mode("overwrite")
              .csv(out_path))

    print('files successfully saved')


In [0]:
# Pull the value out of a partition-style name such as "dt=2024-05-05/".
def extract_date(name):
    """Return the text between the first and second '=' in ``name``.

    For "dt=2024-05-05/" this is "2024-05-05/"; for "dt=2024-05-05.csv" it is
    "2024-05-05.csv" (trailing "/" or extension is kept).
    Raises IndexError when ``name`` contains no '='.
    """
    pieces = name.split('=')
    return pieces[1]



# Work out which daily source folders have not been scored yet. A source folder
# "dt=<date>/" under src_path counts as done when final_path holds "dt=<date>.csv".

# Define the path to the daily files source
src_path = "/mnt/bd-project/de-yelp-daily/"

# Define the path to the final ratings data folder
final_path = "/mnt/bd-project/final-ratings/"

# Source sub-folders (one per day). isDir is a method: without the call
# parentheses the bound method is always truthy and plain files slip through.
src_list = dbutils.fs.ls(src_path)
src_name_list = [entry.name for entry in src_list if entry.isDir()]
print(src_name_list)

# Files already present in the final folder
final_list = dbutils.fs.ls(final_path)
final_name_list = [entry.name for entry in final_list if entry.isFile()]
print(final_name_list)

# "dt=2024-05-05/" -> "2024-05-05/"
dates_in_src = sorted(extract_date(folder_name) for folder_name in src_name_list)

# "dt=2024-05-05.csv" -> "2024-05-05.csv" -> "2024-05-05/" (same form as dates_in_src)
final_dates_raw = sorted(extract_date(file_name) for file_name in final_name_list)
dates_in_final = [d.split(".")[0] + "/" for d in final_dates_raw]

# Source folders with no matching final file (set lookup keeps this linear)
done_dates = set(dates_in_final)
missing_folders = [folder_name for folder_name in dates_in_src if folder_name not in done_dates]

# Report the source folders whose output file is missing from the final folder
print("Folders name in source that is missing in final:")

if not missing_folders:
    print("No missing folders to process")
else:
    for folder in missing_folders:
        print(f"dt={folder}")

['dt=2024-05-05/', 'dt=2024-05-07/', 'dt=2024-05-22/', 'dt=2024-05-25/', 'dt=2024-05-26/', 'dt=2024-05-27/', 'dt=2024-05-30/', 'dt=2024-06-02/', 'dt=2024-06-03/', 'dt=2024-06-06/', 'dt=2024-06-08/', 'dt=2024-06-09/', 'dt=2024-06-10/', 'dt=2024-06-12/', 'dt=2024-06-13/', 'dt=2024-06-14/', 'dt=2024-06-16/', 'dt=2024-06-17/', 'dt=2024-06-18/', 'dt=2024-06-19/', 'dt=2024-06-20/', 'dt=2024-06-22/', 'dt=2024-06-24/', 'dt=2024-06-26/', 'dt=2024-06-27/', 'dt=2024-07-02/', 'dt=2024-07-03/', 'dt=2024-07-04/', 'dt=2024-07-06/', 'dt=2024-07-08/', 'dt=2024-07-09/', 'dt=2024-07-10/', 'dt=2024-07-12/']
['dt=2024-05-05.csv', 'dt=2024-05-07.csv', 'dt=2024-05-22.csv', 'dt=2024-05-25.csv', 'dt=2024-05-26.csv', 'dt=2024-05-27.csv', 'dt=2024-05-30.csv', 'dt=2024-06-02.csv', 'dt=2024-06-03.csv', 'dt=2024-06-06.csv', 'dt=2024-06-08.csv', 'dt=2024-06-09.csv', 'dt=2024-06-10.csv', 'dt=2024-06-12.csv', 'dt=2024-06-13.csv', 'dt=2024-06-14.csv', 'dt=2024-06-16.csv', 'dt=2024-06-17.csv', 'dt=2024-06-18.csv', 'dt=2

In [0]:
# Copy the single CSV part file from a temporary Spark output folder to its
# permanent location under a readable name.
def CopyCsv(source_dir, target_dir, item):
    """Copy the only ``.csv`` file in ``source_dir`` to ``<target_dir>/<item>.csv``.

    Args:
        source_dir: Folder expected to contain exactly one ``.csv`` file; other
            files in it are ignored.
        target_dir: Destination folder; a trailing "/" is optional.
        item: Base name of the destination file, without the extension.

    Returns:
        True if the file was copied. False if ``source_dir`` did not contain
        exactly one CSV file; nothing is copied in that case.
    """
    csv_files = [entry for entry in dbutils.fs.ls(source_dir) if entry.name.endswith('.csv')]

    if len(csv_files) != 1:
        # Report the real count; zero files is just as much an error as several.
        print(f"Error: Expected 1 CSV file, found {len(csv_files)}.")
        return False

    # Avoid a double slash when target_dir already ends with "/".
    target_file_path = target_dir.rstrip("/") + "/" + item + ".csv"
    dbutils.fs.cp(csv_files[0].path, target_file_path)
    print("copy operation successful")
    return True


In [0]:
# Empty a folder while keeping the folder itself; used to clear the temporary
# ratings and aggregated-ratings folders.

def RemoveFolderContents(path):
    """Delete every entry listed directly under ``path``, recursing into sub-folders."""
    for entry in dbutils.fs.ls(path):
        dbutils.fs.rm(entry.path, True)   # True -> recursive delete

In [0]:
# Score every daily folder that has no ratings file in the final folder yet.
# For each folder:
#   1. predict star ratings with the saved model;
#   2. write the per-review ratings and the per-star counts to temp folders;
#   3. copy the single CSV from each temp folder to its permanent location;
#   4. clear the temp folder.

# Saved ML pipeline model and the fitted StringIndexer used to decode its labels
ml_modelX = "/mnt/bd-project/LRModel"
stringindexerY = "/mnt/bd-project/StringIndexer"

# Loop-invariant locations
temp_pathX = "/mnt/bd-project/temp-ratings-result/"
temp_ratings_agg_pathX = "/mnt/bd-project/temp_ratings_agg/"
to_final_agg = "/mnt/bd-project/bi/ml-ratings-agg/"
to_final = final_path          # "/mnt/bd-project/final-ratings/"

# Intermediate pipeline columns that are not kept in the ratings output
pipeline_cols_to_drop = ('tokens', 'filtered', 'cv', 'features', 'rawPrediction', 'probability')

# When missing_folders is empty the loop body never runs, so no extra guard is needed.
for item in missing_folders:
    print(item)
    itm = "dt=" + item.split("/")[0]    # "2024-07-10/" -> "dt=2024-07-10"
    print(itm)

    print("temp ratings path: " + temp_pathX + itm)
    print("temp ratings agg path: " + temp_ratings_agg_pathX + itm)

    pathX = f"{src_path}{itm}/"
    print(f"processing item...  : {itm}")

    postsdf = spark.read.parquet(pathX)

    # Clean the text, run the saved model and decode the predicted label
    resultdf = transform_and_predict(postsdf, ml_modelX, stringindexerY)

    # Keep the input columns plus 'prediction'; expose the decoded label as 'stars'
    ratingsdf = (resultdf
                 .drop(*pipeline_cols_to_drop)
                 .withColumnRenamed('decoded', 'stars'))

    # Number of reviews per predicted star rating, most frequent first
    ratings_qtydf = (ratingsdf
                     .groupBy(col("stars"))
                     .agg(count('stars').alias('qty'))
                     .orderBy(desc('qty')))

    saveFiles_multiple_call(temp_pathX + itm, temp_ratings_agg_pathX + itm,
                            ratingsdf, ratings_qtydf)

    # Copy the aggregate CSV to its final location, then clear its temp folder
    CopyCsv(temp_ratings_agg_pathX + itm, to_final_agg, itm)
    RemoveFolderContents(temp_ratings_agg_pathX)

    # Copy the per-review ratings CSV to its final location, then clear its temp folder
    CopyCsv(temp_pathX + itm, to_final, itm)
    RemoveFolderContents(temp_pathX)

    print("Operation successfully completed")
    


2024-07-10/
dt=2024-07-10
temp ratings path: /mnt/bd-project/temp-ratings-result/dt=2024-07-10
temp ratings agg path: /mnt/bd-project/temp_ratings_agg/dt=2024-07-10
processing item...  : dt=2024-07-10
files successfully saved
copy operation successful
copy operation successful
Operation successfully completed
2024-07-12/
dt=2024-07-12
temp ratings path: /mnt/bd-project/temp-ratings-result/dt=2024-07-12
temp ratings agg path: /mnt/bd-project/temp_ratings_agg/dt=2024-07-12
processing item...  : dt=2024-07-12
files successfully saved
copy operation successful
copy operation successful
Operation successfully completed


In [3]:
# Show a sample of ratingsdf only when this run processed new folders.
# If missing_folders is empty, nothing new was scored in this run. Any ratingsdf
# left in the kernel by an earlier run is removed rather than shown as if it were
# fresh, so the cell also runs cleanly when there is no new file.

if missing_folders:
    ratingsdf.show(5)
else:
    # pop() with a default is a no-op when the name is absent
    globals().pop('ratingsdf', None)
    print("DataFrame 'ratingsdf' is not defined.")


+--------------------+----+-----+--------------------+--------------------+------+--------------------+-------------------+----------+-----+
|         business_id|cool|funny|           review_id|                text|useful|             user_id|               date|prediction|stars|
+--------------------+----+-----+--------------------+--------------------+------+--------------------+-------------------+----------+-----+
|XMaNtA_BuemFW1fn8...|   1|    1|qUHzUc3ub6NvRkvp9...|i love this place...|     1|1eqy55tfazEXqDvGq...|2024-07-12 23:54:08|       1.0|  4.0|
|CT2QgcDyqc5EmJ4HT...|   0|    1|O7tiIHsq41NQ4gwgZ...|im all about a ba...|     2|HTgsSLhJEcY2_V8LI...|2024-07-12 23:54:07|       0.0|  5.0|
|ZTTMa3yZ5MmOTzbXI...|   0|    0|7KYj-G8iUnSl3KVCn...|pasta house is a ...|     1|kV7kmmBw_8IPLyI5M...|2024-07-12 23:54:08|       1.0|  4.0|
|Xk2MNo4Kg9tikCM9Q...|   2|    0|LAo2VECR6A2DVDW3J...|full disclosure i...|     3|kV7kmmBw_8IPLyI5M...|2024-07-12 23:54:08|       0.0|  5.0|
|n_fUROdhfmLw

In [0]:
# Show the top star-rating counts only when this run processed new folders.
# If missing_folders is empty, nothing new was aggregated in this run. Any
# ratings_qtydf left in the kernel by an earlier run is removed rather than shown
# as if it were fresh, so the cell also runs cleanly when there is no new file.

if missing_folders:
    display(ratings_qtydf.head(5))
else:
    # pop() with a default is a no-op when the name is absent
    globals().pop('ratings_qtydf', None)
    print("DataFrame 'ratings_qtydf' is not defined.")

stars,qty
5.0,11
4.0,9
3.0,2
