# Useful links
## Data source
*Bristol City Council Land and Building Assets*

https://www.data.gov.uk/dataset/a98345ad-7f4c-4f6e-882b-e631dc1cc046/bristol-city-council-land-and-building-assets

*Bristol dataset*

https://www.bristol.gov.uk/files/documents/7241-land-property-2023/file

## Useful programming references
*Various Spark examples*

https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/

https://sparkbyexamples.com/pyspark/pyspark-sql-date-and-timestamp-functions/

https://spark.apache.org/examples.html

https://github.com/apache/spark/tree/master/examples/src/main/python

*O'Reilly's Learning Spark reference*

https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch04.html

*Recipe Objective - Explain StructType and StructField in PySpark*

https://www.projectpro.io/recipes/explain-structtype-and-structfield-pyspark-databricks#:~:text=The%20StructField%20in%20PySpark%20represents,the%20name%20of%20the%20StructField

*Spark StructType*

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.StructType.html#:~:text=.StructType%5Bsource%5D-,Construct%20a%20StructType%20by%20adding%20new%20elements%20to%20it%2C%20to,)%2C%20metadata(optional)

In [1]:
%livy2.pyspark
# Load CSV - can be from a standalone local file or an online file source; here it is a local file in blob storage
import csv
import io
from io import StringIO


In [2]:
%livy2.pyspark

# Note: the following works, loading the CSV file into the df DataFrame with an inferred schema. Instead, let us demonstrate how to apply a defined schema when importing CSV data, to retain more control.
#df = spark.read.csv("/HdiSamples/BristolCityCouncilLandAndBuildingAssets-2024.csv", header=True, inferSchema=True)


In [3]:
%livy2.pyspark

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType 
from pyspark.sql.types import DoubleType, BooleanType, DateType, LongType, FloatType

# Using custom schema
schema =  StructType() \
      .add("Organisation Name", StringType(), True) \
      .add("Organisation Code", StringType(), True) \
      .add("Effective Date", DateType(), True) \
      .add("UPRN", StringType(), True) \
      .add("Property ID", IntegerType(), True) \
      .add("Property Type", StringType(), True) \
      .add("Property Name/Address (Where no UPRN)", StringType(), True) \
      .add("Property Address Detail", StringType(), True) \
      .add("Secondary Address Detail", StringType(), True) \
      .add("Street Number", StringType(), True) \
      .add("Street", StringType(), True) \
      .add("Town / Post Town", StringType(), True) \
      .add("Post Code", StringType(), True) \
      .add("Ward", StringType(), True) \
      .add("Geo X (Easting)", LongType(), True) \
      .add("Geo Y (Northing)", LongType(), True) \
      .add("Tenure Type", StringType(), True) \
      .add("Ground Lease In", StringType(), True) \
      .add("Ground Lease Out", StringType(), True) \
      .add("Lease In to Council", StringType(), True) \
      .add("Lease Out", StringType(), True) \
      .add("Licence In to Council", StringType(), True) \
      .add("Licence Out", StringType(), True) \
      .add("Sub-lease In to Council", StringType(), True) \
      .add("Sub-lease Out", StringType(), True) \
      .add("Vacant", StringType(), True) \
      .add("Asset Type", StringType(), True) \
      .add("Building Size - GIA (M2)", FloatType(), True) \
      .add("Site Area (Hectares)", FloatType(), True) \
      .add("Occupied by Council / Direct Service Property", StringType(), True) \
      .add("Purpose / Asset Category", StringType(), True)
      
df = spark.read.format("csv") \
      .options(header="True", inferSchema="False", delimiter=",", dateFormat="d/M/yyyy") \
      .schema(schema) \
      .load("/HdiSamples/BristolCityCouncilLandAndBuildingAssets-2024.csv")
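
With `header` enabled and an explicit schema, Spark's CSV reader by default applies the schema's field names by position and does not check them against the file's header, so a reordered or missing column can go unnoticed. A minimal sketch of a header check using the `csv` and `StringIO` imports from the first cell; the `sample` text, the shortened `expected_columns` list and the helper `header_mismatches` are hypothetical stand-ins, not part of the notebook:

```python
import csv
from io import StringIO

# Hypothetical sample standing in for the first lines of the real CSV file
sample = (
    "Organisation Name,Organisation Code,Effective Date,UPRN\n"
    "Example Org,X000,1/4/2024,0000\n"
)

# Hypothetical, shortened list; in the notebook this could be schema.fieldNames()
expected_columns = ["Organisation Name", "Organisation Code", "Effective Date", "UPRN"]

def header_mismatches(csv_text, expected):
    """Return (position, found, expected) tuples where the header differs."""
    header = next(csv.reader(StringIO(csv_text)))
    mismatches = []
    for i in range(max(len(header), len(expected))):
        found = header[i].strip() if i < len(header) else None
        want = expected[i] if i < len(expected) else None
        if found != want:
            mismatches.append((i, found, want))
    return mismatches

# An empty list means the header order matches the expected columns
print(header_mismatches(sample, expected_columns))
```

An empty result suggests the positional mapping is safe; any tuple returned points at the column position to inspect before trusting the typed DataFrame.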


In [4]:
%livy2.pyspark
from IPython.display import display
display(df)

In [5]:
%livy2.pyspark
# Show schema (the temporary view ss01shh_Bristol is only registered in a later cell, so inspect the DataFrame directly)
df.printSchema()

In [6]:
%livy2.pyspark
# Show dataframe
df.show()

In [7]:
%livy2.pyspark
# The data is now ready to use. First, let's take df and create a temporary view (a virtual table) to query in Spark with Spark SQL

# Register the DataFrame as a temporary view to allow Spark SQL queries
#df.registerTempTable('BristolCouncilAssets') # deprecated form of the command, now replaced with ...
df.createOrReplaceTempView("ss01shh_Bristol")

# Problem 1
How many ‘Properties’ are Bristol City Council (BCC) responsible for (as owner, user or manager)?

In [9]:
%livy2.pyspark
# Use PySpark commands to query the DataFrame
df.count()

In [10]:
%livy2.pyspark
# Alternatively, use Spark SQL to query the temporary view we created
spark.sql("SELECT count(*) from ss01shh_Bristol").show()

# Problem 2

How many unique ‘Property Type’s are Bristol City Council (BCC) responsible for (as owner, user or manager)?

In [12]:
%livy2.pyspark
spark.sql("SELECT DISTINCT `Property Type` FROM ss01shh_Bristol ORDER BY `Property Type`").show()

The output above lists each separate property type. If we just want a total count, we can use the query below.


In [14]:
%livy2.pyspark
spark.sql("SELECT Count(*) AS Count FROM (SELECT DISTINCT `Property Type` FROM ss01shh_Bristol) types").show()

# Problem 3

How many properties are there in each of the ‘Property Types’?

In [16]:
%livy2.pyspark
spark.sql("SELECT `Property Type`, Count(*) AS Count FROM ss01shh_Bristol GROUP BY `Property Type` ORDER BY `Property Type`").show()

In [17]:
%livy2.sql
-- The same data in an alternative view using the livy2.sql magic, which lets us use the built-in graphics (as this is SQL, comments start with '--')
SELECT `Property Type`, Count(*) AS Count FROM ss01shh_Bristol GROUP BY `Property Type` ORDER BY Count DESC LIMIT 15

# Problem 4

Show a histogram classifying the total area (in Ha) of each of these 'property types'


In [19]:
%livy2.pyspark
# .show() prints the result and returns None, so its result is not assigned to a variable here
spark.sql("SELECT `Property Type`, Sum(`Site Area (Hectares)`) AS Total_Area FROM ss01shh_Bristol GROUP BY `Property Type` ORDER BY Sum(`Site Area (Hectares)`) DESC LIMIT 15").show()

In [20]:
%livy2.pyspark
# Create bar chart for total area by property type
import matplotlib
matplotlib.use('Agg')  # Use non-interactive backend for Zeppelin
import matplotlib.pyplot as plt
import pandas as pd
import io
import base64

# Initialize Zeppelin display object
try:
    from zeppelin import Zeppelin
    z = Zeppelin()
    print("Zeppelin object initialized successfully")
except ImportError:
    try:
        # Alternative import method
        import zeppelin
        z = zeppelin
        print("Zeppelin imported as module")
    except ImportError:
        print("Zeppelin module not available - will use alternative methods")

#plt.close('all') 

# For the data source, run the same query as in the previous cell, keeping the result as a DataFrame rather than calling .show()
area_data = spark.sql("SELECT `Property Type`, Sum(`Site Area (Hectares)`) AS Total_Area FROM ss01shh_Bristol GROUP BY `Property Type` ORDER BY Sum(`Site Area (Hectares)`) DESC LIMIT 15")

# Convert to Pandas DataFrame for plotting
area_df = area_data.toPandas()

# Create the bar chart
plt.figure(figsize=(14, 8))
bars = plt.bar(range(len(area_df)), area_df['Total_Area'], color='steelblue', alpha=0.7)

# Customise the chart
plt.title('Total Site Area by Property Type (Top 15)', fontsize=16, fontweight='bold')
plt.xlabel('Property Type', fontsize=12)
plt.ylabel('Total Area (Hectares)', fontsize=12)
plt.xticks(range(len(area_df)), area_df['Property Type'], rotation=45, ha='right')

# Add value labels on top of bars
for i, bar in enumerate(bars):
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2., height + height*0.01,
             f'{height:.1f}', ha='center', va='bottom', fontsize=9)

# Adjust layout to prevent label cutoff
plt.tight_layout()
plt.grid(axis='y', alpha=0.3)

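# Plot output: a bare show() is undefined, so use the Zeppelin display object if one was initialised above
if 'z' in globals() and hasattr(z, 'show'):
    z.show(plt)
else:
    # Fallback: embed the figure as a base64 PNG (assumes the interpreter renders Zeppelin's %html display output)
    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    print("%html <img src='data:image/png;base64,{}'/>".format(base64.b64encode(buf.getvalue()).decode('ascii')))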


# Display the data table as well
#print("\nData used for the chart:")
#area_df


In [21]:
%livy2.sql
-- ALTERNATIVE METHOD: Use Zeppelin's built-in SQL visualization
-- This is the most reliable method for charts in Zeppelin
-- The chart will appear automatically when you run this cell
SELECT 
    `Property Type` as property_type,
    ROUND(SUM(`Site Area (Hectares)`), 2) as total_area_hectares
FROM ss01shh_Bristol 
GROUP BY `Property Type` 
ORDER BY total_area_hectares DESC
LIMIT 15


# Problem 5

For each of these grouped properties, what are the numbers of properties in each ‘tenure type’ recorded?

In [23]:
%livy2.pyspark
spark.sql("SELECT `Property Type`, `Tenure Type`, COUNT(*) AS Count FROM ss01shh_Bristol GROUP BY `Tenure Type`, `Property Type` ORDER BY `Property Type` DESC").show()

