# Hello Spark 2

Walkthrough of O'Reilly [Spark - The Definitive Guide](https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/), chapter 3

To install dependencies for this notebook locally, assuming the full Spark install is in `/usr/local/spark`:

```
$ virtualenv --python=python3 spark
$ cd spark
$ source bin/activate
$ pip install pyspark jupyter
$ python -m ipykernel install --user --name spark --display-name "Python 3 (spark)"
$ SPARK_HOME=/usr/local/spark PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark
```

In Jupyter notebook, choose Kernel = "Python 3 (spark)".

Sample data can be downloaded from: http://cdn.oreillystatic.com/books/0636920034957/data.zip

In [1]:
spark

# Schemas

In [2]:
# Read the 2015 flight summary and let Spark infer the schema from the JSON.
df = spark.read.load("data/flight-data/json/2015-summary.json", format="json")
# Bare last expression: the notebook displays the DataFrame's column names and types.
df

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [3]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [4]:
df.schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))

In [5]:
type(df.schema)

pyspark.sql.types.StructType

# Manual Schemas

In [6]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Spell the schema out by hand instead of relying on inference.
# Each field is described by its name, its Spark data type and a nullable flag.
# Note: the printSchema() output recorded later in this notebook still reports
# `count` as nullable = true, even though it is declared nullable=False here.
manual_schema = StructType(
    fields=[
        StructField(name="DEST_COUNTRY_NAME", dataType=StringType(), nullable=True),
        StructField(name="ORIGIN_COUNTRY_NAME", dataType=StringType(), nullable=True),
        StructField(name="count", dataType=LongType(), nullable=False),
    ]
)

# Re-read the same file, this time applying the hand-written schema.
df = spark.read.load(
    "data/flight-data/json/2015-summary.json",
    format="json",
    schema=manual_schema,
)

In [7]:
df.take(10)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [8]:
# Keep only the five smallest counts on the Spark side and convert just those
# rows to pandas, instead of collecting the entire DataFrame to the driver and
# slicing it afterwards. Rows that tie on `count` may come back in any order.
df.sort("count").limit(5).toPandas()

Unnamed: 0,DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
0,United States,Croatia,1
1,United States,Singapore,1
2,Moldova,United States,1
3,Malta,United States,1
4,United States,Gibraltar,1


# Columns

In [9]:
from pyspark.sql.functions import col, column

In [10]:
col("someColumnName")

Column<b'someColumnName'>

In [11]:
column("someColumnName")

Column<b'someColumnName'>

In [12]:
# Demonstration: the Python DataFrame has no `.col(...)` method (that accessor
# belongs to the Scala/Java API), so the attribute lookup fails. In Python use
# df["count"] or col("count") instead. Only AttributeError is caught so that any
# other, unexpected failure still surfaces.
try:
    df.col("count")
except AttributeError as ex:
    print(ex)

'DataFrame' object has no attribute 'col'


In [13]:
df["count"]

Column<b'count'>

In [14]:
df["count"] + 10

Column<b'(count + 10)'>

In [15]:
from pyspark.sql.functions import expr

expr("(((someCol + 5) * 200) - 6) < otherCol")

Column<b'((((someCol + 5) * 200) - 6) < otherCol)'>

In [16]:
df.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [17]:
# Indexing a DataFrame by column name yields a Column object. Build the list as
# the cell's final expression so the notebook actually displays it; a bare
# expression inside a `for` loop is evaluated and silently discarded.
[df[name] for name in df.columns]

# Rows

In [18]:
from pyspark.sql import Row

my_row = Row("Hello", None, 1, False)
my_row[0], my_row[1], my_row[2]

('Hello', None, 1)

In [19]:
# NOTE(review): this is presumably the tuple-style count(value) — the number of
# fields equal to 0 — not the number of fields in the Row. The recorded result
# of 1 is consistent with that, since False == 0 in Python. Use len(my_row) to
# get the number of fields.
my_row.count(0)

1

# Creating DataFrames

In [20]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Three-column schema; only the last column is declared non-nullable.
manual_schema = StructType(
    fields=[
        StructField(name="some", dataType=StringType(), nullable=True),
        StructField(name="col", dataType=StringType(), nullable=True),
        StructField(name="names", dataType=LongType(), nullable=False),
    ]
)

# Row values are positional and line up with the schema fields in order.
my_rows = [
    Row("Hello", None, 1),
    Row("Hello Again", None, 2),
]
my_df = spark.createDataFrame(data=my_rows, schema=manual_schema)
my_df.show()

+-----------+----+-----+
|       some| col|names|
+-----------+----+-----+
|      Hello|null|    1|
|Hello Again|null|    2|
+-----------+----+-----+



In [21]:
my_df.show(1)

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
+-----+----+-----+
only showing top 1 row



# Select and SelectExpr

In [22]:
df.select("DEST_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [23]:
# Project two columns by name and print the first two rows.
(
    df
    .select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME")
    .show(2)
)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [24]:
df.select(
    expr("DEST_COUNTRY_NAME as destination"),
    expr("ORIGIN_COUNTRY_NAME as origin"),
).show(2)

+-------------+-------+
|  destination| origin|
+-------------+-------+
|United States|Romania|
|United States|Croatia|
+-------------+-------+
only showing top 2 rows



In [25]:
df.select(
  expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME")
).show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [26]:
df.selectExpr(
  "DEST_COUNTRY_NAME",
  "DEST_COUNTRY_NAME as AGAIN_WITH_THE_DEST_COUNTRY_NAME"
).show(2)

+-----------------+--------------------------------+
|DEST_COUNTRY_NAME|AGAIN_WITH_THE_DEST_COUNTRY_NAME|
+-----------------+--------------------------------+
|    United States|                   United States|
|    United States|                   United States|
+-----------------+--------------------------------+
only showing top 2 rows



In [27]:
# Keep every original column and append two derived ones:
#   route         - a human-readable "From <origin> to <destination>" string
#   withinCountry - whether origin and destination are the same country
(
    df
    .selectExpr(
        "*",
        "CONCAT('From ', ORIGIN_COUNTRY_NAME, ' to ', DEST_COUNTRY_NAME) as route",
        "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry",
    )
    .show(10)
)

+-----------------+-------------------+-----+--------------------+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|               route|withinCountry|
+-----------------+-------------------+-----+--------------------+-------------+
|    United States|            Romania|   15|From Romania to U...|        false|
|    United States|            Croatia|    1|From Croatia to U...|        false|
|    United States|            Ireland|  344|From Ireland to U...|        false|
|            Egypt|      United States|   15|From United State...|        false|
|    United States|              India|   62|From India to Uni...|        false|
|    United States|          Singapore|    1|From Singapore to...|        false|
|    United States|            Grenada|   62|From Grenada to U...|        false|
|       Costa Rica|      United States|  588|From United State...|        false|
|          Senegal|      United States|   40|From United State...|        false|
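|          Moldova|      United States|    1|From United State...|        false|
+-----------------+-------------------+-----+--------------------+-------------+
only showing top 10 rows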

In [28]:
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))", "count(*)").show(2)

+-----------+---------------------------------+--------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|count(1)|
+-----------+---------------------------------+--------+
|1770.765625|                              132|     256|
+-----------+---------------------------------+--------+



In [29]:
from pyspark.sql.functions import lit, expr

# Append a constant column: lit() wraps a Python value as a Column, and
# alias() gives it a name (spaces are allowed in the name).
constant_column = lit(42).alias("meaning of life")
df.select("*", constant_column).show(5)

+-----------------+-------------------+-----+---------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|meaning of life|
+-----------------+-------------------+-----+---------------+
|    United States|            Romania|   15|             42|
|    United States|            Croatia|    1|             42|
|    United States|            Ireland|  344|             42|
|            Egypt|      United States|   15|             42|
|    United States|              India|   62|             42|
+-----------------+-------------------+-----+---------------+
only showing top 5 rows



# withColumn

In [30]:
# Keep only routes that start and end in the same country, then add the same
# comparison as a boolean column so the flag is visible in the output.
(
    df
    .where("ORIGIN_COUNTRY_NAME = DEST_COUNTRY_NAME")
    .withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))
    .show(2)
)

+-----------------+-------------------+------+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|withinCountry|
+-----------------+-------------------+------+-------------+
|    United States|      United States|370002|         true|
+-----------------+-------------------+------+-------------+



In [31]:
# Add a copy of ORIGIN_COUNTRY_NAME under a name containing spaces and a dash.
# The new name is passed as a plain string here, so it is written without the
# backticks that the selectExpr() call on this DataFrame uses further down.
df_long_col_name = df.withColumn("This Long Column-Name", col("ORIGIN_COUNTRY_NAME"))

df_long_col_name.show(2)

+-----------------+-------------------+-----+---------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|This Long Column-Name|
+-----------------+-------------------+-----+---------------------+
|    United States|            Romania|   15|              Romania|
|    United States|            Croatia|    1|              Croatia|
+-----------------+-------------------+-----+---------------------+
only showing top 2 rows



In [32]:
df_long_col_name.selectExpr(
    "`This Long Column-Name`",
    "count"
).show(5)

+---------------------+-----+
|This Long Column-Name|count|
+---------------------+-----+
|              Romania|   15|
|              Croatia|    1|
|              Ireland|  344|
|        United States|   15|
|                India|   62|
+---------------------+-----+
only showing top 5 rows



# Dropping columns

In [33]:
df_skinny = df.drop("ORIGIN_COUNTRY_NAME")
df_skinny.show(5)

+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|    United States|   15|
|    United States|    1|
|    United States|  344|
|            Egypt|   15|
|    United States|   62|
+-----------------+-----+
only showing top 5 rows



# Cast

In [34]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [35]:
df.withColumn("count", col("count").cast("float")).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania| 15.0|
|    United States|            Croatia|  1.0|
|    United States|            Ireland|344.0|
|            Egypt|      United States| 15.0|
|    United States|              India| 62.0|
+-----------------+-------------------+-----+
only showing top 5 rows



# Filtering rows

In [36]:
df.filter(col("count") > 100).take(5)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Sint Maarten', count=325),
 Row(DEST_COUNTRY_NAME='Turks and Caicos Islands', ORIGIN_COUNTRY_NAME='United States', count=230),
 Row(DEST_COUNTRY_NAME='Italy', ORIGIN_COUNTRY_NAME='United States', count=382)]

In [37]:
df.where("count > 100").take(5)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Sint Maarten', count=325),
 Row(DEST_COUNTRY_NAME='Turks and Caicos Islands', ORIGIN_COUNTRY_NAME='United States', count=230),
 Row(DEST_COUNTRY_NAME='Italy', ORIGIN_COUNTRY_NAME='United States', count=382)]

In [38]:
df.where("count > 100").where("DEST_COUNTRY_NAME <> 'United States'").take(5)

[Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Turks and Caicos Islands', ORIGIN_COUNTRY_NAME='United States', count=230),
 Row(DEST_COUNTRY_NAME='Italy', ORIGIN_COUNTRY_NAME='United States', count=382),
 Row(DEST_COUNTRY_NAME='Iceland', ORIGIN_COUNTRY_NAME='United States', count=181),
 Row(DEST_COUNTRY_NAME='Luxembourg', ORIGIN_COUNTRY_NAME='United States', count=155)]

# Unique values

In [39]:
df.select("ORIGIN_COUNTRY_NAME").distinct().count()

125

In [40]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

256

In [41]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").count()

256

In [42]:
df.select("ORIGIN_COUNTRY_NAME").count()

256

# Random samples

In [43]:
# Sampling parameters: no replacement, a target of a quarter of the rows, and a
# fixed seed. The fraction is a target, not an exact size: the recorded run
# returned 60 of the 256 rows.
withReplacement = False
fraction = 0.25
seed = 5

df.sample(withReplacement=withReplacement, fraction=fraction, seed=seed).count()

60

# Splits

In [44]:
# Split into two DataFrames weighted 75% / 25%, reusing the notebook's `seed`.
df_splits = df.randomSplit(weights=[0.75, 0.25], seed=seed)
# Sanity check: the first (larger-weight) split should hold more rows.
df_splits[0].count() > df_splits[1].count()

True

In [45]:
# train, test
df_splits[0].count(), df_splits[1].count()

(192, 64)

# Union

In [46]:
from pyspark.sql import Row

# Reuse the flight DataFrame's schema for the new rows that will be unioned with it.
schema = df.schema

# Values are positional: destination, origin, count.
newRows = [
    Row(*values)
    for values in (
        ("New Country", "Other Country", 5),
        ("New Country 2", "Other Country 3", 1),
    )
]
# Distribute the rows as an RDD, then wrap them in a DataFrame with that schema.
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(data=parallelizedRows, schema=schema)

In [47]:
newDF.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|      New Country|      Other Country|    5|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



In [48]:
# Append the new rows, then keep single-flight routes that do not originate in
# the United States. The two filters use a SQL string and a Column expression.
(
    df
    .union(newDF)
    .where("count = 1")
    .where(col("ORIGIN_COUNTRY_NAME") != "United States")
    .show()
)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



# Sorting

In [49]:
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [50]:
# Pitfall: this does NOT sort in descending order — the recorded output starts
# with rows whose count is 1.
# NOTE(review): presumably the expression parser reads the trailing `DESC` as a
# column alias rather than a sort direction — confirm against the Spark docs.
# Use desc("count") or df["count"].desc() instead, as the following cells do.
df.orderBy(expr("`count` DESC")).show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows



In [51]:
from pyspark.sql.functions import desc, asc

df.orderBy(desc("count")).show(5)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+
only showing top 5 rows



In [52]:
df.orderBy("count", ascending=False).show(5)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+
only showing top 5 rows



In [53]:
df.sort("count", ascending=False).show(5)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+
only showing top 5 rows



In [54]:
df.sort(df["count"].desc()).show(5)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+
only showing top 5 rows



In [55]:
df.sort(df.DEST_COUNTRY_NAME.desc()).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Zambia|      United States|    1|
|        Venezuela|      United States|  290|
|          Uruguay|      United States|   43|
|    United States|            Grenada|   62|
|    United States|            Romania|   15|
+-----------------+-------------------+-----+
only showing top 5 rows



# Limit

In [56]:
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



# Repartition

In [57]:
df.rdd.getNumPartitions()

1

In [58]:
df_part = df.repartition(5)
df_part.rdd.getNumPartitions()

5