# Spark Example

PySpark applications start with initializing SparkSession which is the entry point of PySpark as below. In case of running it in PySpark shell via pyspark executable, the shell automatically creates the session in the variable spark for users.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## DataFrame Creation

A PySpark DataFrame can be created via pyspark.sql.SparkSession.createDataFrame typically by passing a list of lists, tuples, dictionaries and pyspark.sql.Rows, a pandas DataFrame and an RDD consisting of such a list. pyspark.sql.SparkSession.createDataFrame takes the schema argument to specify the schema of the DataFrame. When it is omitted, PySpark infers the corresponding schema by taking a sample from the data.

Firstly, you can create a PySpark DataFrame from a list of rows

In [None]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Create a PySpark DataFrame with an explicit schema.

In [None]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Create a PySpark DataFrame from a pandas DataFrame

In [None]:
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Create a PySpark DataFrame from an RDD consisting of a list of tuples.

In [None]:
rdd = spark.sparkContext.parallelize([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

The DataFrames created above all have the same results and schema.

In [None]:
# All DataFrames above result same.
df.show()
df.printSchema()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



## Viewing Data

The top rows of a DataFrame can be displayed using DataFrame.show().

In [None]:
df.show(1)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row



Alternatively, you can enable spark.sql.repl.eagerEval.enabled configuration for the eager evaluation of PySpark DataFrame in notebooks such as Jupyter. The number of rows to show can be controlled via spark.sql.repl.eagerEval.maxNumRows configuration.

In [None]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


The rows can also be shown vertically. This is useful when rows are too long to show horizontally.

In [None]:
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row



You can see the DataFrame’s schema and column names as follows:

In [None]:
df.columns

['a', 'b', 'c', 'd', 'e']

In [None]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



Show the summary of the DataFrame

In [None]:
df.select("a", "b", "c").describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



DataFrame.collect() collects the distributed data to the driver side as the local data in Python. Note that this can throw an out-of-memory error when the dataset is too large to fit in the driver side because it collects all the data from executors to the driver side.

In [None]:
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

In order to avoid throwing an out-of-memory exception, use DataFrame.take() or DataFrame.tail().

In [None]:
df.take(1)

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]

PySpark DataFrame also provides the conversion back to a pandas DataFrame to leverage pandas APIs. Note that toPandas also collects all data into the driver side that can easily cause an out-of-memory-error when the data is too large to fit into the driver side.

In [None]:
df.toPandas()

Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00
1,2,3.0,string2,2000-02-01,2000-01-02 12:00:00
2,3,4.0,string3,2000-03-01,2000-01-03 12:00:00


## Selecting and Accessing Data

PySpark DataFrame is lazily evaluated and simply selecting a column does not trigger the computation but it returns a Column instance.

In [None]:
df.a

Column<'a'>

In [None]:
from pyspark.sql import Column
from pyspark.sql.functions import upper

type(df.c) == type(upper(df.c)) == type(df.c.isNull())

True

These Columns can be used to select the columns from a DataFrame. For example, DataFrame.select() takes the Column instances that returns another DataFrame.

In [None]:
df.select(df.c).show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+



Assign new Column instance.

In [None]:
df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



To select a subset of rows, use DataFrame.filter().

In [None]:
df.filter(df.a == 1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



## Applying a Function

PySpark supports various UDFs and APIs to allow users to execute Python native functions. See also the latest Pandas UDFs and Pandas Function APIs. For instance, the example below allows users to directly use the APIs in a pandas Series within Python native function.

In [None]:
import pandas
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1

df.select(pandas_plus_one(df.a)).show()

Another example is DataFrame.mapInPandas which allows users directly use the APIs in a pandas DataFrame without any restrictions such as the result length.

In [None]:
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



## Grouping Data

PySpark DataFrame also provides a way of handling grouped data by using the common approach, split-apply-combine strategy. It groups the data by a certain condition applies a function to each group and then combines them back to the DataFrame.

In [None]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



Grouping and then applying the avg() function to the resulting groups.

In [None]:
df.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
|black|    6.0|   60.0|
| blue|    3.0|   30.0|
+-----+-------+-------+



You can also apply a Python native function against each group by using pandas APIs.

In [None]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
+-----+------+---+---+



Co-grouping and applying a function.

In [None]:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def asof_join(l, r):
    return pd.merge_asof(l, r, on='time', by='id')

df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
    asof_join, schema='time int, id int, v1 double, v2 string').show()

+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+



## Getting Data in/out

CSV is straightforward and easy to use. Parquet and ORC are efficient and compact file formats to read and write faster.

There are many other data sources available in PySpark such as JDBC, text, binaryFile, Avro, etc. See also the latest Spark SQL, DataFrames and Datasets Guide in Apache Spark documentation.



In [None]:
df.write.csv('ftds.csv', header=True)
spark.read.csv('ftds.csv', header=True).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  6| 60|
| blue|banana|  2| 20|
|  red|banana|  1| 10|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



## Working with SQL

DataFrame and Spark SQL share the same execution engine so they can be interchangeably used seamlessly. For example, you can register the DataFrame as a table and run a SQL easily as below:

In [None]:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       8|
+--------+



In addition, UDFs can be registered and invoked in SQL out of the box:

In [None]:
import pandas
from pyspark.sql.functions import pandas_udf

@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(v1) FROM tableA").show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+



These SQL expressions can directly be mixed and used as PySpark columns.

In [None]:
from pyspark.sql.functions import expr

df.selectExpr('add_one(v1)').show()
df.select(expr('count(*)') > 0).show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+



## Spark Data Practice

In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date, timedelta, datetime
import time

First of all, a Spark session needs to be initialized. With the help of SparkSession, DataFrame can be created and registered as tables. Moreover, SQL tables are executed, tables can be cached, and parquet/JSON/CSV/Avro data formatted files can be read.

In [None]:
sc = SparkSession.builder.appName("PysparkFTDS")\
.config ("spark.sql.shuffle.partitions", "50")\
.config("spark.driver.maxResultSize","5g")\
.config ("spark.sql.execution.arrow.enabled", "true")\
.getOrCreate()

### Creating DataFrame

dataset [here](https://www.kaggle.com/cmenca/new-york-times-hardcover-fiction-best-sellers)

DataFrames can be created by reading text, CSV, JSON, and Parquet file formats. In our example, we will be using a .json formatted file. You can also find and read text, CSV, and Parquet file formats by using the related read functions as shown below.

In [None]:
#Creates a spark data frame called as raw_data.

#JSON
dataframe = sc.read.json('nyt2.json')

#TXT FILES
# dataframe_txt = sc.read.text('text_data.txt')

#CSV FILES
# dataframe_csv = sc.read.csv('csv_data.csv')

#PARQUET FILES
# dataframe_parquet = sc.read.load('parquet_data.parquet')

### Duplicates

In [None]:
dataframe.show(10)

+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+
|                 _id|  amazon_product_url|              author| bestsellers_date|         description|        price|   published_date|    publisher|rank|rank_last_week|               title|weeks_on_list|
+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+
|{5b4aa4ead3089013...|http://www.amazon...|       Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}|{{1212883200000}}|       Bantam| {1}|           {0}|           ODD HOURS|          {1}|
|{5b4aa4ead3089013...|http://www.amazon...|     Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, null}|{{1212883200000}}|Little, Brown| {2}|           {1}|           

Duplicate values in a table can be eliminated by using dropDuplicates() function.

In [None]:
dataframe_dropdup = dataframe.dropDuplicates()
dataframe_dropdup.show(10)

+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-----------------+----------------+----+--------------+--------------------+-------------+
|                 _id|  amazon_product_url|              author| bestsellers_date|         description|        price|   published_date|       publisher|rank|rank_last_week|               title|weeks_on_list|
+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-----------------+----------------+----+--------------+--------------------+-------------+
|{5b4aa4ead3089013...|http://www.amazon...|Clive Cussler wit...|{{1213401600000}}|Juan Cabrillo and...|{26.95, null}|{{1214697600000}}|          Putnam| {4}|           {3}|         PLAGUE SHIP|          {2}|
|{5b4aa4ead3089013...|http://www.amazon...|      Jeffery Deaver|{{1215820800000}}|Detectives Lincol...|    {null, 0}|{{1217116800000}}|Simon & Schuster|{20}|           

### Queries

"Select" Operation

It is possible to obtain columns by attribute `("author")` or by indexing `(dataframe['author'])`.

In [None]:
#Show all entries in title column
dataframe.select("author").show(10)

#Show all entries in title, author, rank, price columns
dataframe.select("author", "title", "rank", "price").show(10)

+--------------------+
|              author|
+--------------------+
|       Dean R Koontz|
|     Stephenie Meyer|
|        Emily Giffin|
|   Patricia Cornwell|
|     Chuck Palahniuk|
|James Patterson a...|
|       John Sandford|
|       Jimmy Buffett|
|    Elizabeth George|
|      David Baldacci|
+--------------------+
only showing top 10 rows

+--------------------+--------------------+----+-------------+
|              author|               title|rank|        price|
+--------------------+--------------------+----+-------------+
|       Dean R Koontz|           ODD HOURS| {1}|   {null, 27}|
|     Stephenie Meyer|            THE HOST| {2}|{25.99, null}|
|        Emily Giffin|LOVE THE ONE YOU'...| {3}|{24.95, null}|
|   Patricia Cornwell|           THE FRONT| {4}|{22.95, null}|
|     Chuck Palahniuk|               SNUFF| {5}|{24.95, null}|
|James Patterson a...|SUNDAYS AT TIFFANY’S| {6}|{24.99, null}|
|       John Sandford|        PHANTOM PREY| {7}|{26.95, null}|
|       Jimmy Buffett|

The 'title' column is selected and a condition is added with a 'when' condition.

In [None]:
# Show title and assign 0 or 1 depending on title
dataframe.select("title", when(dataframe.title != 'ODD HOURS', 1).otherwise(0)).show(10)

+--------------------+-----------------------------------------------------+
|               title|CASE WHEN (NOT (title = ODD HOURS)) THEN 1 ELSE 0 END|
+--------------------+-----------------------------------------------------+
|           ODD HOURS|                                                    0|
|            THE HOST|                                                    1|
|LOVE THE ONE YOU'...|                                                    1|
|           THE FRONT|                                                    1|
|               SNUFF|                                                    1|
|SUNDAYS AT TIFFANY’S|                                                    1|
|        PHANTOM PREY|                                                    1|
|          SWINE NOT?|                                                    1|
|     CARELESS IN RED|                                                    1|
|     THE WHOLE TRUTH|                                                    1|

The “isin” operation is applied instead of “when” which can be also used to define some conditions to rows.

In [None]:
# Show rows with specified authors if in the given options

dataframe [dataframe.author.isin("John Sandford", "Emily Giffin")].show(5)

+--------------------+--------------------+-------------+-----------------+--------------------+-------------+-----------------+------------+----+--------------+--------------------+-------------+
|                 _id|  amazon_product_url|       author| bestsellers_date|         description|        price|   published_date|   publisher|rank|rank_last_week|               title|weeks_on_list|
+--------------------+--------------------+-------------+-----------------+--------------------+-------------+-----------------+------------+----+--------------+--------------------+-------------+
|{5b4aa4ead3089013...|http://www.amazon...| Emily Giffin|{{1211587200000}}|A woman's happy m...|{24.95, null}|{{1212883200000}}|St. Martin's| {3}|           {2}|LOVE THE ONE YOU'...|          {2}|
|{5b4aa4ead3089013...|http://www.amazon...|John Sandford|{{1211587200000}}|The Minneapolis d...|{26.95, null}|{{1212883200000}}|      Putnam| {7}|           {4}|        PHANTOM PREY|          {3}|
|{5b4aa4ead3089

In the brackets of the “Like” function, the % character is used to filter out all titles having the “ THE ” word. If the condition we are looking for is the exact match, then no % character shall be used.

In [None]:
# Show author and title is TRUE if title has " THE " word in titles

dataframe.select("author", "title", dataframe.title.like("% THE %")).show(15)

+--------------------+--------------------+------------------+
|              author|               title|title LIKE % THE %|
+--------------------+--------------------+------------------+
|       Dean R Koontz|           ODD HOURS|             false|
|     Stephenie Meyer|            THE HOST|             false|
|        Emily Giffin|LOVE THE ONE YOU'...|              true|
|   Patricia Cornwell|           THE FRONT|             false|
|     Chuck Palahniuk|               SNUFF|             false|
|James Patterson a...|SUNDAYS AT TIFFANY’S|             false|
|       John Sandford|        PHANTOM PREY|             false|
|       Jimmy Buffett|          SWINE NOT?|             false|
|    Elizabeth George|     CARELESS IN RED|             false|
|      David Baldacci|     THE WHOLE TRUTH|             false|
|        Troy Denning|          INVINCIBLE|             false|
|          James Frey|BRIGHT SHINY MORNING|             false|
|         Garth Stein|THE ART OF RACING...|            

StartsWith scans from the beginning of word/content with specified criteria in the brackets. In parallel, EndsWith processes the word/content starting from the end. Both of the functions are case-sensitive.

In [None]:
dataframe.select("author", "title", dataframe.title.startswith("THE")).show(5)

dataframe.select("author", "title", dataframe.title.endswith("NT")).show(5)

+-----------------+--------------------+----------------------+
|           author|               title|startswith(title, THE)|
+-----------------+--------------------+----------------------+
|    Dean R Koontz|           ODD HOURS|                 false|
|  Stephenie Meyer|            THE HOST|                  true|
|     Emily Giffin|LOVE THE ONE YOU'...|                 false|
|Patricia Cornwell|           THE FRONT|                  true|
|  Chuck Palahniuk|               SNUFF|                 false|
+-----------------+--------------------+----------------------+
only showing top 5 rows

+-----------------+--------------------+-------------------+
|           author|               title|endswith(title, NT)|
+-----------------+--------------------+-------------------+
|    Dean R Koontz|           ODD HOURS|              false|
|  Stephenie Meyer|            THE HOST|              false|
|     Emily Giffin|LOVE THE ONE YOU'...|              false|
|Patricia Cornwell|           THE

Substring functions to extract the text between specified indexes. In the following examples, texts are extracted from the index numbers (1, 3), (3, 6), and (1, 6).

In [None]:
dataframe.select(dataframe.author.substr(1, 3).alias("title")).show(5)
dataframe.select(dataframe.author.substr(3, 6).alias("title")).show(5)
dataframe.select(dataframe.author.substr(1, 6).alias("title")).show(5)

+-----+
|title|
+-----+
|  Dea|
|  Ste|
|  Emi|
|  Pat|
|  Chu|
+-----+
only showing top 5 rows

+------+
| title|
+------+
|an R K|
|epheni|
|ily Gi|
|tricia|
|uck Pa|
+------+
only showing top 5 rows

+------+
| title|
+------+
|Dean R|
|Stephe|
|Emily |
|Patric|
|Chuck |
+------+
only showing top 5 rows



Adding Columns

In [None]:
# Lit() is required while we are creating columns with exact values.

dataframe = dataframe.withColumn('new_column', lit('This is a new column'))

dataframe.show(5)

+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|  amazon_product_url|           author| bestsellers_date|         description|        price|   published_date|    publisher|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|    Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}|{{1212883200000}}|       Bantam| {1}|           {0}|           ODD HOURS|          {1}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|  Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, 

Updating Columns

In [None]:
dataframe = dataframe.withColumnRenamed('amazon_product_url', 'URL')

dataframe.show(5)

+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 URL|           author| bestsellers_date|         description|        price|   published_date|    publisher|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|    Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}|{{1212883200000}}|       Bantam| {1}|           {0}|           ODD HOURS|          {1}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|  Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, 

Removing Columns

In [None]:
dataframe_remove = dataframe.drop("publisher", "published_date").show(5)

dataframe_remove2 = dataframe.drop(dataframe.publisher).drop(dataframe.published_date).show(5)

+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 URL|           author| bestsellers_date|         description|        price|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+----+--------------+--------------------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|    Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}| {1}|           {0}|           ODD HOURS|          {1}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|  Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, null}| {2}|           {1}|            THE HOST|          {3}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|  

### Inspect Data

In [None]:
# Returns dataframe column names and data types
dataframe.dtypes

# Displays the content of dataframe
dataframe.show()

# Return first n rows
dataframe.head()

# Returns first row
dataframe.first()

# Return first n rows
dataframe.take(5)

# Computes summary statistics
dataframe.describe().show()

# Returns columns of dataframe
dataframe.columns

# Counts the number of rows in dataframe
dataframe.count()

# Counts the number of distinct rows in dataframe
dataframe.distinct().count()

# Prints plans including physical and logical
dataframe.explain()

+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-----------------+--------------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 URL|              author| bestsellers_date|         description|        price|   published_date|           publisher|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-----------------+--------------------+----+--------------+--------------------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|       Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}|{{1212883200000}}|              Bantam| {1}|           {0}|           ODD HOURS|          {1}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|     Stephenie Meyer|{{12

### GroupBy

In [None]:
# Group by author, count the books of the authors in the groups

dataframe.groupBy("author").count().show(10)

+--------------------+-----+
|              author|count|
+--------------------+-----+
|          James Frey|    2|
|    Elin Hilderbrand|   58|
|   Sharon Kay Penman|    2|
|         Kate Jacobs|    3|
|       Karen Robards|    6|
|     Gary Shteyngart|    3|
|         Lisa Genova|    7|
|James Patterson a...|   30|
|         Ruth Reichl|    3|
|         JRR Tolkien|    2|
+--------------------+-----+
only showing top 10 rows



### Filter

In [None]:
# Filtering entries of title
# Only keeps records having value 'THE HOST'

dataframe.filter(dataframe["title"] == 'THE HOST').show(5)

+--------------------+--------------------+---------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------+-------------+--------------------+
|                 _id|                 URL|         author| bestsellers_date|         description|        price|   published_date|    publisher|rank|rank_last_week|   title|weeks_on_list|          new_column|
+--------------------+--------------------+---------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, null}|{{1212883200000}}|Little, Brown| {2}|           {1}|THE HOST|          {3}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|Stephenie Meyer|{{1212192000000}}|Aliens have taken...|{25.99, null}|{{1213488000000}}|Little, Brown| {2}|           {2}|

### Missing & Replacing Values

In [None]:
# Replace null values
dataframe.na.fill(50).show(5)

# Return new dataframe restricting rows with null values
dataframe.na.drop().show(5)

# Return new dataframe replacing one value with another
dataframe.na.replace(10, 20).show(5)

+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 URL|           author| bestsellers_date|         description|        price|   published_date|    publisher|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|    Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}|{{1212883200000}}|       Bantam| {1}|           {0}|           ODD HOURS|          {1}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|  Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, 

### Repartitioning

It is possible to increase or decrease the existing level of partitioning in RDD Increasing can be actualized by using the repartition(self, numPartitions) function which results in a new RDD that obtains the higher number of partitions. Decreasing can be processed with coalesce(self, numPartitions, shuffle=False) function that results in a new RDD with a reduced number of partitions to a specified number.

In [None]:
# Dataframe with 10 partitions
dataframe.repartition(10).rdd.getNumPartitions()

# Dataframe with 1 partition
dataframe.coalesce(1).rdd.getNumPartitions()

1

### Running SQL Queries Programmatically

Raw SQL queries can also be used by enabling the “sql” operation on our SparkSession to run SQL queries programmatically and return the result sets as DataFrame structures.

In [None]:
# Registering a table
dataframe.registerTempTable("df")

In [None]:
sc.sql("select * from df").show(3)


sc.sql("select \
               CASE WHEN description LIKE '%love%' THEN 'Love_Theme' \
               WHEN description LIKE '%hate%' THEN 'Hate_Theme' \
               WHEN description LIKE '%happy%' THEN 'Happiness_Theme' \
               WHEN description LIKE '%anger%' THEN 'Anger_Theme' \
               WHEN description LIKE '%horror%' THEN 'Horror_Theme' \
               WHEN description LIKE '%death%' THEN 'Criminal_Theme' \
               WHEN description LIKE '%detective%' THEN 'Mystery_Theme' \
               ELSE 'Other_Themes' \
               END Themes \
       from df").groupBy('Themes').count().show()

+--------------------+--------------------+---------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 URL|         author| bestsellers_date|         description|        price|   published_date|    publisher|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+---------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|  Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}|{{1212883200000}}|       Bantam| {1}|           {0}|           ODD HOURS|          {1}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, null}|{{12

### Output

In [None]:
# Converting dataframe into an RDD
rdd_convert = dataframe.rdd

In [None]:
# Converting dataframe into a RDD of string
dataframe.toJSON().first()

'{"_id":{"$oid":"5b4aa4ead3089013507db18b"},"URL":"http://www.amazon.com/Odd-Hours-Dean-Koontz/dp/0553807056?tag=NYTBS-20","author":"Dean R Koontz","bestsellers_date":{"$date":{"$numberLong":"1211587200000"}},"description":"Odd Thomas, who can communicate with the dead, confronts evil forces in a California coastal town.","price":{"$numberInt":"27"},"published_date":{"$date":{"$numberLong":"1212883200000"}},"publisher":"Bantam","rank":{"$numberInt":"1"},"rank_last_week":{"$numberInt":"0"},"title":"ODD HOURS","weeks_on_list":{"$numberInt":"1"},"new_column":"This is a new column"}'

In [None]:
# Obtaining contents of df as Pandas dataFrame
dataframe.toPandas()

  Nested StructType not supported in conversion to Arrow
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


Unnamed: 0,_id,URL,author,bestsellers_date,description,price,published_date,publisher,rank,rank_last_week,title,weeks_on_list,new_column
0,"(5b4aa4ead3089013507db18b,)",http://www.amazon.com/Odd-Hours-Dean-Koontz/dp...,Dean R Koontz,"((1211587200000,),)","Odd Thomas, who can communicate with the dead,...","(None, 27)","((1212883200000,),)",Bantam,"(1,)","(0,)",ODD HOURS,"(1,)",This is a new column
1,"(5b4aa4ead3089013507db18c,)",http://www.amazon.com/The-Host-Novel-Stephenie...,Stephenie Meyer,"((1211587200000,),)",Aliens have taken control of the minds and bod...,"(25.99, None)","((1212883200000,),)","Little, Brown","(2,)","(1,)",THE HOST,"(3,)",This is a new column
2,"(5b4aa4ead3089013507db18d,)",http://www.amazon.com/Love-Youre-With-Emily-Gi...,Emily Giffin,"((1211587200000,),)",A woman's happy marriage is shaken when she en...,"(24.95, None)","((1212883200000,),)",St. Martin's,"(3,)","(2,)",LOVE THE ONE YOU'RE WITH,"(2,)",This is a new column
3,"(5b4aa4ead3089013507db18e,)",http://www.amazon.com/The-Front-Garano-Patrici...,Patricia Cornwell,"((1211587200000,),)",A Massachusetts state investigator and his tea...,"(22.95, None)","((1212883200000,),)",Putnam,"(4,)","(0,)",THE FRONT,"(1,)",This is a new column
4,"(5b4aa4ead3089013507db18f,)",http://www.amazon.com/Snuff-Chuck-Palahniuk/dp...,Chuck Palahniuk,"((1211587200000,),)",An aging porn queens aims to cap her career by...,"(24.95, None)","((1212883200000,),)",Doubleday,"(5,)","(0,)",SNUFF,"(1,)",This is a new column
...,...,...,...,...,...,...,...,...,...,...,...,...,...
10190,"(5b4aa4ead3089013507dd959,)",https://www.amazon.com/Clancy-Line-Sight-Jack-...,Mike Maden,"((1530921600000,),)",Jack Ryan Jr. risks his life to protect a woma...,"(None, 0)","((1532217600000,),)",Putnam,"(11,)","(6,)",TOM CLANCY LINE OF SIGHT,"(4,)",This is a new column
10191,"(5b4aa4ead3089013507dd95a,)",https://www.amazon.com/Something-Water-Novel-C...,Catherine Steadman,"((1530921600000,),)",A documentary filmmaker and an investment bank...,"(None, 0)","((1532217600000,),)",Ballantine,"(12,)","(11,)",SOMETHING IN THE WATER,"(5,)",This is a new column
10192,"(5b4aa4ead3089013507dd95b,)",https://www.amazon.com/Little-Fires-Everywhere...,Celeste Ng,"((1530921600000,),)",An artist upends a quiet town outside Cleveland.,"(None, 0)","((1532217600000,),)",Penguin Press,"(13,)","(12,)",LITTLE FIRES EVERYWHERE,"(41,)",This is a new column
10193,"(5b4aa4ead3089013507dd95c,)",https://www.amazon.com/Shelter-Place-Nora-Robe...,Nora Roberts,"((1530921600000,),)",Survivors of a mass shooting outside a mall in...,"(None, 0)","((1532217600000,),)",St. Martin's,"(14,)","(5,)",SHELTER IN PLACE,"(6,)",This is a new column


In [None]:
# Write & Save File in .parquet format
dataframe.select("author", "title", "rank", "description")\
.write \
.save("Rankings_Descriptions.parquet")

In [None]:
# Write & Save File in .json format
dataframe.select("author", "title") \
.write \
.save("Authors_Titles.json",format="json")

In [None]:
# End Spark Session
sc.stop()

## Multiclass Text Classifications

Loading a CSV file dataset [here](https://www.kaggle.com/kaggle/san-francisco-crime-classification?select=train.csv)

In [None]:
from pyspark.sql import SQLContext

data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('train.csv')

Remove the columns we do not need and have a look the first five rows:

In [None]:
drop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y']
data = data.select([column for column in data.columns if column not in drop_list])
data.show(5)

+--------------+--------------------+
|      Category|            Descript|
+--------------+--------------------+
|      WARRANTS|      WARRANT ARREST|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
+--------------+--------------------+
only showing top 5 rows



Apply printSchema() on the data which will print the schema in a tree format:

In [None]:
data.printSchema()

root
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)



Top 20 crime categories:

In [None]:
from pyspark.sql.functions import col
data.groupBy("Category") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+--------------------+------+
|            Category| count|
+--------------------+------+
|       LARCENY/THEFT|174900|
|      OTHER OFFENSES|126182|
|        NON-CRIMINAL| 92304|
|             ASSAULT| 76876|
|       DRUG/NARCOTIC| 53971|
|       VEHICLE THEFT| 53781|
|           VANDALISM| 44725|
|            WARRANTS| 42214|
|            BURGLARY| 36755|
|      SUSPICIOUS OCC| 31414|
|      MISSING PERSON| 25989|
|             ROBBERY| 23000|
|               FRAUD| 16679|
|FORGERY/COUNTERFE...| 10609|
|     SECONDARY CODES|  9985|
|         WEAPON LAWS|  8555|
|        PROSTITUTION|  7484|
|            TRESPASS|  7326|
|     STOLEN PROPERTY|  4540|
|SEX OFFENSES FORC...|  4388|
+--------------------+------+
only showing top 20 rows



Top 20 crime descriptions:

In [None]:
data.groupBy("Descript") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+--------------------+-----+
|            Descript|count|
+--------------------+-----+
|GRAND THEFT FROM ...|60022|
|       LOST PROPERTY|31729|
|             BATTERY|27441|
|   STOLEN AUTOMOBILE|26897|
|DRIVERS LICENSE, ...|26839|
|      WARRANT ARREST|23754|
|SUSPICIOUS OCCURR...|21891|
|AIDED CASE, MENTA...|21497|
|PETTY THEFT FROM ...|19771|
|MALICIOUS MISCHIE...|17789|
|   TRAFFIC VIOLATION|16471|
|PETTY THEFT OF PR...|16196|
|MALICIOUS MISCHIE...|15957|
|THREATS AGAINST LIFE|14716|
|      FOUND PROPERTY|12146|
|ENROUTE TO OUTSID...|11470|
|GRAND THEFT OF PR...|11010|
|POSSESSION OF NAR...|10050|
|PETTY THEFT FROM ...|10029|
|PETTY THEFT SHOPL...| 9571|
+--------------------+-----+
only showing top 20 rows



Model Pipeline

Spark Machine Learning Pipelines API is similar to Scikit-Learn. Spark pipeline includes three steps:

- regexTokenizer: Tokenization (with Regular Expression)
- stopwordsRemover: Remove Stop Words
- countVectors: Count vectors (“document-term vectors”)

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="Descript", outputCol="words", pattern="\\W")

# stop words
add_stopwords = ["http","https","amp","rt","t","c","the"]
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

StringIndexer

StringIndexer encodes a string column of labels to a column of label indices. The indices are in `(0, numLabels)`, ordered by label frequencies, so the most frequent label gets index `0`.

In our case, the label column (Category) will be encoded to label indices, from 0 to 32; the most frequent label (LARCENY/THEFT) will be indexed as 0.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "Category", outputCol = "label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|      Category|            Descript|               words|            filtered|            features|label|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|      WARRANTS|      WARRANT ARREST|   [warrant, arrest]|   [warrant, arrest]|(809,[17,32],[1.0...|  7.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(809,[11,17,35],[...|  1.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(809,[11,17,35],[...|  1.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,6],...|  0.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,6],...|  0.0|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



Partition Training & Test sets

In [None]:
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 614691
Test Dataset Count: 263358


Model Training and Evaluation

Logistic Regression using Count Vector Features

Our model will make predictions and score on the test set; we then look at the top 10 predictions from the highest probability

In [None]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+-------------+------------------------------+-----+----------+
|                      Descript|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8730172912256745,0.02054...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8730172912256745,0.02054...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8730172912256745,0.02054...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8730172912256745,0.02054...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8730172912256745,0.02054...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8730172912256745,0.02054...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8730172912256745,0.02054...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9721281687008145

Logistic Regression using TF-IDF Features

In [None]:
from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)

idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+-------------+------------------------------+-----+----------+
|                      Descript|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8846868443884813,0.01886...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8846868443884813,0.01886...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8846868443884813,0.01886...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8846868443884813,0.01886...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8846868443884813,0.01886...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8846868443884813,0.01886...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8846868443884813,0.01886...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9721507088140049

Cross-Validation

In [None]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)

# Evaluate best model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9918960725584134

Naive Bayes

In [None]:
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)

predictions = model.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+----------------------------+-------------+------------------------------+-----+----------+
|                    Descript|     Category|                   probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.999999999986501,1.521776...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.999999999986501,1.521776...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.999999999986501,1.521776...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.999999999986501,1.521776...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.999999999986501,1.521776...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.999999999986501,1.521776...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.999999999986501,1.521776...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.999999999986501,1.52177

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9934900857765636

Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)

# Train model with Training Data
rfModel = rf.fit(trainingData)

predictions = rfModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+----------------------------+-------------+------------------------------+-----+----------+
|                    Descript|     Category|                   probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.564748070756399,0.073737...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.564748070756399,0.073737...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.564748070756399,0.073737...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.564748070756399,0.073737...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.564748070756399,0.073737...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.564748070756399,0.073737...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.564748070756399,0.073737...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.564748070756399,0.07373

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.7153767272417834