# Spark DataFrame Assignments

In [1]:
import itertools as it
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os

# plotting options
%matplotlib inline
np.set_printoptions(linewidth=250)
plt.rc('font'  , size=18)
plt.rc('figure', figsize=(10, 8))
plt.rc('axes'  , labelsize=22)
plt.rc('legend', fontsize=16)

np.set_printoptions(precision=3)
plt.rc('figure', figsize=(10, 8))

In [2]:
os.chdir('%s/courses/coursera_bigdata/course3/week5' % os.getenv('DST'))
pwd = os.getcwd()
print(pwd)

/home/cloudera/Development/dst/courses/coursera_bigdata/course3/week5


## setup PySpark

In [3]:
import os
spark_home = os.environ.get('SPARK_HOME', None)
spark_home

'/usr/lib/spark'

In [4]:
from pyspark import SparkContext, SparkConf, SQLContext

from pyspark import SparkContext, SparkConf, SQLContext, HiveContext

myConf = SparkConf().setAppName('TestApp')\
                    .set('spark.executor.memory', '2G')\
                    .set('spark.hadoop.validateOutputSpecs', 'false')

sc      = SparkContext(conf=myConf)
sql_ctx = HiveContext(sc)

In [5]:
sql_ctx.createDataFrame([("somekey", 1)])

DataFrame[_1: string, _2: bigint]

## Slides walkthrough

In [6]:
text_RDD = sc.textFile('file:%s/testfile1.txt'%pwd)
text_RDD.collect()

[u'A long time ago in a galaxy far far away']

In [7]:
def split_words(line):
    return line.split()

def create_pair(word):
    return (word, 1)

pairs_RDD = text_RDD.flatMap(split_words).map(create_pair)
pairs_RDD.collect()

[(u'A', 1),
 (u'long', 1),
 (u'time', 1),
 (u'ago', 1),
 (u'in', 1),
 (u'a', 1),
 (u'galaxy', 1),
 (u'far', 1),
 (u'far', 1),
 (u'away', 1)]

In [8]:
students = sc.parallelize([
    [100, 'Ryan', 8.5, 'computer science'],
    [101, 'Bob' , 7.1, 'engineering'     ],
    [101, 'Carl', 6.2, 'engineering'     ]
])
students.collect()

[[100, 'Ryan', 8.5, 'computer science'],
 [101, 'Bob', 7.1, 'engineering'],
 [101, 'Carl', 6.2, 'engineering']]

In [9]:
def extract_grade(row):
    return row[2]

students.map(extract_grade).mean()

7.266666666666667

In [10]:
def extract_degree_grade(row):
    return (row[3], row[2])

students.map(extract_degree_grade).collect()

[('computer science', 8.5), ('engineering', 7.1), ('engineering', 6.2)]

In [11]:
degree_grade_RDD = students.map(extract_degree_grade)
degree_grade_RDD.reduceByKey(max).collect()

[('engineering', 7.1), ('computer science', 8.5)]

In [12]:
students_df = sql_ctx.createDataFrame(students, ['id', 'name', 'grade', 'degree'])
students_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- grade: double (nullable = true)
 |-- degree: string (nullable = true)



In [13]:
students_df.agg({'grade': 'mean'}).collect()

[Row(AVG(grade#4)=7.266666666666667)]

In [14]:
students_df.groupBy('degree').max('grade').collect()

[Row(degree=u'engineering', MAX(grade#4)=7.1),
 Row(degree=u'computer science', MAX(grade#4)=8.5)]

In [15]:
students_df.groupBy('degree').max('grade').show()

degree           MAX(grade#4)
engineering      7.1         
computer science 8.5         


#### creating dataframes

In [16]:
from pyspark.sql.types import *

schema = StructType([
    StructField('id'    , LongType()  , True),
    StructField('name'  , StringType(), True),
    StructField('grade' , DoubleType(), True),
    StructField('degree', StringType(), True),
])
schema

StructType(List(StructField(id,LongType,true),StructField(name,StringType,true),StructField(grade,DoubleType,true),StructField(degree,StringType,true)))

In [17]:
students_df = sql_ctx.createDataFrame(students, schema)
students_df.show()

id  name grade degree          
100 Ryan 8.5   computer science
101 Bob  7.1   engineering     
101 Carl 6.2   engineering     


In [18]:
students_json = """\
{"id":100, "name":"Alice", "grade":8.5, "degree":"Computer Science"}
{"id":101, "name":"Bob", "grade":7.1, "degree":"Engineering"}
"""
students_json

'{"id":100, "name":"Alice", "grade":8.5, "degree":"Computer Science"}\n{"id":101, "name":"Bob", "grade":7.1, "degree":"Engineering"}\n'

In [19]:
import string
with open("students.json", 'w+') as f:
    f.write(students_json)

In [22]:
sql_ctx.jsonFile('file:%s/students.json'%pwd).show()

degree           grade id  name 
Computer Science 8.5   100 Alice
Engineering      7.1   101 Bob  


In [23]:
yelp_df = sql_ctx.load(
    source      ='com.databricks.spark.csv',
    header      = 'true',
    inferSchema = 'true',
    path        = 'file:%s/index_data.csv'%pwd
)

In [24]:
yelp_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- type: string (nullable = true)
 |-- useful: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- full_address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- neighborhoods: string (nullable = true)
 |-- open: string (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- state: string (nullable = true)



In [25]:
yelp_df.count()

1000L

In [26]:
yelp_df.useful

Column<useful>

In [27]:
yelp_df['useful']

Column<useful>

In [28]:
yelp_df.select('useful')

DataFrame[useful: int]

### filtering

In [29]:
yelp_df.filter(yelp_df.useful >= 1).count()

601L

In [30]:
yelp_df.filter(yelp_df['useful'] >= 1).count()

601L

In [31]:
yelp_df.filter('useful >= 1').count()

601L

In [32]:
try:
    yelp_df['useful'].agg({'useful':'max'}).collect()
except Exception, e:
    print type(e), e

<type 'exceptions.AttributeError'> 'Column' object has no attribute 'agg'


In [33]:
yelp_df.select('useful')

DataFrame[useful: int]

In [34]:
yelp_df.select('useful').agg({'useful': 'max'}).collect()

[Row(MAX(useful#45)=28)]

### manipulation

In [35]:
yelp_df.select('id', 'useful').take(5)

[Row(id=u'fWKvX83p0-ka4JS3dc6E5A', useful=5),
 Row(id=u'IjZ33sJrzXqU-0X6U8NwyA', useful=0),
 Row(id=u'IESLBzqUCLdSzSqm0eCSxQ', useful=1),
 Row(id=u'G-WvGaISbqqaMHlNnByodA', useful=2),
 Row(id=u'1uJFq2r5QfJG_6ExMRCaGw', useful=0)]

In [36]:
yelp_df.select('id', yelp_df.useful/28*100).show(5)

id                   ((useful / 28) * 100)
fWKvX83p0-ka4JS3d... 17.857142857142858   
IjZ33sJrzXqU-0X6U... 0.0                  
IESLBzqUCLdSzSqm0... 3.571428571428571    
G-WvGaISbqqaMHlNn... 7.142857142857142    
1uJFq2r5QfJG_6ExM... 0.0                  


In [37]:
yelp_df.select('id', (yelp_df.useful/28*100).cast('int')).show(5)

id                   CAST(((useful / 28) * 100), IntegerType)
fWKvX83p0-ka4JS3d... 17                                      
IjZ33sJrzXqU-0X6U... 0                                       
IESLBzqUCLdSzSqm0... 3                                       
G-WvGaISbqqaMHlNn... 7                                       
1uJFq2r5QfJG_6ExM... 0                                       


In [38]:
yelp_df.select('id', (yelp_df.useful/28*100).cast('int').alias('useful_scaled')).show(5)

id                   useful_scaled
fWKvX83p0-ka4JS3d... 17           
IjZ33sJrzXqU-0X6U... 0            
IESLBzqUCLdSzSqm0... 3            
G-WvGaISbqqaMHlNn... 7            
1uJFq2r5QfJG_6ExM... 0            


In [39]:
useful_perc_data = yelp_df.select(
    yelp_df['id'].alias('uid'), 
    (yelp_df.useful/28*100).cast('int').alias('useful_scaled')
)
useful_perc_data.printSchema()

root
 |-- uid: string (nullable = true)
 |-- useful_scaled: integer (nullable = true)



### ordering by column

In [40]:
from pyspark.sql.functions import asc, desc

In [41]:
useful_perc_data = yelp_df.select(
    yelp_df['id'].alias('uid'), 
    (yelp_df.useful/28*100).cast('int').alias('useful_scaled')
).orderBy(desc('useful_scaled'))

useful_perc_data.printSchema()

root
 |-- uid: string (nullable = true)
 |-- useful_scaled: integer (nullable = true)



In [42]:
useful_perc_data.show(5)

uid                  useful_scaled
RqwFPp_qPu-1h87pG... 100          
YAXPKM-Hck6-mjF74... 82           
WRBYytJAaJI1BTQG5... 71           
sA_wkvAZpt4Hm6AXG... 71           
roMeHsyf55-_O7rpu... 67           


### Joins

In [43]:
joined_df = useful_perc_data.join(
    yelp_df,
    yelp_df.id==useful_perc_data.uid,
    'inner'
).select(useful_perc_data.uid, 'useful_scaled', 'review_count')

In [44]:
joined_df.printSchema()

root
 |-- uid: string (nullable = true)
 |-- useful_scaled: integer (nullable = true)
 |-- review_count: integer (nullable = true)



In [45]:
joined_df.show(5)

uid                  useful_scaled review_count
WRBYytJAaJI1BTQG5... 71            362         
GXj4PNAi095-q9ynP... 3             76          
1sn0-eY_d1Dhr6Q2u... 0             9           
MtFe-FuiOmo0vlo16... 0             7           
EMYmuTlyeNBy5QB9P... 7             19          


In [46]:
joined_df = useful_perc_data.join(
    yelp_df,
    yelp_df.id==useful_perc_data.uid,
    'inner'
).cache().select(useful_perc_data.uid, 'useful_scaled', 'review_count').show(5)

uid                  useful_scaled review_count
WRBYytJAaJI1BTQG5... 71            362         
GXj4PNAi095-q9ynP... 3             76          
1sn0-eY_d1Dhr6Q2u... 0             9           
MtFe-FuiOmo0vlo16... 0             7           
EMYmuTlyeNBy5QB9P... 7             19          


In [47]:
joined_df = useful_perc_data.join(
    yelp_df,
    yelp_df.id==useful_perc_data.uid,
    'inner'
).cache().select(
    useful_perc_data.uid, 
    'useful_scaled', 
    'review_count'
).show(5)

uid                  useful_scaled review_count
WRBYytJAaJI1BTQG5... 71            362         
GXj4PNAi095-q9ynP... 3             76          
1sn0-eY_d1Dhr6Q2u... 0             9           
MtFe-FuiOmo0vlo16... 0             7           
EMYmuTlyeNBy5QB9P... 7             19          


### server logs

In [48]:
sql_ctx._jsc.hadoopConfiguration().set('textinputformat.record.delimiter', '\r\n')

In [49]:
logs_df = sql_ctx.load(
    source      = 'com.databricks.spark.csv',
    header      = 'true',
    inferSchema = 'true',
    path        = 'file:%s/logs.csv'%pwd
)
logs_df.printSchema()

root
 |-- code: integer (nullable = true)
 |-- protocol: string (nullable = true)
 |-- request: string (nullable = true)
 |-- app: string (nullable = true)
 |-- user_agent_major: integer (nullable = true)
 |-- region_code: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- id: string (nullable = true)
 |-- city: string (nullable = true)
 |-- subapp: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- method: string (nullable = true)
 |-- client_ip: string (nullable = true)
 |-- user_agent_family: string (nullable = true)
 |-- bytes: integer (nullable = true)
 |-- referer: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- extension: string (nullable = true)
 |-- url: string (nullable = true)
 |-- os_major: integer (nullable = true)
 |-- longitude: double (nullable = true)
 |-- device_family: string (nullable = true)
 |-- record: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- time: string (nullable 

In [50]:
logs_df.count()

9410L

In [51]:
logs_df.show(5)

code protocol request              app       user_agent_major region_code country_code id                   city      subapp latitude           method client_ip       user_agent_family bytes referer country_name extension url                  os_major longitude          device_family record               user_agent           time                 os_family country_code3
200  HTTP/1.1 GET /metastore/ta... metastore null             00          SG           8836e6ce-9a21-449... Singapore table  1.2931000000000097 GET    128.199.234.236 Other             1041  -       Singapore              /metastore/table/... null     103.85579999999999 Other         demo.gethue.com:8... Mozilla/5.0 (comp... 2014-05-04T06:35:49Z Other     SGP          
200  HTTP/1.1 GET /metastore/ta... metastore null             00          SG           6ddf6e38-7b83-423... Singapore table  1.2931000000000097 GET    128.199.234.236 Other             1041  -       Singapore              /metastore/table/... null     103.

In [52]:
logs_df.groupBy('code').count().show()

code count
500  2    
301  71   
302  1943 
502  6    
304  117  
400  1    
200  7235 
401  10   
404  11   
408  14   


In [53]:
logs_df.groupBy('code').count().orderBy(desc('count')).show()

code count
200  7235 
302  1943 
304  117  
301  71   
408  14   
404  11   
401  10   
502  6    
500  2    
400  1    


In [54]:
logs_df.groupBy('code').avg('bytes').show()

code AVG(bytes#406)    
500  4684.5            
301  424.61971830985914
302  415.6510550694802 
502  581.0             
304  185.26495726495727
400  0.0               
200  41750.03759502419 
401  12472.8           
404  17872.454545454544
408  440.57142857142856


In [55]:
import pyspark.sql.functions as F

logs_df.groupBy('code').agg(
    logs_df.code,
    F.avg(logs_df.bytes),
    F.min(logs_df.bytes),
    F.max(logs_df.bytes)
).show()

code AVG(bytes#406)     MIN(bytes#406) MAX(bytes#406)
500  4684.5             422            8947          
301  424.61971830985914 331            499           
302  415.6510550694802  304            1034          
502  581.0              581            581           
304  185.26495726495727 157            204           
400  0.0                0              0             
200  41750.03759502419  0              9045352       
401  12472.8            8318           28895         
404  17872.454545454544 7197           23822         
408  440.57142857142856 0              514           


## Quiz 1

### question 1

In [56]:
sql_ctx._jsc.hadoopConfiguration().set('textinputformat.record.delimiter', '\n')

In [57]:
yelp_df = sql_ctx.load(
    source      ='com.databricks.spark.csv',
    header      = 'true',
    inferSchema = 'true',
    path        = 'file:%s/index_data.csv'%pwd
)

In [58]:
yelp_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- type: string (nullable = true)
 |-- useful: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- full_address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- neighborhoods: string (nullable = true)
 |-- open: string (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- state: string (nullable = true)



In [59]:
yelp_df.show(5)

business_id          cool date       funny id                   stars text                 type     useful user_id              name               full_address         latitude      longitude      neighborhoods open review_count state
9yKzy9PApeiPPOUJE... 2    2011-01-26 0     fWKvX83p0-ka4JS3d... 4     My wife took me h... business 5      rLtl8ZkDX5vH5nAx9... Morning Glory Cafe 6106 S 32nd St Ph... 33.3907928467 -112.012504578 []            True 116          AZ   
ZRJwVLyzEJq1VAihD... 0    2011-07-27 0     IjZ33sJrzXqU-0X6U... 4     I have no idea wh... business 0      0a2KyEL0d3Yb1V6ai... Spinato's Pizzeria 4848 E Chandler B... 33.305606842  -111.978759766 []            True 102          AZ   
6oRAC4uyJCsJl1X0W... 0    2012-06-14 0     IESLBzqUCLdSzSqm0... 4     love the gyro pla... business 1      0hT2KtfLiobPvh6cD... Haji-Baba          1513 E  Apache Bl... 33.4143447876 -111.913032532 []            True 265          AZ   
_1QQZuf4zZOyFCvXc... 1    2010-05-27 0     G-WvGaISbqqaMHlNn

In [60]:
yelp_df.select('cool').agg({'cool':'mean'}).show()

AVG(cool#472)
0.998        


### Question 2

In [61]:
yelp_df.filter('review_count>=10').groupBy('stars').mean('cool').show()

stars AVG(cool#472)     
2     0.5217391304347826
3     1.0817610062893082
4     1.0675944333996024
5     2.2222222222222223


### Question 3

In [62]:
yelp_df.filter('review_count>=10 and open="True" and stars=5').agg({'cool':'mean'}).show()

AVG(cool#472)
2.25         


In [63]:
yelp_df.filter('review_count>10 and open="True"').groupBy('stars').mean('cool').show()

stars AVG(cool#472)     
2     0.6               
3     1.0456140350877192
4     1.0759219088937093
5     2.2857142857142856


### Question 4

In [64]:
yelp_df.filter('review_count>=10 and open="True"')\
.groupBy('state')\
.sum('review_count')\
.orderBy(desc('sum(review_count)'))\
.show()

Py4JJavaError: An error occurred while calling o455.sort.
: org.apache.spark.sql.AnalysisException: cannot resolve 'sum(review_count)' given input columns state, SUM(review_count#487);
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$apply$3$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:48)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$apply$3$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:45)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:250)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:250)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:50)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:249)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:263)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
	at scala.collection.AbstractIterator.to(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenUp(TreeNode.scala:292)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:247)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:103)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:117)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:116)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
	at scala.collection.AbstractIterator.to(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:121)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$apply$3.apply(CheckAnalysis.scala:45)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$apply$3.apply(CheckAnalysis.scala:43)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:88)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.apply(CheckAnalysis.scala:43)
	at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1069)
	at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
	at org.apache.spark.sql.DataFrame.logicalPlanToDataFrame(DataFrame.scala:157)
	at org.apache.spark.sql.DataFrame.sort(DataFrame.scala:403)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)


In [73]:
import pyspark.sql.functions as F

yelp_df.filter('review_count>=10 and open="True"')\
.groupBy('state')\
.agg(F.sum('review_count').alias('max_review_count'))\
.orderBy(desc('max_review_count'))\
.show()

max_review_count
72214           
8394            
6720            
5764            
4081            
3470            
2125            
1876            
1778            
525             
429             


### Question 5

In [74]:
import pyspark.sql.functions as F

yelp_df.groupBy('business_id').agg(
    F.count(yelp_df.business_id).alias('venue_count'),
).orderBy(desc('venue_count')).show(5)

venue_count
6          
6          
4          
4          
4          


## Lesson 2 walkthrough

In [75]:
yelp_df = sql_ctx.load(
    source      ='com.databricks.spark.csv',
    header      = 'true',
    inferSchema = 'true',
    path        = 'file:%s/index_data.csv'%pwd
)
yelp_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- type: string (nullable = true)
 |-- useful: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- full_address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- neighborhoods: string (nullable = true)
 |-- open: string (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- state: string (nullable = true)



#### create a temporary table

In [76]:
yelp_df.registerTempTable('yelp')

In [77]:
filtered_yelp = sql_ctx.sql("""\
select * 
from yelp
where useful >= 1
""")
filtered_yelp

DataFrame[business_id: string, cool: int, date: string, funny: int, id: string, stars: int, text: string, type: string, useful: int, user_id: string, name: string, full_address: string, latitude: double, longitude: double, neighborhoods: string, open: string, review_count: int, state: string]

In [78]:
yelp_df.filter(yelp_df.useful>=1).count(), filtered_yelp.count()

(601L, 601L)

#### aggregation

In [79]:
sql_ctx.sql("""\
select 
    max(useful) as max_useful
from yelp
""")

DataFrame[max_useful: int]

In [80]:
yelp_df.agg({'useful':'max'}).show()

MAX(useful#568)
28             


#### joins

In [81]:
useful_perc_data.join(
    yelp_df,
    yelp_df.id==useful_perc_data.uid,
    'inner'    
).select(
    useful_perc_data.uid,
    'useful_scaled',
    'review_count'
)

DataFrame[uid: string, useful_scaled: int, review_count: int]

In [82]:
useful_perc_data.registerTempTable('useful_perc_data')

In [83]:
useful_perc_data.printSchema()

root
 |-- uid: string (nullable = true)
 |-- useful_scaled: integer (nullable = true)



In [84]:
sql_ctx.sql("""\
select
    useful_perc_data.uid
  , useful_scaled
  , review_count
from useful_perc_data
    inner join yelp on useful_perc_data.uid=yelp.id
"""
)

DataFrame[uid: string, useful_scaled: int, review_count: int]

#### HIVE tables

In [85]:
customers_df = sql_ctx.sql('select * from customers')
customers_df.show(5)

customer_id customer_fname customer_lname customer_email customer_password customer_street      customer_city customer_state customer_zipcode
1           Richard        Hernandez      XXXXXXXXX      XXXXXXXXX         6303 Heather Plaza   Brownsville   TX             78521           
2           Mary           Barrett        XXXXXXXXX      XXXXXXXXX         9526 Noble Embers... Littleton     CO             80126           
3           Ann            Smith          XXXXXXXXX      XXXXXXXXX         3422 Blue Pioneer... Caguas        PR             00725           
4           Mary           Jones          XXXXXXXXX      XXXXXXXXX         8324 Little Common   San Marcos    CA             92069           
5           Robert         Hudson         XXXXXXXXX      XXXXXXXXX         10 Crystal River ... Caguas        PR             00725           


In [86]:
customers_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_password: string (nullable = true)
 |-- customer_street: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_zipcode: string (nullable = true)



In [91]:
sql_ctx.sql("""\
select 
    c.category_name
  , count(order_item_quantity) as item_count
from order_items oi
  inner join products p on oi.order_item_product_id=p.product_id
  inner join categories c on c.category_id=p.product_category_id
group by c.category_name
order by item_count desc
limit 10
""").show()

category_name        item_count
Cleats               24551     
Men's Footwear       22246     
Women's Apparel      21035     
Indoor/Outdoor Games 19298     
Fishing              17325     
Water Sports         15540     
Camping & Hiking     13729     
Cardio Equipment     12487     
Shop By Sport        10984     
Electronics          3156      


In [112]:
sql_ctx.sql("""\
select
    p.product_id   as product_id
  , p.product_name as product_name
  , r.revenue      as revenue
from products p
  inner join (
    select 
        oi.order_item_product_id as product_id
      , sum(cast(oi.order_item_subtotal as float)) as revenue
   from order_items oi
      inner join orders o on oi.order_item_order_id=o.order_id
    group by order_item_product_id
  ) r on p.product_id=r.product_id
order by r.revenue desc
limit 10
""").show()

product_id product_name         revenue           
1004       Field & Stream Sp... 6929653.690338135 
365        Perfect Fitness P... 4421143.14352417  
957        Diamondback Women... 4118425.570831299 
191        Nike Men's Free 5... 3667633.196662903 
502        Nike Men's Dri-FI... 3147800.0         
1073       Pelican Sunstream... 3099845.085144043 
403        Nike Men's CJ Eli... 2891757.6622009277
1014       O'Brien Men's Neo... 2888993.91355896  
627        Under Armour Girl... 1269082.6712722778
565        adidas Youth Germ... 67830.0           


In [113]:
yelp_df.saveAsTable('yelp_reviews')

In [114]:
sql_ctx.sql('select * from yelp_reviews').show(5)

business_id          cool date       funny id                   stars text                 type     useful user_id              name               full_address         latitude      longitude      neighborhoods open review_count state
9yKzy9PApeiPPOUJE... 2    2011-01-26 0     fWKvX83p0-ka4JS3d... 4     My wife took me h... business 5      rLtl8ZkDX5vH5nAx9... Morning Glory Cafe 6106 S 32nd St Ph... 33.3907928467 -112.012504578 []            True 116          AZ   
ZRJwVLyzEJq1VAihD... 0    2011-07-27 0     IjZ33sJrzXqU-0X6U... 4     I have no idea wh... business 0      0a2KyEL0d3Yb1V6ai... Spinato's Pizzeria 4848 E Chandler B... 33.305606842  -111.978759766 []            True 102          AZ   
6oRAC4uyJCsJl1X0W... 0    2012-06-14 0     IESLBzqUCLdSzSqm0... 4     love the gyro pla... business 1      0hT2KtfLiobPvh6cD... Haji-Baba          1513 E  Apache Bl... 33.4143447876 -111.913032532 []            True 265          AZ   
_1QQZuf4zZOyFCvXc... 1    2010-05-27 0     G-WvGaISbqqaMHlNn

## Quiz 2

##### question 1

In [120]:
sql_ctx.sql("""
select
    sum(1) as count
from orders o
where o.order_status='SUSPECTED_FRAUD'
""").show(5)

count
1558 


##### question 2

In [125]:
sql_ctx.sql("""
select * from order_items oi
""").printSchema()

root
 |-- order_item_id: integer (nullable = true)
 |-- order_item_order_id: integer (nullable = true)
 |-- order_item_product_id: integer (nullable = true)
 |-- order_item_quantity: integer (nullable = true)
 |-- order_item_subtotal: float (nullable = true)
 |-- order_item_product_price: float (nullable = true)



In [132]:
sql_ctx.sql("""
select
    sum(cast(order_item_subtotal as float)) as total_amount
from order_items oi
group by order_item_order_id
order by total_amount desc
""").show(5)

total_amount      
3449.9099884033203
2859.8900032043457
2839.9100036621094
2779.8600006103516
2699.899990081787 


##### question 3

In [133]:
sql_ctx.sql("""
select * from order_items oi
""").printSchema()

root
 |-- order_item_id: integer (nullable = true)
 |-- order_item_order_id: integer (nullable = true)
 |-- order_item_product_id: integer (nullable = true)
 |-- order_item_quantity: integer (nullable = true)
 |-- order_item_subtotal: float (nullable = true)
 |-- order_item_product_price: float (nullable = true)



In [134]:
sql_ctx.sql("""
select * from orders o
""").printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: long (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [136]:
sql_ctx.sql("""
select
    avg(oi.order_item_product_price) as avg_price
from order_items oi
  inner join orders o on oi.order_item_order_id=o.order_id
where 1=1
  and o.order_status='COMPLETE'
""").cache().show(5)

avg_price         
133.18070529114834


##### question 4

In [154]:
sql_ctx.sql("""
select
    o.order_customer_id as c_id
  , sum(oi.order_item_subtotal) as total_amount
from order_items oi
  inner join orders o on oi.order_item_order_id=o.order_id
where 1=1
  and o.order_status='COMPLETE'
group by o.order_customer_id
order by total_amount desc
""").show(5)

c_id  total_amount      
9337  6585.330139160156 
3710  6169.4001388549805
10744 5799.500072479248 
749   5759.540145874023 
5411  5174.560092926025 


##### question 5

In [159]:
sql_ctx.sql("""
select
    o.order_id as order_id
  , sum(oi.order_item_subtotal) as total_amount
from order_items oi
  inner join orders o on oi.order_item_order_id=o.order_id
where 1=1
  and o.order_status!='COMPLETE'
group by o.order_id
order by total_amount desc
""").show(5)

order_id total_amount      
68809    2779.8600006103516
68766    2699.899990081787 
68848    2399.959991455078 
68875    2399.949981689453 
68816    2329.939987182617 
