# Import libraries

In [None]:
from pyspark.sql.functions import expr
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import upper, col, when

# Read customer profile data using the Spark DataFrame reader

In [None]:
# Load the bronze-layer customer profiles and preview a few key columns.
display_cols = ["customerid", "gender", "monthlycharges", "totalcharges"]
customers = spark.read.table("bronze.crm.customers")
customers_preview = customers.select(*display_cols)
display(customers_preview)

# Read customer reviews data

In [None]:
# Load the bronze-layer customer reviews table and render it in the notebook.
reviews_table = "bronze.crm.customer_reviews"
customer_reviews = spark.read.table(reviews_table)
display(customer_reviews)

# Join the customer profiles and reviews DataFrames

In [None]:
# Inner-join profiles with reviews on the shared customer key; joining by
# column name keeps a single `customerid` column in the result.
customer_df = customers.join(customer_reviews, on="customerid", how="inner")
customer_df.printSchema()

# Select the relevant attributes

In [None]:
from pyspark.sql.functions import col

# Columns carried forward from the joined DataFrame: the customer key, the
# profile/service/billing attributes, and the free-text review.
selected_columns = [
    "customerid",
    "gender", "seniorcitizen", "partner", "dependents",
    "tenure",
    "phoneservice", "multiplelines", "internetservice",
    "onlinesecurity", "onlinebackup", "deviceprotection", "techsupport",
    "streamingtv", "streamingmovies",
    "contract", "paperlessbilling", "paymentmethod",
    "monthlycharges", "totalcharges",
    "review",
]

# Project the joined DataFrame down to exactly those columns, in list order.
customers_selected = customer_df.select(*selected_columns)

# Drop rows with missing (null) values and limit the result to 200 rows

In [None]:
from pyspark.sql import functions as F

# Drop every row of customers_selected that has a null in any column, then
# cap the result at 200 rows.
# NOTE(review): the 200-row cap presumably bounds the number of model calls
# made in the sentiment step below — confirm.
filtered_customers = customers_selected.dropna().limit(200)

# Call GenAI model for sentiment analysis of customer reviews

In [None]:
# Ask the model to rate each review's sentiment from 1 to 5 and store the raw
# text reply in `sentimentScore_str`.
#
# The instruction and the review text are joined with an explicit ": "
# separator. Without it the review runs straight into the last word of the
# instruction ("...integer only<review text>"), which blurs where the
# instruction ends and the text to be rated begins.
sentiment_sql = (
    "query_model("
    "'meta.llama-3.2-90b-vision-instruct', "
    "concat("
    "'What is the sentiment of the review text on a scale of 1 to 5, "
    "please give the output as an integer only: ', "
    "review))"
)
filtered_customers = filtered_customers.withColumn("sentimentScore_str", expr(sentiment_sql))

In [None]:
# Convert the model's text reply into an integer `sentimentScore` column and
# discard the intermediate string column.
# NOTE(review): replies that are not a plain integer presumably become null
# under Spark's default (non-ANSI) cast behaviour — confirm.
filtered_customers = (
    filtered_customers
    .withColumn("sentimentScore", col("sentimentScore_str").cast(IntegerType()))
    .drop("sentimentScore_str")
)

# Create crm schema

In [None]:
# Make sure the target schema exists before writing; IF NOT EXISTS keeps the
# statement safe to re-run.
create_schema_sql = "CREATE SCHEMA IF NOT EXISTS silver.crm"
spark.sql(create_schema_sql).show()

# Save cleansed and transformed data to silver layer

In [None]:
# Persist the cleansed, sentiment-scored customers as a Delta table,
# replacing any existing contents of the target table.
table_name = "silver.crm.customers"
silver_writer = filtered_customers.write.format("delta").mode("overwrite")
silver_writer.saveAsTable(table_name)