### Initialization

In [2]:
# Evaluate the pre-defined "spark" session object so the notebook displays it
spark

In [3]:
# Set the Spark SQL shuffle-partition setting to 4 for this session
# NOTE(review): presumably lowered because the demo data is small — confirm
spark.conf.set("spark.sql.shuffle.partitions", "4")

### Connection
Databricks notebooks are already connected to a cluster

In [5]:
# The notebook provides a ready-made session under the name "spark"
session_type = type(spark)
print(session_type)

# It likewise provides a Spark context under the name "sc"
context_type = type(sc)
print(context_type)

### Pandas versus Spark DataFrames
- Pandas DataFrames reside on the driver.
- Spark DataFrames are distributed in partitions across the cluster.

In [7]:
# Read every daily CSV file into one Spark DataFrame, treating the first
# line of each file as a header and letting Spark infer the column types
retail_csv_path = "/databricks-datasets/definitive-guide/data/retail-data/by-day/*.csv"
csv_reader = (
  spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
)
sparkDataFrame = csv_reader.load(retail_csv_path)

# Make the data queryable from SQL under the name "retail_data"
sparkDataFrame.createOrReplaceTempView("retail_data")

# Keep the inferred schema in a variable and print it
staticSchema = sparkDataFrame.schema
print(staticSchema)

In [8]:
# Print the first 5 rows of the Spark DataFrame
sparkDataFrame.show(5)

In [9]:
# Convert the distributed Spark DataFrame into a Pandas DataFrame.
# Pandas DataFrames reside on the driver, so this brings all rows there.
pandasDataFrame = sparkDataFrame.toPandas()

# Report the (rows, columns) shape of the converted data
row_count, column_count = pandasDataFrame.shape
print((row_count, column_count))

# Leave the first rows as the cell's displayed result
pandasDataFrame.head()

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,580538,23084,RABBIT NIGHT LIGHT,48,2011-12-05 08:38:00,1.79,14075.0,United Kingdom
1,580538,23077,DOUGHNUT LIP GLOSS,20,2011-12-05 08:38:00,1.25,14075.0,United Kingdom
2,580538,22906,12 MESSAGE CARDS WITH ENVELOPES,24,2011-12-05 08:38:00,1.65,14075.0,United Kingdom
3,580538,21914,BLUE HARMONICA IN BOX,24,2011-12-05 08:38:00,1.25,14075.0,United Kingdom
4,580538,22467,GUMBALL COAT RACK,6,2011-12-05 08:38:00,2.55,14075.0,United Kingdom


In [10]:
# Display the Python class of the Spark DataFrame
type(sparkDataFrame)

In [11]:
# Register the Spark DataFrame as the temporary view "purchases" so that
# SQL queries can refer to it by that name
sparkDataFrame.createOrReplaceTempView("purchases")

In [12]:
# Display the Python class of the Pandas DataFrame produced by toPandas()
type(pandasDataFrame)

In [13]:
# A Pandas DataFrame is a plain driver-side object, not a Spark DataFrame, so
# it has no createOrReplaceTempView() method. The call is kept to demonstrate
# that difference, but the expected AttributeError is caught and printed so
# the notebook still runs to completion under "Run All".
try:
  pandasDataFrame.createOrReplaceTempView("purchases")
except AttributeError as error:
  print(f"AttributeError: {error}")

### Optimizing Execution Plans
The Catalyst optimizer takes advantage of lazy execution to optimize execution plans

In [15]:
# The complete series of transformations is built here in one place for
# clarity; the cells that follow repeat it piece by piece.
# withColumn() adds a new column.
from pyspark.sql.functions import expr, lit

# Start from a single-column range (0 up to, not including, 40, in steps of 2)
df0 = spark.range(start=0, end=40, step=2).toDF("c0")

# Add a constant column, then two derived columns
df1 = df0.withColumn("c1", lit(20))
df2 = df1.withColumn("c2", expr("c1 - c0"))
df3 = df2.withColumn("c3", expr("c2 / 2"))

# Keep only the rows whose c3 value is even, then order by c3
df4 = df3.where("c3 % 2 = 0")
df5 = df4.sort("c3")

# Add three more derived columns, each building on the previous one
df6 = (
  df5
  .withColumn("c4", expr("c3 + 3"))
  .withColumn("c5", expr("c4 * c4"))
  .withColumn("c6", expr("c4 + c5"))
)

# Order by the last derived column and print the result
df7 = df6.sort("c6")
df7.show()

In [16]:
# df3: create a range, then add three columns.
# That is 4 steps in the code, but the Spark plan does it in 3 steps.
df0 = spark.range(start=0, end=40, step=2).toDF("c0")
df1 = df0.withColumn("c1", lit(20))
df2 = df1.withColumn("c2", expr("c1 - c0"))
df3 = df2.withColumn("c3", expr("c2 / 2"))
df3.explain()

# df4: a filter is added as a 5th step in the code.
# The Spark plan applies the filter right after creating the range.
# This is what predicate pushdown does: column values never need to be
# calculated for rows that would eventually be filtered out.
df4 = df3.where("c3 % 2 = 0")
df4.explain()


In [17]:
# df6: sort by c3, then add three more derived columns
df5 = df4.sort("c3")
df6 = (
  df5
  .withColumn("c4", expr("c3 + 3"))
  .withColumn("c5", expr("c4 * c4"))
  .withColumn("c6", expr("c4 + c5"))
)
df6.explain()

# df7: a second sort is added in the code,
# yet the Spark plan only sorts the data once
df7 = df6.sort("c6")
df7.explain()


### Spark SQL
Some computations are easier to express in SQL than with traditional Pandas-like code

In [19]:
# Total cost (UnitPrice * Quantity summed) per customer, restricted to
# customers having more than 100 rows with a Quantity value.
# The query reads from the "purchases" temporary view.
purchase_totals_query = """
  SELECT CustomerId, SUM(UnitPrice * Quantity) as total_cost
  FROM purchases
  GROUP BY CustomerId
  HAVING count(Quantity) > 100
  """

# collect() returns all result rows to the driver; they become the cell output
spark.sql(purchase_totals_query).collect()


### Machine Learning
The following cells show a simple model being trained

In [21]:
from pyspark.sql.functions import col, date_format

# Replace missing values with 0 before feature engineering
filled_frame = sparkDataFrame.na.fill(0)

# Derive a "day_of_week" column by formatting InvoiceDate with the "EEEE" pattern
weekday_frame = filled_frame.withColumn(
  "day_of_week", date_format(col("InvoiceDate"), "EEEE")
)

# Coalesce the result down to 5 partitions
preppedDataFrame = weekday_frame.coalesce(5)


In [22]:
# Split on the invoice date: rows dated before 2011-07-01 form the training
# set, and rows dated on or after 2011-07-01 form the test set
trainDataFrame = preppedDataFrame.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame.where("InvoiceDate >= '2011-07-01'")


In [23]:
from pyspark.ml.feature import StringIndexer

# Indexer stage: reads "day_of_week" and writes "day_of_week_index"
indexer = StringIndexer(inputCol="day_of_week", outputCol="day_of_week_index")


In [24]:
from pyspark.ml.feature import OneHotEncoder

# Encoder stage: reads "day_of_week_index" and writes "day_of_week_encoded"
encoder = OneHotEncoder(
  inputCol="day_of_week_index",
  outputCol="day_of_week_encoded",
)


In [25]:
from pyspark.ml.feature import VectorAssembler

# Assembler stage: combines the two numeric columns and the encoded
# day-of-week column into a single "features" column
feature_columns = ["UnitPrice", "Quantity", "day_of_week_encoded"]
vectorAssembler = VectorAssembler(inputCols=feature_columns, outputCol="features")


In [26]:
from pyspark.ml import Pipeline

# Chain the three feature stages in the order they must run:
# index the day name, encode the index, then assemble the feature vector
pipeline_stages = [indexer, encoder, vectorAssembler]
transformationPipeline = Pipeline(stages=pipeline_stages)


In [27]:
# Fit the transformation pipeline (indexer, encoder, assembler) on the training rows
fittedPipeline = transformationPipeline.fit(trainDataFrame)


In [28]:
# Apply the fitted pipeline to the training rows; this adds the
# "day_of_week_index", "day_of_week_encoded" and "features" columns
transformedTraining = fittedPipeline.transform(trainDataFrame)


In [29]:
# Print a sample of rows from the transformed training data
transformedTraining.show()

In [30]:
from pyspark.ml.clustering import KMeans

# Configure k-means with 20 clusters; the seed is fixed to 1
kmeans = KMeans(k=20, seed=1)


In [31]:
# Train the k-means model on the transformed training data
# NOTE(review): relies on KMeans reading the "features" column by default — confirm
kmModel = kmeans.fit(transformedTraining)


In [32]:
# Print the cluster sizes reported by the model's training summary
summary = kmModel.summary
print("Cluster Sizes:", summary.clusterSizes, sep="\n")

# Print each cluster center on its own line
centers = kmModel.clusterCenters()
print("Cluster Centers:")
for center in centers:
  print(center)