In [None]:
# Set up a local Spark session that has the MarkLogic connector available.
import os

from pyspark.sql import SparkSession

# Make the MarkLogic connector jar available to the underlying PySpark application.
# This environment variable only takes effect if it is set before the Spark session (and its JVM)
# is created below; re-running this cell in a kernel that already has a session will not reload it.
MARKLOGIC_CONNECTOR_JAR = "marklogic-spark-connector-2.3.2.jar"
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--jars "{MARKLOGIC_CONNECTOR_JAR}" pyspark-shell'

# Connection details ("user:password@host:port") for the getting-started example application.
# Set the MARKLOGIC_CLIENT_URI environment variable to use other credentials instead of editing
# them into the notebook; the fallback is the getting-started example's connection string.
client_uri = os.environ.get("MARKLOGIC_CLIENT_URI", "spark-example-user:password@localhost:8003")

# Initialize a local Spark session using all available cores, with quieter logging.
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark

In [None]:
# Read the rows of the 'example' / 'employee' view from MarkLogic via an Optic query into a Spark
# DataFrame, using a single partition, and then display the first row.

df = (
    spark.read.format("marklogic")
    .option("spark.marklogic.client.uri", client_uri)
    .option("spark.marklogic.read.opticQuery", "op.fromView('example', 'employee', '')")
    .option("spark.marklogic.read.numPartitions", 1)
    .load()
)

df.head()

In [None]:
# Chain several DataFrame operations that the connector can push down to MarkLogic:
# count employees hired before 2020 per (State, Department) and show the 10 largest groups.

from pyspark.sql.functions import desc

(
    df.filter("HiredDate < '2020-01-01'")
    .groupBy("State", "Department")
    .count()
    .orderBy(desc("count"))
    .limit(10)
    .show()
)

In [None]:
# For each State, compute the max base salary per Department (pivot yields one column per
# Department), keep the 10 states with the highest max base salary in the "Engineering" column,
# and plot them as a bar chart after converting the Spark DataFrame to a pandas DataFrame -
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.plot.html .
# NOTE: assumes the data contains a Department named exactly "Engineering"; otherwise the
# orderBy column will not exist.

from pyspark.sql.functions import desc

(
    df.groupBy("State")
    .pivot("Department")
    .max("BaseSalary")
    .orderBy(desc("Engineering"))
    .limit(10)
    .toPandas()
    # Name the category column explicitly; "State" is the first column after the pivot.
    .plot(kind="bar", title="Max Base Salaries Across Departments By State", x="State", ylabel="Max Base Salary")
);  # the trailing ';' suppresses the Axes repr so only the chart is displayed

In [None]:
# Compute the same per-State / per-Department max base salaries as above, but write them to
# MarkLogic as new JSON documents instead of plotting them. Every state is written (there is no
# top-10 limit), one document per State at the URI given by the template, each added to the
# "state-base-salaries" collection with the listed role/capability permissions.
# No orderBy here: row order does not affect which documents are written, so sorting would only
# add unnecessary work.

(
    df.groupBy("State")
    .pivot("Department")
    .max("BaseSalary")
    .write
    # Use the same registered data source name as the read cell.
    .format("marklogic")
    .option("spark.marklogic.client.uri", client_uri)
    .option("spark.marklogic.write.permissions", "rest-reader,read,rest-writer,update")
    .option("spark.marklogic.write.collections", "state-base-salaries")
    .option("spark.marklogic.write.uriTemplate", "/state-base-salary/{State}.json")
    .mode("append")
    .save()
)

print("Finished writing documents to MarkLogic")