# Problem 1

In [1]:
files = sc.wholeTextFiles("input/")

In [2]:
files.collect()

[(u'file:/home/vagrant/csds-material/input/file2',
  u'Hello Hadoop Goodbye Hadoop\n'),
 (u'file:/home/vagrant/csds-material/input/file1', u'Hello World Bye World\n')]

In [3]:
words = files.flatMapValues(lambda l: l.strip().split(" "))
counts = words.mapValues(lambda w: (w, 1))
results = sc.parallelize(counts.values().collect()).reduceByKey(lambda x, y : x+y)

In [4]:
for res in results.collect():
    print("%s: %d" % (res[0], res[1]))

World: 2
Bye: 1
Hello: 2
Goodbye: 1
Hadoop: 2


# Problem 2

In [5]:
# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a tuple.
lines = sc.textFile("hive/purchases.txt")
parts = lines.map(lambda l: l.split(","))
logs = parts.map(lambda p: (p[0], p[1], p[2], float(p[3]), p[4].strip()))

# The schema is encoded in a string.
schemaString = "timestamp location category price card"

fields = [StructField(field_name, StringType(), True) 
          if field_name != "price" else StructField(field_name, FloatType(), True) 
          for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaLogs = sqlContext.createDataFrame(logs, schema)

# Register the DataFrame as a table.
schemaLogs.registerTempTable("purchases")

## What is the average price of the products that were purchased via Mastercard?

In [6]:
res = sqlContext.sql("SELECT AVG(price) as avg_price FROM purchases WHERE card='MasterCard'")
res.collect()

[Row(avg_price=275.0677317417774)]

## Which date recorded the highest total sales?

In [7]:
res = sqlContext.sql(
    "SELECT CAST(CAST(CAST(timestamp AS TIMESTAMP) AS DATE) AS STRING) as date, sum(price) as total "+
    "FROM purchases GROUP BY CAST(CAST(CAST(timestamp AS TIMESTAMP) AS DATE) AS STRING) Order BY total DESC LIMIT 1")
res.collect()

[Row(date=u'2012-03-17', total=2384.480026245117)]

## What is the minimum value of a product under the Computers category?

In [8]:
res = sqlContext.sql("SELECT price FROM purchases WHERE category='Computers' ORDER BY price LIMIT 1")
res.collect()

[Row(price=0.3799999952316284)]

## How many distinct categories of products are there?

In [9]:
res = sqlContext.sql("SELECT COUNT(DISTINCT category) as Num_Category FROM purchases")
res.collect()

[Row(Num_Category=18)]

## Which store location had the lowest total sales?

In [10]:
res = sqlContext.sql("SELECT location, SUM(price) as total FROM purchases GROUP BY location ORDER BY total LIMIT 1")
res.collect()

[Row(location=u'Plano', total=784.9599838256836)]

# Problem 3

In [11]:
purchasesDF = logs.toDF(["timestamp", "location", "category", "price", "card"])
purchasesDF.show(5)

timestamp           location       category  price  card      
2012-07-20 09:59:00 Corpus Christi CDs       327.91 Cash      
2012-03-11 17:29:00 Durham         Books     115.09 Discover  
2012-07-31 11:43:00 Rochester      Toys      332.07 MasterCard
2012-06-18 14:47:00 Garland        Computers 31.99  Visa      
2012-03-27 11:40:00 Tulsa          CDs       452.18 Discover  


In [12]:
from pyspark.sql.functions import *

In [13]:
# SELECT AVG(price) as avg_price FROM purchases WHERE card='MasterCard'
purchasesDF\
    .filter(purchasesDF.card == 'MasterCard')\
    .agg(avg(col('price')))\
    .show()

AVG(price)        
275.06773195876286


In [14]:
# SELECT CAST(CAST(CAST(timestamp AS TIMESTAMP) AS DATE) AS STRING) as date, sum(price) as total 
# FROM purchases GROUP BY CAST(CAST(CAST(timestamp AS TIMESTAMP) AS DATE) AS STRING) Order BY total DESC LIMIT 1

purchasesDF\
    .withColumn('date', purchasesDF['timestamp'].cast('timestamp').cast('date'))\
    .groupBy('date')\
    .agg(col('date'), sum(col('price')).alias('total'))\
    .sort(col('total').desc())\
    .show(1)

date       total             
2012-03-17 2384.4800000000005


In [15]:
# SELECT price FROM purchases WHERE category='Computers' ORDER BY price LIMIT 1
purchasesDF\
    .filter(purchasesDF.category == 'Computers')\
    .sort('price')\
    .select('price')\
    .show(1)

price
0.38 


In [16]:
# SELECT COUNT(DISTINCT category) as Num_Category FROM purchases
purchasesDF\
    .select('category')\
    .distinct()\
    .count()

18L

In [17]:
# SELECT location, SUM(price) as total FROM purchases GROUP BY location ORDER BY total LIMIT 1
purchasesDF\
    .groupBy('location')\
    .agg(col('location'), sum(col('price')).alias('total'))\
    .sort('total')\
    .show(1)

location total 
Plano    784.96
