In [18]:
# http://ec2-54-159-219-64.compute-1.amazonaws.com:8888/notebooks/edmunds_spark_dataframe.ipynb#

ALL MODELS
 - ['errorType',
 'id',
 'makes',
 'makesCount',
 'message',
 'moreInfoUrl',
 'name',
 'niceName',
 'status',
 'time_stamp',
 'years']

ALL YEARS
 - ['id', 'col', 'year_id', 'year', 'styles']

ALL STYLES
 - ['id', 'col', 'trim_id', 'name', 'submodel', 'trim']

ALL SUBMODELS
 - ['id', 'submodel.body', 'submodel.modelName', 'submodel.niceName']

# Edmunds Spark DataFrames

In [19]:
# !sudo pip install pandas

from pyspark.sql.functions import desc, explode, col
from pprint import pprint

In [20]:
# A way to read json files:
df_0 = spark.read.load("s3a://edmundsvehicle/2017/03/06/21/*", format="json")

# Create 3 tables that conform to third normal form:
### MODEL, YEARS, STYLES

In [21]:
# first work with a single json object
MODEL = spark.read.json("s3a://edmundsvehicle/2017/03/06/21/*")

In [22]:
MODEL.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- niceName: string (nullable = true)
 |-- time_stamp: double (nullable = true)
 |-- years: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- styles: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- submodel: struct (nullable = true)
 |    |    |    |    |    |-- body: string (nullable = true)
 |    |    |    |    |    |-- modelName: string (nullable = true)
 |    |    |    |    |    |-- niceName: string (nullable = true)
 |    |    |    |    |-- trim: string (nullable = true)
 |    |    |-- year: long (nullable = true)



In [23]:
# Create First DataFrame.
# Let's test first our query.

# years[0] 
MODEL.selectExpr('id').show()

+------+
|    id|
+------+
|BMW_i3|
+------+



In [24]:
YEARS = MODEL.select(MODEL['id'], explode(MODEL['years']))
YEARS.show()

+------+--------------------+
|    id|                 col|
+------+--------------------+
|BMW_i3|[200460080,Wrappe...|
|BMW_i3|[200725667,Wrappe...|
|BMW_i3|[401612494,Wrappe...|
|BMW_i3|[401690178,Wrappe...|
+------+--------------------+



In [25]:
YEARS.printSchema()

root
 |-- id: string (nullable = true)
 |-- col: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- styles: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- submodel: struct (nullable = true)
 |    |    |    |    |-- body: string (nullable = true)
 |    |    |    |    |-- modelName: string (nullable = true)
 |    |    |    |    |-- niceName: string (nullable = true)
 |    |    |    |-- trim: string (nullable = true)
 |    |-- year: long (nullable = true)



In [26]:
YEARS = YEARS.withColumn("year_id", YEARS['col'].getField("id"))
YEARS = YEARS.withColumn("year", YEARS['col'].getField("year"))
YEARS = YEARS.withColumn("styles", YEARS['col'].getField("styles"))
YEARS.show()

+------+--------------------+---------+----+--------------------+
|    id|                 col|  year_id|year|              styles|
+------+--------------------+---------+----+--------------------+
|BMW_i3|[200460080,Wrappe...|200460080|2014|[[101402984,4dr H...|
|BMW_i3|[200725667,Wrappe...|200725667|2015|[[200725669,4dr H...|
|BMW_i3|[401612494,Wrappe...|401612494|2016|[[401612496,4dr H...|
|BMW_i3|[401690178,Wrappe...|401690178|2017|[[401690179,4dr H...|
+------+--------------------+---------+----+--------------------+



In [27]:
STYLES = YEARS.select(YEARS['id'], explode(YEARS['styles']))
STYLES.show()

+------+--------------------+
|    id|                 col|
+------+--------------------+
|BMW_i3|[101402984,4dr Ha...|
|BMW_i3|[200689763,4dr Ha...|
|BMW_i3|[200725669,4dr Ha...|
|BMW_i3|[200725668,4dr Ha...|
|BMW_i3|[401612496,4dr Ha...|
|BMW_i3|[401612495,4dr Ha...|
|BMW_i3|[401690179,4dr Ha...|
|BMW_i3|[401690196,4dr Ha...|
|BMW_i3|[401690180,60 Ah ...|
+------+--------------------+



In [28]:
STYLES.printSchema()

root
 |-- id: string (nullable = true)
 |-- col: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- submodel: struct (nullable = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- modelName: string (nullable = true)
 |    |    |-- niceName: string (nullable = true)
 |    |-- trim: string (nullable = true)



In [29]:
STYLES = STYLES.withColumn("trim_id", STYLES['col'].getField("id"))
STYLES = STYLES.withColumn("name", STYLES['col'].getField("name"))
STYLES = STYLES.withColumn("submodel", STYLES['col'].getField("submodel"))
STYLES = STYLES.withColumn("trim", STYLES['col'].getField("trim"))
STYLES.show()

+------+--------------------+---------+--------------------+--------------------+----------------+
|    id|                 col|  trim_id|                name|            submodel|            trim|
+------+--------------------+---------+--------------------+--------------------+----------------+
|BMW_i3|[101402984,4dr Ha...|101402984|4dr Hatchback w/R...|[Hatchback,i3 Hat...|w/Range Extender|
|BMW_i3|[200689763,4dr Ha...|200689763|4dr Hatchback (el...|[Hatchback,i3 Hat...|            Base|
|BMW_i3|[200725669,4dr Ha...|200725669|4dr Hatchback (el...|[Hatchback,i3 Hat...|            Base|
|BMW_i3|[200725668,4dr Ha...|200725668|4dr Hatchback w/R...|[Hatchback,i3 Hat...|w/Range Extender|
|BMW_i3|[401612496,4dr Ha...|401612496|4dr Hatchback w/R...|[Hatchback,i3 Hat...|w/Range Extender|
|BMW_i3|[401612495,4dr Ha...|401612495|4dr Hatchback (el...|[Hatchback,i3 Hat...|            Base|
|BMW_i3|[401690179,4dr Ha...|401690179|4dr Hatchback w/R...|[Hatchback,i3 Hat...|w/Range Extender|
|BMW_i3|[4

In [30]:
# SUBMODELS = STYLES.select(STYLES['id'], STYLES.submodel.body, STYLES.submodel.modelName, STYLES.submodel.niceName)
# SUBMODELS.show()

In [31]:
# run a SQL query

sqlCtx.registerDataFrameAsTable(MODEL, "MODEL")
sqlCtx.registerDataFrameAsTable(YEARS, "YEARS")
sqlCtx.registerDataFrameAsTable(STYLES, "STYLES")
# sqlCtx.registerDataFrameAsTable(SUBMODELS, "SUBMODELS")

In [32]:
df2 = sqlCtx.sql("""SELECT m.id, y.year from MODEL m
                    LEFT JOIN YEARS y on m.id = y.id""")
df2.collect()
df2.show()

+------+----+
|    id|year|
+------+----+
|BMW_i3|2017|
|BMW_i3|2016|
|BMW_i3|2015|
|BMW_i3|2014|
+------+----+



# Create Tables Using All the Models in the dataset

In [33]:
# new_df = df.select(df['id'], explode(df['years']['years']))

In [34]:
# We read all files:
ALL_MODELS = spark.read.json("s3a://edmundsvehicle/2017/*/*/*/*")

In [35]:
ALL_MODELS.cache()
type(ALL_MODELS)

pyspark.sql.dataframe.DataFrame

In [36]:
# df_all.selectExpr('id',
#                   'niceName',
#                   'time_stamp',
#                   'years[0].id as model_year_id',
#                   'years[0].styles[0].id as style_id',
#                   'years[0].styles[0].name as style',
#                   'years[0].styles[0].submodel.body as body',
#                   'years[0].styles[0].submodel.modelName as modelName',
#                   'years[0].styles[0].trim as trim').show(500)
# df_all.selectExpr('years').show(500)

In [37]:
# convert all to 3rd normal form schema

ALL_YEARS = ALL_MODELS.select(ALL_MODELS['id'], explode(ALL_MODELS['years']))

ALL_YEARS = ALL_YEARS.withColumn("year_id", ALL_YEARS['col'].getField("id"))
ALL_YEARS = ALL_YEARS.withColumn("year", ALL_YEARS['col'].getField("year"))
ALL_YEARS = ALL_YEARS.withColumn("styles", ALL_YEARS['col'].getField("styles"))

ALL_STYLES = ALL_YEARS.select(ALL_YEARS['id'], explode(ALL_YEARS['styles']))

ALL_STYLES = ALL_STYLES.withColumn("trim_id", ALL_STYLES['col'].getField("id"))
ALL_STYLES = ALL_STYLES.withColumn("name", ALL_STYLES['col'].getField("name"))
ALL_STYLES = ALL_STYLES.withColumn("submodel", ALL_STYLES['col'].getField("submodel"))
ALL_STYLES = ALL_STYLES.withColumn("trim", ALL_STYLES['col'].getField("trim"))

ALL_SUBMODELS = ALL_STYLES.select(ALL_STYLES['id'], ALL_STYLES.submodel.body, ALL_STYLES.submodel.modelName)

# register as SQL table
sqlCtx.registerDataFrameAsTable(ALL_MODELS, "ALL_MODELS")
sqlCtx.registerDataFrameAsTable(ALL_YEARS, "ALL_YEARS")
sqlCtx.registerDataFrameAsTable(ALL_STYLES, "ALL_STYLES")
sqlCtx.registerDataFrameAsTable(ALL_SUBMODELS, "ALL_SUBMODELS")

# run a SQL query
test_query = sqlCtx.sql("""SELECT m.id, y.year from ALL_MODELS m
                    LEFT JOIN ALL_YEARS y on m.id = y.id
                    WHERE m.id is not NULL
                    LIMIT 5""")
test_query.collect()
test_query.show()

+------------+----+
|          id|year|
+------------+----+
|BMW_3_Series|2017|
|BMW_3_Series|2016|
|BMW_3_Series|2015|
|BMW_3_Series|2014|
|BMW_3_Series|2013|
+------------+----+



In [38]:
info_query = sqlCtx.sql("""SELECT m.id, y.year from ALL_MODELS m
                    LEFT JOIN ALL_YEARS y on m.id = y.id
                    WHERE m.id is not NULL
                    LIMIT 500""")

In [39]:
info_query.toPandas().tail()

Unnamed: 0,id,year
495,BMW_M3,2001
496,BMW_M3,1999
497,BMW_M3,1998
498,BMW_M3,1997
499,BMW_M3,1996


In [40]:
# query = sqlCtx.sql("""SELECT * from ALL_MODELS m""")
# query.toPandas().head()

In [41]:
# model years

years_query = sqlCtx.sql("""SELECT m.id,
                     m.name,
                     y.year,
                     y.year_id
                    from ALL_MODELS m
                    INNER JOIN ALL_YEARS y on y.id = m.id
                    """)
model_years_df = years_query.toPandas()
model_years_df.tail(2)

Unnamed: 0,id,name,year,year_id
1110,Buick_Estate_Wagon,Estate Wagon,1990,3244
1111,BMW_X7,X7,2019,401702538


In [42]:
# model styles

styles_query = sqlCtx.sql("""SELECT m.id,
                     s.trim,
                     s.name as style_name,
                     submodel.body,
                    submodel.modelName as model_body_name
                    from ALL_MODELS m
                    INNER JOIN ALL_STYLES s on s.id = m.id
                    """)
model_styles_df = styles_query.toPandas()
model_styles_df.tail()

Unnamed: 0,id,trim,style_name,body,model_body_name
4101,BMW_M4_GTS,Base,2dr Coupe (3.0L 6cyl Turbo 7AM),Coupe,M4 GTS Coupe
4102,Audi_Q7_e_tron,Base,4dr SUV AWD (electric),SUV,Q7 e-tron SUV
4103,Audi_RS_6,quattro,4dr Sedan (4.2L 8cyl 5A),Sedan,RS 6 Sedan
4104,Buick_Estate_Wagon,Base,4dr Wagon,Wagon,Estate Wagon
4105,BMW_X7,Base,4dr SUV AWD,SUV,X7 SUV


In [43]:
# info_query = sqlCtx.sql("""SELECT b.id,
#                     m.name,
#                     y.year,
#                     y.year_id,
#                     s.trim,
#                     s.name as style_name,
#                     submodel.body,
#                     submodel.modelName as model_body_name
#                     from ALL_SUBMODELS b
#                     INNER JOIN ALL_STYLES s on s.id = b.id
#                     INNER JOIN ALL_YEARS y on y.id = b.id
#                     INNER JOIN ALL_MODELS m on m.id = b.id""")
# df = info_query.toPandas()
# df
# df.columns

In [44]:
years_query.groupBy(years_query['id']).count().sort(col("count").desc()).show()

+-----------------+-----+
|               id|count|
+-----------------+-----+
|     BMW_3_Series|   28|
|     BMW_7_Series|   28|
|     BMW_5_Series|   27|
|AM_General_Hummer|   27|
|          Audi_A6|   23|
|      Buick_Regal|   23|
|          Audi_A4|   22|
|           BMW_M3|   22|
|          Audi_A8|   22|
|         Acura_TL|   20|
|          Audi_S4|   19|
|           BMW_X5|   18|
|         Acura_RL|   17|
|        Acura_MDX|   17|
|          Audi_TT|   17|
|           BMW_M5|   16|
|    Buick_LeSabre|   16|
|        Acura_NSX|   16|
|    Buick_Century|   16|
|Buick_Park_Avenue|   15|
+-----------------+-----+
only showing top 20 rows



In [45]:
years_query.groupBy(years_query['id']).count().sort(col("count").desc()).toPandas()

Unnamed: 0,id,count
0,BMW_7_Series,28
1,BMW_3_Series,28
2,AM_General_Hummer,27
3,BMW_5_Series,27
4,Audi_A6,23
5,Buick_Regal,23
6,Audi_A8,22
7,BMW_M3,22
8,Audi_A4,22
9,Acura_TL,20


In [46]:
styles_query.groupBy(styles_query['body']).count().sort(col("count").desc()).show()

+-----------+-----+
|       body|count|
+-----------+-----+
|      Sedan| 1805|
|      Coupe|  677|
|        SUV|  665|
|Convertible|  501|
|      Wagon|  278|
|  Hatchback|  163|
|    Minivan|   11|
|  CTS Coupe|    6|
+-----------+-----+



In [48]:
# # from time import localtime, strftime
# # # datetime.datetime.now().strftime("%A, %d. %B %Y %I:%M%p")
# # # from time import gmtime, strftime
# # strftime("%a, %d %b %Y %H:%M:%S +0000", localtime())

# from datetime import datetime, timedelta
# from pytz import timezone
# import pytz
# utc = pytz.utc
# utc.zone

# pacific = timezone('US/Pacific')
# pacific.zone

# amsterdam = timezone('Europe/Amsterdam')
# fmt = '%Y-%m-%d %H:%M:%S %Z%z'

# loc_dt = pacific.localize(datetime.now())
# print loc_dt.strftime(fmt)

# # ams_dt = loc_dt.astimezone(amsterdam)
# # ams_dt.strftime(fmt)

from datetime import datetime
from pytz import timezone    

pacific = timezone('US/Pacific')
pacific_time = datetime.now(pacific)
# print pacific_time.strftime('%Y-%m-%d %H:%M:%S %Z%z')
print pacific_time.strftime('%A, %B, %d %Y %H:%M:%S')

Wednesday, March, 08 2017 19:00:41


In [65]:
# edmunds.py

import sys
import json
import pandas as pd
from pyspark import SparkConf, SparkContext

from datetime import datetime
from pytz import timezone   

# Constants
APP_NAME = "Top 20 Vehicle Models by Years"

# def filter_vehicles(data):
#     try:
#         tweet = json.loads(data)
#         if 'id' in tweet:
#             entities = tweet.get('entities')
#         if entities:
#             hashtags = entities.get('hashtags')
#         if hashtags:
#             return True
#         return False
#     except:
#         return False

# def get_hashtags(data):
#     try:
#         tweet = json.loads(data)
#         if 'id' in tweet:
#             entities = tweet.get('entities')
#         if entities:
#             hashtags = entities.get('hashtags')
#         if hashtags:
#             return [i['text'] for i in hashtags]
#     except:
#         pass


def main():
    try:
#         vehicles = sc.textFile("s3a://justw149/2017/02/*/*/*")
#         # tweets = sc.textFile(filename)
#         clean_tweets = tweets.filter(filter_hashtags)
#         hash_text = clean_tweets.flatMap(get_hashtags)
#         maps = hash_text.map(lambda text: (text,1))
#         hashtags_counts = maps.reduceByKey(lambda a,b: a + b)
#         reverse_hashtags = hashtags_counts.map(lambda x: (x[1], x[0]))
#         sorted_hashtags = reverse_hashtags.sortByKey(ascending=False)
#         # sorted_hashtags.top(20)
#         # take 20
#         top_hashtags = pd.DataFrame(sorted_hashtags.take(20), columns=['hashtag_count', 'hashtag'])

        # chart showing the models that have the most years
        top_models = years_query.groupBy(years_query['id']).count().sort(col("count").desc()).toPandas()
        
        # chart showing counts of body styles in the database
        body_style_counts = styles_query.groupBy(styles_query['body']).count().sort(col("count").desc()).toPandas()
    
        # get the current local time
        pacific = timezone('US/Pacific')
        pacific_time = datetime.now(pacific)
        # print pacific_time.strftime('%Y-%m-%d %H:%M:%S %Z%z')
#         print pacific_time.strftime('%A, %B, %d %Y %H:%M:%S')
    
        
        # export to html
        top_html = body_style_counts.to_html() + top_models.to_html()

        html = "<!DOCTYPE html><html><body>{}</body></html>".format(top_html.encode('utf-8'))
        results = open("topVehicles.html", 'w')
        results.write("Last Updated: "+ pacific_time.strftime('%A, %B, %d %Y %H:%M:%S') + " Pacific Time")
        results.write("<hr>Hi!  This webpage shows counts of body styles in the database and the number of years for each model.<br>It serves as the front end of my Edmunds car data pipeline.  - Justin J. Wang<hr>")
        results.write(html)
        results.close()

#         top_RDD = sc.parallelize(hashtags_counts.top(20))
#         top_RDD.saveAsTextFile('s3a://sparksubmitresultsjw/results')
        
    except Exception as e:
        print(e)


if __name__ == "__main__":
    # Configure spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local[*]")
#     sc = SparkContext(conf=conf)
    # filename = sys.argv[1]
    # Execute Main functionality
main()

In [123]:
# https://github.com/justwjr/DSCI6007-student/blob/master/5.4%20-%20Spark%20Submit/steps%20for%20LAB.txt

# run this .py file in local, after doing a "reverse scp" after getting .pem file into EMR

# reverse scp, run command on local machine:
# scp -i ~/the_cloud.pem hadoop@ec2-54-159-219-64.compute-1.amazonaws.com:~/topVehicles.html ~/OneDrive/Edmunds-Car-Data-Pipeline-sdk-python
# /Users/justw/OneDrive/Edmunds-Car-Data-Pipeline-sdk-python

# # spark_results_boto.py

# import ssl
# import boto
# from boto.s3.connection import S3Connection

# def boto_upload_s3(html_file):
#     conn = S3Connection()
#     if hasattr(ssl, '_create_unverified_context'):
#         ssl._create_default_https_context = ssl._create_unverified_context
#     website_bucket = conn.get_bucket('edmundssparksubmitresults')
#     output_file = website_bucket.new_key('top_models_spark.html')
#     output_file.content_type = 'text/html'
#     output_file.set_contents_from_string(html_file, policy='public-read')

# if __name__ == '__main__':
#     top = open("topVehicles.html", "r")
#     html_file = top.read()
#     boto_upload_s3(html_file)
# top.close()

Steps

1) Setup EMR: Spark, 2xlarge, or just do m3.xlarge, 3 machines
2) ssh: ssh -i ~/.ssh/the_cloud.pem hadoop@ec2-54-84-250-89.compute-1.amazonaws.com

put URL of master node into web browser
Make sure that the security group for your master node allows connections on that port.
http://ec2-54-84-250-89.compute-1.amazonaws.com:8890

3) copy .py to emr

4) sudo pip install pandas

5) unset PYSPARK_DRIVER_PYTHON

6) create the S3 folder for results

7) spark-submit tweets.py --> top20hashtags.html


reverse scp
8) scp from emr to local (top20hashtags)
run in local:

scp -i ~/the_cloud.pem hadoop@ec2-54-84-250-89.compute-1.amazonaws.com:~/top20hashtags.html ~/OneDrive/…

9) run sparkresultsboto.py in local, which puts top20hashtags.html to s3 bucket


10) enable static website hosting in s3: http://sparksubmitresults.s3-website-us-east-1.amazonaws.com/

http://sparksubmitresultsjw.s3-website-us-west-1.amazonaws.com/


add policy in permissions
{
	"Version": "2008-10-17",
	"Statement": [
		{
			"Sid": "AllowPublicRead",
			"Effect": "Allow",
			"Principal": {
				"AWS": "*"
			},
			"Action": [
				"s3:GetObject"
			],
			"Resource": [
				"arn:aws:s3:::edmundssparksubmitresults/*"
			]
		}
	]
}

https://s3-us-west-1.amazonaws.com/edmundssparksubmitresults/topVehicles.html