In [2]:
spark = SparkSession.builder \
    .master("local") \
    .appName("XZ") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
df = spark\
        .read\
        .json("/tools/Spark-The-Definitive-Guide/data/flight-data/json/2015-summary.json")


In [38]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [39]:
df.schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))

**The example that follows shows how to create and
enforce a specific schema on a DataFrame.**

In [40]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), False, metadata={"hello":"world"})
    ])

df = spark\
        .read\
        .schema(myManualSchema)\
        .json("/tools/Spark-The-Definitive-Guide/data/flight-data/json/2015-summary.json")
            
df.limit(5).show()


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



**There are a lot of different ways to construct and refer to columns but the two simplest ways are
by using the col or column functions. To use either of these functions, you pass in a column
name:**

In [45]:
from pyspark.sql.functions import col, column

c1 = col("col1")
print(c1)

c2 = column("col2")
print(c2)



Column<b'col1'>
Column<b'col2'>


**Columns provide a subset of expression functionality.
remember a couple of key points:
    Columns are just expressions.
    Columns and transformations of those columns compile to the same logical plan as parsed expressions (expression trees).**

In [46]:
df.columns


['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

**Rows aka Records**

In [47]:
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [53]:
# manually create a Row
from pyspark.sql import Row

row = Row("Hello", None, 1, False)
print(row[0])
print(row[1])

Hello
None


**Creating DataFrames**

In [55]:
df = spark\
        .read\
        .format("json")\
        .load("/tools/Spark-The-Definitive-Guide/data/flight-data/json/2015-summary.json")
    

In [58]:
df.createOrReplaceTempView("dfTable")

spark.sql("select * from dfTable").limit(5).show()


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



**We can also create DataFrames on the fly**

In [60]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
StructField("some", StringType(), True),
StructField("col", StringType(), True),
StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
+-----+----+-----+



**select and selectExpr**

In [64]:
spark.sql("select * from dfTable").limit(1).show()

df.limit(1).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
+-----------------+-------------------+-----+

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
+-----------------+-------------------+-----+



In [65]:
spark.sql("select DEST_COUNTRY_NAME from dfTable").limit(1).show()

df.select("DEST_COUNTRY_NAME").limit(1).show()

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
+-----------------+

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
+-----------------+



In [88]:
spark.sql("SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME as ORIGIN, count * 10 as CNT FROM dfTable limit 1").show()

from pyspark.sql.functions import col, column, expr
# df.select(col("DEST_COUNTRY_NAME"), column("ORIGIN_COUNTRY_NAME"), expr("ORIGIN_COUNTRY_NAME")).limit(1).show()

df\
    .select("DEST_COUNTRY_NAME", 
          expr("ORIGIN_COUNTRY_NAME as ORIGIN").alias("ORIGIN"),
          expr("count * 10 as CNT"))\
    .show(1)

# even simpler
df\
    .selectExpr("DEST_COUNTRY_NAME", 
          "ORIGIN_COUNTRY_NAME as ORIGIN",
          "count * 10 as CNT")\
    .show(1)

+-----------------+-------+---+
|DEST_COUNTRY_NAME| ORIGIN|CNT|
+-----------------+-------+---+
|    United States|Romania|150|
+-----------------+-------+---+

+-----------------+-------+---+
|DEST_COUNTRY_NAME| ORIGIN|CNT|
+-----------------+-------+---+
|    United States|Romania|150|
+-----------------+-------+---+
only showing top 1 row

+-----------------+-------+---+
|DEST_COUNTRY_NAME| ORIGIN|CNT|
+-----------------+-------+---+
|    United States|Romania|150|
+-----------------+-------+---+
only showing top 1 row



In [89]:
# we can use any valid non-aggregating SQL statement, and as long as the columns resolve, it will be valid!
df.selectExpr(
"*", # all original columns
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
.show(1)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
+-----------------+-------------------+-----+-------------+
only showing top 1 row



In [91]:
# in SQL
# SELECT avg(count), count(distinct(DEST_COUNTRY_NAME)) FROM dfTable LIMIT 1

df\
    .selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))")\
    .show(1)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



In [98]:
# spark literals

from pyspark.sql.functions import lit

df\
    .select(expr("*"),
           lit(1).alias("One"))\
    .show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



**There’s also a more formal way of adding a new column to a DataFrame**

In [109]:
from pyspark.sql.functions import col

df\
    .withColumn("xz", col("count") * 10)\
    .show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| xz|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|150|
|    United States|            Croatia|    1| 10|
+-----------------+-------------------+-----+---+
only showing top 2 rows



In [110]:
# rename a column

df\
    .withColumnRenamed("count", "xz")\
    .show(2)

+-----------------+-------------------+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| xz|
+-----------------+-------------------+---+
|    United States|            Romania| 15|
|    United States|            Croatia|  1|
+-----------------+-------------------+---+
only showing top 2 rows



In [114]:
df\
    .dropna()\
    .drop("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME")\
    .show(2)

+-----+
|count|
+-----+
|   15|
|    1|
+-----+
only showing top 2 rows



**Changing a Column’s Type (cast)**

In [117]:
from pyspark.sql.functions import col

df\
    .withColumn("count-long", col("count").cast("long"))\
    .show(2)

+-----------------+-------------------+-----+----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|count-long|
+-----------------+-------------------+-----+----------+
|    United States|            Romania|   15|        15|
|    United States|            Croatia|    1|         1|
+-----------------+-------------------+-----+----------+
only showing top 2 rows



**Filtering Rows**

In [120]:
# filter is equivalent to where
df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)

df.where(df.ORIGIN_COUNTRY_NAME == "Croatia").show(2)
df.where("ORIGIN_COUNTRY_NAME == 'Croatia'").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          

In [122]:
# where chaining
# Spark automatically performs all filtering operations at the same time regardless of the filter ordering

df\
    .where(df.ORIGIN_COUNTRY_NAME == "Croatia")\
    .where("count == 1")\
    .show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+



**Getting Unique Rows**

In [125]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()


256

In [126]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().show(2)

+-------------------+-----------------+
|ORIGIN_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-------------------+-----------------+
|            Romania|    United States|
|            Croatia|    United States|
+-------------------+-----------------+
only showing top 2 rows



**Sampling**

In [148]:
seed = 5
withReplacement = True
fraction = 0.01
df.sample(withReplacement, fraction, seed).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|       Costa Rica|      United States|  588|
|           Latvia|      United States|   19|
|        Venezuela|      United States|  290|
|    United States|         Martinique|   43|
|    United States|        Switzerland|  305|
|    United States|Trinidad and Tobago|  217|
+-----------------+-------------------+-----+



**Random Splits**

In [153]:
seed = 5
dataFrames = df.randomSplit([0.25, 0.75], seed)
print(dataFrames[0].count())
print(dataFrames[1].count())

60
196


### Concatenating and Appending Rows (Union)

In [160]:
#df.printSchema()
schema = df.schema

rows = [
    Row("xz1", "xz2", 1),
    Row("xz3", "xz4", 2),
]
rdd = spark.sparkContext.parallelize(rows)
#newDf = spark.createDataFrame(rdd, schema)
newDf = rdd.toDF(schema)

df\
    .union(newDf)\
    .where(df.DEST_COUNTRY_NAME.isin("xz1", "xz3"))\
    .show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|              xz1|                xz2|    1|
|              xz3|                xz4|    2|
+-----------------+-------------------+-----+



### Sorting Rows

In [9]:
# sort is equivalent to orderBy
from pyspark.sql.functions import expr, col

df.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
df.sort(df.DEST_COUNTRY_NAME.asc()).show(2)
df.sort(df.DEST_COUNTRY_NAME.asc_nulls_first()).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Algeria|      United States|    4|
|           Angola|      United States|   15|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|

### Limit

In [10]:
df.limit(2).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+



### Repartition and Coalesce

In [11]:
df.rdd.getNumPartitions()

1

In [15]:
xz = df.repartition(5)
xz.rdd.getNumPartitions()

5

In [21]:
xz = df.repartition(col("DEST_COUNTRY_NAME"))
xz.rdd.getNumPartitions()

200

In [22]:
xz.where(xz.DEST_COUNTRY_NAME == "Russia").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Russia|      United States|  176|
+-----------------+-------------------+-----+



In [23]:
xz = df.repartition(5, col("DEST_COUNTRY_NAME"))
xz.rdd.getNumPartitions()

5

In [24]:
xz.where(xz.DEST_COUNTRY_NAME == "Russia").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Russia|      United States|  176|
+-----------------+-------------------+-----+



#### Coalesce, on the other hand, will not incur a full shuffle and will try to combine partitions. This
operation will shuffle your data into five partitions based on the destination country name, and
then coalesce them (without a full shuffle):

In [25]:
xz = df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)
xz.rdd.getNumPartitions()

2

In [27]:
xz.where(xz.DEST_COUNTRY_NAME == "Russia").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Russia|      United States|  176|
+-----------------+-------------------+-----+



### Collecting Rows to the Driver (Ahtung! Any collection of data to the driver can be a very expensive operation!)

In [30]:
collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count
collectDF.show() # this prints it out nicely
collectDF.show(5, False)
collectDF.collect()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India         

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [33]:
collectDF.toLocalIterator()

<itertools.chain at 0x7f003637c310>