# Apache Spark Learning Notes

In [1]:
import pandas as pd
from pyspark.ml.classification import *
from pyspark.ml.clustering import KMeans, BisectingKMeans, GaussianMixture, LDA
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.feature import *
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.regression import *
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.mllib.evaluation import BinaryClassificationMetrics, RegressionMetrics, RankingMetrics
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [2]:
spark = SparkSession.builder.master("local").appName("Hello World").getOrCreate()

# 1. Key Concepts

## Spark
Apache Spark is a unified computing engine and a set of libraries for parallel data processing on computer clusters. Spark supports multiple widely used programming languages (Python, Java, Scala, and R), includes libraries for diverse tasks ranging from SQL to streaming and machine learning, and runs anywehere from a laoptop to a cluster of thousands of servers. 

## Spark Application
Spark Applications consist of a driver process and a set of executor processes. 


## Driver
The driver process runs your main() function, sits on a node in the cluster, and is responsible for maintaining infomration about the Spark Application, responding to a user's program or input; and anlyzing, distributing, and scheduling work across the executors. 

## Executor
The executors are responsible for actually carrying out the work that the driver assigns them. Each executor is responsible for exeucting code assigned ot it by the driver, and reporting the state of the computation on that executor back to the driver node. 

## Cluster Manager
The cluster manager controls physical machines and allocates resources to Spark Applications. This can be one of three core cluster managers: Spark's standalone cluster manager, YARN, or Mesos. 

## Partition
A partition is a collection of rows that sits on one physical machine in your cluster.

## Execution Modes
* Cluster mode. In a cluster mode, a user submits a pre-complied JAR, Python or R script to a cluster manager. The cluster manager then launches the driver process on a worker node inside the cluster, in addition to the executor processes. The cluster manager is responsible for maintaining all Spark Application-related processes. 
* Client mode. Client mode is nearly the same as cluster mode except that the Spark driver remains on the client machine that submitted the application. This means that the client machine is responsible for maintaining the Spark driver process, and the cluster manager maintains the executor processses.
* Local mode. It runs the entire Spark Application on a single machine. It achieves parallelism through threads on that single machine.

## A Spark Job
In general, there should be one Spark job for one action. Actions always return results. Each job breaks down into a series of stages, the number of which depends on how many shuffle operations need to take place.


## Stages
Stages in Spark represent groups of tasks that can be executed together to compute the same operation on multiple machines. In general, Spark will try to pack as much work as possible (i.e., as many transformations as possible inside your job) into the same stage, but the engine starts new stages after operations called shuffles. A shuffle represents a physical repartitioning of the data—for example, sorting a DataFrame, or grouping data that was loaded from a file by key (which requires sending records with the same key to the same node). This type of repartitioning requires coordinating across executors to move data around. Spark starts a new stage after each shuffle, and keeps track of what order the stages must run in to compute the final result.

## Tasks
Stages in Spark consist of tasks. Each task corresponds to a combination of blocks of data and a set of transformations that will run on a single executor. If there is one big partition in our dataset, we will have one task. If there are 1,000 little partitions, we will have 1,000 tasks that can be executed in parallel. A task is just a unit of computation applied to a unit of data (the partition).


## Distribution of Executors, Cores and Memory for a Spark Application
Theory:
* Hadoop/Yarn/OS Deamons: When we run spark application using a cluster manager like Yarn, there’ll be several daemons that’ll run in the background. So, while specifying num-executors, we need to make sure that we leave aside enough cores (~1 core per node) for these daemons to run smoothly.
* Yarn ApplicationMaster (AM): ApplicationMaster is responsible for negotiating resources from the ResourceManager and working with the NodeManagers to execute and monitor the containers and their resource consumption. If we are running spark on yarn, then we need to budget in the resources that AM would need (~1024MB and 1 Executor).
* HDFS Throughput: HDFS client has trouble with tons of concurrent threads. It was observed that HDFS achieves full write throughput with ~5 tasks per executor. So it’s good to keep the number of cores per executor below that number. * Spark Yarn executor MemoryOverhead which is maxima of 384MB or 7% of the executor memory.

Example: 
Consider a 10 node cluster with 16 cores and 64GB RAM per node.     
    * Based on the recommendations mentioned above for good HDFS throughput, Let’s assign --executor-core = 5
    * Leave 1 core per node for Hadoop/Yarn daemons => Num cores available per node = 16-1 = 15
    * Total available of cores in cluster = 15 x 10 = 150
    * Number of available executors = (total cores/num-cores-per-executor) = 150 / 5 = 30
    * Leaving 1 executor for ApplicationManager => --num-executors = 29
    * Number of executors per node = 30/10 = 3
    * Memory per executor = 64GB/3 = 21GB
    * Counting off heap overhead = 7% of 21GB = 3GB
    * Actual --executor-memory = 21 - 3 = 18GB

## Transformation
A transformation instructs Spark how you would lije to modify a DataFrame to do what you want. There are two types of transformations: Narrow dependencies are those for which each input partition will contribute to only one output partition; a wide dependency style transformation will have input partitions contributing to many output partitions. With narrow transformations, Spark will automatically perform an operation called pipelining, meaning that if we specify multiple filters on DataFrames, they will all be performed in memory. For shuffles, Spark writes results to disk.

## Lazy Evaluation
Spark waits until the very last moment to execute the graph of computation instructions. This provides immense benefits because Spark can optimize the entire data flow from end to end. 


## Action
An action instructs Spark to compute a result from a series of transformations. There are three kinds of actuibs:
* Actions to view data in the console
* Actions to collect data to native objects in teh respective language
* Actions to write to output data 

## DataFrames and Datasets
DataFrames and Datasets are distributed table-like collections with well-defined rows and columns. They represent immutable, lazily evaluated plans that specify what operations to apply to data residing at a location to generate some output. When we perform an action on a DataFrame, we instruct Spark to perform the actual transformations and return the result. These represent plans of how to manipulate rows and columns to compute the user's desired result.

The difference between DataFrames and Datasets lies in "types". Both have types though. For DataFrames, Spark maintains types completelt and only checks whether those types line up to those specified in the schema at runtime. Datasets, on the other hand, check whether types conform to the specificiation at complie time. Datasets are only available to JVM-based languages (Scala and Java). 

# 2. Spark 101

In [3]:
myRange = spark.range(1000).toDF("number")

In [4]:
divisBy2 = myRange.where("number % 2 = 0")

In [5]:
flightData2015 = spark\
    .read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("../data/flight-data/csv/2015-summary.csv")

In [6]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [7]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [8]:
flightData2015.count()

256

In [9]:
sqlWay = spark.sql("""
    SELECT DEST_COUNTRY_NAME, count(1)
    FROM flight_data_2015
    GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .count()

sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#14], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#14, 200)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#14], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/jielishen/Documents/CGit/AI-lab/learning-spark/data/flight-data/csv..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#14], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#14, 200)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#14], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/jielishen/Documents/CGit/AI-lab/learning-spark/data/flight-data/csv..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY

In [10]:
flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

In [11]:
maxSql = spark.sql("""
    SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
    FROM flight_data_2015
    GROUP BY DEST_COUNTRY_NAME
    ORDER BY sum(count) DESC
    LIMIT 5
""")

maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [12]:
flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [13]:
flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)", "destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .take(5)

[Row(DEST_COUNTRY_NAME='United States', destination_total=411352),
 Row(DEST_COUNTRY_NAME='Canada', destination_total=8399),
 Row(DEST_COUNTRY_NAME='Mexico', destination_total=7140),
 Row(DEST_COUNTRY_NAME='United Kingdom', destination_total=2025),
 Row(DEST_COUNTRY_NAME='Japan', destination_total=1548)]

# 3 Structured APIs: DataFrames, SQL, and Datasets

## Schemas

In [14]:
df = spark.read.format("json").load("../data/flight-data/json/2015-summary.json")
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [15]:
myManualSchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), False, metadata={"hello":"world"})
])
df = spark.read.format("json").schema(myManualSchema).load("../data/flight-data/json/2015-summary.json")

## Columns

In [16]:
col("someColumnName")
column("someColumnName")

Column<b'someColumnName'>

## Expressions

In [17]:
expr("(((someCol + 5) * 200) - 6) < otherCol")

Column<b'((((someCol + 5) * 200) - 6) < otherCol)'>

## Rows

In [18]:
myRow = Row("Hello", None, 1, False)
myRow[0]

'Hello'

## Views

In [19]:
df.createOrReplaceTempView("dfTable")

## Creating DataFrame from Rows

In [20]:
myManualSchema = StructType([
  StructField("some", StringType(), True),
  StructField("col", StringType(), True),
  StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
+-----+----+-----+



## Select

In [21]:
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [22]:
df.select(
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"))\
    .show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



In [23]:
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [24]:
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME")).show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



## selectExpr

In [25]:
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)

+-------------+-----------------+
|newColumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



In [26]:
df.selectExpr(
    "*", # all original columns
    "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
    .show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [27]:
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



## Literals

In [28]:
df.select(expr("*"), lit(1).alias("One")).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



## Adding Columns

In [29]:
df.withColumn("numberOne", lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [30]:
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



## Renaming Columns

In [31]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").show(2)

+-------------+-------------------+-----+
|         dest|ORIGIN_COUNTRY_NAME|count|
+-------------+-------------------+-----+
|United States|            Romania|   15|
|United States|            Croatia|    1|
+-------------+-------------------+-----+
only showing top 2 rows



## Reserved Characters

In [32]:
dfWithLongColName = df.withColumn(
    "This Long Column-Name",
    expr("ORIGIN_COUNTRY_NAME"))
dfWithLongColName.selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`")\
    .show(2)

+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows



## Removing Columns

In [33]:
df.drop("ORIGIN_COUNTRY_NAME").columns

['DEST_COUNTRY_NAME', 'count']

## Casting

In [34]:
df.withColumn("count2", col("count").cast("short")).printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)
 |-- count2: short (nullable = true)



## Filtering Rows

In [35]:
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



## Unique Rows

In [36]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

256

In [37]:
df.select("ORIGIN_COUNTRY_NAME").distinct().count()

125

## Random Samples

In [38]:
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

126

## Random Splits

In [39]:
dataFrames = df.randomSplit([0.25, 0.75], seed)
print(dataFrames[0].count())
print(dataFrames[1].count())

60
196


## Union

In [40]:
schema = df.schema
newRows = [
    Row("New Country", "Other Country", 5),
    Row("New Country 2", "Other Country 3", 1)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)
df.union(newDF)\
    .where("count = 1")\
    .where(col("ORIGIN_COUNTRY_NAME") != "United States")\
    .show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



## Sorting

In [41]:
df.sort("count").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [42]:
df.orderBy("count", "DEST_COUNTRY_NAME").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [43]:
df.orderBy(expr("count desc")).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [44]:
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows



In [45]:
# It is sometimes advisable to sort within each partition before another set of transformations
spark.read.format("json").load("../data/flight-data/json/*-summary.json")\
    .sortWithinPartitions("count")

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

## Limit

In [46]:
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



In [47]:
df.orderBy(expr("count desc")).limit(6).show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
+--------------------+-------------------+-----+



## Repartition and Coalesce

In [48]:
df.rdd.getNumPartitions() # 1
df.repartition(5)
df.repartition(col("DEST_COUNTRY_NAME"))
df.repartition(5, col("DEST_COUNTRY_NAME"))
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

## Collecting Rows to the Driver

In [49]:
collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count while collect doesn't

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]

In [50]:
collectDF.collect()

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [51]:
collectDF.show(5) # this prints it out nicely

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [52]:
collectDF.show(5, False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
+-----------------+-------------------+-----+
only showing top 5 rows



## Working with Booleans

In [53]:
df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("../data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [54]:
df.where(col("InvoiceNo") != 536365)\
    .select("InvoiceNo", "Description")\
    .show(5, False)

+---------+-----------------------------+
|InvoiceNo|Description                  |
+---------+-----------------------------+
|536366   |HAND WARMER UNION JACK       |
|536366   |HAND WARMER RED POLKA DOT    |
|536367   |ASSORTED COLOUR BIRD ORNAMENT|
|536367   |POPPY'S PLAYHOUSE BEDROOM    |
|536367   |POPPY'S PLAYHOUSE KITCHEN    |
+---------+-----------------------------+
only showing top 5 rows



In [55]:
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      null|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+



In [56]:
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
    .where("isExpensive")\
    .select("unitPrice", "isExpensive").show(5)

+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+



In [57]:
df.withColumn("isExpensive", expr("NOT UnitPrice <= 250"))\
    .where("isExpensive")\
    .select("Description", "UnitPrice").show(5)

+--------------+---------+
|   Description|UnitPrice|
+--------------+---------+
|DOTCOM POSTAGE|   569.77|
|DOTCOM POSTAGE|   607.49|
+--------------+---------+



## Working with Numbers

In [58]:
fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)

+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows



In [59]:
df.selectExpr(
    "CustomerId",
    "(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2)

+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows



In [60]:
df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
|          3.0|           2.0|
+-------------+--------------+
only showing top 2 rows



In [61]:
print(df.stat.corr("Quantity", "UnitPrice"))
df.select(corr("Quantity", "UnitPrice")).show()

-0.04112314436835551
+-------------------------+
|corr(Quantity, UnitPrice)|
+-------------------------+
|     -0.04112314436835551|
+-------------------------+



In [62]:
df.describe().show()

+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                null| 8.627413127413128| 4.151946589446603|15661.388719512195|          null|
| stddev|72.89447869788873|17407.897548583845|                null|26.371821677029203|15.638659854603892|1854.4496996893627|          null|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|               0.0|           12431.0|     Australia|
|    max|          C

In [63]:
df.stat.approxQuantile("UnitPrice", [0.5], 0.05)

[2.51]

In [64]:
df.select(monotonically_increasing_id()).show(2)

+-----------------------------+
|monotonically_increasing_id()|
+-----------------------------+
|                            0|
|                            1|
+-----------------------------+
only showing top 2 rows



## Working with Strings

In [65]:
df.select(initcap(col("Description"))).show(2)

+--------------------+
|initcap(Description)|
+--------------------+
|White Hanging Hea...|
| White Metal Lantern|
+--------------------+
only showing top 2 rows



In [66]:
df.select(col("Description"),
    lower(col("Description")),
    upper(lower(col("Description")))).show(2)

+--------------------+--------------------+-------------------------+
|         Description|  lower(Description)|upper(lower(Description))|
+--------------------+--------------------+-------------------------+
|WHITE HANGING HEA...|white hanging hea...|     WHITE HANGING HEA...|
| WHITE METAL LANTERN| white metal lantern|      WHITE METAL LANTERN|
+--------------------+--------------------+-------------------------+
only showing top 2 rows



In [67]:
df.select(
    ltrim(lit("    HELLO    ")).alias("ltrim"),
    rtrim(lit("    HELLO    ")).alias("rtrim"),
    trim(lit("    HELLO    ")).alias("trim"),
    lpad(lit("HELLO"), 3, " ").alias("lp"),
    rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)

+---------+---------+-----+---+----------+
|    ltrim|    rtrim| trim| lp|        rp|
+---------+---------+-----+---+----------+
|HELLO    |    HELLO|HELLO|HEL|HELLO     |
|HELLO    |    HELLO|HELLO|HEL|HELLO     |
+---------+---------+-----+---+----------+
only showing top 2 rows



In [68]:
df.select(translate(col("Description"), "LEET", "1337"), col("Description"))\
  .show(2)

+----------------------------------+--------------------+
|translate(Description, LEET, 1337)|         Description|
+----------------------------------+--------------------+
|              WHI73 HANGING H3A...|WHITE HANGING HEA...|
|               WHI73 M37A1 1AN73RN| WHITE METAL LANTERN|
+----------------------------------+--------------------+
only showing top 2 rows



In [69]:
containsBlack = instr(col("Description"), "BLACK") >= 1
containsWhite = instr(col("Description"), "WHITE") >= 1
df.withColumn("hasSimpleColor", containsBlack | containsWhite)\
    .where("hasSimpleColor")\
    .select("Description").show(3, False)

+----------------------------------+
|Description                       |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN               |
|RED WOOLLY HOTTIE WHITE HEART.    |
+----------------------------------+
only showing top 3 rows



## Working with Dates and Timestamps

In [70]:
dateDF = spark.range(10)\
    .withColumn("today", current_date())\
    .withColumn("now", current_timestamp())
dateDF.show(2)

+---+----------+--------------------+
| id|     today|                 now|
+---+----------+--------------------+
|  0|2019-12-30|2019-12-30 22:30:...|
|  1|2019-12-30|2019-12-30 22:30:...|
+---+----------+--------------------+
only showing top 2 rows



In [71]:
dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(2)

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2019-12-25|        2020-01-04|
|        2019-12-25|        2020-01-04|
+------------------+------------------+
only showing top 2 rows



In [72]:
dateDF.withColumn("week_ago", date_sub(col("today"), 7))\
    .select(datediff(col("week_ago"), col("today"))).show(1)

+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
+-------------------------+
only showing top 1 row



In [73]:
dateDF.select(
    to_date(lit("2016-01-01")).alias("start"),
    to_date(lit("2017-05-22")).alias("end"))\
    .select(months_between(col("start"), col("end"))).show(1)

+--------------------------------+
|months_between(start, end, true)|
+--------------------------------+
|                    -16.67741935|
+--------------------------------+
only showing top 1 row



In [74]:
dateFormat = "yyyy-dd-MM"
cleanDateDF = spark.range(1).select(
    to_date(lit("2017-12-11"), dateFormat).alias("date"),
    to_date(lit("2017-20-12"), dateFormat).alias("date2"))
cleanDateDF.show()

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+



In [75]:
cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()

+----------------------------------+
|to_timestamp(`date`, 'yyyy-dd-MM')|
+----------------------------------+
|               2017-11-12 00:00:00|
+----------------------------------+



## Working with Nulls

In [76]:
df.na.drop("all", subset=["StockCode", "InvoiceNo"]).show(2)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows



In [77]:
df.na.fill("all", subset=["StockCode", "InvoiceNo"]).show(2)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows



In [78]:
df.na.fill({"StockCode": 5, "Description" : "No Value"}).show(2)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows



In [79]:
df.na.replace([""], ["UNKNOWN"], "Description").show(2)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows



## Working with Complex Types

In [80]:
complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.select(col("complex").getField("Description")).show(2)
complexDF.select(col("complex").getField("InvoiceNo")).show(2)

+--------------------+
| complex.Description|
+--------------------+
|WHITE HANGING HEA...|
| WHITE METAL LANTERN|
+--------------------+
only showing top 2 rows

+-----------------+
|complex.InvoiceNo|
+-----------------+
|           536365|
|           536365|
+-----------------+
only showing top 2 rows



In [81]:
df.select(split(col("Description"), " ")).show(2)

+---------------------+
|split(Description,  )|
+---------------------+
| [WHITE, HANGING, ...|
| [WHITE, METAL, LA...|
+---------------------+
only showing top 2 rows



In [82]:
df.select(split(col("Description"), " ").alias("array_col"))\
    .selectExpr("array_col[0]").show(2)

+------------+
|array_col[0]|
+------------+
|       WHITE|
|       WHITE|
+------------+
only showing top 2 rows



In [83]:
df.select(size(split(col("Description"), " "))).show(2)

+---------------------------+
|size(split(Description,  ))|
+---------------------------+
|                          5|
|                          3|
+---------------------------+
only showing top 2 rows



In [84]:
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)

+--------------------------------------------+
|array_contains(split(Description,  ), WHITE)|
+--------------------------------------------+
|                                        true|
|                                        true|
+--------------------------------------------+
only showing top 2 rows



## User-Defined Functions

In [85]:
udfExampleDF = spark.range(5).toDF("num")
def power3(double_value):
    return double_value ** 3
power3(2.0)
power3udf = udf(power3)
udfExampleDF.select(power3udf(col("num"))).show()

+-----------+
|power3(num)|
+-----------+
|          0|
|          1|
|          8|
|         27|
|         64|
+-----------+



In [86]:
spark.udf.register("power3py", power3, IntegerType())
udfExampleDF.selectExpr("power3py(num)").show()

+-------------+
|power3py(num)|
+-------------+
|            0|
|            1|
|            8|
|           27|
|           64|
+-------------+



## Count

In [87]:
df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("../data/retail-data/all/*.csv")\
    .coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")

In [88]:
df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



## countDistinct

In [89]:
df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



## approx_count_distinct

In [90]:
df.select(approx_count_distinct("StockCode", 0.1)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



## First and Last

In [91]:
df.select(first("StockCode"), last("StockCode")).show()

+-----------------------+----------------------+
|first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
|                 85123A|                 22138|
+-----------------------+----------------------+



## Min and Max

In [92]:
df.select(min("Quantity"), max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



## Sum

In [93]:
df.select(sum("Quantity")).show() # 5176450

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



## sumDistinct

In [94]:
df.select(sumDistinct("Quantity")).show() # 29310

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



## Avg

In [95]:
df.select(
    count("Quantity").alias("total_transactions"),
    sum("Quantity").alias("total_purchases"),
    avg("Quantity").alias("avg_purchases"),
    expr("mean(Quantity)").alias("mean_purchases"))\
.selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases").show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



## Variance and Standard Deviation

In [96]:
df.select(var_pop("Quantity"), var_samp("Quantity"), stddev_pop("Quantity"), stddev_samp("Quantity")).show()

+-----------------+------------------+--------------------+---------------------+
|var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+-----------------+------------------+--------------------+---------------------+
|47559.30364660879| 47559.39140929848|  218.08095663447733|   218.08115785023355|
+-----------------+------------------+--------------------+---------------------+



## Covariance and Correlation

In [97]:
df.select(
    corr("InvoiceNo", "Quantity"), 
    covar_samp("InvoiceNo", "Quantity"),
    covar_pop("InvoiceNo", "Quantity")
).show()

+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085636837E-4|             1052.7280543912716|            1052.7260778751674|
+-------------------------+-------------------------------+------------------------------+



## Collecting

In [98]:
df.agg(collect_set("Country"), collect_list("Country")).show()

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



## Grouping

In [99]:
df.groupBy("InvoiceNo").agg(count("Quantity").alias("quan"), expr("count(Quantity)")).show(5)

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536596|   6|              6|
|   536938|  14|             14|
|   537252|   1|              1|
|   537691|  20|             20|
|   538041|   1|              1|
+---------+----+---------------+
only showing top 5 rows



## Window Functions

In [100]:
dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
dfWithDate.show(5)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|      date|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|2010-12-01|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|2010-12-01|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
+---------+---------+--------------------+--------+--------------+---------+----------+-

In [101]:
windowSpec = Window\
    .partitionBy("CustomerId", "date")\
    .orderBy(desc("Quantity"))\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [102]:
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

In [103]:
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

In [104]:
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
  .select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()

+----------+----------+--------+------------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+----------+----------+--------+------------+-----------------+-------------------+
|     12346|2011-01-18|   74215|           1|                1|              74215|
|     12346|2011-01-18|  -74215|           2|                2|              74215|
|     12347|2010-12-07|      36|           1|                1|                 36|
|     12347|2010-12-07|      30|           2|                2|                 36|
|     12347|2010-12-07|      24|           3|                3|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|             

In [105]:
dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")

In [106]:
rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))\
  .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")\
  .orderBy("Date")
rolledUpDF.show()

+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      null|          null|       5176450|
|2010-12-01|          null|         26814|
|2010-12-01|     Australia|           107|
|2010-12-01|        France|           449|
|2010-12-01|United Kingdom|         23949|
|2010-12-01|          EIRE|           243|
|2010-12-01|        Norway|          1852|
|2010-12-01|   Netherlands|            97|
|2010-12-01|       Germany|           117|
|2010-12-02|       Germany|           146|
|2010-12-02|          EIRE|             4|
|2010-12-02|          null|         21023|
|2010-12-02|United Kingdom|         20873|
|2010-12-03|        Poland|           140|
|2010-12-03|        France|           239|
|2010-12-03|       Germany|           170|
|2010-12-03|      Portugal|            65|
|2010-12-03|         Spain|           400|
|2010-12-03|   Switzerland|           110|
|2010-12-03|       Belgium|           528|
+----------

In [107]:
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))\
  .select("Date", "Country", "sum(Quantity)").orderBy("Date").show()

+----+--------------------+-------------+
|Date|             Country|sum(Quantity)|
+----+--------------------+-------------+
|null|               Japan|        25218|
|null|           Australia|        83653|
|null|            Portugal|        16180|
|null|             Germany|       117448|
|null|                 RSA|          352|
|null|           Hong Kong|         4769|
|null|              Cyprus|         6317|
|null|         Unspecified|         3300|
|null|United Arab Emirates|          982|
|null|                null|      5176450|
|null|     Channel Islands|         9479|
|null|             Finland|        10666|
|null|             Denmark|         8188|
|null|               Spain|        26824|
|null|             Lebanon|          386|
|null|  European Community|          497|
|null|           Singapore|         5234|
|null|              Norway|        19247|
|null|      Czech Republic|          592|
|null|                 USA|         1034|
+----+--------------------+-------

In [108]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

## Inner Joins

In [109]:
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
  .toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")])\
  .toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")])\
  .toDF("id", "status")

In [110]:
person.join(graduateProgram, person["graduate_program"] == graduateProgram["id"]).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



## Outer Joins

In [111]:
person.join(graduateProgram, person["graduate_program"] == graduateProgram["id"], "outer").show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [112]:
person.join(graduateProgram, person["graduate_program"] == graduateProgram["id"], "left_outer").show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [113]:
person.join(graduateProgram, person["graduate_program"] == graduateProgram["id"], "right_outer").show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



## Left Semi Joins

In [114]:
person.join(graduateProgram, person["graduate_program"] == graduateProgram["id"], "left_semi").show()

+---+----------------+----------------+---------------+
| id|            name|graduate_program|   spark_status|
+---+----------------+----------------+---------------+
|  0|   Bill Chambers|               0|          [100]|
|  1|   Matei Zaharia|               1|[500, 250, 100]|
|  2|Michael Armbrust|               1|     [250, 100]|
+---+----------------+----------------+---------------+



## Handling Duplicate Column Names in Joins

In [115]:
person.join(graduateProgram, person["graduate_program"] == graduateProgram["id"]).drop(graduateProgram.id).show()

+---+----------------+----------------+---------------+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| degree|          department|     school|
+---+----------------+----------------+---------------+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+-------+--------------------+-----------+



In [116]:
graduateProgram2 = graduateProgram.withColumnRenamed("id", "grad_id")
person.join(graduateProgram2, person["graduate_program"] == graduateProgram2["grad_id"]).show()

+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status|grad_id| degree|          department|     school|
+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|      0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|      1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|      1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+



## Reading and Writing Files

In [117]:
csvFile = spark.read.format("csv")\
    .option("header", "true")\
    .option("mode", "FAILFAST")\
    .option("inferSchema", "true")\
    .load("../data/flight-data/csv/2010-summary.csv")

In [118]:
csvFile.write.format("csv").mode("overwrite").option("sep", "\t")\
    .save("../data/my-tsv-file.tsv")

In [119]:
spark.read.format("parquet")\
    .load("../data/flight-data/parquet/2010-summary.parquet").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [120]:
spark.read.text("../data/flight-data/csv/2010-summary.csv")\
    .selectExpr("split(value, ',') as rows").show()

+--------------------+
|                rows|
+--------------------+
|[DEST_COUNTRY_NAM...|
|[United States, R...|
|[United States, I...|
|[United States, I...|
|[Egypt, United St...|
|[Equatorial Guine...|
|[United States, S...|
|[United States, G...|
|[Costa Rica, Unit...|
|[Senegal, United ...|
|[United States, M...|
|[Guyana, United S...|
|[United States, S...|
|[Malta, United St...|
|[Bolivia, United ...|
|[Anguilla, United...|
|[Turks and Caicos...|
|[United States, A...|
|[Saint Vincent an...|
|[Italy, United St...|
+--------------------+
only showing top 20 rows



In [121]:
csvFile.write.format("parquet").mode("overwrite")\
    .save("../data/my-parquet-file.parquet")

In [122]:
csvFile.limit(10).select("DEST_COUNTRY_NAME", "count")\
    .write.mode("overwrite").partitionBy("count").text("../data/five-csv-files2py.csv")

In [123]:
csvFile.limit(10).write.mode("overwrite").partitionBy("DEST_COUNTRY_NAME")\
    .save("../data/partitioned-files.parquet")

# 4. Advanced Analytics and Machine Learning

## RFormula

In [124]:
sales = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("../data/retail-data/by-day/*.csv")\
    .coalesce(5)\
    .where("Description IS NOT NULL")
fakeIntDF = spark.read.parquet("../data/simple-ml-integers")
simpleDF = spark.read.json("../data/simple-ml")
scaleDF = spark.read.parquet("../data/simple-ml-scaling")
sales.show(2)
fakeIntDF.show(2)
simpleDF.show(2)
scaleDF.show(2)

+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|        Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084| RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077|DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows

+----+----+----+
|int1|int2|int3|
+----+----+----+
|   1|   2|   3|
|   7|   8|   9|
+----+----+----+
only showing top 2 rows

+-----+----+------+------------------+
|color| lab|value1|            value2|
+-----+----+------+------------------+
|green|good|     1|14.386294994851129|
| blue| bad|     8|14.386294994851129|
+-----+----

In [125]:
supervised = RFormula(formula="lab ~ . + color:value1 + color:value2")
supervised.fit(simpleDF).transform(simpleDF).take(2)

[Row(color='green', lab='good', value1=1, value2=14.386294994851129, features=SparseVector(10, {1: 1.0, 2: 1.0, 3: 14.3863, 5: 1.0, 8: 14.3863}), label=1.0),
 Row(color='blue', lab='bad', value1=8, value2=14.386294994851129, features=SparseVector(10, {2: 8.0, 3: 14.3863, 6: 8.0, 9: 14.3863}), label=0.0)]

## SQL Transformers

In [126]:
basicTransformation = SQLTransformer()\
    .setStatement("""
        SELECT sum(Quantity), count(*), CustomerID
        FROM __THIS__
        GROUP BY CustomerID
      """)
basicTransformation.transform(sales).show()

+-------------+--------+----------+
|sum(Quantity)|count(1)|CustomerID|
+-------------+--------+----------+
|          119|      62|   14452.0|
|          440|     143|   16916.0|
|          630|      72|   17633.0|
|           34|       6|   14768.0|
|         1542|      30|   13094.0|
|          854|     117|   17884.0|
|           97|      12|   16596.0|
|          290|      98|   13607.0|
|          541|      27|   14285.0|
|          244|      31|   16561.0|
|          491|     152|   13956.0|
|          204|      76|   13533.0|
|          493|      64|   16629.0|
|          159|      38|   17267.0|
|         1140|      30|   13918.0|
|           55|      28|   18114.0|
|           88|       7|   14473.0|
|          150|      16|   14024.0|
|          206|      23|   12493.0|
|          138|      18|   15776.0|
+-------------+--------+----------+
only showing top 20 rows



## VectorAssembler

In [127]:
va = VectorAssembler().setInputCols(["int1", "int2", "int3"])
va.transform(fakeIntDF).show()

+----+----+----+------------------------------------+
|int1|int2|int3|VectorAssembler_0b70603953d9__output|
+----+----+----+------------------------------------+
|   1|   2|   3|                       [1.0,2.0,3.0]|
|   7|   8|   9|                       [7.0,8.0,9.0]|
|   4|   5|   6|                       [4.0,5.0,6.0]|
+----+----+----+------------------------------------+



## Bucketing

In [128]:
contDF = spark.range(20).selectExpr("cast(id as double)")
bucketBorders = [-1.0, 5.0, 10.0, 250.0, 600.0]
bucketer = Bucketizer().setSplits(bucketBorders).setInputCol("id").setOutputCol("result")
bucketer.transform(contDF).show(5)

+---+------+
| id|result|
+---+------+
|0.0|   0.0|
|1.0|   0.0|
|2.0|   0.0|
|3.0|   0.0|
|4.0|   0.0|
+---+------+
only showing top 5 rows



In [129]:
bucketer = QuantileDiscretizer().setNumBuckets(5).setInputCol("id").setOutputCol("result")
bucketer.fit(contDF).transform(contDF).show(5)

+---+------+
| id|result|
+---+------+
|0.0|   0.0|
|1.0|   0.0|
|2.0|   0.0|
|3.0|   1.0|
|4.0|   1.0|
+---+------+
only showing top 5 rows



## StandardScaler

In [130]:
sScaler = StandardScaler().setInputCol("features")
sScaler.fit(scaleDF).transform(scaleDF).show()

+---+--------------+-----------------------------------+
| id|      features|StandardScaler_4b675cd0d761__output|
+---+--------------+-----------------------------------+
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  1|[3.0,10.1,3.0]|               [3.58568582800318...|
+---+--------------+-----------------------------------+



## MinMaxScaler

In [131]:
minMax = MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
fittedminMax = minMax.fit(scaleDF)
fittedminMax.transform(scaleDF).show()

+---+--------------+---------------------------------+
| id|      features|MinMaxScaler_41e448422cee__output|
+---+--------------+---------------------------------+
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  1|[3.0,10.1,3.0]|                 [10.0,10.0,10.0]|
+---+--------------+---------------------------------+



## MaxAbsScaler

In [132]:
maScaler = MaxAbsScaler().setInputCol("features")
fittedmaScaler = maScaler.fit(scaleDF)
fittedmaScaler.transform(scaleDF).show()

+---+--------------+---------------------------------+
| id|      features|MaxAbsScaler_f76ce92361ab__output|
+---+--------------+---------------------------------+
|  0|[1.0,0.1,-1.0]|             [0.33333333333333...|
|  1| [2.0,1.1,1.0]|             [0.66666666666666...|
|  0|[1.0,0.1,-1.0]|             [0.33333333333333...|
|  1| [2.0,1.1,1.0]|             [0.66666666666666...|
|  1|[3.0,10.1,3.0]|                    [1.0,1.0,1.0]|
+---+--------------+---------------------------------+



## Normalizer

In [133]:
manhattanDistance = Normalizer().setP(1).setInputCol("features")
manhattanDistance.transform(scaleDF).show()

+---+--------------+-------------------------------+
| id|      features|Normalizer_2dcef9635b6a__output|
+---+--------------+-------------------------------+
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  1|[3.0,10.1,3.0]|           [0.18633540372670...|
+---+--------------+-------------------------------+



## StringIndexer

In [134]:
lblIndxr = StringIndexer().setInputCol("lab").setOutputCol("labelInd")
idxRes = lblIndxr.fit(simpleDF).transform(simpleDF)
idxRes.show(5)

+-----+----+------+------------------+--------+
|color| lab|value1|            value2|labelInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
+-----+----+------+------------------+--------+
only showing top 5 rows



In [135]:
labelReverse = IndexToString().setInputCol("labelInd")
labelReverse.transform(idxRes).show(5)

+-----+----+------+------------------+--------+----------------------------------+
|color| lab|value1|            value2|labelInd|IndexToString_3f6256d9ef77__output|
+-----+----+------+------------------+--------+----------------------------------+
|green|good|     1|14.386294994851129|     1.0|                              good|
| blue| bad|     8|14.386294994851129|     0.0|                               bad|
| blue| bad|    12|14.386294994851129|     0.0|                               bad|
|green|good|    15| 38.97187133755819|     1.0|                              good|
|green|good|    12|14.386294994851129|     1.0|                              good|
+-----+----+------+------------------+--------+----------------------------------+
only showing top 5 rows



## OneHotEncoder

In [136]:
lblIndxr = StringIndexer().setInputCol("color").setOutputCol("colorInd")
colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))
ohe = OneHotEncoder().setInputCol("colorInd")
ohe.transform(colorLab).show(5)

+-----+--------+----------------------------------+
|color|colorInd|OneHotEncoder_95fbdce3cb24__output|
+-----+--------+----------------------------------+
|green|     1.0|                     (2,[1],[1.0])|
| blue|     2.0|                         (2,[],[])|
| blue|     2.0|                         (2,[],[])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
+-----+--------+----------------------------------+
only showing top 5 rows



## Tokenizer

In [137]:
tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn.transform(sales.select("Description"))
tokenized.show(5, False)

+-------------------------------+-------------------------------------+
|Description                    |DescOut                              |
+-------------------------------+-------------------------------------+
|RABBIT NIGHT LIGHT             |[rabbit, night, light]               |
|DOUGHNUT LIP GLOSS             |[doughnut, lip, gloss]               |
|12 MESSAGE CARDS WITH ENVELOPES|[12, message, cards, with, envelopes]|
|BLUE HARMONICA IN BOX          |[blue, harmonica, in, box]           |
|GUMBALL COAT RACK              |[gumball, coat, rack]                |
+-------------------------------+-------------------------------------+
only showing top 5 rows



## CountVectorizer

In [138]:
cv = CountVectorizer()\
    .setInputCol("DescOut")\
    .setOutputCol("countVec")\
    .setVocabSize(500)\
    .setMinTF(1)\
    .setMinDF(2)
cv.fit(tokenized).transform(tokenized).show(5, False)

+-------------------------------+-------------------------------------+--------------------------------------+
|Description                    |DescOut                              |countVec                              |
+-------------------------------+-------------------------------------+--------------------------------------+
|RABBIT NIGHT LIGHT             |[rabbit, night, light]               |(500,[150,185,212],[1.0,1.0,1.0])     |
|DOUGHNUT LIP GLOSS             |[doughnut, lip, gloss]               |(500,[462,463,492],[1.0,1.0,1.0])     |
|12 MESSAGE CARDS WITH ENVELOPES|[12, message, cards, with, envelopes]|(500,[35,41,166],[1.0,1.0,1.0])       |
|BLUE HARMONICA IN BOX          |[blue, harmonica, in, box]           |(500,[10,16,36,352],[1.0,1.0,1.0,1.0])|
|GUMBALL COAT RACK              |[gumball, coat, rack]                |(500,[228,280,407],[1.0,1.0,1.0])     |
+-------------------------------+-------------------------------------+--------------------------------------+
o

## Word2Vec

In [139]:
documentDF = spark.createDataFrame([
    ("Hi I hear about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])
word2Vec = Word2Vec(vectorSize=3, minCount=1, inputCol="text", outputCol="result")
word2Vec.fit(documentDF).transform(documentDF).show(5, False)

+------------------------------------------+-----------------------------------------------------------------+
|text                                      |result                                                           |
+------------------------------------------+-----------------------------------------------------------------+
|[Hi, I, hear, about, Spark]               |[0.012795738875865936,-0.014568246621638537,0.05590147152543068] |
|[I, wish, Java, could, use, case, classes]|[-0.06516336490000997,0.003300988780600684,0.039509783898081095] |
|[Logistic, regression, models, are, neat] |[-0.06130524165928364,-0.034822551906108855,-0.02625546306371689]|
+------------------------------------------+-----------------------------------------------------------------+



## PCA

In [140]:
pca = PCA().setInputCol("features").setK(2)
pca.fit(scaleDF).transform(scaleDF).show(5, False)

+---+--------------+------------------------------------------+
|id |features      |PCA_e2df73176d3a__output                  |
+---+--------------+------------------------------------------+
|0  |[1.0,0.1,-1.0]|[0.07137194992484153,-0.45266548881478463]|
|1  |[2.0,1.1,1.0] |[-1.6804946984073725,1.2593401322219144]  |
|0  |[1.0,0.1,-1.0]|[0.07137194992484153,-0.45266548881478463]|
|1  |[2.0,1.1,1.0] |[-1.6804946984073725,1.2593401322219144]  |
|1  |[3.0,10.1,3.0]|[-10.872398139848944,0.030962697060149758]|
+---+--------------+------------------------------------------+



## PolynomialExpansion

In [141]:
pe = PolynomialExpansion().setInputCol("features").setDegree(2)
pe.transform(scaleDF).show()

+---+--------------+----------------------------------------+
| id|      features|PolynomialExpansion_1fe2e2429df8__output|
+---+--------------+----------------------------------------+
|  0|[1.0,0.1,-1.0]|                    [1.0,1.0,0.1,0.1,...|
|  1| [2.0,1.1,1.0]|                    [2.0,4.0,1.1,2.2,...|
|  0|[1.0,0.1,-1.0]|                    [1.0,1.0,0.1,0.1,...|
|  1| [2.0,1.1,1.0]|                    [2.0,4.0,1.1,2.2,...|
|  1|[3.0,10.1,3.0]|                    [3.0,9.0,10.1,30....|
+---+--------------+----------------------------------------+



## Logistic Regression

In [142]:
bInput = spark.read.format("parquet").load("../data/binary-classification")\
  .selectExpr("features", "cast(label as double) as label")
bInput.show(5)

+--------------+-----+
|      features|label|
+--------------+-----+
|[3.0,10.1,3.0]|  1.0|
|[1.0,0.1,-1.0]|  0.0|
|[1.0,0.1,-1.0]|  0.0|
| [2.0,1.1,1.0]|  1.0|
| [2.0,1.1,1.0]|  1.0|
+--------------+-----+



In [143]:
lr = LogisticRegression()
trainedModel = lr.fit(bInput)
print(trainedModel.coefficients)
print(trainedModel.intercept)

[6.848741326854929,0.3535658901019745,14.814900276915923]
-10.22569586448109


In [144]:
summary = trainedModel.summary
print(summary.areaUnderROC)
summary.roc.show()
summary.pr.show()

1.0
+---+------------------+
|FPR|               TPR|
+---+------------------+
|0.0|               0.0|
|0.0|0.3333333333333333|
|0.0|               1.0|
|1.0|               1.0|
|1.0|               1.0|
+---+------------------+

+------------------+---------+
|            recall|precision|
+------------------+---------+
|               0.0|      1.0|
|0.3333333333333333|      1.0|
|               1.0|      1.0|
|               1.0|      0.6|
+------------------+---------+



In [145]:
summary.objectiveHistory

[0.6730116670092563,
 0.5042829330409727,
 0.36356862066874396,
 0.1252407018038337,
 0.08532556611276214,
 0.035504876415730455,
 0.018196494508571255,
 0.008817369922959136,
 0.004413673785392143,
 0.002194038351234709,
 0.0010965641148080857,
 0.000547657551985314,
 0.00027376237951490126,
 0.0001368465223657475,
 6.841809037070595e-05,
 3.420707791038497e-05,
 1.7103176664232043e-05,
 8.551470106426904e-06,
 4.275703677941461e-06,
 2.1378240117781303e-06,
 1.068856405465203e-06,
 5.34260020257524e-07,
 2.668135105897439e-07,
 1.3204627865316843e-07,
 6.768401481686428e-08,
 3.314547718487037e-08,
 1.6151438837494788e-08,
 8.309350118269286e-09]

## Decision Tree Classifier

In [146]:
dt = DecisionTreeClassifier()
trainedModel = dt.fit(bInput)

## Random Forest Classifier

In [147]:
rfClassifier = RandomForestClassifier()
trainedModel = rfClassifier.fit(bInput)

## GBM Classifier

In [148]:
gbtClassifier = GBTClassifier()
trainedModel = gbtClassifier.fit(bInput)

## Naive Bayes Classifier

In [149]:
nb = NaiveBayes()
trainedModel = nb.fit(bInput.where("label != 0"))

## Evaluators for Classifiers

In [150]:
out = trainedModel.transform(bInput)\
    .select("prediction", "label")\
    .rdd.map(lambda x: (float(x[0]), float(x[1])))
metrics = BinaryClassificationMetrics(out)

In [151]:
print(metrics.areaUnderPR)
print(metrics.areaUnderROC)

0.6
0.5


## Classification Pipeline

In [152]:
df = spark.read.json("../data/simple-ml")
df.orderBy("value2").show()

+-----+----+------+------------------+
|color| lab|value1|            value2|
+-----+----+------+------------------+
|green|good|     1|14.386294994851129|
|green| bad|    16|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|green| bad|    16|14.386294994851129|
|green|good|    12|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red| bad|     2|14.386294994851129|
|  red| bad|    16|14.386294994851129|
|  red| bad|    16|14.386294994851129|
| blue| bad|     8|14.386294994851129|
|green|good|     1|14.386294994851129|
|green|good|    12|14.386294994851129|
| blue| bad|     8|14.386294994851129|
|  red|good|    35|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|  red| bad|    16|14.386294994851129|
|green|good|    12|14.386294994851129|
+-----+----+------+------------------+
only showing top 20 rows



In [153]:
train, test = df.randomSplit([0.7, 0.4])
rForm = RFormula()
lr = LogisticRegression().setLabelCol("label").setFeaturesCol("features")
stages = [rForm, lr]
pipeline = Pipeline().setStages(stages)
params = ParamGridBuilder()\
    .addGrid(rForm.formula, [
        "lab~ . + color:value1",
        "lab~ . + color:value1 + color:value2"])\
    .addGrid(lr.elasticNetParam, [0, 0.5, 1])\
    .addGrid(lr.regParam, [0.1, 2.0])\
    .build()
evaluator = BinaryClassificationEvaluator()\
    .setMetricName("areaUnderROC")\
    .setRawPredictionCol("prediction")\
    .setLabelCol("label")
tvs = TrainValidationSplit()\
    .setTrainRatio(0.75)\
    .setEstimatorParamMaps(params)\
    .setEstimator(pipeline)\
    .setEvaluator(evaluator)
tvsFitted = tvs.fit(train)
evaluator.evaluate(tvsFitted.transform(test))

0.788961038961039

## Linear Regression

In [154]:
df = spark.read.load("../data/regression")

In [155]:
lr = LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
trainedModel = lr.fit(df)
summary = trainedModel.summary
summary.residuals.show()
print(summary.totalIterations)
print(summary.objectiveHistory)
print(summary.rootMeanSquaredError)
print(summary.r2)

+--------------------+
|           residuals|
+--------------------+
| 0.12805046585610147|
|-0.14468269261572053|
|-0.41903832622420595|
|-0.41903832622420595|
|  0.8547088792080308|
+--------------------+

6
[0.5000000000000001, 0.4315295810362787, 0.3132335933881022, 0.31225692666554117, 0.309150608198303, 0.30915058933480255]
0.47308424392175996
0.7202391226912209


## Generalized Linear Regression

In [156]:
glr = GeneralizedLinearRegression()\
    .setFamily("gaussian")\
    .setLink("identity")\
    .setMaxIter(10)\
    .setRegParam(0.3)\
    .setLinkPredictionCol("linkOut")
trainedModel = glr.fit(df)

## Decision Tree Regressor

In [157]:
dtr = DecisionTreeRegressor()
trainedModel = dtr.fit(df)

## Random Forest Regressor

In [158]:
rf =  RandomForestRegressor()
trainedModel = rf.fit(df)

## GBT Regressor

In [159]:
gbt = GBTRegressor()
trainedModel = gbt.fit(df)

## Regression Pipeline

In [160]:
glr = GeneralizedLinearRegression().setFamily("gaussian").setLink("identity")
pipeline = Pipeline().setStages([glr])
params = ParamGridBuilder().addGrid(glr.regParam, [0, 0.5, 1]).build()
evaluator = RegressionEvaluator()\
    .setMetricName("rmse")\
    .setPredictionCol("prediction")\
    .setLabelCol("label")
cv = CrossValidator()\
    .setEstimator(pipeline)\
    .setEvaluator(evaluator)\
    .setEstimatorParamMaps(params)\
    .setNumFolds(2) # should always be 3 or more but this dataset is small
model = cv.fit(df)

In [161]:
out = model.transform(df)\
  .select("prediction", "label").rdd.map(lambda x: (float(x[0]), float(x[1])))
metrics = RegressionMetrics(out)
print("MSE: " + str(metrics.meanSquaredError))
print("RMSE: " + str(metrics.rootMeanSquaredError))
print("R-squared: " + str(metrics.r2))
print("MAE: " + str(metrics.meanAbsoluteError))
print("Explained variance: " + str(metrics.explainedVariance))

MSE: 0.180381626679437
RMSE: 0.42471358193426895
R-squared: 0.7745229666507037
MAE: 0.3277600580484718
Explained variance: 0.44483962083227907


## Recommendation

In [162]:
ratings = spark.read.text("../data/sample_movielens_ratings.txt")\
    .rdd.toDF()\
    .selectExpr("split(value , '::') as col")\
    .selectExpr(
        "cast(col[0] as int) as userId",
        "cast(col[1] as int) as movieId",
        "cast(col[2] as float) as rating",
        "cast(col[3] as long) as timestamp")
ratings.show(5, False)

+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|0     |2      |3.0   |1424380312|
|0     |3      |1.0   |1424380312|
|0     |5      |2.0   |1424380312|
|0     |9      |4.0   |1424380312|
|0     |11     |1.0   |1424380312|
+------+-------+------+----------+
only showing top 5 rows



In [163]:
training, test = ratings.randomSplit([0.8, 0.2])
als = ALS()\
    .setMaxIter(5)\
    .setRegParam(0.01)\
    .setUserCol("userId")\
    .setItemCol("movieId")\
    .setRatingCol("rating")
alsModel = als.fit(training)
predictions = alsModel.transform(test)

In [164]:
alsModel.recommendForAllUsers(3)\
  .selectExpr("userId", "explode(recommendations)").show(5)

+------+---------------+
|userId|            col|
+------+---------------+
|    28|[25, 5.9192386]|
|    28| [69, 5.622871]|
|    28| [12, 5.043897]|
|    26| [94, 5.261216]|
|    26| [24, 5.089968]|
+------+---------------+
only showing top 5 rows



In [165]:
alsModel.recommendForAllItems(3)\
  .selectExpr("movieId", "explode(recommendations)").show(5)

+-------+---------------+
|movieId|            col|
+-------+---------------+
|     31|[12, 3.6853616]|
|     31| [9, 3.4390678]|
|     31|  [7, 3.234291]|
|     85|   [9, 5.23976]|
|     85| [16, 4.909928]|
+-------+---------------+
only showing top 5 rows



In [166]:
evaluator = RegressionEvaluator()\
    .setMetricName("rmse")\
    .setLabelCol("rating")\
    .setPredictionCol("prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = %f" % rmse)

Root-mean-square error = 1.640604


In [167]:
regComparison = predictions.select("rating", "prediction")\
  .rdd.map(lambda x: (x[0], x[1]))
metrics = RegressionMetrics(regComparison)
metrics.rootMeanSquaredError

1.6406039640307122

In [168]:
perUserActual = predictions\
    .where("rating > 2.5")\
    .groupBy("userId")\
    .agg(expr("collect_set(movieId) as movies"))
perUserPredictions = predictions\
  .orderBy(col("userId"), expr("prediction DESC"))\
  .groupBy("userId")\
  .agg(expr("collect_list(movieId) as movies"))

In [169]:
perUserActualvPred = perUserActual.join(perUserPredictions, ["userId"]).rdd\
    .map(lambda row: (row[1], row[2][:10]))
ranks = RankingMetrics(perUserActualvPred)
print(ranks.meanAveragePrecision)
print(ranks.precisionAt(5))

0.22283272283272287
0.42307692307692313


## K-Means

In [170]:
va = VectorAssembler()\
    .setInputCols(["Quantity", "UnitPrice"])\
    .setOutputCol("features")
sales = va.transform(spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("../data/retail-data/by-day/*.csv")
  .limit(50)
  .coalesce(1)
  .where("Description IS NOT NULL"))
sales.cache()
sales.show(5, False)

+---------+---------+-------------------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|Description                    |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |features   |
+---------+---------+-------------------------------+--------+-------------------+---------+----------+--------------+-----------+
|580538   |23084    |RABBIT NIGHT LIGHT             |48      |2011-12-05 08:38:00|1.79     |14075.0   |United Kingdom|[48.0,1.79]|
|580538   |23077    |DOUGHNUT LIP GLOSS             |20      |2011-12-05 08:38:00|1.25     |14075.0   |United Kingdom|[20.0,1.25]|
|580538   |22906    |12 MESSAGE CARDS WITH ENVELOPES|24      |2011-12-05 08:38:00|1.65     |14075.0   |United Kingdom|[24.0,1.65]|
|580538   |21914    |BLUE HARMONICA IN BOX          |24      |2011-12-05 08:38:00|1.25     |14075.0   |United Kingdom|[24.0,1.25]|
|580538   |22467    |GUMBALL COAT RACK              |6       |2011-12-05 08:38:00|2

In [171]:
km = KMeans().setK(5)
trainedModel = km.fit(sales)

In [172]:
summary = trainedModel.summary
print(summary.clusterSizes) # number of points
trainedModel.computeCost(sales)
centers = trainedModel.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

[8, 3, 12, 10, 17]
Cluster Centers: 
[ 2.5     11.24375]
[44.          1.16333333]
[11.33333333  1.1       ]
[23.2    0.956]
[4.88235294 3.95176471]


## Gaussian Mixtures

In [173]:
gmm = GaussianMixture().setK(5)
print(gmm.explainParams())
model = gmm.fit(sales)

featuresCol: features column name. (default: features)
k: Number of independent Gaussians in the mixture model. Must be > 1. (default: 2, current: 5)
maxIter: max number of iterations (>= 0). (default: 100)
predictionCol: prediction column name. (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities. (default: probability)
seed: random seed. (default: 1345763507599468680)
tol: the convergence tolerance for iterative algorithms (>= 0). (default: 0.01)


In [174]:
summary = model.summary
print(model.weights)
model.gaussiansDF.show()
summary.cluster.show(5)
print(summary.clusterSizes)
summary.probability.show(5)

[0.3684803248527374, 0.38593893518118794, 0.10000020841060411, 0.05680013894452062, 0.08878039261094998]
+--------------------+--------------------+
|                mean|                 cov|
+--------------------+--------------------+
|[4.49187138020203...|2.037679287818033...|
|[17.6580367608789...|40.65792305764172...|
|[2.40000222306067...|0.640002480263236...|
|[44.4395689808324...|30.30286302367439...|
|[9.30087635592238...|10.70332423413150...|
+--------------------+--------------------+

+----------+
|prediction|
+----------+
|         3|
|         1|
|         1|
|         1|
|         0|
+----------+
only showing top 5 rows

[18, 20, 5, 3, 4]
+--------------------+
|         probability|
+--------------------+
|[1.33736023145677...|
|[1.65827083745777...|
|[4.99169303496923...|
|[2.64376456275349...|
|[0.99461166737411...|
+--------------------+
only showing top 5 rows



## Latent Dirichlet Allocation

In [175]:
tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn.transform(sales.drop("features"))
cv = CountVectorizer()\
    .setInputCol("DescOut")\
    .setOutputCol("features")\
    .setVocabSize(500)\
    .setMinTF(0)\
    .setMinDF(0)\
    .setBinary(True)
cvFitted = cv.fit(tokenized)
prepped = cvFitted.transform(tokenized)

In [176]:
lda = LDA().setK(10).setMaxIter(5)
trainedModel = lda.fit(prepped)

In [177]:
trainedModel.describeTopics(3).show()

+-----+-------------+--------------------+
|topic|  termIndices|         termWeights|
+-----+-------------+--------------------+
|    0|[99, 98, 105]|[0.01104912329130...|
|    1| [59, 78, 41]|[0.00900485008824...|
|    2|   [0, 3, 62]|[0.01547126507876...|
|    3|  [35, 6, 63]|[0.00902006606370...|
|    4|[10, 81, 136]|[0.00888752769442...|
|    5| [61, 21, 16]|[0.00875876021059...|
|    6| [96, 22, 88]|[0.00873411697154...|
|    7|   [7, 16, 2]|[0.01118540491147...|
|    8|  [87, 39, 2]|[0.01492126697720...|
|    9| [30, 56, 83]|[0.01188025399462...|
+-----+-------------+--------------------+



In [178]:
cvFitted.vocabulary

['water',
 'hot',
 'vintage',
 'bottle',
 'paperweight',
 '6',
 'home',
 'doormat',
 'landmark',
 'bicycle',
 'frame',
 'ribbons',
 '',
 'classic',
 'rose',
 'kit',
 'leaf',
 'sweet',
 'bag',
 'airline',
 'doorstop',
 'light',
 'in',
 'christmas',
 'heart',
 'calm',
 'set',
 'keep',
 'balloons',
 'night',
 'lights',
 '12',
 'tin',
 'english',
 'caravan',
 'stuff',
 'tidy',
 'oxford',
 'full',
 'cottage',
 'notting',
 'drawer',
 'mushrooms',
 'chrome',
 'champion',
 'amelie',
 'mini',
 'the',
 'giant',
 'design',
 'elegant',
 'tins',
 'jet',
 'fairy',
 "50's",
 'holder',
 'message',
 'blue',
 'storage',
 'tier',
 'covent',
 'world',
 'skulls',
 'font',
 'hearts',
 'skull',
 'clips',
 'bell',
 'red',
 'party',
 'chalkboard',
 'save',
 '4',
 'coloured',
 'poppies',
 'garden',
 'nine',
 'girl',
 'shimmering',
 'doughnut',
 'dog',
 '3',
 'tattoos',
 'chilli',
 'coat',
 'torch',
 'sunflower',
 'tale',
 'cards',
 'puncture',
 'woodland',
 'bomb',
 'knack',
 'lip',
 'collage',
 'rabbit',
 'sex