# Programming for Massive Data

This notebook is designed to run on Google Colab.

## Introduction to Massive Data Programming

- Massive data, more commonly known as ``big data``, refers to extremely large and complex datasets that cannot be stored, processed, or analyzed using traditional data management tools.

### Characteristics of massive data (The 3Vs)
The three defining characteristics explain why massive data is fundamentally different from traditional data and requires a new approach to management and analysis.

**1. Volume**

- Volume refers to the enormous amount of data generated every second from a wide array of sources. It is measured in petabytes and even zettabytes, a scale that traditional data storage systems cannot manage.
- Example: A social media platform like Facebook ingests over 500 terabytes of new data every day through photo uploads, message exchanges, and comments.

**2. Velocity**

- Velocity is the rapid rate at which data is created, collected, and needs to be processed. In many applications, this requires near-real-time or real-time processing to have any meaningful impact.
- Example: High-frequency stock trading generates real-time market data that must be analyzed and acted upon instantly.

**3. Variety**

- Variety highlights the many different types and formats of data, which can be structured, semi-structured, or unstructured. Traditional systems were built to handle only structured data that fits neatly into tables.
1. ``Structured data``: Highly organized and easy to search, such as data in a relational database or a spreadsheet.
2. ``Semi-structured data``: A hybrid data type that has some organizational properties but no rigid schema. An email, for example, has structured elements (sender, recipient, timestamp) and unstructured text in the message body.
3. ``Unstructured data``: Data with no predefined format or organization, making it the most challenging to process. Examples include text documents, images, audio, and video files.

### Challenges of traditional programming for large datasets.

1. Data Volume and Storage:
    - Inadequate Storage Solutions: Traditional databases and file systems are not designed to efficiently store and manage petabytes or exabytes of data.
    - Cost and Complexity: Scaling traditional storage infrastructure to accommodate massive data volumes becomes prohibitively expensive and complex to maintain.
2. Data Velocity and Processing:
    - Real-time Processing Limitations: Traditional systems struggle to process high-velocity data streams in real time, which delays insights and reactions.
    - Performance Bottlenecks: Conventional processing methods become slow and inefficient when faced with the sheer volume and speed of big data, leading to delays in analysis.
3. Data Variety and Management:
    - Handling Unstructured Data: Traditional programming is primarily suited to structured data and struggles to manage and analyze diverse formats such as text, images, video, and sensor data.
    - Data Integration Challenges: Integrating data from multiple heterogeneous sources into a unified view for analysis is complex and time-consuming in traditional setups.
4. Data Quality and Veracity:
    - Ensuring Data Accuracy: Maintaining data quality, consistency, and accuracy across vast and diverse datasets is a significant challenge, because errors can lead to flawed analyses.
    - Manual Data Preparation: Traditional methods often require extensive manual effort for data cleaning, transformation, and enrichment, which is impractical for large datasets.
5. Scalability and Flexibility:
    - Limited Scalability: Traditional applications are often not designed to scale horizontally (by adding more machines) to handle ever-increasing data volumes and processing demands.
    - Inflexibility to Changes: Adapting traditional programs to new data types, sources, or analysis requirements can be cumbersome and require extensive reprogramming.
6. Computational Resources and Cost:
    - Resource Intensive: Processing large datasets with traditional methods can demand significant computational power, potentially leading to high infrastructure costs.
    - Lack of Optimization: Traditional programs often do not leverage techniques such as parallel processing or distributed computing, leading to inefficient resource utilization.

### Introduction to distributed computing concepts.

- Distributed computing involves multiple, independent computers connected by a network that work together to solve a problem as a single, coherent system.
- This approach provides benefits like enhanced scalability by adding more devices, fault tolerance so the system continues to run even if one computer fails, and increased performance through parallel processing.

#### Core Concepts
- ``Nodes``: Individual computers or devices that are part of the distributed system.
- ``Message Passing``: Nodes communicate and coordinate their actions by sending messages to each other over the network.
- ``Shared State``: The components of the system maintain a shared understanding of the overall state, even though they are physically separated.
- ``Independence``: Each node can operate independently, processing its own data.
- ``Unified Appearance``: To the end-user, the entire distributed system appears as a single, cohesive computer or resource.

## Programming for Distributed Systems
- The Hadoop ecosystem provides a framework for big data.
- HDFS ``(Hadoop Distributed File System)`` is the storage layer, breaking large files into blocks and distributing them across a cluster with replication for fault tolerance.
- ``MapReduce`` is the processing layer, a parallel programming model that transforms data through a ``map`` phase and aggregates results in a ``reduce`` phase, allowing complex computations on distributed data.  

#### HDFS (Hadoop Distributed File System)
- Purpose:
HDFS is a distributed file system designed to store very large datasets across a cluster of commodity hardware.
- Key Features:
1. Large Blocks: HDFS breaks large files into large blocks (e.g., 128MB or more), reducing overhead compared to traditional file systems.
2. Replication: To ensure fault tolerance, HDFS replicates each data block on multiple nodes in the cluster.
3. Scalability: HDFS can scale to thousands of nodes and petabytes of data.
4. Data Locality: It promotes data locality by moving computation to where the data resides, minimizing network congestion.
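
The block and replication ideas above can be made concrete with a little arithmetic. The sketch below is plain Python, not HDFS code: it assumes a hypothetical 1,000 MB file, the 128 MB block size mentioned above, a replication factor of 3, and a toy round-robin placement across four hypothetical nodes (real HDFS uses a rack-aware placement policy).

```python
def split_into_blocks(file_size_mb, block_size_mb=128):
    """Return the sizes (in MB) of the blocks a file would be split into."""
    full_blocks, remainder = divmod(file_size_mb, block_size_mb)
    blocks = [block_size_mb] * full_blocks
    if remainder:
        blocks.append(remainder)  # the final block holds only the leftover data
    return blocks


def place_replicas(num_blocks, nodes, replication=3):
    """Toy round-robin placement of each block on `replication` distinct nodes.

    Real HDFS uses a rack-aware placement policy; this only illustrates the idea.
    """
    if replication > len(nodes):
        raise ValueError("replication factor cannot exceed the number of nodes")
    return {
        block: [nodes[(block + r) % len(nodes)] for r in range(replication)]
        for block in range(num_blocks)
    }


blocks = split_into_blocks(1000)                # a hypothetical 1,000 MB file
nodes = ["node1", "node2", "node3", "node4"]    # a hypothetical 4-node cluster
placement = place_replicas(len(blocks), nodes)

print(blocks)           # [128, 128, 128, 128, 128, 128, 128, 104]
print(sum(blocks) * 3)  # 3000 MB of raw storage with 3 replicas per block

# If node2 fails, every block still has at least one replica on another node
survives = all(any(n != "node2" for n in replicas) for replicas in placement.values())
print(survives)         # True
```

The trade-off shows up directly: three-way replication triples raw storage, but losing any single node never loses a block.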

#### MapReduce
- Purpose:
MapReduce is a programming model and processing framework for performing large-scale, parallel data analysis.
- How it Works:
1. Map Phase: Input data is processed in parallel by "mapper" functions, which transform the data into key-value pairs.
2. Shuffle and Sort: The framework groups the intermediate key-value pairs by key and sorts them, so that all values for a given key reach the same reducer.
3. Reduce Phase: "Reducer" functions then aggregate and process the sorted pairs to produce the final output, which is stored back in HDFS.
- Benefits:
1. MapReduce abstracts the complexity of parallel processing, allowing developers to focus on the business logic rather than the intricacies of distributed computation.
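
The classic illustration of the three phases is a word count. The sketch below simulates map, shuffle/sort, and reduce in plain Python on a single machine; it is not Hadoop code, and the input lines are made up. In a real cluster, many mappers and reducers would run these same functions in parallel on different blocks.

```python
from collections import defaultdict
from itertools import chain


def map_phase(line):
    # Emit a (word, 1) pair for every word in the input line.
    return [(word.lower(), 1) for word in line.split()]


def shuffle_and_sort(pairs):
    # Group all values by key, then sort by key so each reducer sees ordered input.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())


def reduce_phase(key, values):
    # Aggregate all values that share a single key.
    return key, sum(values)


lines = ["the quick brown fox", "the lazy dog", "The fox"]  # made-up input
mapped = chain.from_iterable(map_phase(line) for line in lines)
grouped = shuffle_and_sort(mapped)
result = dict(reduce_phase(key, values) for key, values in grouped)
print(result)  # {'brown': 1, 'dog': 1, 'fox': 2, 'lazy': 1, 'quick': 1, 'the': 3}
```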

#### The Hadoop Ecosystem
- The Hadoop ecosystem is a collection of projects and tools that complement the core components (HDFS, MapReduce, and YARN) to provide a complete solution for big data problems.
- Other components include Spark (for in-memory processing), Hive and Pig (for query-based data processing), and HBase (a NoSQL database).

## Apache Spark - Introduction

- Apache Spark is a unified computing engine and a set of libraries for parallel data processing on computer clusters, i.e. it is used to process datasets that are too large to handle comfortably in a single computer's memory, for example 10 GB of transaction files for an e-commerce company.
- Its core components and libraries include:
    - Structured Streaming
    - Advanced Analytics
    - Libraries and Ecosystems
    - Datasets, DataFrames and SQL
    - RDDs and Distributed Variables
- It can be used from several programming languages, including Python, Scala, and Java, through various interfaces, for example Integrated Development Environments (IDEs) such as VS Code and notebooks such as Google Colab.
- Apache Spark can load and store various data formats, such as CSV and Parquet. It supports both batch and streaming analytics, supports SQL, and can be used to build and evaluate machine learning models.
- It supports various storage backends such as Amazon S3, works with message buses such as Apache Kafka and real-time analytics frameworks such as Apache Flink, and connects to databases such as Apache Cassandra.

## Apache Spark - Setting Up
To run Apache Spark on Google Colab, run the cell below to install the required libraries.

In [6]:
%pip install -q pyspark notebook pandas numpy

In [7]:
# If on Colab
# Make sure to upload the data folder to google drive
from google.colab import drive
drive.mount('/content/drive')

BASE_DIR = "/content/drive/MyDrive/data"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [8]:
# If on local computer
# BASE_DIR = "data"

In [9]:
import os

## Testing the PySpark Installation

In [10]:
# Import PySpark and initialize Spark session
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# Create a DataFrame with sample data
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Show the DataFrame
df.show()

# Stop the Spark session
spark.stop()

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+



## Spark’s Basic Architecture
- Apache Spark uses a group of computers, called a cluster, to manage the processing of very large datasets.
- Spark applications consist of:
    1. A ``Driver Process``
    2. A set of ``Executor Processes``
- The ``Driver Process`` runs your ``main()`` function, sits on a node in the cluster, and is responsible for three things:
    1. Maintaining information about the Spark Application
    2. Responding to a user’s program or input
    3. Analyzing, distributing, and scheduling work across the executors
- The ``executors`` are responsible for actually carrying out the work that the driver assigns them. This means that each executor is responsible for only two things: executing code assigned to it by the driver, and reporting the state of the computation on that executor back to the driver node.
## The SparkSession
You control your Spark Application through the SparkSession, an object that lives in the driver process and serves as the entry point to Spark. The SparkSession instance is the way Spark executes user-defined manipulations across the cluster. Here's how to create a SparkSession:

In [11]:
# Import PySpark and initialize Spark session
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# Stop the Spark session
spark.stop()

## Resilient Distributed Datasets (RDDs)

- In Apache Spark, a Resilient Distributed Dataset (RDD) is a fundamental, fault-tolerant collection of data that can be processed in parallel across a cluster.
- RDD operations are divided into two categories:
1. Transformations
2. Actions.

#### RDD characteristics

- Resilient: RDDs can recover from node failures. Spark tracks the lineage of all transformations, allowing it to re-create lost partitions of data if a node fails.
- Distributed: The data in an RDD is logically partitioned and distributed across multiple nodes in a cluster, enabling parallel processing.
- Immutable: Once created, an RDD cannot be changed. Any operation on an RDD results in a new RDD.
- Lazy Evaluation: Transformations on an RDD are not executed immediately. Instead, Spark records the transformations as a Directed Acyclic Graph (DAG) and only computes the data when an action is called.
- In-Memory Computing: RDDs can be cached in memory for much faster access in iterative algorithms and interactive data mining.

#### RDD operations
- RDD operations are classified as ``transformations`` or ``actions``.

#### Creating RDDs
1. From a local collection
2. From an external dataset (text file)
3. From dataframes and datasets
4. From other RDDs (transformations)


In [12]:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("CreateRDDExample").getOrCreate()
# Create an RDD from a local collection
data = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(data)
# Show the elements of the RDD
for element in rdd.collect():
    print(element)
# Stop the Spark session
spark.stop()

1
2
3
4
5


In [13]:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("CreateRDDFromFileExample").getOrCreate()
# Create an RDD from a text file
file_name = os.path.join(BASE_DIR, "timestamps.txt")
rdd = spark.sparkContext.textFile(file_name)
# Show the first few elements of the RDD
for line in rdd.take(10):
    print(line)
# Stop the Spark session
spark.stop()

1. **TimeSnap**: A playful name that suggests capturing moments in time.
2. **TubeTimer**: A straightforward name that combines "tube" (YouTube) with "timer".
3. **Stampify**: A fun, catchy name that implies converting video content into timestamped moments.
4. **TimeTag**: A simple, memorable name that conveys the idea of tagging moments with timestamps.
5. **VidMarks**: A name that suggests marking or bookmarking important moments in videos.
6. **Timeline**: A name that emphasizes the API's focus on extracting timestamps and creating a timeline of events.
7. **ClipClock**: A name that combines "clip" (a short video segment) with "clock" to convey the idea of timing.
8. **Momenta**: A name that suggests capturing and extracting meaningful moments from videos.
9. **TimeSeek**: A name that implies searching for and finding specific moments in time.
10. **StampKit**: A name that suggests a toolkit for extracting and working with timestamps.


#### Partitions
- To allow every executor to perform work in parallel, Spark breaks up the data into chunks called partitions.
- A partition is a collection of rows that sit on one physical machine in your cluster.
- A DataFrame’s partitions represent how the data is physically distributed across the cluster of machines during execution.

#### Transformations
Transformations are instructions that tell Spark how to derive a new DataFrame from an existing one. Because DataFrames are immutable, a transformation never modifies the original DataFrame, and it is not executed until an ``action`` is called. For example:

In [14]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# Create a DataFrame with sample data
numbers = spark.range(10).toDF("number")

# Show the DataFrame
numbers.show()

# Filter numbers divisible by 3
divisible_by_3 = numbers.filter(numbers.number % 3 == 0)
# Will not execute until an action is called
print(divisible_by_3)

# Stop the Spark session
spark.stop()

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
+------+

DataFrame[number: bigint]


#### Examples of Transformations:
- ``map()``: Applies a function to each element of an RDD. PySpark DataFrames have no ``map()``; use ``df.rdd.map()`` or column expressions instead.
- ``filter()``: Returns a new RDD/DataFrame containing only elements that satisfy a given condition.
- ``select()``: Selects specified columns from a DataFrame.
- ``withColumn()``: Adds a new column or replaces an existing one in a DataFrame.
- ``groupBy()``: Groups data based on specified columns for aggregation.
- ``join()``: Combines two DataFrames based on a join expression.
- ``union()``: Combines the rows of two DataFrames with the same schema (columns are matched by position).
- ``distinct()``: Returns a new RDD/DataFrame with unique elements.

#### Lazy Evaluation
Lazy evaluation means that Spark waits until the very last moment to execute the graph of computation instructions. In Spark, instead of modifying the data immediately when you express some operation, you build up a plan of transformations that you would like to apply to your source data. By waiting until the last minute to execute the code, Spark compiles this plan from your raw DataFrame transformations to a streamlined physical plan that will run as efficiently as possible across the cluster.

#### Actions
Transformations allow us to build up our logical transformation plan. To trigger the computation, we run an action. An action instructs Spark to compute a result from a series of transformations. The simplest action is ``count``, which gives us the total number of records in the DataFrame:

In [15]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# Create a DataFrame with sample data
numbers = spark.range(10).toDF("number")

# Show the DataFrame
numbers.show()

# Filter numbers divisible by 3
divisible_by_3 = numbers.filter(numbers.number % 3 == 0)
# Use the count action to trigger execution
count_divisible_by_3 = divisible_by_3.count()
print(f"Count of numbers divisible by 3: {count_divisible_by_3}")

# Stop the Spark session
spark.stop()

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
+------+

Count of numbers divisible by 3: 4


The output of the preceding code should be ``4``. Of course, count is not the only action. There are three kinds of actions:
- Actions to view data in the console
- Actions to collect data to native objects in the respective language
- Actions to write to output data sources

In specifying this action, we started a Spark job that runs our filter transformation (a narrow transformation), then an aggregation (a wide transformation) that performs the counts on a per partition basis, and then a collect, which brings our result to a native object in the respective language.

#### Examples of Actions:
- ``collect()``: Returns all elements of an RDD/DataFrame as a list to the driver program. Use with caution on large datasets.
- ``show()``: Displays the top rows of a DataFrame.
- ``count()``: Returns the number of elements in an RDD/DataFrame.
- ``take(n)``: Returns the first n elements of an RDD/DataFrame.
- ``first() / head()``: Returns the first row or the first n rows of a DataFrame.
- ``reduce()``: Aggregates elements of an RDD using a specified function.
- ``saveAsTextFile() / write.format().save()``: Writes the RDD/DataFrame to an external storage system (e.g., HDFS, S3).

## Working with DataFrames and Spark SQL

- Apache Spark DataFrames are a distributed collection of data organized into named columns, conceptually similar to a table in a relational database or a data frame in R/Python (e.g., Pandas).
- They provide a higher-level abstraction than Resilient Distributed Datasets (RDDs) and are designed to simplify working with structured and semi-structured data in Spark. DataFrames are available in Scala, Java, Python, and R.
#### Advantages of DataFrames over RDDs
DataFrames offer several key advantages over RDDs:
1. Optimized Execution with Catalyst Optimizer:
DataFrames leverage Spark's Catalyst Optimizer, which analyzes queries and generates optimized execution plans. This includes techniques like predicate pushdown, column pruning, and join reordering, leading to significantly faster and more efficient processing compared to manual RDD transformations.
2. Schema Awareness and Type Safety (with Datasets):
While RDDs are untyped collections of objects, DataFrames introduce a schema, providing information about column names and data types. This enables Spark to perform optimizations and error checking. When using Datasets (an extension of DataFrames in Scala and Java), compile-time type safety is also provided, catching errors earlier in the development cycle.
3. SQL-like Interface:
DataFrames provide a user-friendly, SQL-like interface for data manipulation. This allows users to express complex transformations and queries using familiar SQL syntax or a DataFrame API that mirrors common database operations, making it more accessible to a broader range of users, including data analysts.
4. Integration with Spark Ecosystem:
DataFrames integrate with other Spark libraries such as Spark SQL, MLlib (Machine Learning Library), and Structured Streaming, providing a unified and consistent API for various data processing tasks.
5. Easier Data Source Integration:
DataFrames simplify reading from and writing to various data sources (e.g., CSV, JSON, Parquet, Avro, ORC) through a unified API, making data ingestion and output more streamlined.

#### How to Create Dataframes
1. From a list of ``Row`` objects
2. From a list of tuples
3. From a pandas dataframe

In [16]:
#### From a list of rows
from pyspark.sql import SparkSession, Row
from datetime import date, datetime

# Initialize SparkSession
spark = SparkSession.builder.appName("DataFrameCreation").getOrCreate()

# Create data as a list of Row objects
data = [
    Row(id=1, name="Alice", age=30, dob=date(1994, 5, 15)),
    Row(id=2, name="Bob", age=24, dob=date(2000, 1, 20)),
    Row(id=3, name="Charlie", age=35, dob=date(1989, 11, 1))
]

# Create DataFrame
df = spark.createDataFrame(data)

df.show()
df.printSchema()

+---+-------+---+----------+
| id|   name|age|       dob|
+---+-------+---+----------+
|  1|  Alice| 30|1994-05-15|
|  2|    Bob| 24|2000-01-20|
|  3|Charlie| 35|1989-11-01|
+---+-------+---+----------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- dob: date (nullable = true)



In [17]:
# From a list of tuples
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Initialize SparkSession
spark = SparkSession.builder.appName("DataFrameCreation").getOrCreate()

# Data as a list of tuples
data_tuples = [
    (1, "Alice", 30),
    (2, "Bob", 24),
    (3, "Charlie", 35)
]

# Define schema explicitly
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Create DataFrame with explicit schema
df_schema = spark.createDataFrame(data_tuples, schema=schema)

# Create DataFrame with inferred schema (if schema is omitted)
df_inferred = spark.createDataFrame(data_tuples, ["id", "name", "age"])

df_schema.show()
df_schema.printSchema()

df_inferred.show()
df_inferred.printSchema()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 30|
|  2|    Bob| 24|
|  3|Charlie| 35|
+---+-------+---+

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 30|
|  2|    Bob| 24|
|  3|Charlie| 35|
+---+-------+---+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [18]:
# From a pandas DataFrame
import pandas as pd
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("DataFrameCreation").getOrCreate()

# Create a Pandas DataFrame
pandas_df = pd.DataFrame({
    'col1': [1, 2, 3],
    'col2': ['A', 'B', 'C']
})

# Convert Pandas DataFrame to PySpark DataFrame
spark_df = spark.createDataFrame(pandas_df)

spark_df.show()
spark_df.printSchema()

+----+----+
|col1|col2|
+----+----+
|   1|   A|
|   2|   B|
|   3|   C|
+----+----+

root
 |-- col1: long (nullable = true)
 |-- col2: string (nullable = true)



## DataFrames and SQL
DataFrames can also be queried with SQL after registering them as a temporary view:

In [19]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()
# Read a CSV file into a DataFrame
file_name = os.path.join(BASE_DIR, "flight-data", "csv", "2015-summary.csv")
df = spark.read.csv(file_name, header=True, inferSchema=True)

# Create a temporary view for SQL queries
df.createOrReplaceTempView("flights")
# Execute an SQL query
sqlDF = spark.sql("SELECT DEST_COUNTRY_NAME, count(1) AS flight_count FROM flights GROUP BY DEST_COUNTRY_NAME")
# Show the results
sqlDF.show(5)
spark.stop()

+-----------------+------------+
|DEST_COUNTRY_NAME|flight_count|
+-----------------+------------+
|         Anguilla|           1|
|           Russia|           1|
|         Paraguay|           1|
|          Senegal|           1|
|           Sweden|           1|
+-----------------+------------+
only showing top 5 rows



#### PySpark SQL functions

1. Basic Functions:
- ``col(column_name) / column(column_name)``: Returns a Column object based on the given name.
- ``lit(value)``: Creates a Column of a literal value (e.g., string, integer, boolean).
- ``alias(name)``: Renames a column. It is a method on a Column, for example ``col("x").alias("y")``.
2. String Functions:
- ``concat(*cols)``: Concatenates multiple columns into a single string column.
- ``substring(str, pos, len)``: Extracts a substring from a string column.
- ``lower(col) / upper(col)``: Converts a string column to lowercase or uppercase.
- ``trim(col)``: Removes leading and trailing whitespace from a string column.
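- ``like(column, pattern)``: Returns a boolean column that is true where a string column matches a SQL LIKE pattern; it is typically used inside ``filter()``. The Column method form is ``col(name).like(pattern)``.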
3. Numeric/Math Functions:
- ``abs(col)``: Computes the absolute value.
- ``round(col, scale)``: Rounds a numeric column to a specified scale.
- ``sqrt(col)``: Computes the square root.
- ``ceil(col) / floor(col)``: Computes the ceiling or floor of a numeric column.
4. Date and Time Functions:
- ``current_date() / current_timestamp()``: Returns the current date or timestamp.
- ``date_add(date, days) / date_sub(date, days)``: Adds or subtracts days from a date column.
- ``year(col) / month(col) / dayofmonth(col)``: Extracts year, month, or day from a date column.
5. Conditional Functions:
- ``when(condition, value) / otherwise(value)``: Implements conditional logic similar to SQL's CASE WHEN.
- ``isnull(col) / isnotnull(col)``: Checks for null or non-null values.
6. Aggregate Functions:
- ``count(col) / countDistinct(col)``: Counts non-null values or distinct values.
- ``sum(col) / avg(col)``: Calculates the sum or average.
- ``min(col) / max(col)``: Finds the minimum or maximum value.
7. Array and Map Functions:
- ``array(*cols)``: Creates an array column from multiple input columns.
- ``explode(array_col)``: Expands an array column into separate rows for each element.

## Exercises
Here are a few exercises for working with Spark and SQL:
1. Working with Booleans
2. Working with Strings
3. Working with dates and timestamps

In [20]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()


file_name = os.path.join(BASE_DIR, "retail-data", "by-day", "2010-12-01.csv")
# Read a CSV file into a DataFrame
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load(file_name)

df.printSchema()

df.createOrReplaceTempView("dfTable")

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [21]:
# Working with booleans
from pyspark.sql.functions import col

df.where(col("InvoiceNo") != 536365)\
.select("InvoiceNo", "Description")\
.show(5, False)

+---------+-----------------------------+
|InvoiceNo|Description                  |
+---------+-----------------------------+
|536366   |HAND WARMER UNION JACK       |
|536366   |HAND WARMER RED POLKA DOT    |
|536367   |ASSORTED COLOUR BIRD ORNAMENT|
|536367   |POPPY'S PLAYHOUSE BEDROOM    |
|536367   |POPPY'S PLAYHOUSE KITCHEN    |
+---------+-----------------------------+
only showing top 5 rows



In [22]:
# Working with strings
# The initcap function will capitalize every word in a given string when that word is separated from another by a space

from pyspark.sql.functions import initcap
df.select(initcap(col("Description"))).show()

# in SQL 'SELECT initcap(Description) FROM dfTable'

+--------------------+
|initcap(Description)|
+--------------------+
|White Hanging Hea...|
| White Metal Lantern|
|Cream Cupid Heart...|
|Knitted Union Fla...|
|Red Woolly Hottie...|
|Set 7 Babushka Ne...|
|Glass Star Froste...|
|Hand Warmer Union...|
|Hand Warmer Red P...|
|Assorted Colour B...|
|Poppy's Playhouse...|
|Poppy's Playhouse...|
|Feltcraft Princes...|
|Ivory Knitted Mug...|
|Box Of 6 Assorted...|
|Box Of Vintage Ji...|
|Box Of Vintage Al...|
|Home Building Blo...|
|Love Building Blo...|
|Recipe Box With M...|
+--------------------+
only showing top 20 rows



In [23]:
# Upper and lowercase strings
from pyspark.sql.functions import lower, upper

df.select(col("Description"),
lower(col("Description")),
upper(lower(col("Description")))).show(2)

# in SQL SELECT Description, lower(Description), Upper(lower(Description)) FROM dfTable

+--------------------+--------------------+-------------------------+
|         Description|  lower(Description)|upper(lower(Description))|
+--------------------+--------------------+-------------------------+
|WHITE HANGING HEA...|white hanging hea...|     WHITE HANGING HEA...|
| WHITE METAL LANTERN| white metal lantern|      WHITE METAL LANTERN|
+--------------------+--------------------+-------------------------+
only showing top 2 rows



In [24]:
# Another trivial task is adding or removing spaces around a string. You can do this by using lpad, ltrim, rpad and rtrim, trim:

from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim

df.select(
ltrim(lit(" HELLO ")).alias("ltrim"),
rtrim(lit(" HELLO ")).alias("rtrim"),
trim(lit(" HELLO ")).alias("trim"),
lpad(lit("HELLO"), 3, " ").alias("lp"),
rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)

+------+------+-----+---+----------+
| ltrim| rtrim| trim| lp|        rp|
+------+------+-----+---+----------+
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
+------+------+-----+---+----------+
only showing top 2 rows



In [25]:
# Working with dates
from pyspark.sql.functions import current_date, current_timestamp

dateDF = spark.range(10)\
.withColumn("today", current_date())\
.withColumn("now", current_timestamp())
dateDF.createOrReplaceTempView("dateTable")
dateDF.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [26]:
# add and subtract five days from today
from pyspark.sql.functions import date_add, date_sub

dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2025-09-13|        2025-09-23|
+------------------+------------------+
only showing top 1 row



In [27]:
# difference between two dates
from pyspark.sql.functions import datediff, months_between, to_date

dateDF.withColumn("week_ago", date_sub(col("today"), 7))\
.select(datediff(col("week_ago"), col("today"))).show(1)
dateDF.select(
to_date(lit("2016-01-01")).alias("start"),
to_date(lit("2017-05-22")).alias("end"))\
.select(months_between(col("start"), col("end"))).show(1)

+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
+-------------------------+
only showing top 1 row

+--------------------------------+
|months_between(start, end, true)|
+--------------------------------+
|                    -16.67741935|
+--------------------------------+
only showing top 1 row



## Aggregations
- Aggregating is the act of collecting something together and is a cornerstone of big data analytics.
- In an aggregation, you specify a key or grouping and an aggregation function that specifies how one or more columns should be transformed.
- Given multiple input values, this function must produce one result for each group.
- Spark allows us to create the following grouping types:
1. The simplest grouping summarizes a complete DataFrame by performing an aggregation in a select statement.
2. A ``group by`` allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns.
3. A ``window`` also takes one or more keys and one or more aggregation functions, but the rows passed to the function are those in a frame defined relative to the current row.
4. A ``grouping set`` lets you aggregate at multiple levels at once. Grouping sets are available as a primitive in SQL and via rollups and cubes in DataFrames.
5. A ``rollup`` takes one or more keys and one or more aggregation functions and summarizes the value columns hierarchically, from the most detailed grouping up to a grand total.
6. A ``cube`` takes one or more keys and one or more aggregation functions and summarizes the value columns across all combinations of the key columns.
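The difference between a rollup and a cube is which combinations of keys get aggregated. The plain-Python sketch below only enumerates those grouping sets. In PySpark, the aggregation itself would be done with `df.rollup(...)` or `df.cube(...)` followed by `.agg(...)`.

The column names `Country` and `StockCode` come from the retail data. The helper functions are illustrative and are not Spark APIs. The empty tuple stands for the grand-total row, where Spark leaves the key columns null.

```python
from itertools import combinations

def rollup_sets(keys):
    """Grouping sets of a rollup: each prefix of keys, ending with the grand total ()."""
    return [tuple(keys[:i]) for i in range(len(keys), -1, -1)]

def cube_sets(keys):
    """Grouping sets of a cube: every subset of keys, including ()."""
    return [combo
            for size in range(len(keys), -1, -1)
            for combo in combinations(keys, size)]

print(rollup_sets(["Country", "StockCode"]))
# [('Country', 'StockCode'), ('Country',), ()]
print(cube_sets(["Country", "StockCode"]))
# [('Country', 'StockCode'), ('Country',), ('StockCode',), ()]
```

With n keys, a rollup yields n + 1 grouping sets, while a cube yields 2^n.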

In [34]:
# Read in purchase data

file_name = os.path.join(BASE_DIR, "retail-data", "all", "*.csv")
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load(file_name)\
.coalesce(5)

df.cache()

df.createOrReplaceTempView("dfTable")

In [35]:
# Count
from pyspark.sql.functions import count
df.select(count("StockCode")).show() # 541909

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [36]:
# count distinct
from pyspark.sql.functions import countDistinct

df.select(countDistinct("StockCode")).show() # 4070

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [37]:
# First and last value

from pyspark.sql.functions import first, last

# first() and last() depend on row order, which Spark does not guarantee after a shuffle
df.select(first("StockCode"), last("StockCode")).show()

+----------------+---------------+
|first(StockCode)|last(StockCode)|
+----------------+---------------+
|          85123A|          22138|
+----------------+---------------+



In [38]:
# Min and max

# Note: this import shadows Python's built-in min and max in the rest of the notebook
from pyspark.sql.functions import min, max
df.select(min("Quantity"), max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



In [39]:
# sum
# Note: this import shadows Python's built-in sum in the rest of the notebook
from pyspark.sql.functions import sum
df.select(sum("Quantity")).show() # 5176450

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



In [40]:
# Average
from pyspark.sql.functions import sum, count, avg, expr
df.select(
count("Quantity").alias("total_transactions"),
sum("Quantity").alias("total_purchases"),
avg("Quantity").alias("avg_purchases"),
expr("mean(Quantity)").alias("mean_purchases"))\
.selectExpr(
"total_purchases/total_transactions",
"avg_purchases",
"mean_purchases").show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



In [41]:
# variance and standard deviation
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp
df.select(var_pop("Quantity"), var_samp("Quantity"),
stddev_pop("Quantity"), stddev_samp("Quantity")).show()

+------------------+------------------+--------------------+---------------------+
| var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+------------------+------------------+--------------------+---------------------+
|47559.303646609354| 47559.39140929905|  218.08095663447864|   218.08115785023486|
+------------------+------------------+--------------------+---------------------+



#### Other aggregations
- Skewness and kurtosis (``skewness``, ``kurtosis``)
- Covariance and correlation (``covar_pop``, ``covar_samp``, ``corr``)
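Each of these aggregations reduces to a short formula. The plain-Python sketch below makes three assumptions about how Spark computes them:

- `skewness` and `kurtosis` use population central moments.
- `kurtosis` reports excess kurtosis, so a normal distribution scores about 0.
- `covar_pop` divides by n, while `covar_samp` divides by n - 1.

The sketch uses `math.fsum` and `statistics.fmean` rather than `sum`, because an earlier cell imports Spark's `sum` under that name.

```python
import math
import statistics

def central_moment(xs, order):
    """Mean of (x - mean)**order over the data."""
    m = statistics.fmean(xs)
    return statistics.fmean([(x - m) ** order for x in xs])

def skewness(xs):
    # Population skewness: third central moment over the second to the power 1.5.
    return central_moment(xs, 3) / central_moment(xs, 2) ** 1.5

def kurtosis(xs):
    # Excess kurtosis: fourth central moment over the squared second, minus 3.
    return central_moment(xs, 4) / central_moment(xs, 2) ** 2 - 3

def covar_pop(xs, ys):
    mx, my = statistics.fmean(xs), statistics.fmean(ys)
    return math.fsum((x - mx) * (y - my) for x, y in zip(xs, ys)) / len(xs)

def covar_samp(xs, ys):
    # Same sum of products, divided by n - 1 instead of n.
    return covar_pop(xs, ys) * len(xs) / (len(xs) - 1)

def corr(xs, ys):
    # Pearson correlation: covariance scaled by both standard deviations.
    return covar_pop(xs, ys) / math.sqrt(covar_pop(xs, xs) * covar_pop(ys, ys))

quantity = [1, 2, 3, 4, 5]
print(skewness(quantity), kurtosis(quantity))  # 0.0 -1.3
```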

#### Window Functions
- Window functions let you carry out unique aggregations by computing an aggregation over a specific “window” of data, which you define by reference to the current row.
- This window specification determines which rows are passed to the function.
- In a group-by, each row goes into exactly one group.
- A window function instead calculates a return value for every input row of a table based on a group of rows, called a frame.
- Each row can fall into one or more frames.
- A common use case is a rolling average of some value for which each row represents one day.
- With a seven-day window, each row would end up in seven different frames.
- Spark supports three kinds of window functions:
1. ranking functions,
2. analytic functions,
3. aggregate functions.
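The two ideas above can be sketched in plain Python:

- A trailing rolling average, where each frame holds the current row plus up to six earlier rows.
- The ranking functions `rank` and `dense_rank`.

In PySpark, the frame would be declared with something like `Window.partitionBy(...).orderBy(...).rowsBetween(-6, Window.currentRow)` and used as `avg(...).over(window)`. The helper names here are illustrative only.

```python
import math
from collections import deque

def rolling_average(values, frame_size=7):
    """Average over each row and up to frame_size - 1 preceding rows."""
    frame = deque(maxlen=frame_size)   # the oldest row drops out automatically
    averages = []
    for value in values:
        frame.append(value)
        averages.append(math.fsum(frame) / len(frame))
    return averages

def rank(values):
    """RANK() in ascending order: ties share a rank and leave a gap after them."""
    return [1 + len([v for v in values if v < x]) for x in values]

def dense_rank(values):
    """DENSE_RANK() in ascending order: ties share a rank and no gap is left."""
    distinct = sorted(set(values))
    return [distinct.index(x) + 1 for x in values]

daily = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
print(rolling_average(daily)[-1])                      # average of 4..10 = 7.0
print(rank([5, 3, 3, 1]), dense_rank([5, 3, 3, 1]))   # [4, 2, 2, 1] [3, 2, 2, 1]
```

With seven-row frames, every row except the last six contributes to seven different averages.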

## Joins
- A join brings together two sets of data, the left and the right, by comparing the values of one or more keys and evaluating a join expression that determines whether Spark should combine a given left row with a given right row.
- The most common join expression, an equi-join, checks whether the specified keys in the left and right datasets are equal. If they are, Spark combines the matching rows; in an inner join, rows whose keys have no match are discarded.
- Spark also supports more sophisticated join policies in addition to equi-joins.

#### Join Types
- Whereas the join expression determines whether two rows should join, the join type determines what should be in the result set.
- There are a variety of different join types available in Spark for you to use:
1. Inner joins (keep rows with keys that exist in the left and right datasets)
2. Outer joins (keep rows with keys in either the left or right datasets)
3. Left outer joins (keep rows with keys in the left dataset)
4. Right outer joins (keep rows with keys in the right dataset)
5. Left semi joins (keep the rows in the left, and only the left, dataset where the key appears in the right dataset)
6. Left anti joins (keep the rows in the left, and only the left, dataset where they do not appear in the right dataset)
7. Natural joins (perform a join by implicitly matching the columns between the two datasets with the same names)
8. Cross (or Cartesian) joins (match every row in the left dataset with every row in the right dataset)

In [42]:
# create data
person = spark.createDataFrame([
(0, "Bill Chambers", 0, [100]),
(1, "Matei Zaharia", 1, [500, 250, 100]),
(2, "Michael Armbrust", 1, [250, 100])])\
.toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")])\
.toDF("id", "status")

person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")
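The join types listed earlier can be sketched in plain Python on trimmed copies of the `person` and `graduateProgram` rows created above. `graduateProgram` is the left side here, as in the left outer join example that follows.

In PySpark, the semi and anti variants are requested by passing `"left_semi"` or `"left_anti"` as the join type. The helper functions below are illustrative only.

```python
# Trimmed copies of the rows above: graduateProgram (id, degree), person (id, name, graduate_program).
graduate_program = [(0, "Masters"), (2, "Masters"), (1, "Ph.D.")]
person = [(0, "Bill Chambers", 0), (1, "Matei Zaharia", 1), (2, "Michael Armbrust", 1)]

def matches(program_row, person_row):
    # The equi-join predicate: graduateProgram.id == person.graduate_program
    return program_row[0] == person_row[2]

def inner_join(left, right, cond):
    return [(lrow, rrow) for lrow in left for rrow in right if cond(lrow, rrow)]

def left_outer_join(left, right, cond):
    result = []
    for lrow in left:
        hits = [rrow for rrow in right if cond(lrow, rrow)]
        if hits:
            result.extend((lrow, rrow) for rrow in hits)
        else:
            result.append((lrow, None))   # no match: right-hand columns become null
    return result

def left_semi_join(left, right, cond):
    # Left rows only, kept when at least one right row matches.
    return [lrow for lrow in left if any(cond(lrow, rrow) for rrow in right)]

def left_anti_join(left, right, cond):
    # Left rows only, kept when no right row matches.
    return [lrow for lrow in left if not any(cond(lrow, rrow) for rrow in right)]

print(len(inner_join(graduate_program, person, matches)))       # 3
print(len(left_outer_join(graduate_program, person, matches)))  # 4
print(left_semi_join(graduate_program, person, matches))        # [(0, 'Masters'), (1, 'Ph.D.')]
print(left_anti_join(graduate_program, person, matches))        # [(2, 'Masters')]
```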

#### Inner Join
- Inner joins evaluate the keys in both of the DataFrames or tables and include (and join together) only the rows that evaluate to true.
- In the following example, we join the graduateProgram DataFrame with the person DataFrame to create a new DataFrame:

In [43]:
# Inner join
joinExpression = person["graduate_program"] == graduateProgram['id']
person.join(graduateProgram, joinExpression).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



#### Outer Join
- Outer joins evaluate the keys in both of the DataFrames or tables and include (and join together) the rows that evaluate to true or false.
- If there is no equivalent row in either the left or right DataFrame, Spark will insert null:

In [44]:
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|NULL|            NULL|            NULL|           NULL|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



#### Left Outer Joins
- Left outer joins evaluate the keys in both of the DataFrames or tables and include all rows from the left DataFrame as well as any rows in the right DataFrame that have a match in the left DataFrame.
- If there is no equivalent row in the right DataFrame, Spark will insert null:

In [45]:
joinType = "left_outer"
graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
| id| degree|          department|     school|  id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|   0|   Bill Chambers|               0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   2|Michael Armbrust|               1|     [250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   1|   Matei Zaharia|               1|[500, 250, 100]|
|  2|Masters|                EECS|UC Berkeley|NULL|            NULL|            NULL|           NULL|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+



#### Cross (Cartesian) Joins
- The last of our joins are cross joins, or Cartesian products.
- In the simplest terms, cross joins are inner joins that do not specify a predicate.
- A cross join pairs every single row in the left DataFrame with every single row in the right DataFrame, which can cause an explosion in the number of rows in the resulting DataFrame.
- If you have 1,000 rows in each DataFrame, the cross join of these will result in 1,000,000 (1,000 x 1,000) rows.
- For this reason, you must explicitly state that you want a cross join, by using the ``cross`` join type or the ``crossJoin`` method:

In [46]:
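# Because a join expression is supplied, Spark keeps only the pairs that satisfy it,
# so this returns the same 3 rows as the inner join. For the full Cartesian
# product (3 x 3 = 9 rows), use graduateProgram.crossJoin(person) instead.
joinType = "cross"
graduateProgram.join(person, joinExpression, joinType).show()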

+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
| id| degree|          department|     school| id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|  0|   Bill Chambers|               0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  1|   Matei Zaharia|               1|[500, 250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  2|Michael Armbrust|               1|     [250, 100]|
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
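A plain-Python sketch of why the cell above returns only 3 rows. The Cartesian product of the two 3-row datasets has 9 pairs. Keeping only the pairs that satisfy the join expression leaves exactly the inner-join rows.

In PySpark, the unfiltered product is what `graduateProgram.crossJoin(person)` returns. The lists below are trimmed copies of the rows created earlier.

```python
from itertools import product

graduate_program = [(0, "Masters"), (2, "Masters"), (1, "Ph.D.")]
person = [(0, "Bill Chambers", 0), (1, "Matei Zaharia", 1), (2, "Michael Armbrust", 1)]

# Cartesian product: every left row paired with every right row.
cartesian = list(product(graduate_program, person))
print(len(cartesian))  # 9 = 3 x 3

# Keeping only pairs where graduateProgram.id == person.graduate_program gives the inner-join rows.
filtered = [(g, p) for g, p in cartesian if g[0] == p[2]]
print(len(filtered))   # 3
```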



## Datasources
Spark's built-in data sources include:
1. CSV files
2. JSON files
3. ORC files
4. Parquet files
5. SQL databases (via JDBC)
6. Text files


In [47]:
# Read and write csv files
file_name = os.path.join(BASE_DIR, "flight-data", "csv", "2010-summary.csv")

csvFile = spark.read.format("csv")\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load(file_name)

# Spark writes a directory of part files at this path rather than a single .tsv file
csvFile.write.format("csv").mode("overwrite").option("sep", "\t")\
.save(os.path.join(BASE_DIR, "my-tsv-file.tsv"))

In [50]:
file_name = os.path.join(BASE_DIR, "flight-data", "json", "2010-summary.json")

# JSON schemas are inferred by default; inferSchema is a CSV option, so it is omitted here
jsonFile = spark.read.format("json").option("mode", "FAILFAST")\
.load(file_name)

jsonFile.write.format("json").mode("overwrite").save(os.path.join(BASE_DIR, "my-json-file.json"))

In [55]:
file_name = os.path.join(BASE_DIR, "flight-data", "parquet", "2010-summary.parquet")

parkFile = spark.read.format("parquet")\
.load(file_name)

parkFile.write.format("parquet").mode("overwrite")\
.save(os.path.join(BASE_DIR, "my-parquet-file.parquet"))

In [57]:
file_name = os.path.join(BASE_DIR, "flight-data", "orc", "2010-summary.orc")

orcFile = spark.read.format("orc").load(file_name)
orcFile.write.format("orc").mode("overwrite").option("compression", "snappy")\
.save(os.path.join(BASE_DIR, "my-orc-file.orc"))

## Spark and Machine learning
- Machine learning tasks include:
1. Supervised learning, including classification and regression, where the goal is to predict a label for each data point based on various features.
2. Recommendation engines to suggest products to users based on behavior.
3. Unsupervised learning, including clustering, anomaly detection, and topic modeling, where the goal is to discover structure in the data.
4. Graph analytics tasks such as searching for patterns in a social network.

#### Supervised Learning
- The goal is simple: using historical data that already has labels (often called the dependent variables), train a model to predict the values of those labels based on various features of the data points.
- One example would be to predict a person’s income (the dependent variable) based on age (a feature).
- Supervised learning is commonly divided into:
1. Classification: training an algorithm to predict a dependent variable that is categorical (belonging to a discrete, finite set of values). The most common case is binary classification, where the resulting model predicts which of two groups a given item belongs to. The canonical example is classifying email as spam or not spam.
2. Regression: predicting a continuous variable (a real number). In simplest terms, rather than predicting a category, we want to predict a value on a number line.

#### Unsupervised Learning
- Unsupervised learning is the act of trying to find patterns or discover the underlying structure in a given set of data. This differs from supervised learning because there is no dependent variable (label) to predict.

## The Machine Learning workflow
- The overall process involves the following steps (with some variation):
1. Gathering and collecting the relevant data for your task.
2. Cleaning and inspecting the data to better understand it.
3. Performing feature engineering to allow the algorithm to leverage the data in a suitable form (e.g., converting the data to numerical vectors).
4. Using a portion of this data as a training set to train one or more algorithms to generate some candidate models.
5. Evaluating and comparing models against your success criteria by objectively measuring results on a subset of the same data that was not used for training. This allows you to better understand how your model may perform in the wild.
6. Leveraging the insights from the above process and/or using the model to make predictions, detect anomalies, or solve more general business challenges.

#### What Is MLlib?
- MLlib is a package, built on and included in Spark, that provides interfaces for gathering and cleaning data, feature engineering and feature selection, training and tuning large-scale supervised and unsupervised machine learning models, and using those models in production.
#### High-Level MLlib Concepts
1. Transformers
- Transformers are functions that convert raw data in some way.
- This might be to create a new interaction variable (from two other variables), normalize a column, or simply change an Integer into a Double type to be input into a model.
- An example of a transformer is one that converts string categorical variables into numerical values that can be used in MLlib.
- Transformers are primarily used in preprocessing and feature engineering.
- Transformers take a DataFrame as input and produce a new DataFrame as output.
2. Estimators
- The term estimator covers two kinds of objects.
- First, an estimator can be a kind of transformer that must be initialized with data.
- For instance, to normalize numerical data we need to initialize our transformation with some information about the current values in the column we would like to normalize.
- This requires two passes over our data: the first pass generates the initialization values and the second applies the generated function to the data.
- Second, in Spark's nomenclature, algorithms that allow users to train a model from data are also referred to as estimators.
3. Evaluators
- An evaluator allows us to see how a given model performs according to criteria we specify, such as the area under the receiver operating characteristic (ROC) curve.
- After we use an evaluator to select the best model from the ones we tested, we can then use that model to make predictions.
4. Pipelines
- At a high level, we can specify each of the transformations, estimations, and evaluations one by one, but it is often easier to specify our steps as stages in a pipeline.
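The estimator/transformer split can be sketched in plain Python:

- An estimator's `fit` makes the first pass over the data and returns a model.
- The model is a transformer that only applies the learned parameters.
- A pipeline runs the stages in order.

The classes `Standardizer` and `StandardizerModel` and the function `fit_pipeline` are hypothetical. They only echo MLlib's naming, where for example `StandardScaler.fit` returns a `StandardScalerModel` and `Pipeline.fit` returns a `PipelineModel`.

```python
import statistics

class StandardizerModel:
    """Transformer: applies parameters that were learned earlier."""
    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def transform(self, values):
        return [(v - self.mean) / self.std for v in values]

class Standardizer:
    """Estimator: fit() is the first pass over the data and returns a transformer."""
    def fit(self, values):
        return StandardizerModel(statistics.fmean(values), statistics.stdev(values))

def fit_pipeline(stages, values):
    """Fit estimators and apply transformers in order, like a simplified pipeline."""
    fitted = []
    for stage in stages:
        model = stage.fit(values) if hasattr(stage, "fit") else stage
        values = model.transform(values)   # later stages see the transformed data
        fitted.append(model)
    return fitted, values

model = Standardizer().fit([1.0, 2.0, 3.0])
print(model.transform([1.0, 2.0, 3.0]))  # [-1.0, 0.0, 1.0]
```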

#### ML Feature Preprocessing
- In the case of most classification and regression algorithms, you want to get your data into a column of type Double to represent the label and a column of type Vector (either dense or sparse) to represent the features.
- In the case of recommendation, you want to get your data into a column of users, a column of items (say movies or books), and a column of ratings.
- In the case of unsupervised learning, a column of type Vector (either dense or sparse) is needed to represent the features.
- In the case of graph analytics, you will want a DataFrame of vertices and a DataFrame of edges.

In [59]:
# Read in the data
file_name = os.path.join(BASE_DIR, "retail-data", "by-day", "*.csv")

sales = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load(file_name)\
.coalesce(5)\
.where("Description IS NOT NULL")
fakeIntDF = spark.read.parquet(os.path.join(BASE_DIR, "simple-ml-integers"))
simpleDF = spark.read.json(os.path.join(BASE_DIR, "simple-ml"))
scaleDF = spark.read.parquet(os.path.join(BASE_DIR, "simple-ml-scaling"))

In [60]:
sales.cache()
sales.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-12-05 08:38:00|     0.85|   14075.0|United Kingdom|
|   580538|    23126|FELTCRA

#### VectorAssembler
- The VectorAssembler concatenates all your features into one big vector that you can then pass into an estimator.
- It is typically used in the last preprocessing step of a machine learning pipeline and takes as input a number of columns of numeric, Boolean, or Vector type.
- This is particularly helpful if you perform a number of manipulations using a variety of transformers and need to gather all of those results together.
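What the assembler does to each row can be sketched in plain Python. Scalar columns become one entry each, and vector columns are flattened into the result.

The helper `assemble` is hypothetical. In PySpark, calling `.setOutputCol("features")` on the VectorAssembler would give the output column a readable name instead of the auto-generated `VectorAssembler_..._output`.

```python
def assemble(row, input_cols):
    """Concatenate numeric, Boolean, or vector-valued columns into one feature list."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            features.extend(float(x) for x in value)   # vector column: append each entry
        else:
            features.append(float(value))              # numeric or Boolean scalar
    return features

print(assemble({"int1": 4, "int2": 5, "int3": 6}, ["int1", "int2", "int3"]))  # [4.0, 5.0, 6.0]
```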

In [61]:
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler().setInputCols(["int1", "int2", "int3"])
va.transform(fakeIntDF).show()

+----+----+----+------------------------------------+
|int1|int2|int3|VectorAssembler_a97ec228fdb0__output|
+----+----+----+------------------------------------+
|   4|   5|   6|                       [4.0,5.0,6.0]|
|   7|   8|   9|                       [7.0,8.0,9.0]|
|   1|   2|   3|                       [1.0,2.0,3.0]|
+----+----+----+------------------------------------+



##### Bucketing
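- The Bucketizer splits a continuous feature into buckets defined by a list of split points. A bucket defined by splits x and y holds values in the range [x, y), except the last bucket, which also includes y.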
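A plain-Python sketch of the bucket rule, using the same split points as the example below. Out-of-range values raise an error here, which mirrors the Bucketizer's default `handleInvalid="error"`. NaN handling is left out, and `bucketize` is a hypothetical helper name.

```python
from bisect import bisect_right

def bucketize(value, splits):
    """Bucket index for value; bucket i covers [splits[i], splits[i + 1])."""
    if value == splits[-1]:
        return float(len(splits) - 2)   # the last bucket also includes its upper bound
    if not splits[0] <= value < splits[-1]:
        raise ValueError(f"{value} is outside the range of the splits")
    return float(bisect_right(splits, value) - 1)

bucket_borders = [-1.0, 5.0, 10.0, 250.0, 600.0]
print([bucketize(float(i), bucket_borders) for i in range(20)])
# 0.0 for 0-4, 1.0 for 5-9, 2.0 for 10-19
```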

In [62]:
contDF = spark.range(20).selectExpr("cast(id as double)")

from pyspark.ml.feature import Bucketizer
bucketBorders = [-1.0, 5.0, 10.0, 250.0, 600.0]
bucketer = Bucketizer().setSplits(bucketBorders).setInputCol("id")
bucketer.transform(contDF).show()

+----+-------------------------------+
|  id|Bucketizer_7bc77823b8c8__output|
+----+-------------------------------+
| 0.0|                            0.0|
| 1.0|                            0.0|
| 2.0|                            0.0|
| 3.0|                            0.0|
| 4.0|                            0.0|
| 5.0|                            1.0|
| 6.0|                            1.0|
| 7.0|                            1.0|
| 8.0|                            1.0|
| 9.0|                            1.0|
|10.0|                            2.0|
|11.0|                            2.0|
|12.0|                            2.0|
|13.0|                            2.0|
|14.0|                            2.0|
|15.0|                            2.0|
|16.0|                            2.0|
|17.0|                            2.0|
|18.0|                            2.0|
|19.0|                            2.0|
+----+-------------------------------+



##### StandardScaler
- The StandardScaler standardizes a set of features to have zero mean and a standard deviation of 1.
- The flag withStd (true by default) scales the data to unit standard deviation, while the flag withMean (false by default) centers the data before scaling it. With the defaults, as in the example below, the data is scaled but not centered, so the means are not zero.
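The scaling can be checked in plain Python on the five feature vectors that the next cell shows from `scaleDF`. Dividing by the sample standard deviation (n - 1) reproduces the first output value, about 1.1952, for the first component.

Two details are assumptions of this sketch: returning 0.0 for a zero-variance column, and the helper name `standard_scale`.

```python
import statistics

def standard_scale(rows, with_mean=False, with_std=True):
    """Column-wise standardization of a list of equal-length feature rows."""
    columns = list(zip(*rows))
    means = [statistics.fmean(c) for c in columns]
    stds = [statistics.stdev(c) for c in columns]   # sample standard deviation (n - 1)
    scaled_rows = []
    for row in rows:
        scaled = []
        for value, mean, std in zip(row, means, stds):
            if with_mean:
                value -= mean                       # center only when with_mean=True
            if with_std:
                value = value / std if std != 0 else 0.0
            scaled.append(value)
        scaled_rows.append(scaled)
    return scaled_rows

features = [[1.0, 0.1, -1.0], [2.0, 1.1, 1.0], [1.0, 0.1, -1.0],
            [2.0, 1.1, 1.0], [3.0, 10.1, 3.0]]
print(standard_scale(features)[0][0])  # about 1.19522860933439
```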

In [63]:
from pyspark.ml.feature import StandardScaler
sScaler = StandardScaler().setInputCol("features")
sScaler.fit(scaleDF).transform(scaleDF).show()

+---+--------------+-----------------------------------+
| id|      features|StandardScaler_2c3e21f3bfa7__output|
+---+--------------+-----------------------------------+
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  1|[3.0,10.1,3.0]|               [3.58568582800318...|
+---+--------------+-----------------------------------+



#### MinMaxScaler
- The MinMaxScaler scales the values in a vector (component-wise) to proportional values on a scale from a given min value to a given max value.
- By default the range is 0 to 1, so all values fall between 0 and 1; the example below sets the minimum to 5 and the maximum to 10, so every value falls between 5 and 10:
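The rescaling formula is: (value - column min) / (column max - column min) x (max - min) + min. A constant column is mapped to the midpoint of the target range, as Spark's documentation describes.

This plain-Python sketch reproduces the second output row below, [7.5, 5.5, 7.5]. It uses `builtins.min` and `builtins.max` because an earlier cell imports Spark's `min` and `max`. The name `min_max_scale` is hypothetical.

```python
import builtins

def min_max_scale(rows, new_min=0.0, new_max=1.0):
    """Rescale each column of equal-length feature rows to [new_min, new_max]."""
    columns = list(zip(*rows))
    lows = [builtins.min(c) for c in columns]
    highs = [builtins.max(c) for c in columns]
    scaled_rows = []
    for row in rows:
        scaled = []
        for value, low, high in zip(row, lows, highs):
            if high == low:
                scaled.append(0.5 * (new_max + new_min))   # constant column
            else:
                scaled.append((value - low) / (high - low) * (new_max - new_min) + new_min)
        scaled_rows.append(scaled)
    return scaled_rows

features = [[1.0, 0.1, -1.0], [2.0, 1.1, 1.0], [1.0, 0.1, -1.0],
            [2.0, 1.1, 1.0], [3.0, 10.1, 3.0]]
print(min_max_scale(features, 5.0, 10.0)[1])  # [7.5, 5.5, 7.5]
```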

In [64]:
from pyspark.ml.feature import MinMaxScaler
minMax = MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
fittedminMax = minMax.fit(scaleDF)
fittedminMax.transform(scaleDF).show()

+---+--------------+---------------------------------+
| id|      features|MinMaxScaler_69c938337a38__output|
+---+--------------+---------------------------------+
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  1|[3.0,10.1,3.0]|                 [10.0,10.0,10.0]|
+---+--------------+---------------------------------+



#### ElementwiseProduct
- The ElementwiseProduct multiplies each value in a vector by the corresponding value in a scaling vector (a component-wise product).
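The operation is a component-wise (Hadamard) product. In plain Python, applied to the last feature vector and the scaling vector used below, it gives [30.0, 151.5, 60.0], matching that row of the output. The helper name `elementwise_product` is hypothetical.

```python
def elementwise_product(vector, scaling_vec):
    """Multiply component i of vector by component i of scaling_vec."""
    if len(vector) != len(scaling_vec):
        raise ValueError("vector and scaling vector must have the same length")
    return [v * s for v, s in zip(vector, scaling_vec)]

print(elementwise_product([3.0, 10.1, 3.0], [10.0, 15.0, 20.0]))  # [30.0, 151.5, 60.0]
```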

In [65]:
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
scaleUpVec = Vectors.dense(10.0, 15.0, 20.0)
scalingUp = ElementwiseProduct()\
.setScalingVec(scaleUpVec)\
.setInputCol("features")
scalingUp.transform(scaleDF).show()

+---+--------------+---------------------------------------+
| id|      features|ElementwiseProduct_943ac75107db__output|
+---+--------------+---------------------------------------+
|  0|[1.0,0.1,-1.0]|                       [10.0,1.5,-20.0]|
|  1| [2.0,1.1,1.0]|                       [20.0,16.5,20.0]|
|  0|[1.0,0.1,-1.0]|                       [10.0,1.5,-20.0]|
|  1| [2.0,1.1,1.0]|                       [20.0,16.5,20.0]|
|  1|[3.0,10.1,3.0]|                      [30.0,151.5,60.0]|
+---+--------------+---------------------------------------+



#### StringIndexer
- The simplest way to index is via the StringIndexer, which maps strings to numerical indices. By default, the labels are ordered by descending frequency, so the most frequent string gets index 0.0.
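A plain-Python sketch of how the index mapping is built. It assumes MLlib's default `stringOrderType="frequencyDesc"` and assumes ties are broken alphabetically, which is my understanding of recent Spark versions; check the documentation for your version. The helper `fit_string_indexer` is hypothetical.

```python
from collections import Counter

def fit_string_indexer(labels):
    """Map labels to indices: most frequent first, ties in alphabetical order."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}

mapping = fit_string_indexer(["good", "bad", "bad", "good", "bad"])
print(mapping)  # {'bad': 0.0, 'good': 1.0}
```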

In [66]:
from pyspark.ml.feature import StringIndexer
lblIndxr = StringIndexer().setInputCol("lab").setOutputCol("labelInd")
idxRes = lblIndxr.fit(simpleDF).transform(simpleDF)
idxRes.show()

+-----+----+------+------------------+--------+
|color| lab|value1|            value2|labelInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     0.0|
|  red|good|    35|14.386294994851129|     1.0|
|  red| bad|     1| 38.97187133755819|     0.0|
|  red| bad|     2|14.386294994851129|     0.0|
|  red| bad|    16|14.386294994851129|     0.0|
|  red|good|    45| 38.97187133755819|     1.0|
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     0.0|
|  red|good|    35|14.386294994851129|  

## Regression
- Regression is the act of predicting a real number (or continuous variable) from a set of features (represented as numbers).
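A minimal plain-Python sketch of what fitting a regression means: ordinary least squares for a single feature, which finds the slope and intercept that minimize squared error.

This is unregularized. The Spark model below adds an elastic net penalty (`regParam=0.3`, `elasticNetParam=0.8`), so its coefficients would differ. The helper name is hypothetical.

```python
import math
import statistics

def fit_simple_linear_regression(xs, ys):
    """Ordinary least squares for one feature: y is approximately slope * x + intercept."""
    mean_x, mean_y = statistics.fmean(xs), statistics.fmean(ys)
    sxy = math.fsum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = math.fsum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x

print(fit_simple_linear_regression([0, 1, 2, 3], [1, 3, 5, 7]))  # (2.0, 1.0)
```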

In [67]:
df = spark.read.load(os.path.join(BASE_DIR, "regression"))

In [68]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
print(lr.explainParams())
lrModel = lr.fit(df)

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0, current: 0.8)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber (default: 1.35)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
loss: The loss function to be optimized. Supported options: squaredError, huber. (default: squaredError)
maxBlockSizeInMB: maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0. (default: 0.0)
maxIter: max number of iterations (>= 0). (defaul

In [69]:
summary = lrModel.summary
summary.residuals.show()
print(summary.totalIterations)
print(summary.objectiveHistory)
print(summary.rootMeanSquaredError)
print(summary.r2)

+--------------------+
|           residuals|
+--------------------+
| 0.12805046585610147|
|-0.14468269261572053|
|-0.41903832622420595|
|-0.41903832622420595|
|  0.8547088792080308|
+--------------------+

5
[0.5000000000000001, 0.4315295810362787, 0.3132335933881022, 0.31225692666554117, 0.309150608198303, 0.30915058933480255]
0.47308424392175996
0.7202391226912209
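The summary values printed above can be computed by hand in plain Python. The sketch assumes:

- Residuals are defined as label minus prediction.
- RMSE divides the squared error by n.
- R² is 1 - (residual sum of squares / total sum of squares).

The function name is hypothetical.

```python
import math
import statistics

def regression_summary(labels, predictions):
    """Residuals (label - prediction), RMSE, and R^2 for a set of predictions."""
    residuals = [y - p for y, p in zip(labels, predictions)]
    sse = math.fsum(r * r for r in residuals)
    rmse = math.sqrt(sse / len(residuals))
    mean_y = statistics.fmean(labels)
    sst = math.fsum((y - mean_y) ** 2 for y in labels)
    return residuals, rmse, 1 - sse / sst

residuals, rmse, r2 = regression_summary([1.0, 2.0, 3.0], [1.0, 2.0, 4.0])
print(residuals, r2)  # [0.0, 0.0, -1.0] 0.5  (rmse is sqrt(1/3))
```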


#### K-Means Clustering
- In this algorithm, the user specifies a number of clusters (k), and k initial cluster centers are picked from the dataset, for example at random (Spark's default initialization, k-means||, is a parallel variant of k-means++).
- Every point is then assigned to the cluster whose center is nearest, measured in Euclidean distance.
- Once this assignment happens, the center of each cluster (called the centroid) is recomputed as the mean of the points assigned to it, and the process repeats: all points are reassigned to their nearest centroid, and new centroids are computed.
- We repeat this process for a finite number of iterations or until convergence (i.e., when our centroid locations stop changing).
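The loop described above (Lloyd's algorithm) can be sketched in plain Python. This sketch uses random initialization, the `"random"` value of Spark's `initMode`, rather than the default `k-means||`. It uses `builtins.min` because an earlier cell imports Spark's `min`. The function `kmeans` is illustrative, not MLlib's implementation.

```python
import builtins
import math
import random

def kmeans(points, k, max_iter=20, seed=0):
    """Lloyd's algorithm with random initialization; points are tuples of floats."""
    centers = random.Random(seed).sample(points, k)   # k distinct input points
    clusters = []
    for _ in range(max_iter):
        # Assignment step: every point joins the cluster of its nearest center.
        clusters = [[] for _ in range(k)]
        for p in points:
            nearest = builtins.min(range(k), key=lambda i: math.dist(p, centers[i]))
            clusters[nearest].append(p)
        # Update step: each center moves to the mean of its points (kept if empty).
        new_centers = [
            tuple(math.fsum(axis) / len(cluster) for axis in zip(*cluster))
            if cluster else centers[i]
            for i, cluster in enumerate(clusters)
        ]
        if new_centers == centers:   # converged: the centers stopped moving
            break
        centers = new_centers
    return centers, clusters

pts = [(0.0, 0.0), (0.0, 1.0), (1.0, 0.0), (10.0, 10.0), (10.0, 11.0), (11.0, 10.0)]
centers, clusters = kmeans(pts, 2)
print(sorted(len(c) for c in clusters))  # [3, 3]
```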

In [70]:
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler()\
.setInputCols(["Quantity", "UnitPrice"])\
.setOutputCol("features")
sales = va.transform(spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(os.path.join(BASE_DIR, "retail-data", "by-day", "*.csv"))
.limit(50)
.coalesce(1)
.where("Description IS NOT NULL"))
sales.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, features: vector]

In [71]:
from pyspark.ml.clustering import KMeans
km = KMeans().setK(5)
print(km.explainParams())
kmModel = km.fit(sales)

distanceMeasure: the distance measure. Supported options: 'euclidean' and 'cosine'. (default: euclidean)
featuresCol: features column name. (default: features)
initMode: The initialization algorithm. This can be either "random" to choose random points as initial cluster centers, or "k-means||" to use a parallel variant of k-means++ (default: k-means||)
initSteps: The number of steps for k-means|| initialization mode. Must be > 0. (default: 2)
k: The number of clusters to create. Must be > 1. (default: 2, current: 5)
maxBlockSizeInMB: maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0. (default: 0.0)
maxIter: max number of iterations (>= 0). (default: 20)
predictionCol: prediction column name. (default: prediction)
seed: random seed. (default: 6343462656064669367)
solver:

In [72]:
summary = kmModel.summary
print(summary.clusterSizes)  # number of points in each cluster
centers = kmModel.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

[29, 2, 10, 8, 1]
Cluster Centers: 
[7.55172414 2.77172414]
[48.    1.32]
[23.2    0.956]
[ 2.5     11.24375]
[36.    0.85]
