### Intersect with County File
1. Read files
2. Intersect using geopandas
3. Re-convert to Pyspark Dataframe
4. Filter the rsult to only the churned customer

#### Read location file using geopandas

In [0]:
# import geopandas (installed in cluster) and current_timestamp
import geopandas as gpd
from pyspark.sql.functions import current_timestamp



In [0]:
# Read silver table of location as Pandas dataframe
location_pdf = spark.read.table('customerchurn.silver.location').toPandas()

In [0]:
# Convert location dataframe into geopandas dataframe
location_gdf = gpd.GeoDataFrame(
    location_pdf,
    crs="EPSG:4326",
    geometry=gpd.points_from_xy(location_pdf.longitude, location_pdf.latitude)
)

#### Read county shapefile using geopandas

In [0]:
# Read shapefile from Unity Catalog Volume and convert to geopandas dataframe
county_shp_path = '/Volumes/ext_catalog/bronze/ca_county/CA_Counties.shp'
county_gdf = gpd.read_file(county_shp_path) \
                .to_crs("EPSG:4326")

In [0]:
# county_gdf.head()

#### Perform intersection to both data

In [0]:
# Perform spatial join to check if location points intersect with county polygons
result_gdf = gpd.sjoin(location_gdf, county_gdf)

In [0]:
# result_gdf.plot(column="NAME", markersize=1)

In [0]:
# Select columns to be used, rename the columns for consistency, and reset index
result_final_gdf = result_gdf[["customer_id", "zip_code", "latitude", "longitude", "NAME"]] \
                    .rename(columns ={'NAME':'county_name'}) \
                    .reset_index(drop=True)

#### Re-convert to Pyspark Dataframe

In [0]:
# Create spark dataframe from the intersection result
location_county_df = spark.createDataFrame(result_final_gdf) \
                            .drop('index') \
                            .withColumn('processing_date', current_timestamp())

In [0]:
# location_county_df.show(4)

#### Filter location-county file for only churned customer with status file

In [0]:
# Use spark SQL to read status_churned table
status_churned_df = spark.sql("""
                              SELECT *
                              FROM customerchurn.gold.status_churned
                              """)

In [0]:
# Perform semi-join with location_county_df and status_churned_df
location_churned_df = location_county_df \
                        .join(status_churned_df, location_county_df.customer_id == status_churned_df.customer_id, "semi") \
                        .withColumn('processing_date', current_timestamp())

#### Write the gold file

In [0]:
location_churned_df.write.format("delta").mode("overwrite").saveAsTable("customerchurn.gold.location_churned")