In [1]:
# Build a pair RDD of (key, value) tuples, split across 4 partitions.
data_key = sc.parallelize(
    [
        ('a', 4), ('b', 3), ('c', 2), ('a', 8),
        ('d', 2), ('b', 1), ('d', 3),
    ],
    numSlices=4,
)

In [2]:
# Pull every element of the pair RDD back to the driver as a Python list
# so the raw (key, value) tuples can be inspected.
data_key.collect()

[('a', 4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3)]

In [4]:
# Add up the values that share a key, then collect the per-key totals.
data_key.reduceByKey(
    lambda running_total, value: running_total + value
).collect()

[('b', 4), ('c', 2), ('a', 12), ('d', 5)]

In [1]:
# RDD whose elements are three JSON documents held as multi-line strings,
# one swimmer record (id, name, age, eyeColor) per string.
stringJSONRDD = sc.parallelize([
    """ 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
    """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""",
    """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""",
])

In [2]:
# Turn the RDD of JSON strings into a DataFrame. No schema is passed here,
# so the column types are whatever the reader derives from the data.
swimmersJSON = spark.read.json(stringJSONRDD)

In [3]:
# Fetch all rows to the driver as a list of Row objects.
swimmersJSON.collect()

[Row(age=19, eyeColor='brown', id='123', name='Katie'),
 Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=23, eyeColor='blue', id='345', name='Simone')]

In [5]:
# Register the DataFrame as the temporary view "swimmersJSON" so that the
# spark.sql(...) queries in later cells can refer to it by name.
swimmersJSON.createOrReplaceTempView("swimmersJSON")

In [6]:
# Print the DataFrame as a text table.
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [8]:
# Query the temporary view with SQL: swimmers older than 20, returned to the
# driver as a list of Row objects.
spark.sql("select * from swimmersJSON where age > 20").collect()

[Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=23, eyeColor='blue', id='345', name='Simone')]

In [9]:
# Same SQL filter (age > 20), this time printed as a text table instead of
# being collected into Row objects.
spark.sql("select * from swimmersJSON where age > 20").show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [10]:
# Show the column names, types and nullability of the JSON-derived DataFrame.
swimmersJSON.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



### プログラムからのスキーマの指定

In [11]:
# Import the Spark SQL data types.
# NOTE(review): wildcard import — the cells visible here only use StructType,
# StructField, LongType and StringType; consider importing those explicitly
# once it is confirmed that no other cell relies on additional names.
from pyspark.sql.types import *

# Sample swimmer records as an RDD of (id, name, age, eyeColor) tuples.
stringCSVRDD = sc.parallelize(
    [
        (123, 'Katie', 19, 'brown'),
        (234, 'Michael', 22, 'green'),
        (345, 'Simone', 23, 'blue'),
    ]
)

# Explicit schema: one nullable field per tuple position, in tuple order.
schema = StructType(
    [
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("id", LongType()),
            ("name", StringType()),
            ("age", LongType()),
            ("eyeColor", StringType()),
        )
    ]
)

In [12]:
# Apply the explicit schema to the tuple RDD to obtain a DataFrame, then
# register it as the temporary view "swimmers" for SQL access.
swimmers = spark.createDataFrame(data=stringCSVRDD, schema=schema)
swimmers.createOrReplaceTempView("swimmers")

In [13]:
# Confirm that the DataFrame carries the schema that was supplied explicitly.
swimmers.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



In [14]:
# DataFrame API: keep the id and age columns, then the rows whose age is 22.
(
    swimmers
    .select("id", "age")
    .filter("age = 22")
    .show()
)

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [16]:
# DataFrame API: name and eye colour of swimmers whose eyeColor starts with "b".
(
    swimmers
    .select("name", "eyeColor")
    .filter("eyeColor like 'b%'")
    .show()
)

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



In [23]:
# SQL against the "swimmers" temporary view.
# Number of rows in the view.
spark.sql("select count(*) from swimmers").show()
# SQL form of the DataFrame-API filter on age = 22.
spark.sql("select id, age from swimmers where age = 22").show()
# SQL form of the DataFrame-API filter on eyeColor starting with "b".
spark.sql("select name,eyeColor from swimmers where eyeColor like 'b%'").show()

+--------+
|count(1)|
+--------+
|       3|
+--------+

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



In [58]:
# Paths of the two input files.
flightPerfFilePath = "departuredelays.csv"
airportsFilePath = "airport-codes-na.txt"

# Airports dataset: tab-separated, first line is a header, column types are
# inferred. Exposed to SQL as the view "airports".
airports = spark.read.csv(
    airportsFilePath,
    sep='\t',
    header='true',
    inferSchema='true',
)
airports.createOrReplaceTempView("airports")

# Departure-delays dataset: first line is a header; inferSchema is not
# requested, so every column comes back as a string.
# Exposed to SQL as the view "FlightPerformance".
flightPerf = spark.read.csv(
    flightPerfFilePath,
    header='true',
)
flightPerf.createOrReplaceTempView("FlightPerformance")

# Mark the departure-delays DataFrame for caching. cache() hands back the
# DataFrame, so as the last expression of the cell its repr is displayed.
flightPerf.cache()

DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

In [59]:
# Build a query over the "airports" view. No action (show/collect) is called,
# so the cell only displays the resulting DataFrame's column names and types.
spark.sql("""
select * from airports""")

DataFrame[City: string, State: string, Country: string, IATA: string]

In [61]:
# Join the flight data to the airports view on the origin IATA code, keep
# airports whose State is 'WA', and total the delay per (City, origin),
# largest total first.
# NOTE(review): `delay` was loaded as a string column, so sum() presumably
# relies on an implicit numeric cast — the totals are printed as decimals.
spark.sql("""
select a.City,f.origin,
sum(f.delay) as Delays
from FlightPerformance f
join airports a
on a.IATA = f.origin
where a.State = 'WA'
group by a.City,f.origin
order by sum(f.delay) desc
""").show()

+-------+------+--------+
|   City|origin|  Delays|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+



In [63]:
# Same aggregation as the preceding cell (total delay per WA origin airport,
# largest first); only the layout of the SQL text differs.
spark.sql("""
select a.City,f.origin,sum(f.delay) as Delays
from FlightPerformance f
join airports a on a.IATA = f.origin
where a.State = 'WA'
group by a.City,f.origin
order by sum(f.delay) desc
""").show()

+-------+------+--------+
|   City|origin|  Delays|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+

