In [None]:
from pyspark.sql.types import *

# Explicit schema for the pageviews file: every column is declared non-nullable.
csvSchema = StructType([
  StructField(fieldName, fieldType, False)
  for fieldName, fieldType in [
    ("timestamp", StringType()),
    ("site", StringType()),
    ("requests", IntegerType()),
  ]
])

csvFile = "/mnt/training/wikipedia/pageviews/pageviews_by_second.tsv"

# Tab-separated file with a header row, read using the schema declared above.
csvDF = spark.read.csv(csvFile, schema=csvSchema, sep="\t", header=True)

In [None]:
# Destination of the Parquet copy, located under the user's home directory.
fileName = userhome + "/pageviews_by_second.parquet"
print("Output location: " + fileName)

# Write csvDF as Parquet with snappy compression, replacing any existing files.
csvDF.write.parquet(fileName, mode="overwrite", compression="snappy")

In [None]:
# ANSWER

# Students will need more than one pass to arrive at the final answer.
fileName = "dbfs:/mnt/training/wikipedia/clickstream/2015_02_clickstream.tsv"

# First pass: read with inferSchema enabled and print the resulting schema
# to find out what the column names and data types are.
spark.read.csv(fileName, sep="\t", header=True, inferSchema=True).printSchema()

In [None]:
# ANSWER

from pyspark.sql.types import *

# The second step is to declare the schema explicitly (all columns non-nullable)
schema = StructType([
  StructField(columnName, columnType, False)
  for columnName, columnType in [
    ("prev_id", IntegerType()),
    ("curr_id", IntegerType()),
    ("n", IntegerType()),
    ("prev_title", StringType()),
    ("curr_title", StringType()),
    ("type", StringType()),
  ]
])

fileName = "dbfs:/mnt/training/wikipedia/clickstream/2015_02_clickstream.tsv"

# The third step is to read the data in with the user-defined schema
testDF = spark.read.csv(fileName, schema=schema, sep="\t", header=True)

testDF.printSchema()

In [None]:

%python
# ****************************************************************************
# Utility method to count & print the number of records in each partition.
# ****************************************************************************

def printRecordsPerPartition(df):
  """Print the number of records held by each partition of a DataFrame.

  Args:
    df: object exposing `.rdd.mapPartitions(...).collect()`.

  Prints a header line followed by one "#<n>: <count>" line per partition,
  numbered from 1, with thousands separators in the count.
  """
  def countInPartition(iterator):
    # `builtins` is the Python 3 name of the module formerly called `__builtin__`.
    # Importing it here keeps the builtin sum() reachable even when a
    # notebook-level name `sum` shadows it.
    import builtins
    yield builtins.sum(1 for _ in iterator)

  results = (df.rdd                   # Convert to an RDD
    .mapPartitions(countInPartition)  # For each partition, count
    .collect()                        # Return the counts to the driver
  )

  print("Per-Partition Counts")
  for i, result in enumerate(results, start=1):
    print("#{}: {:,}".format(i, result))
  
# ****************************************************************************
# Utility to count the number of files in and size of a directory
# ****************************************************************************

def computeFileStats(path):
  """Count the files under a directory tree and total their sizes.

  Args:
    path: directory to inspect; listed with dbutils.fs.ls and descended into
      for every entry whose isDir() is true.

  Returns:
    A (fileCount, totalBytes) tuple covering every non-directory entry found.
  """
  fileCount = 0
  totalBytes = 0

  pending = list(dbutils.fs.ls(path))

  while pending:
    # Pop from the end (O(1)); the visiting order does not affect the totals.
    fileInfo = pending.pop()
    if fileInfo.isDir():
      pending.extend(dbutils.fs.ls(fileInfo.path))  # queue the directory's children
    else:
      fileCount += 1
      totalBytes += fileInfo.size

  return (fileCount, totalBytes)

# ****************************************************************************
# Utility method to cache a table with a specific name
# ****************************************************************************

def cacheAs(df, name, level):
  """Register a DataFrame as a temp view called `name` and cache that table.

  Args:
    df: DataFrame to register.
    name: name of the temp view / cached table.
    level: accepted but not used; the table is cached through
      spark.catalog.cacheTable(name) without a storage level.

  Returns:
    The same `df` that was passed in.
  """
  from pyspark.sql.utils import AnalysisException
  print("WARNING: The PySpark API currently does not allow specification of the storage level - using MEMORY-ONLY")

  # Drop any cache entry already held under this name; an AnalysisException
  # raised by the uncache call is deliberately ignored.
  try:
    spark.catalog.uncacheTable(name)
  except AnalysisException:
    pass

  df.createOrReplaceTempView(name)
  spark.catalog.cacheTable(name)
  return df


# ****************************************************************************
# Simplified benchmark of count()
# ****************************************************************************

def benchmarkCount(func):
  """Time how long it takes to build a DataFrame and count its records.

  Args:
    func: zero-argument callable returning an object with a count() method.

  Returns:
    A (df, total, duration) tuple: the object returned by func, the result of
    df.count(), and the elapsed time in milliseconds as a float.
  """
  import time
  # perf_counter() is monotonic, so the measured duration cannot go negative
  # or jump when the system wall clock is adjusted mid-run.
  start = time.perf_counter()                          # Start the clock
  df = func()
  total = df.count()                                   # Count the records
  duration = (time.perf_counter() - start) * 1000.0    # Stop the clock (ms)
  return (df, total, duration)


# ****************************************************************************
# Utility method to wait until the stream is ready
# ****************************************************************************

def untilStreamIsReady(name):
  """Block until the named streaming query has reported progress or has stopped.

  Args:
    name: the query name to look for among spark.streams.active.

  Prints "The stream is active and ready." once the query is active and its
  recentProgress is non-empty; prints "The stream is not active." when no
  active query has that name or the query stops while waiting.
  """
  import time

  queries = [query for query in spark.streams.active if query.name == name]

  if not queries:
    print("The stream is not active.")
    return

  query = queries[0]
  while query.isActive and len(query.recentProgress) == 0:
    # Poll with a short pause instead of spinning, so the wait does not
    # occupy a CPU core at 100%.
    time.sleep(0.1)

  if query.isActive:
    print("The stream is active and ready.")
  else:
    print("The stream is not active.")

None

In [None]:

%python
# Registry of validation outcomes: label -> (passed, answer string).
testResults = {}

def toHash(value):
  """Hash a single value with Spark and return the result from the first row.

  The value is placed in a one-row, one-column DataFrame (column "value"),
  and the absolute value of Spark's hash() of that column, cast to int, is
  selected and returned.
  """
  from pyspark.sql import functions as F

  singleRowDF = spark.createDataFrame([(value,)], ["value"])
  hashedColumn = F.abs(F.hash("value")).cast("int")
  return singleRowDF.select(hashedColumn).first()[0]

def clearYourResults(passedOnly = True):
  """Remove entries from testResults in place.

  Entries whose first tuple element is truthy (passed) are always removed;
  when passedOnly equals False every entry is removed.
  """
  dropEverything = (passedOnly == False)
  # Iterate over a snapshot of the keys because entries are deleted on the way.
  for label in list(testResults):
    if dropEverything or testResults[label][0]:
      del testResults[label]

def validateYourSchema(what, df, expColumnName, expColumnType = None):
  """Check that a DataFrame has a column, optionally of a given type, and record the result.

  Args:
    what: label of the thing being validated; used as the prefix of the result key.
    df: object whose schema[expColumnName].dataType.typeName() yields the type name.
    expColumnName: name of the column expected in df.schema.
    expColumnType: expected type name, or None to check only that the column exists.

  The outcome is stored in testResults under "<what> contains <column>:<type>"
  as a (passed, detail) tuple and is also printed.
  """
  label = "{}:{}".format(expColumnName, expColumnType)
  key = "{} contains {}".format(what, label)

  # Only the schema lookup is guarded: a failure here means the column is missing.
  try:
    actualType = df.schema[expColumnName].dataType.typeName()
  except Exception:
    # Record under `key` (the same label that is printed) so that several
    # missing columns for one `what` do not overwrite each other.
    testResults[key] = (False, "-not found-")
    print("{}: NOT found".format(key))
    return

  if expColumnType is None or actualType == expColumnType:
    testResults[key] = (True, "validated")
    print("""{}: validated""".format(key))
  else:
    answerStr = "{}:{}".format(expColumnName, actualType)
    testResults[key] = (False, answerStr)
    print("""{}: NOT matching ({})""".format(key, answerStr))
      
def validateYourAnswer(what, expectedHash, answer):
  """Compare the hash of an answer with the expected hash and record the result.

  Args:
    what: label under which the outcome is stored in testResults.
    expectedHash: value that toHash() of the answer's string form must equal.
    answer: the value to check.

  The answer is converted to a string ("null" for None, "true"/"false" for the
  bool singletons, str(answer) otherwise), hashed with toHash(), and the
  (passed, answerStr) tuple is stored in testResults[what] and printed.
  """
  # Identity checks are used on purpose: `answer == None` would invoke an
  # overloaded __eq__, which may raise or return a non-boolean for some types.
  if answer is None: answerStr = "null"
  elif answer is True: answerStr = "true"
  elif answer is False: answerStr = "false"
  else: answerStr = str(answer)

  hashValue = toHash(answerStr)

  if (hashValue == expectedHash):
    testResults[what] = (True, answerStr)
    print("""{} was correct, your answer: {}""".format(what, answerStr))
  else:
    testResults[what] = (False, answerStr)
    print("""{} was NOT correct, your answer: {}""".format(what, answerStr))

def summarizeYourResults():
  """Render every entry of testResults as an HTML table via displayHTML.

  Rows are ordered by label. Each row shows the label, a green "passed" or
  red "FAILED" marker, and the recorded answer. Labels and answers are
  HTML-escaped so characters such as <, > and & are shown literally instead
  of being interpreted as markup.
  """
  from html import escape

  header = """<html><body><div style="font-weight:bold; font-size:larger; border-bottom: 1px solid #f0f0f0">Your Answers</div><table style='margin:0'>"""
  rowTemplate = """<tr style='font-size:larger; white-space:pre'>
                  <td>{}:&nbsp;&nbsp;</td>
                  <td style="color:{}; text-align:center; font-weight:bold">{}</td>
                  <td style="white-space:pre; font-family: monospace">&nbsp;&nbsp;{}</td>
                </tr>"""

  rows = []
  for what in sorted(testResults.keys()):
    passed = testResults[what][0]
    answer = testResults[what][1]
    color = "green" if (passed) else "red"
    passFail = "passed" if (passed) else "FAILED"
    rows.append(rowTemplate.format(escape(str(what)), color, passFail, escape(str(answer))))

  displayHTML(header + "".join(rows) + "</table></body></html>")

def logYourTest(path, name, value):
  """Write a single named numeric result as a one-line CSV file under `path`.

  Args:
    path: directory to write into; created with dbutils.fs.mkdirs.
    name: result name; written to the CSV and used (lower-cased, spaces
      replaced by dashes) as the file name.
    value: result value; converted with float().

  Raises:
    ValueError: if `name` or `path` contains a double quote (the CSV line
      quotes its fields with double quotes), or if float(value) fails.
  """
  value = float(value)
  # The name is embedded between double quotes in the CSV line, so that is
  # the field that must be quote-free; the path check is kept for compatibility.
  if "\"" in str(name): raise ValueError("The name cannot contain quotes.")
  if "\"" in path: raise ValueError("The path cannot contain quotes.")

  dbutils.fs.mkdirs(path)

  csv = """ "{}","{}" """.format(name, value).strip()
  # Normalize only the file name: the directory must stay exactly as given so
  # the file lands in the directory created above.
  fileName = "{}.csv".format(name).replace(" ", "-").lower()
  file = "{}/{}".format(path, fileName)
  dbutils.fs.put(file, csv, True)

def loadYourTestResults(path):
  """Read the CSV files under `path` with the schema "name string, value double".

  Args:
    path: location passed to spark.read...csv().

  Returns:
    Whatever spark.read.schema(...).csv(path) returns.
  """
  return spark.read.schema("name string, value double").csv(path)

def loadYourTestMap(path):
  """Load the logged test results under `path` as a name -> value dictionary.

  The rows come from loadYourTestResults(path).collect(); when a name occurs
  more than once, the value of the last row wins.
  """
  collected = loadYourTestResults(path).collect()
  return {record["name"]: record["value"] for record in collected}

None

In [None]:

%python
from pyspark.sql.functions import expr, col, from_unixtime, to_date

# Remove any previous copy of the Delta output (recursive delete).
dbutils.fs.rm(userhome + "/delta/iot-events/", True)

streamingEventPath = "/mnt/training/structured-streaming/events/"

# Read the JSON events, derive a `date` column from the `time` column and a
# seeded pseudo-random `deviceId`, then write the result as a Delta table
# partitioned by date.
(spark.read
  .options(inferSchema="true")
  .json(streamingEventPath)
  .withColumn("date", to_date(from_unixtime(col("time").cast("Long"),"yyyy-MM-dd")))
  .withColumn("deviceId", expr("cast(rand(5) * 100 as int)"))
  .repartition(200)
  .write
  .save(userhome + "/delta/iot-events/", format="delta", mode="overwrite", partitionBy="date"))

In [None]:
%python

#**********************************
# VARIOUS UTILITY FUNCTIONS
#**********************************

def assertSparkVersion(expMajor, expMinor):
  """Fail unless the configured Spark version is at least expMajor.expMinor.

  The version is read from the "com.databricks.training.spark.major-version"
  and "...minor-version" Spark conf entries.

  Args:
    expMajor: minimum major version (int).
    expMinor: minimum minor version (int) when the major version is equal.

  Returns:
    The found version as the string "<major>.<minor>".

  Raises:
    Exception: if the found version is lower than the expected one; the
      message names both the required and the found version.
  """
  major = spark.conf.get("com.databricks.training.spark.major-version")
  minor = spark.conf.get("com.databricks.training.spark.minor-version")

  if (int(major) < expMajor) or (int(major) == expMajor and int(minor) < expMinor):
    # Four placeholders for four arguments, so the found version is reported too.
    msg = "This notebook must run on Spark version {}.{} or better, found {}.{}.".format(expMajor, expMinor, major, minor)
    raise Exception(msg)

  return major+"."+minor

def assertDbrVersion(expMajor, expMinor):
  """Fail unless the configured Databricks Runtime version is at least expMajor.expMinor.

  The version is read from the "com.databricks.training.dbr.major-version"
  and "...minor-version" Spark conf entries.

  Args:
    expMajor: minimum major version (int).
    expMinor: minimum minor version (int) when the major version is equal.

  Returns:
    The found version as the string "<major>.<minor>".

  Raises:
    Exception: if the found version is lower than the expected one; the
      message names both the required and the found version.
  """
  major = spark.conf.get("com.databricks.training.dbr.major-version")
  minor = spark.conf.get("com.databricks.training.dbr.minor-version")

  if (int(major) < expMajor) or (int(major) == expMajor and int(minor) < expMinor):
    # Four placeholders for four arguments, so the found version is reported too.
    msg = "This notebook must run on Databricks Runtime (DBR) version {}.{} or better, found {}.{}.".format(expMajor, expMinor, major, minor)
    raise Exception(msg)

  return major+"."+minor

def assertIsMlRuntime():
  """Fail unless the configured DBR version string contains "-ml-".

  The version string is read from the "com.databricks.training.dbr.version"
  Spark conf entry.

  Raises:
    Exception: if "-ml-" does not occur in the version string.
  """
  version = spark.conf.get("com.databricks.training.dbr.version")
  if "-ml-" in version:
    return
  raise Exception("This notebook must be ran on a Databricks ML Runtime, found {}.".format(version))

    
#**********************************
# GET AZURE DATASOURCE
#**********************************


def getAzureDataSource():
  """Return the (source, sasEntity, sasToken) triple from the Spark conf.

  The "com.databricks.training.azure.datasource" entry is split on tab
  characters and its first three fields are returned in order.
  """
  rawValue = spark.conf.get("com.databricks.training.azure.datasource")
  fields = rawValue.split("\t")
  return (fields[0], fields[1], fields[2])

    
#**********************************
# GET EXPERIMENT ID
#**********************************

def getExperimentId():
  """Return the value of the "com.databricks.training.experimentId" Spark conf entry."""
  experimentId = spark.conf.get("com.databricks.training.experimentId")
  return experimentId

#**********************************
# INIT VARIOUS VARIABLES
#**********************************

# User name and home directory, with placeholders when the conf entries are absent.
username = spark.conf.get("com.databricks.training.username", "unknown-username")
userhome = spark.conf.get("com.databricks.training.userhome", "unknown-userhome")

import sys
# Version number only: the text of sys.version up to its first space.
# spark.conf.set() returns None, so the string is kept in pythonVersion first
# and published to the Spark conf in a separate statement.
pythonVersion = sys.version[0:sys.version.index(" ")]
spark.conf.set("com.databricks.training.python-version", pythonVersion)

None # suppress output

In [None]:
from pyspark.sql.types import *

# Explicit schema for the pageviews file: every column is declared non-nullable.
csvSchema = StructType([
  StructField(fieldName, fieldType, False)
  for fieldName, fieldType in [
    ("timestamp", StringType()),
    ("site", StringType()),
    ("requests", IntegerType()),
  ]
])

csvFile = "/mnt/training/wikipedia/pageviews/pageviews_by_second.tsv"

# Tab-separated file with a header row, read using the schema declared above.
csvDF = spark.read.csv(csvFile, schema=csvSchema, sep="\t", header=True)
