# Loading Sphere data into Weaviate using Spark

This notebook demonstrates using [Apache Spark](https://spark.apache.org/) to load a subset of Facebook's [Sphere](https://ai.facebook.com/blog/introducing-sphere-meta-ais-web-scale-corpus-for-better-knowledge-intensive-nlp/) dataset into Weaviate.

### Installation

For demonstration purposes this notebook runs Spark locally, installed via the Python `pyspark` package. Please see the [Apache Spark docs](https://spark.apache.org/docs/latest/) or consult your cloud environment's documentation for installing and deploying a Spark cluster, or for choosing a language runtime other than Python.

We have two Spark libraries (jars) as dependencies, which we will install separately: the [Google Cloud Storage connector](https://cloud.google.com/dataproc/docs/tutorials/gcs-connector-spark-tutorial) and the [Weaviate Spark connector](https://github.com/semi-technologies/weaviate-spark-connector).

In [None]:
!pip3 install pyspark weaviate-client
!wget https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar
!wget https://github.com/semi-technologies/weaviate-spark-connector/releases/download/v0.1.2/weaviate-spark-connector-assembly-v0.1.2.jar

### Initializing a Spark Session

The code below creates a Spark Session with the libraries mentioned above.

You will need a GCP service account to read from Google Cloud Storage. Please see the [following docs on how to create a service account](https://cloud.google.com/dataproc/docs/tutorials/gcs-connector-spark-tutorial).
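Before starting Spark it can help to confirm that the key file is where the notebook expects it. The following is a minimal sketch, not part of the original notebook: `check_service_account_file` is a hypothetical helper, and it only assumes that a service account key is a JSON object with `"type": "service_account"`, a `client_email`, and a `private_key`.

```python
import json
import os


def check_service_account_file(path):
    """Return a list of problems found with a GCP service account key file.

    Hypothetical helper for illustration; an empty list means no problem
    was detected by these basic checks.
    """
    if not os.path.isfile(path):
        return [f"no file at {path}"]
    try:
        with open(path) as f:
            key = json.load(f)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc}"]
    if not isinstance(key, dict):
        return ["top-level JSON value is not an object"]

    problems = []
    if key.get("type") != "service_account":
        problems.append('"type" is not "service_account"')
    for field in ("client_email", "private_key"):
        if not key.get(field):
            problems.append(f'missing "{field}"')
    return problems


# Same location the notebook assigns to GOOGLE_APPLICATION_CREDENTIALS.
key_path = os.path.join(os.path.expanduser("~"), ".gcp.json")
for problem in check_service_account_file(key_path):
    print("credential problem:", problem)
```

These checks only look at the shape of the file. They do not verify that the key is still valid or that the account can read the bucket.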

In [None]:
from pyspark.sql import SparkSession
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.join(
    os.environ["HOME"], ".gcp.json"
)

spark = (
    SparkSession.builder.config(
        "spark.jars",
        "gcs-connector-hadoop3-latest.jar,weaviate-spark-connector-assembly-v0.1.2.jar",
    )
    .master("local[*]")
    .appName("weaviate")
    .getOrCreate()
)

spark._jsc.hadoopConfiguration().set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.enable", "true")
spark.sparkContext.setLogLevel("WARN")
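The two `spark._jsc.hadoopConfiguration()` calls above go through a private attribute. As an alternative sketch, Spark copies any property prefixed with `spark.hadoop.` into the Hadoop configuration, so the same settings can be supplied on the builder. `builder_options` below is a hypothetical helper that only assembles the key/value pairs; it does not start Spark.

```python
def builder_options(jars, hadoop_settings):
    """Return (key, value) pairs suitable for SparkSession.builder.config().

    Hypothetical helper for illustration only.
    """
    options = [("spark.jars", ",".join(jars))]
    for key, value in hadoop_settings.items():
        # Properties named "spark.hadoop.<key>" end up as "<key>" in the
        # Hadoop configuration of the session.
        options.append((f"spark.hadoop.{key}", value))
    return options


options = builder_options(
    jars=[
        "gcs-connector-hadoop3-latest.jar",
        "weaviate-spark-connector-assembly-v0.1.2.jar",
    ],
    hadoop_settings={
        "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        "fs.gs.auth.service.account.enable": "true",
    },
)
for key, value in options:
    print(key, "=", value)
```

Each pair can be passed to `SparkSession.builder.config(key, value)` before `getOrCreate()`. These settings only take effect when a new session is created, not when an existing one is reused.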

You should now have a Spark Session created and be able to view the [Spark UI](http://localhost:4040).

In [2]:
spark

### Reading Sphere from GCS

Here we read a 1M subset of Sphere from GCS.

In [None]:
df = spark.read.parquet("gs://sphere-demo/parquet/sphere.1M.parquet")

We can take a look at the first few records:

In [None]:
df.limit(3).toPandas().head()

### Writing to Weaviate

The Weaviate Spark connector assumes that a [schema](https://weaviate.io/developers/weaviate/current/schema/index.html) has already been created. For this reason we will use the Python client to create the schema.

In [7]:
import weaviate

client = weaviate.Client("http://localhost:8080")

client.schema.create_class(
    {
        "class": "Sphere",
        "properties": [
            {"name": "raw", "dataType": ["string"]},
            {"name": "sha", "dataType": ["string"]},
            {"name": "title", "dataType": ["string"]},
            {"name": "url", "dataType": ["string"]},
        ],
    }
)
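Re-running the cell above against an instance that already contains a `Sphere` class is likely to fail, because the class exists. A minimal sketch of a guard follows; `ensure_class` is a hypothetical helper, and it assumes `client.schema.get()` returns a dictionary with a `"classes"` list, as the v3 Python client does.

```python
def ensure_class(client, class_def):
    """Create class_def unless a class with the same name already exists.

    Hypothetical helper; `client` is expected to be a weaviate.Client.
    Returns True if the class was created, False if it was already present.
    """
    schema = client.schema.get()
    existing = {c["class"] for c in (schema.get("classes") or [])}
    if class_def["class"] in existing:
        return False
    client.schema.create_class(class_def)
    return True


# Same class definition as in the notebook cell above.
sphere_class = {
    "class": "Sphere",
    "properties": [
        {"name": "raw", "dataType": ["string"]},
        {"name": "sha", "dataType": ["string"]},
        {"name": "title", "dataType": ["string"]},
        {"name": "url", "dataType": ["string"]},
    ],
}
```

With the notebook's `client`, the call would be `ensure_class(client, sphere_class)`. The guard compares class names only, so it will not detect a `Sphere` class whose properties differ from the definition here.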

Next we will write the Spark dataframe to Weaviate. Note that `limit(100)` can be removed to load the full dataset.

In [None]:
df.limit(100).withColumnRenamed("id", "uuid").write.format("io.weaviate.spark.Weaviate") \
    .option("batchSize", 200) \
    .option("scheme", "http") \
    .option("host", "localhost:8080") \
    .option("id", "uuid") \
    .option("className", "Sphere") \
    .option("vector", "vector") \
    .mode("append").save()
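To check how many objects a write like the one above produced, an aggregate query can count them. This is a sketch under the assumption that the v3 Python client is in use; `count_objects` is a hypothetical helper wrapping `client.query.aggregate(...).with_meta_count().do()`.

```python
def count_objects(client, class_name):
    """Return the number of objects stored for class_name.

    Hypothetical helper; `client` is expected to be a weaviate.Client.
    """
    result = client.query.aggregate(class_name).with_meta_count().do()
    # GraphQL-level failures are reported in an "errors" entry of the response.
    if "errors" in result:
        raise RuntimeError(result["errors"])
    return result["data"]["Aggregate"][class_name][0]["meta"]["count"]
```

With the notebook's `client`, `count_objects(client, "Sphere")` returns the current object count for the `Sphere` class, which can be compared with the number of rows written.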

Querying via the Python client, we can see that the data has been loaded. Note that this data is sourced from [Common Crawl](https://commoncrawl.org/) and so contains raw web crawl data.

In [9]:
client.query.get("Sphere", "url").do()

{'data': {'Get': {'Sphere': [{'url': 'https://htdb.space/1909/r4526b.htm'},
    {'url': 'https://rootsofjusticetraining.org/2014/08/when-there-is-no-peace-where-are-the-saints/'},
    {'url': 'https://www.financialexpress.com/india-news/bawana-fire-congress-demands-judicial-probe-into-delhi-factory-blaze/1024238/'},
    {'url': 'http://teachersjourneytolife.com/tag/learning/page/2/'},
    {'url': 'https://www.deadseamoringa.com/keep-your-skin-looking-young-throughout-the-aging-process/'},
    {'url': 'https://www.rawstory.com/2019/03/gops-aggression-backfire-2020-democrats-consider-expanding-supreme-court/'},
    {'url': 'https://civilwar.mrdonn.org/missouri-compromise.html'},
    {'url': 'https://www.elixirofknowledge.com/2016/06/sacred-destination-hagia-sophia.html'},
    {'url': 'https://women2.com/2015/09/10/20150910tech-unicorn-culture-insights/'},
    {'url': 'https://www.adn.com/politics/article/report-ted-stevens-trial-suggests-suspensions-two-alaska-based-prosecutors/2012/05/2