![Alt text](venice-logo-lion.png "Venice Logo")

# Welcome To Venice!

<div style="background-color:rgba(0, 0, 0, 0.0470588);">
This notebook is meant to serve as an interactive demo of Venice and some of its concepts.  In this notebook we'll demo:

* Preparing and Processing a dataset with Spark
* Preparing a Venice store
* Pushing the dataset to Venice and querying it

To this end, we're going to demo a simple workflow that an AI engineer may go through when processing and publishing data to Venice.
First, we'll start by getting a dataset from Hugging Face.
    
</div>

## Preparing and Processing a dataset with Spark

<div style="background-color:rgba(0, 0, 0, 0.0470588);">
For this demo to be interesting we'll need an interesting dataset and the means to prepare it.  We'll use Spark and a dataset from [Hugging Face](https://huggingface.co/).

### What is Spark?

[Apache Spark](https://spark.apache.org/) is a popular open source data processing engine.  It lets users run data processing jobs in a distributed manner on a cluster (if you're familiar with Hadoop, you probably understand some of the core concepts).  Venice supports writing data to a Venice store via Spark, and we will showcase that capability here.  In the cell below, we'll use Python and PySpark to interact programmatically with a Spark cluster that is running as part of this demo.

### What is our dataset?

We're going to grab a dataset from Hugging Face and upload it to our Spark cluster for processing.  This dataset is a table of wine reviews!  Yay!
</div>
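
The cell below asks the Hugging Face parquet endpoint for a listing and indexes it as `['default']['train']`. As a minimal sketch of that lookup, assuming the listing is a nested mapping of config name to split name to a list of file URLs, the helper here (`parquet_urls`, a hypothetical name) does the same indexing but reports which configs and splits exist when the requested one is missing. The sample listing and its URLs are made up for illustration.

```python
def parquet_urls(listing, config="default", split="train"):
    """Return the parquet file URLs for one config/split, or raise a clear error."""
    try:
        return list(listing[config][split])
    except KeyError as missing:
        # Summarize what the listing does contain to make the failure actionable.
        available = {name: sorted(splits) for name, splits in listing.items()}
        raise KeyError(
            f"{missing} not found; available configs/splits: {available}"
        ) from None


# Hypothetical response body, shaped like the one the demo cell indexes into.
sample_listing = {
    "default": {
        "train": ["https://example.invalid/train/0.parquet"],
        "test": ["https://example.invalid/test/0.parquet"],
    }
}

print(parquet_urls(sample_listing))
```

If you swap in a different dataset, its config and split names may not be `default` and `train`, which is where the explicit error message helps.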

In [None]:
import requests
from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark.sql import SparkSession

dataset = "alfredodeza/wine-ratings"

# Initialize our Spark Session
spark = SparkSession.builder.appName("VeniceWineReviewsDemo").getOrCreate()

# Query Hugging Face for the parquet file URLs that make up our dataset
HUGGING_FACE_PARQUET_API = "https://huggingface.co/api/datasets/{dataset}/parquet"
r = requests.get(HUGGING_FACE_PARQUET_API.format(dataset=dataset))
train_parquet_files = r.json()['default']['train']

# Download the datasets to our spark cluster
for url in train_parquet_files:
  spark.sparkContext.addFile(url)

# Process the parquet files on the spark cluster so we can query them
df = spark.read.parquet(SparkFiles.getRootDirectory() + "/*.parquet")

# Displaying first 10 rows of our dataset
df.show(n=10)

# It's normally good practice to stop your Spark session at the end. However, we're
# going to reuse some of this state in the following cells, so we'll leave this commented
# out for now.  If you want to experiment with other datasets, be sure to call
# spark.stop() between runs to clear out state and avoid errors.
# spark.stop()

### Using Spark To Prepare Our Dataset

<div style="background-color:rgba(0, 0, 0, 0.0470588);">

Now that we have our dataset, we need to manipulate it a bit. 

Venice is a key/value data store which uses [Avro](https://avro.apache.org/) as its serialization format.  To that end, we'll need to process and transform our dataset to have a key/value structure and serialize it to Avro.

The key to our dataset in Venice needs to be both non-nullable and unique, so we'll need to make sure those constraints are applied to our dataset before we can push it.  Looking at our dataset, we want to select a field for our key.  'name' seems a likely choice (and who wouldn't want a database where you can look up a wine rating by its name!).  So we'll have Spark process this data to form key/value pairs by grouping the remaining columns into a value, removing null entries and duplicates, and finally serializing the dataset to Avro.

</div>
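
To make the two key constraints concrete before handing the work to Spark, here is a plain-Python illustration on a few made-up rows. It is only a sketch of the shaping logic (one `key`, one nested `value`, no null or repeated keys), not how Spark executes it; the function name `to_key_value` and the sample rows are hypothetical.

```python
def to_key_value(rows, key_field="name"):
    """Shape flat rows into {"key": ..., "value": {...}} records.

    Rows whose key is None are dropped, and only the first row seen
    for each key is kept.
    """
    seen = set()
    records = []
    for row in rows:
        key = row.get(key_field)
        if key is None or key in seen:
            continue
        seen.add(key)
        # Everything except the key column becomes the nested value.
        value = {name: v for name, v in row.items() if name != key_field}
        records.append({"key": key, "value": value})
    return records


# Made-up rows for illustration only.
rows = [
    {"name": "Example Wine A", "region": "Region 1", "variety": "Red", "rating": 90.0, "notes": "first"},
    {"name": None, "region": "Region 2", "variety": "White", "rating": 88.0, "notes": "no name"},
    {"name": "Example Wine A", "region": "Region 1", "variety": "Red", "rating": 91.0, "notes": "duplicate"},
    {"name": "Example Wine B", "region": "Region 3", "variety": "Rose", "rating": 89.0, "notes": "second"},
]

for record in to_key_value(rows):
    print(record["key"], "->", record["value"])
```

One difference worth noting: this sketch keeps the first row for each key, whereas Spark's `dropDuplicates` makes no promise about which of the duplicate rows survives.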

In [None]:
from pyspark.sql.functions import struct, col
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Group the non-key columns into a single struct column, which becomes
# the value of each record in the Venice store.
df_transformed = df.withColumn("value", struct(
    col("region").alias("region"),
    col("variety").alias("variety"),
    col("rating").alias("rating"),
    col("notes").alias("notes")
)).select(
    col("name").alias("key"),
    col("value")
).filter(
    # Venice keys must be non-null, so drop rows without a name
    col("key").isNotNull()
).dropDuplicates(["key"])

# Define the new schema with key as non-nullable
updated_schema = StructType([
    StructField("key", StringType(), nullable=False),
    StructField("value", StructType([
        StructField("region", StringType(), nullable=True),
        StructField("variety", StringType(), nullable=True),
        StructField("rating", FloatType(), nullable=True),
        StructField("notes", StringType(), nullable=True)
    ]), nullable=False)
])

# Create new DataFrame with updated schema
df_non_nullable_key = spark.createDataFrame(df_transformed.rdd, schema=updated_schema)

# Show the resulting DataFrame schema
df_non_nullable_key.printSchema()

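# Preview a few rows, then serialize the key/value records to Avro for the push job.
# Overwrite mode lets this cell be re-run without failing on an existing output path.
df_non_nullable_key.show(n=3)
df_non_nullable_key.write.format("avro").mode("overwrite").save("transformed_avro")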

## Preparing a Venice store

<div style="background-color:rgba(0, 0, 0, 0.0470588);">
Looking good!  So we've got our dataset.  Now we need to prepare a Venice store.  We've set up some endpoints in this demo that will enable you to interact with a Venice cluster called "venice-cluster0".  We'll create a store called "wine-ratings-store" and we'll initialize the store with a schema that matches our dataset.  It's possible to generate the schema from the Parquet files in the Spark session, but to simplify this demo we've already generated the schema files.  We'll create our Venice store with the code below:
</div>
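
The schema files themselves are not shown in this notebook, so the following is only a sketch of what Avro schemas matching the Spark schema above could look like. The record name `WineRating`, the union ordering, and the `null` defaults are assumptions; the files shipped with the demo may differ.

```python
import json

# The key is a plain non-nullable string, which in Avro is just the primitive type name.
key_schema = "string"


def nullable(avro_type):
    """Avro models a nullable field as a union that includes "null"."""
    return ["null", avro_type]


# Hypothetical value schema mirroring the nullable fields of the Spark struct.
value_schema = {
    "type": "record",
    "name": "WineRating",  # assumed name, not taken from the demo files
    "fields": [
        {"name": "region", "type": nullable("string"), "default": None},
        {"name": "variety", "type": nullable("string"), "default": None},
        {"name": "rating", "type": nullable("float"), "default": None},
        {"name": "notes", "type": nullable("string"), "default": None},
    ],
}

# Avro schema files (.avsc) are JSON documents.
print(json.dumps(key_schema))
print(json.dumps(value_schema, indent=2))
```

Comparing output like this against the contents of `wineKeySchema.avsc` and `wineValueSchema.avsc` is a quick way to see how the store schema lines up with the DataFrame.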


In [None]:
import subprocess
from subprocess import Popen, PIPE, STDOUT

# Utility function for printing command output.
# The pipe yields bytes, so decode each line before printing.
def log_subprocess_output(pipe):
    for line in iter(pipe.readline, b''):
        print(line.decode(errors="replace").rstrip())

# Arguments for Venice store creation
create_store_args = [
    "./create-store.sh",
    "http://venice-controller:5555", 
    "venice-cluster0",
    "wine-ratings-store", # Name of our store
    "~/wineKeySchema.avsc",
    "~/wineValueSchema.avsc"
]

# Run the store creation script and stream its output
process = Popen(create_store_args, stdout=PIPE, stderr=STDOUT)
with process.stdout:
    log_subprocess_output(process.stdout)
exitcode = process.wait()

## Pushing the dataset to Venice and querying it

<div style="background-color:rgba(0, 0, 0, 0.0470588);">
And now the moment we've been waiting for!  We've prepared our dataset and we've created our Venice store.  Now let's transmit the data to Venice via Spark!
    
With this demo we've provided the push job as a Java JAR which we'll submit to our Spark cluster.  We've also prepared a properties file for the push job which contains some of the mandatory configurations needed for the push (things like the name of the store, the cluster's endpoint, and the path to the data that we'll need to transmit).  You can take a look at these configurations in batch-push-job.properties in the home directory of this demo.
</div>
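
If you'd like to inspect those configurations from within the notebook, a small parser is enough. The sketch below handles only simple `key=value` lines with `#` or `!` comments; it does not implement the full Java properties format (no line continuations, escapes, or `:` separators). The function name `parse_properties` and the keys in the sample text are hypothetical and are not the real push job settings.

```python
def parse_properties(text):
    """Parse simple key=value lines into a dict, skipping blanks and comments."""
    props = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, separator, value = line.partition("=")
        if not separator:
            # Not a key=value line; ignore it in this simplified parser.
            continue
        props[key.strip()] = value.strip()
    return props


# Hypothetical keys, for illustration only.
sample_text = """
# example comment
example.store.name = wine-ratings-store
example.input.path=transformed_avro
"""

for name, value in parse_properties(sample_text).items():
    print(f"{name} = {value}")
```

To run it against the real file, read it first, for example `parse_properties(open("batch-push-job.properties").read())`.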

In [None]:
# Path to the push job JAR file
jar_path = "bin/venice-push-job-all.jar"

# Arguments for spark-submit
spark_submit_args = [
    "spark-submit",
    "--class", "com.linkedin.venice.hadoop.VenicePushJob",  # Push job main class
    jar_path,
    "batch-push-job.properties"  # Configurations
]

# Submit the job
process = Popen(spark_submit_args, stdout=PIPE, stderr=STDOUT)
with process.stdout:
    log_subprocess_output(process.stdout)
exitcode = process.wait()

### Query the data

<div style="background-color:rgba(0, 0, 0, 0.0470588);">
And now, finally, let's query this dataset from Venice!  We'll look up a single wine by its name, which is the key we chose earlier.
    </div>
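
The cells in this notebook capture each command's exit code but never check it, so a failed script can go unnoticed. As a hedged sketch of one way to tighten that up, the wrapper below (`run_and_stream`, a hypothetical name) streams decoded output and returns the exit code along with the captured lines. It runs a trivial Python child process here rather than one of the demo scripts.

```python
import subprocess
import sys


def run_and_stream(args):
    """Run a command, echo its combined stdout/stderr, and return (exit code, lines)."""
    lines = []
    with subprocess.Popen(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,  # decode bytes to str so lines print cleanly
    ) as process:
        for line in process.stdout:
            line = line.rstrip("\n")
            print(line)
            lines.append(line)
    # Leaving the with-block waits for the process, so returncode is set here.
    return process.returncode, lines


code, output = run_and_stream(
    [sys.executable, "-c", "print('hello from a child process')"]
)
if code != 0:
    raise RuntimeError(f"command failed with exit code {code}")
```

The same call shape would work with the argument lists used in this notebook, such as `query_store_args`.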

In [None]:
# Query the store for the data
query_store_args = [
    "./fetch.sh",
    "http://venice-router:7777", 
    "wine-ratings-store",
    "A. Mano Puglia Primitivo 2006",
]

# Run the query script and stream its output
process = Popen(query_store_args, stdout=PIPE, stderr=STDOUT)
with process.stdout:
    log_subprocess_output(process.stdout)
exitcode = process.wait()

__Saluti!!__

In [93]:
# Don't forget to clean up your spark session afterward
spark.stop()