/
nations-parquet-sql-aws-emr.py
71 lines (52 loc) · 1.95 KB
/
nations-parquet-sql-aws-emr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
%pyspark
"""
File name: nations-parquet-sql.py
Author: Jonathan Dawson
Date Created: 6/8/2016
Date Modified: 6/8/2016
Python Version: 2.7
PySpark Version: 1.6.1
Example of loading .parquet formatted files into an in-memory SQLContext table and running SQL queries against it.
This script is configured to run on the AWS EMR Spark service inside a Zeppelin notebook and pull a parquet file from
AWS s3.
"""
import sys
LINE_LENGTH = 200


def print_horizontal(length=None):
    """
    Print a horizontal rule made of '-' characters, followed by a newline.

    :param length: number of dashes to print; when None, LINE_LENGTH is used
    :return: None
    """
    width = LINE_LENGTH if length is None else length
    # Emit the whole rule in a single write instead of one write call per character.
    sys.stdout.write("-" * width + "\n")
# Import the Spark modules up front so a missing PySpark installation is reported
# with a readable message instead of a traceback further down the script.
try:
    from pyspark import SparkContext
    from pyspark import SQLContext
except ImportError as e:
    # Format the message explicitly: under Python 2, `print (a, b)` prints the
    # repr of a tuple rather than the two values.
    print("Can not import Spark Modules: {0}".format(e))
    sys.exit(1)
else:
    # Only reached when both imports succeeded.
    print("Successfully imported Spark Modules -- `SparkContext, SQLContext`")
    print_horizontal()
# NOTE(review): `sc` is not defined in this file; presumably it is the SparkContext
# injected by the Zeppelin %pyspark interpreter -- confirm when running elsewhere.
sqlContext = SQLContext(sparkContext=sc)

# Load the parquet file located in AWS S3 into a DataFrame
parquetFile = sqlContext.read.parquet("s3://jon-parquet-format/nation.plain.parquet")

# Register the DataFrame as a temporary table so it can be referenced by name in SQL
parquetFile.registerTempTable("parquetFile")

# Run a standard SQL query against the temporary table
nations_all_sql = sqlContext.sql("SELECT * FROM parquetFile")

# Format every Row as one display line. The map goes through `.rdd` explicitly:
# DataFrame.map is only a Spark 1.x shorthand for `df.rdd.map` and is not available
# on Spark 2.x+, whereas `.rdd.map` behaves the same on both.
nations_all = nations_all_sql.rdd.map(
    lambda p: "Country: {0:15} Ipsum Comment: {1}".format(p.name, p.comment_col))

# Print the result set
print("All Nations and Comments -- `SELECT * FROM parquetFile`")
print_horizontal()
# collect() brings all formatted lines back to the driver before printing.
for nation in nations_all.collect():
    print(nation)
# Use standard SQL to keep only the nations whose name contains "UNITED"
nations_filtered_sql = sqlContext.sql("SELECT name FROM parquetFile WHERE name LIKE '%UNITED%'")

# Format every Row as one display line. The map goes through `.rdd` explicitly:
# DataFrame.map is only a Spark 1.x shorthand for `df.rdd.map` and is not available
# on Spark 2.x+, whereas `.rdd.map` behaves the same on both.
nations_filtered = nations_filtered_sql.rdd.map(lambda p: "Country: {0:20}".format(p.name))

# Print the result set; the first rule separates it from any output printed before it.
print_horizontal()
print("Nations Filtered -- `SELECT name FROM parquetFile WHERE name LIKE '%UNITED%'`")
print_horizontal()
# collect() brings all formatted lines back to the driver before printing.
for nation in nations_filtered.collect():
    print(nation)