In [1]:
sc

<pyspark.context.SparkContext at 0xb2052c8c>

# Loading Data in RDD

In [70]:
lines = sc.textFile("input/file1")
lines

input/file1 MapPartitionsRDD[44] at textFile at NativeMethodAccessorImpl.java:-2

In [71]:
lines.collect()

[u'Hello World Bye World']

# Total Word Count

In [72]:
words = lines.flatMap(lambda l:l.split(" "))

In [5]:
words.count()

4

# Problem 1

In [115]:
lines = sc.wholeTextFiles("input")
lines

org.apache.spark.api.java.JavaPairRDD@93ee2

In [116]:
lines.collect()

[(u'file:/home/vagrant/csds-material/input/file2',
  u'Hello Hadoop Goodbye Hadoop\n'),
 (u'file:/home/vagrant/csds-material/input/file1', u'Hello World Bye World\n')]

In [117]:
line = []
for i in range(0, lines.count()):
    line.append(str(lines.collect()[i][1].strip("\n")))
line = sc.parallelize(line)

In [118]:
words = line.flatMap(lambda l:l.split(" "))
result = words.countByValue()
result

defaultdict(<type 'int'>, {'World': 2, 'Bye': 1, 'Hello': 2, 'Goodbye': 1, 'Hadoop': 2})

## Words Count

In [119]:
for key in result.keys():
    print key,":", result[key]

World : 2
Bye : 1
Hello : 2
Goodbye : 1
Hadoop : 2


# Problem 2

In [177]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *

In [178]:
sqlContext = SQLContext(sc)

In [179]:
lines = sc.textFile("hive/purchases.txt")
parts = lines.map(lambda l: l.split(","))
purchases = parts.map(lambda p: (p[0], p[1], p[2], float(p[3]), p[4]))

In [180]:
fields = [StructField("timestamp", StringType(), True),
          StructField("location", StringType(), True),
          StructField("category", StringType(), True),
          StructField("price", FloatType(), True),
          StructField("card", StringType(), True),]
schema = StructType(fields)

In [184]:
purchases = sqlContext.createDataFrame(purchases, schema)
purchases.registerTempTable("purchases")

In [233]:
purchases.show()

timestamp           location       category             price  card      
2012-07-20 09:59:00 Corpus Christi CDs                  327.91 Cash      
2012-03-11 17:29:00 Durham         Books                115.09 Discover  
2012-07-31 11:43:00 Rochester      Toys                 332.07 MasterCard
2012-06-18 14:47:00 Garland        Computers            31.99  Visa      
2012-03-27 11:40:00 Tulsa          CDs                  452.18 Discover  
2012-05-31 10:57:00 Pittsburgh     Garden               492.25 Amex      
2012-08-22 14:35:00 Richmond       Consumer Electronics 346.0  Amex      
2012-09-23 16:45:00 Scottsdale     CDs                  21.58  Cash      
2012-10-17 11:29:00 Baton Rouge    Computers            226.26 Cash      
2012-07-03 11:05:00 Virginia Beach Women's Clothing     23.47  Cash      
2012-08-18 09:31:00 Norfolk        Men's Clothing       379.33 Cash      
2012-01-20 14:07:00 Wichita        Video Games          392.43 Cash      
2012-11-27 12:16:00 Saint Paul     Boo

## Answer Questions

### What is the average price of the products that were purchased via Mastercard?

In [272]:
sqlContext.sql("SELECT AVG(price) FROM purchases WHERE card='MasterCard'").show()

c0               
275.0677317417774


 The average price of the products that were purchased via Mastercard is 275.0677317417774

### Which date recorded the highest total sales?

In [265]:
df = sqlContext.sql("SELECT SUBSTRING(timestamp, 1, 10) AS date, price FROM purchases")
df.registerTempTable("df")

In [270]:
sqlContext.sql("SELECT date, SUM(price) AS totalsales FROM df GROUP BY date ORDER BY totalsales DESC").show()

date       totalsales        
2012-03-17 2384.480026245117 
2012-03-15 2144.109969139099 
2012-05-06 2093.20001411438  
2012-03-16 2083.8500175476074
2012-05-25 1990.0700035095215
2012-08-22 1985.4600067138672
2012-04-24 1855.8799743652344
2012-08-24 1833.3300170898438
2012-10-03 1832.3900451660156
2012-08-02 1780.2900085449219
2012-07-31 1778.9300079345703
2012-09-22 1773.219985961914 
2012-10-24 1731.8199768066406
2012-06-09 1721.4900207519531
2012-04-21 1716.75           
2012-08-09 1649.9600219726562
2012-02-10 1630.7099914550781
2012-04-30 1609.6999969482422
2012-10-08 1580.1900024414062
2012-02-19 1568.6400184631348


2012-03-17 recorded the highest total sales

### What is the minimum value of a product under the Computers category?

In [271]:
sqlContext.sql("SELECT MIN(price) FROM purchases WHERE category='Computers'").show()

c0  
0.38


The minimum value of a product under the Computers category is 0.38

### How many distinct categories of products are there?

In [275]:
sqlContext.sql("SELECT COUNT(DISTINCT(category)) FROM purchases").show()

c0
18


There are 18 distinct categories of products

### Which store location had the lowest total sales?

In [276]:
sqlContext.sql("SELECT location, SUM(price) AS totalsales FROM purchases GROUP BY location ORDER BY totalsales ASC").show()

location       totalsales        
Plano          784.9599838256836 
Denver         866.7500038146973 
Spokane        873.0800170898438 
Dallas         1021.6599979400635
Fremont        1025.8199882507324
San Bernardino 1033.759994506836 
Oakland        1077.409999847412 
Long Beach     1081.339988708496 
Seattle        1209.0000190734863
New Orleans    1226.5799991488457
San Antonio    1275.5699920654297
Oklahoma City  1310.6900177001953
Scottsdale     1320.9500102996826
Raleigh        1340.22998046875  
Kansas City    1387.4499999210238
Hialeah        1477.2599925994873
Lexington      1496.4299850463867
Fort Worth     1500.9999955892563
Albuquerque    1509.1999926567078
Lincoln        1526.919994354248 


The store at Plano had the lowest total sales of 784.9599838256836 