## reading structured data for each customer
> ## including indicator of historical churn ('vol_disco_ind')

In [0]:
dataPath = f"/Volumes/main/ogden_dbsql_hands_on_workshop/csv_in/churn_modeling_data_w_o_text.csv"
spark_df = spark.read.csv(dataPath, header=True, inferSchema=True)  # letting Spark infer the schema
display(spark_df)

Databricks data profile. Run in Databricks to view.

In [0]:
# asked the Databricks Assistant:
"""
write a sql query that uses the last 2 characters from the DMA field to extract the state value, and then a case when statement that maps the various states to 6 possible geographic regions such as "southwest". use at least 40 of the 50 states in the case when statement.
"""
# results may vary:
"""
SELECT 
    *,
    SUBSTRING(DMA, -2) AS state,
    CASE 
        WHEN SUBSTRING(DMA, -2) IN ('TX', 'NM', 'AZ', 'OK') THEN 'southwest'
        WHEN SUBSTRING(DMA, -2) IN ('CA', 'NV', 'UT', 'CO') THEN 'west'
        WHEN SUBSTRING(DMA, -2) IN ('NY', 'NJ', 'PA', 'CT', 'MA', 'VT', 'NH', 'ME', 'RI') THEN 'northeast'
        WHEN SUBSTRING(DMA, -2) IN ('FL', 'GA', 'AL', 'SC', 'NC', 'VA', 'WV', 'KY', 'TN', 'MS', 'AR', 'LA') THEN 'southeast'
        WHEN SUBSTRING(DMA, -2) IN ('IL', 'IN', 'OH', 'MI', 'WI', 'MN', 'IA', 'MO', 'ND', 'SD', 'NE', 'KS') THEN 'midwest'
        WHEN SUBSTRING(DMA, -2) IN ('WA', 'OR', 'ID', 'MT', 'WY', 'AK', 'HI') THEN 'northwest'
        ELSE 'other'
    END AS region
FROM 
    your_table_name
"""

In [0]:
# asked the Databricks Assistant:
"""
rewrite the above to use pyspark code
"""
# results may vary:
"""
%python
from pyspark.sql.functions import substring, when, col

# Extract the state value from the last 2 characters of the DMA field
spark_df = spark_df.withColumn("state", substring(col("DMA"), -2, 2))

# Map the states to geographic regions
spark_df = spark_df.withColumn(
    "region",
    when(col("state").isin("TX", "NM", "AZ", "OK"), "southwest")
    .when(col("state").isin("CA", "NV", "UT", "CO"), "west")
    .when(col("state").isin("NY", "NJ", "PA", "CT", "MA", "VT", "NH", "ME", "RI"), "northeast")
    .when(col("state").isin("FL", "GA", "AL", "SC", "NC", "VA", "WV", "KY", "TN", "MS", "AR", "LA"), "southeast")
    .when(col("state").isin("IL", "IN", "OH", "MI", "WI", "MN", "IA", "MO", "ND", "SD", "NE", "KS"), "midwest")
    .when(col("state").isin("WA", "OR", "ID", "MT", "WY", "AK", "HI"), "northwest")
    .otherwise("other")
)

display(spark_df)
"""

In [0]:
from pyspark.sql.functions import substring, when, col

# Extract the state value from the last 2 characters of the DMA field
spark_df = spark_df.withColumn("state", substring(col("DMA"), -2, 2))

# Map the states to geographic regions
spark_df = spark_df.withColumn(
    "region",
    when(col("state").isin("TX", "NM", "AZ", "OK"), "southwest")
    .when(col("state").isin("CA", "NV", "UT", "CO"), "west")
    .when(col("state").isin("NY", "NJ", "PA", "CT", "MA", "VT", "NH", "ME", "RI"), "northeast")
    .when(col("state").isin("FL", "GA", "AL", "SC", "NC", "VA", "WV", "KY", "TN", "MS", "AR", "LA"), "southeast")
    .when(col("state").isin("IL", "IN", "OH", "MI", "WI", "MN", "IA", "MO", "ND", "SD", "NE", "KS"), "midwest")
    .when(col("state").isin("WA", "OR", "ID", "MT", "WY", "AK", "HI"), "northwest")
    .otherwise("other")
)

display(spark_df["cust_num", "DMA", "region"])

In [0]:
# API to present Spark dataframes as a queriable views, so we can use spark.sql() or run %sql cells:

spark_df.createOrReplaceTempView("spark_df_view")

In [0]:
%sql
SELECT region, 
       COUNT(*)                                      AS customer_cnt,
       SUM(vol_disco_ind)                            AS churn_cnt,
       format_number(churn_cnt/customer_cnt, '0.0%') AS churn_rate
FROM spark_df_view
GROUP BY region
ORDER BY customer_cnt DESC
;

## reading call center rep notes for each customer

In [0]:
dataPath = f"/Volumes/main/ogden_dbsql_hands_on_workshop/csv_in/text_encrypted.csv"
spark_txt_df = spark.read.csv(dataPath, header=True, inferSchema=True)  # letting Spark infer the schema
display(spark_txt_df)

In [0]:
# queriable view for the UNstructured data

spark_txt_df.createOrReplaceTempView("spark_txt_df_view")

## saving joined data as output table

In [0]:
%sql
CREATE OR REPLACE TABLE main.ogden_dbsql_hands_on_workshop.churn_silver
AS SELECT a.*,
          b.text_string
FROM spark_df_view a
LEFT JOIN spark_txt_df_view b
ON a.cust_num = b.cust_num
;

SELECT * FROM main.ogden_dbsql_hands_on_workshop.churn_silver;