In [0]:
# Disable Iceberg's vectorized reader for this Spark session by setting
# "spark.sql.iceberg.vectorization.enabled" to "false".
# NOTE(review): the reason vectorization must be off is not shown in this notebook — confirm it is still required.
spark.conf.set("spark.sql.iceberg.vectorization.enabled", "false")

In [0]:
%sql

-- Switch the session to the `snowflake_catalog` catalog, then list its
-- top-level namespaces followed by the namespaces nested under SUMMIT_24.
-- NOTE(review): presumably these namespaces are the Snowflake databases/schemas
-- exposed through the Iceberg Catalog SDK — confirm against the catalog setup.
USE CATALOG snowflake_catalog;
SHOW NAMESPACES;
SHOW NAMESPACES IN SUMMIT_24;

In [0]:
%sql

-- Make SUMMIT_24.CURATED the current namespace, then list the tables it contains.
USE SUMMIT_24.CURATED;
SHOW TABLES;

In [0]:
%sql
-- Smoke test: read a 10-row sample of the Iceberg sales-transaction table via SQL.
SELECT * FROM SUMMIT_24.CURATED.ICEBERG_SALES_TXN
LIMIT 10;

In [0]:
## ########################
## Display RowCount
## Demo Iceberg table is auto refreshed from Snowflake
## #######################

# Load the Iceberg table as a Spark DataFrame; later cells reuse `df_txn`.
df_txn = spark.table("SUMMIT_24.CURATED.ICEBERG_SALES_TXN")

# Keep the raw integer count and its display label under separate names so
# that `row_count` never changes type halfway through the cell.
txn_row_count = df_txn.count()
row_count = f"{txn_row_count:,}"  # thousands separators, e.g. 1,234,567

# Render the count as a small HTML card. Every tag is closed in the reverse
# order it was opened (<center> wraps <span>; <b> wraps the 30px <span>) so
# the markup no longer relies on the browser repairing mis-nested elements.
displayHTML(f"""<table style='width:300px; height:100px; border:1px solid;
               border-color:#d3d3d3; border-radius: 15px 15px 15px 15px;
               background-color:#eee'>
<tbody>
<tr> <td> <center><span style='font-size:20px'>Current Row Count</span></center> </td></tr>
<tr>
<td style='width: 90%; color: black'>
          <center><b><span style='font-size:30px'>{row_count}</span></b></center> </td>
</tr>
</tbody>
</table>""")


In [0]:
# Print a plain-text preview of the transactions DataFrame: the first 20 rows
# with long cell values truncated (these are show()'s defaults, spelled out).
df_txn.show(n=20, truncate=True)

In [0]:
## ###########################
## Dataframe transformations
## ############################

# import pyspark functions
from pyspark.sql.functions import count, avg, col, size, length

# Aggregate sentiment by customer state and hand the (small) result to pandas.
#
# Row-level filters are applied BEFORE the aggregation and reference the column
# by name via col(), instead of filtering the aggregated frame through
# `df_txn.<column>` (a column object that belongs to the parent DataFrame).
# Because every predicate only involves the grouping key, the surviving groups
# and their aggregates are the same as when filtering afterwards.
#
# Kept states must be: not null, exactly 2 characters long, upper-case A-Z only.
# A separate "State Count > 0" filter is unnecessary: once null states are
# removed, every remaining group has at least one counted row.
state_col = col("CUSTOMER_ADDR_STATE")

df_txn_grp = (
    df_txn
    .filter(state_col.isNotNull())
    .filter(length(state_col) == 2)
    .filter(state_col.rlike('^[A-Z]+$'))
    .groupBy("CUSTOMER_ADDR_STATE")
    .agg(
        avg("PRODUCT_REVIEW_SENTIMENT_SCORE").alias("Avg Sentiment By State"),
        count("CUSTOMER_ADDR_STATE").alias("State Count"),
    )
    .withColumnRenamed("CUSTOMER_ADDR_STATE", "Customer State")
    # Convert the Spark DataFrame to a pandas DataFrame, then sort in pandas
    # so the highest average sentiment comes first.
    .toPandas()
    .sort_values(by='Avg Sentiment By State', ascending=False)
)

# display pandas df output (first rows only)
df_txn_grp.head()


In [0]:
## ###################################
## Chart Sentiment Analysis by State
## ####################################

# import charting libraries
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl

# Apply the seaborn theme and figure size BEFORE the figure is created.
# Setting them after sns.barplot() only affects figures created later, so on a
# fresh kernel the first chart would be drawn unthemed at the default size.
FIG_SIZE = (18, 28.27)
sns.set_theme(rc={'figure.figsize': FIG_SIZE})

# create a bar chart that shows average sentiment by state
fig, ax = plt.subplots(figsize=FIG_SIZE)
sns.barplot(data=df_txn_grp, x='Avg Sentiment By State', y='Customer State',
            edgecolor='black', ax=ax)

# Colour each bar by its sentiment score:
#   negative scores -> orange..red, scaled by the most negative bar
#   zero/positive   -> yellow..green, scaled by the most positive bar
neg_cmap = mpl.colors.LinearSegmentedColormap.from_list('', ['orange', 'red'])
pos_cmap = mpl.colors.LinearSegmentedColormap.from_list('', ['yellow', 'green'])

# An empty chart has no bar container; treat that as "no bars" instead of
# failing on ax.containers[0].
bars = ax.containers[0] if ax.containers else []
widths = np.array([bar.get_width() for bar in bars], dtype=float)

# Ignore NaN/inf widths (e.g. a state whose average is null) when computing the
# scaling bounds so a single missing value cannot turn every ratio into NaN.
finite_widths = widths[np.isfinite(widths)]
min_width = finite_widths.min() if finite_widths.size else 0.0
max_width = finite_widths.max() if finite_widths.size else 0.0

for bar, w in zip(bars, widths):
    if not np.isfinite(w):
        continue  # leave bars without a usable score at their default colour
    if w < 0:
        # w < 0 implies min_width <= w < 0, so the divisor is never zero.
        bar.set_facecolor(neg_cmap(w / min_width))
    else:
        # max_width is 0 only when this bar is 0 too; map that to the low end
        # of the positive colormap instead of computing 0 / 0.
        bar.set_facecolor(pos_cmap(w / max_width if max_width > 0 else 0.0))

# show the chart
plt.show()

In [0]:
# Optional: render the per-state sentiment summary (`df_txn_grp`, the pandas
# DataFrame built by the aggregation cell) with the notebook's display() helper.
display(df_txn_grp)