# Delta Sharing Release 0.6.0

We are excited about the release of **Delta Sharing 0.6.0**, which introduces several enhancements, including the following features:

- **Support for Apache Spark™ Structured Streaming** - you can now use a Delta Sharing table as a source in Spark Structured Streaming, which allows recipients to stay up to date with the shared data (#189, #190, #194, #195, #198, #199, #200, #201, #204, #205, #207, #208, #209, #211, #212, #214, #216, #217, #218, #219).
- **Support for querying a table using a timestamp** - this release supports a new `timestampAsOf` parameter in the Delta Sharing data source (#186, #187, #188).
- **Updated Protocol documentation** - this release fixes a few minor issues in the PROTOCOL documentation (#213).

## Delta Sharing client initialization
First, we'll create a new Delta Sharing client to interact with our sharing server.

In [1]:
import delta_sharing
from pyspark.sql import SparkSession

from IPython.display import display, clear_output
from time import sleep

In [2]:
spark = SparkSession \
    .builder \
    .appName("Delta Sharing example") \
    .config("spark.jars.packages", "io.delta:delta-sharing-spark_2.12:0.6.0") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

:: loading settings :: url = jar:file:/Users/will.girten/Development/projects/delta_sharing/delta_sharing_env/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/will.girten/.ivy2/cache
The jars for the packages stored in: /Users/will.girten/.ivy2/jars
io.delta#delta-sharing-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e53f832b-83a8-4db9-b6bd-867ea0d77fe6;1.0
	confs: [default]
	found io.delta#delta-sharing-spark_2.12;0.6.0 in central
:: resolution report :: resolve 162ms :: artifacts dl 4ms
	:: modules in use:
	io.delta#delta-sharing-spark_2.12;0.6.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-e53f832b-83a

23/01/12 16:50:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/12 16:50:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Load the local sharing profile
# Can be downloaded here: https://github.com/delta-io/delta-sharing/blob/main/examples/open-datasets.share
profile_file = "/Users/will.girten/Development/projects/delta_sharing/sharing-profile-local.share"

# Create a new Delta Sharing client
client = delta_sharing.SharingClient(profile_file)

# A quick test to make sure everything is working as expected
# We'll list all of the available tables on the sharing server
print(client.list_all_tables())

ConnectionError: HTTPConnectionPool(host='localhost', port=8080): Max retries exceeded with url: /delta-sharing/shares (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x118623640>: Failed to establish a new connection: [Errno 61] Connection refused'))
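
A `Connection refused` error like the one above usually means nothing is listening at the endpoint named in the profile file. The following is a minimal sketch of a pre-flight check, assuming the profile is a JSON file with an `endpoint` field as described in the Delta Sharing protocol. The helper name `endpoint_is_reachable` is hypothetical and is not part of the `delta_sharing` package.

```python
import json
import socket
from urllib.parse import urlparse


def endpoint_is_reachable(profile_path: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the profile's endpoint succeeds.

    Hypothetical helper: it only checks that something accepts connections
    on the host and port; it does not validate the bearer token.
    """
    with open(profile_path, encoding="utf-8") as f:
        profile = json.load(f)

    url = urlparse(profile["endpoint"])
    # Fall back to the scheme's default port when the URL does not name one
    port = url.port or (443 if url.scheme == "https" else 80)

    try:
        with socket.create_connection((url.hostname, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures
        return False
```

Calling `endpoint_is_reachable(profile_file)` before `client.list_all_tables()` would distinguish a server that is not running from other failures, such as an invalid token.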

In [None]:
table_url = profile_file + "#delta_sharing.default.boston-housing"
pdf = delta_sharing.load_as_pandas(table_url)
display(pdf)
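
The table URL used above follows the pattern `<profile-file-path>#<share>.<schema>.<table>`. As a small sketch, the string concatenation can be wrapped in a helper that makes the three name components explicit. The function name `build_table_url` and the relative profile path below are illustrative assumptions, not part of the `delta_sharing` API.

```python
def build_table_url(profile_path: str, share: str, schema: str, table: str) -> str:
    """Return '<profile-file-path>#<share>.<schema>.<table>'."""
    parts = {"share": share, "schema": schema, "table": table}
    for label, value in parts.items():
        if not value:
            raise ValueError(f"{label} name must not be empty")
    return f"{profile_path}#{share}.{schema}.{table}"


# Hypothetical relative path; substitute the location of your own profile file
example_url = build_table_url(
    "sharing-profile-local.share", "delta_sharing", "default", "boston-housing"
)
print(example_url)
```

The resulting string can be passed to `delta_sharing.load_as_pandas()` in place of the hand-built `table_url`.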

## Support for Spark Structured Streaming

Delta Sharing now supports using a shared Delta table as a source in Spark Structured Streaming. This lets data recipients build real-time data applications on top of shared datasets.

In [5]:
# Read shared dataset as a Spark Structured Stream
shared_house_listings_stream = spark.readStream.format("deltaSharing").load(table_url)

# Calculate average crime rate
avg_crime = (shared_house_listings_stream
                .groupBy("rad")
                .avg("crim")
                .writeStream
                .outputMode("complete")
                .format("memory")
                .queryName("boston_housing_crime_avg")
                .start())

In [6]:
# Display an updated crime average summary every 10 seconds
for _ in range(6):
    clear_output(wait=True)
    crime_rate_df = spark.sql("""
    SELECT
       rad as `zone`,
       round(`avg(crim)`, 2) as `crime_rate`
    FROM
       boston_housing_crime_avg
    ORDER BY
       zone
    """)
    # show() prints the table and returns None, so call it on the DataFrame
    # instead of assigning its result
    crime_rate_df.show()
    sleep(10)

+----+----------+
|zone|crime_rate|
+----+----------+
|   1|      0.04|
|   2|      0.08|
|   3|       0.1|
|   4|      0.41|
|   5|      0.67|
|   6|      0.15|
|   7|      0.15|
|   8|      0.37|
|  24|     12.76|
+----+----------+

In [None]:
def stop_all_streams():
    """Stop every active streaming query on the current Spark session."""
    print("Stopping all active streams.")
    for stream in spark.streams.active:
        print(stream)
        stream.stop()
    print("Stopped all active streams.")

In [None]:
stop_all_streams()

## Support for querying a shared table by timestamp

This release adds support for querying a shared Delta table as of a prior timestamp. The timestamp parameter must be a valid timestamp string. Behind the scenes, the Delta Standalone reader uses Delta’s time travel feature to identify the correct table snapshot. This gives data recipients a way to read a prior version of a shared Delta table.

In [None]:
# Let's load the shared COVID data set
table_url = profile_file + "#delta_sharing.default.owid-covid-data"

# A new timestamp parameter has been added to the `load_as_pandas()` function
data = delta_sharing.load_as_pandas(table_url, timestamp="2022-12-10 12:34:00")
display(data)
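
Because the timestamp must be a valid timestamp string, it can help to check it locally before sending the request. The sketch below assumes the `YYYY-MM-DD HH:MM:SS` layout used in the cell above; the format your sharing server accepts may differ (for example, an ISO 8601 string), so treat `TIMESTAMP_FORMAT` as an assumption to adjust. The helper name `validated_timestamp` is hypothetical.

```python
from datetime import datetime

# Assumption: mirrors the literal used in the example above
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def validated_timestamp(value: str) -> str:
    """Return value unchanged if it parses with TIMESTAMP_FORMAT.

    Raises ValueError (from datetime.strptime) when the string does not
    match the format or names an impossible date or time.
    """
    datetime.strptime(value, TIMESTAMP_FORMAT)
    return value


print(validated_timestamp("2022-12-10 12:34:00"))
```

With this in place, the call above could be written as `delta_sharing.load_as_pandas(table_url, timestamp=validated_timestamp("2022-12-10 12:34:00"))`, so that a malformed string fails fast on the client.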