Reading a file from S3
==

In [1]:
# Evaluate the pre-existing `sc` name so the notebook displays it
# (the output below shows it is a pyspark SparkContext).
sc

<pyspark.context.SparkContext at 0x7f2a40042f90>

In [3]:
# Read the cards CSV from S3 as an RDD of raw text lines (one element per line).
rdd = sc.textFile("s3://dataforum2016/cards.csv")

In [6]:
# Peek at the first three raw lines to check the file layout (suit,rank,color).
# print(...) with a single argument behaves the same on Python 2 and Python 3,
# whereas the bare `print line` statement is a SyntaxError on Python 3.
for line in rdd.take(3):
    print(line)

Hearts,5,red
Clubs,A,black
Clubs,3,black


Data preparation
==

In [7]:
from collections import namedtuple

# Record type for one parsed CSV row; fields are in the same order as the
# comma-separated columns of the file (suit, rank, color).
Card = namedtuple('Card', ['suit', 'rank', 'color'])

In [13]:
# Split every CSV line on commas and freeze the pieces into a tuple.
tuple_rdd = rdd.map(
    lambda csv_line: tuple(csv_line.split(","))
)

In [14]:
# Show the first three parsed tuples, one per line.
# print(tup) passes the tuple as a single argument, so the output is the same
# on Python 2 and Python 3; the bare `print tup` statement is a SyntaxError
# on Python 3.
for tup in tuple_rdd.take(3):
    print(tup)

(u'Hearts', u'5', u'red')
(u'Clubs', u'A', u'black')
(u'Clubs', u'3', u'black')


In [15]:
# Same three rows as the previous cell, this time shown through the notebook's
# own display of the returned list rather than print.
tuple_rdd.take(3)

[(u'Hearts', u'5', u'red'),
 (u'Clubs', u'A', u'black'),
 (u'Clubs', u'3', u'black')]

In [16]:
# Promote each plain tuple to a Card so fields can be accessed by name
# (card.suit, card.rank, card.color) instead of by position.
cards_rdd = tuple_rdd.map(
    lambda fields: Card(*fields)
)

In [19]:
# Display the first element to confirm the rows are now Card records.
cards_rdd.first()

Card(suit=u'Hearts', rank=u'5', color=u'red')

More RDD API
==

In [20]:
# Keep only cards whose rank is not an ace or a face card (J, Q, K).
filtered_rdd = cards_rdd.filter(
    lambda card: card.rank not in ('A', 'J', 'Q', 'K')
)

In [21]:
# Display the first five cards that survived the rank filter.
filtered_rdd.take(5)

[Card(suit=u'Hearts', rank=u'5', color=u'red'),
 Card(suit=u'Clubs', rank=u'3', color=u'black'),
 Card(suit=u'Diamonds', rank=u'4', color=u'red'),
 Card(suit=u'Spades', rank=u'6', color=u'black'),
 Card(suit=u'Hearts', rank=u'4', color=u'red')]

In [28]:
# Mark the filtered RDD for caching: both the per-color and the per-suit
# counts below are computed from it.
filtered_rdd.cache()

PythonRDD[41] at RDD at PythonRDD.scala:43

In [26]:
# Count cards per color as an RDD of (color, count) pairs.
# Each card is mapped to (color, 1) and the ones are summed per key with
# reduceByKey, so partial sums are combined before the shuffle. The earlier
# groupBy + len gave the same pairs but shuffled and materialized every Card
# of a group only to count them.
cnt_by_color = (
    filtered_rdd
    .map(lambda card: (card.color, 1))
    .reduceByKey(lambda left, right: left + right)
)

In [27]:
# Fetch the (color, count) pairs as a list for display.
cnt_by_color.collect()

[(u'black', 17), (u'red', 18)]

In [29]:
# Count cards per suit as an RDD of (suit, count) pairs.
# Each card is mapped to (suit, 1) and the ones are summed per key with
# reduceByKey, so partial sums are combined before the shuffle. The earlier
# groupBy + len gave the same pairs but shuffled and materialized every Card
# of a group only to count them.
cnt_by_suit = (
    filtered_rdd
    .map(lambda card: (card.suit, 1))
    .reduceByKey(lambda left, right: left + right)
)

In [30]:
# Fetch the (suit, count) pairs as a list for display.
cnt_by_suit.collect()

[(u'Spades', 9), (u'Clubs', 8), (u'Diamonds', 9), (u'Hearts', 9)]

DataFrame and SQL API
==

In [31]:
# Convert the RDD of Card records (unfiltered) into a DataFrame; the columns
# shown below carry the Card field names suit, rank and color.
df = cards_rdd.toDF()

In [32]:
# Print the DataFrame as a text table (the output is limited to the top 20 rows).
df.show()

+--------+----+-----+
|    suit|rank|color|
+--------+----+-----+
|  Hearts|   5|  red|
|   Clubs|   A|black|
|   Clubs|   3|black|
|   Clubs|   J|black|
|Diamonds|   J|  red|
|Diamonds|   4|  red|
|   Clubs|   Q|black|
|  Spades|   6|black|
|  Hearts|   4|  red|
|Diamonds|   5|  red|
|   Clubs|   6|black|
|  Hearts|   Q|  red|
|Diamonds|   6|  red|
|Diamonds|   8|  red|
|Diamonds|   9|  red|
|  Hearts|   3|  red|
|  Spades|   Q|black|
|   Clubs|   9|black|
|  Hearts|   9|  red|
|Diamonds|  10|  red|
+--------+----+-----+
only showing top 20 rows



In [33]:
# Print the column names and types; note that rank is a string column,
# not a number.
df.printSchema()

root
 |-- suit: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- color: string (nullable = true)



In [35]:
# Show only the sevens. rank is a string column, so it is compared with "7".
# where() is an alias of filter().
df.where(
    df["rank"] == "7"
).show()

+--------+----+-----+
|    suit|rank|color|
+--------+----+-----+
|  Hearts|   7|  red|
|  Spades|   7|black|
|Diamonds|   7|  red|
+--------+----+-----+



In [36]:
# Expose the DataFrame to SQL under the table name "cards" (queried below).
# NOTE(review): registerTempTable is the Spark 1.x API; on Spark 2.x+ it is
# deprecated in favor of createOrReplaceTempView -- confirm the target version
# before changing it.
df.registerTempTable("cards")

In [37]:
# Per-suit count of cards whose rank is not J, Q, K or A, expressed in SQL
# against the "cards" temp table. The adjacent string literals concatenate
# into a single one-line query.
result_df = sqlContext.sql(
    "SELECT suit, COUNT(*) as total_count "
    "FROM cards "
    "WHERE rank NOT IN ('J', 'Q', 'K', 'A') "
    "GROUP BY suit"
)

In [38]:
# Print the per-suit counts returned by the SQL query.
result_df.show()

+--------+-----------+
|    suit|total_count|
+--------+-----------+
|  Hearts|          9|
|   Clubs|          8|
|  Spades|          9|
|Diamonds|          9|
+--------+-----------+

