In [4]:
# Find path to PySpark.
import findspark
findspark.init('/Users/Steve/spark-2.0.2-bin-hadoop2.6')

# Import PySpark and initialize SparkContext object.
import pyspark
from pyspark.sql import SQLContext
sc = pyspark.SparkContext()

#Pass SparkContext object to SQL Context
sqlContext = SQLContext(sc)

In [5]:
#Read data as dataframe
CensusDF = sqlContext.read.json('data\Census_2010.json')

#Register table with SQL Context
CensusDF.registerTempTable('Census_2010')
print (sqlContext.tableNames())

['census_2010']


In [5]:
#Select age column
query = 'SELECT age FROM census_2010'
queryResult = sqlContext.sql(query)
queryResult.show(5)

+---+
|age|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
only showing top 5 rows



In [7]:
#Select age, males and females columns where 5< age < 15
query = 'SELECT age, males, females FROM census_2010 WHERE age > 5 AND age <15'
queryResult = sqlContext.sql(query)
queryResult.show(5)

+---+-------+-------+
|age|  males|females|
+---+-------+-------+
|  6|2093905|2007781|
|  7|2097080|2010281|
|  8|2101670|2013771|
|  9|2108014|2018603|
| 10|2114217|2023289|
+---+-------+-------+
only showing top 5 rows



In [26]:
#Extract male and female columns with SQL
#Use dataframe methods to print summary statistics

query = 'SELECT males, females FROM census_2010'
queryResult = sqlContext.sql(query)
queryResult.describe('males').show()
queryResult.describe('females').show()

+-------+------------------+
|summary|             males|
+-------+------------------+
|  count|               101|
|   mean|1520095.3168316833|
| stddev|  818587.208016823|
|    min|           1034415|
|    max|            975512|
+-------+------------------+

+-------+-----------------+
|summary|          females|
+-------+-----------------+
|  count|              101|
|   mean|1571460.287128713|
| stddev|748671.0493484351|
|    min|          1018530|
|    max|           973223|
+-------+-----------------+



In [10]:
#Read additional tables into SQL Context
Census2010df = sqlContext.read.json('data\Census_2010.json')
Census2010df.registerTempTable('Census_2010')

Census2000df = sqlContext.read.json('data\Census_2000.json')
Census2000df.registerTempTable('Census_2000')

Census1990df = sqlContext.read.json('data\Census_1990.json')
Census1990df.registerTempTable('Census_1990')

print (sqlContext.tableNames())

['census_1980', 'census_1990', 'census_2000', 'census_2010']


In [11]:
#Join Census 2010 and 2000 on 'age' column. Select 'total' column.

query = 'SELECT Census_2010.total AS 2010_Total, Census_2000.total AS 2000_Total FROM Census_2010 INNER JOIN Census_2000 ON Census_2010.age = Census_2000.age'
queryResult = sqlContext.sql(query)
queryResult.show()

+----------+----------+
|2010_Total|2000_Total|
+----------+----------+
|   4079669|   3733034|
|   4085341|   3825896|
|   4089295|   3904845|
|   4092221|   3970865|
|   4094802|   4024943|
|   4097728|   4068061|
|   4101686|   4101204|
|   4107361|   4125360|
|   4115441|   4141510|
|   4126617|   4150640|
|   4137506|   4152174|
|   4144742|   4145530|
|   4169316|   4139512|
|   4220043|   4138230|
|   4285424|   4137982|
|   4347028|   4133932|
|   4410804|   4130632|
|   4451147|   4111244|
|   4454165|   4068058|
|   4432260|   4011192|
+----------+----------+
only showing top 20 rows



In [12]:
#Join all tables and sum population total for each year
query = 'SELECT SUM(Census_2010.total) AS 2010_SUM,\
    SUM(Census_2000.total) AS 2000_SUM,\
    SUM(Census_1990.total) AS 1990_SUM\
    FROM Census_2010 JOIN Census_2000 JOIN Census_1990'
queryResult = sqlContext.sql(query)
queryResult.show()

+-----------------+-----------------+-----------------+
|         2010_SUM|         2000_SUM|         1990_SUM|
+-----------------+-----------------+-----------------+
|3.185232830316E12|2.903147423395E12|2.596222306047E12|
+-----------------+-----------------+-----------------+



In [13]:
sc.stop()