### Load Data with PySpark

In [1]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

# Start a Spark session, or attach to the one already running in this kernel.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

24/01/17 19:35:39 WARN Utils: Your hostname, hongseung-giui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.7 instead (on interface en0)
24/01/17 19:35:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/17 19:35:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Read the show's JSON document into a data frame (no schema is passed to the reader).
shows = (
    spark.read
    .json("./data/shows-silicon-valley.json")
)

# Preview the rows, then print the nested schema.
shows.show()
shows.printSchema()

+--------------------+--------------------+--------------------+--------+---+--------------------+--------+--------------+--------------------+--------------------+----------+------+-------+-----------------+------+--------------------+--------+----------+--------------------+----------+------+
|           _embedded|              _links|           externals|  genres| id|               image|language|          name|             network|        officialSite| premiered|rating|runtime|         schedule|status|             summary|    type|   updated|                 url|webChannel|weight|
+--------------------+--------------------+--------------------+--------+---+--------------------+--------+--------------+--------------------+--------------------+----------+------+-------+-----------------+------+--------------------+--------+----------+--------------------+----------+------+
|{[{{{http://api.t...|{{http://api.tvma...|{tt2575988, 27716...|[Comedy]|143|{http://static.tv...| English|Silic

## Breaking the second dimension

### Extracting elements from an array

In [3]:
# Keep only the show name and its `genres` array column.
array_subset = shows.select(
    "name",
    "genres",
)

array_subset.show()

+--------------+--------+
|          name|  genres|
+--------------+--------+
|Silicon Valley|[Comedy]|
+--------------+--------+



In [4]:
# Four equivalent ways to pull the first element out of the `genres` array.
#
# The selection starts from `shows` rather than from `array_subset` itself:
# rebinding `array_subset` from its own previous value drops the `genres`
# column, so running the cell a second time would fail. Selecting from
# `shows` produces the same result and keeps the cell re-runnable.
array_subset = shows.select(
    "name",
    shows.genres[0].alias("dot_and_index"),            # attribute access + index
    F.col("genres")[0].alias("col_and_index"),         # F.col() + index
    shows.genres.getItem(0).alias("dot_and_method"),   # attribute access + getItem()
    F.col("genres").getItem(0).alias("col_and_method"),  # F.col() + getItem()
)

array_subset.show()

+--------------+-------------+-------------+--------------+--------------+
|          name|dot_and_index|col_and_index|dot_and_method|col_and_method|
+--------------+-------------+-------------+--------------+--------------+
|Silicon Valley|       Comedy|       Comedy|        Comedy|        Comedy|
+--------------+-------------+-------------+--------------+--------------+



### Performing multiple operations on an array column

In [5]:
# Literal genre values, keyed by the temporary column name each one is given.
genre_literals = {"one": "Comedy", "two": "Horror", "three": "Drama"}

# Step 1: add the three literal columns next to the extracted first genre.
with_literals = array_subset.select(
    "name",
    *[F.lit(value).alias(label) for label, value in genre_literals.items()],
    F.col("dot_and_index"),
)

# Step 2: combine the literals into one array and repeat the first genre five times.
array_subset_repeated = with_literals.select(
    "name",
    F.array("one", "two", "three").alias("Some_Genres"),
    F.array_repeat("dot_and_index", 5).alias("Repeated_Genres"),
)

array_subset_repeated.show()

+--------------+--------------------+--------------------+
|          name|         Some_Genres|     Repeated_Genres|
+--------------+--------------------+--------------------+
|Silicon Valley|[Comedy, Horror, ...|[Comedy, Comedy, ...|
+--------------+--------------------+--------------------+



In [6]:
# Number of elements in each of the two array columns.
array_subset_repeated.select(
    "name",
    F.size("Some_Genres"),
    F.size("Repeated_Genres"),
).show()

+--------------+-----------------+---------------------+
|          name|size(Some_Genres)|size(Repeated_Genres)|
+--------------+-----------------+---------------------+
|Silicon Valley|                3|                    5|
+--------------+-----------------+---------------------+



In [7]:
# De-duplicate the elements of each array column.
distinct_genres = array_subset_repeated.select(
    "name",
    *(F.array_distinct(column_name) for column_name in ("Some_Genres", "Repeated_Genres")),
)

distinct_genres.show()

+--------------+---------------------------+-------------------------------+
|          name|array_distinct(Some_Genres)|array_distinct(Repeated_Genres)|
+--------------+---------------------------+-------------------------------+
|Silicon Valley|       [Comedy, Horror, ...|                       [Comedy]|
+--------------+---------------------------+-------------------------------+



In [8]:
# Keep only the genres present in both arrays.
# NOTE: this rebinds `array_subset_repeated` to a frame without the
# `Some_Genres` / `Repeated_Genres` columns, so the cell cannot be run twice
# in a row without first re-running the cell that builds those columns.
shared_genres = F.array_intersect("Some_Genres", "Repeated_Genres").alias("Genres")
array_subset_repeated = array_subset_repeated.select("name", shared_genres)

array_subset_repeated.show()

+--------------+--------+
|          name|  Genres|
+--------------+--------+
|Silicon Valley|[Comedy]|
+--------------+--------+



### Using `array_position()` to search the `Genres` array for a string

In [9]:
# Locate "Comedy" inside the `Genres` array.
genre_position = F.array_position("Genres", "Comedy")

array_subset_repeated.select("Genres", genre_position).show()

+--------+------------------------------+
|  Genres|array_position(Genres, Comedy)|
+--------+------------------------------+
|[Comedy]|                             1|
+--------+------------------------------+



### The map type: keys and values within a column

In [10]:
columns = ["name", "language", "type"]

# First select: one literal column per field name, plus a `values` array built
# from the real columns of `shows`.
# Second select: `F.array(*columns)` is evaluated against the intermediate
# frame, which is how the `keys` array is assembled alongside `values`.
shows_map = (
    shows
    .select(
        *map(F.lit, columns),
        F.array(*columns).alias("values"),
    )
    .select(
        F.array(*columns).alias("keys"),
        "values",
    )
)

shows_map.show(1)

+--------------------+--------------------+
|                keys|              values|
+--------------------+--------------------+
|[name, language, ...|[Silicon Valley, ...|
+--------------------+--------------------+



In [11]:
# Zip the `keys` and `values` arrays into a single map column.
# NOTE: this rebinds `shows_map` to a frame that no longer has `keys` / `values`,
# so re-running the cell requires re-running the cell that builds them first.
mapped_column = F.map_from_arrays("keys", "values").alias("mapped")
shows_map = shows_map.select(mapped_column)

shows_map.printSchema()

root
 |-- mapped: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [12]:
# Show one row without truncating the cell contents, so the whole map is visible.
shows_map.show(n=1, truncate=False)

+---------------------------------------------------------------+
|mapped                                                         |
+---------------------------------------------------------------+
|{name -> Silicon Valley, language -> English, type -> Scripted}|
+---------------------------------------------------------------+



In [13]:
# Three equivalent ways to read the "name" key out of the `mapped` column.
name_lookups = [
    F.col("mapped.name"),       # dotted path
    F.col("mapped")["name"],    # F.col() + key lookup
    shows_map.mapped["name"],   # attribute access + key lookup
]

shows_map.select(*name_lookups).show()

+--------------+--------------+--------------+
|          name|  mapped[name]|  mapped[name]|
+--------------+--------------+--------------+
|Silicon Valley|Silicon Valley|Silicon Valley|
+--------------+--------------+--------------+



## Nesting columns within columns 

In [14]:
# Inspect the structure of the `schedule` column on its own.
schedule_only = shows.select("schedule")
schedule_only.printSchema()

root
 |-- schedule: struct (nullable = true)
 |    |-- days: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- time: string (nullable = true)



In [15]:
# Promote the nested `schedule.days` field to a top-level column.
days_column = F.col("schedule.days")
shows_clean = shows.select(days_column)

shows_clean.printSchema()

root
 |-- days: array (nullable = true)
 |    |-- element: string (containsNull = true)



## Building and using the data frame schema

In [27]:
# Declare the schema explicitly instead of letting Spark infer it from the rows.
# `T` (pyspark.sql.types) is imported with the other imports in the first cell,
# so every dependency of the notebook is visible in one place.
schema = T.StructType([
    T.StructField("name", T.StringType()),
    T.StructField("age", T.IntegerType()),
    T.StructField("city", T.StringType()),
])

# Each tuple supplies values in the same order as the schema fields.
data = [
    ("Hong", 25, "Seoul"),
    ("Lee", 30, "Incheon"),
]

df = spark.createDataFrame(data, schema)

df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)



24/01/17 20:27:13 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 353498 ms exceeds timeout 120000 ms
24/01/17 20:27:13 WARN SparkContext: Killing executors is not supported by current scheduler.
24/01/17 20:27:19 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$